In [1]:
import pandas as pd

file = './Amazon_Responded_Oct05.csv'

In [2]:
from pyspark import SparkContext

sc = SparkContext("local", "First App")

In [3]:
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession

sql = SQLContext(sc)
spark = SparkSession.builder.master("local").appName("Word Count").getOrCreate()


#df = spark.read.load(file,format="csv", inferSchema="true", header="true")
df = sql.read.format('com.databricks.spark.csv').options(header='true', inferschema='true', quote='"', delimiter=',', multiLine = 'true', escape='"').load(file)
#df = spark.read.format("csv").option("header", "true").option("delimiter", ",").option("escape", ",").load(file)
df.count()

462030

In [5]:
df_data = df.select('id_str', 'tweet_created_at','user_verified','favorite_count','retweet_count', 'text_')
df_data


DataFrame[id_str: string, tweet_created_at: string, user_verified: boolean, favorite_count: int, retweet_count: double, text_: string]

In [6]:
filter_data = df_data[df_data['user_verified'] == "True"]

In [14]:
#df.select('text_').distinct().sort('text_').show()
filter_data.count()
#filter_data.select('user_verified').distinct().sort('user_verified').show()

171899

In [8]:
from datetime import datetime
from pyspark.sql.functions import udf, to_date, to_utc_timestamp
from pyspark.sql.types import StringType

def get_format(x):
    return str(datetime.strptime(x, '%a %b %d %H:%M:%S %z %Y').strftime('%b %d'))

date_fn = udf(get_format, StringType())

df_test_2 = filter_data.withColumn("tweet_created_at", date_fn("tweet_created_at"))

In [9]:
print(df_test_2.take(2))

df_test_2.select('tweet_created_at').distinct().show()

[Row(id_str="'793281386912354304'", tweet_created_at='Nov 01', user_verified=True, favorite_count=0, retweet_count=0.0, text_="@SeanEPanjab I'm sorry, we're unable to DM you. Was this order purchased on https://t.co/nUUp5MLhYl, or one of our other sites? ^CL"), Row(id_str="'793502854459879424'", tweet_created_at='Nov 01', user_verified=True, favorite_count=0, retweet_count=0.0, text_='@SeanEPanjab Please give us a call/chat so we can look into this order for you: https://t.co/hApLpMlfHN. ^HB')]
+----------------+
|tweet_created_at|
+----------------+
|          Nov 27|
|          Mar 22|
|          Apr 16|
|          Apr 27|
|          May 24|
|          Mar 01|
|          Apr 10|
|          May 27|
|          Nov 01|
|          Nov 06|
|          Apr 24|
|          Dec 10|
|          Mar 11|
|          May 15|
|          May 29|
|          Jun 29|
|          Jul 12|
|          Nov 04|
|          Aug 05|
|          Jan 21|
+----------------+
only showing top 20 rows



In [10]:
# GroupBy using dataframe
temp = df_test_2.groupby(df_test_2.tweet_created_at).count()
temp.take(10)

[Row(tweet_created_at='Nov 27', count=208),
 Row(tweet_created_at='Mar 22', count=506),
 Row(tweet_created_at='Apr 16', count=632),
 Row(tweet_created_at='Apr 27', count=394),
 Row(tweet_created_at='May 24', count=843),
 Row(tweet_created_at='Mar 01', count=459),
 Row(tweet_created_at='Apr 10', count=863),
 Row(tweet_created_at='May 27', count=95),
 Row(tweet_created_at='Nov 01', count=370),
 Row(tweet_created_at='Nov 06', count=357)]

In [11]:
#GroupBy and count using RDD object
filter_data_rdd = df_test_2.rdd
filter_data_rdd.count()

import pyspark.sql.functions as func
grouped_data = filter_data_rdd.map(lambda x: (x[1],1)).groupByKey()

In [15]:
# Statement prints key and number of 1's that key occurs

#print(list((j[0], list(j[1])) for j in grouped_data.take(5)))
freq_tweet = grouped_data.mapValues(sum).map(lambda x: (x[1],x[0])).sortByKey(False)

In [16]:
freq_tweet.collect()

[(1536, 'Jan 03'),
 (1508, 'Jan 10'),
 (1496, 'Jan 11'),
 (1410, 'Jan 12'),
 (1364, 'Jan 06'),
 (1360, 'Jan 07'),
 (1342, 'Jan 20'),
 (1298, 'Mar 02'),
 (1295, 'Jan 13'),
 (1292, 'Jan 21'),
 (1290, 'Jan 14'),
 (1286, 'Jan 18'),
 (1279, 'Dec 15'),
 (1259, 'Jan 24'),
 (1249, 'Nov 18'),
 (1201, 'Dec 03'),
 (1196, 'Jan 02'),
 (1192, 'Jun 27'),
 (1190, 'Jul 04'),
 (1175, 'Jan 19'),
 (1163, 'Jan 25'),
 (1149, 'Jan 23'),
 (1143, 'Jan 08'),
 (1124, 'Jan 17'),
 (1120, 'May 11'),
 (1112, 'Jul 03'),
 (1109, 'Mar 30'),
 (1089, 'Apr 05'),
 (1083, 'Jan 26'),
 (1080, 'Jan 27'),
 (1074, 'Jun 20'),
 (1071, 'Jan 04'),
 (1058, 'Nov 05'),
 (1058, 'Feb 22'),
 (1054, 'Feb 09'),
 (1045, 'Jun 23'),
 (1040, 'Jun 30'),
 (1024, 'Jan 22'),
 (1020, 'May 03'),
 (1019, 'Mar 08'),
 (1019, 'Jul 29'),
 (1018, 'Mar 29'),
 (1016, 'Apr 18'),
 (1007, 'Jan 16'),
 (1006, 'Jan 15'),
 (1004, 'Jun 07'),
 (1003, 'Jan 09'),
 (987, 'Feb 02'),
 (982, 'Jun 02'),
 (979, 'Dec 13'),
 (979, 'Dec 10'),
 (978, 'Dec 26'),
 (968, 'Jan 30'),

In [17]:
# Part 2: calculate the sum of “favorite_count” and “retweet_count” for each tweet on that day 
#(For the date with highest number of tweets)

max_date = freq_tweet.map(lambda x: x[1]).take(1)[0]
max_date

'Jan 03'

In [18]:
filter_data_max_date = filter_data_rdd.filter(lambda x: x[1] == max_date)

In [19]:
filter_data_max_df = filter_data_max_date.toDF()
filter_data_max_df.count()

1536

In [20]:
print(filter_data_max_df)

from pyspark.sql.types import IntegerType
filter_data_max_df = filter_data_max_df.withColumn("favorite_count", filter_data_max_df["favorite_count"].cast(IntegerType()))
filter_data_max_df = filter_data_max_df.withColumn("retweet_count", filter_data_max_df["retweet_count"].cast(IntegerType()))

print(filter_data_max_df)


DataFrame[id_str: string, tweet_created_at: string, user_verified: boolean, favorite_count: bigint, retweet_count: double, text_: string]
DataFrame[id_str: string, tweet_created_at: string, user_verified: boolean, favorite_count: int, retweet_count: int, text_: string]


In [21]:
#new_test = filter_data_max_df.withcolumn('sum', reduce(add, ))
df_3 = filter_data_max_df.withColumn('sum_test', sum([filter_data_max_df[col] for col in ["favorite_count","retweet_count"]]))
df_3

DataFrame[id_str: string, tweet_created_at: string, user_verified: boolean, favorite_count: int, retweet_count: int, text_: string, sum_test: int]

In [22]:
#df_3.take(10)
df_3.select('favorite_count','retweet_count', 'sum_test').distinct().show()
df_3.count()

+--------------+-------------+--------+
|favorite_count|retweet_count|sum_test|
+--------------+-------------+--------+
|             0|            1|       1|
|             0|            0|       0|
|             1|            0|       1|
|             1|            1|       2|
|             4|            1|       5|
|             2|            1|       3|
|             2|            0|       2|
+--------------+-------------+--------+



1536

In [23]:
from pyspark.sql.functions import col
sorted_data = df_3.sort(col('sum_test').desc())
sorted_data

DataFrame[id_str: string, tweet_created_at: string, user_verified: boolean, favorite_count: int, retweet_count: int, text_: string, sum_test: int]

In [24]:
top_100_rows = sorted_data.select('id_str', 'text_', 'sum_test').limit(100)
top_100_rows

DataFrame[id_str: string, text_: string, sum_test: int]

In [25]:
# Count words using pyspark dataframe
import pyspark.sql.functions as f

top_100_rows = top_100_rows.withColumn('word count', f.size(f.split(f.col('text_'), ' ')))
top_100_rows

DataFrame[id_str: string, text_: string, sum_test: int, word count: int]

In [26]:
top_100_rows.take(5)

[Row(id_str="'816329761530093568'", text_='@amazon worst shopping  experience,  no service, no substantial reply to complaints, no delivery for 1 week post guarantee date.', sum_test=5, word count=21),
 Row(id_str="'816083406962434048'", text_='@ItsJosshA We always aim to deliver by the date given in your confirmation email. Have we missed that date? Any update in tracking?  ^NF', sum_test=3, word count=25),
 Row(id_str="'816086117938319360'", text_="@ItsJosshA Oh no! I'm sorry! Please reach out to us so that we can look into options: https://t.co/hApLpMlfHN. ^JO", sum_test=2, word count=19),
 Row(id_str="'816095108013654017'", text_='@KStefl Sounds like you know what to add to your Halloween playlist for this year! ^BV', sum_test=2, word count=16),
 Row(id_str="'816109446069911554'", text_='@Schoey1992 Happy Birthday, Matt! #FriendsForever #FriendshipGoals ^JO', sum_test=2, word count=7)]

In [27]:
top_100_rows_rdd_count = top_100_rows.select("text_").rdd.flatMap(lambda x: x[0].split(' ')).map(lambda x: (x, 1)).reduceByKey(lambda x,y: x+y)

In [41]:
top_100_rows_rdd_count.take(10)

[('@amazon', 1),
 ('worst', 1),
 ('shopping', 3),
 ('', 9),
 ('experience,', 1),
 ('no', 4),
 ('service,', 1),
 ('substantial', 1),
 ('reply', 2),
 ('to', 62)]

In [29]:
file_2 = './find_text.csv'

write_df = spark.read.load(file_2,format="csv", inferSchema="true", header="true")
write_df

DataFrame[id_str: string, text: string]

In [30]:
write_df.count()

53928

In [31]:
df_1_join = df_data.select('id_str', 'text_')
df_2_join = write_df.select('id_str')

In [32]:
from pyspark.sql.functions import col

temp_join = df_1_join.alias('a').join(df_2_join.alias('b'),col('a.id_str') == col('b.id_str'))
join_output = temp_join.select([col('a.'+xx) for xx in df_1_join.columns])

#+ [col('b.'+xx) for xx in df_2_join.columns]

In [33]:
join_output.count()

53927

In [42]:
join_output.write.csv('./submit_out.csv')

In [43]:
top_100_rows_rdd_count.toDF().write.csv('./word_freq_submit.csv')

In [40]:
# question 2 
max_date

'Jan 03'