### 0. Spark Setup

In [None]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1,org.apache.spark:spark-avro_2.12:3.3.1 pyspark-shell'

In [None]:
from pyspark.sql import SparkSession
import pyspark.sql.types as T
import pyspark.sql.functions as F

spark = SparkSession \
    .builder \
    .appName("Spark-Notebook") \
    .getOrCreate()

### 1. Reading from Kafka Stream

through `readStream`

#### 1.1 Raw Kafka Stream

In [None]:
# default for startingOffsets is "latest"
df_kafka_raw = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092,broker:29092") \
    .option("subscribe", "rides_csv") \
    .option("startingOffsets", "earliest") \
    .option("checkpointLocation", "checkpoint") \
    .load()

In [None]:
df_kafka_raw.printSchema()

#### 1.2 Encoded Kafka Stream

In [None]:
df_kafka_encoded = df_kafka_raw.selectExpr("CAST(key AS STRING)","CAST(value AS STRING)")

In [None]:
df_kafka_encoded.printSchema()

#### 1.3 Structure Streaming DataFrame

In [None]:
def parse_ride_from_kafka_message(df_raw, schema):
    """ take a Spark Streaming df and parse value col based on <schema>, return streaming df cols in schema """
    assert df_raw.isStreaming is True, "DataFrame doesn't receive streaming data"

    df = df_raw.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

    # split attributes to nested array in one Column
    col = F.split(df['value'], ', ')

    # expand col to multiple top-level columns
    for idx, field in enumerate(schema):
        df = df.withColumn(field.name, col.getItem(idx).cast(field.dataType))
    return df.select([field.name for field in schema])

In [None]:
ride_schema = T.StructType(
    [T.StructField("vendor_id", T.IntegerType()),
     T.StructField('tpep_pickup_datetime', T.TimestampType()),
     T.StructField('tpep_dropoff_datetime', T.TimestampType()),
     T.StructField("passenger_count", T.IntegerType()),
     T.StructField("trip_distance", T.FloatType()),
     T.StructField("payment_type", T.IntegerType()),
     T.StructField("total_amount", T.FloatType()),
     ])

In [None]:
df_rides = parse_ride_from_kafka_message(df_raw=df_kafka_raw, schema=ride_schema)

In [None]:
df_rides.printSchema()

In [None]:
df_rides.show()

### 2 Sink Operation & Streaming Query

through `writeStream`

---
**Output Sinks**
- File Sink: stores the output to the directory
- Kafka Sink: stores the output to one or more topics in Kafka
- Foreach Sink:
- (for debugging) Console Sink, Memory Sink

Further details can be found in [Output Sinks](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-sinks)

---
There are three types of **Output Modes**:
- Complete: The whole Result Table will be outputted to the sink after every trigger. This is supported for aggregation queries.
- Append (default): Only new rows are added to the Result Table
- Update: Only updated rows are outputted

[Output Modes](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#output-modes) differs based on the set of transformations applied to the streaming data. 

--- 
**Triggers**

The [trigger settings](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#triggers) of a streaming query define the timing of streaming data processing. Spark streaming support micro-batch streamings schema and you can select following options based on requirements.

- default-micro-batch-mode
- fixed-interval-micro-batch-mode
- one-time-micro-batch-mode
- available-now-micro-batch-mode


#### Console and Memory Sink

In [None]:
def sink_console(df, output_mode: str = 'complete', processing_time: str = '5 seconds'):
    write_query = df.writeStream \
        .outputMode(output_mode) \
        .trigger(processingTime=processing_time) \
        .format("console") \
        .option("truncate", False) \
        .start()
    return write_query # pyspark.sql.streaming.StreamingQuery

In [None]:
write_query = sink_console(df_rides, output_mode='append')

In [None]:
def sink_memory(df, query_name, query_template):
    write_query = df \
        .writeStream \
        .queryName(query_name) \
        .format('memory') \
        .start()
    query_str = query_template.format(table_name=query_name)
    query_results = spark.sql(query_str)
    return write_query, query_results

In [None]:
query_name = 'vendor_id_counts'
query_template = 'select count(distinct(vendor_id)) from {table_name}'
write_query, df_vendor_id_counts = sink_memory(df=df_rides, query_name=query_name, query_template=query_template)

In [None]:
print(type(write_query)) # pyspark.sql.streaming.StreamingQuery
write_query.status

In [None]:
df_vendor_id_counts.show()

In [None]:
write_query.stop()

#### Kafka Sink

To write stream results to `kafka-topic`, the stream dataframe has at least a column with name `value`.

Therefore before starting `writeStream` in kafka format, dataframe needs to be updated accordingly.

More information regarding kafka sink expected data structure [here](https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html#writing-data-to-kafka)


In [None]:
def prepare_dataframe_to_kafka_sink(df, value_columns, key_column=None):
    columns = df.columns
    df = df.withColumn("value", F.concat_ws(', ',*value_columns))    
    if key_column:
        df = df.withColumnRenamed(key_column,"key")
        df = df.withColumn("key",df.key.cast('string'))
    return df.select(['key', 'value'])
    
def sink_kafka(df, topic, output_mode='append'):
    write_query = df.writeStream \
        .format("kafka") \
        .option("kafka.bootstrap.servers", "localhost:9092,broker:29092") \
        .outputMode(output_mode) \
        .option("topic", topic) \
        .option("checkpointLocation", "checkpoint") \
        .start()
    return write_query

In [None]:
import json
import time 

from kafka import KafkaProducer


In [None]:
def json_serializer(data):
    return json.dumps(data).encode('utf-8')

server = 'localhost:9092'

producer = KafkaProducer(
    bootstrap_servers=[server],
    value_serializer=json_serializer
)

producer.bootstrap_connected()

In [None]:
t0 = time.time()

topic_name = 'test-topic'

for i in range(10):
    message = {'number': i}
    producer.send(topic_name, value=message)
    print(f"Sent: {message}")
    time.sleep(0.05)
t2 = time.time()
print(f'before flush it took {(t2 - t0):.2f} seconds')
producer.flush()

t1 = time.time()
print(f'after flush it took {(t1 - t0):.2f} seconds')

In [None]:
import pandas as pd
df_green = pd.read_csv(
        '../green_tripdata_2019-10.csv.gz', sep=',', compression='gzip'
        )

df_green = df_green[['lpep_pickup_datetime', 'lpep_dropoff_datetime', 'PULocationID', 'DOLocationID', 'passenger_count', 'trip_distance', 'tip_amount']]

In [None]:


topic_name = 'green-trips'
t0 = time.time()
for row in df_green.itertuples(index=False):
    row_dict = {col: getattr(row, col) for col in row._fields}
    
    # Convert the dictionary to a string to send to Kafka
    message_value = json.dumps(row_dict)
    print(message_value)
    # Send the data to the Kafka topic
    producer.send(topic=topic_name, value=message_value)
t2 = time.time()
print(f'before flush it took {(t2 - t0):.2f} seconds')
producer.flush()

In [None]:
import pyspark
from pyspark.sql import SparkSession

pyspark_version = pyspark.__version__
kafka_jar_package = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark_version}"

spark = SparkSession \
    .builder \
    .master("local[*]") \
    .appName("GreenTripsConsumer") \
    .config("spark.jars.packages", kafka_jar_package) \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

In [None]:
green_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "green-trips2") \
    .option("startingOffsets", "earliest") \
    .load()


In [None]:
def peek(mini_batch, batch_id):
    first_row = mini_batch.take(1)

    if first_row:
        print(first_row[0])

query = green_stream.writeStream.foreachBatch(peek).start()

In [None]:
query.stop()

In [None]:
# from pyspark.sql import types

# schema = types.StructType() \
#     .add("lpep_pickup_datetime", types.StringType()) \
#     .add("lpep_dropoff_datetime", types.StringType()) \
#     .add("PULocationID", types.IntegerType()) \
#     .add("DOLocationID", types.IntegerType()) \
#     .add("passenger_count", types.DoubleType()) \
#     .add("trip_distance", types.DoubleType()) \
#     .add("tip_amount", types.DoubleType())

from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType

# Define the schema for parsing the JSON data
schema = StructType([
    StructField("lpep_pickup_datetime", StringType(), True),
    StructField("lpep_dropoff_datetime", StringType(), True),
    StructField("PULocationID", IntegerType(), True),
    StructField("DOLocationID", IntegerType(), True),
    StructField("passenger_count", DoubleType(), True),
    StructField("trip_distance", DoubleType(), True),
    StructField("tip_amount", DoubleType(), True)
])

In [None]:
from pyspark.sql import functions as F

green_stream = green_stream \
  .select(F.from_json(F.col("value").cast('STRING'), schema).alias("data")) \
  .select("data.*")

In [None]:
def peek(mini_batch, batch_id):
    first_row = mini_batch.take(1)

    if first_row:
        print(first_row[0])

query = green_stream.writeStream.foreachBatch(peek).start()

In [None]:

from pyspark.sql import functions as F

def debug_and_process(batch_df, batch_id):
    print(f"Batch ID: {batch_id}")
    batch_df.select(F.col("value").cast('STRING')).show(truncate=False)
    parsed_batch = batch_df.select(F.from_json(F.col("value").cast('STRING'), schema).alias("data")) \
    .select("data.*")
   

    parsed_batch.show(truncate=False)

query = green_stream \
    .writeStream \
    .foreachBatch(debug_and_process) \
    .start()



In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import functions as F

schema = types.StructType([
    types.StructField('lpep_pickup_datetime', types.StringType(), True),
    types.StructField('lpep_dropoff_datetime', types.StringType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),    
    types.StructField('passenger_count', types.DoubleType(), True),
    types.StructField('trip_distance', types.DoubleType(), True),
    types.StructField('tip_amount', types.DoubleType(), True)
])

pyspark_version = pyspark.__version__
kafka_jar_package = f"org.apache.spark:spark-sql-kafka-0-10_2.12:{pyspark_version}"

spark = SparkSession.builder.master("local[*]").appName('GreenTripsConsumer').\
    config("spark.jars.packages", kafka_jar_package).getOrCreate()

green_stream = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "green-trips") \
    .option("startingOffsets", "earliest")\
    .option("checkpointLocation", "checkpoint") \
    .load()

def debug_and_process(batch_df, batch_id):
    batch_df.select(F.col("value").cast('STRING')).show(truncate=False)
    parsed_batch = batch_df.select(F.from_json(F.col("value").cast('STRING'), schema).alias("data")) \
    .select("data.*")
    
    print(f"Batch ID: {batch_id}")
    parsed_batch.show(truncate=False)

query = green_stream \
    .writeStream \
    .foreachBatch(debug_and_process) \
    .start()

query.awaitTermination()

In [None]:
def op_windowed_groupby(df, window_duration, slide_duration):
    df_windowed_aggregation = df.groupBy(
        F.window(timeColumn=df.tpep_pickup_datetime, windowDuration=window_duration, slideDuration=slide_duration),
        df.vendor_id
    ).count()
    return df_windowed_aggregation

    This is how you can do it:

    # Add a column "timestamp" using the current_timestamp function
    # Group by:
    #     5 minutes window based on the timestamp column (F.window(col("timestamp"), "5 minutes"))
    #     "DOLocationID"
    # Order by count

In [None]:
from pyspark.sql import functions as F

green_stream = green_stream \
  .select(F.from_json(F.col("value").cast('STRING'), schema).alias("data")) \
  .select("data.*")

    # Add a column "timestamp" using the current_timestamp function
    # Group by:
    #     5 minutes window based on the timestamp column (F.window(col("timestamp"), "5 minutes"))
    #     "DOLocationID"
    # Order by count

popular_destinations = green_stream \
    .withColumn("timestamp", F.current_timestamp()) \
    .groupBy(
        F.window("timestamp", "5 minutes"),
        "DOLocationID"
    ) \
    .count() \
    .orderBy(F.col("count").desc())

In [None]:
query = popular_destinations \
    .writeStream \
    .outputMode("complete") \
    .format("console") \
    .option("truncate", "false") \
    .start()

query.awaitTermination()