## Assignment 9.3

In [1]:
import os
import shutil
import json
from pathlib import Path

import pandas as pd

from kafka import KafkaProducer, KafkaAdminClient
from kafka.admin.new_topic import NewTopic
from kafka.errors import TopicAlreadyExistsError

from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark import SparkConf
from pyspark.sql.functions import window, from_json, col, expr, to_json, struct, when
from pyspark.sql.types import StringType, TimestampType, DoubleType, StructField, StructType
from pyspark.sql.functions import udf

current_dir = Path(os.getcwd()).absolute()
checkpoint_dir = current_dir.joinpath('checkpoints')
joined_checkpoint_dir = checkpoint_dir.joinpath('joined')

if joined_checkpoint_dir.exists():
    shutil.rmtree(joined_checkpoint_dir)

joined_checkpoint_dir.mkdir(parents=True, exist_ok=True)

### Configuration Parameters 

> **TODO:** Change the configuration prameters to the appropriate values for your setup.

In [2]:
config = dict(
    bootstrap_servers=['kafka.kafka.svc.cluster.local:9092'],
    first_name='Josh',
    last_name='Greenert'
)

config['client_id'] = '{}{}'.format(
    config['last_name'], 
    config['first_name']
)
config['topic_prefix'] = '{}{}'.format(
    config['last_name'], 
    config['first_name']
)

config['locations_topic'] = '{}-locations'.format(config['topic_prefix'])
config['accelerations_topic'] = '{}-accelerations'.format(config['topic_prefix'])
config['joined_topic'] = '{}-joined'.format(config['topic_prefix'])

config

{'bootstrap_servers': ['kafka.kafka.svc.cluster.local:9092'],
 'first_name': 'Josh',
 'last_name': 'Greenert',
 'client_id': 'GreenertJosh',
 'topic_prefix': 'GreenertJosh',
 'locations_topic': 'GreenertJosh-locations',
 'accelerations_topic': 'GreenertJosh-accelerations',
 'joined_topic': 'GreenertJosh-joined'}

### Create Topic Utility Function

The `create_kafka_topic` helps create a Kafka topic based on your configuration settings.  For instance, if your first name is *John* and your last name is *Doe*, `create_kafka_topic('locations')` will create a topic with the name `DoeJohn-locations`.  The function will not create the topic if it already exists. 

In [3]:
def create_kafka_topic(topic_name, config=config, num_partitions=1, replication_factor=1):
    bootstrap_servers = config['bootstrap_servers']
    client_id = config['client_id']
    topic_prefix = config['topic_prefix']
    name = '{}-{}'.format(topic_prefix, topic_name)
    
    admin_client = KafkaAdminClient(
        bootstrap_servers=bootstrap_servers, 
        client_id=client_id
    )
    
    topic = NewTopic(
        name=name,
        num_partitions=num_partitions,
        replication_factor=replication_factor
    )

    topic_list = [topic]
    try:
        admin_client.create_topics(new_topics=topic_list)
        print('Created topic "{}"'.format(name))
    except TopicAlreadyExistsError as e:
        print('Topic "{}" already exists'.format(name))

create_kafka_topic('joined')

Topic "GreenertJosh-joined" already exists


**TODO:** This code is identical to the code used in 9.1 to publish acceleration and location data to the `LastnameFirstname-simple` topic. You will need to add in the code you used to create the `df_accelerations` dataframe. In order to read data from this topic, make sure that you are running the notebook you created in assignment 8 that publishes acceleration and location data to the LastnameFirstname-simple topic.

In [4]:
spark = SparkSession\
    .builder\
    .appName("Assignment09")\
    .getOrCreate()

df_locations = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "kafka.kafka.svc.cluster.local:9092") \
  .option("subscribe", config['locations_topic']) \
  .load()

## TODO: Add code to create the df_accelerations dataframe
df_accelerations = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "kafka.kafka.svc.cluster.local:9092") \
  .option("subscribe", config['accelerations_topic']) \
  .load()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/14 01:41:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/05/14 01:41:11 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/05/14 01:41:11 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


The following code defines a Spark schema for location and acceleration data as well as a user-defined function (UDF) for parsing the location and acceleration JSON data. 

In [5]:
location_schema = StructType([
    StructField('offset', DoubleType(), nullable=True),
    StructField('id', StringType(), nullable=True),
    StructField('ride_id', StringType(), nullable=True),
    StructField('uuid', StringType(), nullable=True),
    StructField('course', DoubleType(), nullable=True),
    StructField('latitude', DoubleType(), nullable=True),
    StructField('longitude', DoubleType(), nullable=True),
    StructField('geohash', StringType(), nullable=True),
    StructField('speed', DoubleType(), nullable=True),
    StructField('accuracy', DoubleType(), nullable=True),
])

acceleration_schema = StructType([
    StructField('offset', DoubleType(), nullable=True),
    StructField('id', StringType(), nullable=True),
    StructField('ride_id', StringType(), nullable=True),
    StructField('uuid', StringType(), nullable=True),
    StructField('x', DoubleType(), nullable=True),
    StructField('y', DoubleType(), nullable=True),
    StructField('z', DoubleType(), nullable=True),
])

udf_parse_acceleration = udf(lambda x: json.loads(x.decode('utf-8')), acceleration_schema)
udf_parse_location = udf(lambda x: json.loads(x.decode('utf-8')), location_schema)

**TODO:**  

* Complete the code to create the `accelerationsWithWatermark` dataframe. 
  * Select the `timestamp` field with the alias `acceleration_timestamp`
  * Use the `udf_parse_acceleration` UDF to parse the JSON values
  * Select the `ride_id` as `acceleration_ride_id`
  * Select the `x`, `y`, and `z` columns
  * Use the same watermark timespan used in the `locationsWithWatermark` dataframe

In [8]:
locationsWithWatermark = df_locations \
  .select(
    col('timestamp').alias('location_timestamp'), 
    udf_parse_location(df_locations['value']).alias('json_value')
   ) \
  .select(
    col('location_timestamp'), 
    col('json_value.ride_id').alias('location_ride_id'),
    col('json_value.speed').alias('speed'),
    col('json_value.latitude').alias('latitude'),
    col('json_value.longitude').alias('longitude'),
    col('json_value.geohash').alias('geohash'),
    col('json_value.accuracy').alias('accuracy')
  ) \
 .withWatermark('location_timestamp', "2 seconds")

accelerationsWithWatermark = df_accelerations \
    .select(
        col("timestamp").alias("acceleration_timestamp"),
        udf_parse_acceleration(df_accelerations["value"]).alias("json_value")
    ) \
    .select(
        col("acceleration_timestamp"),
        col("json_value.ride_id").alias("acceleration_ride_id"),
        col("json_value.x"),
        col("json_value.y"),
        col("json_value.z")
    ) \
    .withWatermark("acceleration_timestamp", "2 seconds")

**TODO:**  

* Complete the code to create the `df_joined` dataframe. See http://spark.apache.org/docs/latest/structured-streaming-programming-guide.html#stream-stream-joins for additional information. 

In [10]:
df_joined = locationsWithWatermark \
    .join(
        accelerationsWithWatermark,
        (locationsWithWatermark["location_ride_id"] == accelerationsWithWatermark["acceleration_ride_id"])
        & (locationsWithWatermark["location_timestamp"] == accelerationsWithWatermark["acceleration_timestamp"]),
        "inner"
    ) \
    .select(
        locationsWithWatermark["location_ride_id"].alias("ride_id"),
        locationsWithWatermark["location_timestamp"].alias("timestamp"),
        locationsWithWatermark["speed"],
        locationsWithWatermark["latitude"],
        locationsWithWatermark["longitude"],
        locationsWithWatermark["geohash"],
        locationsWithWatermark["accuracy"],
        accelerationsWithWatermark["x"],
        accelerationsWithWatermark["y"],
        accelerationsWithWatermark["z"]
    )
df_joined

DataFrame[ride_id: string, timestamp: timestamp, speed: double, latitude: double, longitude: double, geohash: string, accuracy: double, x: double, y: double, z: double]

If you correctly created the `df_joined` dataframe, you should be able to use the following code to create a streaming query that outputs results to the `LastnameFirstname-joined` topic. 

In [11]:
ds_joined = df_joined \
  .withColumn(
    'value', 
    to_json(
        struct(
            'ride_id', 'location_timestamp', 'speed', 
            'latitude', 'longitude', 'geohash', 'accuracy', 
            'acceleration_timestamp', 'x', 'y', 'z'
        )
    )
    ).withColumn(
     'key', col('ride_id')
    ) \
  .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)") \
  .writeStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "kafka.kafka.svc.cluster.local:9092") \
  .option("topic", config['joined_topic']) \
  .option("checkpointLocation", str(joined_checkpoint_dir)) \
  .start()

try:
    ds_joined.awaitTermination()
except KeyboardInterrupt:
    print("STOPPING STREAMING DATA")

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `location_timestamp` cannot be resolved. Did you mean one of the following? [`timestamp`, `latitude`, `longitude`, `ride_id`, `accuracy`].;
'Project [ride_id#177, timestamp#178-T2000ms, speed#120, latitude#121, longitude#122, geohash#123, accuracy#124, x#145, y#146, z#147, to_json(struct(ride_id, ride_id#177, location_timestamp, 'location_timestamp, speed, speed#120, latitude, latitude#121, longitude, longitude#122, geohash, geohash#123, accuracy, accuracy#124, acceleration_timestamp, 'acceleration_timestamp, x, x#145, y, y#146, z, z#147), Some(Etc/UTC)) AS value#189]
+- Project [location_ride_id#119 AS ride_id#177, location_timestamp#114-T2000ms AS timestamp#178-T2000ms, speed#120, latitude#121, longitude#122, geohash#123, accuracy#124, x#145, y#146, z#147]
   +- Join Inner, ((location_ride_id#119 = acceleration_ride_id#143) AND (location_timestamp#114-T2000ms = acceleration_timestamp#138-T2000ms))
      :- EventTimeWatermark location_timestamp#114: timestamp, 2 seconds
      :  +- Project [location_timestamp#114, json_value#116.ride_id AS location_ride_id#119, json_value#116.speed AS speed#120, json_value#116.latitude AS latitude#121, json_value#116.longitude AS longitude#122, json_value#116.geohash AS geohash#123, json_value#116.accuracy AS accuracy#124]
      :     +- Project [timestamp#12 AS location_timestamp#114, <lambda>(value#8)#115 AS json_value#116]
      :        +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@32b02f2b, kafka, org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@771d93c8, [kafka.bootstrap.servers=kafka.kafka.svc.cluster.local:9092, subscribe=GreenertJosh-locations], [key#7, value#8, topic#9, partition#10, offset#11L, timestamp#12, timestampType#13], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@42ccce36,kafka,List(),None,List(),None,Map(kafka.bootstrap.servers -> kafka.kafka.svc.cluster.local:9092, subscribe -> GreenertJosh-locations),None), kafka, [key#0, value#1, topic#2, partition#3, offset#4L, timestamp#5, timestampType#6]
      +- EventTimeWatermark acceleration_timestamp#138: timestamp, 2 seconds
         +- Project [acceleration_timestamp#138, json_value#140.ride_id AS acceleration_ride_id#143, json_value#140.x AS x#145, json_value#140.y AS y#146, json_value#140.z AS z#147]
            +- Project [timestamp#33 AS acceleration_timestamp#138, <lambda>(value#29)#139 AS json_value#140]
               +- StreamingRelationV2 org.apache.spark.sql.kafka010.KafkaSourceProvider@4f70db13, kafka, org.apache.spark.sql.kafka010.KafkaSourceProvider$KafkaTable@3b105f35, [kafka.bootstrap.servers=kafka.kafka.svc.cluster.local:9092, subscribe=GreenertJosh-accelerations], [key#28, value#29, topic#30, partition#31, offset#32L, timestamp#33, timestampType#34], StreamingRelation DataSource(org.apache.spark.sql.SparkSession@42ccce36,kafka,List(),None,List(),None,Map(kafka.bootstrap.servers -> kafka.kafka.svc.cluster.local:9092, subscribe -> GreenertJosh-accelerations),None), kafka, [key#21, value#22, topic#23, partition#24, offset#25L, timestamp#26, timestampType#27]
