In [0]:
%sh
# Install the third-party packages that the Python cells below import
# (bs4, nltk, spellchecker via pyspellchecker, Levenshtein via python-Levenshtein).
pip install --upgrade pip
pip install bs4
pip install nltk
pip install pyspellchecker
pip install python-Levenshtein

In [0]:
import nltk
# Fetch the NLTK resources used later in the notebook: the 'punkt' tokenizer
# data (for sent_tokenize / word_tokenize) and the 'stopwords' corpus
# (for stopwords.words('english')).
nltk.download('punkt')
nltk.download('stopwords')

## Answer to Question 1
“this is jsut graet!”
1. How can you fix typos in the above sentence using Levenshtein distance algorithm? (write a small python code to show)<br>

  The Levenshtein distance is the minimum number of single-character edit operations (insert, delete, replace) needed to turn a misspelled word into the correctly spelled word. We compute the Levenshtein distance between the misspelled word and the correct word, and then apply that number of edit operations to fix the typo. See the code below.

2. For each typo, what kind of edit operation you need to do? (consider the three edit operations: insert, delete, replace)<br>
There are several ways to apply edit operations to fix each typo; both methods listed below use two edits, which matches a Levenshtein distance of 2.<br>
  For typo 'jsut': <br>
    - Method 1: we can first delete the letter `s` and then insert a letter `s` after the letter `u`.
    - Method 2: we can first replace the letter `s` with a letter `u` and then replace the next letter `u` with a letter `s`

  For typo 'graet': <br>
    - Method 1: we can first delete the letter `a` and then insert a letter `a` after the letter `e`.
    - Method 2: we can first replace the letter `a` with a letter `e` and then replace the next letter `e` with a letter `a`

In [0]:
import Levenshtein as lev
from nltk.tokenize import word_tokenize
from spellchecker import SpellChecker

spell = SpellChecker()

text = "this is jsut graet!"
words = word_tokenize(text)

# The spellchecker's bundled dictionary may list 'jsut' as a known word, which
# would hide the typo. Remove it first; the membership check keeps this cell
# re-runnable when the entry is not (or no longer) in the dictionary.
if 'jsut' in spell.word_frequency:
  spell.word_frequency.remove('jsut')

misspelled = spell.unknown(words)
# unknown() returns a set, so sort it to print the typos in a stable order.
for word in sorted(misspelled):
  # Look the correction up once. Newer pyspellchecker releases return None
  # when no candidate is found, which lev.distance() cannot accept.
  suggestion = spell.correction(word)
  if suggestion is None:
    print("Typo = %s. No correction found." % word)
    continue
  print("Typo = %s. Correct word = %s. Levenshtein Distance = %d"
        %(word, suggestion, lev.distance(word, suggestion)))

In [0]:
from nltk.corpus import stopwords
stop_en = stopwords.words('english')
from nltk.tokenize import sent_tokenize
from nltk.tokenize import word_tokenize
from nltk.stem import SnowballStemmer
sb = SnowballStemmer(language = 'english')

from bs4 import BeautifulSoup
import pyspark.sql.functions as f
from pyspark.sql.functions import udf
from pyspark.sql.functions import concat_ws
from pyspark.sql.functions import lit
from pyspark.sql.functions import sqrt
from pyspark.sql import Window
import pyspark.sql.types as t

import math

## Answer to Question 2

In [0]:
@udf
def preprocess_udf(body):
    """Strip HTML noise from a post field and return its normalized words.

    Steps: parse the HTML, empty every <a>, <code> and <pre> element, lowercase
    the remaining text, tokenize it, drop English stopwords and tokens shorter
    than three characters, and stem what is left.

    Args:
        body: HTML (or plain-text) string from a column; may be None for a
            null cell.

    Returns:
        The stemmed tokens joined by single spaces, or None when `body` is
        None so that nulls stay nulls.
    """
    if body is None:
        # BeautifulSoup cannot parse None; pass nulls through unchanged.
        return None
    # Name the parser explicitly so the result does not depend on which
    # optional parsers happen to be installed.
    soup = BeautifulSoup(body, "html.parser")
    # Empty *every* link, inline-code and code-block element, not only the
    # first match of each kind.
    for tag in soup.find_all(['a', 'code', 'pre']):
        tag.clear()
    text = soup.get_text().lower()
    words = []
    for sent in nltk.sent_tokenize(text):
        for word in nltk.word_tokenize(sent):
            if word in stop_en:
                continue  # stopword
            if len(word) < 3:
                continue  # fewer than 3 characters
            words.append(sb.stem(word))  # reduce the word to its root
    return " ".join(words)

def distinct(text):
    """Return the whitespace-separated words of `text` without repeats.

    Words keep the order of their first appearance.
    """
    # dict keys are unique and remember insertion order, so only the first
    # occurrence of each word survives.
    return list(dict.fromkeys(text.split()))
# Spark UDF wrapper around distinct(); declared to return an array of strings.
unique = f.udf(distinct, t.ArrayType(t.StringType()))

#verify length and reduction of words/characters per every step
# slen = udf(lambda s: len(s), t.IntegerType())

def dtm(text, compare):
    """Count how often each vocabulary term occurs in `text`.

    Args:
        text: Whitespace-separated words.
        compare: Sequence of vocabulary terms; position i of the result holds
            the count for compare[i].

    Returns:
        A list of ints with the same length as `compare`. Words of `text`
        that are not in `compare` are ignored.
    """
    # Map each term to its first position once, instead of scanning the list
    # (`in` + `.index`) for every word. setdefault keeps the first index, so
    # duplicate terms behave exactly as list.index did.
    first_position = {}
    for position, term in enumerate(compare):
        first_position.setdefault(term, position)

    counts = [0] * len(compare)
    for word in text.split():
        position = first_position.get(word)
        if position is not None:
            counts[position] += 1
    return counts
# Spark UDF wrapper around dtm(); declared to return an array of integers.
occurrences = f.udf(dtm, t.ArrayType(t.IntegerType()))

def cosine_similarity(dt1, dt2):
    """Return the cosine similarity of two count vectors.

    Entries are paired with zip(), so trailing entries of a longer vector are
    ignored.

    Args:
        dt1: Sequence of numbers (term counts of the first text).
        dt2: Sequence of numbers (term counts of the second text).

    Returns:
        dot(dt1, dt2) / sqrt(|dt1|^2 * |dt2|^2) as a float, or 0.0 when either
        vector has no non-zero entry (the ratio is undefined there).
    """
    dot = 0
    norm_sq1 = 0
    norm_sq2 = 0
    for x, y in zip(dt1, dt2):
        dot += x * y
        norm_sq1 += x * x
        norm_sq2 += y * y
    denominator = math.sqrt(norm_sq1 * norm_sq2)
    if denominator == 0:
        # An all-zero (or empty) vector would otherwise raise
        # ZeroDivisionError; treat it as sharing nothing with the other text.
        return 0.0
    return dot / denominator
# Spark UDF wrapper around cosine_similarity(); declared to return a double.
similarity = f.udf(cosine_similarity, t.DoubleType())

class SOAnalysis(object):
  """Similarity analysis over a per-language question export.

  The data is read from "/FileStore/tables/SO_<progLang>.csv" and is expected
  to provide the columns Title, Body, AcceptedAnswerId and Score.
  """

  def __init__(self, progLang):
    """Remember which language's export to analyse.

    Args:
      progLang: Language name used to build the CSV file name, e.g. 'Python'.
    """
    # Keep the language on the instance so getFile() does not depend on a
    # notebook-level global that merely happens to share the name.
    self.progLang = progLang
    self.df = None  # populated by getFile()

  def getFile(self):
    """Read the language's CSV export into self.df."""
    file_location = "/FileStore/tables/SO_"+self.progLang+".csv"
    self.df = (spark.read
               .option("header", True)
               .option("wholeFile", True)
               .option("escape", "\"")
               .option("multiLine", True)
               .csv(file_location))

  def filtered_cleanDF(self):
    """Return AcceptedAnswerId, Score and the cleaned text of every row.

    Title and Body are each passed through preprocess_udf and joined with a
    space into the TextualContent column.
    """
    if self.df is None:
      # Allow use without an explicit getFile() call.
      self.getFile()
    cleaned = self.df.select(preprocess_udf("Title").alias("title_cleaned"),
                             preprocess_udf("Body").alias("body_cleaned"),
                             'AcceptedAnswerId', 'Score')
    cleaned = cleaned.withColumn("TextualContent",
                                 concat_ws(' ', cleaned.title_cleaned, cleaned.body_cleaned))
    return cleaned.drop("title_cleaned").drop("body_cleaned")

  def getMaxScoreUnacceptedAns(self):
    """Return the row(s) without an accepted answer that have the top Score.

    Score is cast to integer before comparing. Every row tied for the maximum
    is returned.
    """
    unaccepted_df = self.filtered_cleanDF().where('TextualContent is not null').where('AcceptedAnswerId is null')
    unaccepted_df = unaccepted_df.withColumn("Score", unaccepted_df["Score"].cast(t.IntegerType()))
    # Every remaining row has a null AcceptedAnswerId, so this window covers
    # the whole frame and max() is the overall maximum score.
    w = Window.partitionBy('AcceptedAnswerId')
    unaccepted_df = unaccepted_df.withColumn('maxScore', f.max('Score').over(w)).where(f.col('Score') == f.col('maxScore')).drop('maxScore')
    return unaccepted_df

  def similarityAnalysis(self):
    """Score every accepted-answer row against the top unaccepted question.

    Returns:
      The accepted-answer rows with the added columns UnacceptedQuestion,
      DT1, DT2 (term-count vectors over the shared vocabulary) and
      CosineSimilarity.

    Raises:
      ValueError: if there is no row without an accepted answer to compare
        against.
    """
    # When several rows tie for the top score, first() picks one of them.
    top_unaccepted = self.getMaxScoreUnacceptedAns().select('TextualContent').first()
    if top_unaccepted is None:
      raise ValueError("No question without an accepted answer found for %r" % (self.progLang,))
    UnacceptedQuestion = top_unaccepted['TextualContent']

    accepted = self.filtered_cleanDF().where('TextualContent is not null').where('AcceptedAnswerId is not null')
    paired = accepted.withColumn("UnacceptedQuestion", lit(UnacceptedQuestion))
    paired = paired.withColumn("Merged_Comparison", concat_ws(' ', paired.TextualContent, paired.UnacceptedQuestion))
    # Shared vocabulary: distinct words of both texts, in first-seen order.
    with_vocab = paired.select('*', unique("Merged_Comparison").alias("DistinctWords")).drop("Merged_Comparison")
    with_counts = with_vocab.select('*', occurrences("TextualContent","DistinctWords").alias("DT1"),
                                    occurrences("UnacceptedQuestion","DistinctWords").alias("DT2"))
    return with_counts.select('*', similarity("DT1","DT2").alias("CosineSimilarity")).drop("DistinctWords")

  def getTopSimilarQuestions(self):
    """Return the similarity table sorted by CosineSimilarity, highest first."""
    return self.similarityAnalysis().orderBy(['CosineSimilarity'], ascending=[0])

In [0]:
# Python dataset: load the CSV and preview the cleaned rows.
progLang = 'Python'
soAnalysis = SOAnalysis(progLang)
soAnalysis.getFile()
soAnalysis.filtered_cleanDF().show()

In [0]:
# Schema and rows of the top-scored question(s) without an accepted answer.
soAnalysis.getMaxScoreUnacceptedAns().printSchema()
soAnalysis.getMaxScoreUnacceptedAns().show()

In [0]:
# Schema and rows of the similarity table (accepted-answer rows vs. the top
# unaccepted question).
soAnalysis.similarityAnalysis().printSchema()
soAnalysis.similarityAnalysis().show()

In [0]:
# Three rows with the highest CosineSimilarity for the Python dataset.
soAnalysis.getTopSimilarQuestions().show(3)

In [0]:
# Java dataset: load the CSV and preview the cleaned rows.
progLang = 'Java'
soAnalysis = SOAnalysis(progLang)
soAnalysis.getFile()
soAnalysis.filtered_cleanDF().show()

In [0]:
# Top-scored question(s) without an accepted answer (Java).
soAnalysis.getMaxScoreUnacceptedAns().show()

In [0]:
# Similarity table for the Java dataset.
soAnalysis.similarityAnalysis().show()

In [0]:
# Three rows with the highest CosineSimilarity for the Java dataset.
soAnalysis.getTopSimilarQuestions().show(3)

In [0]:
# Javascript dataset: load the CSV and preview the cleaned rows.
progLang = 'Javascript'
soAnalysis = SOAnalysis(progLang)
soAnalysis.getFile()
soAnalysis.filtered_cleanDF().show()

In [0]:
# Top-scored question(s) without an accepted answer (Javascript).
soAnalysis.getMaxScoreUnacceptedAns().show()

In [0]:
# Similarity table for the Javascript dataset.
soAnalysis.similarityAnalysis().show()

In [0]:
# Three rows with the highest CosineSimilarity for the Javascript dataset.
soAnalysis.getTopSimilarQuestions().show(3)