# IDS 561 Homework 2
Isaac Salvador<br>UIN: 669845132

## Dataset
Before any data manipulation, we import the necessary `pyspark` classes and load the _**Amazon_Responded_Oct05.csv**_ file as a Spark `rdd`.

In [1]:
from pyspark import SparkContext, SparkConf
import pandas as pd

conf = SparkConf().setAppName("IDS561_HW2").set("spark.driver.bindAddress", "127.0.0.1")
sc = SparkContext(conf=conf)
sc.setLogLevel("OFF")

# create initial rdd with raw csv file data
rdd = sc.textFile("data/Amazon_Responded_Oct05.csv")

23/10/03 19:07:39 WARN Utils: Your hostname, Isaacs-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.0.0.163 instead (on interface en0)
23/10/03 19:07:39 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/10/03 19:07:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


We can then use the `take()` method inside a small `head()` helper function to see the first 5 records of the `rdd`:

In [2]:
def head(rdd, n: int=5) -> None:
    for i in rdd.take(n):
        print(i)


head(rdd)

id_str,tweet_created_at,user_screen_name,user_id_str,user_statuses_count,user_favourites_count,user_protected,user_listed_count,user_following,user_description,user_location,user_verified,user_followers_count,user_friends_count,user_created_at,tweet_language,text_,favorite_count,favorited,in_reply_to_screen_name,in_reply_to_status_id_str,in_reply_to_user_id_str,retweet_count,retweeted,text

'793270689780203520',Tue Nov 01 01:57:25 +0000 2016,SeanEPanjab,143515471,51287,4079,False,74,False,"Content marketer; Polyglot; Beard aficionado; Sikh. Persian, Catalan, French, Spanish. You'll find lol in the interstitial lulls of my tweets.",غریب الوطن,False,1503,850,Thu May 13 17:43:52 +0000 2010,en,@AmazonHelp Can you please DM me? A product I ordered last year never arrived.,0,False,AmazonHelp,,85741735,0,False,
'793281386912354304',Tue Nov 01 02:39:55 +0000 2016,AmazonHelp,85741735,2225450,11366,False,796,False,We answer Amazon support questions 7 days a week. Support available in English / D

Two issues with the `rdd` are now apparent. First, `textFile()` reads the file line by line without interpreting it as CSV, so the header row appears as an ordinary record. Second, there are empty rows scattered throughout the `rdd`. We can use the `filter()` method to remove both kinds of errant rows.

In [3]:
# get first string in rdd
header = rdd.first()
column_names = header.split(",")

rdd1 = rdd\
    .filter(lambda x: x != header)\
    .filter(lambda x: x != '')

We can also conveniently show the indices and column names as a byproduct of this operation.

In [4]:
# print indices and columns of rdd1
print("index\tcolumn name")
print("-------------------")

for i, column in enumerate(column_names):
    print(f"{i}\t{column}")


index	column name
-------------------
0	id_str
1	tweet_created_at
2	user_screen_name
3	user_id_str
4	user_statuses_count
5	user_favourites_count
6	user_protected
7	user_listed_count
8	user_following
9	user_description
10	user_location
11	user_verified
12	user_followers_count
13	user_friends_count
14	user_created_at
15	tweet_language
16	text_
17	favorite_count
18	favorited
19	in_reply_to_screen_name
20	in_reply_to_status_id_str
21	in_reply_to_user_id_str
22	retweet_count
23	retweeted
24	text


We now assume that valid records are those for which `split(",")` returns exactly 25 elements, matching the 25 column names listed above. Note that this check also discards records whose free-text fields contain commas, since `split(",")` does not honor CSV quoting. The same operation lets us extract the six columns needed for further analysis: `id_str`, `tweet_created_at`, `user_verified`, `favorite_count`, `retweet_count`, and `text_`.

In [5]:
relevant_indices = [0,1,11,17,22,16]

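# split each line once, keep only 25-field records, then select the relevant columns
rdd2 = rdd1\
    .map(lambda x: x.split(","))\
    .filter(lambda fields: len(fields) == 25)\
    .map(lambda fields: [fields[i] for i in relevant_indices])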

print([column_names[i] for i in relevant_indices])
head(rdd2, 5)

['id_str', 'tweet_created_at', 'user_verified', 'favorite_count', 'retweet_count', 'text_']
["'793502854459879424'", 'Tue Nov 01 17:19:57 +0000 2016', 'True', '0', '0', '@SeanEPanjab Please give us a call/chat so we can look into this order for you: https://t.co/hApLpMlfHN. ^HB']
["'793513446633533440'", 'Tue Nov 01 18:02:03 +0000 2016', 'True', '0', '0', "@SeanEPanjab I'm not able to access account info here. Please reach out by Phone/Chat so we can look at this: https://t.co/EKXRLsnxJu ^GL"]
["'793299404975247360'", 'Tue Nov 01 03:51:31 +0000 2016', 'False', '0', '0', "@JeffBezos @amazonIN @AmazonHelp Tring...Tring...Tring Who's There? Your Suffering Customers from India Mr. Bezos...Get Up and Help!"]
["'793407430344310785'", 'Tue Nov 01 11:00:46 +0000 2016', 'False', '0', '0', '@AmazonHelp How many times do you want to write back to you guys??? Check the complaints through email regarding  Order 171-1338898-5999507.']
["'793423313674571776'", 'Tue Nov 01 12:03:53 +0000 2016', 'True'
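
The 25-field check relies on `split(",")`, which ignores CSV quoting, so a record whose quoted description contains commas splits into too many pieces and is dropped. This may be why the first record shown earlier, whose `user_description` contains several commas, is absent from the output above. As a minimal sketch of an alternative, assuming fields are double-quoted in the usual CSV way and that no field spans multiple lines, Python's `csv` module can parse one line at a time. The helper name `parse_csv_line` and the sample line are illustrative and not part of the original notebook:

```python
import csv


def parse_csv_line(line: str) -> list:
    """Parse one CSV line, keeping commas inside double-quoted fields."""
    return next(csv.reader([line]))


# Illustrative line with a quoted field that contains commas
sample = '123,AmazonHelp,"Persian, Catalan, French",False'

naive_fields = sample.split(",")        # quoting ignored
parsed_fields = parse_csv_line(sample)  # quoting honored

print(len(naive_fields), len(parsed_fields))
```

In the pipeline above, something like `rdd1.map(parse_csv_line).filter(lambda f: len(f) == 25)` could then stand in for the `split(",")` calls, at the cost of changing which records count as valid.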

## Task 1

### _Step 1_
To remove records where `"user_verified"` is `"False"`, we use the `filter()` method. Subsequently, the `map()` method is used to drop the `"user_verified"` column and convert the `"favorite_count"` and `"retweet_count"` columns to `int`.

In [6]:
rdd3 = rdd2\
    .filter(lambda x: x[2] == "True")\
    .map(lambda x: [x[0], x[1], int(x[3]), int(x[4]), x[5]] )

print(f"Number of verified records: {rdd3.count()}")

Number of verified records: 100965
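
The `int()` calls above raise a `ValueError` on the first non-numeric value, which would abort the Spark job. A minimal sketch of a more defensive conversion follows. The helper `to_verified_record` is hypothetical and assumes the six-field layout of `rdd2` (`id_str`, `tweet_created_at`, `user_verified`, `favorite_count`, `retweet_count`, `text_`):

```python
from typing import Optional


def to_verified_record(fields: list) -> Optional[list]:
    """Return [id, created_at, favorites, retweets, text] for a verified
    record, or None if it is unverified or its counts are not integers."""
    if fields[2] != "True":
        return None
    try:
        favorites, retweets = int(fields[3]), int(fields[4])
    except ValueError:
        return None
    return [fields[0], fields[1], favorites, retweets, fields[5]]


# Made-up records: one well-formed, one with a non-numeric count
good = ["'1'", "Tue Nov 01 17:19:57 +0000 2016", "True", "0", "2", "hello"]
bad = ["'2'", "Tue Nov 01 17:19:57 +0000 2016", "True", "n/a", "0", "hi"]

print(to_verified_record(good))
print(to_verified_record(bad))
```

With Spark, `rdd2.map(to_verified_record).filter(lambda r: r is not None)` would yield records in the same shape as `rdd3`.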


### _Step 2_
In order to group by `"tweet_created_at"`, we first use string slicing to extract the month and day in the form `"MMM DD"` (for example, `"Nov 01"`).



In [7]:
rdd4 = rdd3.map(
    lambda x: [x[0], x[1][4:10], *x[2:]]
)
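
The slice `x[1][4:10]` depends on every timestamp having the fixed layout seen in the sample output, such as `Tue Nov 01 17:19:57 +0000 2016`. As a sketch of a stricter alternative under that same assumption (and assuming an English locale for day and month names), the timestamp can be parsed with `datetime.strptime` and re-formatted, which raises an error on malformed values instead of silently producing a wrong key. The names `TWEET_TIME_FORMAT` and `month_day` are illustrative:

```python
from datetime import datetime

# Layout of tweet_created_at as seen in the sample records
TWEET_TIME_FORMAT = "%a %b %d %H:%M:%S %z %Y"


def month_day(timestamp: str) -> str:
    """Return the month and day of a tweet timestamp, e.g. 'Nov 01'."""
    return datetime.strptime(timestamp, TWEET_TIME_FORMAT).strftime("%b %d")


example = "Tue Nov 01 17:19:57 +0000 2016"
print(month_day(example), "|", example[4:10])
```

Both approaches drop the year, so the same calendar day in two different years would share one key.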

To obtain the number of tweets created per day, we apply the following `rdd` methods:
1. `groupBy()` – Records are grouped by the modified `tweet_created_at` column.
2. `map()` – Records are mapped to date and count of records per date.
3. `collect()` – The resulting (date, count) pairs are collected as a regular Python list for analysis.

In [8]:
tweet_dates = rdd4\
    .groupBy(
        # use groupBy() to group by tweet_created_at
        lambda x: x[1]
    )\
    .map(
        # use map() to collect dates and number of corresponding records
        lambda x: (x[0], len(list(x[1])))
    ).collect()

# obtain the date with the most tweets
biggest_day, most_tweets = max(tweet_dates, key = lambda x: x[1])

print(f"{biggest_day} had {most_tweets} tweets, the highest number of tweets in the dataset.")

Jan 11 had 893 tweets, the highest number of tweets in the dataset.
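
`groupBy()` gathers every record for a date before `len()` counts them. Since only the counts are needed, a running tally per key is enough. In Spark that idea would typically be written as `rdd4.map(lambda x: (x[1], 1)).reduceByKey(lambda a, b: a + b)`. The sketch below illustrates the same tally in plain Python on made-up records, so it runs without a Spark context; `count_by_day` is a hypothetical helper:

```python
from collections import Counter


def count_by_day(records):
    """Count records per date key (index 1) without materializing groups."""
    return Counter(record[1] for record in records)


# Made-up records in the shape of rdd4: [id, date, favorites, retweets, text]
sample_records = [
    ["'1'", "Jan 11", 0, 0, "a"],
    ["'2'", "Jan 11", 1, 0, "b"],
    ["'3'", "Nov 01", 0, 0, "c"],
]

day_counts = count_by_day(sample_records)
biggest, most = max(day_counts.items(), key=lambda kv: kv[1])
print(biggest, most)
```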


### _Step 3_
#### Part 1: Sum of `"favorite_count"` and `"retweet_count"`
To get the sum of `"favorite_count"` and `"retweet_count"` for each tweet on `biggest_day`, we perform the following operations on `rdd4`:
1. `filter()` – Keep only the tweets that were created on `biggest_day`
2. `map()` – Map each remaining record to a key-value pair of `"id_str"` and the sum of `favorite_count` and `retweet_count`, producing `rdd5`


In [9]:
rdd5 = rdd4\
    .filter(
        lambda x: x[1] == biggest_day
    )\
    .map(
        lambda x: [x[0], x[2]+x[3]]
    )
    
head(rdd5, 10)

["'819265252453941249'", 0]
["'819006776070770690'", 0]
["'819154757193498624'", 1]
["'819194260243312640'", 0]
["'819251153112367105'", 0]
["'819235424476495872'", 1]
["'819167359734874112'", 0]
["'819045790878408704'", 0]
["'819048545508622336'", 0]
["'819256450543472640'", 0]


#### Part 2: _Text_ content of the top 100 tweets
To obtain the text contents of the top 100 tweets, we first construct `rdd6` by applying the `sortBy()` method to `rdd5`, sorting in descending order by the sum of `"favorite_count"` and `"retweet_count"`. We then perform a `leftOuterJoin()` of `rdd6` with the `("id_str", "text_")` pairs from `rdd4`, using `"id_str"` as the key. Finally, a `map()` extracts the text from the joined rdd and `take(100)` returns the first 100 results.

In [10]:
rdd6 = rdd5.sortBy(
    lambda x: x[1], ascending=False
)

top_100_tweets = rdd6\
    .leftOuterJoin(
        rdd4.map(
            lambda x: [x[0], x[-1]]
        )
    )\
    .map(
        lambda x: x[1][1]
    ).take(100)
    
for tweet in top_100_tweets[:5]:
    print(tweet)

@darryl_edison Hello! These products were never sold on Amazon.in. We have escalated this (1/2) ^HK
@tonini30 We're very sorry for the multiple tries it took to resolve your issue! Thank you for the update! ^CC
@MrsKatEdd My apologies for this. Are you able to cancel it here?: https://t.co/EMoca4Sfya ^PK
@betagirl96 Getting your orders to you by the estimated delivery date is our priority. ^EM
@deepmahan Hi there! We're sorry you don't have your order yet. Did you happen to receive any correspondence on the delay? ^YP
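
One caveat about the cell above: `leftOuterJoin()` shuffles records by key, so the order established by `sortBy()` is not guaranteed to survive the join, and `take(100)` may not return the 100 highest-scoring tweets. A common remedy in Spark is to join first and select afterwards, for example with `takeOrdered(100, key=...)` on the joined rdd. The plain-Python sketch below illustrates the intended result on made-up data: pick the `n` highest sums, then look up each text while preserving that order. `top_n_texts` is a hypothetical helper, not part of the notebook:

```python
import heapq


def top_n_texts(scores, texts, n=100):
    """Return the texts of the n highest-scoring ids, best first.

    scores: iterable of (id, score) pairs
    texts:  iterable of (id, text) pairs
    """
    text_by_id = dict(texts)
    best = heapq.nlargest(n, scores, key=lambda pair: pair[1])
    # Look up texts in ranked order so the ranking is preserved
    return [text_by_id.get(tweet_id) for tweet_id, _ in best]


scores = [("a", 0), ("b", 5), ("c", 2)]
texts = [("a", "low"), ("b", "high"), ("c", "mid")]
print(top_n_texts(scores, texts, n=2))
```

Because most sums in `rdd5` are 0 or 1, many tweets tie, so the "top 100" is not unique under either approach.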


#### Part 3: Word Frequency Counts
Finally, we iterate through `top_100_tweets` to obtain the word (token) frequency counts of the top 100 tweets created on January 11.

In [11]:
# empty dict to fill with word frequency counts
word_freq_counts = {}

# loop through top 100 tweets
for tweet in top_100_tweets:
    # loop through tokens in tweets
    for token in tweet.split():
        # add token if not in dict
        if token not in word_freq_counts.keys():
            word_freq_counts[token] = 1
        # increase count if token in dict
        else:
            word_freq_counts[token] += 1

# sort by frequency
sorted_word_freq_counts = sorted(word_freq_counts.items(), key = lambda x: x[1], reverse=True)

# display
for entry in sorted_word_freq_counts[:10]:
    print(entry)

('to', 66)
('the', 62)
('you', 44)
('here:', 28)
('this', 27)
('your', 27)
('for', 26)
('us', 22)
('sorry', 21)
('can', 19)
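
The counts above treat tokens literally, so `here:` and `here` (or `Sorry` and `sorry`) are tallied separately. As a hedged sketch, `collections.Counter` can perform the same tally, with an optional normalization step (lowercasing and stripping surrounding punctuation) that the original analysis does not perform. `token_counts` and the sample tweets are illustrative:

```python
import string
from collections import Counter


def token_counts(tweets, normalize=False):
    """Count whitespace-separated tokens across tweets.

    With normalize=True, tokens are lowercased and stripped of
    leading/trailing punctuation before counting.
    """
    counts = Counter()
    for tweet in tweets:
        tokens = tweet.split()
        if normalize:
            tokens = [t.lower().strip(string.punctuation) for t in tokens]
            tokens = [t for t in tokens if t]  # drop tokens that were only punctuation
        counts.update(tokens)
    return counts


sample_tweets = ["Sorry for the delay here: link", "sorry, check here"]
raw = token_counts(sample_tweets)
normalized = token_counts(sample_tweets, normalize=True)
print(raw["here:"], normalized["here"], normalized["sorry"])
```

On the real data, `token_counts(top_100_tweets).most_common(10)` would be the counterpart of the sorted listing above.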


## Task 2

In [12]:
find_text_rdd = sc.textFile("data/find_text.csv")

In [13]:
find_text_rdd.collect()

['id_str,text',
 "'793270689780203520',",
 "'793281386912354304',",
 "'793299404975247360',",
 "'793301295255945216',",
 "'793315815411978240',",
 "'793322306848292864',",
 "'793322433625415680',",
 "'793365409047023616',",
 "'793369654878232577',",
 "'793375905280393216',",
 "'793376242837823488',",
 "'793378044052406272',",
 "'793378188416131072',",
 "'793379112685568000',",
 "'793381418395136000',",
 "'793382930085253121',",
 "'793383832720474113',",
 "'793386133434593280',",
 "'793386974459682816',",
 "'793390636619759616',",
 "'793393912769576960',",
 "'793394384213532672',",
 "'793395133337198592',",
 "'793397280254472193',",
 "'793399638548242436',",
 "'793404385443348480',",
 "'793406096966819840',",
 "'793407060989865984',",
 "'793407430344310785',",
 "'793408494649081856',",
 "'793411263133061120',",
 "'793411871965601792',",
 "'793412238073815040',",
 "'793412334429540352',",
 "'793416007331315714',",
 "'793419622011719681',",
 "'793419953399533569',",
 "'793420070919766017'

In [26]:
header = find_text_rdd.first()

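def remove_commas_except_first(index, iterator):
    # enumerate() restarts in every partition, so i == 0 matches the first
    # line of each partition, not only the header line in partition 0
    for i, line in enumerate(iterator):
        if i == 0:
            yield line  # Keep the first line of the partition as is
        else:
            yield [line[0:-1], "placeholder"]  # Drop the trailing comma and pair the id with a placeholder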


find_text_rdd2 = find_text_rdd.mapPartitionsWithIndex(
    remove_commas_except_first
    )

find_text_rdd2.collect()

['id_str,text',
 ["'793270689780203520'", 'placeholder'],
 ["'793281386912354304'", 'placeholder'],
 ["'793299404975247360'", 'placeholder'],
 ["'793301295255945216'", 'placeholder'],
 ["'793315815411978240'", 'placeholder'],
 ["'793322306848292864'", 'placeholder'],
 ["'793322433625415680'", 'placeholder'],
 ["'793365409047023616'", 'placeholder'],
 ["'793369654878232577'", 'placeholder'],
 ["'793375905280393216'", 'placeholder'],
 ["'793376242837823488'", 'placeholder'],
 ["'793378044052406272'", 'placeholder'],
 ["'793378188416131072'", 'placeholder'],
 ["'793379112685568000'", 'placeholder'],
 ["'793381418395136000'", 'placeholder'],
 ["'793382930085253121'", 'placeholder'],
 ["'793383832720474113'", 'placeholder'],
 ["'793386133434593280'", 'placeholder'],
 ["'793386974459682816'", 'placeholder'],
 ["'793390636619759616'", 'placeholder'],
 ["'793393912769576960'", 'placeholder'],
 ["'793394384213532672'", 'placeholder'],
 ["'793395133337198592'", 'placeholder'],
 ["'79339728025447

In [27]:

find_text_rdd3 = find_text_rdd2\
    .leftOuterJoin(rdd2)

In [31]:
len(find_text_rdd.collect())

53928

In [36]:
find_text_rdd3.sortBy(lambda x: x[0]).collect()

[("'", ('8', None)),
 ("'793270689780203520'", ('placeholder', None)),
 ("'793281386912354304'", ('placeholder', None)),
 ("'793299404975247360'", ('placeholder', 'Tue Nov 01 03:51:31 +0000 2016')),
 ("'793301295255945216'", ('placeholder', None)),
 ("'793315815411978240'", ('placeholder', None)),
 ("'793322306848292864'", ('placeholder', None)),
 ("'793322433625415680'", ('placeholder', 'Tue Nov 01 05:23:02 +0000 2016')),
 ("'793365409047023616'", ('placeholder', None)),
 ("'793369654878232577'", ('placeholder', None)),
 ("'793375905280393216'", ('placeholder', 'Tue Nov 01 08:55:30 +0000 2016')),
 ("'793376242837823488'", ('placeholder', None)),
 ("'793378044052406272'", ('placeholder', None)),
 ("'793378188416131072'", ('placeholder', 'Tue Nov 01 09:04:35 +0000 2016')),
 ("'793379112685568000'", ('placeholder', 'Tue Nov 01 09:08:15 +0000 2016')),
 ("'793381418395136000'", ('placeholder', None)),
 ("'793382930085253121'", ('placeholder', None)),
 ("'793383832720474113'", ('placeholder
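
Two details in the output above suggest the join is not yet doing what is intended. The `("'", ('8', None))` row looks like a raw string that was treated as a key-value pair, which is consistent with `i == 0` matching the first line of every partition rather than only the header. The joined values are also timestamps rather than tweet text, which is consistent with `leftOuterJoin()` using only the first two elements of each `rdd2` record as key and value. The sketch below shows the intended lookup in plain Python on made-up data. `parse_id_line` and `fill_text` are hypothetical helpers, and the header check assumes the header line starts with `id_str`:

```python
def parse_id_line(line):
    """Return the id from an "id," line, or None for the header or a blank line."""
    stripped = line.strip()
    if not stripped or stripped.startswith("id_str"):
        return None
    return stripped.rstrip(",")


def fill_text(id_lines, records, text_index=5):
    """Pair each requested id with its tweet text, or None if no record matches.

    records follow the rdd2 layout, with the text at position text_index.
    """
    text_by_id = {record[0]: record[text_index] for record in records}
    ids = [i for i in map(parse_id_line, id_lines) if i is not None]
    return [(i, text_by_id.get(i)) for i in ids]


# Made-up inputs: header, two ids, a blank line, and one matching record
id_lines = ["id_str,text", "'1',", "'2',", ""]
records = [["'1'", "Tue Nov 01 03:51:31 +0000 2016", "True", "0", "0", "hello"]]
print(fill_text(id_lines, records))
```

In Spark, the equivalent would be to `filter()` out the header, `map()` each remaining line to an `(id, placeholder)` pair, and join against `rdd2.map(lambda x: (x[0], x[5]))` so that the text, not the timestamp, becomes the joined value.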

In [37]:
sc.stop()