In [1]:
import os

# Setting Configurations - JAVA_HOME and SPARK_HOME variables.
# The paths below are machine-specific fallbacks: if the environment already
# defines these variables, those values are kept instead of being overwritten.

os.environ.setdefault("JAVA_HOME", r"C:\Program Files\Java\jdk-18")
os.environ.setdefault("SPARK_HOME", r"C:\Spark\spark-3.2.1-bin-hadoop3.2")

In [2]:
# Import required modules.
# findspark.init() must run before the first pyspark import. That way the
# pyspark package under SPARK_HOME (set in the previous cell) is the one that
# gets imported. Once pyspark is in sys.modules, a later init() cannot change
# which package is used.
import findspark
findspark.init()

import pandas as pd
from pyspark.sql import functions

## Create a local Spark Session

In [3]:
import findspark

findspark.init()

from pyspark.sql import SparkSession

# Local Spark session using all available cores ("local[*]"), with 15g of
# driver memory.
spark = (
    SparkSession.builder
    .appName("SparkSession")
    .master("local[*]")
    .config("spark.driver.memory", "15g")
    .getOrCreate()
)

In [4]:
# Enable Arrow-based columnar data transfers between pandas and Spark. This
# speeds up spark.createDataFrame(pandas_df) and DataFrame.toPandas() below.
# "spark.sql.execution.arrow.enabled" has been deprecated since Spark 3.0 in
# favour of the pyspark-specific key used here.
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

## Spark DataFrame/SQL Transformations and Actions.

## STEP 1

In [5]:
# Reading the dataset with pandas (it is converted to a Spark DataFrame in the
# next cell).
# Malformed rows, such as lines with 26 fields instead of the expected 25, are
# skipped with a warning. on_bad_lines="warn" replaces the deprecated
# error_bad_lines=False (pandas >= 1.3; the old flag was removed in 2.0).

data = pd.read_csv('Amazon_Responded_Oct05.csv', sep=",", on_bad_lines="warn")
data.head()



  exec(code_obj, self.user_global_ns, self.user_ns)
b'Skipping line 209619: expected 25 fields, saw 26\nSkipping line 209620: expected 25 fields, saw 26\n'


Unnamed: 0,id_str,tweet_created_at,user_screen_name,user_id_str,user_statuses_count,user_favourites_count,user_protected,user_listed_count,user_following,user_description,...,tweet_language,text_,favorite_count,favorited,in_reply_to_screen_name,in_reply_to_status_id_str,in_reply_to_user_id_str,retweet_count,retweeted,text
0,'793270689780203520',Tue Nov 01 01:57:25 +0000 2016,SeanEPanjab,143515471.0,51287,4079,False,74,False,Content marketer; Polyglot; Beard aficionado; ...,...,en,@AmazonHelp Can you please DM me? A product I ...,0,False,AmazonHelp,,85741735.0,0,False,
1,'793281386912354304',Tue Nov 01 02:39:55 +0000 2016,AmazonHelp,85741735.0,2225450,11366,False,796,False,We answer Amazon support questions 7 days a we...,...,en,"@SeanEPanjab I'm sorry, we're unable to DM you...",0,False,SeanEPanjab,7.932707e+17,143515471.0,0,False,
2,'793501578766319616',Tue Nov 01 17:14:53 +0000 2016,SeanEPanjab,143515471.0,51287,4079,False,74,False,Content marketer; Polyglot; Beard aficionado; ...,...,en,@AmazonHelp It was purchased on https://t.co/g...,0,False,AmazonHelp,7.932814e+17,85741735.0,0,False,@AmazonHelp It was purchased on https://t.co/g...
3,'793501657346682880',Tue Nov 01 17:15:12 +0000 2016,SeanEPanjab,143515471.0,51287,4079,False,74,False,Content marketer; Polyglot; Beard aficionado; ...,...,en,"@AmazonHelp I am following you now, if it help...",0,False,AmazonHelp,7.932814e+17,85741735.0,0,False,
4,'793502854459879424',Tue Nov 01 17:19:57 +0000 2016,AmazonHelp,85741735.0,2225450,11366,False,796,False,We answer Amazon support questions 7 days a we...,...,en,@SeanEPanjab Please give us a call/chat so we ...,0,False,SeanEPanjab,7.935016e+17,143515471.0,0,False,@SeanEPanjab Please give us a call/chat so we ...


In [6]:
# Convert the pandas DataFrame to a Spark DataFrame.
# NOTE: this rebinds `data` from the pandas frame to the Spark frame.
data = spark.createDataFrame(data=data)

In [7]:
# Keep only the columns the daily-active-user analysis needs.
needed_columns = ['tweet_created_at', 'user_screen_name', 'user_id_str']

df = data.select(needed_columns)
df.head(5)

[Row(tweet_created_at='Tue Nov 01 01:57:25 +0000 2016', user_screen_name='SeanEPanjab', user_id_str=143515471.0),
 Row(tweet_created_at='Tue Nov 01 02:39:55 +0000 2016', user_screen_name='AmazonHelp', user_id_str=85741735.0),
 Row(tweet_created_at='Tue Nov 01 17:14:53 +0000 2016', user_screen_name='SeanEPanjab', user_id_str=143515471.0),
 Row(tweet_created_at='Tue Nov 01 17:15:12 +0000 2016', user_screen_name='SeanEPanjab', user_id_str=143515471.0),
 Row(tweet_created_at='Tue Nov 01 17:19:57 +0000 2016', user_screen_name='AmazonHelp', user_id_str=85741735.0)]

In [8]:
# Creating a dataframe with formatted date (MONTH DAY YEAR).
# tweet_created_at values look like "Tue Nov 01 01:57:25 +0000 2016". The 2nd,
# 3rd and last space-separated tokens give "Nov 01 2016".
# Built-in column functions keep this work in the JVM instead of sending every
# row through a Python worker (as rdd.map does). With Spark's default (non-ANSI)
# settings, a null or too-short timestamp yields null instead of raising.
created_at_tokens = functions.split(functions.col('tweet_created_at'), ' ')
month_day_year = functions.concat(
    created_at_tokens.getItem(1),
    functions.lit(' '),
    created_at_tokens.getItem(2),
    functions.lit(' '),
    functions.element_at(created_at_tokens, -1),
)

dataframe_with_formatted_date = df.select(
    month_day_year.alias('tweet_created_at'),
    'user_screen_name',
    'user_id_str',
)
dataframe_with_formatted_date.show(10)
dataframe_with_formatted_date.count()

+----------------+----------------+------------+
|tweet_created_at|user_screen_name| user_id_str|
+----------------+----------------+------------+
|     Nov 01 2016|     SeanEPanjab|1.43515471E8|
|     Nov 01 2016|      AmazonHelp| 8.5741735E7|
|     Nov 01 2016|     SeanEPanjab|1.43515471E8|
|     Nov 01 2016|     SeanEPanjab|1.43515471E8|
|     Nov 01 2016|      AmazonHelp| 8.5741735E7|
|     Nov 01 2016|      AmazonHelp| 8.5741735E7|
|     Nov 01 2016|     SeanEPanjab|1.43515471E8|
|     Nov 01 2016|     SeanEPanjab|1.43515471E8|
|     Nov 01 2016|      AmazonHelp| 8.5741735E7|
|     Nov 01 2016|   aakashwangnoo| 7.1457972E7|
+----------------+----------------+------------+
only showing top 10 rows



378132

In [9]:
# Group the dataframe by user_screen_name (and user_id_str) and keep the users
# who have posted tweets on at least MIN_ACTIVE_DAYS distinct days. These are
# the daily_active_users.
MIN_ACTIVE_DAYS = 5

# The temp view is kept so the data can also be queried with spark.sql later.
dataframe_with_formatted_date.createOrReplaceTempView("test")
daily_active_users = spark.sql(
    "select user_screen_name, user_id_str from test "
    "group by user_screen_name, user_id_str "
    "having COUNT(DISTINCT tweet_created_at) >= {}".format(MIN_ACTIVE_DAYS)
)
daily_active_users.show()
daily_active_users.count()

+----------------+--------------------+
|user_screen_name|         user_id_str|
+----------------+--------------------+
|       sundevesh|         2.0443112E8|
|   anishsangamam|        1.40738682E8|
|   adit_morzaria|        5.86301042E8|
|        cocobobi|        3.00813035E8|
|       neerajap3|        1.60073361E8|
|   pagetbrewster|8.273753192083005...|
| RachelWhisker87|        3.55860475E8|
|   OctavioDelano|       2.347968828E9|
|       bala43212|         3.8803772E7|
|      vishal2323|          9.631293E7|
|   TechyFarooqui|       3.006076339E9|
|        TCMuffin|         3.5591749E7|
|          MtnrMS|       3.309102108E9|
|  martinbrossman|          1.411963E7|
|      chickenb00|         1.6193077E7|
|  rishabgundecha|       1.227418134E9|
|      Kar4change|8.160374434681978...|
|          azkite|         3.9689052E8|
|         RCausey|         3.7534186E7|
|       sanjanind|         6.6622722E7|
+----------------+--------------------+
only showing top 20 rows



593

In [10]:
# Write daily_active_users to a CSV file. The data is collected to the driver
# as a pandas DataFrame first.
# NOTE(review): user_id_str is a double at this point (see the output above),
# so very large ids are written in float notation. Confirm this is acceptable.
daily_active_users_pd = daily_active_users.toPandas()
daily_active_users_pd.to_csv('daily_active_users.csv', index=False)

## STEP 2

In [11]:
# Read experiment.txt (no header row) and name its single column user_id_str.
experiment_data = spark.read.option("header", False).csv('experiment.txt')
experiment_df = experiment_data.toDF('user_id_str')

In [12]:
experiment_df.show(n=20)  # preview the first 20 experiment user ids

+-----------+
|user_id_str|
+-----------+
|  143515471|
|   85741735|
|   71457972|
| 2908108256|
|  106799492|
|   59156981|
|  902137872|
|  110354554|
|   97424433|
|   62364020|
| 2706101936|
|    5654472|
|  145579921|
| 2502172122|
|  243716471|
| 2610379644|
|  123138418|
|  257376764|
|  269145593|
|  370711133|
+-----------+
only showing top 20 rows



In [13]:
# Left join the experiment users with daily_active_users. Every user_id_str
# from experiment.txt is kept, and users that are not in daily_active_users
# get a null user_screen_name.
# The join starts from the raw experiment_data rather than from experiment_df
# itself. Re-running this cell therefore does not join a second time and
# duplicate the user_screen_name column.
# NOTE(review): the ids read from experiment.txt are strings, while
# daily_active_users.user_id_str is a double. Spark compares them after an
# implicit cast, and ids above 2**53 lose precision as doubles. Confirm there
# are no false matches.

experiment_df = experiment_data.toDF('user_id_str').join(daily_active_users, ["user_id_str"], "left")

In [14]:
experiment_df.show(n=20)  # preview the joined rows (null screen name = not active)

+-----------+----------------+
|user_id_str|user_screen_name|
+-----------+----------------+
|   62364020|            null|
|   71457972|            null|
|   97424433|            null|
| 2502172122|            null|
|  257376764|            null|
|  145579921|            null|
|  123138418|            null|
|   59156981|            null|
|  106799492|            null|
| 2610379644|            null|
| 2908108256|            null|
|  110354554| praveen_pandey_|
|  269145593|            null|
| 1510968974|            null|
|  243716471|            null|
|  370711133|            null|
|    5654472|            null|
|  902137872|     mybharatraj|
| 2706101936|            null|
|   85741735|      AmazonHelp|
+-----------+----------------+
only showing top 20 rows



In [15]:
# Helper used below to turn the joined user_screen_name column into a
# yes/no whether_active flag.

def check_whether_active(x):
    """Return "yes" if the row has a user_screen_name, otherwise "no".

    x is a Spark Row (or any mapping) with a "user_screen_name" entry.
    A null/None or empty screen name means the user was not found among the
    daily active users, so it maps to "no".
    """
    return "yes" if x["user_screen_name"] else "no"

# Replace user_screen_name with a yes/no whether_active flag for every
# experiment user.
def _to_activity_pair(row):
    return (row.user_id_str, check_whether_active(row))

experiment_user = (
    experiment_df.rdd
    .map(_to_activity_pair)
    .toDF(['user_id_str', 'whether_active'])
)
experiment_user.show(10)
experiment_user.count()

+------------------+--------------+
|       user_id_str|whether_active|
+------------------+--------------+
|          89107116|            no|
|          89633357|            no|
|         585782554|            no|
|763375765253652480|            no|
|793640186504581120|            no|
|         283168534|            no|
|           6927562|            no|
|         160326340|            no|
|          20561973|            no|
|         283608923|            no|
+------------------+--------------+
only showing top 10 rows



5000

In [16]:
# Calculate the percentage of experiment users that are active.

total_count = experiment_user.count()
active_count = experiment_user.filter(experiment_user['whether_active'] == 'yes').count()

# Guard against an empty experiment set, which would otherwise raise
# ZeroDivisionError. No users means no active users, so report 0.
active_percentage_count = (active_count / total_count) * 100 if total_count else 0.0
print("Percentage of Active Count :", active_percentage_count)

Percentage of Active Count : 2.42


In [17]:
# Write experiment_user to a CSV file (collected to the driver via pandas).
experiment_user_pd = experiment_user.toPandas()
experiment_user_pd.to_csv('experiment_user.csv', index=False)

## STEP 3

In [18]:
# Read final_experiment.csv with pandas and convert it to a Spark DataFrame.
# NOTE: the last cell of this notebook writes its result back to this same
# file, so a re-run reads that output rather than the original input.
final_experiment = spark.createDataFrame(pd.read_csv("final_experiment.csv"))

final_experiment.show()

+-------------+----+--------------+----------------+
|  user_id_str|info|whether_active|user_screen_name|
+-------------+----+--------------+----------------+
|   6.236402E7|   F|          null|            null|
|2.706101936E9|   M|          null|            null|
|    5654472.0|   F|          null|            null|
| 1.45579921E8|   F|          null|            null|
|2.502172122E9|   M|          null|            null|
| 2.43716471E8|   F|          null|            null|
|2.610379644E9|   M|          null|            null|
| 1.23138418E8|   M|          null|            null|
| 2.57376764E8|   F|          null|            null|
| 2.69145593E8|   M|          null|            null|
| 3.70711133E8|   F|          null|            null|
|1.510968974E9|   F|          null|            null|
|3.526380922E9|   M|          null|            null|
| 1.63413904E8|   F|          null|            null|
|  1.6980347E7|   M|          null|            null|
|1.209614366E9|   M|          null|           

In [19]:
# whether_active and user_screen_name are empty in the input file. Remove them
# here; they are rebuilt by joining with experiment_user and daily_active_users.
columns_to_drop = ['whether_active', 'user_screen_name']

final_experiment = final_experiment.drop(
    *columns_to_drop
)

In [20]:
# Left join final_experiment with experiment_user (adds whether_active), then
# with daily_active_users (adds user_screen_name). Every user_id_str from
# final_experiment is kept, and ids missing from the right-hand side get null.
# The leading drop is a no-op on the first run (the previous cell already
# removed these columns). It makes re-running this cell safe, because otherwise
# the joins would add duplicate whether_active/user_screen_name columns.

final_experiment = (
    final_experiment
    .drop(*columns_to_drop)
    .join(experiment_user, ["user_id_str"], "left")
    .join(daily_active_users, ["user_id_str"], "left")
)
final_experiment.show()

+-------------+----+--------------+----------------+
|  user_id_str|info|whether_active|user_screen_name|
+-------------+----+--------------+----------------+
|   6.236402E7|   F|            no|            null|
|3.526380922E9|   M|            no|            null|
|3.285473358E9|   F|           yes|    iwritegarima|
|1.209614366E9|   M|            no|            null|
|2.502172122E9|   M|            no|            null|
| 2.57376764E8|   F|            no|            null|
| 1.45579921E8|   F|            no|            null|
| 1.23138418E8|   M|            no|            null|
|2.610379644E9|   M|            no|            null|
| 4.22175328E8|   M|            no|            null|
| 1.63413904E8|   F|            no|            null|
|  1.6980347E7|   M|            no|            null|
| 2.69145593E8|   M|            no|            null|
|1.510968974E9|   F|            no|            null|
|1.970607968E9|   M|            no|            null|
| 2.43716471E8|   F|            no|           

In [21]:
# Helper used below to replace a missing user_screen_name with "Not found".

def check_whether_user_screen_name_is_null(x):
    """Return the row's user_screen_name, or "Not found" when it is missing.

    x is a Spark Row (or any mapping) with a "user_screen_name" entry; a
    null/None or empty screen name is reported as "Not found".
    """
    return x["user_screen_name"] or "Not found"

# Fill missing screen names with "Not found", keeping the other columns as-is.
def _with_screen_name_filled(row):
    return (
        row.user_id_str,
        row.info,
        row.whether_active,
        check_whether_user_screen_name_is_null(row),
    )

final_experiment = (
    final_experiment.rdd
    .map(_with_screen_name_filled)
    .toDF(['user_id_str', 'info', 'whether_active', 'user_screen_name'])
)
final_experiment.show(10)
final_experiment.count()

+-------------+----+--------------+----------------+
|  user_id_str|info|whether_active|user_screen_name|
+-------------+----+--------------+----------------+
|  8.9107116E7|   F|            no|       Not found|
|  8.9633357E7|   M|            no|       Not found|
| 5.85782554E8|   M|            no|       Not found|
|3.923649083E9|   M|            no|       Not found|
|  4.1337472E7|   M|            no|       Not found|
|2.316449398E9|   F|            no|       Not found|
|   6.236402E7|   F|            no|       Not found|
| 4.18871874E8|   M|            no|       Not found|
|  8.1329096E7|   M|            no|       Not found|
|3.526380922E9|   M|            no|       Not found|
+-------------+----+--------------+----------------+
only showing top 10 rows



4500

In [22]:
# Write final_experiment to a CSV file (collected to the driver via pandas).
# NOTE: this overwrites final_experiment.csv, the same file read as input at
# the start of STEP 3.
final_experiment_pd = final_experiment.toPandas()
final_experiment_pd.to_csv('final_experiment.csv', index=False)