In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import from_avro, to_avro
from pyspark.sql.functions import *
from pyspark.sql.types import *
import json
from pyspark.ml import Pipeline,PipelineModel

In [2]:
# Build (or reuse) the Spark session for this notebook. The session pulls in
# the Kafka source, Avro and MongoDB connector packages, and points both the
# MongoDB input and output URIs at twitter_data.worldcup.
spark = (
    SparkSession.builder
    .appName("pyspark-notebook")
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0,org.apache.spark:spark-avro_2.12:3.0.0,org.mongodb.spark:mongo-spark-connector_2.12:3.0.0",
    )
    .config("spark.mongodb.input.uri", "mongodb://customdockerfile-mongo-1:27017/twitter_data.worldcup")
    .config("spark.mongodb.output.uri", "mongodb://customdockerfile-mongo-1:27017/twitter_data.worldcup")
    .getOrCreate()
)

Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
:: loading settings :: url = jar:file:/usr/local/lib/python3.7/dist-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.apache.spark#spark-avro_2.12 added as a dependency
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-255f47db-8661-43e7-8c4b-210f2da8431f;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0 in central
	found org.apache.kafka#kafka-clients;2.4.1 in central
	found com.github.luben#zstd-jni;1.4.4-3 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.7.5 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in

In [3]:
# Load the tweet schema from its JSON definition and keep it in two forms:
#   new_schema  - the StructType rebuilt from the parsed JSON
#   json_schema - its simpleString() rendering, which is what from_json is given
# The file is opened as UTF-8 explicitly so decoding does not depend on the
# locale of the machine running the kernel. json_schema is only bound once the
# load has succeeded, so a failed read cannot leave an empty schema string
# behind for later cells to pick up.
with open("schema/out/tweet_schema_worldcup.json", encoding="utf-8") as f:
    new_schema = StructType.fromJson(json.load(f))
json_schema = new_schema.simpleString()

In [4]:
# Open a streaming read on the "twitter_demo" Kafka topic, starting from the
# earliest offsets and with failOnDataLoss switched off, then expose the
# record key and value as string columns.
json_tweets = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "192.168.56.1:9092")
    .option("subscribe", "twitter_demo")
    .option("startingOffsets", "earliest")
    .option("failOnDataLoss", "false")
    .load()
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
)

In [5]:
# Sanity check: print the schema of the DataFrame read from Kafka.
json_tweets.printSchema()

root
 |-- key: string (nullable = true)
 |-- value: string (nullable = true)



In [6]:
# Show the simpleString() form of the tweet schema that from_json will use.
print(json_schema)

struct<binding_values:string,bookmark_count:bigint,community_note:string,conversation_id:string,creation_date:string,expanded_url:string,extended_entities:struct<media:array<struct<additional_media_info:struct<description:string,embeddable:boolean,monetizable:boolean,source_user:struct<user_results:struct<result:struct<__typename:string,has_graduated_access:boolean,id:string,is_blue_verified:boolean,legacy:struct<can_dm:boolean,can_media_tag:boolean,created_at:string,default_profile:boolean,default_profile_image:boolean,description:string,entities:struct<description:struct<urls:array<string>>,url:struct<urls:array<struct<display_url:string,expanded_url:string,indices:array<bigint>,url:string>>>>,fast_followers_count:bigint,favourites_count:bigint,followers_count:bigint,friends_count:bigint,has_custom_timelines:boolean,is_translator:boolean,listed_count:bigint,location:string,media_count:bigint,name:string,normal_followers_count:bigint,pinned_tweet_ids_str:array<string>,possibly_sensiti

In [7]:
#Refine raw data read from Kafka topic (earlier draft using the lang / created_at / timestamp_ms field names; kept commented out)
#refined_tweets = json_tweets\
#        .select(from_json("value", json_schema)\
#        .alias("data"))\
#        .where("data.lang='en'and data.created_at is not null and data.text is not null")\
#        .select("data.text",
#                from_unixtime(col("data.timestamp_ms")/1000,'yyyy-MM-dd HH:mm:ss').alias("timestamp_ms")) #Translate milliseconds to UTC timestamp
#refined_tweets = refined_tweets.withColumn('text', regexp_replace('text', r'http\S+', ''))
#refined_tweets = refined_tweets.withColumn('text', regexp_replace('text', '@\w+', ''))
#refined_tweets = refined_tweets.withColumn('text', regexp_replace('text', '#', ''))
#refined_tweets = refined_tweets.withColumn('text', regexp_replace('text', 'RT', ''))
#refined_tweets = refined_tweets.withColumn('text', regexp_replace('text', ':', ''))

In [7]:
# Refine the raw data read from the Kafka topic:
#   1. parse each message value as JSON using json_schema,
#   2. keep rows whose language is 'en' and whose creation_date and text are set,
#   3. project the tweet text plus a formatted timestamp,
#   4. strip URLs, @mentions, '#', 'RT' and ':' from the text.
# All regex patterns are raw strings; the previous non-raw '@\w+' relied on an
# invalid escape sequence that Python only tolerates with a warning.
refined_tweets = (
    json_tweets
    .select(from_json("value", json_schema).alias("data"))
    .where("data.language='en'and data.creation_date is not null and data.text is not null")
    .select(
        "data.text",
        # data.timestamp is divided by 1000 (i.e. treated as milliseconds) before formatting.
        # NOTE(review): from_unixtime renders in the Spark session time zone, so
        # the result is UTC only if the session is configured that way - confirm.
        from_unixtime(col("data.timestamp") / 1000, 'yyyy-MM-dd HH:mm:ss').alias("timestamp"),
    )
    .withColumn('text', regexp_replace('text', r'http\S+', ''))  # URLs
    .withColumn('text', regexp_replace('text', r'@\w+', ''))     # @mentions
    .withColumn('text', regexp_replace('text', '#', ''))         # hash signs (the tag word is kept)
    # NOTE(review): 'RT' is unanchored, so it is also removed from inside words
    # (e.g. "SPORTS"). Left unchanged because the saved model may have been
    # trained on text cleaned the same way - confirm before tightening.
    .withColumn('text', regexp_replace('text', 'RT', ''))
    .withColumn('text', regexp_replace('text', ':', ''))         # colons
)

24/11/26 09:58:00 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [8]:
# Load the saved PipelineModel from the "sentiment/" directory.
# The path is kept in model_dir instead of `dir` so that the built-in dir()
# stays usable in the rest of the kernel session.
model_dir = "sentiment/"
model = PipelineModel.load(model_dir)

                                                                                

In [9]:
def process_row(df, epoch_id):
    """Score one micro-batch with the loaded model and append it to MongoDB.

    Used as the ``foreachBatch`` callback of the streaming query. Relies on
    the module-level ``model`` being loaded before the query starts.

    Parameters
    ----------
    df : DataFrame
        The micro-batch handed over by ``foreachBatch``.
    epoch_id : int
        Id of the micro batch/epoch; not used by this function.
    """
    scored = model.transform(df)
    # Only the columns below are persisted; everything else the model adds is dropped.
    output = scored.select("timestamp", "text", "prediction")
    mongo_writer = output.write.format("mongo").mode("append")
    mongo_writer.save()

In [None]:
# Start the streaming query: every micro-batch of refined_tweets is passed to
# process_row through foreachBatch, with "checkpoint/data" as the checkpoint
# location. The cell then blocks until the query terminates.
(
    refined_tweets.writeStream
    .option("checkpointLocation", "checkpoint/data")
    .foreachBatch(process_row)
    .start()
    .awaitTermination()
)