In [1]:
from kafka import KafkaConsumer, KafkaProducer
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
import pyspark.sql.types as T
import json
import pandas as pd

In [2]:
# Create (or reuse) a Spark session running locally on a single worker thread.
# The Kafka SQL connector is requested through spark.jars.packages.
# NOTE(review): the connector coordinate is pinned to 3.3.1 / Scala 2.12 --
# presumably it must match the installed Spark build; confirm when upgrading.
spark = SparkSession.builder \
    .master("local[1]") \
    .appName("Data Transformation") \
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.1",
    ) \
    .getOrCreate()

In [3]:
# Bare expression: the notebook renders the session object as this cell's output.
spark

In [4]:
# Schema of the JSON payload carried in each Kafka message: a timestamp plus
# two nested structs, one grouping the string-typed passenger attributes and
# one grouping the integer-typed ones. Every field is nullable.
titanic_schema = T.StructType([
    T.StructField("Timestamp", T.TimestampType(), nullable=True),
    T.StructField(
        "string_columns",
        T.StructType([
            T.StructField(column, T.StringType(), nullable=True)
            for column in ("Name", "Sex", "Age", "Ticket", "Fare", "Embarked")
        ]),
        nullable=True,
    ),
    T.StructField(
        "numeric_columns",
        T.StructType([
            T.StructField(column, T.IntegerType(), nullable=True)
            for column in ("PassengerId", "Survived", "Pclass", "SibSp", "Parch")
        ]),
        nullable=True,
    ),
])

In [5]:
# Batch read (spark.read, not a streaming read) of the "topic_nested" topic
# from the Kafka broker at localhost:9092, then print the raw Kafka columns.
df = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "localhost:9092")
    .option("subscribe", "topic_nested")
    .option("failOnDataLoss", "true")
    .load()
)
df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [6]:
# Decode the Kafka `value` bytes as a string and parse it as JSON using
# titanic_schema; the parsed struct is stored in a "message_content" column.
df = df.withColumn(
    "message_content",
    F.from_json(df["value"].cast("string"), titanic_schema),
)

# Keep only the parsed payload, expanding the struct into its top-level fields.
df_minimal = df.select("message_content.*")

In [7]:
# Preview the parsed messages (first 20 rows, long cell values truncated).
df_minimal.show(n=20, truncate=True)

+-------------------+--------------------+----------------+
|          Timestamp|      string_columns| numeric_columns|
+-------------------+--------------------+----------------+
|2020-01-01 13:45:25|{Braund, Mr. Owen...| {1, 0, 3, 1, 0}|
|2020-01-01 13:44:48|{Cumings, Mrs. Jo...| {2, 1, 1, 1, 0}|
|2020-01-01 13:38:11|{Heikkinen, Miss....| {3, 1, 3, 0, 0}|
|2020-01-01 13:32:00|{Futrelle, Mrs. J...| {4, 1, 1, 1, 0}|
|2020-01-01 13:36:30|{Allen, Mr. Willi...| {5, 0, 3, 0, 0}|
|2020-01-01 13:31:39|{Moran, Mr. James...| {6, 0, 3, 0, 0}|
|2020-01-01 13:37:31|{McCarthy, Mr. Ti...| {7, 0, 1, 0, 0}|
|2020-01-01 13:49:08|{Palsson, Master....| {8, 0, 3, 3, 1}|
|2020-01-01 13:33:42|{Johnson, Mrs. Os...| {9, 1, 3, 0, 2}|
|2020-01-01 13:32:53|{Nasser, Mrs. Nic...|{10, 1, 2, 1, 0}|
|2020-01-01 13:32:23|{Sandstrom, Miss....|{11, 1, 3, 1, 1}|
|2020-01-01 13:30:12|{Bonnell, Miss. E...|{12, 1, 1, 0, 0}|
|2020-01-01 13:33:34|{Saundercock, Mr....|{13, 0, 3, 0, 0}|
|2020-01-01 13:30:20|{Andersson, Mr. A..

In [16]:
# Flatten the two nested structs into plain top-level columns, keeping the
# timestamp first, then the string attributes, then the integer attributes.
new_df = df_minimal.select(
    "Timestamp",
    *[
        f"string_columns.{field}"
        for field in ("Name", "Sex", "Age", "Ticket", "Fare", "Embarked")
    ],
    *[
        f"numeric_columns.{field}"
        for field in ("PassengerId", "Survived", "Pclass", "SibSp", "Parch")
    ],
)

In [17]:
# Preview the flattened table (first 20 rows, long cell values truncated).
new_df.show(n=20, truncate=True)

+-------------------+--------------------+------+----+----------------+-------+--------+-----------+--------+------+-----+-----+
|          Timestamp|                Name|   Sex| Age|          Ticket|   Fare|Embarked|PassengerId|Survived|Pclass|SibSp|Parch|
+-------------------+--------------------+------+----+----------------+-------+--------+-----------+--------+------+-----+-----+
|2020-01-01 13:45:25|Braund, Mr. Owen ...|  male|  22|       A/5 21171|   7.25|       S|          1|       0|     3|    1|    0|
|2020-01-01 13:44:48|Cumings, Mrs. Joh...|female|  38|        PC 17599|71.2833|       C|          2|       1|     1|    1|    0|
|2020-01-01 13:38:11|Heikkinen, Miss. ...|female|  26|STON/O2. 3101282|  7.925|       S|          3|       1|     3|    0|    0|
|2020-01-01 13:32:00|Futrelle, Mrs. Ja...|female|  35|          113803|   53.1|       S|          4|       1|     1|    1|    0|
|2020-01-01 13:36:30|Allen, Mr. Willia...|  male|  35|          373450|   8.05|       S|         

In [18]:
# Drop rows that are exact copies of another row (all columns compared),
# then preview the result.
df_remove_duplicates = new_df.drop_duplicates()
df_remove_duplicates.show(n=20, truncate=True)

+-------------------+--------------------+------+----+-----------------+--------+--------+-----------+--------+------+-----+-----+
|          Timestamp|                Name|   Sex| Age|           Ticket|    Fare|Embarked|PassengerId|Survived|Pclass|SibSp|Parch|
+-------------------+--------------------+------+----+-----------------+--------+--------+-----------+--------+------+-----+-----+
|2020-01-01 13:46:21|"Lovell, Mr. John...|  male|  20|        A/5 21173|    7.25|       S|        228|       0|     3|    0|    0|
|2020-01-01 13:36:37|Cardeza, Mr. Thom...|  male|  36|         PC 17755|512.3292|       C|        680|       1|     1|    0|    1|
|2020-01-01 13:36:29|Alexander, Mr. Wi...|  male|  26|             3474|  7.8875|       S|        811|       0|     3|    0|    0|
|2020-01-01 13:39:42|    Calic, Mr. Petar|  male|  17|           315086|  8.6625|       S|        501|       0|     3|    0|    0|
|2020-01-01 13:32:00|Futrelle, Mrs. Ja...|female|  35|           113803|    53.1|  

In [19]:
# Number of rows removed by de-duplication: row count before minus row count after.
Num_duplicates_dropped = (
    new_df.count()
    - df_remove_duplicates.count()
)

In [21]:
# Show how many duplicate rows were removed.
Num_duplicates_dropped

891

In [22]:
# Discard every row in which Age or Embarked is null; other columns may still
# contain nulls.
drop_null_df = df_remove_duplicates.dropna(
    how="any",
    subset=["Age", "Embarked"],
)

In [23]:
# Preview the rows that survived the null filter (first 20 rows, truncated cells).
drop_null_df.show(n=20, truncate=True)

+-------------------+--------------------+------+---+-----------------+--------+--------+-----------+--------+------+-----+-----+
|          Timestamp|                Name|   Sex|Age|           Ticket|    Fare|Embarked|PassengerId|Survived|Pclass|SibSp|Parch|
+-------------------+--------------------+------+---+-----------------+--------+--------+-----------+--------+------+-----+-----+
|2020-01-01 13:46:21|"Lovell, Mr. John...|  male| 20|        A/5 21173|    7.25|       S|        228|       0|     3|    0|    0|
|2020-01-01 13:36:37|Cardeza, Mr. Thom...|  male| 36|         PC 17755|512.3292|       C|        680|       1|     1|    0|    1|
|2020-01-01 13:36:29|Alexander, Mr. Wi...|  male| 26|             3474|  7.8875|       S|        811|       0|     3|    0|    0|
|2020-01-01 13:39:42|    Calic, Mr. Petar|  male| 17|           315086|  8.6625|       S|        501|       0|     3|    0|    0|
|2020-01-01 13:32:00|Futrelle, Mrs. Ja...|female| 35|           113803|    53.1|       S| 

In [24]:
# Number of rows removed by the null filter: row count before minus row count after.
Num_nulls_dropped = (
    df_remove_duplicates.count()
    - drop_null_df.count()
)

In [25]:
# Show how many rows were removed for having a null Age or Embarked.
Num_nulls_dropped

179

In [26]:
# Remove the Pclass, SibSp and Parch columns, then preview the narrower table.
drop_col_df = drop_null_df.drop(
    "Pclass",
    "SibSp",
    "Parch",
)
drop_col_df.show(n=20, truncate=True)

+-------------------+--------------------+------+---+-----------------+--------+--------+-----------+--------+
|          Timestamp|                Name|   Sex|Age|           Ticket|    Fare|Embarked|PassengerId|Survived|
+-------------------+--------------------+------+---+-----------------+--------+--------+-----------+--------+
|2020-01-01 13:46:21|"Lovell, Mr. John...|  male| 20|        A/5 21173|    7.25|       S|        228|       0|
|2020-01-01 13:36:37|Cardeza, Mr. Thom...|  male| 36|         PC 17755|512.3292|       C|        680|       1|
|2020-01-01 13:36:29|Alexander, Mr. Wi...|  male| 26|             3474|  7.8875|       S|        811|       0|
|2020-01-01 13:39:42|    Calic, Mr. Petar|  male| 17|           315086|  8.6625|       S|        501|       0|
|2020-01-01 13:32:00|Futrelle, Mrs. Ja...|female| 35|           113803|    53.1|       S|          4|       1|
|2020-01-01 13:34:35|Jansson, Mr. Carl...|  male| 21|           350034|  7.7958|       S|        392|       1|
|

In [27]:
# Print column names and types before casting; Age and Fare are still strings here.
drop_col_df.printSchema()

root
 |-- Timestamp: timestamp (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: string (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: string (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)



In [36]:
# Convert the two numeric-looking string columns to real numeric types:
# Age becomes an integer and Fare a single-precision float.
# NOTE(review): Age values that are not whole numbers would presumably be
# truncated or nulled by the integer cast -- confirm that is intended.
final_dataframe = (
    drop_col_df
    .withColumn("Age", F.col("Age").cast("int"))
    .withColumn("Fare", F.col("Fare").cast("float"))
)

In [37]:
# Print the schema again to confirm the casts: Age is now integer and Fare is float.
final_dataframe.printSchema()

root
 |-- Timestamp: timestamp (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: float (nullable = true)
 |-- Embarked: string (nullable = true)
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)



In [38]:
# Persist the cleaned rows as JSON files under the "transformed_data" directory.
# mode("overwrite") replaces output left behind by a previous run: with the
# default save mode the write raises as soon as the path already exists, which
# made this cell fail on every re-run (e.g. Restart Kernel -> Run All).
final_dataframe.write.mode("overwrite").json("transformed_data")