In [1]:
# Get the environment ready
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [2]:
# Install Spark
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [3]:
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.6.tgz # Get Spark Installer version

In [4]:
!tar xf spark-2.4.7-bin-hadoop2.6.tgz # Untarring Spark Installer

In [5]:
!pip install -q findspark

In [6]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.6"

In [7]:
!pip install -q pyspark

In [8]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
# Colab is ready to run Pyspark!

In [9]:
from pyspark import SparkContext
sc =SparkContext.getOrCreate()

In [10]:
# Read Amazon_Responded_Oct05 data 
data = spark.read.format("csv").option("header","true").load("drive/My\ Drive/Colab\ Notebooks/Amazon_Responded_Oct05.csv")

In [11]:
data.show(5)

+--------------------+--------------------+----------------+-----------+-------------------+---------------------+--------------+-----------------+--------------+--------------------+-------------+-------------+--------------------+------------------+--------------------+--------------+--------------------+--------------+---------+-----------------------+-------------------------+-----------------------+-------------+---------+--------------------+
|              id_str|    tweet_created_at|user_screen_name|user_id_str|user_statuses_count|user_favourites_count|user_protected|user_listed_count|user_following|    user_description|user_location|user_verified|user_followers_count|user_friends_count|     user_created_at|tweet_language|               text_|favorite_count|favorited|in_reply_to_screen_name|in_reply_to_status_id_str|in_reply_to_user_id_str|retweet_count|retweeted|                text|
+--------------------+--------------------+----------------+-----------+-------------------+--

In [12]:
new_data = data.select( "tweet_created_at", "user_screen_name", "user_id_str", "text_")

In [13]:
new_rdd = new_data.withColumn("day", new_data.tweet_created_at.substr(5,6))

In [14]:
new_data.show(5)

+--------------------+----------------+-----------+--------------------+
|    tweet_created_at|user_screen_name|user_id_str|               text_|
+--------------------+----------------+-----------+--------------------+
|Tue Nov 01 01:57:...|     SeanEPanjab|  143515471|@AmazonHelp Can y...|
|Tue Nov 01 02:39:...|      AmazonHelp|   85741735|@SeanEPanjab I'm ...|
|Tue Nov 01 17:14:...|     SeanEPanjab|  143515471|@AmazonHelp It wa...|
|Tue Nov 01 17:15:...|     SeanEPanjab|  143515471|@AmazonHelp I am ...|
|Tue Nov 01 17:19:...|      AmazonHelp|   85741735|@SeanEPanjab Plea...|
+--------------------+----------------+-----------+--------------------+
only showing top 5 rows



In [15]:
expression = r"\w{3} \d{2}"
df = new_rdd.filter(new_rdd["day"].rlike(expression ))

In [16]:
df.show()

+--------------------+----------------+-----------+--------------------+------+
|    tweet_created_at|user_screen_name|user_id_str|               text_|   day|
+--------------------+----------------+-----------+--------------------+------+
|Tue Nov 01 01:57:...|     SeanEPanjab|  143515471|@AmazonHelp Can y...|Nov 01|
|Tue Nov 01 02:39:...|      AmazonHelp|   85741735|@SeanEPanjab I'm ...|Nov 01|
|Tue Nov 01 17:14:...|     SeanEPanjab|  143515471|@AmazonHelp It wa...|Nov 01|
|Tue Nov 01 17:15:...|     SeanEPanjab|  143515471|@AmazonHelp I am ...|Nov 01|
|Tue Nov 01 17:19:...|      AmazonHelp|   85741735|@SeanEPanjab Plea...|Nov 01|
|Tue Nov 01 17:25:...|      AmazonHelp|   85741735|@SeanEPanjab With...|Nov 01|
|Tue Nov 01 17:55:...|     SeanEPanjab|  143515471|@AmazonHelp It wa...|Nov 01|
|Tue Nov 01 17:55:...|     SeanEPanjab|  143515471|@AmazonHelp if it...|Nov 01|
|Tue Nov 01 18:02:...|      AmazonHelp|   85741735|@SeanEPanjab I'm ...|Nov 01|
|Tue Nov 01 03:51:...|   aakashwangnoo| 

In [17]:
df.createOrReplaceTempView("Amazon") # Rename the data

## **TASKS**

### **Step 1**


Create a dataframe “daily_active_users”. Find out the users who are active in at least five listed days (i.e., created posts in at least 5 days) in Amazon_Responded_Oct05.csv and save their “user_screen_name” and “user_id_str” in the dataframe. 

In [18]:
# Let's find the total number of days in Amazon dataset
spark.sql("SELECT COUNT(DISTINCT(day)) FROM Amazon WHERE day NOT LIKE 'has%' ").show()

+-------------------+
|count(DISTINCT day)|
+-------------------+
|                279|
+-------------------+



In [19]:
# From the above query, we know that users posted for 279 days. Now, let's find users that posted on ALL DAYS 
spark.sql("SELECT user_screen_name, user_id_str FROM Amazon GROUP BY user_id_str, user_screen_name HAVING COUNT(DISTINCT(day))= 279").show()

+----------------+-----------+
|user_screen_name|user_id_str|
+----------------+-----------+
|      AmazonHelp|   85741735|
+----------------+-----------+



In [20]:
# Next, we find users who are active in at least five listed days (i.e., created posts in at least 5 days)
daily_active_users = spark.sql("SELECT user_screen_name, user_id_str, COUNT(DISTINCT(day)) as days_active FROM Amazon GROUP BY user_id_str, user_screen_name HAVING COUNT(DISTINCT(day)) >= 5 ORDER BY user_screen_name")
daily_active_users.show()

+----------------+------------------+-----------+
|user_screen_name|       user_id_str|days_active|
+----------------+------------------+-----------+
|          0xJAKE|        4146949275|          5|
|         2SixTwo|        3789082213|          7|
| 353839052d72499|        3025211876|          9|
|   5thLineTalent|          24237942|          5|
|          AW0079|          21064452|         13|
|  Abhinav1bansal|        3315608602|          5|
|        AgentDie|        2200871844|          7|
|  Ahmed_Awad1212|          62705229|          5|
|       AjeetKarn|         278977971|          5|
|    AkiraSakura2|        1064415878|          6|
|  Akshay_goplani|        2889927415|         10|
| Alfonsiawarrior|        2333955555|          7|
|         Alokzgh|733116597825495040|          5|
|      AmazonHelp|          85741735|        279|
|          Amez_W|        1341265634|          5|
|        AmpersUK|         214049826|          5|
| AmyUnitedStates|        2953709027|          5|


### **Step 2**

A company would like to conduct an A/B test on Twitter. The experiment.txt file includes the user_id_str they selected as potential experiment targets. Please create a dataframe “experiment_user” to document the selected user id and whether they are active users (join the dataframe from step 1).

In [21]:
experiment = spark.read.csv("/content/drive/My Drive/MIS/Fall 2020/IDS 561 Big data/experiment.txt")

In [22]:
experiment= experiment.selectExpr("_c0 as userid")

In [23]:
experiment.show(5)
experiment.dtypes

+----------+
|    userid|
+----------+
| 143515471|
|  85741735|
|  71457972|
|2908108256|
| 106799492|
+----------+
only showing top 5 rows



[('userid', 'string')]

In [24]:
#Join the output tables
join = (experiment.userid == daily_active_users.user_id_str)

In [25]:
#Perform a full join on the tables
df2 = daily_active_users.join(experiment, join, how="full")

In [26]:
from pyspark.sql import functions as F

In [27]:
df2 = df2.withColumn("Whether_active", F.when(F.col("user_id_str") == F.col("userid"), "yes").otherwise("no"))

In [28]:
# Clean data by filtering null screen names
df2= df2.filter(df2.user_screen_name.isNotNull())
df2.show()

+----------------+------------------+-----------+------------------+--------------+
|user_screen_name|       user_id_str|days_active|            userid|Whether_active|
+----------------+------------------+-----------+------------------+--------------+
|          0xJAKE|        4146949275|          5|              null|            no|
|         2SixTwo|        3789082213|          7|              null|            no|
| 353839052d72499|        3025211876|          9|              null|            no|
|   5thLineTalent|          24237942|          5|              null|            no|
|          AW0079|          21064452|         13|          21064452|           yes|
|  Abhinav1bansal|        3315608602|          5|              null|            no|
|        AgentDie|        2200871844|          7|              null|            no|
|  Ahmed_Awad1212|          62705229|          5|              null|            no|
|       AjeetKarn|         278977971|          5|              null|        

In [29]:
new_user = df2.drop("user_id_str", "userid")

In [30]:
# Show if a user is active or not
new_user = new_user.selectExpr("user_screen_name as user_id_str", "Whether_active")
new_user.show(20) # Show only 20 records

+---------------+--------------+
|    user_id_str|Whether_active|
+---------------+--------------+
|         0xJAKE|            no|
|        2SixTwo|            no|
|353839052d72499|            no|
|  5thLineTalent|            no|
|         AW0079|           yes|
| Abhinav1bansal|            no|
|       AgentDie|            no|
| Ahmed_Awad1212|            no|
|      AjeetKarn|            no|
|   AkiraSakura2|            no|
| Akshay_goplani|            no|
|Alfonsiawarrior|            no|
|        Alokzgh|            no|
|     AmazonHelp|           yes|
|         Amez_W|           yes|
|       AmpersUK|           yes|
|AmyUnitedStates|            no|
|   AndyMRoberts|           yes|
|          Aosc2|           yes|
|       Arthi305|           yes|
+---------------+--------------+
only showing top 20 rows



In [31]:
# Filter out users who are active users and who are not
active = new_user.filter(new_user.Whether_active.contains("yes")).count()
inactive = new_user.filter(new_user.Whether_active.contains("no")).count()

In [32]:
# Calculate the percentage of active users
percentage_of_active_users = (active/(active+inactive))*100

In [33]:
percentage_of_active_users

20.404721753794266

### **Step 3**

Perform a 3-table join task

In [34]:
data.show(10)

+--------------------+--------------------+----------------+-----------+-------------------+---------------------+--------------+-----------------+--------------+--------------------+-------------+-------------+--------------------+------------------+--------------------+--------------+--------------------+--------------+---------+-----------------------+-------------------------+-----------------------+-------------+---------+--------------------+
|              id_str|    tweet_created_at|user_screen_name|user_id_str|user_statuses_count|user_favourites_count|user_protected|user_listed_count|user_following|    user_description|user_location|user_verified|user_followers_count|user_friends_count|     user_created_at|tweet_language|               text_|favorite_count|favorited|in_reply_to_screen_name|in_reply_to_status_id_str|in_reply_to_user_id_str|retweet_count|retweeted|                text|
+--------------------+--------------------+----------------+-----------+-------------------+--

In [35]:
new_user.show(10)

+---------------+--------------+
|    user_id_str|Whether_active|
+---------------+--------------+
|         0xJAKE|            no|
|        2SixTwo|            no|
|353839052d72499|            no|
|  5thLineTalent|            no|
|         AW0079|           yes|
| Abhinav1bansal|            no|
|       AgentDie|            no|
| Ahmed_Awad1212|            no|
|      AjeetKarn|            no|
|   AkiraSakura2|            no|
+---------------+--------------+
only showing top 10 rows



In [36]:
daily_active_users.show(10)

+----------------+-----------+-----------+
|user_screen_name|user_id_str|days_active|
+----------------+-----------+-----------+
|          0xJAKE| 4146949275|          5|
|         2SixTwo| 3789082213|          7|
| 353839052d72499| 3025211876|          9|
|   5thLineTalent|   24237942|          5|
|          AW0079|   21064452|         13|
|  Abhinav1bansal| 3315608602|          5|
|        AgentDie| 2200871844|          7|
|  Ahmed_Awad1212|   62705229|          5|
|       AjeetKarn|  278977971|          5|
|    AkiraSakura2| 1064415878|          6|
+----------------+-----------+-----------+
only showing top 10 rows



In [38]:
# Joining the 3 tables 
joinA= (new_user.user_id_str == daily_active_users.user_screen_name)
joinB= (daily_active_users.user_screen_name == data.user_screen_name)

dfA = daily_active_users.join(new_user, joinA, how="inner").join(data, joinB, how="right")
dfB= dfA.filter(df.user_screen_name.isNotNull())
dfB.show()

+----------------+-----------+-----------+--------------+--------------+--------------------+--------------------+----------------+-----------+-------------------+---------------------+--------------+-----------------+--------------+--------------------+------------------+-------------+--------------------+------------------+--------------------+--------------+--------------------+--------------+---------+-----------------------+-------------------------+-----------------------+-------------+---------+--------------------+
|user_screen_name|user_id_str|days_active|   user_id_str|Whether_active|              id_str|    tweet_created_at|user_screen_name|user_id_str|user_statuses_count|user_favourites_count|user_protected|user_listed_count|user_following|    user_description|     user_location|user_verified|user_followers_count|user_friends_count|     user_created_at|tweet_language|               text_|favorite_count|favorited|in_reply_to_screen_name|in_reply_to_status_id_str|in_reply_to_

In [39]:
import pandas as pd
df_final = dfB.toPandas()

In [40]:
df_final.to_csv("/content/drive/My Drive/MIS/Fall 2020/IDS 561 Big data/Amazon_new.csv", header = True, index=False)