The `GenericStreamDataSource` framework is a utility framework that helps configure and read `DataFrame`s from streams.
The framework is composed of two classes:
- `GenericStreamDataSource`, which is created based on a `GenericStreamDataSourceConfiguration` class and provides one main function:
  `override def read(implicit spark: SparkSession): Try[DataFrame]`
- `GenericStreamDataSourceConfiguration`: the necessary configuration parameters
Sample code

```scala
import org.apache.spark.sql.SparkSession
import org.tupol.spark.io._
import org.tupol.spark.io.streaming.structured._

implicit val sparkSession: SparkSession = ???
val sourceConfiguration: GenericStreamDataSourceConfiguration = ???
val dataframe = GenericStreamDataSource(sourceConfiguration).read
```
Optionally, one can use the implicit decorator for the `SparkSession`, available by importing `org.tupol.spark.io.implicits._`.
Sample code

```scala
import org.apache.spark.sql.SparkSession
import org.tupol.spark.io._
import org.tupol.spark.io.implicits._
import org.tupol.spark.io.streaming.structured._

val spark: SparkSession = ???
val sourceConfiguration: GenericStreamDataSourceConfiguration = ???
val dataframe = spark.streamingSource(sourceConfiguration).read
```
`format`: Required. The type of the input source and the corresponding source / parser. Possible values are:
- `socket`
- `kafka`
- file sources: `xml`, `csv`, `json`, `parquet`, `avro`, `orc`, `text`, `delta`, ...
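To make the pieces above concrete, the sketch below wires a `socket` source through the framework. The exact constructor of `GenericStreamDataSourceConfiguration` and the `FormatType.Socket` value are assumptions here, not confirmed by this page, so check the library sources before relying on them.

```scala
import scala.util.Try
import org.apache.spark.sql.{ DataFrame, SparkSession }
import org.tupol.spark.io.FormatType
import org.tupol.spark.io.streaming.structured._

implicit val spark: SparkSession = SparkSession.builder()
  .master("local[*]")
  .appName("generic-stream-source-demo")
  .getOrCreate()

// Hypothetical configuration: a socket source reading lines from localhost:9999.
// The case-class fields (format, options) are assumed from the parameter
// descriptions in this document.
val sourceConfiguration = GenericStreamDataSourceConfiguration(
  format = FormatType.Socket, // assumed FormatType value
  options = Map("host" -> "localhost", "port" -> "9999")
)

val dataframe: Try[DataFrame] = GenericStreamDataSource(sourceConfiguration).read
```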
`schema`: Optional. The JSON Apache Spark schema that should be enforced on the input data.
- This schema can be easily obtained from a `DataFrame` by calling the `prettyJson` function.
- Due to its complex structure, this parameter cannot be passed as a command line argument; it can only be passed through the `application.conf` file.
- The schema is applied on read only for file streams; for other streams the developer has to find a way to apply it.
`schema.path`: Optional. The local path or class path to the JSON Apache Spark schema that should be enforced on the input data.
- This schema can be easily obtained from a `DataFrame` by calling the `prettyJson` function.
- If this parameter is found, the schema will be loaded from the given file; otherwise, the `schema` parameter is tried.
`path`: Required. For more details check the File Data Source documentation.

For the `socket` format (**Warning: Not for production use!**):
- `options`: Required
  - `host`: Required
  - `port`: Required
  - `includeTimestamp`: Optional
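Since the `schema` section above notes that these parameters are passed through the `application.conf` file, a socket configuration fragment might look like the following. The key layout (`input.format`, `input.options`) is an assumption based on the parameter names in this document, not a confirmed schema:

```
# Hypothetical application.conf fragment for a socket stream source
input {
  format: "socket"
  options {
    host: "localhost"
    port: "9999"
    includeTimestamp: "true"
  }
}
```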
For the `kafka` format:
- `options`: Required
  - `kafkaBootstrapServers`: Required
  - `assign` | `subscribe` | `subscribePattern`: Required (exactly one of them must be specified)
  - `startingOffsets`: Optional
  - `endingOffsets`: Optional
  - `failOnDataLoss`: Optional
  - `kafkaConsumer.pollTimeoutMs`: Optional
  - `fetchOffset.numRetries`: Optional
  - `fetchOffset.retryIntervalMs`: Optional
  - `maxOffsetsPerTrigger`: Optional
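A corresponding Kafka fragment for `application.conf` might look like the sketch below. As with the socket example, the `input.format` / `input.options` key layout is an assumption; only the option names themselves come from the parameter list above:

```
# Hypothetical application.conf fragment for a Kafka stream source
input {
  format: "kafka"
  options {
    kafkaBootstrapServers: "broker-1:9092,broker-2:9092"
    subscribe: "my-topic"        # exactly one of assign / subscribe / subscribePattern
    startingOffsets: "earliest"
    failOnDataLoss: "false"
  }
}
```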