# Part 1: Get Twitter tweets

## Download the data

In [1]:
!wget -O tweets.parquet https://huggingface.co/datasets/deberain/ChatGPT-Tweets/resolve/main/data/train-00000-of-00001-c77acc9ef8da1d50.parquet

--2023-05-24 09:36:59--  https://huggingface.co/datasets/deberain/ChatGPT-Tweets/resolve/main/data/train-00000-of-00001-c77acc9ef8da1d50.parquet
Resolving huggingface.co (huggingface.co)... 99.84.191.107, 99.84.191.118, 99.84.191.42, ...
Connecting to huggingface.co (huggingface.co)|99.84.191.107|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://cdn-lfs.huggingface.co/repos/1b/69/1b6969277434e07baea38ea164698d08201e25d040c449e0b9e8250f9bab86c2/415a8636ceda23f018ae5c7e0d3f21c289fab7f28837b1b601114698f9a5eaab?response-content-disposition=attachment%3B+filename*%3DUTF-8%27%27train-00000-of-00001-c77acc9ef8da1d50.parquet%3B+filename%3D%22train-00000-of-00001-c77acc9ef8da1d50.parquet%22%3B&Expires=1685180219&Policy=eyJTdGF0ZW1lbnQiOlt7IlJlc291cmNlIjoiaHR0cHM6Ly9jZG4tbGZzLmh1Z2dpbmdmYWNlLmNvL3JlcG9zLzFiLzY5LzFiNjk2OTI3NzQzNGUwN2JhZWEzOGVhMTY0Njk4ZDA4MjAxZTI1ZDA0MGM0NDllMGI5ZTgyNTBmOWJhYjg2YzIvNDE1YTg2MzZjZWRhMjNmMDE4YWU1YzdlMGQzZjIxYzI4OWZhYjdmMjg4MzdiMWI

## Save the tweets

### Install and start MongoDB

In [2]:
!apt install -qq mongodb
!service mongodb start

The following additional packages will be installed:
  libpcap0.8 libyaml-cpp0.6 mongo-tools mongodb-clients mongodb-server
  mongodb-server-core
The following NEW packages will be installed:
  libpcap0.8 libyaml-cpp0.6 mongo-tools mongodb mongodb-clients mongodb-server
  mongodb-server-core
0 upgraded, 7 newly installed, 0 to remove and 24 not upgraded.
Need to get 55.8 MB of archives.
After this operation, 226 MB of additional disk space will be used.
Selecting previously unselected package libpcap0.8:amd64.
(Reading database ... 122532 files and directories currently installed.)
Preparing to unpack .../0-libpcap0.8_1.9.1-3_amd64.deb ...
Unpacking libpcap0.8:amd64 (1.9.1-3) ...
Selecting previously unselected package libyaml-cpp0.6:amd64.
Preparing to unpack .../1-libyaml-cpp0.6_0.6.2-4ubuntu1_amd64.deb ...
Unpacking libyaml-cpp0.6:amd64 (0.6.2-4ubuntu1) ...
Selecting previously unselected package mongo-tools.
Preparing to unpack .../2-mongo-tools_3.6.3-0ubuntu1_amd64.deb ...
Unpacki

### Install pymongo

In [3]:
!pip install -q pymongo

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m492.9/492.9 kB[0m [31m5.1 MB/s[0m eta [36m0:00:00[0m
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m283.7/283.7 kB[0m [31m15.4 MB/s[0m eta [36m0:00:00[0m
[?25h

Create a dummy database to test

In [4]:
from pymongo import MongoClient
client = MongoClient()

db = client['dummy']
db['chunks'].insert_many([{'Banh xeo': 'Rat ngon'},{'Banh bao': 'Cung ngon'}])

client.list_database_names()

['admin', 'config', 'dummy', 'local']

### Install pyspark

In [5]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget https://downloads.apache.org/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3.tgz
!tar -xf spark-3.4.0-bin-hadoop3.tgz
!pip install findspark

--2023-05-24 09:37:53--  https://downloads.apache.org/spark/spark-3.4.0/spark-3.4.0-bin-hadoop3.tgz
Resolving downloads.apache.org (downloads.apache.org)... 88.99.95.219, 135.181.214.104, 2a01:4f9:3a:2c57::2, ...
Connecting to downloads.apache.org (downloads.apache.org)|88.99.95.219|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 388407094 (370M) [application/x-gzip]
Saving to: ‘spark-3.4.0-bin-hadoop3.tgz’


2023-05-24 09:38:05 (29.9 MB/s) - ‘spark-3.4.0-bin-hadoop3.tgz’ saved [388407094/388407094]

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


Add environment variables

In [6]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "spark-3.4.0-bin-hadoop3"
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.4.0,org.apache.kafka:kafka-clients:3.4.0,org.mongodb.spark:mongo-spark-connector_2.12:10.1.1 pyspark-shell'

In [7]:
import findspark
findspark.init()
import pyspark

### Connect pyspark to mongodb

In [8]:
from pyspark.shell import spark
from pyspark import SparkContext, SparkConf

uri = "mongodb://localhost:27017/dummy"
from pyspark.sql import SparkSession

my_spark = SparkSession \
    .builder \
    .appName("csc14112") \
    .config("spark.mongodb.read.connection.uri", uri) \
    .config("spark.mongodb.write.connection.uri", uri) \
    .getOrCreate()

Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /__ / .__/\_,_/_/ /_/\_\   version 3.4.0
      /_/

Using Python version 3.10.11 (main, Apr  5 2023 14:15:10)
Spark context Web UI available at http://0863baabaf3a:4040
Spark context available as 'sc' (master = local[*], app id = local-1684921118054).
SparkSession available as 'spark'.


Test read data from our mongo db

In [9]:
p = my_spark.read.format("mongodb").option("database", "dummy").option("collection", "chunk").load()
p.printSchema()

root



In [10]:
p.show()

++
||
++
++



### Read the parquet file using pyspark

In [11]:
tweets_df = my_spark.read.parquet("tweets.parquet")
tweets_df.head()

Row(Date='2023-02-24 07:59:26+00:00', Tweet='How to hire 100x more productive team members for Free? We just interviewed and hired #chatgpt for free as a team member. \nhttps://t.co/JwlXXK6WKt', Url='https://twitter.com/smnishad/status/1629028212914245632', User='smnishad', UserCreated='2009-03-04 15:50:52+00:00', UserVerified='FALSE', UserFollowers='2524', UserFriends='4966', Retweets='0', Likes='0', Location='New Delhi, India', UserDescription='Account Planning at Adfactors Advertising')

In [12]:
tweets_df.printSchema()

root
 |-- Date: string (nullable = true)
 |-- Tweet: string (nullable = true)
 |-- Url: string (nullable = true)
 |-- User: string (nullable = true)
 |-- UserCreated: string (nullable = true)
 |-- UserVerified: string (nullable = true)
 |-- UserFollowers: string (nullable = true)
 |-- UserFriends: string (nullable = true)
 |-- Retweets: string (nullable = true)
 |-- Likes: string (nullable = true)
 |-- Location: string (nullable = true)
 |-- UserDescription: string (nullable = true)



### Write into MongoDB

In [13]:
tweets_df.write \
.format("mongodb") \
.option("database", "lab4") \
.option("collection", "tweets") \
.mode("overwrite") \
.save()

In [14]:
tweets = my_spark.read \
.format("mongodb") \
.option("database", "lab4") \
.option("collection", "tweets") \
.load()

tweets.show()

+--------------------+-----+--------------------+--------+--------------------+--------------------+---------------+--------------------+--------------------+-------------+-----------+------------+--------------------+
|                Date|Likes|            Location|Retweets|               Tweet|                 Url|           User|         UserCreated|     UserDescription|UserFollowers|UserFriends|UserVerified|                 _id|
+--------------------+-----+--------------------+--------+--------------------+--------------------+---------------+--------------------+--------------------+-------------+-----------+------------+--------------------+
|2023-01-21 08:48:...|    2|     London, England|       1|We asked #ChatGPT...|https://twitter.c...|       AskHowio|2021-10-30 05:49:...|A Progressive App...|          695|        608|       FALSE|646ddb33f4d6a7505...|
|2023-01-21 08:48:...|    0|                null|       0|Speaking of #Chat...|https://twitter.c...|      aloeyuver|2019-01-

# Part 2: Stream tweets to Apache Spark

## Install Kafka

In [15]:
!wget https://downloads.apache.org/kafka/3.4.0/kafka_2.12-3.4.0.tgz
!tar -xf kafka_2.12-3.4.0.tgz
!pip install kafka-python

--2023-05-24 09:39:53--  https://downloads.apache.org/kafka/3.4.0/kafka_2.12-3.4.0.tgz
Resolving downloads.apache.org (downloads.apache.org)... 88.99.95.219, 135.181.214.104, 2a01:4f8:10a:201a::2, ...
Connecting to downloads.apache.org (downloads.apache.org)|88.99.95.219|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 106441367 (102M) [application/x-gzip]
Saving to: ‘kafka_2.12-3.4.0.tgz’


2023-05-24 09:39:57 (26.4 MB/s) - ‘kafka_2.12-3.4.0.tgz’ saved [106441367/106441367]

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting kafka-python
  Downloading kafka_python-2.0.2-py2.py3-none-any.whl (246 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m246.5/246.5 kB[0m [31m16.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: kafka-python
Successfully installed kafka-python-2.0.2


## Run the instances

In [16]:
!./kafka_2.12-3.4.0/bin/zookeeper-server-start.sh -daemon ./kafka_2.12-3.4.0/config/zookeeper.properties
!./kafka_2.12-3.4.0/bin/kafka-server-start.sh -daemon ./kafka_2.12-3.4.0/config/server.properties
!echo "Waiting for 10 secs until kafka and zookeeper services are up and running"
!sleep 10

Waiting for 10 secs until kafka and zookeeper services are up and running


In [17]:
!ps -ef | grep kafka

root        1853    1012 99 09:38 ?        00:01:59 /usr/lib/jvm/java-8-openjdk-amd64/bin/java -cp spark-3.4.0-bin-hadoop3/conf/:/content/spark-3.4.0-bin-hadoop3/jars/* -Xmx1g -XX:+IgnoreUnrecognizedVMOptions --add-opens=java.base/java.lang=ALL-UNNAMED --add-opens=java.base/java.lang.invoke=ALL-UNNAMED --add-opens=java.base/java.lang.reflect=ALL-UNNAMED --add-opens=java.base/java.io=ALL-UNNAMED --add-opens=java.base/java.net=ALL-UNNAMED --add-opens=java.base/java.nio=ALL-UNNAMED --add-opens=java.base/java.util=ALL-UNNAMED --add-opens=java.base/java.util.concurrent=ALL-UNNAMED --add-opens=java.base/java.util.concurrent.atomic=ALL-UNNAMED --add-opens=java.base/sun.nio.ch=ALL-UNNAMED --add-opens=java.base/sun.nio.cs=ALL-UNNAMED --add-opens=java.base/sun.security.action=ALL-UNNAMED --add-opens=java.base/sun.util.calendar=ALL-UNNAMED --add-opens=java.security.jgss/sun.security.krb5=ALL-UNNAMED -Djdk.reflect.useDirectMethodHandle=false org.apache.spark.deploy.SparkSubmit --packages org.apach

## Create kafka topic

In [18]:
!./kafka_2.12-3.4.0/bin/kafka-topics.sh --create --bootstrap-server 127.0.0.1:9092 --replication-factor 1 --partitions 1 --topic tweets

Created topic tweets.


In [19]:
!./kafka_2.12-3.4.0/bin/kafka-topics.sh --describe --bootstrap-server 127.0.0.1:9092 --topic tweets

Topic: tweets	TopicId: ioyePj-CQI2DAjj0okN8Ww	PartitionCount: 1	ReplicationFactor: 1	Configs: 
	Topic: tweets	Partition: 0	Leader: 0	Replicas: 0	Isr: 0


## Producer

In [20]:
from kafka import KafkaProducer
import json
from bson import json_util
import threading
import time
import datetime

# Set up Kafka producer configuration
bootstrap_servers = ['localhost:9092']
producer = KafkaProducer(bootstrap_servers=bootstrap_servers, \
                         value_serializer=lambda v: json_util.dumps(v).encode('utf-8'))

# Stream data from MongoDB and push to Kafka
def push(producer, client):
  for doc in client["lab4"]["tweets"].find({}):
      producer.send('tweets', {"tweet": doc["Tweet"], "date": doc["Date"]})
      producer.flush()
    
p = threading.Thread(target=push, args=(producer, client, ))
p.daemon = True

## Consumer

In [21]:
# Define the Kafka topic to read from
KAFKA_TOPIC = "tweets"

# Create a streaming DataFrame that reads from Kafka
df = my_spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", KAFKA_TOPIC) \
    .option("startingOffsets", "earliest") \
    .load()

In [22]:
df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [23]:
from pyspark.sql.functions import from_csv, from_json, window, col, count, sum, udf, avg
from pyspark.sql.types import StringType, TimestampType, StructType, FloatType, DateType, IntegerType

schema = StructType() \
        .add("tweet", StringType()) \
        .add("date", TimestampType())

a = df.selectExpr("CAST(value AS STRING)") \
  .select(from_json("value", schema).alias("data")) \
  .select("data.*")

a.printSchema()

root
 |-- tweet: string (nullable = true)
 |-- date: timestamp (nullable = true)



## Perform sentiment analysis on tweets

In [24]:
df_schema = StructType() \
            .add('date', DateType()) \
            .add('score_sum', FloatType()) \
            .add('score_count', IntegerType()) \
            .add('score_avg', FloatType())

GLOBAL_DF = my_spark.createDataFrame([], df_schema)
GLOBAL_DF.printSchema()

root
 |-- date: date (nullable = true)
 |-- score_sum: float (nullable = true)
 |-- score_count: integer (nullable = true)
 |-- score_avg: float (nullable = true)



In [25]:
from textblob import TextBlob

def sentiment_analysis(text):
    return TextBlob(text).sentiment.polarity

convertUDF = udf(lambda z: sentiment_analysis(z), FloatType())

def foreach_batch_function(df, epoch_id):
    global GLOBAL_DF
    temp_df = (
        df.select(col("date"), convertUDF(col("tweet")).alias("score"))
        .groupBy(window("date", "1 day"))
        .agg(
            count("score").alias("score_count"),
            sum("score").alias("score_sum"),
            avg("score").alias("score_avg"),
        )
        .select(
            col("window").start.cast(DateType()).alias("date"),
            col("score_sum"),
            col("score_count"),
            col("score_avg"),
        )
    )
    GLOBAL_DF = (
        GLOBAL_DF.unionAll(temp_df)
        .groupBy("date")
        .agg(
            sum("score_sum").alias("score_sum"), sum("score_count").alias("score_count")
        )
        .withColumn("score_avg", col("score_sum") / col("score_count"))
        .orderBy("date")
    )
    GLOBAL_DF.show()

## Visualize the analytic results

In [26]:
!pip install jupyter-dash

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting jupyter-dash
  Downloading jupyter_dash-0.4.2-py3-none-any.whl (23 kB)
Collecting dash (from jupyter-dash)
  Downloading dash-2.9.3-py3-none-any.whl (10.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.2/10.2 MB[0m [31m55.8 MB/s[0m eta [36m0:00:00[0m
Collecting retrying (from jupyter-dash)
  Downloading retrying-1.3.4-py3-none-any.whl (11 kB)
Collecting ansi2html (from jupyter-dash)
  Downloading ansi2html-1.8.0-py3-none-any.whl (16 kB)
Collecting dash-html-components==2.0.0 (from dash->jupyter-dash)
  Downloading dash_html_components-2.0.0-py3-none-any.whl (4.1 kB)
Collecting dash-core-components==2.0.0 (from dash->jupyter-dash)
  Downloading dash_core_components-2.0.0-py3-none-any.whl (3.8 kB)
Collecting dash-table==5.0.0 (from dash->jupyter-dash)
  Downloading dash_table-5.0.0-py3-none-any.whl (3.9 kB)
Collecting jedi>=0.16 (from ipython->jupyter

In [27]:
from jupyter_dash import JupyterDash
from dash import dcc, html
from dash.dependencies import Input, Output
import plotly.graph_objs as go
import random

app = JupyterDash(__name__)
app.layout = html.Div(
    [
        dcc.Graph(id="live-graph", animate=True),
        dcc.Interval(id="graph-update", interval=1 * 60000),
    ]
)

@app.callback(Output("live-graph", "figure"), [Input("graph-update", "n_intervals")])
def update_graph_scatter(input_data):
    global GLOBAL_DF
    df = GLOBAL_DF.toPandas()
    data = go.Scatter(x=df["date"], y=df["score_avg"], name="Scatter", mode="lines+markers")

    return {
        "data": [data],
        "layout": go.Layout(),
    }

## Run the job

In [28]:
app.run_server(port=8081, debug=True, mode='inline') # dash server

Dash is running on http://127.0.0.1:8081/



INFO:dash.dash:Dash is running on http://127.0.0.1:8081/



<IPython.core.display.Javascript object>

In [29]:
p.start() # producer
query = a.writeStream.foreachBatch(foreach_batch_function).start() # consumer and analysis
query.awaitTermination(1200) # run for 20 mins

+----------+-----------------+-----------+-------------------+
|      date|        score_sum|score_count|          score_avg|
+----------+-----------------+-----------+-------------------+
|2023-01-21|66.88184505095705|        512| 0.1306286036151505|
|2023-02-24|61.55293934466317|        500|0.12310587868932635|
+----------+-----------------+-----------+-------------------+

+----------+------------------+-----------+-------------------+
|      date|         score_sum|score_count|          score_avg|
+----------+------------------+-----------+-------------------+
|2023-01-19|26.440381718799472|        191| 0.1384313178994737|
|2023-01-20| 502.6024760621367|       3727|0.13485443414599857|
|2023-01-21|121.58283421001397|        921|0.13201176352878824|
|2023-02-23|508.87416847457644|       4066| 0.1251535092165707|
|2023-02-24|120.75347741739824|       1054|0.11456686661992244|
+----------+------------------+-----------+-------------------+

+----------+------------------+-----------+-

False