In [None]:
import os
# Ask PySpark to fetch the Kafka source and Avro connector packages.
# This has to be set before the Spark session is created further down.
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages "
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1,"
    "org.apache.spark:spark-avro_2.12:3.3.1 "
    "pyspark-shell"
)

from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T

from settings import KAFKA_BOOTSTRAP_SERVER, SCHEMA_GREEN_RIDE

### Init a Spark session

In [None]:
# Obtain the notebook's Spark session: reuse an existing one if present,
# otherwise create a new one registered under the app name "Spark-Notebook".
spark = (
    SparkSession.builder
    .appName("Spark-Notebook")
    .getOrCreate()
)

### Read from Kafka

In [None]:
# Open a streaming DataFrame over the `rides_green` Kafka topic using
# SASL_SSL / PLAIN authentication.
#
# Credentials are taken from the CLUSTER_API_KEY / CLUSTER_API_SECRET
# environment variables so they never have to be typed into the notebook;
# the "<FILL>" placeholder is used when a variable is not set.
#
# NOTE(review): the `kafkashaded.` class prefix looks like the shaded Kafka
# client of a managed runtime; with the plain spark-sql-kafka package the
# class is usually `org.apache.kafka.common.security.plain.PlainLoginModule`
# -- confirm against the target environment.
sasl_jaas_config = (
    "kafkashaded.org.apache.kafka.common.security.plain.PlainLoginModule required "
    "username='{}' password='{}';"
).format(
    os.environ.get("CLUSTER_API_KEY", "<FILL>"),
    os.environ.get("CLUSTER_API_SECRET", "<FILL>"),
)

df_stream = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVER)
    .option("kafka.security.protocol", "SASL_SSL")
    # The JVM Kafka client reads `sasl.mechanism` (singular); the plural
    # `sasl.mechanisms` is the librdkafka spelling and is not picked up here.
    .option("kafka.sasl.mechanism", "PLAIN")
    .option("kafka.sasl.jaas.config", sasl_jaas_config)
    .option("kafka.ssl.endpoint.identification.algorithm", "https")
    # NOTE(review): 4500 ms may be a typo for 45000 -- confirm the intended timeout.
    .option("kafka.session.timeout.ms", 4500)
    .option("subscribe", "rides_green")
    # The option name is `startingOffsets` (plural); the singular form is ignored.
    .option("startingOffsets", "earliest")
    # No checkpoint option here: `checkpointLocation` is a writeStream setting
    # and has no effect on the reader.
    .load()
)


#
# security.protocol=SASL_SSL
# sasl.mechanisms=PLAIN
# sasl.username={{ CLUSTER_API_KEY }}
# sasl.password={{ CLUSTER_API_SECRET }}
#
#

In [None]:
# Print the schema of the raw Kafka stream before any parsing is applied.
df_stream.printSchema()

### Parse Green Ride from Kafka

In [None]:
# Guard: everything below expects a streaming (unbounded) DataFrame.
assert df_stream.isStreaming, "DataFrame doesn't receive streaming data"

In [None]:
# Keep only the message key and value, both cast to strings so they can be
# processed with string functions in the following cells.
string_casts = ("CAST(key as STRING)", "CAST(value as STRING)")
df = df_stream.selectExpr(*string_casts)

In [None]:
# Column expression holding the comma-separated parts of the message key.
# NOTE(review): this splits `key`, not `value` -- confirm that the producer
# really puts the delimited ride record in the key.
col = F.split(df["key"], pattern=",")

In [None]:
# Turn each element of the split array into its own column: the i-th schema
# field takes the i-th array item, cast to that field's data type.
for position, schema_field in enumerate(SCHEMA_GREEN_RIDE):
    typed_item = col.getItem(position).cast(schema_field.dataType)
    df = df.withColumn(schema_field.name, typed_item)

In [None]:
# Show the frame narrowed to the schema's columns.
# NOTE(review): the projection is not assigned back, so `df` itself is left
# unchanged by this cell -- confirm that is intended.
ride_columns = [schema_field.name for schema_field in SCHEMA_GREEN_RIDE]
df.select(ride_columns)

In [None]:
# Start a streaming query that appends new rows to the console sink,
# triggering every 5 seconds and printing values without truncation.
# The query handle returned by start() is not stored in a variable.
(
    df.writeStream
    .outputMode("append")
    .format("console")
    .trigger(processingTime="5 seconds")
    .option("truncate", False)
    .start()
)