Performing Spark RDD Operations (Transformations and Actions) on 400K Amazon Tweets - Matthew Kondrak

In [1]:
import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql.functions import split
import pyspark.sql.functions as sq 
from pyspark.sql.functions import concat,col,lit
import warnings
warnings.filterwarnings("ignore")

In [2]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('RDD').getOrCreate()



In [3]:
spark

Reading the data

In [4]:
data_pyspark = spark.read.option('header','true').csv('/Users/matthewkondrak/Desktop/Amazon_Tweets.csv', inferSchema=True)

#size of data
print("Size of the Amazon Tweets Dataset:",(data_pyspark.count(), len(data_pyspark.columns)))

Size of the Amazon Tweets Dataset: (413247, 25)

In [5]:
data_pyspark.columns

['id_str',
 'tweet_created_at',
 'user_screen_name',
 'user_id_str',
 'user_statuses_count',
 'user_favourites_count',
 'user_protected',
 'user_listed_count',
 'user_following',
 'user_description',
 'user_location',
 'user_verified',
 'user_followers_count',
 'user_friends_count',
 'user_created_at',
 'tweet_language',
 'text_',
 'favorite_count',
 'favorited',
 'in_reply_to_screen_name',
 'in_reply_to_status_id_str',
 'in_reply_to_user_id_str',
 'retweet_count',
 'retweeted',
 'text']

Selecting only the columns needed for this analysis

In [6]:
data_pyspark1=data_pyspark.select('id_str', 'tweet_created_at', 'user_verified', 'favorite_count', 'retweet_count', 'text_')
data_pyspark1.show()

+--------------------+--------------------+-------------+--------------+-------------+--------------------+
|              id_str|    tweet_created_at|user_verified|favorite_count|retweet_count|               text_|
+--------------------+--------------------+-------------+--------------+-------------+--------------------+
|'793270689780203520'|Tue Nov 01 01:57:...|        False|             0|            0|@AmazonHelp Can y...|
|'793281386912354304'|Tue Nov 01 02:39:...|         True|             0|            0|@SeanEPanjab I'm ...|
|'793501578766319616'|Tue Nov 01 17:14:...|        False|             0|            0|@AmazonHelp It wa...|
|'793501657346682880'|Tue Nov 01 17:15:...|        False|             0|            0|@AmazonHelp I am ...|
|'793502854459879424'|Tue Nov 01 17:19:...|         True|             0|            0|@SeanEPanjab Plea...|
|'793504235400884224'|Tue Nov 01 17:25:...|         True|             0|            0|@SeanEPanjab With...|
|'793511847899070465'|Tue No

Goal 1:

Filtering the data to keep only tweets where user_verified is True.

Grouping the verified users' tweets by creation date and counting the number of tweets on each date.

For the date with the highest number of tweets, calculating the sum of favorite_count and retweet_count for each tweet, then reporting the text of the 100 tweets with the highest sum and the word frequencies in that text.
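
The three steps above can be checked on a handful of rows in plain Python before running them in Spark. This is only a sketch: the sample rows and the helper name busiest_day_totals are made up for illustration, and it assumes the creation date has already been reduced to a 'Mon DD' string.

```python
from collections import Counter

# Hypothetical sample rows: (date, user_verified, favorite_count, retweet_count, text_)
rows = [
    ("Jan 03", "True", 3, 2, "tweet a"),
    ("Jan 03", "True", 0, 1, "tweet b"),
    ("Jan 03", "False", 9, 9, "tweet c"),
    ("Nov 01", "True", 1, 0, "tweet d"),
]

def busiest_day_totals(rows, top_n=100):
    # Step 1: keep verified users only (the flag is compared as the string 'True')
    verified = [r for r in rows if r[1] == "True"]
    # Step 2: count tweets per date and pick the date with the most tweets
    counts = Counter(r[0] for r in verified)
    top_date = counts.most_common(1)[0][0]
    # Step 3: favorite_count + retweet_count per tweet on that date, highest first
    totals = [(r[4], r[2] + r[3]) for r in verified if r[0] == top_date]
    totals.sort(key=lambda pair: pair[1], reverse=True)
    return top_date, totals[:top_n]

top_date, totals = busiest_day_totals(rows)
print(top_date, totals)
```

Picking the busiest date from the counts, rather than typing it in by hand, keeps the later steps valid if the input data changes.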




In [7]:
#only showing verified users
data_pyspark1 = data_pyspark1.filter(data_pyspark1.user_verified == 'True')
data_pyspark1.show()

+--------------------+--------------------+-------------+--------------+-------------+--------------------+
|              id_str|    tweet_created_at|user_verified|favorite_count|retweet_count|               text_|
+--------------------+--------------------+-------------+--------------+-------------+--------------------+
|'793281386912354304'|Tue Nov 01 02:39:...|         True|             0|            0|@SeanEPanjab I'm ...|
|'793502854459879424'|Tue Nov 01 17:19:...|         True|             0|            0|@SeanEPanjab Plea...|
|'793504235400884224'|Tue Nov 01 17:25:...|         True|             0|            0|@SeanEPanjab With...|
|'793513446633533440'|Tue Nov 01 18:02:...|         True|             0|            0|@SeanEPanjab I'm ...|
|'793301295255945216'|Tue Nov 01 03:59:...|         True|             0|            0|@aakashwangnoo Hi...|
|'793423313674571776'|Tue Nov 01 12:03:...|         True|             0|            0|@aakashwangnoo Hi...|
|'793423314333134850'|Tue No

In [8]:
#counting the Number of Tweets by Verified Users
print("Number of Tweets by Verified Users:", data_pyspark1.count())

Number of Tweets by Verified Users: 171797

In [9]:
#Fixing the tweet_created_at format to a more usable format for grouping
a = split(data_pyspark1['tweet_created_at'], ' ')
data_pyspark2 = data_pyspark1.withColumn('Month', a.getItem(1))
data_pyspark2 = data_pyspark2.withColumn('Date', a.getItem(2))

data_pyspark3 = data_pyspark2.withColumn('tweet_created_at', sq.concat(sq.col('Month'), sq.lit(' '), sq.col('Date')))
data_pyspark3 = data_pyspark3.select('id_str', 'tweet_created_at', 'user_verified', data_pyspark3.favorite_count.cast('int'), data_pyspark3.retweet_count.cast('int'), 'text_')
data_pyspark3.show()

+--------------------+----------------+-------------+--------------+-------------+--------------------+
|              id_str|tweet_created_at|user_verified|favorite_count|retweet_count|               text_|
+--------------------+----------------+-------------+--------------+-------------+--------------------+
|'793281386912354304'|          Nov 01|         True|             0|            0|@SeanEPanjab I'm ...|
|'793502854459879424'|          Nov 01|         True|             0|            0|@SeanEPanjab Plea...|
|'793504235400884224'|          Nov 01|         True|             0|            0|@SeanEPanjab With...|
|'793513446633533440'|          Nov 01|         True|             0|            0|@SeanEPanjab I'm ...|
|'793301295255945216'|          Nov 01|         True|             0|            0|@aakashwangnoo Hi...|
|'793423313674571776'|          Nov 01|         True|             0|            0|@aakashwangnoo Hi...|
|'793423314333134850'|          Nov 01|         True|           
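
Splitting on spaces keeps only the month and the day, so if the dataset spans more than one year, tweets from the same calendar day in different years end up in the same group. The sketch below shows the same extraction in plain Python and, for comparison, a full parse that keeps the year. The sample string and the format '%a %b %d %H:%M:%S %z %Y' are assumptions (the timestamps are truncated in the table above), based on the usual Twitter created_at layout.

```python
from datetime import datetime

# Assumed sample value; the real column is truncated in the output above
created_at = "Tue Nov 01 01:57:12 +0000 2016"

# Same idea as split(...).getItem(1) and getItem(2): month and day only
parts = created_at.split(" ")
month_day = parts[1] + " " + parts[2]
print(month_day)

# Full parse under the assumed layout; keeps the year for grouping
parsed = datetime.strptime(created_at, "%a %b %d %H:%M:%S %z %Y")
print(parsed.strftime("%Y-%m-%d"))
```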

In [10]:
#Counting the number of tweets on each date
tweetcount = data_pyspark3.groupby(data_pyspark3.tweet_created_at).agg(sq.count('id_str').alias('tweet_count'))
tweetcount.show()



+----------------+-----------+
|tweet_created_at|tweet_count|
+----------------+-----------+
|          Nov 27|        208|
|          Nov 01|        370|
|          Nov 06|        357|
|          Dec 10|        979|
|          Nov 04|        355|
|          Dec 17|        221|
|          Nov 12|        720|
|          Dec 03|       1197|
|          Nov 11|        149|
|          Nov 10|         93|
|          Nov 30|        278|
|          Dec 04|        884|
|          Nov 29|        952|
|          Nov 05|       1058|
|          Nov 26|        521|
|          Nov 19|        457|
|          Dec 07|        360|
|          Nov 13|        226|
|          Dec 12|        372|
|          Dec 16|        363|
+----------------+-----------+
only showing top 20 rows



                                                                                

In [11]:
from pyspark.sql import SQLContext

#Reusing the SparkContext behind the existing session; sqlcontext runs the SQL queries in the cells below
sc = spark.sparkContext
sqlcontext = SQLContext(sc)

#Finding the date with the highest number of tweets
#(createOrReplaceTempView replaces the deprecated registerTempTable)
tweetcount.createOrReplaceTempView('tweetcount')
tweetcount = sqlcontext.sql('SELECT * FROM tweetcount ORDER BY tweet_count DESC')
tweetcount.show()

+----------------+-----------+
|tweet_created_at|tweet_count|
+----------------+-----------+
|          Jan 03|       1536|
|          Jan 10|       1508|
|          Jan 11|       1496|
|          Jan 12|       1410|
|          Jan 06|       1364|
|          Jan 07|       1360|
|          Jan 20|       1336|
|          Mar 02|       1296|
|          Jan 13|       1295|
|          Jan 14|       1290|
|          Jan 21|       1290|
|          Jan 18|       1286|
|          Dec 15|       1276|
|          Jan 24|       1259|
|          Nov 18|       1246|
|          Dec 03|       1197|
|          Jan 02|       1195|
|          Jun 27|       1191|
|          Jul 04|       1190|
|          Jan 19|       1175|
+----------------+-----------+
only showing top 20 rows



In [12]:
#Calculating the sum of "favorite_count" and "retweet_count" for each tweet on the busiest day ('Jan 03', from the output above),
#then reporting the text content ("text_") of the 100 tweets with the highest sum.
#sqlcontext was created in the previous cell, so it is not created again here.
data_pyspark3.createOrReplaceTempView('tempdata')
retweets_sum = sqlcontext.sql("SELECT text_, favorite_count + retweet_count AS total FROM tempdata WHERE tweet_created_at = 'Jan 03' ORDER BY total DESC LIMIT 100")
retweets_sum.show()

text1 = retweets_sum.toPandas()
a = text1['text_'].tolist()

                                                                                

+--------------------+-----+
|               text_|total|
+--------------------+-----+
|@amazon worst sho...|    5|
|@ItsJosshA We alw...|    3|
|@ItsJosshA Oh no!...|    2|
|@KStefl Sounds li...|    2|
|@Schoey1992 Happy...|    2|
|@ratbones666 You ...|    2|
|@ThorpPerrow Awww...|    2|
|@thedexterouz Hi!...|    2|
|@matt_linsley Ple...|    1|
|@VlSlT Sorry to h...|    1|
|@PPramod2041984 H...|    1|
|@mailstosandeep H...|    1|
|@Elidan_8 Here's ...|    1|
|@joyfulneesh Than...|    1|
|@brooklynnnross I...|    1|
|@DurhamBelle I'm ...|    1|
|@heypardeep We're...|    1|
|@__NaijaDrew Sorr...|    1|
|@magsophazjon Ple...|    1|
|@magsophazjon Hey...|    1|
+--------------------+-----+
only showing top 20 rows



                                                                                

In [13]:
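#Data cleaning: replacing punctuation with spaces, lowercasing, and keeping alphabetic tokens only
#(the NLTK stopword list is downloaded below, but cleaning() does not remove stopwords)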
import nltk
import pandas as pd
import string
import re
from collections import Counter
from nltk.tokenize import sent_tokenize, word_tokenize
nltk.download('stopwords')
nltk.download('punkt')

def cleaning(textfile):
    #Replacing every punctuation character (apostrophes included) with a space
    table = str.maketrans(string.punctuation, ' ' * len(string.punctuation))
    cleaned1 = [i.translate(table) for i in textfile]

    #Lowercasing each tweet and keeping only purely alphabetic tokens
    output1 = []
    for i in cleaned1:
        output1.append(" ".join([w.lower() for w in i.split() if w.isalpha()]))
    return output1

[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/matthewkondrak/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to
[nltk_data]     /Users/matthewkondrak/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
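
To see what this cleaning step does to a single tweet, here is a small standalone sketch of the same punctuation-to-space approach. The function name clean_text and the sample strings are illustrative only. Because apostrophes become spaces, contractions split into fragments such as "t" and "re", which are then counted as words.

```python
import string

# Map every punctuation character to a space (same idea as cleaning() above)
PUNCT_TO_SPACE = str.maketrans(string.punctuation, " " * len(string.punctuation))

def clean_text(text):
    # Replace punctuation, then keep lowercase alphabetic tokens only
    tokens = text.translate(PUNCT_TO_SPACE).split()
    return " ".join(t.lower() for t in tokens if t.isalpha())

print(clean_text("Don't worry, we're on it!"))
print(clean_text("@user123 order #42 arrived"))
```

Tokens that mix letters and digits, such as a handle like user123, are dropped entirely by the isalpha() check.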


In [14]:
#calculating the word frequency of the 100 tweets
cleaned1 = cleaning(a)
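#Reusing the SparkContext behind the existing session
sc = spark.sparkContext

#Distributing the cleaned tweets as an RDD (one element per tweet)
spark_rdd = sc.parallelize(cleaned1)

#Transformations (lazy): flatMap splits lines into words, map pairs each word with 1, reduceByKey sums the counts per word
#Action: collect() triggers the computation and returns the (word, count) pairs to the driver
word_frequency = (spark_rdd.flatMap(lambda line: line.split(" "))
                  .map(lambda word: (word, 1))
                  .reduceByKey(lambda a, b: a + b)
                  .collect())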

#saving to csv
goal1=pd.DataFrame(word_frequency)
goal1.columns = ['word','frequency']
print(goal1.sort_values(by=['frequency'], ascending=False).head(5))
goal1.to_csv(r"/Users/matthewkondrak/Desktop/goal1.csv")



    word  frequency
356  the         65
355   to         65
75   you         64
223    t         61
214  for         50
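
The top five entries are all stopwords or contraction fragments ("t"), since the cleaning step does not filter them out. A rough sketch of how the counts could be filtered is shown below in plain Python. Counter stands in for the flatMap/map/reduceByKey chain, and the tiny STOPWORDS set is a placeholder assumption, not the NLTK list.

```python
from collections import Counter

# Placeholder stopword set for illustration; a real run would use a fuller list
STOPWORDS = {"the", "to", "you", "for", "a", "is", "we", "t", "s", "re"}

def word_frequency(lines, stopwords=STOPWORDS):
    # Split each cleaned line into words and count them, skipping stopwords
    counts = Counter()
    for line in lines:
        counts.update(w for w in line.split() if w not in stopwords)
    return counts

sample = ["sorry for the delay", "the order is delayed", "sorry to hear that"]
print(word_frequency(sample).most_common(1))
```

In the RDD version, the equivalent change would be a filter step between flatMap and map that drops words found in the stopword set.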


                                                                                

Goal 2:

Using find_text.csv, which consists of two columns, "id_str" and "text", with the "text" column empty.

Finding the text content of each tweet by joining find_text.csv with Amazon_Tweets.csv on "id_str", then filling in the "text" column.
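
Conceptually this is a lookup of each id_str in the full dataset. A minimal plain-Python sketch of that lookup, with made-up ids and texts, is shown below. Ids without a match are left out, which mirrors an inner join, and the ids are compared as strings, quote characters included.

```python
# Hypothetical stand-ins for Amazon_Tweets.csv (id_str, text_) and find_text.csv (id_str)
tweets = [
    ("'101'", "first tweet"),
    ("'102'", "second tweet"),
    ("'102'", "second tweet"),  # duplicate row
]
wanted_ids = ["'102'", "'101'", "'999'"]

def fill_text(wanted_ids, tweets):
    # Build an id -> text lookup; duplicate rows collapse to one entry
    lookup = {tweet_id: text for tweet_id, text in tweets}
    # Keep only ids present in the lookup (inner-join behaviour)
    return [(i, lookup[i]) for i in wanted_ids if i in lookup]

print(fill_text(wanted_ids, tweets))
```

If every id from find_text.csv should appear in the result even when no tweet matches, a left join would be the closer fit.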

In [15]:
#importing find_text.csv
goal2 = spark.read.format("csv").option("header","true").load("/Users/matthewkondrak/Desktop/find_text.csv")
goal2.show()

+--------------------+----+
|              id_str|text|
+--------------------+----+
|'793270689780203520'|null|
|'793281386912354304'|null|
|'793299404975247360'|null|
|'793301295255945216'|null|
|'793315815411978240'|null|
|'793322306848292864'|null|
|'793322433625415680'|null|
|'793365409047023616'|null|
|'793369654878232577'|null|
|'793375905280393216'|null|
|'793376242837823488'|null|
|'793378044052406272'|null|
|'793378188416131072'|null|
|'793379112685568000'|null|
|'793381418395136000'|null|
|'793382930085253121'|null|
|'793383832720474113'|null|
|'793386133434593280'|null|
|'793386974459682816'|null|
|'793390636619759616'|null|
+--------------------+----+
only showing top 20 rows



In [16]:
#creating temp views for the join (createOrReplaceTempView replaces the deprecated registerTempTable)
goal2.createOrReplaceTempView('goal2')
data_pyspark.createOrReplaceTempView('tweet')

#joining
output2 = sqlcontext.sql("SELECT DISTINCT I.id_str,T.text_ from goal2 I JOIN tweet T on I.id_str = T.id_str")
output2.show()

#saving output
output3 = output2.toPandas()
output3.to_csv(r"/Users/matthewkondrak/Desktop/goal2.csv")

                                                                                

+--------------------+--------------------+
|              id_str|               text_|
+--------------------+--------------------+
|'793382930085253121'|@mybharatraj Hi! ...|
|'793441656984903680'|@AmazonHelp done,...|
|'793517259880861696'|Your customer ser...|
|'793533066157387776'|@flamablebrownie ...|
|'793542659625349121'|So now @AmazonHel...|
|'793579129333510144'|@jpokeefe That's ...|
|'793750855300313088'|@Rajleom10 Hi the...|
|'793813919106228224'|@AmazonHelp I ord...|
|'795621294226231300'|@AmazonHelp is my...|
|'793839943386791936'|@AmazonHelp Just ...|
|'793874173919649794'|@rava_diganta Hey...|
|'793843252810088448'|60 rs product in ...|
|'793848101975056388'|I *never* had any...|
|'794246147212767232'|@RUSSON82 We'd li...|
|'793901849552322560'|@AmazonHelp tryin...|
|'794376470659551232'|@imyashpal Hi the...|
|'794259803744960521'|@AmazonHelp that ...|
|'794253312895942657'|@Not_Domo We're a...|
|'794291925926952960'|I feel like I've ...|
|'794321892291334145'|@Pizza711 

                                                                                