## Initialize Cluster

In [None]:
!python --version

Python 3.7.0


In [None]:
!which pip

/global/software/jupyterhub-spark/anaconda3/bin/pip


In [None]:
import os
import atexit
import sys
import time

import pyspark
from pyspark.context import SparkContext
from pyspark.sql import SQLContext
import findspark
from sparkhpc import sparkjob

#Exit handler to clean up the Spark cluster if the script exits or crashes
def exitHandler(sj,sc):
    """Best-effort shutdown of the Spark context and then the Spark job.

    Registered with ``atexit`` so the cluster is released when the kernel or
    script exits. Each resource is stopped independently: a failure while
    stopping the context is reported and does not prevent the job from being
    stopped.

    Args:
        sj: object exposing ``stop()`` for the cluster job.
        sc: object exposing ``stop()`` for the Spark context.
    """
    # Context first, then the job that hosts it (same order as before).
    for label, resource in (('Spark Context', sc), ('Spark Job', sj)):
        print('Trapped Exit cleaning up ' + label)
        try:
            resource.stop()
        except Exception as err:
            # Still best-effort, but say what went wrong instead of hiding it.
            # `Exception` (not a bare except) lets KeyboardInterrupt/SystemExit through.
            print('Failed to stop %s: %r' % (label, err))

# Start-up sequence for the notebook's Spark cluster: request a job through
# sparkhpc, wait for it, create the SparkContext, register the cleanup handler
# and build the SQLContext that later cells use.
findspark.init()

# Parameters for the Spark cluster
nodes=1
tasks_per_node=4 
memory_per_task=4096 # passed as memory_per_core below; the original note reads this as 4 GB per process (unit presumably MB - confirm). Adjust accordingly.
# Please estimate walltime carefully to keep unused Spark clusters from sitting
# idle so that others may use the resources.
walltime="1:00" # 1 hour per the original note (format presumably HH:MM - confirm against sparkhpc)
#os.environ['SBATCH_PARTITION']='cpu2019' #Set the appropriate ARC partition

# Total cores requested = nodes * tasks_per_node; one executor gets
# tasks_per_node cores.
sj = sparkjob.sparkjob(
     ncores=nodes*tasks_per_node,
     cores_per_executor=tasks_per_node,
     memory_per_core=memory_per_task,
     walltime=walltime
    )

sj.wait_to_start()
# Fixed 60-second pause between wait_to_start() returning and the context
# being created. NOTE(review): presumably gives the workers time to come up;
# confirm it is still required.
time.sleep(60)
sc = sj.start_spark()

# Register the exit handler so sc and sj are stopped when the interpreter exits.
atexit.register(exitHandler,sj,sc)

# You need this line if you want to use SparkSQL; later cells alias sqlCtx as
# `spark` / `sqlContext`.
sqlCtx=SQLContext(sc)

INFO:sparkhpc.sparkjob:Submitted batch job 237

INFO:sparkhpc.sparkjob:Submitted cluster 1


# Examples

## Assignment 1


In [None]:
!pwd

/home/ajoy.das/projects/spark


In [None]:
# Directory that holds the SO-<language>.csv files read by the analysis cells.
# Set the SO_DATA_PATH environment variable to run this notebook from another
# account or machine; the default is the original author's location.
DATA_PATH = os.environ.get('SO_DATA_PATH', '/home/ajoy.das/projects/spark')

In [None]:
import nltk
nltk.download('punkt')

[nltk_data] Downloading package punkt to /home/ajoy.das/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


True

In [None]:
nltk.download('stopwords')
from nltk.corpus import stopwords
stop_en = stopwords.words('english')


[nltk_data] Downloading package stopwords to
[nltk_data]     /home/ajoy.das/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


In [None]:
import pandas as pd
from nltk.tokenize import sent_tokenize
from nltk.tokenize import word_tokenize
from pyspark.sql import Row
from bs4 import BeautifulSoup
 
from pyspark.sql.functions import udf
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import StopWordsRemover
import pyspark.sql.functions as f

In [None]:
# Both names alias the SQLContext created in the cluster start-up cell; the
# code below refers to it as `sqlContext` and as `spark`.
sqlContext = sqlCtx
spark = sqlCtx

# Tags whose contents are treated as noise rather than question text.
NOISE_TAGS = ["a", "code", "pre"]
# Tokens shorter than this many characters are dropped.
MIN_WORD_LENGTH = 3
# Frozen copy of the stop-word collection so each membership test is a hash lookup.
_STOP_WORDS = frozenset(stop_en)


def _strip_noise(html):
    """Return the text of ``html`` after emptying every NOISE_TAGS element.

    Every matching tag is cleared, not just the first one of each kind.
    """
    # NOTE(review): no parser is named, so bs4 picks whichever one is installed;
    # pin one explicitly if driver and executors must parse identically.
    soup = BeautifulSoup(html)
    for tag in soup.find_all(NOISE_TAGS):
        tag.clear()
    return soup.get_text()


def _tokenize(text):
    """Lower-case ``text`` and return its word tokens, minus stop words and short tokens."""
    return [
        word
        for sent in nltk.sent_tokenize(text.lower())
        for word in nltk.word_tokenize(sent)
        if word not in _STOP_WORDS and len(word) >= MIN_WORD_LENGTH
    ]


@udf("String")
def preprocess_udf(body):
    """Spark UDF: clean one HTML body and return its kept words joined by spaces.

    A null body yields null, so the downstream split/explode produces no rows
    for it instead of the UDF failing.
    """
    if body is None:
        return None
    return " ".join(_tokenize(_strip_noise(body)))


class SOAnalysis(object):
    """Summary statistics over the ``SO-<progLang>.csv`` file under DATA_PATH.

    Call ``getFile()`` first; every other method reads the frames it loads.

    Attributes:
        progLang: language name used to build the CSV file name.
        df: Spark DataFrame of the whole CSV (set by ``getFile``).
        df_prog_body: pandas DataFrame with the ``Body`` column (set by ``getFile``).
        words: tokens collected by the last ``doWordCount`` run.
    """

    def __init__(self, progLang):
        # Keep the language on the instance so getFile() does not depend on a
        # global that merely happens to be called `progLang`.
        self.progLang = progLang
        self.df = None
        self.df_prog_body = None
        self.words = []

    def getFile(self):
        """Read the CSV into ``self.df`` and pull the Body column into pandas."""
        file_location = DATA_PATH + "/SO-" + self.progLang + ".csv"
        reader = (
            spark.read
            .option("header", True)
            .option("wholeFile", True)
            .option("escape", "\"")
            .option("multiLine", True)
        )
        self.df = reader.csv(file_location)
        self.df_prog_body = self.df.select('Body').toPandas()

    def tokenizeAndPreprocess(self, text):
        """Append the kept tokens of ``text`` to ``self.words``."""
        self.words.extend(_tokenize(text))

    def doWordCount(self):
        """Tokenise on the driver, then count words with Spark and show the top 10."""
        self.words = []
        bodies = self.df_prog_body["Body"]
        # Null bodies are skipped.
        for body in bodies[bodies.notnull()]:
            self.tokenizeAndPreprocess(_strip_noise(body))
        print("total words = ", len(self.words))
        word_rows = sc.parallelize(self.words).map(lambda word: Row(word))
        word_df = sqlContext.createDataFrame(word_rows, ['word'])
        word_df.groupBy('word').count().orderBy(['count'], ascending=[0]).show(10)

    def getPctAcceptedAnswer(self):
        """Print the percentage of rows whose AcceptedAnswerId is not null."""
        totalAccepted = self.df.where('AcceptedAnswerId is not null').count()
        totalQuestions = self.df.count()
        if totalQuestions == 0:
            # Nothing to divide by; formatting None with %.2f would raise.
            print("Percentage of Accepted Answer = n/a (no questions loaded)")
            return
        pct = totalAccepted * 100.0 / totalQuestions
        print("Percentage of Accepted Answer = %.2f"%(pct))

    def getQuestionType(self):
        """Print the share of titles that start with what / why / how, and the rest."""
        dfTitle = self.df.where('Title is not null').select('Title')
        qTotal = dfTitle.count()
        if qTotal == 0:
            print("Pct What/Why/How/Other = n/a (no titles loaded)")
            return
        # LIKE is case-sensitive, so compare against the lower-cased title;
        # otherwise titles such as "How do I ..." fall into "Other".
        qWhy = dfTitle.where("lower(Title) like 'why %'").count()
        qHow = dfTitle.where("lower(Title) like 'how %'").count()
        qWhat = dfTitle.where("lower(Title) like 'what %'").count()
        qOther = qTotal - (qWhy + qHow + qWhat)
        pctQWhy = qWhy * 100.0 / qTotal
        pctQHow = qHow * 100.0 / qTotal
        pctQWhat = qWhat * 100.0 / qTotal
        pctQOther = qOther * 100.0 / qTotal
        print("Pct What = %.2f. Why = %.2f. How = %.2f. Other = %.2f"%(pctQWhat, pctQWhy, pctQHow, pctQOther))

    def doWordCountSparkOnly(self):
        """Same word count as ``doWordCount`` but cleaned and counted inside Spark."""
        cleaned = self.df.select(preprocess_udf("Body").alias("body_cleaned"))
        words = cleaned.withColumn('word', f.explode(f.split(f.col('body_cleaned'), ' ')))
        # A body with no kept words splits to a single empty string; drop it
        # so it is not counted as a word.
        words = words.where(f.col('word') != '')
        words.groupBy('word').count().sort('count', ascending=False).show(10)

In [None]:
progLang = 'Python'
soAnalysis = SOAnalysis(progLang)
soAnalysis.getFile()
soAnalysis.df_prog_body.head()


Unnamed: 0,Body
0,<p>I'd like to do some server-side scripting u...
1,<p>Can you please tell me how much is <code>(-...
2,<p>I am using <code>win32com</code> in python ...
3,<p>I'm using pip to install Python libraries o...
4,<p>I want to change my Anaconda Prompt User fi...


In [None]:
%%time
soAnalysis.doWordCount()

total words =  3506294
+------+-----+
|  word|count|
+------+-----+
|  file|33425|
|  code|30444|
|python|29591|
|  like|24039|
| using|23305|
|   n't|21634|
|  line|21368|
|   get|20192|
|  data|20045|
|  want|19666|
+------+-----+
only showing top 10 rows

CPU times: user 3min 29s, sys: 1.28 s, total: 3min 30s
Wall time: 3min 36s


In [None]:
%%time
soAnalysis.doWordCountSparkOnly()

+------+-----+
|  word|count|
+------+-----+
|  file|33425|
|  code|30444|
|python|29591|
|  like|24039|
| using|23305|
|   n't|21634|
|  line|21368|
|   get|20192|
|  data|20045|
|  want|19666|
+------+-----+
only showing top 10 rows

CPU times: user 26.6 ms, sys: 10.7 ms, total: 37.3 ms
Wall time: 3min 25s


In [None]:
%%time
soAnalysis.getPctAcceptedAnswer()

Percentage of Accepted Answer = 51.10
CPU times: user 0 ns, sys: 2.34 ms, total: 2.34 ms
Wall time: 1.65 s


In [None]:
%%time
soAnalysis.getQuestionType()

Pct What = 0.13. Why = 0.19. How = 1.59. Other = 98.10
CPU times: user 4.03 ms, sys: 19 µs, total: 4.05 ms
Wall time: 3.14 s


## Assignment 2

In [None]:
!pip install --user pyspellchecker

Collecting pyspellchecker
[?25l  Downloading https://files.pythonhosted.org/packages/64/c7/435f49c0ac6bec031d1aba4daf94dc21dc08a9db329692cdb77faac51cea/pyspellchecker-0.6.2-py3-none-any.whl (2.7MB)
[K    100% |████████████████████████████████| 2.7MB 1.9MB/s 
[?25hInstalling collected packages: pyspellchecker
Successfully installed pyspellchecker-0.6.2
[33mYou are using pip version 10.0.1, however version 21.3.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.[0m


In [None]:
import nltk
nltk.download('punkt')


[nltk_data] Downloading package punkt to /home/ajoy.das/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

In [None]:
nltk.download('stopwords')
from nltk.corpus import stopwords
stop_en = stopwords.words('english')


[nltk_data] Downloading package stopwords to
[nltk_data]     /home/ajoy.das/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [None]:
import pandas as pd
from nltk.tokenize import sent_tokenize
from nltk.tokenize import word_tokenize
from pyspark.sql import Row
from bs4 import BeautifulSoup
 
from pyspark.sql.functions import udf
from pyspark.ml.feature import Tokenizer
from pyspark.ml.feature import StopWordsRemover
import pyspark.sql.functions as f
from nltk.stem.porter import *
from spellchecker import SpellChecker

In [None]:
text = "this is jsut graet!"
spell = SpellChecker()
misspelled = spell.unknown(nltk.word_tokenize(text))
misspelled


{'graet'}

In [None]:
for word in misspelled:
  print(word, "=>", spell.correction(word))


graet => great


In [None]:
def dot(A,B):
    """Sum of pairwise products of A and B; stops at the shorter input."""
    total = 0
    for left, right in zip(A, B):
        total += left * right
    return total
  
def getTextDict(a, b):
  """Count the items of two lists over their shared vocabulary.

  Returns two dicts keyed by every distinct item of ``a + b``, in the same key
  order, holding the number of occurrences in ``a`` and in ``b`` respectively
  (0 where an item is absent).
  """
  vocabulary = set(a + b)
  # Both dicts are seeded from the same set so their keys line up one-to-one.
  ka = dict.fromkeys(vocabulary, 0)
  kb = dict.fromkeys(vocabulary, 0)
  for counts, items in ((ka, a), (kb, b)):
    for item in items:
      counts[item] += 1
  return ka, kb
 
def getCosineSimilarity(ka, kb):
  """Cosine similarity between the term-count vectors of two token lists.

  Args:
    ka, kb: iterables of hashable tokens.

  Returns:
    dot(a, b) / (|a| * |b|) over the token counts, or 0.0 when either input
    has no tokens (the similarity is undefined there and the plain formula
    would divide by zero).
  """
  from collections import Counter

  counts_a = Counter(ka)
  counts_b = Counter(kb)
  # Only tokens present in both inputs contribute to the dot product.
  dot_ab = sum(n * counts_b[token] for token, n in counts_a.items() if token in counts_b)
  norm_sq_a = sum(n * n for n in counts_a.values())
  norm_sq_b = sum(n * n for n in counts_b.values())
  if norm_sq_a == 0 or norm_sq_b == 0:
    return 0.0
  return dot_ab / ( (norm_sq_a ** .5) * (norm_sq_b ** .5) )
 
a = ['a', 'a', 'b']
b = ['b', 'c', 'c']
getCosineSimilarity(a, b)


0.19999999999999996

In [None]:
spark = sqlCtx

In [None]:
def preprocess(body):
  """Turn one HTML fragment into a list of stemmed content words.

  Empties every <a>, <code> and <pre> element, lower-cases the remaining text,
  tokenises it, drops stop words and tokens shorter than 3 characters, and
  Porter-stems what is left.

  Args:
    body: HTML string; None or NaN (a null cell) is accepted.

  Returns:
    List of stemmed tokens; empty for a null input.
  """
  # Null cells reach this function as None or float NaN when it is mapped over
  # a pandas column; BeautifulSoup cannot parse either.
  if body is None or (isinstance(body, float) and body != body):
    return []

  soup = BeautifulSoup(body)
  # Clear every noise tag, not only the first one of each kind.
  for tag in soup.find_all(["a", "code", "pre"]):
    tag.clear()
  text = soup.get_text().lower()

  stemmer = PorterStemmer()
  stop_words = set(stop_en)  # hash lookup per token
  return [
    stemmer.stem(word)
    for sent in nltk.sent_tokenize(text)
    for word in nltk.word_tokenize(sent)
    if word not in stop_words and len(word) >= 3
  ]
 
class SimilarQuestionRecommender(object):
    """Recommend answered questions similar to the top-scored unanswered ones.

    The constructor loads ``SO-<progLang>.csv`` from DATA_PATH through Spark,
    converts it to pandas and preprocesses title and body of every row, so it
    is expensive. ``self.pdfn`` then holds the columns Id, AcceptedAnswerId,
    Score and BodyTitle (title tokens followed by body tokens).
    """

    # Returned by getMaxScoreForQuestionWithoutAcceptedAnswer when no row qualifies.
    NO_SCORE = -99999

    def __init__(self, progLang):
        print("loading and preprocessing started ....")
        file_location = DATA_PATH + "/SO-"+progLang+".csv"
        reader = (
            spark.read
            .option("header", True)
            .option("wholeFile", True)
            .option("escape", "\"")
            .option("multiLine", True)
        )
        df = reader.csv(file_location)
        pdf = df.select('Id', 'AcceptedAnswerId', 'Score', 'Title', 'Body').toPandas()
        pdf['BodyPreprocessed'] = pdf['Body'].map(preprocess)
        pdf['TitlePreprocessed'] = pdf['Title'].map(preprocess)
        # Element-wise list concatenation: title tokens first, then body tokens.
        pdf['BodyTitle'] = pdf['TitlePreprocessed'] + pdf['BodyPreprocessed']
        self.pdfn = pdf[['Id', 'AcceptedAnswerId', 'Score', 'BodyTitle']]
        print("loading and preprocessing done ....")

    def _withoutAcceptedAnswer(self):
        """Rows of ``self.pdfn`` whose AcceptedAnswerId is null."""
        return self.pdfn[self.pdfn['AcceptedAnswerId'].isnull()]

    def getMaxScoreForQuestionWithoutAcceptedAnswer(self):
        """Highest integer Score among rows without an accepted answer.

        Returns NO_SCORE (-99999) when there is no such row.
        """
        unanswered = self._withoutAcceptedAnswer()
        if unanswered.empty:
            return self.NO_SCORE
        # Score values are converted with int() row by row, as before.
        return max(self.NO_SCORE, int(unanswered['Score'].map(int).max()))

    def getIdsOfQuestionsWithoutAcceptedAnswer(self):
        """Ids of the rows without an accepted answer that share the maximum score."""
        maxScore = self.getMaxScoreForQuestionWithoutAcceptedAnswer()
        unanswered = self._withoutAcceptedAnswer()
        if unanswered.empty:
            return []
        isTop = (unanswered['Score'].map(int) == maxScore).values
        return unanswered.loc[isTop, 'Id'].tolist()

    def getSimilarQuestionForCandidateWithNotAccepted(self, qid):
        """Print and return the three answered questions most similar to ``qid``.

        Args:
            qid: Id of the candidate question; compared as a string.

        Returns:
            Up to three ``(Id, similarity)`` pairs, most similar first.

        Raises:
            IndexError: no row has that Id.
        """
        candidate = self.pdfn.loc[self.pdfn['Id'] == str(qid), 'BodyTitle']
        if candidate.empty:
            raise IndexError("no question with Id %s" % (qid,))
        textN = candidate.values[0]

        # Only questions that already have an accepted answer are candidates.
        answered = self.pdfn[self.pdfn['AcceptedAnswerId'].notnull()]
        sims = {
            iid: getCosineSimilarity(textN, textA)
            for iid, textA in zip(answered['Id'], answered['BodyTitle'])
        }
        topThree = sorted(sims.items(), key=lambda item: item[1], reverse=True)[:3]
        for iid, sim_score in topThree:
            print("\t Question Id = ", iid, " with Similarity Score = ", sim_score)
        return topThree

    def recommendSimilarQuestion(self):
        """Print recommendations for every top-scored question without an accepted answer."""
        idsForNonAccepted = self.getIdsOfQuestionsWithoutAcceptedAnswer()
        print("Total highest scored questions without accepted answers = %d"%(len(idsForNonAccepted)))
        for qid in idsForNonAccepted:
            print("For ", qid, ": Recommendations are: ")
            self.getSimilarQuestionForCandidateWithNotAccepted(qid)


In [None]:
%%time
progLang = "Python"
sqr = SimilarQuestionRecommender(progLang)

loading and preprocessing started ....
loading and preprocessing done ....
CPU times: user 5min 54s, sys: 2.77 s, total: 5min 57s
Wall time: 6min 20s


In [None]:
%%time
sqr.recommendSimilarQuestion()

Total highest scored questions without accepted answers = 1
For  455612 : Recommendations are: 
	 Question Id =  56433990  with Similarity Score =  0.39605901719066977
	 Question Id =  1598579  with Similarity Score =  0.3818813079129867
	 Question Id =  8868985  with Similarity Score =  0.380920029682232
CPU times: user 20.6 s, sys: 5.79 ms, total: 20.6 s
Wall time: 20.8 s


## MLlib

In [None]:
spark = sqlCtx

In [None]:
from pyspark.ml.feature import VectorIndexer, StringIndexer
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml import Pipeline
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [None]:
# Decision-tree classification example: index labels and features, train on a
# 70/30 split and report the test error.

# Fixed seed for the train/test split so the reported error can be reproduced.
SPLIT_SEED = 42

# Load the data stored in LIBSVM format as a DataFrame.
data = spark.read.format("libsvm").load("sample_libsvm_data.txt")

# Index labels, adding metadata to the label column.
# Fit on whole dataset to include all labels in index.
labelIndexer = StringIndexer(inputCol="label", outputCol="indexedLabel").fit(data)
# Automatically identify categorical features, and index them.
# We specify maxCategories so features with > 4 distinct values are treated as continuous.
featureIndexer =\
    VectorIndexer(inputCol="features", outputCol="indexedFeatures", maxCategories=4).fit(data)

# Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = data.randomSplit([0.7, 0.3], seed=SPLIT_SEED)

# Train a DecisionTree model.
dt = DecisionTreeClassifier(labelCol="indexedLabel", featuresCol="indexedFeatures")

# Chain indexers and tree in a Pipeline
pipeline = Pipeline(stages=[labelIndexer, featureIndexer, dt])

# Train model.  This also runs the indexers.
model = pipeline.fit(trainingData)

# Make predictions.
predictions = model.transform(testData)

# Select example rows to display.
predictions.select("prediction", "indexedLabel", "features").show(5)

# Select (prediction, true label) and compute test error
evaluator = MulticlassClassificationEvaluator(
    labelCol="indexedLabel", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)
print("Test Error = %g " % (1.0 - accuracy))

# The fitted tree is the third pipeline stage (after the two indexers).
treeModel = model.stages[2]
# summary only
print(treeModel)

+----------+------------+--------------------+
|prediction|indexedLabel|            features|
+----------+------------+--------------------+
|       1.0|         1.0|(692,[95,96,97,12...|
|       1.0|         1.0|(692,[98,99,100,1...|
|       1.0|         1.0|(692,[121,122,123...|
|       1.0|         1.0|(692,[122,123,124...|
|       1.0|         1.0|(692,[123,124,125...|
+----------+------------+--------------------+
only showing top 5 rows

Test Error = 0.0909091 
DecisionTreeClassificationModel (uid=DecisionTreeClassifier_7e29319e858c) of depth 2 with 5 nodes
