<h1> Putting data in Kafka </h1>

In [1]:
!pip install kafka-python



In [2]:
from json import dumps
from kafka import KafkaProducer
import pandas as pd 
import json 

In [3]:
producer = KafkaProducer(bootstrap_servers=('localhost:9092'),
                         value_serializer=lambda x:
                         dumps(x).encode('utf-8'))

In [4]:
data = pd.read_csv("airlines1.csv")
data.head()

Unnamed: 0.1,Unnamed: 0,Year,Quarter,Month,DayofMonth,DayOfWeek,FlightDate,Reporting_Airline,DOT_ID_Reporting_Airline,IATA_CODE_Reporting_Airline,...,Div4WheelsOff,Div4TailNum,Div5Airport,Div5AirportID,Div5AirportSeqID,Div5WheelsOn,Div5TotalGTime,Div5LongestGTime,Div5WheelsOff,Div5TailNum
0,0,1998,1,1,2,5,1998-01-02,NW,19386,NW,...,,,,,,,,,,
1,1,2009,2,5,28,4,2009-05-28,FL,20437,FL,...,,,,,,,,,,
2,2,2013,2,6,29,6,2013-06-29,MQ,20398,MQ,...,,,,,,,,,,
3,3,2010,3,8,31,2,2010-08-31,DL,19790,DL,...,,,,,,,,,,
4,4,2006,1,1,15,7,2006-01-15,US,20355,US,...,,,,,,,,,,


In [5]:
df = data[["Unnamed: 0", "Year", "Reporting_Airline"]]
df.head()

Unnamed: 0.1,Unnamed: 0,Year,Reporting_Airline
0,0,1998,NW
1,1,2009,FL
2,2,2013,MQ
3,3,2010,DL
4,4,2006,US


In [6]:
df.rename(columns = {'Unnamed: 0': 'id'}, inplace = True)

A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  return super().rename(


In [7]:
df.head()

Unnamed: 0,id,Year,Reporting_Airline
0,0,1998,NW
1,1,2009,FL
2,2,2013,MQ
3,3,2010,DL
4,4,2006,US


In [8]:
dict_data = df.to_dict('records')

In [9]:
dict_data[0]

{'id': 0, 'Year': 1998, 'Reporting_Airline': 'NW'}

In [10]:
df.shape

(50001, 3)

In [11]:
for e in range(1000):
    producer.send("airline-topic",value=dict_data[e], key=json.dumps(dict_data[e]["id"]).encode('utf-8'))

<h1>Reading data in batches from Spark </h1>

In [12]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [13]:
import os
from pyspark.sql import SparkSession

os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.2.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.0 pyspark-shell"

sc = SparkSession.builder.appName('Pyspark_kafka_airline_read_write').getOrCreate()



df = sc \
     .read \
     .format("kafka") \
     .option("kafka.bootstrap.servers","localhost:9092") \
     .option("subscribe", "airline-topic") \
     .option("startingOffsets","earliest") \
     .option("endingOffsets","latest") \
     .load() \
     .select("value") \
     .selectExpr("CAST(value AS STRING) as json")

In [14]:
df.show()

+--------------------+
|                json|
+--------------------+
|{"id": 0, "Year":...|
|{"id": 1, "Year":...|
|{"id": 2, "Year":...|
|{"id": 3, "Year":...|
|{"id": 4, "Year":...|
|{"id": 5, "Year":...|
|{"id": 6, "Year":...|
|{"id": 7, "Year":...|
|{"id": 8, "Year":...|
|{"id": 9, "Year":...|
|{"id": 10, "Year"...|
|{"id": 11, "Year"...|
|{"id": 12, "Year"...|
|{"id": 13, "Year"...|
|{"id": 14, "Year"...|
|{"id": 15, "Year"...|
|{"id": 16, "Year"...|
|{"id": 17, "Year"...|
|{"id": 18, "Year"...|
|{"id": 19, "Year"...|
+--------------------+
only showing top 20 rows



In [16]:
jsonSchema = StructType([StructField("id", StringType(), True), StructField("Year", StringType(), True),
                          StructField("Reporting_Airline",StringType(), True)])

In [17]:
#Parsing and selecting the right column data 
df = df.withColumn("jsonData", from_json(col("json"), jsonSchema)) \
                  .select("jsonData.*")

In [18]:
df.show()

+---+----+-----------------+
| id|Year|Reporting_Airline|
+---+----+-----------------+
|  0|1998|               NW|
|  1|2009|               FL|
|  2|2013|               MQ|
|  3|2010|               DL|
|  4|2006|               US|
|  5|1995|               DL|
|  6|2006|               CO|
|  7|2019|               9E|
|  8|2008|               YV|
|  9|2018|               WN|
| 10|1991|               US|
| 11|2014|               WN|
| 12|1994|               AA|
| 13|2013|               OO|
| 14|2003|               UA|
| 15|1988|               PI|
| 16|2007|               NW|
| 17|2015|               AS|
| 18|2006|               UA|
| 19|2017|               WN|
+---+----+-----------------+
only showing top 20 rows



<h1> Transformation on data </h1>

In [19]:
df.select('Reporting_Airline').groupBy('Reporting_Airline').count().show()

+-----------------+-----+
|Reporting_Airline|count|
+-----------------+-----+
|               UA|  168|
|               EA|    8|
|               PI|   14|
|               NK|    4|
|               PS|    2|
|               AA|  246|
|               NW|  104|
|               EV|   54|
|               B6|   34|
|               HP|   38|
|               TW|   34|
|               DL|  296|
|               OO|  120|
|               F9|   16|
|               YV|   30|
|               TZ|    2|
|               US|  164|
|               MQ|   94|
|               OH|   18|
|               HA|   14|
+-----------------+-----+
only showing top 20 rows



In [27]:
#How many aircrafts are there after 2015
df_filtered = df.select('id', 'Year', 'Reporting_Airline').filter('Year >= 2015')

<h1> Putting data back to Kafka </h1>

In [28]:
query = df_filtered.selectExpr("CAST(id AS STRING) AS key", "to_json(struct(*)) AS value") \
                .write \
                .format("kafka") \
                .option('kafka.bootstrap.servers','localhost:9092') \
                .option("topic","airline-sink")\
                .option("checkpointLocation", "./check") \
                .save()