Skip to content
Permalink
Browse files

Small Improvements

- `build.sbt` cleanup
- added `spark.sql.warehouse.dir` to `SharedSparkSession`
- small documentation improvements and reformatting
  • Loading branch information
tupol committed Aug 22, 2019
1 parent f436540 commit 9b3d606b977896707446b5f3b81e1c9233bc72cf
@@ -7,7 +7,7 @@ scalaVersion := "2.11.12"

val scalaUtilsVersion = "0.2.0"

val sparkVersion = "2.3.2"
val sparkVersion = "2.4.3"
val sparkXmlVersion = "0.4.1"
val sparkAvroVersion = "4.0.0"

@@ -23,21 +23,22 @@ lazy val providedDependencies = Seq(
"org.apache.spark" %% "spark-sql" % sparkVersion force(),
"org.apache.spark" %% "spark-mllib" % sparkVersion force(),
"org.apache.spark" %% "spark-streaming" % sparkVersion force(),
"org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion
"org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion,
"org.apache.spark" %% "spark-streaming-kafka-0-10" % sparkVersion
)

libraryDependencies ++= providedDependencies.map(_ % "provided")

lazy val excludeJars = ExclusionRule(organization = "net.jpountz.lz4", name = "lz4")
// Jackson dependencies over Spark and Kafka Versions can be tricky; for Spark 2.4.x we need this override
dependencyOverrides += "com.fasterxml.jackson.core" % "jackson-databind" % "2.6.7"

libraryDependencies ++= Seq(
"org.tupol" %% "scala-utils" % scalaUtilsVersion,
"com.h2database" % "h2" % "1.4.197" % "test",
"org.scalatest" %% "scalatest" % "3.0.5" % "test",
"org.scalacheck" %% "scalacheck" % "1.14.0" % "test",
"com.h2database" % "h2" % "1.4.197" % "test",
"net.manub" %% "scalatest-embedded-kafka" % "0.14.0" % "test" excludeAll(excludeJars),
"org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion % "test" excludeAll(excludeJars),
"net.manub" %% "scalatest-embedded-kafka" % "2.0.0" % "test",
"org.apache.spark" %% "spark-avro" % "2.4.0" % "test",
"com.databricks" %% "spark-xml" % sparkXmlVersion % "test",
"com.databricks" %% "spark-avro" % sparkAvroVersion % "test"
@@ -48,7 +48,7 @@ case class FileStreamDataSinkConfiguration private (
format: FormatType,
private val path: String,
private val genericConfig: GenericStreamDataSinkConfiguration,
private val checkpointLocation: Option[String] = None)
private val checkpointLocation: Option[String])
extends FormatAwareStreamingSinkConfiguration {
private val options = genericConfig.options ++
Map(
@@ -63,6 +63,12 @@ object FileStreamDataSinkConfiguration extends Configurator[FileStreamDataSinkCo
import org.tupol.utils.config._
import scalaz.syntax.applicative._

/**
 * Convenience factory that takes the format from the generic configuration
 * instead of requiring it as a separate argument.
 *
 * @param path passed through unchanged to the configuration constructor
 * @param genericConfig generic streaming sink configuration; its `format` becomes the format of the result
 * @param checkpointLocation optional checkpoint location, `None` by default
 * @return a `FileStreamDataSinkConfiguration` built from the given values
 */
def apply(
  path: String,
  genericConfig: GenericStreamDataSinkConfiguration,
  checkpointLocation: Option[String] = None): FileStreamDataSinkConfiguration = {
  // The format is always the one declared by the generic configuration.
  val format = genericConfig.format
  new FileStreamDataSinkConfiguration(format, path, genericConfig, checkpointLocation)
}

def validationNel(config: Config): ValidationNel[Throwable, FileStreamDataSinkConfiguration] = {

val format = config.extract[FormatType]("format").ensure(
@@ -78,11 +78,6 @@ case class FileStreamDataSource(configuration: FileStreamDataSourceConfiguration
}
}

/**
* Basic configuration for the `FileDataSource`
* @param path
* @param sourceConfiguration
*/
case class FileStreamDataSourceConfiguration(path: String, sourceConfiguration: SourceConfiguration)
extends FormatAwareStreamingSourceConfiguration {
/** Get the format type of the input file. */
@@ -64,22 +64,27 @@ case class GenericStreamDataSink(configuration: GenericStreamDataSinkConfigurati
}
}

/** FileDataSink trait that is data aware, so it can perform a write call with no arguments */
/** GenericStreamDataAwareSink is "data aware", so it can perform a write call with no arguments */
case class GenericStreamDataAwareSink(configuration: GenericStreamDataSinkConfiguration, data: DataFrame)
  extends DataAwareSink[GenericStreamDataSinkConfiguration, StreamingQuery] {

  // A new plain sink is built from the same configuration on every access.
  override def sink: DataSink[GenericStreamDataSinkConfiguration, StreamingQuery] = {
    GenericStreamDataSink(configuration)
  }
}

case class GenericStreamDataSinkConfiguration(format: FormatType, options: Map[String, String],
case class GenericStreamDataSinkConfiguration(format: FormatType, options: Map[String, String] = Map(),
queryName: Option[String] = None, trigger: Option[Trigger] = None,
partitionColumns: Seq[String] = Seq(), outputMode: Option[String] = None)
extends FormatAwareStreamingSinkConfiguration {
override def toString: String = {
val optionsStr = if (options.isEmpty) "" else options.map { case (k, v) => s"$k: '$v'" }.mkString(" ", ", ", " ")
s"format: '$format', options: {$optionsStr}, query name: ${queryName.getOrElse("not specified")}, " +
s"format: '$format', " +
s"partition columns: [${partitionColumns.mkString(", ")}], " +
s"options: {$optionsStr}, " +
s"query name: ${queryName.getOrElse("not specified")}, " +
s"output mode: ${outputMode.getOrElse("not specified")}, " +
s"trigger: ${trigger.getOrElse("not specified")}"
}
}

object GenericStreamDataSinkConfiguration extends Configurator[GenericStreamDataSinkConfiguration] {
import com.typesafe.config.Config
import org.tupol.utils.config._
@@ -62,7 +62,7 @@ case class GenericStreamDataSource(configuration: GenericStreamDataSourceConfigu
}
}

case class GenericStreamDataSourceConfiguration(format: FormatType, options: Map[String, String],
case class GenericStreamDataSourceConfiguration(format: FormatType, options: Map[String, String] = Map(),
schema: Option[StructType] = None) extends FormatAwareStreamingSourceConfiguration {
override def toString: String = {
val optionsStr = if (options.isEmpty) "" else options.map { case (k, v) => s"$k: '$v'" }.mkString(" ", ", ", " ")
@@ -45,7 +45,7 @@ case class KafkaStreamDataAwareSink(configuration: KafkaStreamDataSinkConfigurat
override def sink: DataSink[KafkaStreamDataSinkConfiguration, StreamingQuery] = KafkaStreamDataSink(configuration)
}

case class KafkaStreamDataSinkConfiguration private (
case class KafkaStreamDataSinkConfiguration(
private val kafkaBootstrapServers: String,
private val genericConfig: GenericStreamDataSinkConfiguration,
private val topic: Option[String] = None,
@@ -24,21 +24,14 @@ SOFTWARE.
package org.tupol.spark.io.streaming.structured

import com.typesafe.config.Config
import org.apache.spark.sql.streaming.DataStreamReader
import org.apache.spark.sql.types.{ StringType, StructType }
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.{ DataFrame, SparkSession }
import org.tupol.spark.Logging
import org.tupol.spark.io.FormatType._
import org.tupol.spark.io.sources.{ ColumnNameOfCorruptRecord, SourceConfiguration }
import org.tupol.spark.io.{ DataSource, DataSourceException, FormatAwareDataSourceConfiguration, FormatType }
import org.tupol.utils._
import org.tupol.spark.io._
import org.tupol.spark.io.{ DataSource, FormatType, _ }
import org.tupol.utils.config.Configurator
import scalaz.Alpha.S
import scalaz.{ NonEmptyList, ValidationNel }

import scala.util.{ Failure, Success, Try }

case class KafkaStreamDataSource(configuration: KafkaStreamDataSourceConfiguration) extends DataSource[KafkaStreamDataSourceConfiguration] with Logging {

val genericConfiguration = GenericStreamDataSourceConfiguration(configuration.format, configuration.options, configuration.schema)
@@ -48,8 +41,6 @@ case class KafkaStreamDataSource(configuration: KafkaStreamDataSourceConfigurati

/**
* Basic configuration for the `KafkaStreamDataSource`
* @param path
* @param sourceConfiguration
*/
case class KafkaStreamDataSourceConfiguration(
kafkaBootstrapServers: String,
@@ -70,6 +70,12 @@ package object structured {
}
}

/**
 * A Kafka subscription is defined by its type (e.g. subscribe or subscribe pattern) and the
 * subscription itself (e.g. the actual topic name that this subscription is defining).
 * @param subscriptionType the type of the subscription (e.g. `assign`, `subscribe` or `subscribePattern`)
 * @param subscription the subscription value, e.g. the topic name to subscribe to
 */
case class KafkaSubscription(subscriptionType: String, subscription: String)
implicit val KafkaSubscriptionExtractor = new Extractor[KafkaSubscription] {
val AcceptableValues = Seq("assign", "subscribe", "subscribePattern")
@@ -1,5 +1,9 @@
package org.tupol.spark

import java.io.File
import java.util.UUID

import org.apache.commons.io.FileUtils
import org.apache.spark.sql.{ SQLContext, SparkSession }
import org.apache.spark.{ SparkConf, SparkContext }
import org.scalatest.{ BeforeAndAfterAll, Suite }
@@ -15,6 +19,8 @@ trait SharedSparkSession extends BeforeAndAfterAll {
/** Master URL exposed by this trait; `local[2]` unless overridden. */
def master: String = "local[2]"
/** Application name set on the `SparkConf`; defaults to the simple name of the runtime class. */
def appName: String = this.getClass.getSimpleName

/** Temporary directory taken from the `java.io.tmpdir` system property, or `/tmp` when it is not set. */
val TempDir = sys.props.getOrElse("java.io.tmpdir", "/tmp")

@transient private var _spark: SparkSession = _

implicit lazy val spark: SparkSession = _spark
@@ -23,7 +29,11 @@ trait SharedSparkSession extends BeforeAndAfterAll {

implicit lazy val sqlContext: SQLContext = spark.sqlContext

def sparkConfig: Map[String, String] = Map("spark.driver.host" -> "localhost") //See https://issues.apache.org/jira/browse/SPARK-19394
private var _sparkConfig: Map[String, String] = _

/**
 * Default Spark settings, applied to the `SparkConf` only for keys that are not already set
 * (see the `setIfMissing` loop).
 */
def sparkConfig: Map[String, String] = {
  // See https://issues.apache.org/jira/browse/SPARK-19394
  val driverHost = "spark.driver.host" -> "localhost"
  Map(driverHost)
}

/**
 * Creates the `SparkSession` for the given configuration through the session builder's `getOrCreate()`.
 *
 * @param conf the Spark configuration handed to the session builder
 * @return the session returned by `getOrCreate()`
 */
def createSparkSession(conf: SparkConf): SparkSession = {
  val sessionBuilder = SparkSession.builder.config(conf)
  sessionBuilder.getOrCreate()
}
@@ -39,6 +49,9 @@ trait SharedSparkSession extends BeforeAndAfterAll {
.setAppName(appName)

sparkConfig.foreach { case (k, v) => conf.setIfMissing(k, v) }
def warehouseDirName = s"$TempDir/spark-warehouse-${UUID.randomUUID().toString}.test.temp"
def warehouseDirPath = new java.io.File(warehouseDirName).getAbsolutePath
conf.set("spark.sql.warehouse.dir", warehouseDirPath)

_spark = createSparkSession(conf)

@@ -50,6 +63,8 @@ trait SharedSparkSession extends BeforeAndAfterAll {
Try(_spark.close())
_spark = null
}
sparkConfig.get("spark.sql.warehouse.dir")
.map(name => FileUtils.deleteQuietly(new File(name)))
} finally {
super.afterAll()
System.clearProperty("spark.driver.port")
@@ -39,8 +39,8 @@ class FileStreamDataSinkSpec extends FunSuite with Matchers with Eventually with
val steamingQuery = Try(data.streamingSink(sinkConfig).write)
steamingQuery shouldBe a[Success[_]]

val sourceData = spark.createDataFrame(TestData)
eventually {
val sourceData = spark.createDataFrame(TestData)
val writtenData: DataFrame = spark.read.json(testPath1)
writtenData.comapreWith(sourceData).areEqual(false) shouldBe true
}
@@ -22,7 +22,6 @@ class FormatAwareStreamingSinkConfigurationSpec extends FunSuite with Matchers w
val config = ConfigFactory.parseString(configStr)

val expected = FileStreamDataSinkConfiguration(
Text,
path = "INPUT_PATH",
genericConfig = GenericStreamDataSinkConfiguration(Text, Map()))
val result = config.extract[FormatAwareStreamingSinkConfiguration]("input")
@@ -41,7 +40,6 @@ class FormatAwareStreamingSinkConfigurationSpec extends FunSuite with Matchers w
val config = ConfigFactory.parseString(configStr)

val expected = FileStreamDataSinkConfiguration(
Json,
path = "INPUT_PATH",
genericConfig = GenericStreamDataSinkConfiguration(Json, Map()))
val result = config.extract[FormatAwareStreamingSinkConfiguration]("input")
@@ -57,9 +57,9 @@ class GenericKafkaStreamDataSourceSpec extends FunSuite

test("Fail gracefully") {
val TestOptions = Map(
"kafka.bootstrap.servers" -> s"unknown_host:0000000",
"subscribe" -> topic,
"startingOffsets" -> "earliest")
"kafka.bootstrap.servers" -> s"unknown_host_garbage_string:0000000",
"NO-LEGAL-SUBSCRIPTION-TYPE" -> topic,
"NO-STARTING-OFFSETS" -> "garbage")
val inputConfig = GenericStreamDataSourceConfiguration(FormatType.Kafka, TestOptions, None)
an[Exception] shouldBe thrownBy(spark.source(inputConfig).read)
}
@@ -4,15 +4,15 @@ import net.manub.embeddedkafka.{ EmbeddedKafka, EmbeddedKafkaConfig }
import org.apache.spark.sql.streaming.Trigger
import org.scalatest._
import org.scalatest.concurrent.Eventually
import org.scalatest.time.{ Millis, Span }
import org.scalatest.time.{ Seconds, Span }
import org.tupol.spark.SharedSparkSession
import org.tupol.spark.implicits._

class KafkaStreamDataSourceSpec extends FunSuite
with Matchers with GivenWhenThen with Eventually with BeforeAndAfter
with SharedSparkSession with EmbeddedKafka {

implicit override val patienceConfig = PatienceConfig(timeout = scaled(Span(10000, Millis)))
implicit override val patienceConfig = PatienceConfig(timeout = scaled(Span(10, Seconds)))

implicit val config = EmbeddedKafkaConfig()
val topic = "testTopic"
@@ -48,10 +48,11 @@ class KafkaStreamDataSourceSpec extends FunSuite
streamingQuery.stop
}
}

test("Fail gracefully") {
val inputConfig = KafkaStreamDataSourceConfiguration(
"unknown_host:0000000",
KafkaSubscription("subscribe", topic), Some("earliest"))
KafkaSubscription("ILLEGAL-SUBSCRIPTION-TYPE", "UNKNOWN-TOPIC"), Some("earliest"))
an[Exception] shouldBe thrownBy(spark.source(inputConfig).read)
}

0 comments on commit 9b3d606

Please sign in to comment.
You can’t perform that action at this time.