In [1]:
# import findspark
import findspark
# initialize findspark with spark directory
findspark.init("C:\BigData\BigData\spark-3.1.2-bin-hadoop3.2")
# import pyspark
import pyspark
# create spark context
sc = pyspark.SparkContext()
# create spark session 
spark = pyspark.sql.SparkSession(sc)

In [2]:
# import packages
import os 
import pickle

import re
from datetime import datetime

import requests

import pytz

import pandas as pd
import numpy as np

import ast


import pyspark.sql.functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.functions import array_contains


import matplotlib.pyplot as plt
import seaborn as sns

import plotly.graph_objects as go
from plotly.subplots import make_subplots
import plotly.express as px




In [3]:
spark

In [4]:
#set this path to your path, for some reason I have an error 
#reading in all the files
path_json = ".././../data/Topic_vegan/*.json"

In [5]:
df_json = spark.read.json(path_json)

In [6]:
# check the schema of our json dataframe
df_json.printSchema()

root
 |-- contributors: string (nullable = true)
 |-- coordinates: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- display_text_range: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- entities: struct (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- media: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- display_url: string (nullable = true)
 |    |    |    |-- expanded_url: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- id_str: string (nullable = true)
 |    |    |    |

# The volume of the tweets

First we will look at the total number of tweets

In [7]:
# select interesting features 
twitter_sub = df_json.select(F.col("user.name"),
                                F.col("user.screen_name"),
                                F.col("created_at"), 
                                F.col("full_text"),
                                F.col("user.followers_count"),
                                F.col("entities.hashtags"))

In [8]:
# check
twitter_sub.limit(5).toPandas()

Unnamed: 0,name,screen_name,created_at,full_text,followers_count,hashtags
0,のり/Nori,nori_k_629,Mon Apr 04 10:09:55 +0000 2022,RT @ohmpawatt: เพื่อนๆคิดถึงผมมั้ยยย ถ้าคิดถึง...,139,[]
1,alice,myn4meizalize,Mon Apr 04 10:09:54 +0000 2022,RT @mynameisnanon: คิดถึงกันป่าว ถ้าคิดถึงต้อง...,655,[]
2,Karen Reed 🌸,kandk670,Mon Apr 04 10:09:54 +0000 2022,@trudiebakescake Organic coconut oil in a jar ...,711,[]
3,ハル):),patlnwza55,Mon Apr 04 10:09:52 +0000 2022,RT @ohmpawatt: เพื่อนๆคิดถึงผมมั้ยยย ถ้าคิดถึง...,236,[]
4,alice,myn4meizalize,Mon Apr 04 10:09:52 +0000 2022,RT @ohmpawatt: เพื่อนๆคิดถึงผมมั้ยยย ถ้าคิดถึง...,655,[]


In [8]:
# remove duplicates and retweets
df = twitter_sub.filter(~F.col("full_text").startswith("RT")) \
                               .drop_duplicates() \
                               .cache()
#removing spam accounts 
df = df.drop_duplicates(["full_text", "screen_name"])
                        
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- screen_name: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- full_text: string (nullable = true)
 |-- followers_count: long (nullable = true)
 |-- hashtags: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- indices: array (nullable = true)
 |    |    |    |-- element: long (containsNull = true)
 |    |    |-- text: string (nullable = true)



In [10]:
# calculate the number of tweets in our dataset 
df.count()

1340938

We have a total of 1340938 distinct tweets

Now we will look at the number of distinct users that tweeted about the subject 

In [12]:
# select the number of distinct users from our dataframe 
numberOfUsers = df.select(F.countDistinct("screen_name").alias("NumberOfUsers"))

In [13]:
numberOfUsers.show()

+-------------+
|NumberOfUsers|
+-------------+
|       577959|
+-------------+



There are a total of 577959 users that posted a tweet about the subject 

There were a total number of 1340938 tweets so this means that each user posts 2,32 tweets on average

Now we will look at the number of tweets per keyword

In [21]:
def nr_of_tweets(keyword, df):
    df = df.filter(df.full_text.contains(keyword))
    return df.count()

In [22]:
tweets_vegan = nr_of_tweets("vegan", df)

In [23]:
print(tweets_vegan)

551736


Next we will look at the number of users that tweeted a certain keyword

In [29]:
def nr_of_users(keyword, df):
    df = df.filter(df.full_text.contains(keyword))
    # select the number of distinct users from our dataframe 
    numberOfUsers = df.select(F.countDistinct("screen_name").alias("NumberOfUsers"))
    numberOfUsers.show()

In [30]:
nr_of_users("vegan", df)

+-------------+
|NumberOfUsers|
+-------------+
|       253142|
+-------------+



# The timing of tweets 

We will look at the timing of a tweet per day, so we will look at what owers users post the most tweets 

In [15]:
# https://developer.twitter.com/en/docs/twitter-ads-api/timezones
# function to convert Twitter date string format
def getDate(date):
    if date is not None:
        return str(datetime.strptime(date,'%a %b %d %H:%M:%S +0000 %Y').replace(tzinfo=pytz.UTC).strftime("%Y-%m-%d %H:%M:%S"))
    else:
        return None

# UDF declaration
date_udf = F.udf(getDate, StringType())

# apply udf
df = df.withColumn('post_created_at', F.to_utc_timestamp(date_udf("created_at"), "UTC"))

In [16]:
df.printSchema()

root
 |-- name: string (nullable = true)
 |-- screen_name: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- full_text: string (nullable = true)
 |-- followers_count: long (nullable = true)
 |-- hashtags: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- indices: array (nullable = true)
 |    |    |    |-- element: long (containsNull = true)
 |    |    |-- text: string (nullable = true)
 |-- post_created_at: timestamp (nullable = true)



In [29]:
freq_hour = df.withColumn("hour", hour(df["post_created_at"]))
freq_hour = freq_hour.groupBy('hour').agg(countDistinct("full_text"))\
            .withColumnRenamed("count(full_text)", "freq") \
            .sort('hour', ascending = True).toPandas()
freq_hour.head(24)

Unnamed: 0,hour,freq
0,0,46810
1,1,46540
2,2,42341
3,3,39957
4,4,38110
5,5,39782
6,6,39893
7,7,43445
8,8,45617
9,9,48261


In [30]:
fig = px.line(freq_hour, x = 'hour', y= 'freq')

# Add figure title
fig.update_layout(
title_text="Tweet Activity Hourly",
title_x = 0.5
    )

# add axes
fig.update_xaxes(title_text="<b>Hour</b>")
fig.update_yaxes(title_text="<b>Amount of tweets</b>")

fig.show()

Now we will look at hourly tweet activity per keyword

In [33]:
def tweet_activity_hourly(keyword, df):
    df = df.filter(df.full_text.contains(keyword))
    freq_hour = df.withColumn("hour", hour(df["post_created_at"]))
    freq_hour = freq_hour.groupBy('hour').agg(countDistinct("full_text"))\
                .withColumnRenamed("count(full_text)", "freq") \
                .sort('hour', ascending = True).toPandas()
    fig = px.line(freq_hour, x = 'hour', y= 'freq')

    # Add figure title
    fig.update_layout(
    title_text="Tweet Activity Hourly",
    title_x = 0.5
    )

    # add axes
    fig.update_xaxes(title_text="<b>Hour</b>")
    fig.update_yaxes(title_text="<b>Amount of tweets</b>")

    fig.show()
    return freq_hour

In [35]:
hourly_tweets_vegan = tweet_activity_hourly("vegan", df)

In [36]:
hourly_tweets_vegan.head(24)

Unnamed: 0,hour,freq
0,0,20074
1,1,19350
2,2,16980
3,3,15056
4,4,13523
5,5,13843
6,6,13615
7,7,16214
8,8,17514
9,9,19386


# The number of folowers of tweeters

Because a user can post more than one tweets about the topic, some users will appear more than once in our dataset. The user can have a different number of followers, becuase the number of followers may have been increased or decreased between the time of the tweets. We decide to take the max of the number of followers for each users.  


In [37]:
# we select the max number of followers for each users 
numberOfFollowers = df.select("name", "followers_count") \
                        .groupBy("name") \
                        .max("followers_count") \
                        .withColumnRenamed("max(followers_count)", "MaxFollowersCount") \
                        .withColumn("MaxFollowersCount", F.round("MaxFollowersCount", 2)) 

In [38]:
# we sort in a decending way
numberOfFollowers = numberOfFollowers.sort("MaxFollowersCount", ascending=False)

In [39]:
# converting to pandas
numberofFollowersPd = numberOfFollowers.toPandas()

In [40]:
# look at the 20 accounts with the largest number of followers
numberofFollowersPd.head(20)

Unnamed: 0,name,MaxFollowersCount
0,Lady Gaga,84876971
1,CNN,58738132
2,The New York Times,53833653
3,BBC News (World),37368190
4,The Economist,26981263
5,Reuters,25387151
6,A.R.Rahman,24192397
7,Nicki Minaj,23311089
8,Fox News,22083552
9,The Wall Street Journal,19901847


We can see that the accounts with the largest number of followers are famous people or news sites. Which shows us that it's an item that's actual at the moment

In [108]:
# now we will look how mucht users have 0 followers
zeroFollowers = numberofFollowersPd[numberofFollowersPd['MaxFollowersCount'] == 0]
zeroFollowers.head()

Unnamed: 0,name,MaxFollowersCount
532066,CookingCompass,0
532067,Geetha Masala,0
532068,Green Field,0
532069,The Warehouse 365,0
532070,Hardy.Legros,0


In [109]:
len(zeroFollowers)

10963

In [111]:
len(zeroFollowers)/len(numberofFollowersPd)

0.02018860871150528

Only 2% of the users have zero followers

In [135]:
# now we will look how mucht users have 0 followers
lessThan1000Followers = numberofFollowersPd[numberofFollowersPd['MaxFollowersCount'] < 1000]
len(lessThan1000Followers)

390215

In [136]:
len(lessThan1000Followers)/len(numberofFollowersPd)

0.7185896149192769

Most users (71,86 %) have less than 1000 followers

In [124]:
# look how much followers our users have on average 
meanFollowers = numberofFollowersPd['MaxFollowersCount'].mean()
meanFollowers

7351.733467273387

Now we will look at the people with the highest number of followers for each keyword 

In [36]:
def nr_of_followers(keyword, df):
    df = df.filter(df.full_text.contains(keyword))
    # we select the max number of followers for each users 
    numberOfFollowers = df.select("name", "followers_count") \
                                        .groupBy("name") \
                                        .max("followers_count") \
                                        .withColumnRenamed("max(followers_count)", "MaxFollowersCount") \
                                        .withColumn("MaxFollowersCount", F.round("MaxFollowersCount", 2))
    
                                        
    # we sort in a decending way
    numberOfFollowers = numberOfFollowers.sort("MaxFollowersCount", ascending=False)
    numberOfFollowers.show()

In [37]:
nr_of_followers("vegan", df)

+--------------------+-----------------+
|                name|MaxFollowersCount|
+--------------------+-----------------+
|           Lady Gaga|         84876971|
|  The New York Times|         53818201|
|       The Economist|         26686134|
|             Reuters|         24739056|
|         Nicki Minaj|         23311089|
|The Wall Street J...|         19901847|
|            detikcom|         18694195|
|      Vogue Magazine|         14688011|
|                CGTN|         13445429|
|       BBC News (UK)|         12979119|
|                 WWE|         11769892|
|       Russell Brand|         11135448|
|                  +a|         10824177|
|        The Guardian|         10564395|
|           BBC Sport|         10306522|
|          TechCrunch|         10275596|
|               WIRED|         10258749|
|      The Daily Show|          9620433|
|    Noticias Caracol|          9489274|
|      The New Yorker|          8994333|
+--------------------+-----------------+
only showing top

In [38]:
nr_of_followers("healthylifestyle", df)

+--------------------+-----------------+
|                name|MaxFollowersCount|
+--------------------+-----------------+
|SHILPA SHETTY KUNDRA|          6459016|
|     Sebastian Rulli|          2918439|
|                 DNA|          2282085|
|            DawnNews|          2189518|
|            India TV|          2064560|
|         News7 Tamil|          1915254|
|        Ranveer Brar|          1743177|
|        Grazia India|          1583486|
|           Gulf News|          1288490|
|            ABP माझा|          1221739|
|         भारत समाचार|          1033717|
|       National Post|           967473|
|         TV9 Marathi|           903591|
|       Khaleej Times|           846718|
|        Tv9 Gujarati|           801750|
|    The Better India|           775174|
| Zee Bihar Jharkhand|           716263|
|          ZEE २४ तास|           695635|
|         Nick Carter|           670354|
|           Hindustan|           602337|
+--------------------+-----------------+
only showing top

# Stock Price Prediction

In [9]:
df_vegn = df.filter(df.full_text.contains('VEGN'))

In [10]:
df_vegn.count()

138

In [11]:
df_vegn.show()

+-----------------+-------------+--------------------+--------------------+---------------+--------------------+
|             name|  screen_name|          created_at|           full_text|followers_count|            hashtags|
+-----------------+-------------+--------------------+--------------------+---------------+--------------------+
|AGORACOM - George|     AGORACOM|Mon Feb 21 18:00:...|#SmallCap 60: Boo...|          11289|[{[0, 9], SmallCa...|
|    Keith Jameson|   keith_jams|Sat Sep 10 12:09:...|VEGNEWS LAUNCHES ...|          10571|[{[44, 56], foods...|
|         Plant&Co|aplantbasedco|Tue Dec 07 22:16:...|Hope you're havin...|           1458|[{[148, 154], veg...|
|AGORACOM - George|     AGORACOM|Mon Sep 12 12:22:...|NEWS... @ElseNutr...|          11850|[{[155, 170], Pla...|
|       Sean Black|    extremesb|Fri Jun 17 00:32:...|@shawnmoniz Happy...|           3806|[{[190, 198], org...|
|         ARandTec|     rand_tec|Wed Aug 31 13:28:...|Nothing to add he...|            199|[{[76

In [20]:
df_Beyond_Meat = df.filter(df.full_text.contains('Beyond Meat'))

In [21]:
df_Beyond_Meat.count()

414

In [18]:
df_bynd = df.filter(df.full_text.contains('BYND'))

In [19]:
df_bynd.count()

119

In [30]:
df_ADM = df_bynd = df.filter(df.full_text.contains('ADM'))

In [31]:
df_ADM.count()

126

In [34]:
df_archer_daniels_midland = df_bynd = df.filter(df.full_text.contains('Archer Daniels Midland'))

In [35]:
df_archer_daniels_midland.count()

1

In [36]:
df_Danone = df_bynd = df.filter(df.full_text.contains('Danone'))

In [37]:
df_Danone.count()

89

In [38]:
df_dollarSign = df_bynd = df.filter(df.full_text.contains('$'))
df_dollarSign.count()

19326

In [39]:
df_Dollar = df_bynd = df.filter(df.full_text.contains('Dollar'))
df_Dollar.count()

204

In [40]:
df_dollar = df_bynd = df.filter(df.full_text.contains('dollar'))
df_dollar.count()

1116