Spark Analysis on 400K Amazon Tweets - Matthew Kondrak

In [1]:
# Explicit imports instead of a wildcard import, which would shadow Python built-ins such as sum and max
import pyspark.sql.functions as func
from pyspark.sql.functions import lit, split, when
from pyspark.sql.types import DecimalType

In [2]:
from pyspark.sql import SparkSession
spark=SparkSession.builder.appName('RDD').getOrCreate()
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")


Reading the data

In [3]:
# Read the CSV once; inferSchema=True makes Spark scan the file to guess column types
data_pyspark = spark.read.option('header', 'true').csv('/Users/matthewkondrak/Desktop/Amazon_Tweets.csv', inferSchema=True)

# Size of the data as (rows, columns)
print("Size of the Amazon Tweets Dataset:", (data_pyspark.count(), len(data_pyspark.columns)))

Size of the Amazon Tweets Dataset: (413247, 25)

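Tweet text can contain commas, quotes, and line breaks, which is a common reason for CSV rows to be split in the wrong place. Rows further down in this notebook where user_screen_name is "False" may be a symptom of that, although this is an assumption and not something the output confirms. The sketch below uses Python's csv module on a made-up two-record sample to show the difference between naive line splitting and quote-aware parsing. If this turns out to be the cause, the PySpark CSV reader's multiLine option is one setting worth trying.

```python
import csv
import io

# Hypothetical sample: the second record's text field contains a comma and a line break.
RAW = (
    "user_screen_name,text\n"
    "AmazonHelp,We are sorry to hear that.\n"
    'some_user,"Order late,\nstill waiting"\n'
)


def naive_rows(raw):
    """Split on line breaks and commas, ignoring CSV quoting rules."""
    return [line.split(",") for line in raw.splitlines()]


def quoted_rows(raw):
    """Parse with the csv module, which keeps quoted fields intact."""
    return list(csv.reader(io.StringIO(raw)))


print(len(naive_rows(RAW)))   # 4: the quoted field is broken across two rows
print(len(quoted_rows(RAW)))  # 3: header plus two records
print(quoted_rows(RAW)[2])
```

A quick way to check the real file is to count rows whose user_id_str is null or not numeric after loading.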

In [4]:
data_pyspark.columns

['id_str',
 'tweet_created_at',
 'user_screen_name',
 'user_id_str',
 'user_statuses_count',
 'user_favourites_count',
 'user_protected',
 'user_listed_count',
 'user_following',
 'user_description',
 'user_location',
 'user_verified',
 'user_followers_count',
 'user_friends_count',
 'user_created_at',
 'tweet_language',
 'text_',
 'favorite_count',
 'favorited',
 'in_reply_to_screen_name',
 'in_reply_to_status_id_str',
 'in_reply_to_user_id_str',
 'retweet_count',
 'retweeted',
 'text']

Keeping only the columns needed for this analysis

In [5]:
# Keep the three columns used in the tasks below
data_pyspark1 = data_pyspark.select('tweet_created_at', 'user_screen_name', 'user_id_str')
data_pyspark1.show()

+--------------------+----------------+-----------+
|    tweet_created_at|user_screen_name|user_id_str|
+--------------------+----------------+-----------+
|Tue Nov 01 01:57:...|     SeanEPanjab|  143515471|
|Tue Nov 01 02:39:...|      AmazonHelp|   85741735|
|Tue Nov 01 17:14:...|     SeanEPanjab|  143515471|
|Tue Nov 01 17:15:...|     SeanEPanjab|  143515471|
|Tue Nov 01 17:19:...|      AmazonHelp|   85741735|
|Tue Nov 01 17:25:...|      AmazonHelp|   85741735|
|Tue Nov 01 17:55:...|     SeanEPanjab|  143515471|
|Tue Nov 01 17:55:...|     SeanEPanjab|  143515471|
|Tue Nov 01 18:02:...|      AmazonHelp|   85741735|
|Tue Nov 01 03:51:...|   aakashwangnoo|   71457972|
|Tue Nov 01 03:59:...|      AmazonHelp|   85741735|
|Tue Nov 01 11:00:...|   aakashwangnoo|   71457972|
|Tue Nov 01 12:03:...|      AmazonHelp|   85741735|
|Tue Nov 01 12:03:...|      AmazonHelp|   85741735|
|Tue Nov 01 14:57:...|   aakashwangnoo|   71457972|
|Tue Nov 01 16:38:...|      AmazonHelp|   85741735|
|Tue Nov 01 

TASK 1:

Finding the Twitter users in the dataset who are active on Twitter.

A user counts as active only if they posted on at least 5 distinct days.

Additionally, saving their "user_screen_name" and "user_id_str" in a dataframe "daily_active_users"

In [6]:
# Reduce tweet_created_at to "Month Day" (e.g. "Nov 01") so tweets can be grouped by posting day.
# Note: the year is dropped, so this assumes all tweets fall within a single year.

date_parts = split(data_pyspark1['tweet_created_at'], ' ')
daily_active_users = data_pyspark1.withColumn('Month', date_parts.getItem(1))
daily_active_users = daily_active_users.withColumn('Date', date_parts.getItem(2))
daily_active_users = daily_active_users.withColumn('tweet_created_at', func.concat(func.col('Month'), func.lit(' '), func.col('Date')))

#daily_active_users.select("tweet_created_at").show()

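The split-based step keeps only the month and day tokens. As a cross-check, here is a minimal sketch that parses the whole timestamp with datetime.strptime and returns a full calendar date. It assumes tweet_created_at follows the classic Twitter layout ("Tue Nov 01 01:57:25 +0000 2016"); the notebook output truncates the column, so the format string and the sample values are assumptions.

```python
from datetime import datetime

# Assumed layout of tweet_created_at; not confirmed by the truncated output above.
TWITTER_FORMAT = "%a %b %d %H:%M:%S %z %Y"


def posting_day(created_at):
    """Return the calendar date (year included) of a tweet timestamp."""
    return datetime.strptime(created_at, TWITTER_FORMAT).date()


def month_and_day(created_at):
    """Mirror the split-based approach: keep only tokens 1 and 2."""
    parts = created_at.split(" ")
    return parts[1] + " " + parts[2]


sample = "Tue Nov 01 01:57:25 +0000 2016"      # hypothetical value
next_year = "Wed Nov 01 09:00:00 +0000 2017"   # hypothetical value

print(month_and_day(sample), "|", month_and_day(next_year))  # same "Nov 01" label
print(posting_day(sample), "|", posting_day(next_year))      # two different dates
```

If the dataset spans more than one year, keeping the year in the grouping key avoids merging the same month and day from different years.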
In [7]:
# Counting the distinct posting days per user, then keeping users who posted on at least 5 of them
daily_active_users1 = daily_active_users.groupBy("user_screen_name").agg(func.countDistinct("tweet_created_at"))
daily_active_users2 = daily_active_users1.filter("count(DISTINCT tweet_created_at) >= 5")
daily_active_users2.show()

+----------------+-----------------------+
|user_screen_name|count(tweet_created_at)|
+----------------+-----------------------+
|  Green_JamesBee|                      6|
|   That_Musician|                      5|
| PritiYa80985097|                     10|
|    Speechlesstx|                      5|
|      nsvaluto74|                      5|
|        RGengage|                      9|
| sureshmuthrotil|                      6|
|       Tushi_Joy|                      7|
|   ajaygupta1974|                      5|
|      lupitasahu|                      6|
| SardarGurvinde2|                      5|
| Rajeshk76856612|                      5|
|        RISSHHII|                      5|
|      vaibhav926|                      9|
|    TaiKamiya101|                      5|
|  dahnamchandler|                      8|
|           opjha|                      5|
|        amruthhr|                     13|
|    boredgirl260|                      7|
| Ronakjobanputr1|                      6|
+----------

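The rule applied in this cell (group by user, count distinct posting days, keep counts of 5 or more) can be checked on a small hand-made sample. The following is a plain-Python sketch of the same logic, not the Spark code itself; the user names and dates are invented for illustration.

```python
from collections import defaultdict


def active_users(tweets, min_days=5):
    """Return {user: distinct_day_count} for users with at least min_days days.

    tweets is an iterable of (user_screen_name, day) pairs.
    """
    days_by_user = defaultdict(set)
    for user, day in tweets:
        days_by_user[user].add(day)  # a set ignores repeat posts on the same day
    return {u: len(d) for u, d in days_by_user.items() if len(d) >= min_days}


# Hypothetical toy data: "alice" posts on 5 days, "bob" posts 6 times on 2 days.
toy = [("alice", f"Nov 0{d}") for d in range(1, 6)]
toy += [("bob", "Nov 01")] * 4 + [("bob", "Nov 02")] * 2

print(active_users(toy))  # {'alice': 5}
```

Note that "bob" is excluded even though he tweeted more often than "alice", because the rule counts days, not tweets.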

In [8]:
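# Inner join back to the per-tweet rows: every tweet row of an active user is kept, so each user appears once per tweet
daily_active_users3 = daily_active_users.join(daily_active_users2, ["user_screen_name"], "inner")
new_daily_active_users = daily_active_users3.select("user_screen_name", "user_id_str")
# Cast the ID to an exact numeric type; non-numeric values, if any, become null
new_daily_active_users = new_daily_active_users.withColumn("user_id_str", new_daily_active_users.user_id_str.cast(DecimalType(18, 0)))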

In [9]:
#showing all active users
new_daily_active_users.show()
print("Size of Daily Active Users:",(new_daily_active_users.count(), len(new_daily_active_users.columns)))

#saving to csv
dailyactiveusers=new_daily_active_users.toPandas()
dailyactiveusers.to_csv(r"/Users/matthewkondrak/Desktop/daily_active_users.csv")

+----------------+-----------+
|user_screen_name|user_id_str|
+----------------+-----------+
|      AmazonHelp|   85741735|
|      AmazonHelp|   85741735|
|      AmazonHelp|   85741735|
|      AmazonHelp|   85741735|
|      AmazonHelp|   85741735|
|      AmazonHelp|   85741735|
|      AmazonHelp|   85741735|
|      AmazonHelp|   85741735|
|      AmazonHelp|   85741735|
|      AmazonHelp|   85741735|
|      AmazonHelp|   85741735|
|           False|       null|
|      AmazonHelp|   85741735|
|      AmazonHelp|   85741735|
|           False|       null|
|      AmazonHelp|   85741735|
|      AmazonHelp|   85741735|
|      AmazonHelp|   85741735|
|           False|        157|
|      AmazonHelp|   85741735|
+----------------+-----------+
only showing top 20 rows



Size of Daily Active Users: (199011, 2)

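The table above repeats AmazonHelp many times because the join keeps one row per tweet, so 199011 is a count of tweet rows and not of distinct users. If a one-row-per-user list is wanted, duplicates can be dropped; in PySpark, DataFrame.dropDuplicates() is the usual tool for that. The sketch below shows the idea in plain Python on a toy list (the rows are illustrative, not a claim about which users are active).

```python
def one_row_per_user(rows):
    """Keep the first occurrence of each (user_screen_name, user_id_str) pair."""
    # dict.fromkeys preserves insertion order and discards repeated keys
    return list(dict.fromkeys(rows))


# Toy rows in the shape of the table above.
rows = [("AmazonHelp", 85741735)] * 3 + [("SeanEPanjab", 143515471)]

unique_rows = one_row_per_user(rows)
print(len(rows), "rows ->", len(unique_rows), "users")
print(unique_rows)
```

Deduplicating at this point would also change the row counts in the later tasks, since the joins there would no longer multiply rows by tweet.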

TASK 2:

Conducting a sample A/B test on Twitter.

The experiment.txt file contains the "user_id_str" values of users selected as potential experiment targets.

Goal: Creating a dataframe "experiment_user" that documents the selected users and records whether each one is an active user

In [10]:
import pandas as pd

# experiment.txt has no header row, so pandas names its single column 0
experimenttxt = pd.read_csv("/Users/matthewkondrak/Desktop/experiment.txt", sep=",", header=None)
experiment_txt = spark.createDataFrame(experimenttxt)

# Renaming the column to match the tweet data
experiment_txt = experiment_txt.withColumnRenamed("0", "user_id_str")

print("Size of Experiments.txt File:",(experiment_txt.count(), len(experiment_txt.columns)))

Size of Experiments.txt File: (5000, 1)


In [11]:
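# Outer join with the Task 1 result on user_id_str.
# Note: an outer join also keeps active users who are not listed in experiment.txt.
experiment_txt1 = new_daily_active_users.join(experiment_txt, ["user_id_str"], "outer")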

In [12]:
# Count the rows per user_id_str and store the result in a new column "whether_active".
# A user found only in experiment.txt normally has one row; more than one row means tweet rows from Task 1 matched.
counting = experiment_txt1.groupBy(["user_id_str"]).agg(func.count(func.lit(1)).alias('whether_active'))
experimentuser = counting.withColumn("whether_active", when(counting["whether_active"] > 1, "yes").otherwise("no"))
experimentuser.show()
print("Size of Dataframe:", (experimentuser.count(), len(experimentuser.columns)))

new_experimentuser=experimentuser.toPandas()
new_experimentuser.to_csv(r"/Users/matthewkondrak/Desktop/experiment_user.csv")

+-----------+--------------+
|user_id_str|whether_active|
+-----------+--------------+
|         75|           yes|
|        189|           yes|
|        215|           yes|
|        239|           yes|
|        325|           yes|
|        334|           yes|
|   23068174|           yes|
|   24038624|            no|
|   25805398|            no|
|   38289347|           yes|
|   39395899|           yes|
|   54311555|            no|
|   71308901|            no|
|   72342053|            no|
|   84303818|            no|
|  124404304|            no|
|  153994363|            no|
|  241391028|           yes|
|  263063213|            no|
|  293677963|           yes|
+-----------+--------------+
only showing top 20 rows



Size of Dataframe: (7035, 2)


In [13]:
# Calculating the percentage of active users.
# Note: experimentuser comes from an outer join, so the denominator covers all 7035 user_id_str values in it, not only the 5000 listed in experiment.txt.

active_count = experimentuser.filter("whether_active == 'yes'").count()
overall = experimentuser.count()
result = (active_count / overall) * 100
print("Percentage of Active Users:", result, "%")

Percentage of Active Users: 26.21179815209666 %

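Because the join in this task is an outer join, the 7035 rows behind this percentage include users who are not in experiment.txt. An alternative reading of the task starts from the experiment list and only asks, for each listed user, whether the ID is in the active set (the equivalent of a left join from the experiment list). The sketch below shows that variant in plain Python with invented IDs; it is an illustration of the idea, not a recomputation of the notebook's result.

```python
def flag_experiment_users(experiment_ids, active_ids):
    """Map each experiment user id to 'yes' or 'no'."""
    active = set(active_ids)
    return {uid: ("yes" if uid in active else "no") for uid in experiment_ids}


def percent_active(flags):
    """Percentage of flagged users marked 'yes' (0.0 for an empty input)."""
    if not flags:
        return 0.0
    yes = sum(1 for value in flags.values() if value == "yes")
    return 100.0 * yes / len(flags)


experiment = [101, 102, 103, 104]  # hypothetical experiment targets
active = [102, 104, 999]           # hypothetical; 999 is active but not a target

flags = flag_experiment_users(experiment, active)
print(flags)
print(percent_active(flags), "%")  # 50.0 %
```

With this variant the denominator is always the size of the experiment list, so user 999 does not affect the percentage.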

TASK 3:

Using the revised experiment target list in the "final_experiment.csv" file.

In this file, several users were removed and a new column "info" was added that records whether the user is female (F) or male (M).

Goal: Joining the dataframes from Task 1 & 2 and saving the result in a "final_experiment" dataframe

In [14]:
# Loading the revised target list
final_exp = spark.read.option('header', 'true').csv('/Users/matthewkondrak/desktop/final_experiment.csv', inferSchema=True)
final_exp.show()
print("Size of final_experiment file:", (final_exp.count(), len(final_exp.columns)))

+-------------+----+--------------+----------------+
|  user_id_str|info|whether_active|user_screen_name|
+-------------+----+--------------+----------------+
|   6.236402E7|   F|          null|            null|
|2.706101936E9|   M|          null|            null|
|    5654472.0|   F|          null|            null|
| 1.45579921E8|   F|          null|            null|
|2.502172122E9|   M|          null|            null|
| 2.43716471E8|   F|          null|            null|
|2.610379644E9|   M|          null|            null|
| 1.23138418E8|   M|          null|            null|
| 2.57376764E8|   F|          null|            null|
| 2.69145593E8|   M|          null|            null|
| 3.70711133E8|   F|          null|            null|
|1.510968974E9|   F|          null|            null|
|3.526380922E9|   M|          null|            null|
| 1.63413904E8|   F|          null|            null|
|  1.6980347E7|   M|          null|            null|
|1.209614366E9|   M|          null|           

Joining all 3 tables

In [15]:
# Dropping the empty placeholder columns, then left-joining the Task 2 and Task 1 results
final_exp = final_exp.drop("whether_active", "user_screen_name")
final_experiment = final_exp.join(experimentuser, ["user_id_str"], "left").join(new_daily_active_users, ["user_id_str"], "left")

# Missing user_screen_name values are replaced with "Not Found"
final_experiment = final_experiment.withColumn(
    'user_screen_name',
    when(final_experiment.user_screen_name.isNull(), lit('Not Found')).otherwise(final_experiment.user_screen_name))
final_experiment.show()
print("Final Size of Dataframe:", (final_experiment.count(), len(final_experiment.columns)))

+-------------+----+--------------+----------------+
|  user_id_str|info|whether_active|user_screen_name|
+-------------+----+--------------+----------------+
|   6.236402E7|   F|            no|       Not Found|
|3.526380922E9|   M|            no|       Not Found|
|3.285473358E9|   F|           yes|    iwritegarima|
|3.285473358E9|   F|           yes|    iwritegarima|
|3.285473358E9|   F|           yes|    iwritegarima|
|3.285473358E9|   F|           yes|    iwritegarima|
|3.285473358E9|   F|           yes|    iwritegarima|
|3.285473358E9|   F|           yes|    iwritegarima|
|3.285473358E9|   F|           yes|    iwritegarima|
|3.285473358E9|   F|           yes|    iwritegarima|
|3.285473358E9|   F|           yes|    iwritegarima|
|3.285473358E9|   F|           yes|    iwritegarima|
|3.285473358E9|   F|           yes|    iwritegarima|
|3.285473358E9|   F|           yes|    iwritegarima|
|3.285473358E9|   F|           yes|    iwritegarima|
|3.285473358E9|   F|           yes|    iwriteg

Final Size of Dataframe: (6060, 4)

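In the Task 3 tables, user_id_str is displayed in scientific notation (for example 6.236402E7), which suggests the column was inferred as a floating-point type. The IDs shown here are short enough to be represented exactly, but longer IDs can lose digits when stored as floats. The sketch below shows an exact conversion with Python's decimal module and a case where a float round trip changes the value; the 19-digit ID is hypothetical. One option on the Spark side, under the assumption that the file stores plain digits, is to read the ID column as a string or with an explicit schema.

```python
from decimal import Decimal


def to_exact_id(text):
    """Convert an ID string, possibly in scientific notation, to an exact int."""
    return int(Decimal(text))


# Values as displayed in the Task 3 output
print(to_exact_id("6.236402E7"))     # 62364020
print(to_exact_id("2.706101936E9"))  # 2706101936

# Hypothetical 19-digit ID: a float cannot hold all of its digits
big = "1234567890123456789"
print(int(float(big)) == int(big))   # False
print(to_exact_id(big) == int(big))  # True
```

Exact IDs matter for the joins on user_id_str, since two different IDs that round to the same float would be treated as the same user.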

In [16]:
#Finally Exporting the result to a .csv file
finalexperiment=final_experiment.toPandas()
finalexperiment.to_csv(r"/Users/matthewkondrak/Desktop/final_experiment2.csv")