In [1]:
!pip install pyspark



In [0]:
from pyspark.ml.linalg import Vectors
from pyspark.sql.functions import col
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext,SparkSession
from pyspark.ml.feature import CountVectorizer,MinHashLSH
from pyspark.sql.types import *
from pyspark.sql import *
from pyspark import SparkContext, SparkConf
from pyspark import SparkConf
from pyspark.sql.functions import udf

In [3]:
!unzip cook.zip

Archive:  cook.zip
replace cook/amem.txt? [y]es, [n]o, [A]ll, [N]one, [r]ename: N


In [0]:
spark = SparkSession \
    .builder \
    .appName("Q3 MinHash") \
    .getOrCreate()

sc = spark.sparkContext
sqlContext = SQLContext(sc)
data = sc.wholeTextFiles('cook')

In [0]:
data_final = data.map(lambda x: Row(Filename=x[0],Data=x[1].strip()))
schema = "Filename Data"
fields = [StructField(field_name, StringType(), True) for field_name in schema.split()]


In [6]:
schema_final = StructType(fields)
df1 = spark.createDataFrame(data_final, schema_final)
df1.show()

+--------------------+--------------------+
|            Filename|                Data|
+--------------------+--------------------+
|file:/content/coo...|The American Woma...|
|file:/content/coo...|THE 

IDEAL BAR...|
|file:/content/coo...|Manual For Army C...|
|file:/content/coo...|"Aunt Babette's" ...|
|file:/content/coo...|The American Matr...|
+--------------------+--------------------+



**Function to calculate shingle**

In [0]:
import re
def get_shingles(text):

    text = text.replace('\r', '')
    text = text.replace('\n', '')
    text = re.sub('[^a-zA-Z0-9]+', ' ', text)
    text = text.lstrip().split(" ")  
    shingles = [text[i:i+3] for i in range(len(text)-3+1)]
    
    return shingles

In [0]:
text_shingle = lambda y: get_shingles(y)
shingle_final = udf(text_shingle,ArrayType(StringType()))

df_shingle = df1.withColumn("shingles", shingle_final(df1.Data))
df_shingle = df_shingle.drop("Data")

In [0]:
countVectorizer = CountVectorizer(inputCol="shingles", outputCol="features", vocabSize = 100000, minDF=2)
model_cv = countVectorizer.fit(df_shingle)
result = model_cv.transform(df_shingle)

In [10]:
minHash = MinHashLSH(inputCol="features", outputCol="hashes", seed=12345)
model_minHash = minHash.fit(result)
result_minHash = model_minHash.transform(result)
result_minHash.show()


+--------------------+--------------------+--------------------+----------------+
|            Filename|            shingles|            features|          hashes|
+--------------------+--------------------+--------------------+----------------+
|file:/content/coo...|[[The, American, ...|(9166,[0,2,3,5,6,...|    [[294644.0]]|
|file:/content/coo...|[[THE, IDEAL, BAR...|(9166,[6,14,15,17...|    [[482803.0]]|
|file:/content/coo...|[[Manual, For, Ar...|(9166,[0,1,2,3,4,...|    [[294644.0]]|
|file:/content/coo...|[[Aunt, Babette, ...|(9166,[0,1,2,3,4,...|    [[294644.0]]|
|file:/content/coo...|[[The, American, ...|(9166,[18,700,390...|[[9.46173052E8]]|
+--------------------+--------------------+--------------------+----------------+



In [0]:
resultDF = result_minHash.drop("shingles")

In [12]:
resultDF.show(5)

+--------------------+--------------------+----------------+
|            Filename|            features|          hashes|
+--------------------+--------------------+----------------+
|file:/content/coo...|(9166,[0,2,3,5,6,...|    [[294644.0]]|
|file:/content/coo...|(9166,[6,14,15,17...|    [[482803.0]]|
|file:/content/coo...|(9166,[0,1,2,3,4,...|    [[294644.0]]|
|file:/content/coo...|(9166,[0,1,2,3,4,...|    [[294644.0]]|
|file:/content/coo...|(9166,[18,700,390...|[[9.46173052E8]]|
+--------------------+--------------------+----------------+



In [13]:
relation = model_minHash.approxSimilarityJoin(resultDF, resultDF, 0.5).filter("distCol != 0.0")
relation.show()

+--------------------+--------------------+-------------------+
|            datasetA|            datasetB|            distCol|
+--------------------+--------------------+-------------------+
|[file:/content/co...|[file:/content/co...|0.40930079155672827|
|[file:/content/co...|[file:/content/co...|0.40930079155672827|
+--------------------+--------------------+-------------------+



Reference: https://mattilyra.github.io/2017/05/23/document-deduplication-with-lsh.html