In [0]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz
!tar xf spark-2.4.5-bin-hadoop2.7.tgz
!pip install -q findspark

In [0]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

In [0]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [11]:
#Loading Amazon Dataset 
#Task 1
dataset_amazon = spark.read.csv('Amazon_Responded_Oct05.csv',inferSchema=True, header =True)
dataset_amazon.show()

+--------------------+--------------------+----------------+-----------+-------------------+---------------------+--------------+-----------------+--------------+--------------------+-------------+-------------+--------------------+------------------+--------------------+--------------+--------------------+--------------+---------+-----------------------+-------------------------+-----------------------+-------------+---------+--------------------+
|              id_str|    tweet_created_at|user_screen_name|user_id_str|user_statuses_count|user_favourites_count|user_protected|user_listed_count|user_following|    user_description|user_location|user_verified|user_followers_count|user_friends_count|     user_created_at|tweet_language|               text_|favorite_count|favorited|in_reply_to_screen_name|in_reply_to_status_id_str|in_reply_to_user_id_str|retweet_count|retweeted|                text|
+--------------------+--------------------+----------------+-----------+-------------------+--

In [12]:
#-------------------------------------------------Data Cleaning Process-------------------------------------------------

#Taking only the 3 columns into consideration and extracting the required date format
from pyspark.sql.functions import concat,col,isnan, when, trim,substring

#3 columns
dataset_sel_col = dataset_amazon.select('tweet_created_at', 'user_screen_name', 'User_id_str')

#Date format
dataset_sel_col = dataset_sel_col.withColumn("tweet_created_at", concat(dataset_sel_col.tweet_created_at.substr(4,8),dataset_sel_col.tweet_created_at.substr(27,32)))
print(dataset_sel_col.count())

#Removing duplicates
df = dataset_sel_col.dropDuplicates()
print(df.count())

#Removing nulls,NaN
df_filtered = df.where(~(trim(col("tweet_created_at")).isNull() | isnan(col("tweet_created_at"))))
df_filtered = df.where(~(trim(col("user_screen_name")).isNull() | isnan(col("user_screen_name"))))
df_filtered = df.where(~(trim(col("User_id_str")).isNull() | isnan(col("User_id_str"))))
print(df_filtered.count())

413247
100563
99876


In [13]:
#Removing True/False values
df = df_filtered.filter(~col('user_screen_name').isin(['False','True']))
df = df.filter(~col('User_id_str').isin(['False','True']))
df = df.filter(~col('tweet_created_at').isin(['False','True']))
print(df.count())

#Removing non-integer User-IDs (We observe blank tweet dates and weird User Id strings for these rows)
df = df.select("tweet_created_at", "user_screen_name", "User_id_str",
  trim(col("User_id_str")).cast("int").isNotNull().alias("Value"))
df = df.where(df['Value'] == 'True')

#Keeping only valid dates by checking year
df = df.select("tweet_created_at", "user_screen_name", "User_id_str",
  substring(trim(col("tweet_created_at")),-4,4).cast("int").isNotNull().alias("Value"))
df = df.where(df['Value'] == 'True')

#Removing User ID = 0
df = df.where(df['User_id_str'] != 0)
print(df.count())

#Checking if all dates are correct by sorting them
df.orderBy("tweet_created_at", ascending=False).show()
#--------------------------------------------------------------------------------------------------

94346
71658
+----------------+----------------+-----------+-----+
|tweet_created_at|user_screen_name|User_id_str|Value|
+----------------+----------------+-----------+-----+
|     Nov 30 2016|   Suburban_Hell|   26014139| true|
|     Nov 30 2016|   TimBrooke2009|   39521459| true|
|     Nov 30 2016|        IctBiker|  448112443| true|
|     Nov 30 2016|       phillym64|  288493025| true|
|     Nov 30 2016|    Ride4TheStar|  323622025| true|
|     Nov 30 2016|       djeterg19|   19749718| true|
|     Nov 30 2016|        JanArrah|   49576954| true|
|     Nov 30 2016|  consultantweet| 1639136594| true|
|     Nov 30 2016|   rajthedreamer|  102948266| true|
|     Nov 30 2016|    DFernandez15|  284341562| true|
|     Nov 30 2016|       melcoug93|  891484453| true|
|     Nov 30 2016|    hellogregory|  148628040| true|
|     Nov 30 2016|       wilmaried|   53214902| true|
|     Nov 30 2016|      jayboy3775|  228248836| true|
|     Nov 30 2016|    rebeccamlane|  516612059| true|
|     Nov 30 201

In [14]:
#Step 1: Find out the users who are active for at least 5 days
import pyspark.sql.functions as f

df_users = df.groupBy('user_screen_name').count().select('user_screen_name', f.col('count').alias('n'))

df_users = df_users.where(df_users['n'] > 5)
print(df_users.count())
df_users.show()

220
+----------------+---+
|user_screen_name|  n|
+----------------+---+
|  Green_JamesBee|  6|
|        RGengage|  9|
| sureshmuthrotil|  6|
|       Tushi_Joy|  7|
|      lupitasahu|  6|
|      vaibhav926|  9|
|  dahnamchandler|  8|
|        amruthhr| 13|
|    boredgirl260|  7|
| Ronakjobanputr1|  6|
|   RealMikeStack|  7|
|           xtye_|  6|
|       Taterpugz|  7|
|        microrao|  6|
|        jcline35|  8|
|       only4ojha|  6|
|       ramscrism|  6|
|  MacqueenNathan|  6|
|      librarykat|  7|
|       dilbil143|  7|
+----------------+---+
only showing top 20 rows



In [15]:
temp = df_users.alias('a').join(df.alias('b'),col('a.user_screen_name') == col('b.user_screen_name'))
output_result = temp.select(['a.user_screen_name', 'b.User_id_str']).dropDuplicates()
                                              
output_result.show()

+----------------+-----------+
|user_screen_name|User_id_str|
+----------------+-----------+
|  sky_regenrated|  483059773|
|   Gentlemen_Sam|  441572163|
|        trallyus|   11702402|
|      Whitjoseph|  148495262|
|       basusagar|   42205044|
|    brian_riback|  424400406|
|   urvashi_mitra|  527489415|
|  ReclusiveCoder|  144280479|
|        kmayyank|  388738029|
|     lahsivarhor|   86762070|
| ranabiswanath12|  543571642|
|    coolfrog1988|   35214799|
|   RealMikeStack| 1529167434|
|         Asqwolf|   62498968|
|   project_gonzo|  134300648|
|   Tushar_Kapila|   17463956|
|       Tushi_Joy|   88709529|
|       deecaltee|  714942588|
|       posterest| 1066180207|
|         ke4ole1|  213060183|
+----------------+-----------+
only showing top 20 rows



In [16]:
#Step 2: Conduct an A/B test on Twitter

#Loading Experiment file
write_df = spark.read.option("header", "false").csv("experiment.txt")
experiment = write_df.select(f.col('_c0').alias("Experiment_User_id"))
print(experiment.count())
experiment.show()

5000
+------------------+
|Experiment_User_id|
+------------------+
|         143515471|
|          85741735|
|          71457972|
|        2908108256|
|         106799492|
|          59156981|
|         902137872|
|         110354554|
|          97424433|
|          62364020|
|        2706101936|
|           5654472|
|         145579921|
|        2502172122|
|         243716471|
|        2610379644|
|         123138418|
|         257376764|
|         269145593|
|         370711133|
+------------------+
only showing top 20 rows



In [17]:
#Join to find which user is active

left_join = experiment.join(output_result, experiment.Experiment_User_id == output_result.User_id_str,how='left')
output_step2 = left_join.select('Experiment_User_id','User_id_str')
print(output_step2.count()) 
output_step2.show()   

5000
+------------------+-----------+
|Experiment_User_id|User_id_str|
+------------------+-----------+
|         106799492|       null|
|        2610379644|       null|
|          62364020|       null|
|         123138418|       null|
|        2908108256|       null|
|         143515471|       null|
|          97424433|       null|
|        2706101936|       null|
|        1510968974|       null|
|         110354554|       null|
|        2502172122|       null|
|         243716471|       null|
|          59156981|       null|
|          71457972|       null|
|         257376764|       null|
|           5654472|       null|
|          85741735|   85741735|
|         370711133|       null|
|         145579921|       null|
|         902137872|       null|
+------------------+-----------+
only showing top 20 rows



In [18]:
from pyspark.sql import functions as F

selection = ['Experiment_User_id', F.when(F.col('User_id_str').isNull(),'No').otherwise('Yes').alias('Whether_active')]

experiment_user = output_step2.select(selection)
print(experiment_user.count())
experiment_user.show()

5000
+------------------+--------------+
|Experiment_User_id|Whether_active|
+------------------+--------------+
|          11798342|            No|
|        1210875679|            No|
|         128257538|            No|
|         137088213|            No|
|         142202059|            No|
|          14291504|            No|
|          16279527|            No|
|         163148814|            No|
|          20110100|            No|
|         211972025|            No|
|        2176836186|            No|
|         234641258|            No|
|        2372082613|            No|
|          26915435|            No|
|          27840175|            No|
|         301131509|            No|
|        3028486809|            No|
|         305743837|            No|
|        3196213653|            No|
|         348844543|            No|
+------------------+--------------+
only showing top 20 rows



In [19]:
#Percentage of active users
output = experiment_user.where(experiment_user['Whether_Active'] == 'Yes').count()/experiment_user.count()*100
print(f'{output}%')

1.2%


In [20]:
#Step 3: Joining the 3 tables to form Amazon_new.csv
#df - Amazon cleaned data
#output_result - Table after Step 1
#experiment_user - Table after Step 2

temp = df.alias('a').join(output_result.alias('b'),col('a.User_id_str') == col('b.User_id_str'))
output_result1 = temp.select(['a.tweet_created_at','a.user_screen_name','a.User_id_str']).dropDuplicates()
print(output_result1.count()) 
#output_result1.show()

output_step2 = experiment_user.where(experiment_user['Whether_Active'] == 'Yes')
print(output_step2.count()) 


inner_join = output_result1.join(output_step2, output_result1.User_id_str == output_step2.Experiment_User_id,how='inner')
output_step3 = inner_join.select('tweet_created_at','user_screen_name','User_id_str').dropDuplicates()

print(output_step3.count()) 
output_step3.show()

1937
60
743
+----------------+----------------+-----------+
|tweet_created_at|user_screen_name|User_id_str|
+----------------+----------------+-----------+
|     Jan 30 2017|          medtek|   18435372|
|     Nov 18 2016|          medtek|   18435372|
|     Dec 22 2016|          medtek|   18435372|
|     Dec 23 2016|          medtek|   18435372|
|     Apr 14 2017|          medtek|   18435372|
|     Apr 27 2017|          medtek|   18435372|
|     Jan 31 2017|          medtek|   18435372|
|     Nov 19 2016|      TheMimiZee| 1387267123|
|     Feb 11 2017|      TheMimiZee| 1387267123|
|     Jan 20 2017|      TheMimiZee| 1387267123|
|     Jan 27 2017|      TheMimiZee| 1387267123|
|     Feb 12 2017|      TheMimiZee| 1387267123|
|     Nov 12 2016|      TheMimiZee| 1387267123|
|     Nov 13 2016|      TheMimiZee| 1387267123|
|     Jan 26 2017|      TheMimiZee| 1387267123|
|     Nov 03 2016|    J_cooper1990|  885336408|
|     Jan 26 2017|    J_cooper1990|  885336408|
|     Jan 20 2017|    J_coop

In [0]:
#Exporting the output 
output = output_step3.toPandas()
output.to_csv('Amazon_new.csv')