# Question 5: Implement the MinHash / Locality-Sensitive Hashing (LSH) algorithm discussed in class, using Spark (Scala, Java, or Python)
**NOTE:** I implemented all of the code in Databricks because of memory issues in Jupyter Notebook, and I have attached cell-by-cell screenshots of the implementation for reference, in case you need to see the output displayed in tabular format.<br>
**References** (sources I used to understand the concept):<br>
     https://databricks.com/blog/2017/05/09/detecting-abuse-scale-locality-sensitive-hashing-uber-engineering.html <br>
 https://mattilyra.github.io/2017/05/23/document-deduplication-with-lsh.html <br>
 http://mccormickml.com/2015/06/12/minhash-tutorial-with-python-code/<br>
     https://spark.apache.org/docs/1.5.2/api/python/pyspark.ml.html<br>
https://spark.apache.org/docs/2.2.3/ml-features.html#n-gram <br>
         https://towardsdatascience.com/countvectorizer-hashingtf-e66f169e2d4e<br>
        

In [2]:
# Start (or attach to) the notebook's SparkSession and keep a handle on its
# SparkContext for the RDD-based file loading further down.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("Big Data Q5")
    .getOrCreate()
)
sc = spark.sparkContext

In [3]:
# Echo the SparkContext as this cell's output, as a quick check that the
# Spark session created in the previous cell is running.
sc

In [4]:
#importing various ml libraries and pyspark functions necessary to run the code. 
from pyspark.ml.feature import Tokenizer, RegexTokenizer, NGram

from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.ml.feature import Tokenizer, RegexTokenizer, NGram
import re as regexp
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import MinHashLSH
from pyspark.ml.feature import CountVectorizer

In [5]:
#Defining a function named change that removes new line character which do not need to be compared
def change(pair):
    line = regexp.sub(r'\n\s*\n','\n',pair[1],regexp.MULTILINE)
    return [[[name for name in pair[0].split('/')][-1] ,line]]

In [6]:
# Read every .txt file in /FileStore/tables as a (path, content) pair and
# reshape each pair with `change` into a [file_name, content] row.
# `change` returns a one-element list, so flatMap emits one row per file.
data = (
    sc.wholeTextFiles("/FileStore/tables/*.txt")
      .flatMap(change)
)

In [7]:
# Build a DataFrame from the RDD of [file_name, content] rows, naming the two
# columns doc_title and doc_content.
data_df = spark.createDataFrame(data, ["doc_title", "doc_content"])

In [8]:
# Tokenizer reads the doc_content string column and appends a doc_words
# column holding the array of tokens (per the Spark docs it lower-cases the
# text and splits it on whitespace). Existing columns pass through unchanged.
tokenizer = (
    Tokenizer()
    .setInputCol("doc_content")
    .setOutputCol("doc_words")
)

In [9]:
# Apply the tokenizer: transform() returns a new DataFrame with doc_words
# appended to data_df's columns.
token = tokenizer.transform(dataset=data_df)

In [10]:
# Keep only the title, raw text and token columns for the rest of the pipeline.
data_select = token.select(["doc_title", "doc_content", "doc_words"])

In [11]:
# Preview the first 20 rows (show()'s default), truncating long cell values.
data_select.show(n=20, truncate=True)

In [12]:
# NGram turns each doc_words token array into an array of n-grams (n
# consecutive tokens joined by a space) stored in a new "ngrams" column.
# With NGRAM_SIZE = 2 these are word bigrams only; increase it for longer
# shingles.
NGRAM_SIZE = 2
ng = NGram(n=NGRAM_SIZE, inputCol="doc_words", outputCol="ngrams")
# data_select is reassigned here, so drop an "ngrams" column left over from a
# previous run of this cell; otherwise a re-run fails because the output
# column already exists. DataFrame.drop is a no-op when the column is absent.
data_select = ng.transform(data_select.drop("ngrams"))

In [13]:
# CountVectorizer learns a vocabulary of n-grams from the "ngrams" column
# (at most vocabSize=100000 terms, each present in at least minDF=2 documents)
# and encodes every document as a sparse count vector in "features".
count_vect = CountVectorizer(inputCol="ngrams", outputCol="features", vocabSize=100000, minDF=2)

# Drop a "features" column left over from a previous run so fit/transform do
# not fail on an already-existing output column when this cell is re-run.
ngram_docs = data_select.drop("features")
model = count_vect.fit(ngram_docs)

# A document whose n-grams all fall outside the vocabulary (e.g. every n-gram
# is unique to that document, because of minDF=2) gets an all-zero vector.
# MinHashLSH rejects such vectors ("Must have at least 1 non zero entry"),
# so keep only documents with at least one non-zero feature.
has_features = udf(lambda vec: vec.numNonzeros() > 0, BooleanType())
data_select = model.transform(ngram_docs).filter(has_features(col("features")))

In [14]:
#Displaying the result
data_select.show()

In [15]:
# MinHashLSH hashes the set of non-zero indices of each "features" vector,
# i.e. the set of n-grams present in the document. The fitted model writes
# one MinHash value per hash table (NUM_HASH_TABLES of them) to the
# "hashValues" column. The fixed seed keeps the hashes reproducible. Documents
# are later compared by Jaccard distance = 1 - |intersection| / |union| of
# their n-gram sets.
NUM_HASH_TABLES = 3
min_hash_lsh = MinHashLSH(inputCol="features", outputCol="hashValues", seed=12345).setNumHashTables(NUM_HASH_TABLES)
minhash_model = min_hash_lsh.fit(data_select)

# transform() returns a new DataFrame and leaves data_select untouched, so
# keep its result in order to actually display the computed hash values.
data_hashed = minhash_model.transform(data_select)
data_hashed.show()
print("Total Files - ", data_hashed.count())
print("Column Data types ", data_hashed.dtypes)

In [16]:
# approxSimilarityJoin self-joins data_select. It returns every pair of rows
# that collide in at least one hash table and whose Jaccard distance is below
# the threshold. Jaccard distance never exceeds 1.0, so a threshold of 3.0
# keeps every candidate pair; a tighter cut-off can be applied afterwards
# with filter().
CANDIDATE_DISTANCE_THRESHOLD = 3.0
approx_model = (
    minhash_model
    .approxSimilarityJoin(data_select, data_select, CANDIDATE_DISTANCE_THRESHOLD, distCol="JaccardDistance")
    # A self-join yields (A, A) pairs plus both (A, B) and (B, A). Keeping only
    # Title A < Title B leaves each unordered pair of distinct files exactly
    # once. De-duplicating on the distance value would instead discard
    # unrelated pairs that happen to share the same distance.
    .filter(col("datasetA.doc_title") < col("datasetB.doc_title"))
    .select(
        col("datasetA.doc_title").alias("Title A"),
        col("datasetB.doc_title").alias("Title B"),
        col("JaccardDistance"),
    )
    # Sort as the final step so this ordering is what show() displays.
    .sort(desc("JaccardDistance"))
)

In [17]:
# Preview only the first 10 candidate pairs instead of printing the whole
# join result, which is expensive to compute for the full dataset.
approx_model.show(n=10)

In [18]:
# Jaccard distance = 1 - Jaccard similarity, so matching (similar) files are
# the pairs with a SMALL distance. Keep pairs whose distance is below
# MAX_JACCARD_DISTANCE (0.7), i.e. whose n-gram sets have a Jaccard
# similarity above 0.3, and list the most similar pairs first.
MAX_JACCARD_DISTANCE = 0.7
matches = (
    approx_model
    .filter(approx_model['JaccardDistance'] < MAX_JACCARD_DISTANCE)
    .orderBy("JaccardDistance")
)
# Preview only the 5 closest pairs; displaying the full result is expensive.
matches.show(5)