<h1>Question 5</h1>
<br>Duplicate Detection/Minhash in cookbook_txt.<br>
<b>NOTE: All the code was executed in Databricks due to memory issues in Jupyter.</b>

In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.ml.feature import *
import re

# Reuse the active SparkContext/SparkSession when one already exists (for
# example on Databricks, or when this cell is re-run). A bare SparkContext()
# raises in that situation because only one context may be active at a time.
sc = SparkContext.getOrCreate()
spark = SparkSession.builder.getOrCreate()

In [2]:
# Normalise one (path, text) record: keep the file name and collapse blank lines.
def get_exp(pairs):
    """Turn one ``(path, text)`` pair into a ``[file_name, cleaned_text]`` record.

    Args:
        pairs: A 2-item sequence whose first item is the file path and whose
            second item is the full file content.

    Returns:
        A one-element list ``[[file_name, cleaned_text]]``. The outer list is
        there so the function can be used with ``flatMap``, which unwraps it
        into a single ``[file_name, cleaned_text]`` row.
    """
    path, text = pairs[0], pairs[1]
    # The fourth positional argument of re.sub is `count`, not `flags`. Passing
    # re.MULTILINE (== 8) there capped the substitution at 8 replacements, so
    # later blank lines were left in place. The pattern has no ^/$ anchors, so
    # the flag is not needed at all.
    cleaned = re.sub(r'\n\s*\n', '\n', text)
    # Last path component is the document name.
    file_name = path.split('/')[-1]
    return [[file_name, cleaned]]

In [3]:
# Read every .txt file in the folder as one (path, content) record and pass
# each record through get_exp to build the RDD of documents.
data_docs = (
    sc.wholeTextFiles("/FileStore/tables/*.txt")
    .flatMap(get_exp)
)

In [4]:
# Convert the RDD into a two-column DataFrame (document name, recipe text)
# for further processing, then preview it.
docs_info = data_docs.toDF(
    ["doc_name", "recipe"]
)
docs_info.show()

In [5]:
# Tokenize the recipe text into a new `tokens` column; the tokens are the
# values the later stages compare. Show them next to the original columns.
tokens = Tokenizer(
    inputCol="recipe",
    outputCol="tokens",
)
tokenized_data = tokens.transform(docs_info)
tokenized_data.show()

In [6]:
# Build shingles from the tokens with NGram (n=2) and store them in a
# `shingles` column.
n_grams = NGram(
    n=2,
    inputCol="tokens",
    outputCol="shingles",
)
shingled_data = n_grams.transform(tokenized_data)
shingled_data.show()

In [7]:
# Vectorize the shingles into a `shingle_vectors` column.
shingles_vec = CountVectorizer(
    inputCol="shingles",
    outputCol="shingle_vectors",
    vocabSize=10000,
    minDF=2.0,
)
# This cell overwrites `shingled_data` with its own output. Dropping any vector
# column left over from a previous run keeps the cell re-runnable; otherwise
# transform() fails because the output column already exists. drop() is a
# no-op when the column is absent, so the first run is unaffected.
cv_input = shingled_data.drop("shingle_vectors")
model = shingles_vec.fit(cv_input)
shingled_data = model.transform(cv_input)
shingled_data.show()

In [8]:
# Generate hash signatures for the shingle vectors with MinHash LSH
# (3 hash tables, fixed seed), stored in a `hashes` column.
# NOTE(review): MinHashLSH is documented to require at least one non-zero
# entry per input vector -- confirm no document ends up with an empty shingle
# vector after vectorization.
min_hash = MinHashLSH(
    inputCol="shingle_vectors",
    outputCol="hashes",
    numHashTables=3,
    seed=12345,
)
model = min_hash.fit(shingled_data)
hashed_data = model.transform(shingled_data)
hashed_data.show()

In [9]:
# Self-join the hashed documents with approxSimilarityJoin to get the distance
# between every candidate pair of documents.
# The distance column keeps its original name "eucDistance" because later
# cells read it, but MinHashLSH measures Jaccard distance (0 = identical
# shingle sets), not Euclidean distance. Jaccard distance never exceeds 1, so
# the 3.0 cut-off keeps every candidate pair.
distances_val = (
    model.approxSimilarityJoin(hashed_data, hashed_data, 3.0, distCol="eucDistance")
    .select(
        col("datasetA.doc_name").alias("docA"),
        col("datasetB.doc_name").alias("docB"),
        col("eucDistance"),
    )
    # Keep one row per unordered pair: this removes self-pairs (A, A) and the
    # mirrored copy (B, A) of each (A, B). De-duplicating on the distance value
    # instead merged unrelated pairs that shared a distance, and could drop a
    # real exact duplicate (distance 0) in favour of a self-pair.
    .filter(col("docA") < col("docB"))
    # Sort last so the ordering is what later cells actually display.
    .sort(desc("eucDistance"))
)

In [10]:
# Display only the first 10 rows due to memory issues.
distances_val.show(n=10)

In [11]:
# Set a distance threshold of 0.7 and filter out the pairs over it: a smaller
# distance means more similar documents, so the duplicate candidates are the
# pairs at or below the threshold (the previous `>` kept the least similar
# pairs instead).
# Display only the first 5 rows due to memory issues.
threshold = 0.7
duplicates = distances_val.filter(distances_val['eucDistance'] <= threshold)
duplicates.show(5)