# Setup (Run this block first)

In [1]:
import os

from pyspark import SparkContext

import preprocessing

# Directory (relative to the notebook) that holds the extracted
# Users.xml / PostHistory.xml / Comments.xml files.
DATA_PATH = 'datascience.stackexchange.com'

# Uncomment and adapt if Spark cannot find a Java runtime on this machine.
#os.environ['JAVA_HOME'] = '/Library/Java/JavaVirtualMachines/adoptopenjdk-8.jdk/Contents/Home' 

# Parse the XML files on the driver with the project-local helpers.
users_xml_path = f'{DATA_PATH}/Users.xml'
post_history_xml_path = f'{DATA_PATH}/PostHistory.xml'
users = preprocessing.user_xml(users_xml_path)
posts = preprocessing.post_xml(post_history_xml_path)

# Reuse an already-running SparkContext if there is one, then distribute
# the parsed records so the question cells below can work on RDDs.
sc: SparkContext = SparkContext.getOrCreate()
user_rdd = sc.parallelize(users)
posts_rdd = sc.parallelize(posts)

# Question 1
__From the Users.xml file, find all users who are from Georgia and print only their DisplayName to the screen.__

In [None]:
import re

# "GA" must be a standalone token: this accepts "Athens, GA", "Atlanta,GA"
# and a bare "GA", but not an upper-case word that merely starts with "GA".
_GA_TOKEN = re.compile(r'\bGA\b')


def is_georgia_location(location):
    """Return True when a free-text Location value points at Georgia.

    A location qualifies when it contains the abbreviation "GA" as a whole
    token, or the literal text "Atlanta, Georgia".

    NOTE(review): a spelled-out "Georgia" is only accepted together with
    Atlanta, presumably to avoid matching the country of the same name --
    confirm that this is the intended reading of the question.

    Args:
        location: The user's Location value. A missing or empty value is
            treated as "not Georgia" instead of raising.

    Returns:
        bool: True if the location matches, False otherwise.
    """
    if not location:
        return False
    return bool(_GA_TOKEN.search(location)) or 'Atlanta, Georgia' in location


ga_users = user_rdd.filter(lambda s: is_georgia_location(s.Location))
for user in ga_users.collect():
    print(user.DisplayName)


# Question 2
__Using the Users.xml file, provide the count of all users who joined (CreationDate) in 2017 and output it to the screen. (30 points)__


In [None]:
def _created_in_2017(user):
    """Return True when the user's CreationDate text contains "2017"."""
    # NOTE(review): this is a substring test anywhere in the value; if
    # CreationDate is an ISO-8601 string a prefix test would be stricter --
    # confirm the format produced by the preprocessing module.
    return "2017" in user.CreationDate


users_2017 = user_rdd.filter(_created_in_2017)
accounts_in_2017 = users_2017.count()
print(f'{accounts_in_2017} accounts created in 2017.')


# Question 3
__Using the PostHistory file, count the number of Posts that feature the words “Spark” and “Scala”. Output this to the screen.__

In [None]:
import re

# Whole-word, case-insensitive patterns. A plain substring test would also
# count "scalar", "scalable", "scalability", "sparkling", ... which are not
# the words the question asks about.
_SPARK_WORD = re.compile(r'\bspark\b', re.IGNORECASE)
_SCALA_WORD = re.compile(r'\bscala\b', re.IGNORECASE)


def mentions_spark_and_scala(text):
    """Return True when `text` contains both words "Spark" and "Scala".

    Matching is case-insensitive and on word boundaries, so "Spark's" and
    "spark-submit" count as "Spark", while "PySpark" and "scalable" do not
    count as "Spark" / "Scala".

    Args:
        text: Text of a PostHistory entry. A missing or empty value is
            treated as containing neither word.

    Returns:
        bool: True only if both words occur in the text.
    """
    if not text:
        return False
    return bool(_SPARK_WORD.search(text)) and bool(_SCALA_WORD.search(text))


# NOTE(review): this counts PostHistory rows; if one post can have several
# history rows, de-duplicating by post id may be needed -- confirm against
# the fields exposed by the preprocessing module.
filtered_posts = posts_rdd.filter(lambda s: mentions_spark_and_scala(s.Text))
print(f'{filtered_posts.count()} posts')


# Question 4
__Using the PostHistory file, provide a total count of the words used by each distinct user. In other words, count all words in all posts for each user and display this to screen. You can only identify users by the UserID (30 points). You get 15 bonus points if you get the actual DisplayName of the user.__

In [2]:
from operator import add

# (Id, DisplayName) pairs, used to translate user ids into readable names.
display_names = user_rdd.map(lambda user: (user.Id, user.DisplayName))

# One (UserId, number of whitespace-separated tokens) pair per PostHistory
# row, then summed per user.
post_counts = posts_rdd.map(lambda post: (post.UserId, len(post.Text.split())))
grouped_post_counts = post_counts.reduceByKey(add)

# Attach display names (ids missing on either side drop out of the join)
# and order by total word count, largest first.
joined_rdd = (
    grouped_post_counts
    .join(display_names)
    .sortBy(lambda pair: pair[1][0], ascending=False)
)

header = f'{"UserId":<10}  {"DisplayName":<35}  WordCount'
print(header)
for user_id, (word_total, display_name) in joined_rdd.collect():
    print(f'{user_id:<10}  {display_name:<35}  {word_total}')

UserId      DisplayName                          WordCount
836         Neil Slater                          352120
67328       Esmailian                            200052
29575       Stephen Rauch                        196239
45264       n1k31t4                              145635
-1          Community                            125614
28175       Vaalizaadeh                          110133
43077       Kari                                 107538
29587       JahKnows                             83099
29169       Ethan                                67695
11097       Dawny33                              60274
8820        Martin Thoma                         57425
40853       Toros91                              49790
8878        Kasra Manshaei                       42751
381         Emre                                 42696
924         Anony-Mousse                         37619
71219       aranglol                             37187
4683        David Marx                           34245

# Question 5
__Using the Users.xml, Comments.xml and PostHistory.xml files, produce a single file that includes the following information: DisplayName, number of comments, total score and number of posts. The users (DisplayName) in this file should be sorted by score in descending order (highest first).__

In [7]:
import csv
from operator import add

comments = preprocessing.comments_xml(f'{DATA_PATH}/Comments.xml')
comments_rdd = sc.parallelize(comments)

# One (UserId, Score) pair per comment.
mapped_rdd = comments_rdd.map(lambda s: (s.UserId, int(s.Score)))

# Per-user comment count and per-user score total. The counts are reduced
# on the cluster instead of being collected to the driver with countByKey()
# and parallelized again.
cc_rdd = mapped_rdd.mapValues(lambda _: 1).reduceByKey(add)
score_rdd = mapped_rdd.foldByKey(0, add)

# Per-user number of PostHistory rows.
pc_rdd = posts_rdd.map(lambda s: (s.UserId, 1)).reduceByKey(add)

# Rebuilt here so this cell does not rely on the Question 4 cell having run.
display_names = user_rdd.map(lambda s: (s.Id, s.DisplayName))

# After the three inner joins each value is nested as
#   (((post_count, comment_count), total_score), display_name);
# flatten it to (post_count, comment_count, total_score, display_name) and
# sort by total score, highest first. Being inner joins, only users that
# appear in all four inputs are kept.
final_rdd = (
    pc_rdd.join(cc_rdd).join(score_rdd).join(display_names)
    .mapValues(lambda v: (v[0][0][0], v[0][0][1], v[0][1], v[1]))
    .sortBy(lambda s: s[1][2], ascending=False)
)

# csv.writer quotes display names that contain commas or quotes, which the
# previous hand-built lines did not. The old header also ended with '\n ',
# which put a stray space in front of the first data row.
with open('question5.csv', 'w', newline='', encoding='utf-8') as fp:
    writer = csv.writer(fp, lineterminator='\n')
    writer.writerow(['UserId', 'DisplayName', 'PostCount', 'CommentCount', 'TotalScore'])
    for user_id, (n_posts, n_comments, total_score, name) in final_rdd.collect():
        writer.writerow([user_id, name, n_posts, n_comments, total_score])
    
