<font size='4'><b>Notebook content:</b> An end-to-end solution to the Data Incubator's Spark mini-project.</font>

<br/><br/>

In [0]:
# Importing libraries.
import datetime as dt
import re
import xml.etree.ElementTree as ET
from collections import namedtuple

import numpy as np
import pyspark
from lxml import etree

from pyspark import SparkContext, SparkConf
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import Word2Vec, HashingTF, Tokenizer, StopWordsRemover, CountVectorizer, IDF, RegexTokenizer
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.evaluation import BinaryClassificationEvaluator

from pyspark.sql import functions as F
from pyspark.sql import SQLContext
from pyspark.sql.window import Window

In [0]:
# Show the active SparkContext. `sc` is not created in the visible cells --
# presumably it is injected by the notebook environment; confirm before running elsewhere.
print(sc)

### Bad XML

In [0]:
# Setting the path.
import os
def localpath(path):
    """Return a ``file://`` URI for ``path`` under the current working directory.

    Leading slashes on ``path`` are dropped, so ``'/data/posts'`` and
    ``'data/posts'`` yield the same URI instead of one with a doubled ``//``.

    Args:
        path: location relative to the working directory.

    Returns:
        The URI as a string.
    """
    base_dir = os.path.abspath(os.path.curdir)
    return 'file://' + base_dir + '/' + path.lstrip('/')

In [0]:
# Parsing function to remove bad XML.
def row_start(line_current):
    """Return True when the whitespace-trimmed line begins with ``<row ``."""
    return line_current.strip().startswith("<row ")

In [0]:
# Load the raw posts dump; `lines` was previously only defined in a commented-out statement.
lines = sc.textFile(localpath('/spark-stats-data/allPosts/'))
# Lines that at least open a <row element.
start_rows = lines.filter(row_start)
# Lines that open a <row element AND close it with " />". The check is written inline
# because the `is_row` helper is only defined in a later cell of this notebook.
rows = lines.filter(lambda x: row_start(x) and x.strip().endswith(" />"))

In [0]:
# Number of lines kept in `rows`.
rows.count()

In [0]:
# Calculating bad XML count.
# Absolute difference between the number of lines in `rows` and in `start_rows`.
abs(rows.count()-start_rows.count())

In [0]:
# Feed to the grader through this function.
def bad_xml():
    """Return the bad-XML line count handed to the grader (hard-coded to 237)."""
    bad_line_count = 237
    return bad_line_count

<br/><br/>

### Favorites and score

In [0]:
#def is_row(line_current):
 #   line_strip = line_current.strip().strip('\n')
  #  return (line_strip.find("<row ")==0 and line_strip[-3:]==" />")

In [0]:
# Do it at the very beginning as it will restart the python kernel.
# NOTE(review): this cell sits mid-notebook, yet the top import cell already needs lxml --
# run it before any other cell.
%pip install lxml

In [0]:
from collections import namedtuple

In [0]:
def is_row(line_current):
    """Return True when the trimmed line starts with ``<row `` and ends with `` />``."""
    stripped = line_current.strip()
    return stripped.startswith("<row ") and stripped.endswith(" />")

In [0]:
# Reading posts data into an RDD.
# NOTE(review): the /FileStore/tables path looks like an uploaded-file location -- confirm it
# exists in the target environment.
rdd_posts = sc.textFile('/FileStore/tables/part_00000_xml.gz')
# Line count of the raw file.
rdd_posts.count()

In [0]:
# Function to parse scores.
def fetch_scores(line):
    """Turn one serialised ``<row>`` element into a per-favourite-count record.

    Args:
        line: XML for a single row, as produced by ``parse_posts``.

    Returns:
        ``(FavoriteCount, (Score, 1))``. A missing or non-numeric attribute
        counts as 0; the trailing 1 is a row counter for later summing.

    Raises:
        Whatever ``etree.fromstring`` raises for malformed XML; callers are
        expected to pass rows that already parsed successfully.
    """
    root = etree.fromstring(line)

    def int_attr(name):
        # Attribute values are strings, so only a missing key or a
        # non-numeric value can fail here.
        try:
            return int(root.attrib[name])
        except (KeyError, ValueError):
            return 0

    return (int_attr('FavoriteCount'), (int_attr('Score'), 1))

In [0]:
# Function to parse posts.
def parse_posts(line):
    """Validate one line of the posts dump and return it re-serialised.

    Args:
        line: text of a single line from the dump.

    Returns:
        The bytes produced by ``etree.tostring`` for the parsed element, or
        ``None`` when the line is not well-formed XML.
    """
    try:
        root = etree.fromstring(line.encode('utf-8'))
    except (etree.ParseError, ValueError):
        # Only parse/encoding failures mean "bad row"; anything else should surface.
        return None
    return etree.tostring(root)

In [0]:
# Keep complete <row ... /> lines, re-serialise them, and drop the ones that failed to parse.
parsed_posts = (
    rdd_posts.filter(is_row)
    .map(parse_posts)
    .filter(lambda parsed: parsed is not None)
)

In [0]:
# Combining scores and posts.
# Sum the (score, row count) pairs per FavoriteCount key, then keep the 50 smallest keys.
combined = (
    parsed_posts.map(fetch_scores)
    .reduceByKey(lambda x, y: tuple(np.add(x, y)))
    .sortByKey()
    .take(50)
)

In [0]:
# Display the collected result.
combined

<br/><br/>

### Answer percentage

In [0]:
# Rebind rdd_posts to the posts directory resolved by localpath(), then count its lines.
rdd_posts = sc.textFile(localpath('/spark-stats-data/allPosts/')) 
rdd_posts.count()

In [0]:
# Load the users directory resolved by localpath() and count its lines.
rdd_users = sc.textFile(localpath('/spark-stats-data/allUsers/'))
rdd_users.count()

In [0]:
# Function to parse users.
def parse_users(line):
    """Extract ``(Id, Reputation)`` from one line of the users dump.

    Args:
        line: text of a single line from the dump.

    Returns:
        ``(Id, int(Reputation))`` when the line holds an indented ``<row``
        element carrying both attributes; otherwise the sentinel string
        ``"Empty"``, which callers filter out.
    """
    if '  <row' not in line:
        return "Empty"
    try:
        root = ET.fromstring(line.encode('utf8'))
    except ET.ParseError:
        return "Empty"

    attrs = root.attrib
    # Test each attribute on its own: `"Id" and "Reputation" in attrs`
    # only ever checked "Reputation".
    if "Id" in attrs and "Reputation" in attrs:
        return (attrs["Id"], int(attrs["Reputation"]))
    return "Empty"

In [0]:
# Parse every line and drop the "Empty" sentinel returned for unusable lines.
parsed_users = rdd_users.map(parse_users).filter(lambda x: x!= 'Empty')
parsed_users.count()

In [0]:
# Function to parse posts.
def parse_posts(line):
    """Classify one line of the posts dump by post type for its owner.

    Args:
        line: text of a single line from the dump.

    Returns:
        ``(OwnerUserId, counts)`` where ``counts`` is ``[1., 0., 1.]`` for
        ``PostTypeId == '2'`` and ``[0., 1., 1.]`` for ``PostTypeId == '1'``
        (the last slot counts every kept post). Returns the sentinel string
        ``"Empty"`` for any other line.
    """
    if '  <row' not in line:
        return "Empty"
    try:
        root = ET.fromstring(line.encode('utf8'))
    except ET.ParseError:
        return "Empty"

    attrs = root.attrib
    # Both attributes are required; the old `"OwnerUserId" and "PostTypeId" in ...`
    # only tested the second one.
    if "OwnerUserId" not in attrs or "PostTypeId" not in attrs:
        return "Empty"

    post_type = attrs["PostTypeId"]
    if post_type == '2':
        type_count = [1., 0., 1.]
    elif post_type == '1':
        type_count = [0., 1., 1.]
    else:
        return "Empty"
    return (attrs["OwnerUserId"], type_count)

In [0]:
# Per owner, add up the [type-2, type-1, total] count vectors element-wise.
parsed_posts = (
    rdd_posts.map(parse_posts)
    .filter(lambda x: x != 'Empty')
    .reduceByKey(lambda x, y: np.add(x, y))
)

parsed_posts.count()

In [0]:
# NOTE(review): this cell repeats the previous cell verbatim; it recomputes the same RDD.
parsed_posts = rdd_posts.map(parse_posts).filter(lambda x: x!= 'Empty')\
        .reduceByKey(lambda x, y: np.add(x,y))

parsed_posts.count()

In [0]:
# Combining posts and users.
# After the join each record is (user id, (count vector, reputation)). Re-key by reputation,
# carry (int user id, type-2 count / total count), sort by reputation descending, and keep
# only the carried pair.
combined = (
    parsed_posts.join(parsed_users)
    .map(lambda kv: (kv[1][1], (int(kv[0]), kv[1][0][0] / kv[1][0][2])))
    .sortByKey(False)
    .map(lambda kv: kv[1])
    .collect()
)
print(combined[:99])

In [0]:
# Feed to the grader in this format.
# Re-key every owner's count vector under one constant key so reduceByKey adds them all up.
total = parsed_posts.map(lambda x: ("t",x[1])).reduceByKey(lambda x, y: np.add(x,y)).collect()
# Single ("t", summed vector) pair; raises IndexError if parsed_posts was empty.
sum_of_total = total[0]
# Overall ratio of slot 0 (PostTypeId '2' posts) to slot 2 (all kept posts), paired with -1.
fraction = (-1, sum_of_total[1][0]/sum_of_total[1][2])
print(fraction)

<br/><br/>

### First Question

In [0]:
# Load the users file from /FileStore and count its lines.
# NOTE(review): a later cell in this section rebinds rdd_users to the localpath() directory.
rdd_users = sc.textFile('/FileStore/tables/user_part_00000_xml.gz')
rdd_users.count()

In [0]:
# Load the posts file from /FileStore and count its lines.
# NOTE(review): a later cell in this section rebinds rdd_posts to the localpath() directory.
rdd_posts = sc.textFile('/FileStore/tables/part_00000_xml.gz') 
rdd_posts.count()

In [0]:
# Rebind rdd_users to the users directory resolved by localpath() and count its lines.
rdd_users = sc.textFile(localpath('/spark-stats-data/allUsers/'))
rdd_users.count()

In [0]:
# Rebind rdd_posts to the posts directory resolved by localpath() and count its lines.
rdd_posts = sc.textFile(localpath('/spark-stats-data/allPosts/')) 
rdd_posts.count()

In [0]:
def is_row(line_current):
    """Return True when the trimmed line starts with ``<row `` and ends with `` />``."""
    stripped = line_current.strip()
    return stripped.startswith("<row ") and stripped.endswith(" />")

In [0]:
# Function to parse users.
# Record type built once instead of on every call; the type name and fields are unchanged.
_UserRecord = namedtuple('named_tuple', ['userID', 'reputation', 'user_creation_date'])


def parse_users(line):
    """Parse one ``<row>`` line of the users dump into a user record.

    Args:
        line: text of a single line from the dump.

    Returns:
        A namedtuple ``(userID, reputation, user_creation_date)``:
        ``userID`` is the ``Id`` attribute or ``None``; ``reputation`` is
        ``int(Reputation)`` or 0 when missing/non-numeric;
        ``user_creation_date`` is a ``datetime`` parsed from ``CreationDate``
        (``YYYY-MM-DDTHH:MM:SS.ffffff``) or ``None`` when missing/unparseable.
        A line that is not well-formed XML yields ``(None, 0, None)``.
    """
    try:
        attrs = etree.fromstring(line.encode('utf-8')).attrib
    except (etree.ParseError, ValueError):
        # Treat an unparseable row as a row with no attributes.
        attrs = {}

    userID = attrs.get('Id')

    try:
        reputation = int(attrs['Reputation'])
    except (KeyError, ValueError):
        reputation = 0

    try:
        user_creation_date = dt.datetime.strptime(attrs['CreationDate'], '%Y-%m-%dT%H:%M:%S.%f')
    except (KeyError, ValueError):
        user_creation_date = None

    return _UserRecord(userID, reputation, user_creation_date)


In [0]:
# Keep complete rows, parse them, drop records without an id or creation date,
# and key the rest by user id.
parsed_users = (
    rdd_users.filter(is_row)
    .map(parse_users)
    .filter(lambda rec: rec[0] is not None and rec[2] is not None)
    .map(lambda rec: (rec.userID, (rec.reputation, rec.user_creation_date)))
)

In [0]:
# Function to parse posts.
def parse_posts(line):
    """Validate one line of the posts dump and return it re-serialised.

    Args:
        line: text of a single line from the dump.

    Returns:
        The bytes produced by ``etree.tostring`` for the parsed element, or
        ``None`` when the line is not well-formed XML.
    """
    try:
        root = etree.fromstring(line.encode('utf-8'))
    except (etree.ParseError, ValueError):
        # Only parse/encoding failures mean "bad row"; anything else should surface.
        return None
    return etree.tostring(root)

In [0]:
# Function to extract dates.
# Record type built once instead of on every call; the type name and fields are unchanged.
_QuestionRecord = namedtuple('named_tuple_posts', ['userID', 'question', 'question_creation_date'])


def fetch_dates(line):
    """Extract owner, question flag and creation date from one serialised post row.

    Args:
        line: XML for a single row, as produced by ``parse_posts``.

    Returns:
        A namedtuple ``(userID, question, question_creation_date)``:
        ``userID`` is ``OwnerUserId`` or ``None``; ``question`` is ``'Yes'``
        when ``PostTypeId == '1'`` and ``None`` otherwise;
        ``question_creation_date`` is a ``datetime`` parsed from
        ``CreationDate`` (``YYYY-MM-DDTHH:MM:SS.ffffff``) or ``None``.
        Unparseable XML yields ``(None, None, None)``.
    """
    try:
        attrs = etree.fromstring(line).attrib
    except (etree.ParseError, ValueError):
        # Treat an unparseable row as a row with no attributes.
        attrs = {}

    userID = attrs.get('OwnerUserId')
    question = 'Yes' if attrs.get('PostTypeId') == '1' else None

    try:
        question_creation_date = dt.datetime.strptime(attrs['CreationDate'], '%Y-%m-%dT%H:%M:%S.%f')
    except (KeyError, ValueError):
        question_creation_date = None

    return _QuestionRecord(userID, question, question_creation_date)
    

In [0]:
# Keep complete <row ... /> lines, re-serialise them, and drop the ones that failed to parse.
parsed_posts = (
    rdd_posts.filter(is_row)
    .map(parse_posts)
    .filter(lambda parsed: parsed is not None)
)

In [0]:
# Combining post creation date with posts.
# Keep only question rows that have an owner and a parseable creation date, keyed by owner id.
# (The assignment target used to read `dates .parsed_posts_date`, which never bound this name.)
parsed_posts_date = (
    parsed_posts.map(fetch_dates)
    .filter(lambda x: x[0] is not None and x[1] is not None and x[2] is not None)
    .map(lambda x: (x.userID, x.question_creation_date))
)

In [0]:
# Comparing user creation date against post creation date.
# Earliest question date per user, joined with (reputation, creation date); each output is
# (int user id, whole days from account creation to first question, reputation), and the
# 100 records with the highest reputation are kept.
combined = (
    parsed_posts_date.reduceByKey(min)
    .join(parsed_users)
    .map(lambda x: (int(x[0]), (x[1][0] - x[1][1][1]).days, x[1][1][0]))
    .takeOrdered(100, key=lambda x: -x[2])
)

In [0]:
# Display the collected result.
combined

<br/><br/>

### Identify Veterans

In [0]:
# Function to parse users.
def parse_users(line):
    """Extract ``(Id, CreationDate)`` from one line of the users dump.

    Args:
        line: text of a single line from the dump.

    Returns:
        ``(Id, CreationDate)`` as strings when the line holds a ``<row``
        element carrying both attributes; otherwise the sentinel string
        ``"Empty"``, which callers filter out.
    """
    if '<row' not in line:
        return "Empty"
    try:
        root = ET.fromstring(line.encode('utf8'))
    except ET.ParseError:
        return "Empty"

    attrs = root.attrib
    # Test each attribute on its own: `"Id" and "CreationDate" in attrs`
    # only ever checked "CreationDate".
    if "Id" in attrs and "CreationDate" in attrs:
        return (attrs["Id"], attrs["CreationDate"])
    return "Empty"

In [0]:
# Parse every line and drop the "Empty" sentinel returned for unusable lines.
parsed_users = rdd_users.map(parse_users).filter(lambda x: x!= 'Empty')
parsed_users.count()

In [0]:
# Function to parse posts.
def parse_posts(line):
    """Extract ``(OwnerUserId, CreationDate)`` from one line of the posts dump.

    Args:
        line: text of a single line from the dump.

    Returns:
        ``(OwnerUserId, CreationDate)`` as strings when the line holds a
        ``<row`` element carrying both attributes; otherwise the sentinel
        string ``"Empty"``, which callers filter out.
    """
    if '<row' not in line:
        return "Empty"
    try:
        root = ET.fromstring(line.encode('utf8'))
    except ET.ParseError:
        return "Empty"

    attrs = root.attrib
    if "OwnerUserId" in attrs and "CreationDate" in attrs:
        return (attrs["OwnerUserId"], attrs["CreationDate"])
    return "Empty"

In [0]:
# Parse every line and drop the "Empty" sentinel returned for unusable lines.
parsed_posts = rdd_posts.map(parse_posts).filter(lambda x: x!= 'Empty')

parsed_posts.count()

In [0]:
# Function to parse questions.
def parse_questions(line):
    """Extract per-question statistics from one line of the posts dump.

    Args:
        line: text of a single line from the dump.

    Returns:
        ``(OwnerUserId, [CreationDate, score, views, answers, favs])`` for
        rows with ``PostTypeId == '1'`` and an owner. The numeric entries are
        floats, and ``favs`` falls back to 0.0 when ``FavoriteCount`` is
        missing or non-numeric. Every other line, including a question row
        lacking ``Score``, ``ViewCount``, ``AnswerCount`` or ``CreationDate``,
        yields the sentinel string ``"Empty"``.
    """
    if '<row' not in line:
        return "Empty"
    try:
        root = ET.fromstring(line.encode('utf8').strip())
    except ET.ParseError:
        return "Empty"

    attrs = root.attrib
    if "OwnerUserId" not in attrs or attrs.get("PostTypeId") != '1':
        return "Empty"

    try:
        score = float(attrs["Score"])
        views = float(attrs["ViewCount"])
        answers = float(attrs["AnswerCount"])
        create_date = attrs["CreationDate"]
    except (KeyError, ValueError):
        # One incomplete row should be skipped, not abort the whole Spark job.
        return "Empty"

    try:
        favs = float(attrs["FavoriteCount"])
    except (KeyError, ValueError):
        favs = 0.

    return (attrs["OwnerUserId"], [create_date, score, views, answers, favs])

In [0]:
# reduceByKey(min) compares the value lists element by element, so per owner the list with the
# smallest leading create_date string is kept; the final map then drops that leading date.
parsed_questions = rdd_posts.map(parse_questions).filter(lambda x: x!= "Empty")\
            .reduceByKey(min).map(lambda x: (x[0],x[1][1:]))
parsed_questions.count()

In [0]:
# Peek at the first 10 (owner, [score, views, answers, favs]) records.
parsed_questions.take(10)

In [0]:
from datetime import datetime
import time

In [0]:
# Function to find whether user has posted between 100 and 150 days.
def date_conversion(date1, date2):
    """Return 1 when ``date2`` is strictly between 100 and 150 days after ``date1``, else 0.

    Both arguments are timestamp strings in ``%Y-%m-%dT%H:%M:%S.%f`` format.
    """
    stamp_format = "%Y-%m-%dT%H:%M:%S.%f"
    start = datetime.strptime(date1, stamp_format)
    end = datetime.strptime(date2, stamp_format)
    elapsed_seconds = (end - start).total_seconds()
    # 8640000 s = 100 days, 12960000 s = 150 days; both bounds are exclusive.
    return 1 if 8640000 < elapsed_seconds < 12960000 else 0

In [0]:
def transform(x):
    """Map ``(key, count)`` to ``(key, flag)`` where flag is 1 if ``count >= 1`` else 0.

    A flag of 1 marks a veteran and 0 a brief user; a single counted post is
    already enough to be flagged as a veteran.
    """
    key, count = x[0], x[1]
    return (key, 1 if count >= 1 else 0)

In [0]:
# Combining users and posts.
# Flag every (user creation date, post creation date) pair with date_conversion, add the
# flags per user, and reduce the sum to a 0/1 veteran flag with transform.
combined = (
    parsed_users.join(parsed_posts)
    .map(lambda x: (x[0], date_conversion(x[1][0], x[1][1])))
    .reduceByKey(lambda x, y: x + y)
    .map(transform)
)

In [0]:
# Peek at the first 10 (user id, veteran flag) pairs.
combined.take(10)

In [0]:
# Combining classification and questions.
# The join yields (user, (veteran flag, question stats)); appending [1] adds a counter at
# index 4, so after the element-wise sum per flag each of the first four totals can be
# divided by that count to give a per-group mean.
veterans = combined.join(parsed_questions).map(lambda x: (x[1][0], x[1][1]+[1]))\
            .reduceByKey(lambda x,y: np.add(x,y))\
            .map(lambda x: (x[0], [x[1][0]/x[1][4], x[1][1]/x[1][4], x[1][2]/x[1][4], x[1][3]/x[1][4]]))

In [0]:
# Show up to 5 (veteran flag, [four per-group means]) records.
veterans.take(5)

<br/><br/>

### Identify veterans for full data/stack

In [0]:
# Rebind rdd_posts to the spark-stack-data posts directory and count its lines.
rdd_posts = sc.textFile(localpath('/spark-stack-data/allPosts/')) 
rdd_posts.count()

In [0]:
# Rebind rdd_users to the spark-stack-data users directory and count its lines.
rdd_users = sc.textFile(localpath('/spark-stack-data/allUsers/')) 
rdd_users.count()

In [0]:
# Function to parse users.
def parse_users(line):
    """Extract ``(Id, CreationDate)`` from one line of the users dump.

    Args:
        line: text of a single line from the dump.

    Returns:
        ``(Id, CreationDate)`` as strings when the line holds a ``<row``
        element carrying both attributes; otherwise the sentinel string
        ``"Empty"``, which callers filter out.
    """
    if '<row' not in line:
        return "Empty"
    try:
        root = ET.fromstring(line.encode('utf8'))
    except ET.ParseError:
        return "Empty"

    attrs = root.attrib
    # Test each attribute on its own: `"Id" and "CreationDate" in attrs`
    # only ever checked "CreationDate".
    if "Id" in attrs and "CreationDate" in attrs:
        return (attrs["Id"], attrs["CreationDate"])
    return "Empty"

In [0]:
# Parse every line and drop the "Empty" sentinel returned for unusable lines.
parsed_users = rdd_users.map(parse_users).filter(lambda x: x!= 'Empty')
parsed_users.count()

In [0]:
# Peek at the first 10 (Id, CreationDate) pairs.
parsed_users.take(10)

In [0]:
# Function to parse posts.
def parse_posts(line):
    """Extract ``(OwnerUserId, CreationDate)`` from one line of the posts dump.

    Args:
        line: text of a single line from the dump.

    Returns:
        ``(OwnerUserId, CreationDate)`` as strings when the line holds a
        ``<row`` element carrying both attributes; otherwise the sentinel
        string ``"Empty"``, which callers filter out.
    """
    if '<row' not in line:
        return "Empty"
    try:
        root = ET.fromstring(line.encode('utf8'))
    except ET.ParseError:
        return "Empty"

    attrs = root.attrib
    if "OwnerUserId" in attrs and "CreationDate" in attrs:
        return (attrs["OwnerUserId"], attrs["CreationDate"])
    return "Empty"

In [0]:
# Parse every line and drop the "Empty" sentinel returned for unusable lines.
parsed_posts = rdd_posts.map(parse_posts).filter(lambda x: x!= 'Empty')

parsed_posts.count()

In [0]:
# Peek at the first 20 (OwnerUserId, CreationDate) pairs.
parsed_posts.take(20)

In [0]:
# Function to parse questions.
def parse_questions(line):
    """Extract per-question statistics from one line of the posts dump.

    Args:
        line: text of a single line from the dump.

    Returns:
        ``(OwnerUserId, [CreationDate, score, views, answers, favs])`` for
        rows with ``PostTypeId == '1'`` and an owner. The numeric entries are
        floats, and ``favs`` falls back to 0.0 when ``FavoriteCount`` is
        missing or non-numeric. Every other line, including a question row
        lacking ``Score``, ``ViewCount``, ``AnswerCount`` or ``CreationDate``,
        yields the sentinel string ``"Empty"``.
    """
    if '<row' not in line:
        return "Empty"
    try:
        root = ET.fromstring(line.encode('utf8').strip())
    except ET.ParseError:
        return "Empty"

    attrs = root.attrib
    if "OwnerUserId" not in attrs or attrs.get("PostTypeId") != '1':
        return "Empty"

    try:
        score = float(attrs["Score"])
        views = float(attrs["ViewCount"])
        answers = float(attrs["AnswerCount"])
        create_date = attrs["CreationDate"]
    except (KeyError, ValueError):
        # One incomplete row should be skipped, not abort the whole Spark job.
        return "Empty"

    try:
        favs = float(attrs["FavoriteCount"])
    except (KeyError, ValueError):
        favs = 0.

    return (attrs["OwnerUserId"], [create_date, score, views, answers, favs])

In [0]:
# reduceByKey(min) compares the value lists element by element, so per owner the list with the
# smallest leading create_date string is kept; the final map then drops that leading date.
parsed_questions = rdd_posts.map(parse_questions).filter(lambda x: x!= "Empty")\
            .reduceByKey(min).map(lambda x: (x[0],x[1][1:]))
parsed_questions.count()

In [0]:
# Peek at the first 10 (owner, [score, views, answers, favs]) records.
parsed_questions.take(10)

In [0]:
# Function to find whether user has posted between 100 and 150 days.
from datetime import datetime
import time

def date_conversions(date1, date2):
    """Return 1 when ``date2`` is strictly between 100 and 150 days after ``date1``, else 0.

    Both arguments are timestamp strings in ``%Y-%m-%dT%H:%M:%S.%f`` format.
    """
    stamp_format = "%Y-%m-%dT%H:%M:%S.%f"
    start = datetime.strptime(date1, stamp_format)
    end = datetime.strptime(date2, stamp_format)
    elapsed_seconds = (end - start).total_seconds()
    # 8640000 s = 100 days, 12960000 s = 150 days; both bounds are exclusive.
    return 1 if 8640000 < elapsed_seconds < 12960000 else 0

In [0]:
def transform(x):
    """Map ``(key, count)`` to ``(key, flag)`` where flag is 1 if ``count >= 1`` else 0.

    A flag of 1 marks a veteran and 0 a brief user; a single counted post is
    already enough to be flagged as a veteran.
    """
    key, count = x[0], x[1]
    return (key, 1 if count >= 1 else 0)

In [0]:
# Combining users and posts.
# Flag every (user creation date, post creation date) pair, add the flags per user, and
# reduce the sum to a 0/1 veteran flag. This uses date_conversions, the helper defined in
# this section, instead of the like-named helper left over from the previous section.
combined = (
    parsed_users.join(parsed_posts)
    .map(lambda x: (x[0], date_conversions(x[1][0], x[1][1])))
    .reduceByKey(lambda x, y: x + y)
    .map(transform)
)
combined.count()

In [0]:
# Peek at the first 40 (user id, veteran flag) pairs.
combined.take(40)

In [0]:
# Combining classification and questions.
# The join yields (user, (veteran flag, question stats)); appending [1] adds a counter at
# index 4, so after the element-wise sum per flag each of the first four totals can be
# divided by that count to give a per-group mean.
veterans = combined.join(parsed_questions).map(lambda x: (x[1][0], x[1][1]+[1]))\
            .reduceByKey(lambda x,y: np.add(x,y))\
            .map(lambda x: (x[0], [x[1][0]/x[1][4], x[1][1]/x[1][4], x[1][2]/x[1][4], x[1][3]/x[1][4]]))

In [0]:
# Show up to 5 (veteran flag, [four per-group means]) records.
veterans.take(5)

<br/><br/>

###  Word2Vec for ggplot2.

In [0]:
# Setting vector size and random seed as per the problem statement.
# The model below is fitted on an RDD of tag lists and its synonyms are unpacked as
# (word, value) pairs. That is the RDD-based MLlib API; the pyspark.ml Word2Vec imported
# at the top of the notebook only fits DataFrames. Bind the MLlib estimator under its own
# name so the pyspark.ml import is not shadowed.
from pyspark.mllib.feature import Word2Vec as MLlibWord2Vec

word2vec = MLlibWord2Vec().setVectorSize(100).setSeed(42)

In [0]:
def is_row(line_current):
    """Return True when the trimmed line starts with ``<row `` and ends with `` />``."""
    stripped = line_current.strip()
    return stripped.startswith("<row ") and stripped.endswith(" />")

In [0]:
# Function to extract tags.
def fetch_tags(line):
    """Return the list of tags carried by one ``<row>`` line.

    Args:
        line: XML text of a single row.

    Returns:
        The tag names as a list of strings (angle brackets, escaped or not,
        act as separators), or ``None`` when the row has no ``Tags``
        attribute or is not well-formed XML. Callers filter out ``None``.
    """
    try:
        root = etree.XML(line)
    except (etree.ParseError, ValueError):
        # A single malformed row should be skipped, not abort the whole job.
        return None

    if 'Tags' not in root.attrib:
        return None

    tag_string = root.get('Tags')
    for separator in ('&lt;', '&gt;', '<', '>'):
        tag_string = tag_string.replace(separator, ' ')
    return tag_string.split()

In [0]:
# One list of tag names per complete row that carries a Tags attribute.
tags = (
    sc.textFile(localpath('spark-stack-data/allPosts'))
    .filter(is_row)
    .map(fetch_tags)
    .filter(lambda tag_list: tag_list is not None)
)

tags.count()

In [0]:
# Train the Word2Vec model on the RDD of per-row tag lists.
model = word2vec.fit(tags)

In [0]:
# Finding synonyms.
# Ask for the 25 nearest words to 'ggplot2' and print one "word value" pair per line.
synonyms = model.findSynonyms('ggplot2', 25)

# NOTE(review): the second element is presumably a cosine *similarity* (higher = closer),
# so the name `cosine_distance` looks like a misnomer -- confirm against the API docs.
for word, cosine_distance in synonyms:
    print("{} {}".format(word, cosine_distance))

In [0]:
# Hard-coded "word value" listing in the same format the previous cell prints
# (25 lines, values in descending order).
result='''
lattice 0.914175089362
r-grid 0.853013366773
plotmath 0.844089403526
boxplot 0.839753503564
plotrix 0.831518218593
ecdf 0.830968580308
gmisc 0.824988746884
levelplot 0.824531900738
density-plot 0.824492297613
melt 0.816001808733
gridextra 0.813623201172
line-plot 0.809634432145
loess 0.807918811886
rgl 0.804110386348
tapply 0.803792142344
ggvis 0.801683385182
mgcv 0.801153310268
r-factor 0.798146535195
quantile 0.797029189706
performanceanalytics 0.794787815957
weibull 0.792841571028
ggdendro 0.791935147398
categorical-data 0.790621594458
ggmap 0.790548134704
standard-error 0.788530333391
'''

<br/><br/>

### Classifications

In [0]:
def is_row(line_current):
    """Return True when the trimmed line starts with ``<row `` and ends with `` />``."""
    stripped = line_current.strip()
    return stripped.startswith("<row ") and stripped.endswith(" />")

In [0]:
# Load the train/test files from /FileStore, keeping only complete <row ... /> lines.
# NOTE(review): the next cell rebinds both names to the localpath() directories.
train = sc.textFile('/FileStore/tables/part_00001').filter(lambda x: is_row(x))
test= sc.textFile('/FileStore/tables/part_00002').filter(lambda x: is_row(x))

In [0]:
# Rebind train/test to the localpath() directories, keeping only complete <row ... /> lines.
train = sc.textFile(localpath('/spark-stats-data/train/')).filter(lambda x: is_row(x))
test= sc.textFile(localpath('/spark-stats-data/test/')).filter(lambda x: is_row(x))

In [0]:
# Function to extract tags.
def fetch_tags(row):
    """Return ``(int Id, Tags)`` for one row line, or ``None`` on any failure.

    A row that cannot be parsed, lacks ``Id`` or ``Tags``, or has a
    non-integer ``Id`` yields ``None``.
    """
    encoded_row = row.strip().encode('utf-8')
    try:
        attrs = etree.fromstring(encoded_row).attrib
        return (int(attrs['Id']), attrs['Tags'])
    except Exception:
        return None

In [0]:
# Extracting the top 10 tags from training set.
# Tag tokens are runs of letters, digits, hyphens and apostrophes in the lower-cased Tags string.
alpha_numeric = re.compile('[a-zA-Z0-9-\']+')
tags_10 = (
    train.map(fetch_tags)
    .filter(lambda parsed: parsed is not None)
    .flatMap(lambda parsed: [(tag, 1) for tag in alpha_numeric.findall(parsed[1].lower())])
    .reduceByKey(lambda x, y: x + y)
    .takeOrdered(10, key=lambda pair: -pair[1])
)

In [0]:
# Keep just the tag names from the (tag, count) pairs and display them.
tags_10_words = [i[0] for i in tags_10]
tags_10_words

In [0]:
# Function check whether a given tag is in top 10 tags or not.
def within_top(line):
    """Return 1 if any entry of the global ``tags_10_words`` is ``in`` ``line``, else 0."""
    return 1 if any(tag in line for tag in tags_10_words) else 0

In [0]:
# Function to extract body and tag.
def fetch_body_and_tags(line):
    """Return ``(Body, Tags, int PostTypeId)`` for one row line, or ``None`` on any failure.

    A row that cannot be parsed, lacks one of the three attributes, or has a
    non-integer ``PostTypeId`` yields ``None``.
    """
    encoded_line = line.strip().encode('utf-8')
    try:
        attrs = etree.fromstring(encoded_line).attrib
        return (attrs['Body'], attrs['Tags'], int(attrs['PostTypeId']))
    except Exception:
        return None

In [0]:
# Rows with PostTypeId 1 only: pair each body with a 0/1 label telling whether any of its
# tag tokens is one of the top-10 tags.
training = (
    train.map(fetch_body_and_tags)
    .filter(lambda x: x is not None and x[2] == 1)
    .map(lambda x: (x[0], within_top(alpha_numeric.findall(x[1].lower()))))
    .toDF(['body', 'label'])
    .cache()
)
training.show(3)

In [0]:
# Function to extract body and ID.
def fetch_id_body(line):
    """Return ``(int Id, Body, int PostTypeId)`` for one row line, or ``None`` on any failure.

    A row that cannot be parsed, lacks one of the three attributes, or has a
    non-integer ``Id``/``PostTypeId`` yields ``None``.
    """
    encoded_line = line.strip().encode('utf-8')
    try:
        attrs = etree.fromstring(encoded_line).attrib
        return (int(attrs['Id']), attrs['Body'], int(attrs['PostTypeId']))
    except Exception:
        return None

In [0]:
# Rows with PostTypeId 1 only. The post-type field is dropped before toDF so the two
# column names match the two fields of every record, instead of naming two columns for
# three-field rows and selecting afterwards.
testing = (
    test.map(fetch_id_body)
    .filter(lambda x: x is not None and x[2] == 1)
    .map(lambda x: (x[0], x[1]))
    .toDF(['ID', 'body'])
)
testing.show(3)

In [0]:
# Feature stages: body -> words -> words_minus_stopwords -> 2000-slot hashed "features".
tokenizer = Tokenizer(inputCol="body", outputCol="words")
stopwords_remover = StopWordsRemover(inputCol=tokenizer.getOutputCol(), outputCol="words_minus_stopwords")
hashingTF = HashingTF(inputCol=stopwords_remover.getOutputCol(), outputCol="features", numFeatures = 2000)

In [0]:
# Run the three feature stages, in order, over the training frame.
train_tokens = tokenizer.transform(training)
train_tokens_nostopwords = stopwords_remover.transform(train_tokens)
train_hashes = hashingTF.transform(train_tokens_nostopwords)

In [0]:
# Run the same three feature stages over the test frame.
test_tokens = tokenizer.transform(testing)
test_tokens_nostopwords = stopwords_remover.transform(test_tokens)
test_hashes = hashingTF.transform(test_tokens_nostopwords)

In [0]:
# Building a logistic regression model.
# No column names are passed -- presumably this relies on the estimator's default
# "features"/"label" columns, which train_hashes provides; confirm.
logreg = LogisticRegression(maxIter=10, regParam=0.08)
model = logreg.fit(train_hashes)

In [0]:
# Score the hashed test frame and keep the id, body and predicted label columns.
prediction = model.transform(test_hashes)
selected = prediction.select("ID", "body", "prediction")

In [0]:
# Collect the predictions ordered by ID and keep the prediction column (index 2) as ints.
selected_sorted = selected.sort("ID").collect()
selected_list = [int(x[2]) for x in selected_sorted]
selected_list[:10]

In [0]:
# Alias for the ordered list of predictions; show the first 20.
classification = selected_list
classification[:20]

### Model tuning

In [0]:
# Rebind tokenizer/hashingTF without the stop-word stage; HashingTF now reads "words" directly
# and numFeatures is left unset here.
tokenizer = Tokenizer(inputCol="body", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")

# NOTE(review): `tokens`/`hashes` (and the rebinding of test_tokens/test_hashes below) are not
# referenced again in the visible cells -- the pipeline further down re-applies both stages itself.
tokens = tokenizer.transform(training)
hashes = hashingTF.transform(tokens)

test_tokens = tokenizer.transform(testing)
test_hashes = hashingTF.transform(test_tokens)

In [0]:
# Chain tokenising, hashing and the previously created `logreg` estimator into one pipeline.
pipeline = Pipeline(stages=[tokenizer, hashingTF, logreg])

In [0]:
# Search grid: a single numFeatures value crossed with three regParam values (3 combinations).
paramGrid = (ParamGridBuilder()  
    .addGrid(hashingTF.numFeatures, [2000])
    .addGrid(logreg.regParam, [0.5, 0.1, 0.05])
    .build())

In [0]:
# Cross validating through a pipeline.
# A fixed seed makes the fold assignment, and therefore the selected model, reproducible
# across runs.
cross_validator = CrossValidator(estimator=pipeline,
                          estimatorParamMaps=paramGrid,
                          evaluator=BinaryClassificationEvaluator(),
                          numFolds=6,
                          seed=42)

In [0]:
# Fit the cross-validated pipeline on the raw (body, label) training frame.
cvModel = cross_validator.fit(training)

In [0]:
# Score the raw test frame with the cross-validated model and preview 3 rows.
better_prediction = cvModel.transform(testing)
better_prediction.show(3)

In [0]:
# Keep the id, body and predicted label columns, order by ID, and preview 3 rows.
better_selected = better_prediction.select("ID", "body", "prediction")
better_selected = better_selected.sort("ID")
better_selected.show(3)

<br/><br/>

<font size='4'><b>Final thoughts:</b> The whole approach boils down to two broad tasks: parsing the given XML files and then applying suitable aggregations. The Word2Vec and classification problems additionally involve NLP/ML. Because the data are huge, it is advisable to develop the code against sample files first and scale it up afterwards to save time. Further, the K-means problem is not solved because it is ungraded. </font> 