# In [1]:
from pyspark import SparkContext
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import SparkSession
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import CountVectorizer
from pyspark.ml.feature import Tokenizer, RegexTokenizer, NGram
from pyspark.ml.feature import MinHashLSH
import re


# Glob pattern selecting the input .txt files; passed to wholeTextFiles() below.
# NOTE(review): "/FileStore/tables" looks like a Databricks DBFS upload folder -- confirm.
filepath = "/FileStore/tables/*.txt"

def flat(pair):
    """Turn one ``(path, content)`` pair into a single ``[title, text]`` row.

    Args:
        pair: Two-item sequence; ``pair[0]`` is a '/'-separated path and
            ``pair[1]`` is the text read from that path.

    Returns:
        A one-element list ``[[title, text]]`` (one output row per input when
        used with ``flatMap``). ``title`` is the last path component and
        ``text`` is the content flattened onto a single line with runs of
        spaces squeezed to one.
    """
    path, text = pair[0], pair[1]
    # Collapse every run of blank / whitespace-only lines into one line break.
    # re.MULTILINE used to be passed positionally here, which re.sub reads as
    # count=8, so only the first eight runs were collapsed.
    text = re.sub(r'\n\s*\n', '\n', text)
    # A line break becomes a space: deleting it outright fused the last word
    # of a line with the first word of the next one.
    text = re.sub(r'[\r\n]+', ' ', text)
    text = re.sub(' +', ' ', text)
    return [[path.split('/')[-1], text]]
# Obtain a SparkSession (getOrCreate) for the application named "LSH".
spksess = SparkSession.builder.appName("LSH").getOrCreate()
spkcontx = spksess.sparkContext

# Read every file matched by `filepath` whole and reduce it to a
# (title, content) row via flat().
df = spkcontx.wholeTextFiles(filepath).flatMap(flat)
cook_book_df = df.toDF(['title', 'content'])

# Tokenize the content column into a `words` column.
tok = Tokenizer(inputCol="content", outputCol="words")
tokenized = tok.transform(cook_book_df)
newdata = tokenized.select("title", "content", "words")

# Fit a CountVectorizerModel and add the `features` vector column.
cvec = CountVectorizer(inputCol="words", outputCol="features", vocabSize=50000, minDF=2)
cv_model = cvec.fit(newdata)
newdata = cv_model.transform(newdata)

# Fit a MinHashLSH model on the feature vectors and add the `hashVal` column.
mhash = MinHashLSH(inputCol="features", outputCol="hashVal", seed=12345).setNumHashTables(3)
mod = mhash.fit(newdata)
newdata = mod.transform(newdata)
print("Number of Files - ")
print(newdata.count())
print("Data types of columns - ")
print(newdata.dtypes)
newdata.show()

# Self-join to get candidate pairs with their Jaccard distance. The join
# cut-off of 3 is above any Jaccard distance (max 1), so nothing is filtered
# at this stage; the real threshold is applied further down.
candidate_pairs = mod.approxSimilarityJoin(newdata, newdata, 3, "JaccardDistance")

# A self-join yields each file paired with itself plus both (A, B) and (B, A).
# Keeping only rows where title A sorts before title B removes the self-matches
# and the mirrored duplicates. De-duplicating on the distance value instead
# would also drop unrelated pairs that merely share a distance.
# NOTE(review): relies on titles (file base names) being unique -- confirm.
match_mx = (
    candidate_pairs
    .filter(col("datasetA.title") < col("datasetB.title"))
    .select(
        col("datasetA.title").alias("Title A"),
        col("datasetB.title").alias("Title B"),
        col("JaccardDistance"),
    )
    # Sort last so the ordering is not disturbed by an earlier shuffle.
    .sort(desc("JaccardDistance"))
)

threshold = 0.85
occurences = match_mx.filter(match_mx['JaccardDistance'] < threshold)
# Report the threshold actually used (the message used to hard-code 0.9).
print("Displaying occurences' having JaccardDistance < {}".format(threshold))
occurences.show()