In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://downloads.apache.org/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
!tar xf spark-2.4.7-bin-hadoop2.7.tgz
!pip install -q findspark

!apt-get install -y openjdk-8-jdk-headless -qq > /dev/null #install openjdk


In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64" #set environment variable
!update-alternatives --set java /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java

os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.7"

update-alternatives: using /usr/lib/jvm/java-8-openjdk-amd64/jre/bin/java to provide /usr/bin/java (java) in manual mode


In [None]:
import re, hashlib, math, time
from random import randint, seed
seed(16)  # fixed seed for Python's `random` module so any random draws are repeatable


class hashFamily:
    """Family of salted SHA-1 hash functions; member ``i`` is selected through its salt.

    Attributes set by the constructor:
        id: the ``i`` that was passed in.
        resultSize: number of trailing hex digits of the digest that are kept.
        maxLen: width, in decimal digits, of the salt.
        salt: ``str(i)`` zero-padded on the left (or cut to its last digits) to ``maxLen``.
    """

    def __init__(self, i):
        self.id = i
        self.resultSize = 8  # hex digits kept from the digest tail (8 hex digits = 32 bits)
        self.maxLen = 20  # fixed salt width, in decimal digits
        digits = str(i)
        # Pad to the fixed width first, then keep only the last maxLen characters.
        self.salt = digits.zfill(self.maxLen)[-self.maxLen:]

    def get_hash_value(self, el_to_hash):
        """Return the salted hash of ``str(el_to_hash)`` as a non-negative integer."""
        payload = str(el_to_hash).encode('utf-8') + self.salt.encode('utf-8')
        hex_digest = hashlib.sha1(payload).hexdigest()
        tail = hex_digest[-self.resultSize:]
        return int(tail, 16)
    

class shingler:
    """Character-level k-shingling of text documents.

    A document is normalised (see ``process_doc``) and every window of ``k``
    consecutive characters becomes one shingle.
    """

    DEFAULT_K = 10  # shingle length used when the requested k is not positive

    def __init__(self, k):
        """Store the shingle length.

        Args:
            k: requested shingle length; truncated to an int. Values that are
                not positive after truncation fall back to ``DEFAULT_K``.
        """
        # Truncate *before* the positivity check: a fractional k such as 0.5 passes
        # `k > 0` but int(k) == 0, which would produce empty-string shingles.
        k = int(k)
        self.k = k if k > 0 else self.DEFAULT_K

    #inner class utility
    def process_doc(self, document):
        """Lower-case ``document`` and replace each run of spaces, or run of newlines, with one space."""
        return re.sub(r" +|\n+", " ", document).lower()

    def get_shingles(self, document):
        """Return the set of distinct k-character shingles of the normalised document.

        A document shorter than ``k`` characters (after normalisation) yields an empty set.
        """
        document = self.process_doc(document)
        last_start = len(document) - self.k
        return {document[i:i + self.k] for i in range(last_start + 1)}

    def get_k(self):
        """Return the shingle length in use."""
        return self.k

    #return sorted hash
    def get_hashed_shingles(self, shingles_set):
        """Hash every shingle with hash function 0 and return the distinct values, sorted ascending."""
        hash_function = hashFamily(0)
        return sorted({hash_function.get_hash_value(s) for s in shingles_set})

In [None]:
# Optional installs, kept commented out (findspark is installed in the setup cell):
# !pip install spark
# !pip install findspark

import findspark
findspark.init()  # must run before the pyspark import below so that pyspark can be found

from pyspark.sql import SparkSession

file_path = "/content/dataset_rent_rome_kijiji.tsv"  
spark = SparkSession.builder.getOrCreate()  # reuse the active session or start a new one
# Tab-separated file whose first line holds the column names.
segments = spark.read.csv( path=file_path, header=True, sep='\t' )

# Disabled: would mark the raw DataFrame for caching.
#segments.persist() # to avoid lazy behaviour and store dataset in memory
segments.show() # data preview

+--------------------+--------------------+--------------------+------------------+-------------------+--------------------+
|               Title|   Short Description|            Location|      Price (Euro)|          Timestamp|             Url Adv|
+--------------------+--------------------+--------------------+------------------+-------------------+--------------------+
|Studio accessoria...|Affitto studio a ...|               Roma |              450 | 12 ottobre, 11:32 |https://www.kijij...|
|Negozio 169Mq per...|Privato affitta n...|Prenestino / Casi...|            1.700 | 12 ottobre, 08:45 |https://www.kijij...|
|Negozio in tiburt...|Negozio c/1 roma ...|Tiburtino / Colla...|            6.000 | 17 October, 21:20 |https://www.kijij...|
|Studio medico via...|Studio medico avv...|Trieste / Somalia...|              200 | 17 October, 20:22 |https://www.kijij...|
|Cerco: Appartamen...|Donna lavoratrice...|               Roma |Contatta l'utente | 17 October, 19:39 |https://www.kijij...|


In [None]:
from pyspark.sql.functions import monotonically_increasing_id 

# Keep only the text columns and attach a numeric id to every advert.
# monotonically_increasing_id() yields unique, increasing ids, but they are only
# consecutive within a partition, so downstream code must not use doc_id as a list index.
# persist() is requested at creation (before the first action) so later jobs reuse the
# cached rows; chaining it also stops the cell from echoing the DataFrame repr.
dataset = (
    segments
    .select('Title', 'Short Description')
    .withColumn("doc_id", monotonically_increasing_id())
    .persist()
)
dataset.show()

+--------------------+--------------------+------+
|               Title|   Short Description|doc_id|
+--------------------+--------------------+------+
|Studio accessoria...|Affitto studio a ...|     0|
|Negozio 169Mq per...|Privato affitta n...|     1|
|Negozio in tiburt...|Negozio c/1 roma ...|     2|
|Studio medico via...|Studio medico avv...|     3|
|Cerco: Appartamen...|Donna lavoratrice...|     4|
|Elegante studio m...|Studio medico con...|     5|
|Ufficio su strada...|A pochi metri da ...|     6|
|  Camera in affitto |Camera per studen...|     7|
|Magazzino Via Giu...|Affittasi Locale ...|     8|
|Negozio + Piazzal...|Privato AFFITTA S...|     9|
|Locale uso studio...|Affittasi N.1 loc...|    10|
|Sala per Feste pe...|Spazio modulabile...|    11|
|Sala per feste a ...|Affitto locale a ...|    12|
|  Box insonorizzato |Box con soppalco ...|    13|
|Stanze per Medici...|Affittasi 1 o 2 S...|    14|
|Appartamento in m...|Appartamento in v...|    15|
|Location per fest...|Per una f

DataFrame[Title: string, Short Description: string, doc_id: bigint]

In [None]:
def shingling_map(row, signature_size=50, k=10):
    """Turn one dataset row into ``signature_size`` (doc_id, hashed_shingles, hash_index) triples.

    Meant for ``flatMap``: every document is emitted once per hash function so
    that the min-hash for each function can be computed independently.

    Args:
        row: mapping with 'Title', 'Short Description' and 'doc_id' entries.
        signature_size: number of hash functions, i.e. triples emitted per row.
        k: shingle length passed to ``shingler``.

    Returns:
        An iterator over the emitted triples; ``hash_index`` runs from 0 to
        ``signature_size - 1``.
    """
    # A missing text field arrives as None; treat it as empty text instead of
    # failing on `None + " "`.
    title = row['Title'] or ""
    description = row['Short Description'] or ""

    sh_instance = shingler(k)
    shingles = sh_instance.get_shingles(title + " " + description)
    hashed_shingles = sh_instance.get_hashed_shingles(shingles)

    # One triple per hash function; all of them share the same shingle list.
    return iter([(row['doc_id'], hashed_shingles, i) for i in range(signature_size)])

    
# Build (lazily) an RDD of (doc_id, hashed_shingles, hash_function_index) triples, one triple
# per document and hash function. Call .collect() on it only if the data is needed on the driver.
docId_shingleset_hfunc = dataset.rdd.flatMap(shingling_map)

In [None]:
def minhash_map(docId_ShingleSet_hFunct):
    """Compute one min-hash value for a (doc_id, shingles, hash_index) triple.

    The hash function is ``hashFamily(hash_index)``; the result is
    ``(doc_id, smallest hash over the shingles)``, or ``(doc_id, math.inf)``
    when the document has no shingles.
    """
    doc_id = docId_ShingleSet_hFunct[0]
    hash_f = hashFamily(docId_ShingleSet_hFunct[2])
    hashed = (hash_f.get_hash_value(el) for el in docId_ShingleSet_hFunct[1])
    return (doc_id, min(hashed, default=math.inf))


# Reduce step: gather the min-hashes of each document into its signature.
# groupByKey() gives no guarantee on the order of the grouped values, while the banding
# step slices the signature by position. Each min-hash is therefore tagged with the index
# of its hash function and the signature is rebuilt in index order.
# persist(): the signature matrix is expensive and reused by several later jobs.
sig_matrix = (
    docId_shingleset_hfunc
    .map(lambda rec: (rec[0], (rec[2], minhash_map(rec)[1])))
    .groupByKey()
    .mapValues(lambda tagged: [min_h for _, min_h in sorted(tagged)])
    .persist()
)


PythonRDD[31] at RDD at PythonRDD.scala:53

In [None]:
def map_buckets(row, band_nr=5, row_nr=10):
    """Split a signature into bands and hash every band to a bucket (LSH banding).

    Meant for ``flatMap``: one ``((band_id, bucket), doc_id)`` pair is emitted
    per band, so documents sharing a key agree on that whole band.

    Args:
        row: ``(doc_id, signature)`` where signature is an indexable sequence.
        band_nr: number of bands.
        row_nr: number of signature entries per band; ``band_nr * row_nr``
            should match the signature length.

    Returns:
        An iterator over the ``band_nr`` emitted pairs.
    """
    doc_id = row[0]
    doc_sign = row[1]
    hash_funct = hashFamily(1)
    out = []

    for band_id in range(band_nr):
        start = band_id * row_nr
        # Serialise the band as text so the whole slice is hashed as one value.
        set_col = ' '.join(str(x) for x in doc_sign[start:start + row_nr])
        bucket = hash_funct.get_hash_value(set_col)
        out.append(((band_id, bucket), doc_id))

    return iter(out)
    
# Group documents by (band, bucket); mapValues(list) turns each grouped iterable into a plain list.
candidates = (
    sig_matrix
    .flatMap(map_buckets)
    .groupByKey()
    .mapValues(list)
)

In [None]:
# Sanity check: print the version of the `java` binary currently on the PATH.
!java -version

openjdk version "1.8.0_282"
OpenJDK Runtime Environment (build 1.8.0_282-8u282-b08-0ubuntu1~18.04-b08)
OpenJDK 64-Bit Server VM (build 25.282-b08, mixed mode)


In [None]:
def signature_similarity(sig_a, sig_b):
    """Estimate Jaccard similarity as the fraction of signature positions that agree.

    Position i of a signature is the min-hash under hash function i, so only
    equal positions are comparable. Returns 0.0 for empty signatures.
    """
    length = min(len(sig_a), len(sig_b))
    if length == 0:
        return 0.0
    matches = sum(1 for a, b in zip(sig_a, sig_b) if a == b)
    return matches / length


similar_pairs = set()
sig_df = sig_matrix.collect()  # bring every (doc_id, signature) record to the driver
# Look signatures up by doc_id. Indexing the collected list with the id is only
# correct when ids are consecutive and the list happens to be sorted by id.
signature_by_doc = dict(sig_df)
start_time = time.time()

for candidates_list in candidates.map(lambda x: x[1]).collect(): #collecting all candidate lists from worker nodes

    candidates_nr = len(candidates_list)

    for i in range(candidates_nr - 1):
        for j in range(i + 1, candidates_nr):
            pair = tuple(sorted((candidates_list[i], candidates_list[j])))
            if pair in similar_pairs:
                continue  # already confirmed through another band
            # Compare position by position: a set-based Jaccard over the signature
            # values underestimates the similarity and drops true matches.
            js = signature_similarity(signature_by_doc[pair[0]], signature_by_doc[pair[1]])
            if js >= 0.8:
                similar_pairs.add(pair)

end_time = time.time()

lsh_time = end_time - start_time

print("FOUND %d SIMILAR PAIRS" %len(similar_pairs))

#sim_pairs = candidates.flatMap(get_similar_items).groupByKey().map(lambda x : (x[0], set(serted(x[1]))))

FOUND 10322 SIMILAR PAIRS


In [None]:
def jaccard_similarity(set_a, set_b):
    """Return the size of the intersection over the size of the union.

    Two empty sets give 0.0 instead of raising ZeroDivisionError.
    """
    union_size = len(set_a | set_b)
    if union_size == 0:
        return 0.0
    return len(set_a & set_b) / union_size


# One (doc_id, hashed_shingles) record per document: every document appears once per
# hash function in docId_shingleset_hfunc, so the duplicates are collapsed by key.
DocID_Shingles = docId_shingleset_hfunc.map(lambda x: (x[0], x[1])).reduceByKey(lambda x, y: x).collect()
doc_nr = len(DocID_Shingles)
similar_pairs_bf = set()

start_time = time.time()

# Build each shingle set once instead of once per comparison.
shingle_sets = [set(shingles) for _, shingles in DocID_Shingles]

for i in range(doc_nr - 1):
    # Start at i + 1: comparing a document with itself always scores 1.0 and
    # would be counted as a "similar pair".
    for j in range(i + 1, doc_nr):
        js = jaccard_similarity(shingle_sets[i], shingle_sets[j])
        if js >= 0.8:
            pair = tuple(sorted((DocID_Shingles[i][0], DocID_Shingles[j][0])))
            similar_pairs_bf.add(pair)

end_time = time.time()

bf_time = end_time - start_time

print("FOUND %d SIMILAR PAIRS WITH BRUTEFORCE" %len(similar_pairs_bf))

FOUND 12986 SIMILAR PAIRS WITH BRUTEFORCE


In [None]:
# Pairs reported by both methods; computed once and reused by the three counts below.
_found_by_both = len(similar_pairs & similar_pairs_bf)
_lsh_total = len(similar_pairs)
_bf_total = len(similar_pairs_bf)

print("EXECUTION REPORT")
print("LSH\n%d\tSIMILAR ITEMS\n%.2f\tSECONDS\n"%(_lsh_total, lsh_time))
print("BRUTEFORCE\n%d\tSIMILAR ITEMS\n%.2f\tSECONDS\n"%(_bf_total, bf_time))
print("%d SIMILAR PAIR DISCOVERED WITH BRUTEFORCE AND LSH\n"%( _found_by_both ))

# Reported by LSH only.
print("%d NUMBER OF FALSE POSITIVE SIMILARITIES WITH LSH\n"%( _lsh_total - _found_by_both ) )

# Reported by brute force only.
print("%d NON DETECTED SIMILARITIES BY LSH"%( _bf_total - _found_by_both ) )

EXECUTION REPORT
LSH
10322	SIMILAR ITEMS
0.89	SECONDS

BRUTEFORCE
12986	SIMILAR ITEMS
329.28	SECONDS

10321 SIMILAR PAIR DISCOVERED WITH BRUTEFORCE AND LSH

1 NUMBER OF FALSE POSITIVE SIMILARITIES WITH LSH

2665 NON DETECTED SIMILARITIES BY LSH


In [None]:
#LSH

# Show up to 10 detected pairs. A sorted copy is sliced instead of calling pop(), so
# similar_pairs is left intact (the report stays valid and the cell can be re-run),
# the selection is deterministic, and fewer than 10 pairs no longer raises KeyError.
for i, (doc1, doc2) in enumerate(sorted(similar_pairs)[:10]):
    print("Doc: "+str(i))
    dataset.where('doc_id == '+str(doc1)).show()
    print("Similar Doc:")
    dataset.where('doc_id == '+str(doc2)).show()
    print("\n\n")

Doc: 0
+--------------------+--------------------+------+
|               Title|   Short Description|doc_id|
+--------------------+--------------------+------+
|Negozio 169Mq per...|Privato affitta n...|   266|
+--------------------+--------------------+------+

Similar Doc:
+--------------------+--------------------+------+
|               Title|   Short Description|doc_id|
+--------------------+--------------------+------+
|Negozio 169Mq per...|Privato affitta n...|  2034|
+--------------------+--------------------+------+




Doc: 1
+--------------------+--------------------+------+
|               Title|   Short Description|doc_id|
+--------------------+--------------------+------+
|Negozi C1 in via ...|Privato affitta n...|   714|
+--------------------+--------------------+------+

Similar Doc:
+--------------------+--------------------+------+
|               Title|   Short Description|doc_id|
+--------------------+--------------------+------+
|Negozi C1 in via ...|Privato affitt