# Big Data Programming - Assignment 2

## Using the Stack Exchange Data Science dump

In this assignment, we use PySpark RDDs to answer some questions about the data. For the most part, the required operations are written as lambda functions.

### Importing required classes

We import the following:

* 'SparkSession', to create the Spark session and its SparkContext,
* 'xml.etree.ElementTree', to parse the XML files from the Stack Exchange Data Science dump,
* 're', the Python standard-library module for regular expressions,
* 'pprint', to pretty-print the data.

We then obtain the SparkContext from the session built with 'SparkSession.builder'.

In [2]:
from pyspark.sql import SparkSession
import xml.etree.ElementTree as ET
import re
from pprint import pprint

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

### Reading the Users.xml data into an RDD
We parse the XML data with Python's standard-library module xml.etree. The attributes of each row element under the root become one Python dictionary, and the resulting list of dictionaries is parallelized into an RDD.

So every element of the RDD is a Python dictionary holding one user's fields.
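To see what each RDD element looks like, here is a minimal plain-Python sketch on a tiny, made-up XML string in the same row-attribute layout (the sample users are hypothetical, not taken from the dump). 'ET.fromstring' stands in for 'ET.parse(...).getroot()'; in the notebook, the resulting list is what 'sc.parallelize' distributes.

```python
import xml.etree.ElementTree as ET

# Hypothetical two-user sample in the dump's layout:
# one <row> element per record, with every field stored as an XML attribute.
SAMPLE_XML = """<users>
  <row Id="1" DisplayName="alice" Location="Atlanta, GA" CreationDate="2017-02-01T10:00:00.000" />
  <row Id="2" DisplayName="bob" CreationDate="2016-07-15T08:30:00.000" />
</users>"""

root = ET.fromstring(SAMPLE_XML)

# Same list comprehension as the notebook: one attribute dictionary per row.
records = [item.attrib for item in root]

for record in records:
    print(record)
# Attribute values are always strings, and a missing attribute is simply absent
# from the dictionary, which is why later cells read fields with dict.get(key, default).
```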

In [3]:
UsersXmlData = ET.parse('../data_files/datascience/Users.xml')
Users = sc.parallelize([item.attrib for item in UsersXmlData.getroot()])
print("No of records in Users.xml : ",Users.count())
print("Sample Record :")
pprint(Users.first(),width=100)

No of records in Users.xml :  66951
Sample Record :
{'AboutMe': "<p>Hi, I'm not really a person.</p>\n"
            '\n'
            "<p>I'm a background process that helps keep this site clean!</p>\n"
            '\n'
            '<p>I do things like</p>\n'
            '\n'
            '<ul>\n'
            '<li>Randomly poke old unanswered questions every hour so they get some '
            'attention</li>\n'
            '<li>Own community questions and answers so nobody gets unnecessary reputation from '
            'them</li>\n'
            '<li>Own downvotes on spam/evil posts that get permanently deleted</li>\n'
            '<li>Own suggested edits from anonymous users</li>\n'
            '<li><a href="http://meta.stackexchange.com/a/92006">Remove abandoned '
            'questions</a></li>\n'
            '</ul>\n',
 'AccountId': '-1',
 'CreationDate': '2014-05-13T21:29:22.820',
 'DisplayName': 'Community',
 'DownVotes': '1325',
 'Id': '-1',
 'LastAccessDate': '2014-05-13T21:29:22

### 1) From the Users.xml file, find all users which are from Georgia and output to screen their DisplayName only. (20 points)

Each element of the RDD is a Python dictionary, so we can use 'filter' to keep the users whose Location contains 'GEORGIA' (case-insensitive) or 'GA' (case-sensitive). This is a loose substring match: it also keeps locations in the country of Georgia, not only the US state.

We use the dictionary method 'get' to read a key's value with a default, which handles rows that have no Location attribute at all.
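Here is the same predicate in plain Python on a few hypothetical user records, as a sketch of which Location values it accepts (the names and locations are made up for illustration; 'is_georgia' is a hypothetical helper, not part of the notebook):

```python
def is_georgia(user: dict) -> bool:
    """Same loose test as the notebook's filter lambda."""
    location = user.get('Location', "")  # "" when the attribute is missing
    return 'GEORGIA' in location.upper() or 'GA' in location

# Hypothetical user records.
users = [
    {'Id': '1', 'DisplayName': 'alice', 'Location': 'Atlanta, GA'},
    {'Id': '2', 'DisplayName': 'bob', 'Location': 'Tbilisi, Georgia'},  # the country also matches
    {'Id': '3', 'DisplayName': 'carol', 'Location': 'Chicago, IL'},
    {'Id': '4', 'DisplayName': 'dave'},  # no Location attribute at all
]

names = [u['DisplayName'] for u in users if is_georgia(u)]
print(names)  # ['alice', 'bob']
```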

In [20]:
GeorgiaUsers = Users.filter(lambda line : 
                            ('GEORGIA' in line.get('Location',"").upper() or 
                             'GA' in line.get('Location',"")))
GeorgiaUsers.persist()  # cache: the RDD is used by count() and again by collect()
print('No. of users from Georgia :', GeorgiaUsers.count())
print()
GeorgiaUserNames = GeorgiaUsers.map(lambda line : line['DisplayName'])
print('The Display names of Users from Georgia : ')
print()
pprint(GeorgiaUserNames.collect())
GeorgiaUsers.unpersist()  # release the cache only after the last action that needs it

No. of users from Georgia : 167

The Display names of Users from Georgia : 

['Tony Boyles',
 'pkerl',
 'Nick Larsen',
 'gfritz',
 'Aleksandr Blekh',
 'Michael',
 'Ayush',
 'azoorob',
 'ontek',
 'Aravind R. Yarram',
 'ilya',
 'vkb',
 'Daisuke Aramaki',
 'tempusfugit',
 'Henry Crutcher',
 'Goddard',
 'Matt Simpson',
 'Peter Woolfitt',
 'Giorgi Gzirishvili',
 'matt biskup',
 'Jason W',
 'Peter Mourfield',
 'Magsol',
 'Bob Baxley',
 'Alex Azazel',
 'badjr',
 'mplunney',
 'YC Hu',
 'ryan',
 'Patrick Gerbes',
 'Ilya Lapitan',
 'Dan Anton',
 'pradyumnad',
 'Manish Ranjan',
 'Dato Janez',
 'Psidom',
 'Teresa Madsen',
 'Neuromeda',
 'Brandon',
 'jpm',
 'Mr. Rooter of Savannah',
 'Mr. Rooter of Southeast GA',
 'Abesalomi Gogatishvili',
 'Khiem Ha',
 'Jenna Kwon',
 'Tedo G.',
 'mlibre',
 'Ahmet Cecen',
 'Jeff',
 'Guy Gordon',
 'C3Theo',
 'niru dyogi',
 'Vinitha Palani',
 'Mac18',
 'Andrew',
 'Aditya Gogoi',
 'turtlemonvh',
 'Lewis Rodgers',
 'Tarun Luthra',
 'Jesse Scherer',
 'Devendra Lattu',
 

### 2) Using the Users.xml file, provide the count for all users which joined (CreationDate) in 2017. (30 points). Output this to the screen.

CreationDate is an ISO-8601 timestamp such as '2014-05-13T21:29:22.820', so we filter for the users whose CreationDate year is 2017 and count them.
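A minimal plain-Python sketch of the year test on hypothetical timestamps: a prefix check on the year part, and an equivalent check that parses the timestamp with 'datetime.fromisoformat' (Python 3.7+). In Spark, either predicate could be passed to 'Users.filter', e.g. 'lambda line : line['CreationDate'].startswith('2017')'.

```python
from datetime import datetime

# Hypothetical CreationDate values in the dump's ISO-8601 format.
creation_dates = [
    '2017-01-05T10:00:00.000',
    '2016-12-31T23:59:59.999',
    '2018-03-20T17:45:00.120',
]

# Prefix test on the year part of the timestamp.
by_prefix = [ts for ts in creation_dates if ts.startswith('2017')]

# Equivalent test that parses the timestamp first.
by_year = [ts for ts in creation_dates if datetime.fromisoformat(ts).year == 2017]

print(len(by_prefix), len(by_year))  # 1 1
```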

In [5]:
Users2017 = Users.filter(lambda line : line['CreationDate'].startswith('2017'))
print('Users who joined in 2017 : ', Users2017.count())

Users who joined in 2017 :  14239


### Reading PostHistory.xml into an RDD

In [6]:
PostXmlData = ET.parse('../data_files/datascience/PostHistory.xml')
PostHistory = sc.parallelize([item.attrib for item in PostXmlData.getroot()])
print('No of Records in PostHistory.xml : ',PostHistory.count())
print("Sample Record :")
print()
pprint(PostHistory.first(),width=100)

No of Records in PostHistory.xml :  121522
Sample Record :

{'CreationDate': '2014-05-13T23:58:30.457',
 'Id': '7',
 'PostHistoryTypeId': '2',
 'PostId': '5',
 'RevisionGUID': '009bca93-fce2-44ed-a277-a8452650a627',
 'Text': "I've always been interested in machine learning, but I can't figure out one thing about "
         'starting out with a simple "Hello World" example - how can I avoid hard-coding '
         'behavior?\r\n'
         '\r\n'
         'For example, if I wanted to "teach" a bot how to avoid randomly placed obstacles, I '
         "couldn't just use relative motion, because the obstacles move around, but I don't want "
         'to hard code, say, distance, because that ruins the whole point of machine learning.\r\n'
         '\r\n'
         'Obviously, randomly generating code would be impractical, so how could I do this?',
 'UserId': '5'}


### 3) Using the PostHistory file, count the number of Posts that feature the words “Spark” and “Scala”. Output this to the screen. (20 points)

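The 'Text' field holds the content of each post-history record. We keep the records whose text contains both 'SPARK' and 'SCALA' (case-insensitive) and print their count. Two caveats apply: this is a substring match, so words such as 'scalable' or 'scalar' also pass the 'SCALA' test; and PostHistory.xml holds one record per revision event (initial body, title, tags, edits), so the same post can be counted more than once.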
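A minimal plain-Python sketch on hypothetical post-history records, comparing the notebook's substring test with a word-boundary regex ('\b', a swapped-in alternative rather than what the notebook runs) and counting distinct 'PostId' values instead of records. In Spark, the same predicate could be passed to 'PostHistory.filter', followed by '.map(lambda line : line['PostId']).distinct().count()'.

```python
import re

# Hypothetical post-history records.
history = [
    {'PostId': '10', 'Text': 'Running Spark jobs from Scala'},
    {'PostId': '10', 'Text': 'Running Spark jobs from Scala (edited)'},  # a later revision of post 10
    {'PostId': '11', 'Text': 'A scalable Spark pipeline'},
    {'PostId': '12', 'Text': 'Pandas only'},
    {'PostId': '13'},  # no Text attribute
]

def has_both_substrings(text):
    """The notebook's test: plain case-insensitive substrings."""
    upper = text.upper()
    return 'SPARK' in upper and 'SCALA' in upper

WORD_SPARK = re.compile(r'\bspark\b', re.IGNORECASE)
WORD_SCALA = re.compile(r'\bscala\b', re.IGNORECASE)

def has_both_words(text):
    """Whole-word variant: 'scalable' no longer counts as 'scala'."""
    return bool(WORD_SPARK.search(text)) and bool(WORD_SCALA.search(text))

substring_rows = [r for r in history if has_both_substrings(r.get('Text', ''))]
word_rows = [r for r in history if has_both_words(r.get('Text', ''))]
distinct_posts = {r['PostId'] for r in word_rows}

print(len(substring_rows), len(word_rows), len(distinct_posts))  # 3 2 1
```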

In [7]:
PostSearch = PostHistory.filter(lambda line : 
                                ('SPARK' in line.get('Text',"").upper() and 
                                 'SCALA' in line.get('Text',"").upper()))
print("No of Posts containing the words 'SPARK' and 'SCALA' : ", PostSearch.count())

No of Posts containing the words 'SPARK' and 'SCALA' :  211


### 4) Using the PostHistory file, provide a total count of the words used by each distinct user. In other words, count all words in all posts for each user and display this to screen. You can only identify users by the UserID (30 points). You get 15 bonus points if you get the actual DisplayName of the user.

We use regular expressions for this question.
* The regex r'\w+' matches runs of one or more word characters: letters, digits and '_'.
* 're.findall' returns a list of all non-overlapping matches in the text, so its length is the number of words in the record.
* We 'map' each record to (UserId, (word count, 1)); the '1' is a per-record counter that we reuse later as the number of posts.
* 'reduceByKey' adds these tuples element-wise for each 'UserId', giving (total words, number of records) per user. Since PostHistory.xml holds one record per revision event, the same text can be counted once per revision.
* A second RDD, built with 'map', pairs each user's 'Id' with their 'DisplayName'.
* Joining the two RDDs on the user ID brings the word counts and display names together. Because 'join' is an inner join, keys that appear in only one RDD (for example, records keyed as 'No User ID') are dropped.
* Finally, another 'map' formats each row for printing.
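The steps above can be traced in plain Python on a few hypothetical records. This sketch mimics 'map', 'reduceByKey' and an inner 'join' with lists and dictionaries; 'add_pairs' is a hypothetical name for the notebook's reduce lambda, and no Spark is involved.

```python
import re

# Hypothetical post-history and user records (attribute dictionaries, as in the RDDs).
history = [
    {'UserId': '5', 'Text': "Hello world, it's ML"},
    {'UserId': '5', 'Text': 'snake_case counts as one'},
    {'UserId': '7', 'Text': 'Spark 2.4'},
    {'Text': 'anonymous edit'},  # no UserId attribute
]
users = [
    {'Id': '5', 'DisplayName': 'alice'},
    {'Id': '7', 'DisplayName': 'bob'},
]

# map: (UserId, (word count, 1)); \w+ splits "it's" into "it" and "s", and "2.4" into "2" and "4".
pairs = [
    (r.get('UserId', 'No User ID'), (len(re.findall(r'\w+', r.get('Text', ''))), 1))
    for r in history
]

def add_pairs(x, y):
    """Same element-wise sum as the reduceByKey lambda."""
    return (x[0] + y[0], x[1] + y[1])

# reduceByKey: fold together all values that share a key.
totals = {}
for key, value in pairs:
    totals[key] = add_pairs(totals[key], value) if key in totals else value

# join (inner): keep only keys present on both sides, so 'No User ID' is dropped.
names = {u['Id']: u.get('DisplayName', 'No Display Name') for u in users}
joined = {uid: (names[uid], counts) for uid, counts in totals.items() if uid in names}

for uid, (name, (words, records)) in joined.items():
    print(f'{name}({uid}) - {words}')
# alice(5) - 9
# bob(7) - 3
```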

In [21]:
PostCount = PostHistory.map(lambda line : 
                            (line.get('UserId',"No User ID"),
                             (len(re.findall(r'\w+', line.get('Text',""))),1)))

PostsByUser = PostCount.reduceByKey(lambda x,y : (x[0]+y[0],x[1]+y[1]))
print('User ID count from Summary : ', PostsByUser.count())

UserDisplayName = Users.map(lambda line : (line['Id'],line.get('DisplayName','No Display Name')))
PostsByName = UserDisplayName.join(PostsByUser)

PostsByNameOut = PostsByName.map(lambda line : line[1][0]+'('+line[0]+') - '+str(line[1][1][0]))
print('DISPLAYNAME(USERID) - Count of Words')
print()
pprint(PostsByNameOut.collect(),width=50)

User ID count from Summary :  14101
DISPLAYNAME(USERID) - Count of Words

['buruzaemon(24) - 1406',
 'Alex I(26) - 12280',
 'Robert Cartaino(50) - 22',
 'Clayton(53) - 801',
 'Dan C(64) - 107',
 'Apollo(70) - 323',
 'Srikar Appalaraju(77) - 2087',
 'Damian Melniczuk(82) - 1920',
 'Igor Bobriakov(88) - 400',
 'Bryan(102) - 287',
 'rapaio(108) - 25344',
 'vefthym(113) - 954',
 'Johnny000(115) - 2462',
 'Marc Claesen(119) - 462',
 'Stu(141) - 557',
 'Memming(154) - 68',
 'Jay Godse(157) - 62',
 'blunders(158) - 3836',
 'Gregor(160) - 122',
 'user179(179) - 41',
 'Oleksi(180) - 55',
 'Cici(192) - 149',
 'sushant-hiray(226) - 211',
 'blue-sky(237) - 914',
 'Chris Simokat(249) - 141',
 'phyrox(278) - 123',
 'Alireza(283) - 245',
 'miraculixx(389) - 2651',
 'alaiacano(414) - 209',
 'idclark(426) - 603',
 'hroptatyr(451) - 441',
 'erogol(464) - 277',
 'Stanpol(478) - 374',
 'user490(490) - 38',
 'Brown_Dynamite(496) - 313',
 'azza-bazoo(508) - 58',
 'tilo.wiklund(536) - 92',
 'indico(548) - 75

### Reading Comments.xml into an RDD

In [9]:
CommentsXmlData = ET.parse('../data_files/datascience/Comments.xml')
Comments = sc.parallelize([item.attrib for item in CommentsXmlData.getroot()])
print('No of Records in Comments.xml : ',Comments.count())
print('Sample Record :')
pprint(Comments.first(),width=50)

No of Records in Comments.xml :  41722
Sample Record :
{'CreationDate': '2014-05-14T00:23:15.437',
 'Id': '5',
 'PostId': '5',
 'Score': '9',
 'Text': 'this is a super theoretical AI '
         'question. An interesting discussion! '
         'but out of place...',
 'UserId': '34'}


### 5) GRADUATE STUDENTS: Using the users.xml, comments.xml and PostHistory.xml files, produce a single file that includes the following information: DisplayName, Number of Comments, total Score and Number of posts. This file should have the users (DisplayName) sorted by score, descending from higher to lower. (40 points)

We can split this question into four steps:
* Summarizing the total comment score and the number of comments for each user.
* Joining this comment summary with the post summary from the previous question ('PostsByUser'), using 'UserId' as the key.
* Joining the result with the 'UserDisplayName' RDD to get each user's display name.
* Sorting the final dataset (user ID, display name, total score, number of comments, number of posts) by total score in descending order.

For the first two steps (the comments in the code show the data format at each step):
* We map each comment to (UserId, (Score, 1)) in a new RDD, 'CommentsScore'; the '1' counts comments.
* We apply 'reduceByKey' to 'CommentsScore' to get the total score and number of comments per user ID.
* We then join the reduced comments RDD with the reduced post-count RDD.
    * We use 'fullOuterJoin' because some users have commented but never posted, and vice versa.
    * With a full outer join, a user who is missing from one side gets 'None' in place of that side's tuple.
    * The function 'map_null' replaces each missing tuple with zeros and flattens both tuples into a single value tuple for the following operations.
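A minimal plain-Python sketch of what 'fullOuterJoin' followed by the zero-filling step produces, using hypothetical per-user summaries. 'full_outer_join' and 'fill_missing' are hypothetical helpers that imitate 'RDD.fullOuterJoin' and 'map_null' on dictionaries.

```python
# Hypothetical per-user summaries, shaped like PostsByUser and ScoreByUser.
posts_by_user = {'1': (120, 4), '2': (30, 1)}  # UserId -> (total words, no. of posts)
score_by_user = {'2': (7, 3), '3': (2, 1)}     # UserId -> (total score, no. of comments)

def full_outer_join(left, right):
    """Every key from either side; the missing side becomes None, like RDD.fullOuterJoin."""
    return {key: (left.get(key), right.get(key)) for key in left.keys() | right.keys()}

def fill_missing(pair):
    """Hypothetical stand-in for map_null: None -> (0, 0), then flatten into one tuple."""
    posts, comments = pair
    return (posts or (0, 0)) + (comments or (0, 0))

joined = full_outer_join(posts_by_user, score_by_user)
combined = {key: fill_missing(value) for key, value in joined.items()}

for key in sorted(combined):
    print(key, combined[key])
# 1 (120, 4, 0, 0)
# 2 (30, 1, 7, 3)
# 3 (0, 0, 2, 1)
```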

In [10]:
CommentsScore = Comments.map(lambda line : 
                             (line.get('UserId',"No User ID"),
                              (int(line.get('Score',0)),1)))
#Format - (UserID,(Score,1))

ScoreByUser = CommentsScore.reduceByKey(lambda x,y: (x[0]+y[0], x[1]+y[1]))
#Format - (UserID,(Total Score,No. of Comments))

#Function to handle the None values produced by fullOuterJoin:
#a missing side becomes (0, 0) and the two tuples are concatenated
def map_null(line) :
    posts, comments = line
    return (posts or (0, 0)) + (comments or (0, 0))

ScoreAndPosts = PostsByUser.fullOuterJoin(ScoreByUser).map(lambda line : (line[0],map_null(line[1])))
#Format - (UserId,(Total Words,No. of Posts,Total Score,No. of Comments))

For the last two steps (again, the code comments show the data format at each step):
* We join the 'ScoreAndPosts' RDD from the previous step with the 'UserDisplayName' RDD to get each user's display name.
* 'sortBy', with the position of 'Total Score' as the key and ascending set to False, sorts the data by total score in descending order.
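The nested indices in this step are easy to misread, so here is a plain-Python sketch on hypothetical rows shaped like 'FinalUserScore'. 'sorted(..., reverse=True)' plays the role of 'sortBy(..., False)'; the rows are made up for illustration.

```python
# Hypothetical rows shaped like FinalUserScore:
# (UserID, (DisplayName, (Total Words, No. of Posts, Total Score, No. of Comments)))
final_user_score = [
    ('1', ('alice', (120, 4, 5, 2))),
    ('2', ('bob', (30, 1, 9, 3))),
    ('3', ('carol', (0, 0, 1, 1))),
]

# line[1][1][2] is Total Score; reverse=True gives descending order.
ranked = sorted(final_user_score, key=lambda line: line[1][1][2], reverse=True)

# Reorder into (UserID, DisplayName, No. of Comments, Total Score, No. of Posts).
output = [(uid, name, stats[3], stats[2], stats[1]) for uid, (name, stats) in ranked]
print(output)
# [('2', 'bob', 3, 9, 1), ('1', 'alice', 2, 5, 4), ('3', 'carol', 1, 1, 0)]
```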


Finally, we export the resulting dataset, with all the required fields, to a CSV file.

We could use the built-in 'saveAsTextFile()' method, but Spark writes its output as a directory containing one part file per partition, not as a single file. To get a single file, we collect the data to the driver and write it row by row with Python's file I/O, so the file can be read on any platform. Collecting works for a result of this size, but it requires the whole result to fit in the driver's memory.
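Writing rows with a plain ','.join breaks as soon as a display name itself contains a comma; Python's 'csv' module quotes such fields instead. A minimal in-memory sketch with hypothetical rows ('io.StringIO' stands in for a real file opened with newline=''):

```python
import csv
import io

# Hypothetical output rows: (Id, DisplayName, NoofComments, TotalScore, NoofPosts).
rows = [('1', 'alice', 10, 5, 3), ('2', 'Smith, John', 4, 2, 1)]

# Naive approach: the comma inside the display name creates an extra column.
naive_line = ','.join(str(var) for var in rows[1])
print(len(naive_line.split(',')))  # 6 fields instead of 5

# csv.writer quotes the field that contains a comma.
buffer = io.StringIO()
writer = csv.writer(buffer)
writer.writerow(['Id', 'DisplayName', 'NoofComments', 'TotalScore', 'NoofPosts'])
writer.writerows(rows)

# Reading it back recovers exactly five fields for that row.
parsed = list(csv.reader(io.StringIO(buffer.getvalue())))
print(parsed[2])  # ['2', 'Smith, John', '4', '2', '1']
```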

In [19]:
FinalUserScore = UserDisplayName.join(ScoreAndPosts)
#Format - (UserID,(DisplayName,(Total Words,No. of Posts,Total Score,No. of Comments)))

FinalScoreSorted = FinalUserScore.sortBy(lambda line : line[1][1][2],False)

FinalScoreOut = FinalScoreSorted.map(lambda line : (line[0],line[1][0],line[1][1][3],line[1][1][2],line[1][1][1]))
print('UserID, DisplayName, No.of Comments, Total Score, No.of Posts\n')
pprint(FinalScoreOut.take(10))

### Code to output an RDD to a single CSV file ###
#Format - (UserID,DisplayName,No. of Comments,Total Score,No. of Posts)
import csv

#csv.writer quotes fields that contain commas (e.g. some display names)
with open("FinalScore.csv", "w", newline="", encoding="utf-8") as Fileout :
    writer = csv.writer(Fileout)
    writer.writerow(["Id", "DisplayName", "NoofComments", "TotalScore", "NoofPosts"])
    writer.writerows(FinalScoreOut.collect())

print('\nThe data is successfully exported to the file :',Fileout.name)

### Built-in alternative: writes a directory of part files, not a single file ###
#FinalScoreOut.saveAsTextFile("FinalScore.csv")

UserID, DisplayName, No.of Comments, Total Score, No.of Posts

[('381', 'Emre', 1246, 684, 440),
 ('836', 'Neil Slater', 1199, 544, 1180),
 ('471', 'Spacedman', 325, 177, 111),
 ('28175', 'Vaalizaadeh', 839, 170, 1249),
 ('21', 'Sean Owen', 345, 150, 1193),
 ('8820', 'Martin Thoma', 277, 132, 648),
 ('35644', 'Aditya', 627, 123, 221),
 ('924', 'Anony-Mousse', 463, 117, 447),
 ('23305', 'oW_', 255, 101, 421),
 ('11097', 'Dawny33', 285, 95, 702)]

The data is successfully exported to the file : FinalScore.csv
