# Import Libraries

In [167]:
import time
import json
import os
import glob
import pandas as pd
from kafka import KafkaProducer
from kafka import KafkaConsumer
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession


In [181]:
def get_files(filepath):
    """Return absolute paths of all ``*.json`` files under ``filepath``.

    The directory tree is walked recursively. Matching is done on the file
    names that ``os.walk`` already reports instead of re-globbing every
    directory. As a result:

    * directory names containing glob metacharacters (``[``, ``]``, ``*``,
      ``?``) no longer hide their files;
    * directories whose names end in ``.json`` are not returned.

    Args:
        filepath: Root directory to search. A path that does not exist (or is
            not a directory) yields an empty list, because ``os.walk`` reports
            nothing for it.

    Returns:
        A list of absolute file paths. Directories and file names are visited
        in sorted order, so the result (and hence the publish order) is
        reproducible across runs.
    """
    all_files = []
    for root, dirs, names in os.walk(filepath):
        # Sorting `dirs` in place makes os.walk descend in a stable order.
        dirs.sort()
        for name in sorted(names):
            # Same rule as glob's '*.json': case follows the platform
            # (normcase), and a leading '*' never matches a dot-file.
            if os.path.normcase(name).endswith('.json') and not name.startswith('.'):
                all_files.append(os.path.abspath(os.path.join(root, name)))

    return all_files

# Collect the input file paths and set up the Kafka producer

In [182]:
def _encode_json_value(payload):
    """Serialize a Kafka message value as UTF-8 encoded JSON bytes."""
    return json.dumps(payload).encode('utf-8')


# Every JSON event file found under meta_data/ (recursively).
files = get_files('meta_data')

# Producer for the local broker; values are JSON-encoded by the serializer,
# keys are passed in already as bytes.
producer = KafkaProducer(
    bootstrap_servers=['localhost:9092'],
    value_serializer=_encode_json_value,
)

## Produce messages and publish them to the Kafka topic (input_topic)

In [170]:
# Publish each event file as one message on input_topic, keyed by its userId.
for file in files:
    # JSON is UTF-8; do not depend on the platform's default text encoding.
    with open(file, 'r', encoding='utf-8') as f:
        data = json.load(f)
    # The producer's value_serializer encodes `value`; the key must already be bytes.
    producer.send('input_topic', value=data, key=json.dumps(data['userId']).encode('utf-8'))

# send() is asynchronous and only buffers records. flush() blocks until every
# buffered record has been delivered, so the Spark batch read that follows
# sees all of them.
producer.flush()



# Set up the environment and Spark, then load input_topic into a Spark DataFrame


In [171]:
# Kafka connector packages for spark-submit. This is set before getOrCreate()
# below. NOTE(review): it presumably has no effect if a SparkSession/JVM
# already exists in this kernel — confirm.
os.environ['PYSPARK_SUBMIT_ARGS'] = ' '.join([
    '--packages',
    ','.join([
        'org.apache.spark:spark-streaming-kafka-0-10_2.12:3.2.0',
        'org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0',
    ]),
    'pyspark-shell',
])

# `sc` holds a SparkSession (built via SparkSession.builder), not a SparkContext.
sc = SparkSession.builder.appName('Pyspark_kafka_read_write').getOrCreate()

# Batch-read input_topic from its earliest to its latest offset and keep only
# the message value, cast to a string column named `json`.
df = (
    sc.read
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "localhost:9092",
        "subscribe": "input_topic",
        "startingOffsets": "earliest",
        "endingOffsets": "latest",
    })
    .load()
    .select("value")
    .selectExpr("CAST(value AS STRING) as json")
)




## Set up schema

In [172]:
# Expected structure of each JSON event read from input_topic.
# Every field, including the two nested structs, is declared nullable.
schema = StructType([
    StructField("userId", StringType(), True),
    StructField("type", StringType(), True),
    StructField(
        "metadata",
        StructType([
            StructField("messageId", StringType(), True),
            StructField("sentAt", LongType(), True),
            StructField("timestamp", LongType(), True),
            StructField("receivedAt", LongType(), True),
            StructField("apiKey", StringType(), True),
            StructField("spaceId", StringType(), True),
            StructField("version", StringType(), True),
        ]),
        True,  # StructField's default nullability, written out explicitly
    ),
    StructField("event", StringType(), True),
    StructField(
        "eventData",
        StructType([
            StructField("MovieID", StringType(), True),
        ]),
        True,  # StructField's default nullability, written out explicitly
    ),
])
 

## Parse the JSON

In [173]:
# Parse each Kafka value (the string column `json`) against `schema` and
# flatten the resulting struct so its fields become top-level columns.
# The raw Kafka frame is kept as `raw_df`. Re-running this cell therefore
# re-parses the original frame instead of failing on the `json` column,
# which the previous run already dropped from `df`.
if "json" in df.columns:
    raw_df = df
df = (
    raw_df
    .withColumn("jsonData", from_json(col("json"), schema))
    .select("jsonData.*")
)

In [174]:
# Print the parsed DataFrame's schema tree to check that the JSON fields were flattened into columns.
df.printSchema()

root
 |-- userId: string (nullable = true)
 |-- type: string (nullable = true)
 |-- metadata: struct (nullable = true)
 |    |-- messageId: string (nullable = true)
 |    |-- sentAt: long (nullable = true)
 |    |-- timestamp: long (nullable = true)
 |    |-- receivedAt: long (nullable = true)
 |    |-- apiKey: string (nullable = true)
 |    |-- spaceId: string (nullable = true)
 |    |-- version: string (nullable = true)
 |-- event: string (nullable = true)
 |-- eventData: struct (nullable = true)
 |    |-- MovieID: string (nullable = true)



# Select and aggregate the data: first and last seen time per user

In [175]:
# Per-user first/last activity.
# Adding sentAt + receivedAt does not produce a timestamp (the sum of two
# epoch values is roughly twice the epoch), so aggregating such a sum gives
# meaningless firstSeen/lastSeen values.
# Instead, each event contributes both of its timestamps:
#   - firstSeen is the earliest of sentAt/receivedAt across the user's events;
#   - lastSeen is the latest.
# least/greatest skip nulls, so an event with only one of the two fields still counts.
# NOTE(review): confirm this is the intended meaning of "seen" (alternatives:
# receivedAt only, or metadata.timestamp).
# `min`/`max` here are pyspark.sql.functions (star-imported), not the builtins.
user_info = df.select('userId', 'metadata.sentAt', 'metadata.receivedAt')
result = user_info.groupby("userId").agg(
    min(least('sentAt', 'receivedAt')).alias('firstSeen'),
    max(greatest('sentAt', 'receivedAt')).alias('lastSeen'),
)


# Write the results to the output topic (output_topic)

In [176]:
# Turn each aggregated row into a Kafka record and batch-write it to output_topic:
#   - the key is userId cast to a string;
#   - the value is the whole row serialized as JSON.
# NOTE(review): this is a batch (non-streaming) write. DataFrameWriter.save()
# presumably returns None here, so `query` is not a query handle, and
# checkpointLocation is a streaming option that is likely unused — confirm.
query = (
    result
    .selectExpr("CAST(userId AS STRING) AS key", "to_json(struct(*)) AS value")
    .write
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "localhost:9092",
        "topic": "output_topic",
        "checkpointLocation": "./check",
    })
    .save()
)

In [177]:
# Consumer for reading back the aggregated records from output_topic.
# Each value is decoded from UTF-8 JSON.
# auto_offset_reset='earliest': no group_id is given, so there are no
# committed offsets. The default ('latest') would start at the end of the
# topic and skip every record the write above has already produced.
# consumer_timeout_ms=5000: iteration stops after 5 s without a new message
# instead of blocking forever.
consumer = KafkaConsumer(
    'output_topic',
    bootstrap_servers=['localhost:9092'],
    value_deserializer=lambda m: json.loads(m.decode('utf-8')),
    auto_offset_reset='earliest',
    consumer_timeout_ms=5000,
)