Commit

Duplicate code for Spark2.

angelcervera committed Oct 21, 2020
1 parent 5a8feea commit d843d73
Showing 15 changed files with 930 additions and 5 deletions.
52 changes: 47 additions & 5 deletions build.sbt
@@ -8,7 +8,8 @@ lazy val commonIOVersion = "2.5"
lazy val logbackVersion = "1.1.7"
lazy val scoptVersion = "3.7.1"
lazy val akkaVersion = "2.5.31"
lazy val sparkVersion = "3.0.1"
lazy val spark3Version = "3.0.1"
lazy val spark2Version = "2.4.7"

// Releases versions
lazy val scala213 = "2.13.3"
@@ -86,6 +87,8 @@ lazy val root = (project in file("."))
.disablePlugins(AssemblyPlugin)
.aggregate(
core,
spark2,
spark2FatShaded,
spark3,
spark3FatShaded,
commonUtilities,
@@ -133,6 +136,45 @@ lazy val core = Project(id = "core", base = file("core"))
)
)

lazy val spark2 = Project(id = "spark2", base = file("spark2"))
.enablePlugins(AssemblyPlugin)
.settings(
commonSettings,
crossScalaVersions := Seq(scala212),
enablingPublishingSettings,
coverageConfig,
name := "osm4scala-spark3",
description := "Spark 2 connector for OpenStreetMap Pbf parser.",
bintrayPackage := "osm4scala-spark2",
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % spark2Version % Provided
),
assemblyOption in assembly := (assemblyOption in assembly).value.copy(
includeScala = false,
cacheUnzip = false,
cacheOutput = false
),
assemblyShadeRules in assembly := Seq(
ShadeRule
.rename("com.google.protobuf.**" -> "shadeproto.@1")
.inAll
)
)
.dependsOn(core)

lazy val spark2FatShaded = Project(id = "osm4scala-spark2-shaded", base = file("osm4scala-spark2-shaded"))
.disablePlugins(AssemblyPlugin)
.settings(
commonSettings,
crossScalaVersions := Seq(scala212),
enablingPublishingSettings,
disablingCoverage,
name := "osm4scala-spark2-shaded",
description := "Spark 2 connector for OpenStreetMap Pbf parser as shaded fat jar.",
bintrayPackage := "osm4scala-spark2-shaded",
packageBin in Compile := (assembly in (spark2, Compile)).value
)

lazy val spark3 = Project(id = "spark3", base = file("spark3"))
.enablePlugins(AssemblyPlugin)
.settings(
@@ -141,10 +183,10 @@ lazy val spark3 = Project(id = "spark3", base = file("spark3"))
enablingPublishingSettings,
coverageConfig,
name := "osm4scala-spark3",
description := "Spark 3 connector for OpenStreetMap Pbf 2 parser.",
description := "Spark 3 connector for OpenStreetMap Pbf parser.",
bintrayPackage := "osm4scala-spark3",
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % sparkVersion % Provided
"org.apache.spark" %% "spark-sql" % spark3Version % Provided
),
assemblyOption in assembly := (assemblyOption in assembly).value.copy(
includeScala = false,
@@ -167,7 +209,7 @@ lazy val spark3FatShaded = Project(id = "osm4scala-spark3-shaded", base = file("
enablingPublishingSettings,
disablingCoverage,
name := "osm4scala-spark3-shaded",
description := "Spark 3 connector for OpenStreetMap Pbf 2 parser as shaded fat jar.",
description := "Spark 3 connector for OpenStreetMap Pbf parser as shaded fat jar.",
bintrayPackage := "osm4scala-spark3-shaded",
packageBin in Compile := (assembly in (spark3, Compile)).value
)
@@ -260,7 +302,7 @@ lazy val exampleSparkUtilities = Project(id = "examples-spark-utilities", base =
name := "osm4scala-examples-spark-utilities",
description := "Example of different utilities using osm4scala and Spark.",
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-sql" % sparkVersion % Provided
"org.apache.spark" %% "spark-sql" % spark3Version % Provided
)
)
.dependsOn(spark3, commonUtilities)
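
For reference, a downstream sbt build would pull in the new Spark 2 artifacts roughly as sketched below (not part of this commit; the organization is assumed from the package names and the version is a placeholder). The shaded fat jar exists so that the relocated protobuf classes do not clash with the protobuf version shipped with Spark.

// Sketch only: organization assumed, version is a placeholder.
libraryDependencies += "com.acervera.osm4scala" %% "osm4scala-spark2-shaded" % "<version>"

// Or the plain connector, if you manage the protobuf conflict yourself:
// libraryDependencies += "com.acervera.osm4scala" %% "osm4scala-spark2" % "<version>"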
@@ -0,0 +1 @@
com.acervera.osm4scala.spark.OsmPbfFormat
@@ -0,0 +1,73 @@
/*
* The MIT License (MIT)
*
* Copyright (c) 2020 Ángel Cervera Claudio
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*/

package com.acervera.osm4scala.spark

import java.io.{FilterInputStream, InputStream}

import com.acervera.osm4scala.InputStreamSentinel

/**
* Keep in mind that this is not a thread-safe implementation.
*
* @param in InputStream whose consumed bytes are counted.
* @param lengthLimit Limit of bytes to read.
*/
class InputStreamLengthLimit(in: InputStream, lengthLimit: Long)
extends FilterInputStream(in) with InputStreamSentinel {

private var counter = 0L

override def continueReading(): Boolean = lengthLimit > counter

override def read(): Int = {
val result = super.read()
if (result != -1) counter += 1
result
}

override def read(b: Array[Byte], off: Int, len: Int): Int = {
val result = super.read(b, off, len)
if (result != -1) counter += result
result
}

override def skip(n: Long): Long = {
val skipped = super.skip(n)
counter += skipped
skipped
}

override def mark(readlimit: Int): Unit = throw new UnsupportedOperationException(
"mark operation is not supported when is wrapped."
)

override def reset(): Unit = throw new UnsupportedOperationException(
"mark operation is not supported when is wrapped."
)

override def markSupported(): Boolean = false

}
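
For context, a minimal usage sketch of the wrapper (not part of this commit; the file name and the limit are hypothetical): it bounds how far an entity iterator reads into the stream, which is how OsmPbfFormat stops each task at the end of its split.

import java.io.FileInputStream

import com.acervera.osm4scala.EntityIterator
import com.acervera.osm4scala.spark.InputStreamLengthLimit

object InputStreamLengthLimitExample {
  def main(args: Array[String]): Unit = {
    val blockLength = 64L * 1024L // hypothetical limit, in bytes
    val limited = new InputStreamLengthLimit(new FileInputStream("sample.osm.pbf"), blockLength)
    try {
      // continueReading() flips to false once blockLength bytes have been consumed,
      // so an iterator honouring the InputStreamSentinel contract stops near the end of the range.
      EntityIterator.fromPbf(limited).take(10).foreach(println)
    } finally {
      limited.close()
    }
  }
}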
@@ -0,0 +1,80 @@
/*
* The MIT License (MIT)
*
* Copyright (c) 2020 Ángel Cervera Claudio
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*/

package com.acervera.osm4scala.spark

import java.io.InputStream

import scala.annotation.tailrec

object OSMDataFinder {
private val pattern = Array[Byte](0x0A, 0x07, 0x4F, 0x53, 0x4D, 0x44, 0x61, 0x74, 0x61)
private val blobHeaderLengthSize = 4

implicit class InputStreamEnricher(in: InputStream) {

/**
* Search for the first block. Naive search for this first PoC.
* If it works, we can try the KMP algorithm.
*
*/
def firstBlockIndex(): Option[Long] = {

@tailrec
def rec(idx: Long, current: Array[Byte]): Option[Long] =
if (current.sameElements(pattern) && !isFalsePositive()) {
Some(idx)
} else {
in.read() match {
case -1 => None
case i => rec(idx + 1, current.drop(1) :+ i.byteValue())
}
}

/**
* Check that it's the OSMData string and a true block as well.
* @return
*/
def isFalsePositive(): Boolean = false // TODO: Need implementation.

/**
* Read the next n bytes from the stream. If there are not enough, return None.
* @param length Length to read.
* @return Array of bytes with the content, or None if there is not enough data to fill the buffer.
*/
def readNBytes(length: Int): Option[Array[Byte]] = {
val buffer = Array.fill[Byte](length)(0)
val read = in.read(buffer)
if (read < length) None else Some(buffer)
}

readNBytes(blobHeaderLengthSize) // Ignore length header.
.flatMap(_ => readNBytes(pattern.length)) // Take the first possible BlockHeader
.flatMap(firstPossibleBlock => rec(0, firstPossibleBlock))

}

}
}
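
The comment above leaves the KMP algorithm as a follow-up to the naive scan. A self-contained sketch (not part of this commit) of a KMP search for the same 9-byte OSMData marker over an InputStream:

import java.io.InputStream

import scala.annotation.tailrec

object KmpOsmDataFinder {
  // Same marker as OSMDataFinder: the length-prefixed "OSMData" string of the BlobHeader.
  private val pattern = Array[Byte](0x0A, 0x07, 0x4F, 0x53, 0x4D, 0x44, 0x61, 0x74, 0x61)

  // failure(i) = length of the longest proper prefix of pattern(0..i) that is also a suffix of it.
  private val failure: Array[Int] = {
    val f = Array.fill(pattern.length)(0)
    var k = 0
    for (i <- 1 until pattern.length) {
      while (k > 0 && pattern(k) != pattern(i)) k = f(k - 1)
      if (pattern(k) == pattern(i)) k += 1
      f(i) = k
    }
    f
  }

  /** Offset of the first occurrence of the marker, counted from the current stream position. */
  def firstIndex(in: InputStream): Option[Long] = {
    @tailrec
    def shift(k: Int, b: Byte): Int =
      if (k > 0 && pattern(k) != b) shift(failure(k - 1), b) else k

    @tailrec
    def loop(read: Long, matched: Int): Option[Long] =
      if (matched == pattern.length) Some(read - pattern.length)
      else
        in.read() match {
          case -1 => None
          case i =>
            val b = i.toByte
            val k = shift(matched, b)
            loop(read + 1, if (pattern(k) == b) k + 1 else k)
        }

    loop(0L, 0)
  }
}

Unlike the naive rolling window, the prefix table guarantees each byte is compared only a bounded number of times, so the scan stays linear even on inputs full of partial matches.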
102 changes: 102 additions & 0 deletions spark2/src/main/scala/com/acervera/osm4scala/spark/OsmPbfFormat.scala
@@ -0,0 +1,102 @@
/*
* The MIT License (MIT)
*
* Copyright (c) 2020 Ángel Cervera Claudio
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*
*/

package com.acervera.osm4scala.spark

import java.net.URI

import com.acervera.osm4scala.EntityIterator
import com.acervera.osm4scala.spark.OSMDataFinder._
import com.acervera.osm4scala.spark.OsmPbfRowIterator._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FSDataInputStream, FileStatus, Path}
import org.apache.hadoop.mapreduce.Job
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.execution.datasources.{FileFormat, OutputWriterFactory, PartitionedFile}
import org.apache.spark.sql.sources.{DataSourceRegister, Filter}
import org.apache.spark.sql.types._
import org.apache.spark.util.SerializableConfiguration

class OsmPbfFormat extends FileFormat with DataSourceRegister with Logging {

override def shortName(): String = "osm.pbf"

override def inferSchema(sparkSession: SparkSession,
options: Map[String, String],
files: Seq[FileStatus]): Option[StructType] = Some(OsmSqlEntity.schema)

override def prepareWrite(sparkSession: SparkSession,
job: Job,
options: Map[String, String],
dataSchema: StructType): OutputWriterFactory =
throw new UnsupportedOperationException(
s"write is not supported for spark-osm-pbf files. If you need it, please create a issue and try to support the project."
)

override def isSplitable(sparkSession: SparkSession, options: Map[String, String], path: Path): Boolean = true

override protected def buildReader(sparkSession: SparkSession,
dataSchema: StructType,
partitionSchema: StructType,
requiredSchema: StructType,
filters: Seq[Filter],
options: Map[String, String],
hadoopConf: Configuration): PartitionedFile => Iterator[InternalRow] = {

// TODO: OsmSqlEntity.validateSchema(requiredSchema)

val broadcastedHadoopConf = sparkSession.sparkContext.broadcast(new SerializableConfiguration(hadoopConf))

(file: PartitionedFile) =>
{
val path = new Path(new URI(file.filePath))
val fs = path.getFileSystem(broadcastedHadoopConf.value.value)
val status = fs.getFileStatus(path)

def firstBlockOffset(): Option[Long] = {
var pbfIS: FSDataInputStream = null
try {
pbfIS = fs.open(status.getPath)
pbfIS.seek(file.start)
pbfIS.firstBlockIndex()
} finally {
if (pbfIS != null) pbfIS.close()
}
}

firstBlockOffset() match {
case None => Iterator.empty
case Some(offset) =>
val atFirstBlock = fs.open(status.getPath)
atFirstBlock.seek(file.start + offset)
EntityIterator.fromPbf(new InputStreamLengthLimit(atFirstBlock, file.length - offset)).toOsmPbfRowIterator(requiredSchema)
}

}
}

}
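
Taken together with the one-line DataSourceRegister service entry added above, the "osm.pbf" short name means the connector is used like any other Spark file format. A minimal read sketch (not part of this commit; the input path is hypothetical):

import org.apache.spark.sql.SparkSession

object OsmPbfReadExample {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("osm4scala-spark2-read")
      .master("local[*]")
      .getOrCreate()

    // Nodes, ways and relations arrive as rows of OsmSqlEntity.schema.
    val entities = spark.read
      .format("osm.pbf")
      .load("/path/to/extract.osm.pbf") // hypothetical input file

    entities.printSchema()
    println(s"Entities: ${entities.count()}")

    spark.stop()
  }
}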
