# **Finding similar items**

### Project for the **Algorithms for massive data course**


*MSc, Data Science for Economics*

Shojaat Joodi Bigdilo

June 2024

In [None]:
# Mount Google Drive so the dataset can be extracted to, and read from, persistent storage.
from google.colab import drive
drive.mount('/content/gdrive',force_remount=True)

Mounted at /content/gdrive


In [None]:
# Kaggle CLI, used below to download the dataset.
!pip install kaggle

import warnings
warnings.filterwarnings("ignore")

In [None]:
# NOTE(review): findspark is installed but not imported anywhere in the visible notebook.
!pip install findspark

In [None]:
!pip install pyspark

In [None]:
import os
# Credentials read by the `kaggle` CLI from the environment.
# NOTE(review): never commit real credentials in the notebook; prefer Colab
# secrets or a kaggle.json file kept outside version control.
os.environ['KAGGLE_USERNAME'] = 'xxxxxxxxx'
os.environ['KAGGLE_KEY'] = 'xxxxxxxxx'

In [None]:
# Downloads 1-3m-linkedin-jobs-and-skills-2024.zip into the working directory.
!kaggle datasets download -d asaniczka/1-3m-linkedin-jobs-and-skills-2024

In [None]:
# Destination folder on Google Drive for the extracted CSV files.
extract_to_path  = "/content/gdrive/My Drive/Massive_Data_Project/Job_Dataset"

import zipfile
# Extract every member of the downloaded archive into extract_to_path.
with zipfile.ZipFile('1-3m-linkedin-jobs-and-skills-2024.zip', 'r') as zip_ref:
    zip_ref.extractall(extract_to_path)

In [None]:
from pyspark import SparkConf, SparkContext

from pyspark.sql import SparkSession
from pyspark.sql.functions import isnan, when, count, col, countDistinct
from pyspark.sql.functions import lower, regexp_replace, size
from pyspark.sql.functions import udf
from pyspark.sql.functions import explode
from pyspark.sql.types import StringType, IntegerType, DoubleType
from pyspark.sql.types import ArrayType

from pyspark.ml.feature import Tokenizer, StopWordsRemover, MinHashLSH
from pyspark.ml.linalg import Vectors, VectorUDT

import re
import time
import string
import datetime
import warnings
import numpy as np
warnings.filterwarnings("ignore")

In [None]:
# Create (or reuse) the Spark session; `sc` is its SparkContext.
conf = SparkConf().setAppName("Similar_Documents")
spark = SparkSession.builder.config(conf = conf).getOrCreate()
sc = spark.sparkContext
type(sc)

In [None]:
# CSV extracted from the Kaggle archive above.
file_path = "/content/gdrive/My Drive/Massive_Data_Project/Job_Dataset/job_summary.csv"

# multiLine + escape='"' let quoted summaries span several lines and contain quotes.
# NOTE(review): the file is decoded as ISO-8859-1; the garbled characters that are
# later stripped as "non-ASCII" may come from this choice — confirm the real
# encoding of the file (UTF-8 is a likely candidate) before relying on it.
df_Dataset = spark.read.csv(file_path, header=True, inferSchema=True, multiLine=True, escape='"',
                           encoding = "ISO-8859-1")

## Choosing a chunk of the data

In [None]:
size = 100000
Job_df = df_Dataset.limit(size)

In [None]:
type(Job_df)

# Pre-processing

### Exploratory analysis

In [None]:
# Preview the first 10 rows of the sample.
Job_df.show(n = 10)

In [None]:
# Keep only the text column; all similarity work below is done on job_summary.
Job_df = Job_df.select("job_summary")
Job_df.show(n = 10)

#### Assigning an Id to each row

In [None]:
# zipWithIndex pairs each Row with a consecutive 0-based index; x is (Row, index),
# so x[1] is the index and x[0][0] the job_summary text.
indexed_rdd = Job_df.rdd.zipWithIndex()
Job_df = indexed_rdd.map(lambda x: (x[1], x[0][0])).toDF(["Id", "job_summary"])

In [None]:
Job_df.show(5)

In [None]:
# checking missing values in the columns
Job_df.select([count(when(isnan(c), c)).alias(c) for c in Job_df.columns]).show()

In [None]:
#count distinct values in each column
Job_df.select([countDistinct(c).alias(c) for c in Job_df.columns]).show()

### Duplicates check

In [None]:
# show duplicates in the job_summary column (texts that occur more than once)
Job_df.groupBy("job_summary").count().filter("count > 1").show()

In [None]:
# Filter the rows where 'job_summary' starts with 'Job Title:\nCerti'
filtered_rows = Job_df.filter(col("job_summary").startswith("Job Title:\nCertified Nursing Assistant (CNA)\nCompany"))
filtered_rows.show()

In [None]:
# ID number 1319
# The column is named 'Id'; 'ID' resolves to it because Spark column names are
# case-insensitive by default. [0][1] is the job_summary of the first match.
row_with_id_1319 = Job_df.filter(Job_df['ID'] == 1319).collect()

txt = row_with_id_1319[0][1]
txt

In [None]:
# ID number 1586
row_with_id_1586 = Job_df.filter(Job_df['ID'] == 1586).collect()

txt2 = row_with_id_1586[0][1]
txt2

In [None]:
# Checking Equality of texts
if txt == txt2:
    print('Equal')
else:
    print('Not Equal')

### Deleting duplicate documents

In [None]:
# Keep one row per distinct job_summary text.
# NOTE(review): Spark's API does not specify which of the duplicate rows is kept,
# so which Id survives (e.g. 1319 vs 1586) may differ between runs — confirm.
Job_df = Job_df.dropDuplicates(['job_summary'])

In [None]:
Job_df.select([countDistinct(c).alias(c) for c in Job_df.columns]).show()

In [None]:
# checking again duplicates
Job_df.groupBy("job_summary").count().filter("count > 1").show()

In [None]:
# Empty result means Id 1586 was the duplicate that got dropped.
row_with_id_1586 = Job_df.filter(Job_df['ID'] == 1586).collect()
row_with_id_1586

# Text cleaning and pre-processing

In [None]:
# Keep only the Id and the raw text before the cleaning steps.
Job_df = Job_df.select('Id',"job_summary")

### Lowercasing text

In [None]:
# Lowercase the text so later matching is case-insensitive.
Job_df = Job_df.withColumn('job_summary', lower(Job_df['job_summary']))

### Remove HTML Tags

In [None]:
# One HTML/XML tag. `[^>]*` (rather than `.*?` without DOTALL) also matches tags
# that span line breaks. Compiled once instead of on every row.
_HTML_TAG_RE = re.compile(r'<[^>]*>')


def remove_html_tags(text):
    """Strip HTML tags from *text*.

    None and the empty string are returned unchanged, so the UDF tolerates null rows.
    """
    return _HTML_TAG_RE.sub('', text) if text else text

# Wrap as a string-returning Spark UDF and apply it to the text column.
remove_html_tags_udf = udf(remove_html_tags, StringType())
Job_df = Job_df.withColumn('job_summary', remove_html_tags_udf(Job_df['job_summary']))

###  Remove URLs

In [None]:
# http(s) URLs or bare "www." hosts, up to the next whitespace; compiled once.
_URL_RE = re.compile(r'https?://\S+|www\.\S+')


def remove_url(text):
    """Remove http(s):// and www. URLs from *text*.

    None is passed through unchanged; otherwise a null row would make the UDF
    raise TypeError and fail the whole Spark job.
    """
    if text is None:
        return None
    return _URL_RE.sub('', text)

# Wrap as a string-returning Spark UDF and apply it to the text column.
remove_url_udf = udf(remove_url, StringType())
Job_df = Job_df.withColumn('job_summary', remove_url_udf(Job_df['job_summary']))

### Remove punctuation

In [None]:
import string

# Translation table deleting every ASCII punctuation character; built once, not per row.
_PUNCT_TABLE = str.maketrans('', '', string.punctuation)


def remove_punctuation(text):
    """Delete ASCII punctuation (``string.punctuation``) from *text*.

    Characters are deleted, not replaced by a space, so "full-time" becomes
    "fulltime". None is passed through unchanged so null rows do not make the
    UDF raise.
    """
    if text is None:
        return None
    return text.translate(_PUNCT_TABLE)

# Wrap as a string-returning Spark UDF and apply it to the text column.
remove_punctuation_udf = udf(remove_punctuation, StringType())
Job_df = Job_df.withColumn('job_summary', remove_punctuation_udf(Job_df['job_summary']))

### Remove numbers

The following document (Id 160) contains around 42 different numbers, so we remove them:
3x12 , 180000060000, 12003, 0, 4, 02142024, 05152024, 13, 556166975, 56166975 , 12 , 7 , 7, 100 , 133, 3467, 68100, 10 , 25, 50 , 100,
100 , 20 , 3, 2, , 1, 0, 100, 15, 15, 15, 91, 401,36, 50, 2023, 2022, 2021 ,2020, 2019.

In [None]:
# Inspect document Id 160 (many numbers) before removing digits.
row_with_id_160 = Job_df.filter(Job_df['ID'] == 160).collect()
row_with_id_160

In [None]:
# One or more consecutive digits; compiled once instead of on every row.
_DIGITS_RE = re.compile(r'\d+')


def remove_numbers(text):
    """Delete every run of digits from *text* (e.g. "3x12" -> "x").

    None is passed through unchanged so null rows do not make the UDF raise.
    """
    if text is None:
        return None
    return _DIGITS_RE.sub('', text)

# Wrap as a string-returning Spark UDF and apply it to the text column.
remove_numbers_udf = udf(remove_numbers, StringType())
Job_df = Job_df.withColumn('job_summary', remove_numbers_udf(Job_df['job_summary']))

In [None]:
# Same document after digit removal.
row_with_id_160 = Job_df.filter(Job_df['ID'] == 160).collect()
row_with_id_160

### Remove Non-ASCII characters:
Some texts contain non-ASCII characters such as `ã°â\x9fâ\x9fâ¡` (apparently residue of mis-decoded emoji), so we remove them.

In [None]:
# Inspect document Id 915 before removing non-ASCII characters.
row_with_id_915 = Job_df.filter(Job_df['ID'] == 915).collect()
row_with_id_915

In [None]:
def remove_non_ascii(text):
    """Return ``str(text)`` keeping only characters in the range U+0000..U+007F.

    None maps to None so null rows pass through the UDF.
    """
    if text is None:
        return None
    return ''.join(ch for ch in str(text) if ord(ch) < 0x80)

# Wrap as a string-returning Spark UDF and apply it to the text column.
remove_non_ascii_udf = udf(remove_non_ascii, StringType())
Job_df = Job_df.withColumn('job_summary', remove_non_ascii_udf(Job_df['job_summary']))

In [None]:
# Same document after non-ASCII removal.
row_with_id_915 = Job_df.filter(Job_df['ID'] == 915).collect()
row_with_id_915

### Remove extra whitespace

In [None]:
from pyspark.sql.functions import regexp_replace, col, trim

def remove_extra_spaces(df, column_name):
    """Return `df` with every whitespace run in `column_name` collapsed to one
    space and leading/trailing whitespace trimmed (a new DataFrame; `df` is not modified)."""
    collapsed = regexp_replace(col(column_name), "\\s+", " ")
    return df.withColumn(column_name, trim(collapsed))

In [None]:
# Normalise whitespace left behind by the previous deletions.
Job_df = remove_extra_spaces(Job_df, "job_summary")

### Tokenization

In [None]:
# Spark's Tokenizer lowercases the text and splits it on whitespace into the "Tokens" array column.
tokenizer = Tokenizer().setInputCol("job_summary").setOutputCol("Tokens")
Job_df = tokenizer.transform(Job_df)
Job_df.show(5)

### Removing Stopwords

In [None]:
# StopWordsRemover with its default stop-word list; print a preview and its size.
remove_stopwords = StopWordsRemover()
stopwords = remove_stopwords.getStopWords()
print(stopwords[:10])
print(len(stopwords))

# Write the filtered token arrays to a new column, keeping the original "Tokens".
remove_stopwords.setInputCol("Tokens").setOutputCol("Tokens stopwords removed")
Job_df = remove_stopwords.transform(Job_df)

In [None]:
# Count tokens before and after stop-word removal.
# `size` is imported under an explicit alias so this cell does not depend on the
# global name `size`, which can be rebound to a plain int elsewhere in the notebook.
from pyspark.sql.functions import size as array_size

Job_df = Job_df.withColumn("Number of tokens", array_size(col("Tokens")))
Job_df = Job_df.withColumn("Number of tokens After stopwords removed", array_size(col("Tokens stopwords removed")))
# Stop words removed = tokens before - tokens after (non-negative).
Job_df = Job_df.withColumn("Number of stopwords removed", array_size(col("Tokens")) - array_size(col("Tokens stopwords removed")))

In [None]:
# Show tokens, filtered tokens and the three count columns.
Job_df.show(10)

###  Join the words

After tokenization and stop-word removal, the remaining tokens are joined back into a single string per document with PySpark's `concat_ws` function:

In [None]:
from pyspark.sql.functions import concat_ws

# Re-join the remaining tokens with single spaces into one cleaned string per document.
Job_df = Job_df.withColumn("Cleaned_text", concat_ws(" ", col("Tokens stopwords removed")))

In [None]:
Job_df.select("Cleaned_text").show(truncate=False)

## Final dataset

In [None]:
# Keep only the Id and the cleaned text for shingling.
Job_df_proces = Job_df.select('Id', "Cleaned_text")

In [None]:
Job_df_proces.show()

### Creating Shingles

In [None]:
def shingle(text, k):
    """Return the distinct word k-shingles (k consecutive words) of *text*.

    Shingles are returned in order of first occurrence, so the output is
    deterministic (a plain set's order depends on the string-hash seed).
    A non-empty text with fewer than k words yields one shingle made of all its
    words, so short documents still get a non-empty set (MinHashLSH rejects
    all-zero vectors). None or blank text yields [].

    Raises:
        ValueError: if k < 1.
    """
    if k < 1:
        raise ValueError(f"k must be >= 1, got {k}")
    words = text.split() if text else []
    if not words:
        return []
    if len(words) < k:
        return [' '.join(words)]
    # dict.fromkeys de-duplicates while preserving first-occurrence order.
    return list(dict.fromkeys(' '.join(words[i:i + k]) for i in range(len(words) - k + 1)))

In [None]:
k = 2
shingle_udf = udf(lambda text: shingle(text, k), ArrayType(StringType()))
Job_df_proces = Job_df_proces.withColumn("shingles", shingle_udf(col("Cleaned_text")))

In [None]:
Job_df_proces.select("shingles").show(5)

#### Converting shingles to sparse vectors

In [None]:
# Flatten the shingles column to get all unique shingles; each gets a column index.
# NOTE: collect() brings the whole vocabulary to the driver — it grows with the corpus.
unique_shingles = Job_df_proces.select(explode("shingles").alias("shingle")).distinct().collect()
shingle_index = {row["shingle"]: idx for idx, row in enumerate(unique_shingles)}
num_shingles = len(unique_shingles)

# Print a summary rather than the whole dictionary, which can hold millions of entries.
print(f"Number of unique shingles: {num_shingles}")
print("Sample of shingles and their indices:")
print({row["shingle"]: idx for idx, row in enumerate(unique_shingles[:10])})

# Ship the vocabulary to executors once as a broadcast variable instead of
# pickling the dictionary into every task's UDF closure.
shingle_index_bc = sc.broadcast(shingle_index)


def shingles_to_sparse_vector(shingles):
    """Binary sparse vector of length num_shingles with 1.0 at each shingle's index.

    Shingles missing from the vocabulary are ignored. Indices are de-duplicated and
    sorted because Vectors.sparse expects them in increasing order.
    """
    index = shingle_index_bc.value
    indices = sorted({index[sh] for sh in shingles if sh in index})
    return Vectors.sparse(num_shingles, indices, [1.0] * len(indices))


sparse_vector_udf = udf(shingles_to_sparse_vector, VectorUDT())

Job_df_proces = Job_df_proces.withColumn("features", sparse_vector_udf(col("shingles")))

In [None]:
# Sparse vectors print as (size, [indices], [values]).
Job_df_proces.select("features").show(truncate=False)

## Implementing MinHashLSH

In [None]:
start = time.time()

# Initialize MinHashLSH. Per the Spark docs, more hash tables lower the
# false-negative rate at the cost of running time.
mh = MinHashLSH(inputCol="features", outputCol="hashes", seed=12345, numHashTables=20)
model = mh.fit(Job_df_proces)
# Adds a 'hashes' column with one min-hash value per hash table.
# (The name `hash` shadows the builtin but is kept because later cells use it.)
hash = model.transform(Job_df_proces)

# Self-join on the LSH buckets, keeping candidate pairs whose exact Jaccard
# distance is smaller than 0.6 (i.e. Jaccard similarity above 0.4).
result = model.approxSimilarityJoin(hash, hash, 0.6, distCol="JaccardDistance").select(
    col("datasetA.id").alias("idA"),
    col("datasetB.id").alias("idB"),
    col("JaccardDistance")
)

# Keep each unordered pair once and drop self-pairs.
result_filtered = result.filter(col("idA") < col("idB"))

# Spark transformations are lazy: nothing above has executed yet. Cache the pairs
# and force evaluation with count() so the timing covers the actual similarity
# join, and the following cells reuse the cached pairs instead of re-running it.
result_filtered = result_filtered.cache()
num_pairs = result_filtered.count()

end = time.time()
computation_time = round(end - start, 3)
print("Computation time: {} seconds".format(computation_time))
print("Number of candidate pairs: {}".format(num_pairs))

print('------------------------------------')


# https://spark.apache.org/docs/2.2.0/ml-features.html
# https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.ml.feature.MinHashLSH.html

In [None]:
type(result_filtered)

In [None]:
result_filtered.show()

In [None]:
# Most similar candidate pairs first (smallest Jaccard distance).
result_filtered.sort(result_filtered.JaccardDistance.asc()).show(10)

In [None]:
# Least similar pairs that still passed the join threshold.
result_filtered.sort(result_filtered.JaccardDistance.desc()).show(10)

In [None]:
# JaccardDistance between 0.2 and 0.3
filtered_result = result_filtered.filter((result_filtered.JaccardDistance >= 0.2) & (result_filtered.JaccardDistance <= 0.3))
filtered_result.sort(filtered_result.JaccardDistance.asc()).show(10)

#### Result of minhash function (hash values)

In [None]:
# Hashed dataset: original columns plus the 'hashes' column added by MinHashLSH.
hash.show()

#### Sparse vector of the first document in `hash` (id = 160 in the author's run)

In [None]:
# Sparse shingle vector of the first row.
hash.first()['features']

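#### MinHash signature of the first document in `hash` (id = 160 in the author's run)
The signature is an array with one entry per hash table; each entry is a one-element DenseVector holding that hash function's min-hash value.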

In [None]:
# MinHash signature of the first row (one DenseVector per hash table).
hash.first()['hashes']

### Creating a new DataFrame to compare pairs of documents

In [None]:
# Id plus the token list after stop-word removal, used by the comparisons below.
Job_df2 = Job_df.select('Id', "Tokens stopwords removed")

In [None]:
from pyspark.sql import DataFrame

def analyze_text_by_id(df: DataFrame, id_number: int):
    """Fetch the second column of the row whose Id equals `id_number` and print a summary.

    With Job_df2 the second column is the token list left after stop-word removal,
    so the returned value is a list of tokens (for a string column it would be text).

    Returns:
        The row's second-column value, or None if no row matches or the value is null.
    """
    rows = df.filter(df['ID'] == id_number).collect()
    if not rows:
        print(f"No row found with ID {id_number}")
        return None
    print(rows)

    txt = rows[0][1]
    if txt is None:
        print(f"Row with ID {id_number} has no value")
        return None

    print(f"Type of txt: {type(txt)}")
    print(f"Length of txt: {len(txt)}")
    # Elements are tokens for a list column, characters for a string column;
    # guard the index so short documents do not raise IndexError.
    if len(txt) > 3:
        print(f"Fourth element in txt: {txt[3]}")

    return txt

In [None]:
def analyze_lists(text1, text2):
    """Compare two token lists, ignoring empty tokens.

    Returns a 7-tuple
        (len1, len2, unique1, unique2, common_unique, pct_common1, pct_common2)
    where pct_commonN is the share (0-100) of list N's distinct tokens that also
    occur in the other list, or 0 when list N has no tokens.
    """
    tokens_a = list(filter(None, text1))
    tokens_b = list(filter(None, text2))
    vocab_a, vocab_b = set(tokens_a), set(tokens_b)
    shared = len(vocab_a & vocab_b)

    def _pct(vocab_size):
        return shared / vocab_size * 100 if vocab_size > 0 else 0

    return (len(tokens_a), len(tokens_b), len(vocab_a), len(vocab_b),
            shared, _pct(len(vocab_a)), _pct(len(vocab_b)))

### Comparing documents with IDs 1909 & 3014 (Jaccard distance 0.20)



In [None]:
# Token lists of the two documents being compared.
txt1 = analyze_text_by_id(Job_df2, 1909)

In [None]:
txt2 = analyze_text_by_id(Job_df2, 3014)

In [None]:
# Comparing number of common words inside Documents with 'ID' number of 1909 & 3014

(num_words_list1, num_words_list2, num_unique_words_list1,
 num_unique_words_list2, num_common_words,
 percentage_common_list1, percentage_common_list2) = analyze_lists(txt1, txt2)

# The percentages are relative to each text's number of unique words.
print(f"Number of words in Text_1: {num_words_list1}")
print(f"Number of words in Text_2: {num_words_list2}")
print(f"Number of Unique words in Text_1: {num_unique_words_list1}")
print(f"Number of Unique words in Text_2: {num_unique_words_list2}")
print(f"Number of common Uniqe words: {num_common_words}")
print(f"Percentage of common words in Text_1: {percentage_common_list1:.2f}%")
print(f"Percentage of common words in Text_2: {percentage_common_list2:.2f}%")

### Comparing documents with IDs 3284 & 4955 (Jaccard distance 0.59)

In [None]:
# Token lists of the two documents being compared.
txt5 = analyze_text_by_id(Job_df2, 3284)

In [None]:
txt6 = analyze_text_by_id(Job_df2, 4955)

In [None]:
# Comparing number of common words inside Documents with 'ID' number of 3284 & 4955

(num_words_list1, num_words_list2, num_unique_words_list1,
 num_unique_words_list2, num_common_words,
 percentage_common_list1, percentage_common_list2) = analyze_lists(txt5, txt6)

# The percentages are relative to each text's number of unique words.
print(f"Number of words in Text_1: {num_words_list1}")
print(f"Number of words in Text_2: {num_words_list2}")
print(f"Number of Unique words in Text_1: {num_unique_words_list1}")
print(f"Number of Unique words in Text_2: {num_unique_words_list2}")
print(f"Number of common Uniqe words: {num_common_words}")
print(f"Percentage of common words in Text_1: {percentage_common_list1:.2f}%")
print(f"Percentage of common words in Text_2: {percentage_common_list2:.2f}%")

### Comparing documents with IDs 503 & 948 (Jaccard distance 0)



In [None]:
# Token lists of the two documents being compared.
txt3 = analyze_text_by_id(Job_df2, 503)

In [None]:
txt4 = analyze_text_by_id(Job_df2, 948)

In [None]:
# Comparing number of common words inside Documents with 'ID' number of 503 & 948

(num_words_list1, num_words_list2, num_unique_words_list1,
 num_unique_words_list2, num_common_words,
 percentage_common_list1, percentage_common_list2) = analyze_lists(txt3, txt4)

# The percentages are relative to each text's number of unique words.
print(f"Number of words in Text_1: {num_words_list1}")
print(f"Number of words in Text_2: {num_words_list2}")
print(f"Number of Unique words in Text_1: {num_unique_words_list1}")
print(f"Number of Unique words in Text_2: {num_unique_words_list2}")
print(f"Number of common Uniqe words: {num_common_words}")
print(f"Percentage of common words in Text_1: {percentage_common_list1:.2f}%")
print(f"Percentage of common words in Text_2: {percentage_common_list2:.2f}%")

## Checking the equality of documents:
#### Checking the equality of documents with IDs 503 & 948, which have a Jaccard distance of zero.

In [None]:
# ID number 503
# [0][1:][0] is the row's second field: the cleaned job_summary text.
row_with_id_503 = Job_df.filter(Job_df['ID'] == 503).collect()
txt = row_with_id_503[0][1:][0]
txt

In [None]:
# ID number 948
row_with_id_948 = Job_df.filter(Job_df['ID'] == 948).collect()
txt2 = row_with_id_948[0][1:][0]
txt2

In [None]:
# Distance 0 means identical shingle sets; check whether the cleaned texts are identical too.
if txt == txt2:
    print('Equal')
else:
    print('Not Equal')

### Dense vectors

In [None]:
# Vocabulary size computed once on the driver: referencing `unique_shingles`
# inside the UDF would serialize the whole list of Rows with every task.
vocab_size = len(unique_shingles)


def shingles_to_one_hot_vector(shingles):
    """Dense 0/1 vector of length vocab_size with 1.0 at each known shingle's index.

    Unlike the sparse representation, every zero is stored, so memory per row
    grows with the size of the whole vocabulary.
    """
    vector = np.zeros(vocab_size)
    for sh in shingles:
        idx = shingle_index.get(sh)
        if idx is not None:
            vector[idx] = 1.0
    return Vectors.dense(vector)


# UDF to convert shingles to one-hot vectors
one_hot_vector_udf = udf(shingles_to_one_hot_vector, VectorUDT())

Job_df_proces2 = Job_df_proces.withColumn("features_dence", one_hot_vector_udf(col("shingles")))

In [None]:
# Dense vectors print every entry, including the zeros.
Job_df_proces2.select("features_dence").show(5, truncate=False)

In [None]:
Job_df_proces2.show(3)

In [None]:
start = time.time()

# Initialize MinHashLSH on the dense one-hot vectors (same settings as the sparse run).
mh = MinHashLSH(inputCol="features_dence", outputCol="hashes", seed=12345, numHashTables=20)
model = mh.fit(Job_df_proces2)
# Adds a 'hashes' column with one min-hash value per hash table.
# (The name `hash` shadows the builtin; kept for consistency with earlier cells.)
hash = model.transform(Job_df_proces2)

# Self-join on the LSH buckets, keeping candidate pairs whose exact Jaccard
# distance is smaller than 0.6 (i.e. Jaccard similarity above 0.4).
result = model.approxSimilarityJoin(hash, hash, 0.6, distCol="JaccardDistance").select(
    col("datasetA.id").alias("idA"),
    col("datasetB.id").alias("idB"),
    col("JaccardDistance")
)

# Keep each unordered pair once and drop self-pairs.
result_filtered = result.filter(col("idA") < col("idB"))

# Spark transformations are lazy: cache the pairs and force evaluation with
# count() so the measured time includes the actual similarity join.
result_filtered = result_filtered.cache()
num_pairs = result_filtered.count()

end = time.time()
computation_time = round(end - start, 3)
print("Computation time: {} seconds".format(computation_time))
print("Number of candidate pairs: {}".format(num_pairs))

print('------------------------------------')