This repo gives an example of unit testing Spark Structured Streaming queries in Java. IntelliJ is used as the IDE, Maven as the build and dependency management tool, and JUnit as the test framework.
The query under test is a simple stream enrichment where a stream of incoming events is joined with a reference dataset, here adding human-friendly names to event IDs.
public static Dataset<Row> setupProcessing(SparkSession spark, Dataset<Row> stream, Dataset<Row> reference) {
    return stream.join(reference, "Id");
}
The unit test follows the Arrange/Act/Assert pattern.
@Test
void testSetupProcessing() {
    // Arrange: mock the streaming and static inputs
    Dataset<Row> stream = createStreamingDataFrame();
    Dataset<Row> reference = createStaticDataFrame();

    // Act: run the query under test
    stream = Main.setupProcessing(spark, stream, reference);

    // Assert: collect the output and check it
    List<Row> result = processData(stream);
    assertEquals(3, result.size());
    assertEquals(RowFactory.create(2, 20, "Name2"), result.get(0));
}
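Note that the last assertion depends on the order in which rows land in the sink, which is stable in this local setup. If that order ever proves unstable, the rows can be sorted before asserting; a minimal sketch, assuming a sort on the Id column (not code from the repo):

// Sort by the Id column before asserting (uses java.util.Comparator and java.util.stream.Collectors)
List<Row> sorted = result.stream()
        .sorted(Comparator.comparingInt((Row r) -> r.getInt(0)))
        .collect(Collectors.toList());
assertEquals(RowFactory.create(1, 10, "Name1"), sorted.get(0));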
The main trick in this repo is to mock the inputs and the output so that the query is tested in isolation.
For the streaming input, two options work well:
- Using MemoryStream and defining the data in the code. Specifying a stream schema in MemoryStream did not look obvious to me, so I used rows of CSV strings that are parsed into typed columns with a SQL SELECT expression.
private static Dataset<Row> createStreamingDataFrame() {
    // MemoryStream of CSV strings; 42 is an arbitrary stream id
    MemoryStream<String> input = new MemoryStream<String>(42, spark.sqlContext(), Encoders.STRING());
    input.addData(JavaConversions.asScalaBuffer(Arrays.asList(
            "2,20",
            "3,30",
            "1,10")).toSeq());
    // Parse each CSV string into typed Id and Count columns
    return input.toDF().selectExpr(
            "cast(split(value,'[,]')[0] as int) as Id",
            "cast(split(value,'[,]')[1] as int) as Count");
}
- Using a folder of CSV files
Dataset<Row> stream = spark.readStream()
        .schema(new StructType()
                .add("Id", DataTypes.IntegerType)
                .add("Count", DataTypes.IntegerType))
        .csv("data\\input\\stream");
For the static reference dataset, there are two options as well:
- Using an in-memory List<Row>
private static Dataset<Row> createStaticDataFrame() {
    return spark.createDataFrame(
            Arrays.asList(
                    RowFactory.create(1, "Name1"),
                    RowFactory.create(2, "Name2"),
                    RowFactory.create(3, "Name3"),
                    RowFactory.create(4, "Name4")
            ),
            new StructType()
                    .add("Id", DataTypes.IntegerType)
                    .add("Name", DataTypes.StringType));
}
- Using a CSV file
Dataset<Row> reference = spark.read()
        .schema(new StructType()
                .add("Id", DataTypes.IntegerType)
                .add("Name", DataTypes.StringType))
        .csv("data\\input\\reference.csv");
On the output side, a MemorySink is used to collect the output data into an in-memory table named Output, which is then queried to obtain a List<Row>.
private static List<Row> processData(Dataset<Row> stream) {
    // Write the streaming output to the in-memory table "Output"
    stream.writeStream()
            .format("memory")
            .queryName("Output")
            .outputMode(OutputMode.Append())
            .start()
            .processAllAvailable();  // block until all buffered input has been processed
    // Query the in-memory table to get the result rows
    return spark.sql("select * from Output").collectAsList();
}
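A small variant, sketched here as an assumption rather than the repo's code, keeps the StreamingQuery handle so the query can be stopped explicitly once the rows have been collected:

// Requires org.apache.spark.sql.streaming.StreamingQuery
private static List<Row> processData(Dataset<Row> stream) throws Exception {
    StreamingQuery query = stream.writeStream()
            .format("memory")
            .queryName("Output")
            .outputMode(OutputMode.Append())
            .start();
    query.processAllAvailable();   // wait for all buffered input to reach the sink
    List<Row> rows = spark.sql("select * from Output").collectAsList();
    query.stop();                  // release the query between tests
    return rows;
}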
The last piece is the Spark session that hosts the data-processing pipeline locally.
@BeforeAll
public static void setUpClass() throws Exception {
    spark = SparkSession.builder()
            .appName("SparkStructuredStreamingDemo")
            .master("local[2]")
            .getOrCreate();
}
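For symmetry, a tear-down hook can stop the local session once all tests have run; a minimal sketch (not shown in the snippets above):

@AfterAll
public static void tearDownClass() {
    if (spark != null) {
        spark.stop();   // release the local Spark context
    }
}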