# Batch Stream with Spark

## Spark Configuration
| Setting | Value | Description |
| ---- | ---- | ---- |
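| spark.master | spark://spark-master:7077 | URL of the Spark master the notebook connects to |
| spark.jars.packages | org.apache.spark:spark-avro_2.11:2.4.0,org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0,com.databricks:spark-avro_2.11:4.0.0 | Comma-separated Maven coordinates of the Avro and Kafka connector packages loaded into the session |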

In [1]:
%pyspark


# `from_avro` requires the Avro schema as a JSON-formatted string.
# Read it inside a `with` block so the file handle is closed as soon as the
# contents are loaded instead of being left open for the interpreter's lifetime.
with open("/opt/workspace/schemas/tweet-summary-event-avro-schema.avsc", "r") as schema_file:
    jsonFormatSchema = schema_file.read()

# Echo the schema so it can be inspected in the notebook output.
print(jsonFormatSchema)

In [2]:
%pyspark

# `from pyspark.sql.avro.functions import from_avro, to_avro` is only available
# in the Python API from Spark 3.0 (Scala 2.12 builds). This notebook is
# configured with spark-avro_2.11:2.4.0, so the two functions are bridged to the
# JVM implementation below instead of being imported.
import avro.schema
from avro.datafile import DataFileReader, DataFileWriter
from pyspark.sql.column import Column, _to_java_column

# One broker list shared by the Kafka source and the Kafka sink. The sink
# previously used the documentation placeholder "host1:port1,host2:port2",
# which is not a resolvable address.
KAFKA_BOOTSTRAP_SERVERS = "kafka:29092"


def _avro_functions():
    """Return the JVM `org.apache.spark.sql.avro` package object.

    The Scala package object is exposed to py4j as `package$.MODULE$`.
    """
    jvm_avro = spark.sparkContext._jvm.org.apache.spark.sql.avro
    return getattr(getattr(jvm_avro, "package$"), "MODULE$")


def from_avro(col, jsonFormatSchema):
    """Decode an Avro-encoded binary column into a struct column.

    :param col: column (or column name) holding Avro binary data.
    :param jsonFormatSchema: Avro schema as a JSON string.
    :return: a `Column` wrapping the JVM `from_avro` expression.
    """
    return Column(_avro_functions().from_avro(_to_java_column(col), jsonFormatSchema))


def to_avro(col):
    """Encode a column as Avro binary.

    :param col: column (or column name) to encode.
    :return: a `Column` wrapping the JVM `to_avro` expression.
    """
    return Column(_avro_functions().to_avro(_to_java_column(col)))


# The parsed schema object is not used further down in this cell.
# NOTE(review): presumably kept as an early check that the schema text parses.
schema = avro.schema.parse(jsonFormatSchema)

# Streaming source: subscribe to the input topic on the Kafka broker.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
    .option("subscribe", "santander-in-turismosdr-tweets")
    .load()
)

# 1. Decode the Avro payload in `value` into a struct column named `tweet`;
# 2. Filter rows;
# 3. Re-encode the selected field in Avro format as the outgoing `value`.
# NOTE(review): the filter and the final select reference `user.favorite_color`
# and `user.name`, but the decoded struct is aliased `tweet` and no `user`
# column is created here. These expressions look carried over from the Spark
# documentation example - confirm the field names against the .avsc schema.
output = (
    df.select(from_avro("value", jsonFormatSchema).alias("tweet"))
    .where('user.favorite_color == "red"')
    .select(to_avro("user.name").alias("value"))
)

# Streaming sink: publish the re-encoded records to the summary topic.
# NOTE(review): no `checkpointLocation` option is set in this cell; the Kafka
# sink needs one unless `spark.sql.streaming.checkpointLocation` is configured
# on the session - confirm.
query = (
    output.writeStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
    .option("topic", "santander-in-turismosdr-tweets-summary")
    .start()
)