diff --git a/build.sbt b/build.sbt
index f649ca0..79117d0 100644
--- a/build.sbt
+++ b/build.sbt
@@ -7,7 +7,7 @@ scalaVersion := "2.11.12"
 
 val scalaUtilsVersion = "0.2.0"
 
-val sparkVersion = "2.3.2"
+val sparkVersion = "2.4.3"
 val sparkXmlVersion = "0.4.1"
 val sparkAvroVersion = "4.0.0"
 
@@ -23,12 +23,14 @@ lazy val providedDependencies = Seq(
   "org.apache.spark" %% "spark-sql" % sparkVersion force(),
   "org.apache.spark" %% "spark-mllib" % sparkVersion force(),
   "org.apache.spark" %% "spark-streaming" % sparkVersion force(),
-  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion
+  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion,
+  "org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion
 )
 
 libraryDependencies ++= providedDependencies.map(_ % "provided")
 
-lazy val excludeJars = ExclusionRule(organization = "net.jpountz.lz4", name = "lz4")
+// Jackson dependency versions across Spark and Kafka can be tricky; for Spark 2.4.x we need this override
+dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-databind" % "2.6.7"
 
 libraryDependencies ++= Seq(
   "org.tupol" %% "scala-utils" % scalaUtilsVersion,
@@ -36,8 +38,7 @@ libraryDependencies ++= Seq(
   "org.scalatest" %% "scalatest" % "3.0.5" % "test",
   "org.scalacheck" %% "scalacheck" % "1.14.0" % "test",
   "com.h2database" % "h2" % "1.4.197" % "test",
-  "net.manub" %% "scalatest-embedded-kafka" % "0.14.0" % "test" excludeAll(excludeJars),
-  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion % "test" excludeAll(excludeJars),
+  "net.manub" %% "scalatest-embedded-kafka" % "2.0.0" % "test",
   "org.apache.spark" %% "spark-avro" % "2.4.0" % "test",
   "com.databricks" %% "spark-xml" % sparkXmlVersion % "test",
   "com.databricks" %% "spark-avro" % sparkAvroVersion % "test"
diff --git a/src/main/scala/org/tupol/spark/io/streaming/structured/FileStreamDataSink.scala b/src/main/scala/org/tupol/spark/io/streaming/structured/FileStreamDataSink.scala
index 34a7129..4d400cd 100644
--- a/src/main/scala/org/tupol/spark/io/streaming/structured/FileStreamDataSink.scala
+++ b/src/main/scala/org/tupol/spark/io/streaming/structured/FileStreamDataSink.scala
@@ -48,7 +48,7 @@ case class FileStreamDataSinkConfiguration private (
   format: FormatType,
   private val path: String,
   private val genericConfig: GenericStreamDataSinkConfiguration,
-  private val checkpointLocation: Option[String] = None)
+  private val checkpointLocation: Option[String])
   extends FormatAwareStreamingSinkConfiguration {
 
   private val options = genericConfig.options ++ Map(
@@ -63,6 +63,12 @@ object FileStreamDataSinkConfiguration extends Configurator[FileStreamDataSinkCo
   import org.tupol.utils.config._
   import scalaz.syntax.applicative._
 
+  def apply(
+    path: String,
+    genericConfig: GenericStreamDataSinkConfiguration,
+    checkpointLocation: Option[String] = None): FileStreamDataSinkConfiguration =
+    new FileStreamDataSinkConfiguration(genericConfig.format, path, genericConfig, checkpointLocation)
+
  def validationNel(config: Config): ValidationNel[Throwable, FileStreamDataSinkConfiguration] = {
     val format = config.extract[FormatType]("format").ensure(
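Note: the new `apply` overload exists so callers no longer state the format twice; it is derived from the generic configuration. A minimal usage sketch (the paths and format choice are illustrative, not taken from this change):

    import org.tupol.spark.io.FormatType
    import org.tupol.spark.io.streaming.structured._

    // The format is stated once, on the generic sink configuration,
    // and the file sink configuration picks it up from there.
    val genericConfig = GenericStreamDataSinkConfiguration(FormatType.Json, Map())
    val sinkConfig = FileStreamDataSinkConfiguration(
      path = "/tmp/output-data",
      genericConfig = genericConfig,
      checkpointLocation = Some("/tmp/output-data-checkpoints"))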
diff --git a/src/main/scala/org/tupol/spark/io/streaming/structured/FileStreamDataSource.scala b/src/main/scala/org/tupol/spark/io/streaming/structured/FileStreamDataSource.scala
index 43d4b1a..8264b00 100644
--- a/src/main/scala/org/tupol/spark/io/streaming/structured/FileStreamDataSource.scala
+++ b/src/main/scala/org/tupol/spark/io/streaming/structured/FileStreamDataSource.scala
@@ -78,11 +78,6 @@ case class FileStreamDataSource(configuration: FileStreamDataSourceConfiguration
   }
 }
 
-/**
- * Basic configuration for the `FileDataSource`
- * @param path
- * @param sourceConfiguration
- */
 case class FileStreamDataSourceConfiguration(path: String, sourceConfiguration: SourceConfiguration)
   extends FormatAwareStreamingSourceConfiguration {
   /** Get the format type of the input file. */
diff --git a/src/main/scala/org/tupol/spark/io/streaming/structured/GenericStreamDataSink.scala b/src/main/scala/org/tupol/spark/io/streaming/structured/GenericStreamDataSink.scala
index cbc5ea9..41eef00 100644
--- a/src/main/scala/org/tupol/spark/io/streaming/structured/GenericStreamDataSink.scala
+++ b/src/main/scala/org/tupol/spark/io/streaming/structured/GenericStreamDataSink.scala
@@ -64,22 +64,27 @@ case class GenericStreamDataSink(configuration: GenericStreamDataSinkConfigurati
   }
 }
 
-/** FileDataSink trait that is data aware, so it can perform a write call with no arguments */
+/** GenericStreamDataAwareSink is "data aware", so it can perform a write call with no arguments */
 case class GenericStreamDataAwareSink(configuration: GenericStreamDataSinkConfiguration, data: DataFrame)
   extends DataAwareSink[GenericStreamDataSinkConfiguration, StreamingQuery] {
   override def sink: DataSink[GenericStreamDataSinkConfiguration, StreamingQuery] = GenericStreamDataSink(configuration)
 }
 
-case class GenericStreamDataSinkConfiguration(format: FormatType, options: Map[String, String],
+case class GenericStreamDataSinkConfiguration(format: FormatType, options: Map[String, String] = Map(),
   queryName: Option[String] = None, trigger: Option[Trigger] = None,
   partitionColumns: Seq[String] = Seq(), outputMode: Option[String] = None)
   extends FormatAwareStreamingSinkConfiguration {
   override def toString: String = {
     val optionsStr = if (options.isEmpty) "" else options.map { case (k, v) => s"$k: '$v'" }.mkString(" ", ", ", " ")
-    s"format: '$format', options: {$optionsStr}, query name: ${queryName.getOrElse("not specified")}, " +
+    s"format: '$format', " +
+      s"partition columns: [${partitionColumns.mkString(", ")}], " +
+      s"options: {$optionsStr}, " +
+      s"query name: ${queryName.getOrElse("not specified")}, " +
+      s"output mode: ${outputMode.getOrElse("not specified")}, " +
       s"trigger: ${trigger.getOrElse("not specified")}"
   }
 }
+
 object GenericStreamDataSinkConfiguration extends Configurator[GenericStreamDataSinkConfiguration] {
   import com.typesafe.config.Config
   import org.tupol.utils.config._
diff --git a/src/main/scala/org/tupol/spark/io/streaming/structured/GenericStreamDataSource.scala b/src/main/scala/org/tupol/spark/io/streaming/structured/GenericStreamDataSource.scala
index 2ca8e25..0e0a824 100644
--- a/src/main/scala/org/tupol/spark/io/streaming/structured/GenericStreamDataSource.scala
+++ b/src/main/scala/org/tupol/spark/io/streaming/structured/GenericStreamDataSource.scala
@@ -62,7 +62,7 @@ case class GenericStreamDataSource(configuration: GenericStreamDataSourceConfigu
   }
 }
 
-case class GenericStreamDataSourceConfiguration(format: FormatType, options: Map[String, String],
+case class GenericStreamDataSourceConfiguration(format: FormatType, options: Map[String, String] = Map(),
   schema: Option[StructType] = None) extends FormatAwareStreamingSourceConfiguration {
   override def toString: String = {
     val optionsStr = if (options.isEmpty) "" else options.map { case (k, v) => s"$k: '$v'" }.mkString(" ", ", ", " ")
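Note: with `options` now defaulting to an empty map on both the generic sink and source configurations, minimal configurations drop the boilerplate argument. An illustrative sketch (the trigger value is an example, not part of this change):

    import org.apache.spark.sql.streaming.Trigger
    import org.tupol.spark.io.FormatType
    import org.tupol.spark.io.streaming.structured._

    // Before this change an explicit `Map()` was required as the second argument.
    val sinkConfig = GenericStreamDataSinkConfiguration(
      format = FormatType.Kafka,
      trigger = Some(Trigger.ProcessingTime("2 seconds")))
    val sourceConfig = GenericStreamDataSourceConfiguration(format = FormatType.Kafka)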
diff --git a/src/main/scala/org/tupol/spark/io/streaming/structured/KafkaStreamDataSink.scala b/src/main/scala/org/tupol/spark/io/streaming/structured/KafkaStreamDataSink.scala
index d0bfe67..7687eb4 100644
--- a/src/main/scala/org/tupol/spark/io/streaming/structured/KafkaStreamDataSink.scala
+++ b/src/main/scala/org/tupol/spark/io/streaming/structured/KafkaStreamDataSink.scala
@@ -45,7 +45,7 @@ case class KafkaStreamDataAwareSink(configuration: KafkaStreamDataSinkConfigurat
   override def sink: DataSink[KafkaStreamDataSinkConfiguration, StreamingQuery] = KafkaStreamDataSink(configuration)
 }
 
-case class KafkaStreamDataSinkConfiguration private (
+case class KafkaStreamDataSinkConfiguration(
   private val kafkaBootstrapServers: String,
   private val genericConfig: GenericStreamDataSinkConfiguration,
   private val topic: Option[String] = None,
diff --git a/src/main/scala/org/tupol/spark/io/streaming/structured/KafkaStreamDataSource.scala b/src/main/scala/org/tupol/spark/io/streaming/structured/KafkaStreamDataSource.scala
index a8f0732..694051e 100644
--- a/src/main/scala/org/tupol/spark/io/streaming/structured/KafkaStreamDataSource.scala
+++ b/src/main/scala/org/tupol/spark/io/streaming/structured/KafkaStreamDataSource.scala
@@ -24,21 +24,14 @@ SOFTWARE.
 package org.tupol.spark.io.streaming.structured
 
 import com.typesafe.config.Config
-import org.apache.spark.sql.streaming.DataStreamReader
-import org.apache.spark.sql.types.{ StringType, StructType }
+import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.{ DataFrame, SparkSession }
 import org.tupol.spark.Logging
 import org.tupol.spark.io.FormatType._
-import org.tupol.spark.io.sources.{ ColumnNameOfCorruptRecord, SourceConfiguration }
-import org.tupol.spark.io.{ DataSource, DataSourceException, FormatAwareDataSourceConfiguration, FormatType }
-import org.tupol.utils._
-import org.tupol.spark.io._
+import org.tupol.spark.io.{ DataSource, FormatType, _ }
 import org.tupol.utils.config.Configurator
-import scalaz.Alpha.S
 import scalaz.{ NonEmptyList, ValidationNel }
 
-import scala.util.{ Failure, Success, Try }
-
 case class KafkaStreamDataSource(configuration: KafkaStreamDataSourceConfiguration)
   extends DataSource[KafkaStreamDataSourceConfiguration] with Logging {
   val genericConfiguration = GenericStreamDataSourceConfiguration(configuration.format, configuration.options, configuration.schema)
@@ -48,8 +41,6 @@ case class KafkaStreamDataSource(configuration: KafkaStreamDataSourceConfigurati
 
 /**
  * Basic configuration for the `KafkaDataSource`
- * @param path
- * @param sourceConfiguration
  */
 case class KafkaStreamDataSourceConfiguration(
   kafkaBootstrapServers: String,
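Note: a sketch of how the Kafka source configuration is wired up, using the `spark.source(...).read` syntax that the tests below rely on; the broker address and topic are illustrative:

    import org.apache.spark.sql.{ DataFrame, SparkSession }
    import org.tupol.spark.implicits._
    import org.tupol.spark.io.streaming.structured._

    implicit val spark = SparkSession.builder
      .master("local[2]").appName("kafka-source-sketch").getOrCreate()

    // Accepted subscription types are "assign", "subscribe" and "subscribePattern".
    val sourceConfig = KafkaStreamDataSourceConfiguration(
      "localhost:9092",
      KafkaSubscription("subscribe", "my-topic"),
      Some("earliest"))
    val data: DataFrame = spark.source(sourceConfig).read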
diff --git a/src/main/scala/org/tupol/spark/io/streaming/structured/package.scala b/src/main/scala/org/tupol/spark/io/streaming/structured/package.scala
index f6e1228..c9fbb19 100644
--- a/src/main/scala/org/tupol/spark/io/streaming/structured/package.scala
+++ b/src/main/scala/org/tupol/spark/io/streaming/structured/package.scala
@@ -70,6 +70,12 @@ package object structured {
     }
   }
 
+  /**
+   * A Kafka subscription is defined by its type (e.g. `assign`, `subscribe` or `subscribePattern`)
+   * and the subscription itself (e.g. the actual topic name to subscribe to).
+   * @param subscriptionType the type of the subscription (e.g. `assign`, `subscribe` or `subscribePattern`)
+   * @param subscription the topic name to subscribe to
+   */
   case class KafkaSubscription(subscriptionType: String, subscription: String)
 
   implicit val KafkaSubscriptionExtractor = new Extractor[KafkaSubscription] {
     val AcceptableValues = Seq("assign", "subscribe", "subscribePattern")
diff --git a/src/test/scala/org/tupol/spark/SharedSparkSession.scala b/src/test/scala/org/tupol/spark/SharedSparkSession.scala
index 346d057..7f2b295 100644
--- a/src/test/scala/org/tupol/spark/SharedSparkSession.scala
+++ b/src/test/scala/org/tupol/spark/SharedSparkSession.scala
@@ -1,5 +1,9 @@
 package org.tupol.spark
 
+import java.io.File
+import java.util.UUID
+
+import org.apache.commons.io.FileUtils
 import org.apache.spark.sql.{ SQLContext, SparkSession }
 import org.apache.spark.{ SparkConf, SparkContext }
 import org.scalatest.{ BeforeAndAfterAll, Suite }
@@ -15,6 +19,8 @@ trait SharedSparkSession extends BeforeAndAfterAll {
   def master = "local[2]"
   def appName = this.getClass.getSimpleName
 
+  val TempDir = Option(System.getProperty("java.io.tmpdir")).getOrElse("/tmp")
+
   @transient private var _spark: SparkSession = _
 
   implicit lazy val spark: SparkSession = _spark
@@ -23,7 +29,11 @@
   implicit lazy val sqlContext: SQLContext = spark.sqlContext
 
-  def sparkConfig: Map[String, String] = Map("spark.driver.host" -> "localhost") //See https://issues.apache.org/jira/browse/SPARK-19394
+  private var _sparkConfig: Map[String, String] = _
+
+  def sparkConfig: Map[String, String] = Map(
+    "spark.driver.host" -> "localhost" //See https://issues.apache.org/jira/browse/SPARK-19394
+  )
 
   def createSparkSession(conf: SparkConf): SparkSession =
     SparkSession.builder.config(conf).getOrCreate()
@@ -39,6 +49,10 @@
       .setAppName(appName)
     sparkConfig.foreach { case (k, v) => conf.setIfMissing(k, v) }
 
+    def warehouseDirName = s"$TempDir/spark-warehouse-${UUID.randomUUID().toString}.test.temp"
+    def warehouseDirPath = new java.io.File(warehouseDirName).getAbsolutePath
+    conf.set("spark.sql.warehouse.dir", warehouseDirPath)
+    _sparkConfig = conf.getAll.toMap
 
     _spark = createSparkSession(conf)
@@ -50,6 +64,8 @@
       Try(_spark.close())
       _spark = null
     }
+    Option(_sparkConfig).flatMap(_.get("spark.sql.warehouse.dir"))
+      .foreach(name => FileUtils.deleteQuietly(new File(name)))
   } finally {
     super.afterAll()
     System.clearProperty("spark.driver.port")
diff --git a/src/test/scala/org/tupol/spark/io/streaming/structured/FileStreamDataSinkSpec.scala b/src/test/scala/org/tupol/spark/io/streaming/structured/FileStreamDataSinkSpec.scala
index 01294b1..9f3fe34 100644
--- a/src/test/scala/org/tupol/spark/io/streaming/structured/FileStreamDataSinkSpec.scala
+++ b/src/test/scala/org/tupol/spark/io/streaming/structured/FileStreamDataSinkSpec.scala
@@ -39,8 +39,8 @@ class FileStreamDataSinkSpec extends FunSuite with Matchers with Eventually with
     val steamingQuery = Try(data.streamingSink(sinkConfig).write)
     steamingQuery shouldBe a[Success[_]]
 
+    val sourceData = spark.createDataFrame(TestData)
     eventually {
-      val sourceData = spark.createDataFrame(TestData)
       val writtenData: DataFrame = spark.read.json(testPath1)
       writtenData.comapreWith(sourceData).areEqual(false) shouldBe true
     }
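Note: the warehouse directory is now unique per suite and removed once the suite finishes, so concurrent test runs no longer trample each other's `spark-warehouse`. The cleanup above reads the directory back from the remembered config rather than from the overridable `sparkConfig` map, which never contains that key. Suites can still extend the base configuration via `sparkConfig`, applied in `beforeAll` with `setIfMissing`; an illustrative override (the property value is an example):

    import org.scalatest.FunSuite
    import org.tupol.spark.SharedSparkSession

    class MyStreamingSpec extends FunSuite with SharedSparkSession {
      // Merged into the SparkConf by beforeAll via setIfMissing.
      override def sparkConfig: Map[String, String] =
        super.sparkConfig + ("spark.sql.shuffle.partitions" -> "2")
    }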
diff --git a/src/test/scala/org/tupol/spark/io/streaming/structured/FormatAwareStreamingSinkConfigurationSpec.scala b/src/test/scala/org/tupol/spark/io/streaming/structured/FormatAwareStreamingSinkConfigurationSpec.scala
index 6b7af96..0817f7f 100644
--- a/src/test/scala/org/tupol/spark/io/streaming/structured/FormatAwareStreamingSinkConfigurationSpec.scala
+++ b/src/test/scala/org/tupol/spark/io/streaming/structured/FormatAwareStreamingSinkConfigurationSpec.scala
@@ -22,7 +22,6 @@ class FormatAwareStreamingSinkConfigurationSpec extends FunSuite with Matchers w
     val config = ConfigFactory.parseString(configStr)
 
     val expected = FileStreamDataSinkConfiguration(
-      Text,
       path = "INPUT_PATH",
       genericConfig = GenericStreamDataSinkConfiguration(Text, Map()))
     val result = config.extract[FormatAwareStreamingSinkConfiguration]("input")
@@ -41,7 +40,6 @@ class FormatAwareStreamingSinkConfigurationSpec extends FunSuite with Matchers w
     val config = ConfigFactory.parseString(configStr)
 
     val expected = FileStreamDataSinkConfiguration(
-      Json,
       path = "INPUT_PATH",
       genericConfig = GenericStreamDataSinkConfiguration(Json, Map()))
     val result = config.extract[FormatAwareStreamingSinkConfiguration]("input")
diff --git a/src/test/scala/org/tupol/spark/io/streaming/structured/GenericKafkaStreamDataSourceSpec.scala b/src/test/scala/org/tupol/spark/io/streaming/structured/GenericKafkaStreamDataSourceSpec.scala
index 7e25c2c..098b8bd 100644
--- a/src/test/scala/org/tupol/spark/io/streaming/structured/GenericKafkaStreamDataSourceSpec.scala
+++ b/src/test/scala/org/tupol/spark/io/streaming/structured/GenericKafkaStreamDataSourceSpec.scala
@@ -57,9 +57,9 @@ class GenericKafkaStreamDataSourceSpec extends FunSuite
 
   test("Fail gracefully") {
     val TestOptions = Map(
-      "kafka.bootstrap.servers" -> s"unknown_host:0000000",
-      "subscribe" -> topic,
-      "startingOffsets" -> "earliest")
+      "kafka.bootstrap.servers" -> s"unknown_host_garbage_string:0000000",
+      "NO-LEGAL-SUBSCRIPTION-TYPE" -> topic,
+      "NO-STARTING-OFFSETS" -> "garbage")
     val inputConfig = GenericStreamDataSourceConfiguration(FormatType.Kafka, TestOptions, None)
     an[Exception] shouldBe thrownBy(spark.source(inputConfig).read)
   }
diff --git a/src/test/scala/org/tupol/spark/io/streaming/structured/KafkaStreamDataSourceSpec.scala b/src/test/scala/org/tupol/spark/io/streaming/structured/KafkaStreamDataSourceSpec.scala
index cdc3f5b..72fdbb3 100644
--- a/src/test/scala/org/tupol/spark/io/streaming/structured/KafkaStreamDataSourceSpec.scala
+++ b/src/test/scala/org/tupol/spark/io/streaming/structured/KafkaStreamDataSourceSpec.scala
@@ -4,7 +4,7 @@ import net.manub.embeddedkafka.{ EmbeddedKafka, EmbeddedKafkaConfig }
 import org.apache.spark.sql.streaming.Trigger
 import org.scalatest._
 import org.scalatest.concurrent.Eventually
-import org.scalatest.time.{ Millis, Span }
+import org.scalatest.time.{ Seconds, Span }
 import org.tupol.spark.SharedSparkSession
 import org.tupol.spark.implicits._
 
@@ -12,7 +12,7 @@ class KafkaStreamDataSourceSpec extends FunSuite
   with Matchers with GivenWhenThen with Eventually with BeforeAndAfter
   with SharedSparkSession with EmbeddedKafka {
 
-  implicit override val patienceConfig = PatienceConfig(timeout = scaled(Span(10000, Millis)))
+  implicit override val patienceConfig = PatienceConfig(timeout = scaled(Span(10, Seconds)))
 
   implicit val config = EmbeddedKafkaConfig()
   val topic = "testTopic"
@@ -48,10 +48,11 @@ class KafkaStreamDataSourceSpec extends FunSuite
       streamingQuery.stop
     }
   }
+
   test("Fail gracefully") {
     val inputConfig = KafkaStreamDataSourceConfiguration(
       "unknown_host:0000000",
-      KafkaSubscription("subscribe", topic), Some("earliest"))
+      KafkaSubscription("ILLEGAL-SUBSCRIPTION-TYPE", "UNKNOWN-TOPIC"), Some("earliest"))
     an[Exception] shouldBe thrownBy(spark.source(inputConfig).read)
   }
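Note: the move from scalatest-embedded-kafka 0.14.0 to 2.0.0 keeps the same basic API used in these specs. A self-contained round-trip sketch (topic and message values are illustrative):

    import net.manub.embeddedkafka.{ EmbeddedKafka, EmbeddedKafkaConfig }
    import org.scalatest.{ FunSuite, Matchers }

    class EmbeddedKafkaRoundTripSketch extends FunSuite with Matchers with EmbeddedKafka {
      implicit val config = EmbeddedKafkaConfig()

      test("publish and consume a message") {
        withRunningKafka {
          publishStringMessageToKafka("testTopic", "hello")
          consumeFirstStringMessageFrom("testTopic") shouldBe "hello"
        }
      }
    }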