<a href="https://colab.research.google.com/github/rashidesai24/Data-Science-Projects/blob/master/PySpark%20with%20Amazon%20Tweets.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


**Installing Spark**

To run Spark in Colab, we first need to install its dependencies in the Colab environment: Apache Spark 2.4.7 with Hadoop 2.6, Java 8, and findspark to locate Spark on the system. The installation can be carried out inside the Colab notebook:

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null   # Install Java 8 (required by Spark)

In [None]:
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.6.tgz # Download Spark 2.4.7 (Hadoop 2.6); older releases may only be available from archive.apache.org

In [None]:
!tar xf spark-2.4.7-bin-hadoop2.6.tgz # Untarring Spark Installer

In [None]:
!pip install -q findspark

Once Spark and Java are installed in Colab, we set the environment variables that let PySpark run in the Colab environment. Setting the locations of Java and Spark:

In [6]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.6"
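Before initializing findspark, it can help to confirm that both variables point at directories that actually exist, since a failed download or a different Java package name would otherwise only surface later as a less obvious error. A minimal sketch; `missing_dirs` is a hypothetical helper name, not part of findspark or PySpark:

```python
import os

def missing_dirs(var_names, environ=os.environ):
    """Return the variables that are unset or do not point to an existing directory."""
    missing = []
    for name in var_names:
        path = environ.get(name)
        if not path or not os.path.isdir(path):
            missing.append(name)
    return missing

# Hypothetical usage in the notebook, after setting JAVA_HOME and SPARK_HOME
problems = missing_dirs(["JAVA_HOME", "SPARK_HOME"])
if problems:
    print("Check these environment variables:", problems)
```

An empty result means both directories were found; it does not guarantee that the installations themselves are complete.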

Running a local session to test the Spark installation:

In [7]:
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
# Colab is ready to run Pyspark!

Importing Amazon Responded data and selecting the required columns



In [8]:
data = spark.read.format("csv").option("header","true").load("drive/My\ Drive/Colab\ Notebooks/Amazon_Responded_Oct05.csv")
data1 = data.select("id_str",'tweet_created_at','user_verified','favorite_count','retweet_count','text_')

***Step 1: Remove the records where “user_verified” is “FALSE”.***

We keep only the records where user_verified is True, i.e. tweets from verified users. For reference, the first cell below counts the records that will be dropped.

In [9]:
data_filtered = data1.filter(data1.user_verified == 'False')
data_filtered.count() # Counting the tweets from unverified users (the records to be removed)

193220

In [10]:
data_filtered = data1.filter(data1.user_verified == 'True')
data_filtered.count() # Counting the tweets from verified users only
# Keeping the records where “user_verified” is True.

171797

In [11]:
data_filtered.select("tweet_created_at").show(5,False) #Displaying the top 5 rows for tweet times from the "tweet_created_at" column

+------------------------------+
|tweet_created_at              |
+------------------------------+
|Tue Nov 01 02:39:55 +0000 2016|
|Tue Nov 01 17:19:57 +0000 2016|
|Tue Nov 01 17:25:26 +0000 2016|
|Tue Nov 01 18:02:03 +0000 2016|
|Tue Nov 01 03:59:02 +0000 2016|
+------------------------------+
only showing top 5 rows



Splitting the date column to extract the month, day, and year:

In [12]:
from pyspark.sql.functions import split
# "Tue Nov 01 02:39:55 +0000 2016" split on spaces: index 1 = month, 2 = day, 5 = year
tweet = split(data_filtered["tweet_created_at"], ' ')
data_filtered1 = data_filtered.withColumn('Month', tweet.getItem(1))
data_filtered1 = data_filtered1.withColumn('Date', tweet.getItem(2))
data_filtered1 = data_filtered1.withColumn('Year', tweet.getItem(5))
data_filtered1.show(5)

+--------------------+--------------------+-------------+--------------+-------------+--------------------+-----+----+----+
|              id_str|    tweet_created_at|user_verified|favorite_count|retweet_count|               text_|Month|Date|Year|
+--------------------+--------------------+-------------+--------------+-------------+--------------------+-----+----+----+
|'793281386912354304'|Tue Nov 01 02:39:...|         True|             0|            0|@SeanEPanjab I'm ...|  Nov|  01|2016|
|'793502854459879424'|Tue Nov 01 17:19:...|         True|             0|            0|@SeanEPanjab Plea...|  Nov|  01|2016|
|'793504235400884224'|Tue Nov 01 17:25:...|         True|             0|            0|@SeanEPanjab With...|  Nov|  01|2016|
|'793513446633533440'|Tue Nov 01 18:02:...|         True|             0|            0|@SeanEPanjab I'm ...|  Nov|  01|2016|
|'793301295255945216'|Tue Nov 01 03:59:...|         True|             0|            0|@aakashwangnoo Hi...|  Nov|  01|2016|
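+--------------------+--------------------+-------------+--------------+-------------+--------------------+-----+----+----+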

In [13]:
import pyspark.sql.functions as sq 
df = data_filtered1.withColumn("tweet_created_at",sq.concat(sq.col("Month"), sq.lit(" "), sq.col("Date"),sq.lit(" "), sq.col("Year")))
df = df.select('id_str','tweet_created_at','user_verified',df.favorite_count.cast('int'), df.retweet_count.cast('int'),'text_')
df.show(5) # Show only the first 5 rows

+--------------------+----------------+-------------+--------------+-------------+--------------------+
|              id_str|tweet_created_at|user_verified|favorite_count|retweet_count|               text_|
+--------------------+----------------+-------------+--------------+-------------+--------------------+
|'793281386912354304'|     Nov 01 2016|         True|             0|            0|@SeanEPanjab I'm ...|
|'793502854459879424'|     Nov 01 2016|         True|             0|            0|@SeanEPanjab Plea...|
|'793504235400884224'|     Nov 01 2016|         True|             0|            0|@SeanEPanjab With...|
|'793513446633533440'|     Nov 01 2016|         True|             0|            0|@SeanEPanjab I'm ...|
|'793301295255945216'|     Nov 01 2016|         True|             0|            0|@aakashwangnoo Hi...|
+--------------------+----------------+-------------+--------------+-------------+--------------------+
only showing top 5 rows
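Strings such as `Nov 01 2016` group correctly, but they sort alphabetically rather than chronologically (`Apr 26 2017` sorts before `Dec 01 2016`, although it is the later date). If chronological order matters, the original timestamp can be parsed into a real date instead. A minimal pure-Python sketch, assuming the timestamp format seen in the output above; `parse_created_at` is a hypothetical helper name, and in Spark a function such as `pyspark.sql.functions.to_timestamp` with a matching pattern would play the same role:

```python
from datetime import datetime

# Format of tweet_created_at as shown above, e.g. "Tue Nov 01 02:39:55 +0000 2016"
TWITTER_FORMAT = "%a %b %d %H:%M:%S %z %Y"

def parse_created_at(raw):
    """Parse a Twitter-style timestamp string into a date object."""
    return datetime.strptime(raw, TWITTER_FORMAT).date()

day = parse_created_at("Tue Nov 01 02:39:55 +0000 2016")
print(day.isoformat())  # 2016-11-01

# "Mon DD YYYY" labels sort alphabetically unless they are parsed first
labels = ["Apr 26 2017", "Dec 01 2016"]
alphabetical = sorted(labels)
chronological = sorted(labels, key=lambda s: datetime.strptime(s, "%b %d %Y"))
print(alphabetical)   # ['Apr 26 2017', 'Dec 01 2016']
print(chronological)  # ['Dec 01 2016', 'Apr 26 2017']
```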



***Step 2: For the records (“user_verified” is “TRUE”), group by created date, and count the number of tweets for each date***

In [14]:
tweet_count = df.groupby(df.tweet_created_at).agg(sq.count('id_str').alias("count_of_tweets"))
tweet_count.show(10)
# Output: each tweet date and the number of tweets created on that date

+----------------+---------------+
|tweet_created_at|count_of_tweets|
+----------------+---------------+
|     Dec 01 2016|            875|
|     Dec 25 2016|            433|
|     Feb 08 2017|            926|
|     Feb 19 2017|            725|
|     Mar 30 2017|           1109|
|     Apr 26 2017|            295|
|     Dec 05 2016|            824|
|     Dec 18 2016|            178|
|     Jul 06 2017|            673|
|     Mar 20 2017|            558|
+----------------+---------------+
only showing top 10 rows



In [15]:
import pyspark
conf = pyspark.SparkConf()
sc = pyspark.SparkContext.getOrCreate(conf=conf)
from pyspark.sql import SQLContext
sqlcontext = SQLContext(sc)

tweet_count.createOrReplaceTempView("tmpcounts")  # replaces registerTempTable, deprecated since Spark 2.0
tweet_count_ordered = sqlcontext.sql("SELECT * FROM tmpcounts order by count_of_tweets desc limit 10")
tweet_count_ordered.show()

+----------------+---------------+
|tweet_created_at|count_of_tweets|
+----------------+---------------+
|     Jan 03 2017|           1536|
|     Jan 10 2017|           1508|
|     Jan 11 2017|           1496|
|     Jan 12 2017|           1410|
|     Jan 06 2017|           1364|
|     Jan 07 2017|           1360|
|     Jan 20 2017|           1336|
|     Mar 02 2017|           1296|
|     Jan 13 2017|           1295|
|     Jan 21 2017|           1290|
+----------------+---------------+



***Step 3: For the date with highest number of tweets (you can figure it out from step 2), calculate the sum of “favorite_count” and “retweet_count” for each tweet on that day. Then report the text content (“text_”) of the top 100 tweets with highest sum. Count the word frequency of the 100 tweets and report the result.***

In [16]:
df.createOrReplaceTempView("tmpdf")
# SQL query: for Jan 03 2017 (the busiest date from Step 2), sum favorite_count and retweet_count per tweet and keep the 100 highest totals
sum_of_retweets = sqlcontext.sql("SELECT text_, favorite_count + retweet_count AS total FROM tmpdf WHERE tweet_created_at = 'Jan 03 2017' ORDER BY total DESC LIMIT 100")
sum_of_retweets.show(10,False)
text = sum_of_retweets.toPandas()
a = text["text_"].tolist()  # tweet texts as a plain Python list

+--------------------------------------------------------------------------------------------------------------------------------------------+-----+
|text_                                                                                                                                       |total|
+--------------------------------------------------------------------------------------------------------------------------------------------+-----+
|@amazon worst shopping  experience,  no service, no substantial reply to complaints, no delivery for 1 week post guarantee date.            |5    |
|@ItsJosshA We always aim to deliver by the date given in your confirmation email. Have we missed that date? Any update in tracking?  ^NF    |3    |
|@ItsJosshA Oh no! I'm sorry! Please reach out to us so that we can look into options: https://t.co/hApLpMlfHN. ^JO                          |2    |
|@KStefl Sounds like you know what to add to your Halloween playlist for this year! ^BV                   

As good practice in data processing, we clean the text before counting words. To that end, we define a function that removes @usernames, punctuation, and other non-alphabetic symbols from each tweet:

In [17]:
# Importing required libraries
import nltk
import pandas as pd
import string
import re
nltk.download('punkt') # Downloading package punkt
from nltk.tokenize import sent_tokenize, word_tokenize
nltk.download('stopwords')
from collections import Counter

def cleaning(textfile):
    cleaned = []
    for i in textfile:
        i = i.replace("'", " ")  # step 1: replace apostrophes (str.replace returns a new string)
        i = re.sub(r"@\w+", " ", i)  # step 2: remove @usernames
        # step 3: replace every punctuation character with a space
        cleaned.append(i.translate(str.maketrans(string.punctuation, ' ' * len(string.punctuation))))
    output = []
    for i in cleaned:
        # keep only purely alphabetic tokens, lowercased
        output.append(" ".join([w.lower() for w in i.split() if w.isalpha()]))

    return output


[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
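To see what these cleaning steps do to a single tweet, the sketch below applies the same three steps to a shortened version of one of the tweets shown earlier. Because punctuation is turned into spaces, a link such as `https://t.co/...` breaks into the tokens `https`, `t`, and `co`, which then show up in the word counts. The `strip_urls` option is an assumption added for illustration and is not part of the original function; `clean_tweet` is a hypothetical name:

```python
import re
import string

# Map every punctuation character to a space (same idea as step 3 above)
PUNCT_TO_SPACE = str.maketrans(string.punctuation, " " * len(string.punctuation))

def clean_tweet(text, strip_urls=False):
    """Lowercase a tweet and keep only alphabetic tokens.

    strip_urls is an illustrative extra: when True, http(s) links
    are removed before any other step.
    """
    if strip_urls:
        text = re.sub(r"https?://\S+", " ", text)
    text = re.sub(r"@\w+", " ", text)      # drop @usernames
    text = text.translate(PUNCT_TO_SPACE)  # punctuation -> spaces
    return " ".join(w.lower() for w in text.split() if w.isalpha())

sample = "@ItsJosshA Oh no! I'm sorry! Please reach out to us: https://t.co/hApLpMlfHN. ^JO"
print(clean_tweet(sample))
# oh no i m sorry please reach out to us https t co haplpmlfhn jo
print(clean_tweet(sample, strip_urls=True))
# oh no i m sorry please reach out to us jo
```

Whether link fragments and agent initials such as `jo` should count as words depends on the goal of the analysis; removing links first is one option, not a requirement of the task.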


In [18]:
clean_text = cleaning(a)
from pyspark import SparkContext
sc = SparkContext.getOrCreate()
rdd = sc.parallelize(clean_text)
# Classic word count: split lines into words, pair each word with 1, sum the 1s per word
result_rdd = rdd.flatMap(lambda line: line.split(" ")) \
    .map(lambda word: (word, 1)) \
    .reduceByKey(lambda a, b: a + b) \
    .collect()
print(result_rdd)
tweet_result_rdd = pd.DataFrame(result_rdd)
tweet_result_rdd.to_csv(r"/content/tweet_result_rdd.csv")

[('worst', 1), ('no', 5), ('service', 1), ('substantial', 1), ('delivery', 6), ('week', 1), ('we', 46), ('always', 3), ('in', 13), ('confirmation', 1), ('have', 18), ('update', 2), ('tracking', 5), ('fancy', 1), ('already', 1), ('don', 5), ('look', 7), ('pass', 2), ('details', 6), ('https', 44), ('co', 45), ('lil', 1), ('sounds', 1), ('like', 7), ('know', 13), ('playlist', 1), ('this', 27), ('year', 2), ('bv', 2), ('d', 7), ('help', 9), ('when', 5), ('connect', 1), ('us', 28), ('matt', 1), ('friendshipgoals', 1), ('oh', 1), ('i', 23), ('out', 6), ('into', 6), ('options', 3), ('hear', 4), ('as', 4), ('let', 7), ('doesn', 5), ('arrive', 6), ('tomorrow', 2), ('an', 8), ('ar', 3), ('apologies', 2), ('incorrect', 1), ('item', 5), ('using', 2), ('above', 2), ('do', 4), ('further', 5), ('concerns', 1), ('ca', 1), ('thanks', 11), ('shout', 1), ('looking', 2), ('improve', 1), ('share', 1), ('specific', 1), ('feedback', 7), ('via', 4), ('form', 1), ('sj', 6), ('delay', 1), ('indicate', 1), ('new
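For a list of only 100 tweets, the same count can be reproduced on the driver with `collections.Counter`, which is a convenient way to sanity-check the RDD result. The sketch below also filters a small, hypothetical stop-word set (the notebook downloads NLTK's stopwords but does not apply them), so that frequent function words such as "we" and "this" do not dominate the ranking. The input lines are made up for illustration:

```python
from collections import Counter

# Hypothetical mini stop-word list for illustration; NLTK's
# stopwords.words("english") could be used instead.
STOP_WORDS = {"we", "to", "the", "this", "in", "i", "us", "no"}

def word_frequencies(lines, stop_words=frozenset()):
    """Count words across already-cleaned, space-separated lines."""
    counts = Counter()
    for line in lines:
        # split() without arguments skips empty tokens from repeated spaces
        counts.update(w for w in line.split() if w not in stop_words)
    return counts

cleaned = ["no service no delivery", "delivery delayed", "we track delivery"]
print(word_frequencies(cleaned).most_common(2))
# [('delivery', 3), ('no', 2)]
print(word_frequencies(cleaned, STOP_WORDS).most_common(1))
# [('delivery', 3)]
```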

# TASK 2
Join the text content of each tweet from Amazon_Responded_Oct05.csv to find_text.csv on “id_str” and fill in the “text” column.
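One way to picture this fill-in is as a dictionary lookup: build a mapping from `id_str` to text, then look up each id from the file to be filled. The pure-Python sketch below keeps ids that have no matching tweet (with `None` as text), which corresponds to a left join; an inner join would drop such rows instead. `fill_text` is a hypothetical helper and the rows are made up for illustration:

```python
def fill_text(ids, tweets):
    """Fill in the text for each id, keeping ids that have no matching tweet.

    ids:    list of id_str values (as in find_text.csv)
    tweets: list of (id_str, text_) pairs (as in Amazon_Responded_Oct05.csv)
    """
    lookup = {}
    for id_str, text in tweets:
        lookup.setdefault(id_str, text)  # keep the first text seen per id
    return [(id_str, lookup.get(id_str)) for id_str in ids]

# Made-up rows; note the duplicated tweet for id '2'
tweets = [("'1'", "hello"), ("'2'", "thanks"), ("'2'", "thanks")]
filled = fill_text(["'2'", "'3'"], tweets)
print(filled)
# [("'2'", 'thanks'), ("'3'", None)]
```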

In [20]:
# Loading and reading the find_text.csv file
findtext_data= spark.read.format("csv").option("header","true").load("drive/My\ Drive/Colab\ Notebooks/find_text.csv")
findtext_data.show(5,False)

+--------------------+----+
|id_str              |text|
+--------------------+----+
|'793270689780203520'|null|
|'793281386912354304'|null|
|'793299404975247360'|null|
|'793301295255945216'|null|
|'793315815411978240'|null|
+--------------------+----+
only showing top 5 rows



In [21]:
# Attach to each tweet ID in find_text.csv its corresponding text from the Amazon tweets data.
findtext_data.createOrReplaceTempView("ids")
data.createOrReplaceTempView("tweets")
out = sqlcontext.sql("SELECT DISTINCT I.id_str,T.text_ from ids I JOIN tweets T on I.id_str = T.id_str")
out.show(5,False)
output = out.toPandas()
output.to_csv(r"/content/join_output.csv") # Output the file as a CSV

+--------------------+-----------------------------------------------------------------------------------------------------------------------------------------+
|id_str              |text_                                                                                                                                    |
+--------------------+-----------------------------------------------------------------------------------------------------------------------------------------+
|'793382930085253121'|@mybharatraj Hi! Sorry about that. I'd like our team to look into this, please reach out to us via: https://t.co/vlvfJr4nN9 ^SG          |
|'793441656984903680'|@AmazonHelp done, please contact them and let them know I am waiting for my package.                                                     |
|'793517259880861696'|Your customer service sucks, @AmazonHelp. If you guys can't do your job, perhaps go back to school and learn something proper.           |
|'793533066157387776'|@flamablebro