In [1]:
from operator import add
import nltk
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.sql import SQLContext
import pyspark
import pyspark.sql.functions as f

conf = pyspark.SparkConf()
conf.setMaster("k8s://https://kubernetes.default:443")
conf.set("spark.kubernetes.container.image", "spark-k8s-jupyter:v3.5.0")

conf.set("spark.kubernetes.container.image.pullPolicy", "Never")

conf.set(
    "spark.kubernetes.authenticate.caCertFile",
    "/var/run/secrets/kubernetes.io/serviceaccount/ca.crt",
)

conf.set(
    "spark.kubernetes.authenticate.oauthTokenFile",
    "/var/run/secrets/kubernetes.io/serviceaccount/token",
)

conf.set("spark.kubernetes.authenticate.driver.serviceAccountName", "spark-driver")

conf.set("spark.jars", "/opt/spark/jars/postgresql-42.7.2.jar")

# 2 pods/workers will be created. Can be expanded for larger workloads.
conf.set("spark.executor.instances", "2")
conf.set("spark.executor.cores", "4")
conf.set("spark.executor.memory", "8g")

# The DNS alias for the Spark driver. Required by executors to report status.
conf.set("spark.driver.host", "jupyter-pod")

# Port which the Spark shell should bind to and to which executors will report progress
# conf.set("spark.driver.port", "20200")

# Initialize spark context, create executors
sc = pyspark.SparkContext(conf=conf)

# Create a distributed data set to test to the session
t = sc.parallelize(range(10))

# Calculate the approximate sum of values in the dataset
r = t.sumApprox(3)
print("Approximate sum: %s" % r)

24/02/29 02:04:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
[Stage 0:>                                                        (0 + 10) / 10]

Approximate sum: 45.0




In [2]:
sqlctx = SQLContext(sc)
df = (
    sqlctx.read.format("jdbc")
    .options(
        url="jdbc:postgresql://aws-0-eu-central-1.pooler.supabase.com:5432/postgres?user=postgres.wespevtgopktsywbvwyk&password=10SaPUB5pHAh8QYA",
        dbtable="public.tweets",
        numPartitions=24,
        partitionColumn="tweet_id",
        lowerBound=1_000,
        upperBound=100_000,
    )
    .load()
)



In [3]:
df.count()

                                                                                

656555

In [4]:
df.printSchema()

root
 |-- external_author_id: long (nullable = true)
 |-- author: string (nullable = true)
 |-- content: string (nullable = true)
 |-- region: string (nullable = true)
 |-- language: string (nullable = true)
 |-- publish_date: string (nullable = true)
 |-- harvested_date: string (nullable = true)
 |-- following: string (nullable = true)
 |-- followers: string (nullable = true)
 |-- updates: long (nullable = true)
 |-- post_type: string (nullable = true)
 |-- account_type: string (nullable = true)
 |-- retweet: string (nullable = true)
 |-- account_category: string (nullable = true)
 |-- new_june_2018: string (nullable = true)
 |-- alt_external_id: long (nullable = true)
 |-- tweet_id: long (nullable = true)
 |-- article_url: string (nullable = true)
 |-- tco1_step1: string (nullable = true)
 |-- tco2_step1: string (nullable = true)
 |-- tco3_step1: string (nullable = true)



In [5]:
df.limit(10).show()

                                                                                

+------------------+------+--------------------+-------+--------+---------------+---------------+---------+---------+-------+---------+------------+-------+----------------+-------------+------------------+------------------+--------------------+--------------------+----------+----------+
|external_author_id|author|             content| region|language|   publish_date| harvested_date|following|followers|updates|post_type|account_type|retweet|account_category|new_june_2018|   alt_external_id|          tweet_id|         article_url|          tco1_step1|tco2_step1|tco3_step1|
+------------------+------+--------------------+-------+--------+---------------+---------------+---------+---------+-------+---------+------------+-------+----------------+-------------+------------------+------------------+--------------------+--------------------+----------+----------+
|906000000000000000|10_GOP|"We have a sittin...|Unknown| English|10/1/2017 19:58|10/1/2017 19:59|     1052|     9636|    253|     

In [6]:
regex_remove_urls = r"https?:\/\/(www\.)?[-a-zA-Z0-9@:%._\+~#=]{1,256}\.[a-zA-Z0-9()]{1,6}\b([-a-zA-Z0-9()@:%_\+.~#?&//=]*)"
regex_remove_special_characters = r"[^A-Za-z0-9 ЁёА-я]"

df_no_special_characters = (
    df.withColumn("clean_content", f.regexp_replace("content", regex_remove_urls, " "))
    .withColumn(
        "clean_content", f.regexp_replace("clean_content", regex_remove_special_characters, " ")
    )
    .withColumn("clean_content", f.regexp_replace("clean_content", " +", " "))
    .select("clean_content")
)

tokenizer = Tokenizer(inputCol="clean_content", outputCol="words_token")
tokenized = tokenizer.transform(
    df_no_special_characters.filter(f.col("clean_content").rlike("[^0-9A-Za-z]"))
).select("clean_content", "words_token")

# Once in arrays, we can use the Apache Spark function StopWordsRemover
# A new column "words_clean" is here as an output
remover = StopWordsRemover(inputCol="words_token", outputCol="words_clean")
data_clean = remover.transform(tokenized).select("clean_content", "words_clean")
data_clean.cache()

# Final step : like in the beginning, we can group again words and sort them by the most used
result = (
    data_clean.withColumn("word", f.explode(f.col("words_clean")))
    .groupBy("word")
    .count()
    .sort("count", ascending=False)
)

print("############ most used words in the Tweets are:")
result.show()

############ most used words in the Tweets are:




+--------+------+
|    word| count|
+--------+------+
|        |173633|
|   trump| 55020|
|      rt| 42997|
|       в| 41261|
|    news| 25028|
|     new| 17243|
|      на| 16471|
|     amp| 16168|
|  police| 15789|
|     man| 15216|
|breaking| 15156|
|  people| 14824|
|   obama| 14672|
|       и| 13608|
|   video| 12536|
|      us| 12318|
|       2| 12270|
|    like| 12237|
|     one| 12142|
|     get| 12038|
+--------+------+
only showing top 20 rows



                                                                                

In [7]:
 result.filter(~f.col("word").rlike("[^A-Za-z]|^$")).show()



+---------+-----+
|     word|count|
+---------+-----+
|    trump|55020|
|       rt|42997|
|     news|25028|
|      new|17243|
|      amp|16168|
|   police|15789|
|      man|15216|
| breaking|15156|
|   people|14824|
|    obama|14672|
|    video|12536|
|       us|12318|
|     like|12237|
|      one|12142|
|      get|12038|
|  hillary|11303|
|        m|10287|
|president|10255|
| politics| 9619|
|    black| 9579|
+---------+-----+
only showing top 20 rows



                                                                                

# Vader Sentiment Analysis


In [8]:
import pandas as pd

pd.set_option("max_colwidth", None)

df_pd = data_clean.limit(10).toPandas()
df_pd

Unnamed: 0,clean_content,words_clean
0,We have a sitting Democrat US Senator on trial for corruption and you ve barely heard a peep from the mainstream media nedryun,"[, sitting, democrat, us, senator, trial, corruption, ve, barely, heard, peep, mainstream, media, nedryun]"
1,Marshawn Lynch arrives to game in anti Trump shirt Judging by his sagging pants the shirt should say Lynch vs belt,"[marshawn, lynch, arrives, game, anti, trump, shirt, judging, sagging, pants, shirt, say, lynch, vs, belt]"
2,Daughter of fallen Navy Sailor delivers powerful monologue on anthem protests burns her NFL packers gear BoycottNFL,"[daughter, fallen, navy, sailor, delivers, powerful, monologue, anthem, protests, burns, nfl, packers, gear, boycottnfl]"
3,JUST IN President Trump dedicates Presidents Cup golf tournament trophy to the people of Florida Texas and Puerto Rico,"[president, trump, dedicates, presidents, cup, golf, tournament, trophy, people, florida, texas, puerto, rico]"
4,19 000 RESPECTING our National Anthem StandForOurAnthem,"[19, 000, respecting, national, anthem, standforouranthem]"
5,Dan Bongino Nobody trolls liberals better than Donald Trump Exactly,"[dan, bongino, nobody, trolls, liberals, better, donald, trump, exactly]"
6,,[]
7,SenatorMenendez CarmenYulinCruz Doesn t matter that CNN doesn t report on your crimes This won t change the fact that you re going down,"[, senatormenendez, carmenyulincruz, doesn, matter, cnn, doesn, report, crimes, won, change, fact, re, going]"
8,As much as I hate promoting CNN article here they are admitting EVERYTHING Trump said about PR relief two days ago,"[much, hate, promoting, cnn, article, admitting, everything, trump, said, pr, relief, two, days, ago]"
9,After the genocide remark from San Juan Mayor the narrative has changed though CNN fixes it s reporting constantly,"[genocide, remark, san, juan, mayor, narrative, changed, though, cnn, fixes, reporting, constantly]"


In [9]:
from nltk.sentiment import SentimentIntensityAnalyzer
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, FloatType, IntegerType


def getSentimentScore(tweetText):
    analyzer = SentimentIntensityAnalyzer()
    vs = analyzer.polarity_scores(tweetText)
    return float(vs["compound"])


def getSentiment(score):
    return 1 if score > 0 else 0


def getCleanTweetText(filteredTweetText):
    return " ".join(filteredTweetText)

In [10]:
udfCleanTweetText = udf(getCleanTweetText, StringType())
dfFilteredCleanedTweet = data_clean.withColumn(
    "joined_clean_content", udfCleanTweetText("words_clean")
)
dfFilteredCleanedTweet.select("joined_clean_content").show(5)



+--------------------+
|joined_clean_content|
+--------------------+
| sitting democrat...|
|marshawn lynch ar...|
|daughter fallen n...|
|president trump d...|
|19 000 respecting...|
+--------------------+
only showing top 5 rows



                                                                                

In [11]:
udfSentimentScore = udf(getSentimentScore, FloatType())
dfSentimentScore = dfFilteredCleanedTweet.withColumn(
    "sentimentScore", udfSentimentScore("joined_clean_content")
)
dfSentimentScore.select("joined_clean_content", "sentimentScore").show(5)



+--------------------+--------------+
|joined_clean_content|sentimentScore|
+--------------------+--------------+
| sitting democrat...|           0.0|
|marshawn lynch ar...|       -0.3182|
|daughter fallen n...|       -0.1531|
|president trump d...|           0.0|
|19 000 respecting...|        0.4939|
+--------------------+--------------+
only showing top 5 rows



                                                                                

In [12]:
udfSentiment = udf(getSentiment, IntegerType())
dfSentiment = dfSentimentScore.withColumn("sentiment", udfSentiment("sentimentScore"))
dfSentiment.select("joined_clean_content", "sentimentScore", "sentiment").show(5)



+--------------------+--------------+---------+
|joined_clean_content|sentimentScore|sentiment|
+--------------------+--------------+---------+
| sitting democrat...|           0.0|        0|
|marshawn lynch ar...|       -0.3182|        0|
|daughter fallen n...|       -0.1531|        0|
|president trump d...|           0.0|        0|
|19 000 respecting...|        0.4939|        1|
+--------------------+--------------+---------+
only showing top 5 rows



                                                                                

In [13]:
df_small_sentimet = dfSentiment.limit(10_000)
dfPlotVaderSentiment = df_small_sentimet.groupBy("sentiment").count().toPandas()
dfPlotVaderSentiment

                                                                                

Unnamed: 0,sentiment,count
0,0,8009
1,1,1991
