## Putting data in Kafka

In [13]:
from json import dumps
from kafka import KafkaProducer
import pandas as pd
import json

In [14]:
producer = KafkaProducer(bootstrap_servers=['localhost:9092'],
                         value_serializer=lambda x: 
                         dumps(x).encode('utf-8'))

In [15]:
data = pd.read_csv("airlines1.csv")
data.head()

Unnamed: 0.1,Unnamed: 0,Year,Quarter,Month,DayofMonth,DayOfWeek,FlightDate,Reporting_Airline,DOT_ID_Reporting_Airline,IATA_CODE_Reporting_Airline,...,Div4WheelsOff,Div4TailNum,Div5Airport,Div5AirportID,Div5AirportSeqID,Div5WheelsOn,Div5TotalGTime,Div5LongestGTime,Div5WheelsOff,Div5TailNum
0,0,1998,1,1,2,5,1998-01-02,NW,19386,NW,...,,,,,,,,,,
1,1,2009,2,5,28,4,2009-05-28,FL,20437,FL,...,,,,,,,,,,
2,2,2013,2,6,29,6,2013-06-29,MQ,20398,MQ,...,,,,,,,,,,
3,3,2010,3,8,31,2,2010-08-31,DL,19790,DL,...,,,,,,,,,,
4,4,2006,1,1,15,7,2006-01-15,US,20355,US,...,,,,,,,,,,


In [16]:
df = data[["Unnamed: 0","Year", "Reporting_Airline"]]
df.head()

Unnamed: 0.1,Unnamed: 0,Year,Reporting_Airline
0,0,1998,NW
1,1,2009,FL
2,2,2013,MQ
3,3,2010,DL
4,4,2006,US


In [17]:
df.rename(columns = {'Unnamed: 0':'id'}, inplace = True)

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df.rename(columns = {'Unnamed: 0':'id'}, inplace = True)


In [18]:
df.head()

Unnamed: 0,id,Year,Reporting_Airline
0,0,1998,NW
1,1,2009,FL
2,2,2013,MQ
3,3,2010,DL
4,4,2006,US


In [19]:
df.shape

(50001, 3)

In [20]:
dict_data = df.to_dict('records')

In [21]:
dict_data[0]

{'id': 0, 'Year': 1998, 'Reporting_Airline': 'NW'}

In [22]:
for e in range(1000):
    producer.send("airline-topic", value=dict_data[e],key=json.dumps(dict_data[e]["id"]).encode('utf-8'))

## Reading data in batches from Spark

In [23]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [None]:
!conda install -c cyclus java-jdk

In [None]:
#import library 
import os
from pyspark.sql import SparkSession

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.2.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 pyspark-shell'

sc = SparkSession.builder.appName('Pyspark_kafka_airline_read_write').getOrCreate()

df = sc \
    .read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "airline-topic") \
    .option("startingOffsets", "earliest") \
    .option("endingOffsets", "latest") \
    .load() \
    .select("value") \
    .selectExpr("CAST(value AS STRING) as json")


In [None]:
df.show()

In [None]:
jsonSchema = StructType([StructField("id", StringType(), True), StructField("Year", StringType(), True),
                                     StructField("Reporting_Airline", StringType(), True)])

In [None]:
# Parsing and selecting the right column data
df = df.withColumn("jsonData", from_json(col("json"), jsonSchema)) \
                .select("jsonData.*")

In [None]:
df.show()

## Transformation on data

In [None]:
df.select('Reporting_Airline').groupBy('Reporting_Airline').count().show()

In [None]:
# how many aircrafts are there after 2015
df_filtered = df.select('id','Year','Reporting_Airline').filter('Year >= 2015')

## Putting data back to Kafka

In [None]:
query = df_filtered.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value")\
                .write \
                .format("kafka") \
                .option("kafka.bootstrap.servers", "localhost:9092") \
                .option("topic", "airline-sink") \
                .option("checkpointLocation", "./check") \
                .save()