In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.avro.functions import from_avro, to_avro
from pyspark.sql.functions import *
from pyspark.sql.types import *
import json
from pyspark.ml import Pipeline,PipelineModel

In [2]:
# Spark session configured to read from Kafka and read/write MongoDB.
# NOTE(review): the connector coordinates are pinned to Scala 2.12 / version 3.0.0;
# presumably these should match the installed Spark version — verify when upgrading.
SPARK_PACKAGES = ",".join([
    "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.0",
    "org.apache.spark:spark-avro_2.12:3.0.0",
    "org.mongodb.spark:mongo-spark-connector_2.12:3.0.0",
])
# Both the Mongo input and output point at the same collection, twitter_db.tweets.
MONGO_URI = "mongodb://docker_mongo_1:27017/twitter_db.tweets"

spark = (
    SparkSession.builder
    .appName("pyspark-notebook")
    .config("spark.jars.packages", SPARK_PACKAGES)
    .config("spark.mongodb.input.uri", MONGO_URI)
    .config("spark.mongodb.output.uri", MONGO_URI)
    .getOrCreate()
)

22/10/08 14:50:51 WARN Utils: Your hostname, Daos-MacBook-Air.local resolves to a loopback address: 127.0.0.1; using 192.168.182.169 instead (on interface en0)
22/10/08 14:50:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/opt/homebrew/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/daoanhtuan/.ivy2/cache
The jars for the packages stored in: /Users/daoanhtuan/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
org.apache.spark#spark-avro_2.12 added as a dependency
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-bad6dcab-f62c-4d75-8983-b5c9b6a0060c;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.0 in central
	found org.apache.kafka#kafka-clients;2.4.1 in central
	found com.github.luben#zstd-jni;1.4.4-3 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.7.5 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
	found org.apache.spark#spark-avro_2.12;3.0.0 in central
	fo

22/10/08 14:50:53 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/10/08 14:50:55 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [None]:
# Load the tweet schema (Spark's JSON serialisation of a StructType) and derive
# its compact "struct<...>" string form, which is handed to from_json later on.
json_schema = ''  # remains empty if any step of the load below raises
with open("schema/out/tweet_schema.json") as schema_file:
    schema_dict = json.load(schema_file)
    new_schema = StructType.fromJson(schema_dict)
json_schema = new_schema.simpleString()

In [None]:
# Streaming source: subscribe to the "twitter_demo" Kafka topic, starting from the
# earliest available offset, and expose the record key/value bytes as strings.
kafka_options = {
    "kafka.bootstrap.servers": "ec2-35-180-174-18.eu-west-3.compute.amazonaws.com:9092",
    "subscribe": "twitter_demo",
    "startingOffsets": "earliest",
}
json_tweets = (
    spark.readStream
    .format("kafka")
    .options(**kafka_options)
    .load()
    .selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
)

In [None]:
# Refine the raw data read from the Kafka topic:
#  1. parse the JSON `value` with the tweet schema,
#  2. keep only English tweets that have both a creation time and text,
#  3. format the epoch-millisecond timestamp as a "yyyy-MM-dd HH:mm:ss" string,
#  4. strip URLs, @mentions, '#' signs, the retweet marker "RT" and colons from the text.
refined_tweets = (
    json_tweets
    .select(from_json("value", json_schema).alias("data"))
    .where("data.lang = 'en' and data.created_at is not null and data.text is not null")
    .select(
        "data.text",
        # from_unixtime takes seconds, hence the division of the millisecond value by 1000.
        # NOTE(review): the result is a string rendered in the session time zone
        # (spark.sql.session.timeZone), not necessarily UTC — set that to "UTC" if required.
        from_unixtime(col("data.timestamp_ms") / 1000, 'yyyy-MM-dd HH:mm:ss').alias("timestamp_ms"),
    )
    .withColumn('text', regexp_replace('text', r'http\S+', ''))
    # Raw string: '\w' in a plain literal is an invalid escape sequence (warns on newer Pythons).
    .withColumn('text', regexp_replace('text', r'@\w+', ''))
    .withColumn('text', regexp_replace('text', '#', ''))
    # Remove "RT" only as a whole word; a bare 'RT' pattern also mangled words such as "START".
    .withColumn('text', regexp_replace('text', r'\bRT\b', ''))
    .withColumn('text', regexp_replace('text', ':', ''))
)

In [None]:
# Load the saved sentiment PipelineModel from the sentiment/ directory.
# The path variable is not called `dir`, which would shadow the built-in dir().
MODEL_DIR = "sentiment/"
model = PipelineModel.load(MODEL_DIR)

In [None]:
def process_row(df, epoch_id):
    """Score one micro-batch with the sentiment model and append it to MongoDB.

    Intended as the ``foreachBatch`` callback of the streaming query, so ``df``
    is the static DataFrame holding a single micro-batch, not a streaming
    DataFrame.

    Parameters
    ----------
    df : DataFrame
        Micro-batch of refined tweets (``text`` and ``timestamp_ms`` columns).
    epoch_id : int
        Unique id of the micro-batch/epoch. Currently unused.
        NOTE(review): foreachBatch is at-least-once, so a batch replayed after a
        restart is appended again; epoch_id could be used to deduplicate.
    """
    predictions = model.transform(df)
    # Persist only the timestamp, cleaned text and model prediction. No URI is
    # given here, so presumably the connector uses spark.mongodb.output.uri.
    predictions.select("timestamp_ms", "text", "prediction").write.format("mongo").mode("append").save()

In [None]:
# Start the streaming query: every micro-batch of refined tweets is passed to
# process_row via foreachBatch, which scores it and appends it to MongoDB.
# Progress is checkpointed under checkpoint/data, so a restarted query resumes
# from its committed Kafka offsets rather than from startingOffsets.
tweet_query = (
    refined_tweets
    .writeStream
    .option("checkpointLocation", "checkpoint/data")
    .foreachBatch(process_row)
    .start()
)
# Blocks the kernel until the query stops or fails. Interrupting the kernel only
# ends the wait, not the query; keeping the handle lets a later cell call
# tweet_query.stop().
tweet_query.awaitTermination()