## DOCUMENT SIMILARITY SEARCH WITH PYSPARK

In [1]:
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session connected to the standalone cluster master.
spark = (
    SparkSession.builder
    .appName("SimilaritySearch")
    .master("spark://172.29.15.15:7077")
    .config("spark.executor.instances", "2")
    .getOrCreate()
)

# Load the JSON Lines corpus from HDFS into a DataFrame.
file_path = "hdfs://172.29.15.15:9870/khang/dataset/corpus.jsonl"
df = spark.read.json(path=file_path)

# Print a preview of the loaded rows.
df.show()

ModuleNotFoundError: No module named 'pyspark'

In [12]:
# NOTE(review): this cell was executed as In [12], i.e. out of order. On a
# top-to-bottom run it stops the session before the cells below that still use
# `spark` / `df` — consider moving it to the end of the notebook.
spark.stop()

In [2]:
# Count the rows once, then report the corpus size and the Python type of `df`.
num_documents = df.count()
print("Number of documents: ", num_documents)
print(type(df))

[Stage 2:>                                                        (0 + 20) / 20]

Number of documents:  5233329
<class 'pyspark.sql.dataframe.DataFrame'>


                                                                                

In [3]:
from pyspark.sql.functions import col

# "_id" is unique exactly when no value occurs in more than one row.
id_frequencies = df.groupBy("_id").count()
repeated_ids = id_frequencies.filter(col("count") > 1)
is_unique = repeated_ids.isEmpty()
print(is_unique)

23/10/31 14:21:56 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/31 14:21:56 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/31 14:21:56 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/31 14:21:56 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/31 14:21:56 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/31 14:21:56 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/31 14:21:56 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/31 14:21:56 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.
23/10/31 14:21:56 WARN RowBasedKeyValueBatch: Calling spill() on

True


In [14]:
# Inspect the running application: driver endpoint and submitting user.
sc = spark.sparkContext
spark_conf = sc.getConf()

# Driver host and port as recorded in the Spark configuration
driver_host = spark_conf.get("spark.driver.host")
driver_port = spark_conf.get("spark.driver.port")
user = sc.sparkUser()

for label, value in (
    ("Driver node hostname:", driver_host),
    ("Driver node port:", driver_port),
    ("User running the Spark job:", user),
):
    print(label, value)

Driver node hostname: master
Driver node port: 34641
User running the Spark job: khangvt146


## SHINGLING

`Shingling` divides a document or text into a sequence of contiguous `shingles` (overlapping or non-overlapping), which are essentially small units of text. Shingling is typically used to create a compact representation of the text, which can then be used to compare documents and measure the similarity between them.

The Map function for shingling distributes the documents among the worker nodes in the network and produces `(doc_id, shingles_set, hash_id)` tuples, one per hash function. A Reduce function is not needed in this case.

In [33]:
from utils.utilities import Shingling
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

def shingling_map(row, signature_size=100):
    """Expand one document row into one record per hash function.

    Args:
        row: Record exposing "_id", "title" and "text" by key (e.g. a Spark Row).
        signature_size: Number of hash-function indices to emit per document.
            Defaults to 100.

    Returns:
        An iterator over ``(doc_id, shingles, hash_id)`` tuples with
        ``hash_id`` running from 0 to ``signature_size - 1``. Returning an
        iterator lets ``flatMap`` emit several records per input row.
    """
    # A null title or text would make the concatenation below raise TypeError
    # and fail the whole task, so a missing field is treated as an empty string.
    title = "" if row["title"] is None else row["title"]
    text = "" if row["text"] is None else row["text"]

    sh_instance = Shingling(5)
    shingles = sh_instance.get_shingles(title + " " + text, words=True)

    # The same shingle collection is paired with every hash-function index.
    return iter([(row["_id"], shingles, i) for i in range(signature_size)])

# Define the schema as a list of StructField objects
# NOTE(review): "shingles_set" is declared as a string although shingling_map
# emits whatever Shingling.get_shingles returns — confirm the intended type.
schema = StructType([
    StructField("doc_id", StringType(), nullable=False),
    StructField("shingles_set", StringType(), nullable=True),
    StructField("hash_id", IntegerType(), nullable=True)
])

# Each document is expanded into one row per hash function.
result = df.rdd.flatMap(shingling_map).toDF(schema)

# Preview only: hash_id == 0 matches one row per document, so collect() would
# pull the whole corpus to the driver. take() fetches a bounded sample instead.
result.filter(col("hash_id") == 0).take(5)

ERROR:root:KeyboardInterrupt while sending command.               (0 + 20) / 20]
Traceback (most recent call last):
  File "/home/khangvt146/workspace/similarity_search/venv/lib/python3.8/site-packages/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
  File "/home/khangvt146/workspace/similarity_search/venv/lib/python3.8/site-packages/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
  File "/usr/lib/python3.8/socket.py", line 669, in readinto
    return self._sock.recv_into(b)
KeyboardInterrupt


KeyboardInterrupt: 

[Stage 37:>                                                       (0 + 20) / 20]

## MIN-HASHING

`Min Hashing` is a technique used to estimate the similarity between sets, in this case the sets of shingles extracted from the documents. It works by creating a signature matrix that compactly summarizes which shingles are present or absent in each document.

 Design a map-reduce task to produce the signature matrix:
 - Map Task: takes input `(doc_id, shingle_set, h_i)`, where `h_i` is a hash function from the hash family defined above, and produces the minhash value of the set for that hash function. Output: `(doc_id, min_hash)`
 - Reduce Task: groups the results of the Map Task by document. Output: `(doc_id, minhash_signature)`


In [None]:
from utils.utilities import HashFamily
import math

def minhash_map(row):
    """Compute the min-hash of one shingle collection under one hash function.

    Args:
        row: Indexable record ``(doc_id, shingles, hash_id)``; ``hash_id`` is
            passed to ``HashFamily`` to build the hash function.

    Returns:
        ``(doc_id, min_hash)``, where ``min_hash`` is the smallest hash value
        over the shingles, or ``math.inf`` when there are no shingles.
    """
    doc_id, shingle_set = row[0], row[1]
    hasher = HashFamily(row[2])

    # Seed the candidates with +inf so an empty collection still yields a value
    # and a hash only wins when it is strictly smaller.
    candidates = [math.inf]
    candidates.extend(hasher.get_hash_value(shingle) for shingle in shingle_set)

    return (doc_id, min(candidates))

shingleset_hfunc = df.rdd.flatMap(shingling_map)

# Keep every min-hash paired with the index of the hash function that produced
# it: groupByKey gives no ordering guarantee within a group, so without the
# index the positions of two signatures would not refer to the same hash
# function and the signatures could not be compared element-wise.
indexed_minhashes = shingleset_hfunc.map(
    lambda rec: (rec[0], (rec[2], minhash_map(rec)[1]))
)

# Format result: order each group by hash index, then drop the index so every
# entry is (doc_id, [min_hash for hash 0, min_hash for hash 1, ...]).
sig_matrix = indexed_minhashes.groupByKey().map(
    lambda kv: (kv[0], [min_h for _, min_h in sorted(kv[1], key=lambda pair: pair[0])])
)

In [5]:
from utils.utilities import Shingling

# Shingle one sample document and count its hashed shingles.
data = df.first()
string = " ".join([data["title"], data["text"]])

shingling = Shingling(5)
shingles = shingling.get_shingles(string, True)
hashed_shingles = shingling.get_hashed_shingles(shingles)

len(hashed_shingles)

42

In [18]:
from utils.utilities import HashFamily
a_text = [23145, 12345, 15674, 42212, 99899]
hash_1, hash_2 = HashFamily(1), HashFamily(2)

# Spot check: only the first sample value is hashed and printed.
for item in a_text[:1]:
    value_1 = hash_1.get_hash_value(item)
    value_2 = hash_2.get_hash_value(item)

    print(f"Value 1: {value_1} -- Value 2: {value_2}")

Value 1: 863571031 -- Value 2: 3368988008
