# Twitter Sentiment Analysis for the word Euro

Connect Spark to Mongo DB

In [117]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars "/usr/local/spark/jars/mongo-spark-connector_2.12-3.0.2.jar,/usr/local/spark/jars/mongo-java-driver-3.12.9.jar" pyspark-shell'

Note the inferschema is set to false, this makes spark read the entire database and not infer the values of fields from the first set of fields

In [118]:
from pyspark.sql import SparkSession
# fix read bug, basically turn off sampling
spark = SparkSession.builder.appName("TwitterMongo") \
.config("spark.mongodb.input.database", "mongodb://localhost:27017/twitter") \
.config("spark.mongodb.input.uri", "mongodb://localhost:27017/twitter.tweets") \
.config("spark.mongodb.read.sql.inferSchema.mapTypes.enabled", "FALSE") \
.config("spark.mongodb.output.uri","mongodb://localhost:27017/twitter.tweets").getOrCreate()

## Create the Session

And load all of the Twitter data in MongoDB

Print out the twitter tweet schema

In [119]:
# create a spark session
spark = SparkSession \
.builder \
.master("local") \
.appName("ABC") \
.config("spark.driver.memory", "15g") \
.config("spark.mongodb.read.connection.uri", "mongodb://localhost:27017/twitter") \
.config("spark.mongodb.write.connection.uri", "mongodb://localhost:27017/twitter") \
.config('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector:2.12-3.0.2') \
.getOrCreate()
# read data from mongodb collection "questions" into a dataframe "df"
df = spark.read \
.format("com.mongodb.spark.sql.DefaultSource") \
.option("uri", "mongodb://localhost:27017/twitter") \
.option("database", "twitter") \
.option("collection", "tweets") \
.load()
df.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- oid: string (nullable = true)
 |-- contributors: void (nullable = true)
 |-- coordinates: void (nullable = true)
 |-- created_at: string (nullable = true)
 |-- display_text_range: array (nullable = true)
 |    |-- element: integer (containsNull = true)
 |-- entities: struct (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- text: string (nullable = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: integer (containsNull = true)
 |    |-- media: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- additional_media_info: struct (nullable = true)
 |    |    |    |    |-- description: string (nullable = true)
 |    |    |    |    |-- embeddable: boolean (nullable = true)
 |    |    |    |    |-- monetizable: boolean (nullable = true)
 |    |    |    |    |-- title: string (

In [120]:
df.show()

2023-05-14 15:56:39,591 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 1310.3 KiB
[Stage 358:>                                                        (0 + 1) / 1]

+--------------------+------------+-----------+--------------------+------------------+--------------------+--------------------+--------------------+--------------+---------+------------+----+-------------------+-------------------+-----------------------+---------------------+-------------------------+-------------------+-----------------------+---------------+----+-----+------------------+-----------+--------------------+-------------------+--------------------+-----------------------+-----------+-------------+---------+--------------------+--------------------+-------------------------------+-------------+---------+--------------------+---------------------+
|                 _id|contributors|coordinates|          created_at|display_text_range|            entities|   extended_entities|      extended_tweet|favorite_count|favorited|filter_level| geo|                 id|             id_str|in_reply_to_screen_name|in_reply_to_status_id|in_reply_to_status_id_str|in_reply_to_user_id|in_r

                                                                                

Create a spark object of the tweets held in the mongo db 

It is easier to use SQL statements and Pyspark to clean the data rather than writing queries in MongoDB

In [121]:
df.createOrReplaceTempView("tweets")

How many tweets in the DB all together

In [122]:
df = spark.sql("SELECT DISTINCT id FROM tweets")
df.count()

                                                                                

825219

How many tweets by language 


In [123]:
#pip install plotly
import pyspark.pandas as ps
import plotly
dfLang = spark.sql("SELECT DISTINCT lang, CAST(count(id) AS INT) as TweetCount FROM tweets GROUP BY lang \
                   ORDER BY TweetCount DESC LIMIT 10")
dfLang.show()



+----+----------+
|lang|TweetCount|
+----+----------+
|  en|    256216|
|  es|    122069|
|  it|     75037|
|  tr|     72788|
|  fr|     69589|
|  de|     61316|
|  nl|     37689|
| und|     30173|
|  pt|     29393|
|  pl|     18116|
+----+----------+



                                                                                

In [124]:
tempdf = ps.DataFrame(dfLang)

tempdf.plot(kind='bar', x='lang', y='TweetCount')

2023-05-14 15:56:48,668 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2023-05-14 15:56:48,669 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2023-05-14 15:56:51,936 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2023-05-14 15:56:51,937 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

How many tweets by location

In [125]:
dfLoc = spark.sql("SELECT DISTINCT user.location AS Location, CAST(count(id) AS INT) as TweetCount FROM tweets GROUP BY user.location \
                   ORDER BY TweetCount DESC LIMIT 10")
dfLoc.show()



+-----------------+----------+
|         Location|TweetCount|
+-----------------+----------+
|             null|    344908|
|İstanbul, Türkiye|      4911|
|           France|      4338|
|           Italia|      3980|
|           España|      3908|
|           London|      3496|
|          Türkiye|      3374|
|      Deutschland|      2791|
|    Paris, France|      2710|
|  London, England|      2690|
+-----------------+----------+



                                                                                

In [126]:
tempdf = ps.DataFrame(dfLoc)

tempdf.plot(kind='bar', x='Location', y='TweetCount')

2023-05-14 15:57:05,343 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2023-05-14 15:57:05,344 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2023-05-14 15:57:17,745 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2023-05-14 15:57:17,746 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

Now limit the dataset to English texts and tweets with the Euro in the text

In [127]:
dfEnTwt = spark.sql("SELECT * FROM tweets WHERE lang = 'en' AND text LIKE '%euro%'")
dfEnTwt.createOrReplaceTempView("en_tweets")
dfEnTwt.show()

2023-05-14 15:57:20,331 WARN scheduler.DAGScheduler: Broadcasting large task binary with size 1313.5 KiB
[Stage 379:>                                                        (0 + 1) / 1]

+--------------------+------------+-----------+--------------------+------------------+--------------------+--------------------+--------------------+--------------+---------+------------+----+-------------------+-------------------+-----------------------+---------------------+-------------------------+-------------------+-----------------------+---------------+----+-----+------------------+-----------+--------------------+-------------------+--------------------+-----------------------+-----------+-------------+---------+--------------------+--------------------+--------------------+-------------+---------+--------------------+---------------------+
|                 _id|contributors|coordinates|          created_at|display_text_range|            entities|   extended_entities|      extended_tweet|favorite_count|favorited|filter_level| geo|                 id|             id_str|in_reply_to_screen_name|in_reply_to_status_id|in_reply_to_status_id_str|in_reply_to_user_id|in_reply_to_use

                                                                                

Now how many tweets in the English language dataset

In [128]:
dfEnTwt.count()

                                                                                

25818

In [129]:
dfLoc = spark.sql("SELECT DISTINCT user.location AS Location, CAST(count(id) AS INT) as TweetCount FROM en_tweets GROUP BY user.location \
                   ORDER BY TweetCount DESC LIMIT 10")
dfLoc.show()



+--------------------+----------+
|            Location|TweetCount|
+--------------------+----------+
|                null|     10214|
|               Spain|       413|
|     London, England|       178|
|      United Kingdom|       152|
|             Ireland|       140|
|       Costa del Sol|       124|
|       United States|       121|
|             she/her|       114|
|              London|       107|
|England, United K...|       106|
+--------------------+----------+



                                                                                

In [130]:
# select tweet id, geo, lang, quoted_status,quoted_status.geo 
# Having a look at some the data
dfOne = spark.sql("SELECT DISTINCT id,  text, quote_count, reply_count, retweet_count, favorite_count, geo, place, lang, \
                  quoted_status,quoted_status.geo, quoted_status.text, user.name, user.location   \
                  FROM en_tweets")

In [131]:
dfOne.show()

2023-05-14 15:57:32,502 ERROR executor.Executor: Exception in task 2.0 in stage 386.0 (TID 2269)
com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast DOCUMENT into a NullType (value: {"type": "Point", "coordinates": [6.45676667, 3.51362833]})
	at com.mongodb.spark.sql.MapFunctions$.convertToDataType(MapFunctions.scala:220)
	at com.mongodb.spark.sql.MapFunctions$.$anonfun$documentToRow$1(MapFunctions.scala:37)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
	at com.mongodb.spark.sql.MapFunctions$.documentToRow(

Py4JJavaError: An error occurred while calling o3267.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 2 in stage 386.0 failed 1 times, most recent failure: Lost task 2.0 in stage 386.0 (TID 2269) (sba22230-Virtual-Machine.mshome.net executor driver): com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast DOCUMENT into a NullType (value: {"type": "Point", "coordinates": [6.45676667, 3.51362833]})
	at com.mongodb.spark.sql.MapFunctions$.convertToDataType(MapFunctions.scala:220)
	at com.mongodb.spark.sql.MapFunctions$.$anonfun$documentToRow$1(MapFunctions.scala:37)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
	at com.mongodb.spark.sql.MapFunctions$.documentToRow(MapFunctions.scala:35)
	at com.mongodb.spark.sql.MongoRelation.$anonfun$buildScan$5(MongoRelation.scala:58)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:514)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:192)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:364)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.$anonfun$doExecute$1(HashAggregateExec.scala:122)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.$anonfun$doExecute$1$adapted(HashAggregateExec.scala:96)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2454)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2403)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2402)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2402)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1160)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1160)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2642)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2584)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2573)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: com.mongodb.spark.exceptions.MongoTypeConversionException: Cannot cast DOCUMENT into a NullType (value: {"type": "Point", "coordinates": [6.45676667, 3.51362833]})
	at com.mongodb.spark.sql.MapFunctions$.convertToDataType(MapFunctions.scala:220)
	at com.mongodb.spark.sql.MapFunctions$.$anonfun$documentToRow$1(MapFunctions.scala:37)
	at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at scala.collection.TraversableLike.map(TraversableLike.scala:286)
	at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
	at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
	at com.mongodb.spark.sql.MapFunctions$.documentToRow(MapFunctions.scala:35)
	at com.mongodb.spark.sql.MongoRelation.$anonfun$buildScan$5(MongoRelation.scala:58)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
	at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:514)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.processInputs(TungstenAggregationIterator.scala:192)
	at org.apache.spark.sql.execution.aggregate.TungstenAggregationIterator.<init>(TungstenAggregationIterator.scala:364)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.$anonfun$doExecute$1(HashAggregateExec.scala:122)
	at org.apache.spark.sql.execution.aggregate.HashAggregateExec.$anonfun$doExecute$1$adapted(HashAggregateExec.scala:96)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2(RDD.scala:915)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsWithIndex$2$adapted(RDD.scala:915)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:373)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:337)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:99)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:52)
	at org.apache.spark.scheduler.Task.run(Task.scala:131)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:506)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1491)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:509)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)


2023-05-14 15:57:32,530 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 386.0 (TID 2268) (sba22230-Virtual-Machine.mshome.net executor driver): TaskKilled (Stage cancelled)


In [None]:

dfDay= spark.sql("SELECT DISTINCT CAST(substring(created_at, 27, 4) AS INT) as Year, \
          CAST(from_unixtime(unix_timestamp(substring(created_at, 5, 3), 'MMM'), 'MM') As INT) as Month, \
          CAST(substring(created_at, 9, 2) AS INT) as Day, \
          CAST(count(id) AS INT) as TweetCount \
          FROM en_tweets \
          WHERE lang = 'en' AND text LIKE '%euro%' GROUP BY substring(created_at, 27, 4), \
          substring(created_at, 5, 3), \
          substring(created_at, 9, 2)")

dfDay.createOrReplaceTempView("tweetsByDay")

dfDay = spark.sql("SELECT CONCAT(Year, '_', Month, '_', DAY) AS Date, TweetCount  FROM tweetsByDay ORDER BY Year, Month, Day")

dfDay.show()




+---------+----------+
|     Date|TweetCount|
+---------+----------+
| 2021_1_1|        24|
| 2021_1_2|        21|
| 2021_1_3|        24|
| 2021_1_4|        53|
| 2021_1_5|        24|
| 2021_1_6|        14|
|2021_1_26|        31|
|2021_1_27|        38|
|2021_1_28|        33|
|2021_1_29|        38|
|2021_1_30|        16|
|2021_1_31|        38|
| 2021_2_1|        23|
| 2021_2_2|        38|
| 2021_2_3|        34|
| 2021_2_4|        35|
| 2021_2_5|        25|
| 2021_2_6|        19|
| 2021_2_7|        32|
| 2021_2_8|        26|
+---------+----------+
only showing top 20 rows



                                                                                

In [None]:
tempdf = ps.DataFrame(dfDay)

tempdf.plot(kind='bar', x='Date', y='TweetCount')

2023-05-14 15:36:24,925 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2023-05-14 15:36:24,926 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2023-05-14 15:36:27,722 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2023-05-14 15:36:27,768 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2023-05-14 15:36:27,831 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2023-05-14 15:36:27,831 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to 

In [None]:
dfWeek = spark.sql("SELECT Year, weekofyear(make_date(Year, Month, Day)) as wkofYr , SUM(TweetCount) as TweetCountbyWeek \
                   FROM tweetsByDay \
                   GROUP BY Year, weekofyear(make_date(Year, Month, Day))")

dfWeek.createOrReplaceTempView("tweetsByWeek")
dfWeek = spark.sql("SELECT Year, wkofYr, CONCAT(Year, '_', wkofYr) AS yr_wk, TweetCountbyWeek  FROM tweetsByWeek ORDER BY Year, wkofYr")
dfWeek.show()



+----+------+-------+----------------+
|Year|wkofYr|  yr_wk|TweetCountbyWeek|
+----+------+-------+----------------+
|2021|     1| 2021_1|              91|
|2021|     4| 2021_4|             194|
|2021|     5| 2021_5|             206|
|2021|     6| 2021_6|             194|
|2021|     7| 2021_7|             212|
|2021|     8| 2021_8|             192|
|2021|     9| 2021_9|             137|
|2021|    10|2021_10|             166|
|2021|    11|2021_11|             165|
|2021|    12|2021_12|             212|
|2021|    13|2021_13|             169|
|2021|    14|2021_14|             138|
|2021|    17|2021_17|             128|
|2021|    18|2021_18|             225|
|2021|    19|2021_19|             145|
|2021|    20|2021_20|             264|
|2021|    21|2021_21|             297|
|2021|    22|2021_22|             221|
|2021|    23|2021_23|             486|
|2021|    24|2021_24|             437|
+----+------+-------+----------------+
only showing top 20 rows



                                                                                

In [None]:
tempdf = ps.DataFrame(dfWeek)

tempdf.plot(kind='bar', x='yr_wk', y='TweetCountbyWeek')

2023-05-14 15:36:31,159 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2023-05-14 15:36:31,161 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2023-05-14 15:36:33,999 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2023-05-14 15:36:34,046 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2023-05-14 15:36:34,298 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2023-05-14 15:36:34,374 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to 

In [None]:
# get the tweet count by month
dfMonth = spark.sql("SELECT Year, Month, SUM(TweetCount) as TweetCountbyMonth \
                     FROM tweetsByDay GROUP BY Year, Month")
dfMonth.createOrReplaceTempView("tweetsByMonth")
dfMonth = spark.sql("SELECT Year, Month, CONCAT(Year, '_', Month) AS MonthYr, TweetCountbyMonth  FROM tweetsByMonth ORDER BY Year, Month ")
dfMonth.show()



+----+-----+-------+-----------------+
|Year|Month|MonthYr|TweetCountbyMonth|
+----+-----+-------+-----------------+
|2021|    1| 2021_1|              354|
|2021|    2| 2021_2|              804|
|2021|    3| 2021_3|              756|
|2021|    4| 2021_4|              324|
|2021|    5| 2021_5|              995|
|2021|    6| 2021_6|             2354|
|2021|    7| 2021_7|             4098|
|2021|    8| 2021_8|              767|
|2021|    9| 2021_9|              806|
|2021|   10|2021_10|              711|
|2021|   11|2021_11|              471|
|2021|   12|2021_12|              831|
|2022|    1| 2022_1|              437|
|2022|    2| 2022_2|              373|
|2022|    3| 2022_3|             1495|
|2022|    4| 2022_4|             1438|
|2022|    5| 2022_5|             1375|
|2022|    6| 2022_6|             1163|
|2022|    7| 2022_7|             2405|
|2022|    8| 2022_8|              937|
+----+-----+-------+-----------------+
only showing top 20 rows



                                                                                

In [None]:
tempdf = ps.DataFrame(dfMonth)

tempdf.plot(kind='bar', x='MonthYr', y='TweetCountbyMonth')

2023-05-14 15:36:38,617 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2023-05-14 15:36:38,619 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2023-05-14 15:36:41,386 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2023-05-14 15:36:41,442 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2023-05-14 15:36:41,478 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
2023-05-14 15:36:41,531 WARN window.WindowExec: No Partition Defined for Window operation! Moving all data to 

### Next up is text clean up 

In [135]:
#%pip install wordcloud
#%pip install vadersentiment
## sentiment analysis ref: https://www.geeksforgeeks.org/python-sentiment-analysis-using-vader/?ref=lbp
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

# function to print sentiments
# of the sentence.
def sentiment_scores(sentence):
 
    # Create a SentimentIntensityAnalyzer object.
    sid_obj = SentimentIntensityAnalyzer()
 
    # polarity_scores method of SentimentIntensityAnalyzer
    # object gives a sentiment dictionary.
    # which contains pos, neg, neu, and compound scores.
    sentiment_dict = sid_obj.polarity_scores(sentence)
     
    print("Overall sentiment dictionary is : ", sentiment_dict)
    #print("sentence was rated as ", sentiment_dict['neg']*100, "% Negative")
    #print("sentence was rated as ", sentiment_dict['neu']*100, "% Neutral")
    #print("sentence was rated as ", sentiment_dict['pos']*100, "% Positive")
 
    #print("Sentence Overall Rated As", end = " ")
 
    # decide sentiment as positive, negative and neutral
    if sentiment_dict['compound'] >= 0.05 :
        print("Positive")
 
    elif sentiment_dict['compound'] <= - 0.05 :
       print("Negative")
 
    else :
        print("Neutral")
    
    return sentiment_dict['compound']

In [134]:
dfText = spark.sql("SELECT DISTINCT id, text AS text FROM en_tweets")
dfText.createOrReplaceTempView("text")
dfText.show()


                                                                                

+-------------------+--------------------+
|                 id|                text|
+-------------------+--------------------+
|1345422585002160149|I really like Wol...|
|1355814040275091467|RT @TheEconomist:...|
|1356840789104619521|Top story: Spotif...|
|1346785087887728640|It costs 0 $/rs/p...|
|1354474836723163139|RT @LauraHuhtasaa...|
|1354744404653858817|RT @MECenquiries:...|
|1355037259372883968|For Philly, euro ...|
|1356308640983089153|RT @PolaLem: Univ...|
|1345412531263852545|A whooping 1,3 tr...|
|1355164539705290767|RT @stpaddyofassi...|
|1345022138022064129|@engrare Don't fo...|
|1346147180361474053|@vileblackouts im...|
|1355833896093540352|RT @TheEconomist:...|
|1358570922719006722|@bet365 so i have...|
|1344997718763167744|Why have we not b...|
|1346804314568847361|RT @euromaestro: ...|
|1353987471218978817|RT @BIS_org: The ...|
|1345758351615995906|I spend 15 euro's...|
|1346326243596095488|@altieuro Wht do ...|
|1358467600225828870|My kind, thoughtf...|
+----------

Count the number of words in the tweets 

In [136]:
from pyspark.sql.functions import * 
from pyspark.sql.types import StringType, ArrayType
# heavy reliance on SQL functions in the following code

dfWord = dfText.withColumn("Word", explode(split(col("text"), ' '))).groupBy("Word").count().orderBy(desc("count"))

dfWord.show()



+------------+-----+
|        Word|count|
+------------+-----+
|         the|15923|
|          RT|13707|
|        euro|13091|
|          to| 8480|
|           a| 7288|
|         and| 6068|
|          in| 6065|
|          of| 5711|
|         for| 5186|
|          is| 4801|
|          on| 3108|
|@vivo_europe| 2993|
|   @EURO2020| 2992|
|         you| 2651|
|           I| 2525|
|        that| 2101|
|         The| 2065|
|          at| 2047|
|          by| 1838|
|            | 1804|
+------------+-----+
only showing top 20 rows



                                                                                

Count the number of characters including spaces

In [137]:
dfChar = spark.sql("SELECT text, LENGTH(text) AS char FROM text ORDER BY char DESC")
dfChar.show(5)



+--------------------+----+
|                text|char|
+--------------------+----+
|RT @ChemSystemsCh...| 156|
|Internationals (w...| 155|
|RT @GA_Contest: $...| 152|
|RT @eurociu: Voca...| 152|
|RT @trade1311: pe...| 152|
+--------------------+----+
only showing top 5 rows



                                                                                

Check for special characters i.e. Hashtags

In [138]:
dfSpecChar = spark.sql("SELECT text, regexp_extract_all(text, '(#\\\\w+)', 1) AS Hashtags FROM text WHERE text like '%#%' ")
dfSpecChar.show()

[Stage 405:>                                                        (0 + 1) / 1]

+--------------------+--------------------+
|                text|            Hashtags|
+--------------------+--------------------+
|Daily Best &amp; ...|[#performers, #co...|
|Are you intereste...|[#euro, #microbio...|
|No reprieve for t...|  [#forex, #trading]|
|RT @KfW_int: Two-...|           [#Europe]|
|RT @eurovanya: Mo...|       [#Eurovision]|
|RT @scotlandcoerv...|         [#euro2020]|
|RT @nanon_diary: ...|[#MyPrecious, #my...|
|RT @EXOLAceTeamPH...|[#DONT_FIGHT_THE_...|
|RT @nexta_tv: 🇱?...|[#Lithuania, #Ukr...|
|@EURO2020 @vivo_e...|[#BTS, #Butter, #...|
|RT @EURO2020: For...|         [#EURO2020]|
|@WEURO2022 @euron...|        [#WEURO2022]|
|RT @euro_pinkploy...|  [#KAZZ2022xEUPINK]|
|RT @stasisnet: 💶...|       [#euro, #sta]|
|RT @stasisnet: 💶...|       [#euro, #sta]|
|RT @stasisnet: 💶...|       [#euro, #sta]|
|RT @stasisnet: 💶...|       [#euro, #sta]|
|RT @classic_jerse...|[#COYBIG, #ybig, ...|
|#UN had a vote on...|      [#UN, #racism]|
|RT @euroweeklynew...|        [#World

                                                                                

Check for upper case 

In [139]:
dfUpper = spark.sql("SELECT text FROM text WHERE translate(text, 'ABCDEFGHIJKLMNOPQRSTUVWXYZ', '') <> text")
dfUpper.show()




+--------------------+
|                text|
+--------------------+
|I really like Wol...|
|It costs 0 $/rs/p...|
|A whooping 1,3 tr...|
|@engrare Don't fo...|
|Why have we not b...|
|RT @euromaestro: ...|
|I spend 15 euro's...|
|@altieuro Wht do ...|
|Peugeot 107 1.0 6...|
|RT @Cointelegraph...|
|Woman In Portugal...|
|@eurowtff I order...|
|RT @euroweeklynew...|
|RT @WorldWideWob:...|
|Volkswagen Fox 1....|
|@Louis_Tomlinson ...|
|RT @AntonSpisak: ...|
|RT @EHFEURO: BUIL...|
|RT @TeddyCleps: S...|
|@illiam_william T...|
+--------------------+
only showing top 20 rows



                                                                                

Check for numbers 

In [140]:
dfNum = spark.sql("SELECT text FROM text WHERE translate(text, '0123456789', '') <> text")
dfNum.show()



+--------------------+
|                text|
+--------------------+
|RT @TheEconomist:...|
|Top story: Spotif...|
|It costs 0 $/rs/p...|
|RT @MECenquiries:...|
|@nglinsman @amliv...|
|Daily Best &amp; ...|
|@gazzacritch86 We...|
|For Philly, euro ...|
|RT @PolaLem: Univ...|
|A whooping 1,3 tr...|
|RT @fab_moran: St...|
|@vileblackouts im...|
|RT @TheEconomist:...|
|@bet365 so i have...|
|I remember my bro...|
|Hosestly @Revolut...|
|Woke up to some A...|
|Why have we not b...|
|RT @thepainterfly...|
|@SJK415 @SimonLaf...|
+--------------------+
only showing top 20 rows



                                                                                

In [141]:

pattern = r'[^a-zA-Z0-9\s]'

dfText = spark.sql("SELECT * FROM text")
dfText = dfText.withColumn("text", regexp_replace('text', pattern, ''))
dfText.take(5)

                                                                                

[Row(id=1345422585002160149, text='I really like Wolves this season hoping they can push for euros again'),
 Row(id=1346785087887728640, text='It costs 0 rspesoeuropound to admit that youre biased for a player No one will send d you jail Oh sorry my httpstcoO5eXgj5xoR'),
 Row(id=1354474836723163139, text='RT LauraHuhtasaari I wonder if the defects in the euro are intentional because they serve a purpose  accelerate federal development whe'),
 Row(id=1354744404653858817, text='RT MECenquiries study in poland and get access to the rest of europe through your schenghen visa\nActNow httpstcofqOg4UCeEd'),
 Row(id=1355037259372883968, text='For Philly euro is barely 2  Not the run for snow lovers')]

In [142]:
dfText.createOrReplaceTempView("text") # this is cleaned txt
dfText = spark.sql("SELECT LOWER(TRIM(text)) AS text FROM text")

dfText.take(5)

                                                                                

[Row(text='i really like wolves this season hoping they can push for euros again'),
 Row(text='rt theeconomist the eus 750bn recovery fund is being disbursed too slowly so there is still too little stimulus in the euro area https'),
 Row(text='top story spotify sube un euro el precio del plan familiar  androidsis httpstcomvwqtmpc6w see more httpstcodtoynfgg07'),
 Row(text='rt lettertojack portugal are going to the next euro and the world cup with arguably the most balanced international team and top 3 most'),
 Row(text='rt gemhostofficial 25  350k  ends in 3 hours \n\n rt amp follow eurofloki  rt their')]

In [143]:
#now build a word cloud
from wordcloud import WordCloud, STOPWORDS
dfText.createOrReplaceTempView("text") # this is the trimmed txt
dfText = spark.sql("SELECT text FROM text")
dfText.show(5)

+--------------------+
|                text|
+--------------------+
|i really like wol...|
|rt theeconomist t...|
|top story spotify...|
|rt lettertojack p...|
|rt gemhostofficia...|
+--------------------+
only showing top 5 rows



                                                                                

### Tokenize and Stem the tweets

In [144]:
#%pip install nltk

In [145]:
# stackoverflow ref: https://stackoverflow.com/questions/53579444

from pyspark.ml.feature import Tokenizer, StopWordsRemover, CountVectorizer, IDF, StringIndexer
from nltk.stem.snowball import SnowballStemmer

In [146]:
# Tokenize text
tokenizer = Tokenizer(inputCol="text", outputCol="tokens")
dfText = tokenizer.transform(dfText).select("text","tokens")

dfText.show(5)



+--------------------+--------------------+
|                text|              tokens|
+--------------------+--------------------+
|i really like wol...|[i, really, like,...|
|it costs 0 rspeso...|[it, costs, 0, rs...|
|rt laurahuhtasaar...|[rt, laurahuhtasa...|
|rt mecenquiries s...|[rt, mecenquiries...|
|for philly euro i...|[for, philly, eur...|
+--------------------+--------------------+
only showing top 5 rows



                                                                                

In [147]:
# Remove stopwords
remover = StopWordsRemover(inputCol="tokens", outputCol="filtered")
dfText = remover.transform(dfText).select("text","tokens","filtered")

dfText.show(5)



+--------------------+--------------------+--------------------+
|                text|              tokens|            filtered|
+--------------------+--------------------+--------------------+
|i really like wol...|[i, really, like,...|[really, like, wo...|
|rt theeconomist t...|[rt, theeconomist...|[rt, theeconomist...|
|top story spotify...|[top, story, spot...|[top, story, spot...|
|it costs 0 rspeso...|[it, costs, 0, rs...|[costs, 0, rspeso...|
|rt laurahuhtasaar...|[rt, laurahuhtasa...|[rt, laurahuhtasa...|
+--------------------+--------------------+--------------------+
only showing top 5 rows



                                                                                

In [148]:
# stem the words
stemmer = SnowballStemmer(language='english')
stemmer_udf = udf(lambda tokens: [stemmer.stem(token) for token in tokens], ArrayType(StringType()))
dfText = dfText.withColumn("filtered_stemmed", stemmer_udf("filtered"))
dfText.show(5)



+--------------------+--------------------+--------------------+--------------------+
|                text|              tokens|            filtered|    filtered_stemmed|
+--------------------+--------------------+--------------------+--------------------+
|i really like wol...|[i, really, like,...|[really, like, wo...|[realli, like, wo...|
|it costs 0 rspeso...|[it, costs, 0, rs...|[costs, 0, rspeso...|[cost, 0, rspesoe...|
|a whooping 13 tri...|[a, whooping, 13,...|[whooping, 13, tr...|[whoop, 13, trill...|
|engrare dont forg...|[engrare, dont, f...|[engrare, dont, f...|[engrar, dont, fo...|
|vileblackouts im ...|[vileblackouts, i...|[vileblackouts, i...|[vileblackout, im...|
+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



                                                                                

In [149]:
# Filter out short words
filterShortWords = udf(lambda row: [x for x in row if len(x) >= 4], ArrayType(StringType()))
dfText = dfText.withColumn("filtered_stemmed", filterShortWords("filtered_stemmed"))

dfText.show(5)


[Stage 434:>                                                        (0 + 1) / 1]

+--------------------+--------------------+--------------------+--------------------+
|                text|              tokens|            filtered|    filtered_stemmed|
+--------------------+--------------------+--------------------+--------------------+
|i really like wol...|[i, really, like,...|[really, like, wo...|[realli, like, wo...|
|rt theeconomist t...|[rt, theeconomist...|[rt, theeconomist...|[theeconomist, 75...|
|top story spotify...|[top, story, spot...|[top, story, spot...|[stori, spotifi, ...|
|it costs 0 rspeso...|[it, costs, 0, rs...|[costs, 0, rspeso...|[cost, rspesoeuro...|
|rt laurahuhtasaar...|[rt, laurahuhtasa...|[rt, laurahuhtasa...|[laurahuhtasaari,...|
+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows



                                                                                

In [150]:
dfText.count()


                                                                                

25754