# Batch job to read a Kafka topic into Spark and save it as a Parquet file

This notebook reads data from a Kafka topic and saves it, so that the data is not lost once Kafka's 7-day retention period expires.

The data is also lightly cleaned, and you can choose which fields to save. Data in the Kafka topic is in JSON format. The Twitter JSON structure is described on the [Twitter Developer pages](https://developer.twitter.com/en/docs/tweets/data-dictionary/overview/intro-to-tweet-json.html), and the fields to save are specified later in this notebook.

This notebook also introduces some basics of Apache Spark, so it includes some extra data printing and schema manipulation, with explanations.

In [1]:
# Pull the Spark SQL Kafka package from the Maven repository (must be set before the SparkSession is created)
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 pyspark-shell'
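
The package coordinate above encodes both the Scala build and the Spark release (`spark-sql-kafka-0-10_<scala>:<spark>`), and it should normally match the Spark installation in use. A minimal sketch that assembles the same string from two variables; the names `scala_version`, `spark_version` and `kafka_package` are illustrative and not part of the original notebook:

```python
import os

# Assumed versions: they reproduce the coordinate used in this notebook.
# Adjust both if your Spark installation differs.
scala_version = "2.11"
spark_version = "2.4.0"

kafka_package = (
    f"org.apache.spark:spark-sql-kafka-0-10_{scala_version}:{spark_version}"
)

# Must be set before the SparkSession (and its JVM) is started.
os.environ["PYSPARK_SUBMIT_ARGS"] = f"--packages {kafka_package} pyspark-shell"
print(os.environ["PYSPARK_SUBMIT_ARGS"])
```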

In [2]:
# Import necessary functions to use Spark SQL and json format
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField, StructType, LongType, StringType
from pyspark.sql.functions import from_json, col
import json

In [3]:
# Create Spark Session 
spark = SparkSession \
    .builder \
    .appName("Kafka topic to Parquet") \
    .getOrCreate()

In [4]:
# Create a batch job that subscribes to the topic twitter-tweets and reads all data in it (from the earliest to the latest offsets)
df = spark \
  .read \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "my-cluster-kafka:9092") \
  .option("subscribe", "twitter-tweets") \
  .option("startingOffsets", "earliest") \
  .option("endingOffsets", "latest") \
  .load()
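
Reading from `earliest` to `latest` re-reads the whole topic on every run. The Kafka source also accepts a JSON string of per-partition offsets for `startingOffsets` and `endingOffsets`, which can limit a run to a slice of the topic. A hedged sketch that builds such strings with `json.dumps`; the three-partition layout and the offset values are assumptions for illustration, and Spark generally expects every partition of the topic to be listed:

```python
import json

topic = "twitter-tweets"

# Hypothetical values: the first offset not yet saved, per partition.
# Assumes the topic has exactly partitions 0, 1 and 2.
next_offsets = {0: 16701, 1: 16651, 2: 16794}

# JSON keys must be strings, so partition numbers are converted.
starting_offsets = json.dumps({topic: {str(p): o for p, o in next_offsets.items()}})

# In the JSON form, -1 stands for "latest".
ending_offsets = json.dumps({topic: {str(p): -1 for p in next_offsets}})

print(starting_offsets)
print(ending_offsets)
```

These strings would then take the place of `"earliest"` and `"latest"` in the `.option(...)` calls above.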

---
## Data exploration: why the schema and data types need to change

In [5]:
# Counting tweets
df.count()

50143

In [6]:
# Sample of content and schema of data
df.show(5)
df.printSchema()

+----+--------------------+--------------+---------+------+--------------------+-------------+
| key|               value|         topic|partition|offset|           timestamp|timestampType|
+----+--------------------+--------------+---------+------+--------------------+-------------+
|null|[7B 22 63 72 65 6...|twitter-tweets|        2|     0|2019-07-22 13:28:...|            0|
|null|[7B 22 63 72 65 6...|twitter-tweets|        2|     1|2019-07-22 13:28:...|            0|
|null|[7B 22 63 72 65 6...|twitter-tweets|        2|     2|2019-07-22 13:28:...|            0|
|null|[7B 22 63 72 65 6...|twitter-tweets|        2|     3|2019-07-22 13:28:...|            0|
|null|[7B 22 63 72 65 6...|twitter-tweets|        2|     4|2019-07-22 13:28:...|            0|
+----+--------------------+--------------+---------+------+--------------------+-------------+
only showing top 5 rows

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- p

The raw Kafka topic data holds the tweets in the 'value' column in binary format.
We want to take just that 'value' column and cast it to a string.

In [7]:
jsonDF = df.selectExpr("CAST(value AS STRING)")
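
The `[7B 22 63 72 65 6...` prefix printed in the 'value' column earlier is simply encoded JSON text. As a quick check outside Spark, the first five complete bytes from that output can be decoded in plain Python:

```python
# First five complete bytes of the 'value' column as printed by df.show()
raw = bytes.fromhex("7B 22 63 72 65")

# Decoding as UTF-8 turns the binary payload into readable text
text = raw.decode("utf-8")
print(text)  # {"cre
```

The result is the start of a JSON object whose first key begins with `cre`, which is consistent with the `created_at` field of a tweet.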

The 'value' column is now a string in the 'jsonDF' DataFrame. Because the Twitter data is JSON, we need to parse it and extract the fields we want.

**If you want to save more fields, consult the [Twitter Developer pages](https://developer.twitter.com/en/docs/tweets/data-dictionary/overview/intro-to-tweet-json.html) and modify the schema below.**

In [8]:
# First create a schema for our data with proper data types, then parse the JSON strings using that schema
schema = StructType([StructField('created_at', StringType(), True),
                     StructField('id', LongType(), True),
                     StructField('user', StructType([StructField('id', LongType(), True),
                                         StructField('name', StringType(), True),
                                         StructField('screen_name', StringType(), True)]), True),
                     StructField('text', StringType(), True)])

tweetNestedDF = jsonDF.select(from_json(col("value"), schema).alias("tweet"))

tweetNestedDF.printSchema()

root
 |-- tweet: struct (nullable = true)
 |    |-- created_at: string (nullable = true)
 |    |-- id: long (nullable = true)
 |    |-- user: struct (nullable = true)
 |    |    |-- id: long (nullable = true)
 |    |    |-- name: string (nullable = true)
 |    |    |-- screen_name: string (nullable = true)
 |    |-- text: string (nullable = true)
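
`from_json` keeps only the fields named in the schema and leaves anything absent as null. The same idea can be illustrated in plain Python. This is a simplified illustration rather than Spark's actual behaviour in every edge case; the sample tweet is made up, and `project_tweet` is a hypothetical helper, not part of Spark:

```python
import json

# Made-up tweet payload; real tweets carry many more fields.
raw_value = json.dumps({
    "created_at": "Mon Jul 22 13:28:00 +0000 2019",
    "id": 1,
    "text": "hello",
    "lang": "en",  # not in the schema, so it is dropped
    "user": {"id": 42, "name": "Example User", "followers_count": 7},
})


def project_tweet(value):
    """Keep only the schema's fields; missing fields become None."""
    tweet = json.loads(value)
    user = tweet.get("user") or {}
    return {
        "created_at": tweet.get("created_at"),
        "id": tweet.get("id"),
        "user": {
            "id": user.get("id"),
            "name": user.get("name"),
            "screen_name": user.get("screen_name"),
        },
        "text": tweet.get("text"),
    }


projected = project_tweet(raw_value)
print(projected)
```

Here `lang` and `followers_count` disappear because the schema does not name them, while `screen_name` comes back as `None` because the sample payload lacks it.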



In [9]:
# Flatten the structure and rename some columns to avoid confusion between the two different id columns
tweetFlattenedDf = tweetNestedDF.selectExpr("tweet.created_at", "tweet.id as tweet_id", "tweet.user.id as user_id", "tweet.user.name as user_name", "tweet.user.screen_name", "tweet.text")

tweetFlattenedDf.printSchema()

root
 |-- created_at: string (nullable = true)
 |-- tweet_id: long (nullable = true)
 |-- user_id: long (nullable = true)
 |-- user_name: string (nullable = true)
 |-- screen_name: string (nullable = true)
 |-- text: string (nullable = true)
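
Each `selectExpr` entry above is a dotted path into the nested struct, optionally renamed with `as`. A plain-Python sketch of the same mapping on a made-up nested record; `COLUMN_PATHS` and `flatten` are illustrative names, not Spark APIs:

```python
# Output column name -> path inside the nested record
COLUMN_PATHS = {
    "created_at": ("tweet", "created_at"),
    "tweet_id": ("tweet", "id"),
    "user_id": ("tweet", "user", "id"),
    "user_name": ("tweet", "user", "name"),
    "screen_name": ("tweet", "user", "screen_name"),
    "text": ("tweet", "text"),
}


def flatten(record):
    """Follow each path and return one flat dict per record."""
    flat = {}
    for column, path in COLUMN_PATHS.items():
        value = record
        for key in path:
            # A missing level yields None instead of raising an error
            value = value.get(key) if isinstance(value, dict) else None
        flat[column] = value
    return flat


# Made-up nested record shaped like one row of tweetNestedDF
nested = {
    "tweet": {
        "created_at": "Mon Jul 22 13:28:00 +0000 2019",
        "id": 1,
        "user": {"id": 42, "name": "Example User", "screen_name": "example"},
        "text": "hello",
    }
}
print(flatten(nested))
```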



In [None]:
# Show first 5 items from dataframe
tweetFlattenedDf.show(5, truncate=False)

## Save data to disk

Save the data to a Parquet file. Kafka's retention period is only 7 days by default, so the data should be saved for later use. Note that the write fails if the output path already exists, so change the file name if you run this repeatedly.

In [None]:
tweetFlattenedDf.write.parquet("data/tweets.parquet")
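
One way to avoid the failure on repeated runs is to give each run its own output path. A minimal sketch, assuming a UTC timestamp suffix is an acceptable naming scheme (the names `run_stamp` and `output_path` are illustrative):

```python
from datetime import datetime, timezone

# Timestamp of this run, formatted like 20190722T132800Z
run_stamp = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")

# A distinct path per run, so an existing path is not reused
output_path = f"data/tweets_{run_stamp}.parquet"
print(output_path)

# In the notebook this would replace the fixed path:
# tweetFlattenedDf.write.parquet(output_path)
```

Alternatively, `tweetFlattenedDf.write.mode("append").parquet("data/tweets.parquet")` adds new files to the existing output, at the cost of duplicated rows if the same offsets are read again.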