<a href="https://colab.research.google.com/github/katyaserg/Data-Mining/blob/main/DataMining_SimilarItems.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Importing libraries**

In [67]:
pip install pyspark



In [None]:
pip install findspark

Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


In [None]:
import pyspark
import findspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

In [None]:
! pip install -q kaggle

In [None]:
pip install pyspark nltk



In [None]:
import numpy as np
import pandas as pd
import re, math
import nltk
nltk.download('punkt_tab')
from pyspark.sql.functions import udf, length, expr, regexp_extract, collect_list
from pyspark.sql.types import StructType, StructField, StringType, LongType
from pyspark.sql.functions import monotonically_increasing_id, regexp_replace, col, split, size, concat_ws
from pyspark.ml.linalg import Vectors, DenseVector, VectorUDT
from pyspark.ml.feature import MinHashLSH, HashingTF
import random
from sympy import nextprime

[nltk_data] Downloading package punkt_tab to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt_tab.zip.


**Downloading the dataset**

In [None]:
!kaggle datasets download -d asaniczka/1-3m-linkedin-jobs-and-skills-2024

Dataset URL: https://www.kaggle.com/datasets/asaniczka/1-3m-linkedin-jobs-and-skills-2024
License(s): ODC Attribution License (ODC-By)
Downloading 1-3m-linkedin-jobs-and-skills-2024.zip to /content
100% 1.88G/1.88G [01:04<00:00, 33.0MB/s]
100% 1.88G/1.88G [01:04<00:00, 31.2MB/s]


In [None]:
!unzip -q ./1-3m-linkedin-jobs-and-skills-2024.zip -d .

In [None]:
sc = spark.sparkContext

**Data Cleaning**

In [None]:
df = pd.read_csv('/content/job_summary.csv')

In [None]:
df

Unnamed: 0,job_link,job_summary
0,https://www.linkedin.com/jobs/view/restaurant-...,Rock N Roll Sushi is hiring a Restaurant Manag...
1,https://www.linkedin.com/jobs/view/med-surg-re...,Schedule\n: PRN is required minimum 12 hours p...
2,https://www.linkedin.com/jobs/view/registered-...,Description\nIntroduction\nAre you looking for...
3,https://uk.linkedin.com/jobs/view/commercial-a...,Commercial account executive\nSheffield\nFull ...
4,https://www.linkedin.com/jobs/view/store-manag...,Address:\nUSA-CT-Newington-44 Fenn Road\nStore...
...,...,...
1297327,https://www.linkedin.com/jobs/view/roofing-sup...,We are currently seeking experienced commercia...
1297328,https://www.linkedin.com/jobs/view/service-cen...,Overview\nStable and growing organization\nCom...
1297329,https://www.linkedin.com/jobs/view/flight-qual...,Rôle et responsabilités\nJob Description:\nFli...
1297330,https://www.linkedin.com/jobs/view/global-sour...,Job Description\nAre You Ready to Make It Happ...


In [None]:
df['job_summary'][2]

'Description\nIntroduction\nAre you looking for a place to deliver excellent care patients deserve? At StoneSprings Hospital Center we support our colleagues in their positions. Join our Team as a(an) Registered Nurse Cath Lab and access programs to assist with every stage of your career.\nBenefits\nStoneSprings Hospital Center, offers a total rewards package that supports the health, life, career and retirement of our colleagues. The available plans and programs include:\nComprehensive medical coverage that covers many common services at no cost or for a low copay. Plans include prescription drug and behavioral health coverage as well as free telemedicine services and free AirMed medical transportation.\nAdditional options for dental and vision benefits, life and disability coverage, flexible spending accounts, supplemental health protection plans (accident, critical illness, hospital indemnity), auto and home insurance, identity theft protection, legal counseling, long-term care cove

In [None]:
schema = StructType([
    StructField("job_link", StringType(), True),
    StructField("job_summary", StringType(), True)
])

In [None]:
spark_df = spark.read.csv(
    '/content/job_summary.csv',
    header=True,
    schema=schema,
    sep=',',       # Specify the delimiter
    quote='"',     # Handle quotes properly
    escape='\\',   # Handle escape characters
    multiLine=True # Handle multiline fields
)

spark_df.show()
spark_df.printSchema()

+--------------------+--------------------+
|            job_link|         job_summary|
+--------------------+--------------------+
|https://www.linke...|Rock N Roll Sushi...|
|https://www.linke...|Schedule\n: PRN i...|
|https://www.linke...|"Description\nInt...|
|HCA Healthcare Co...|                NULL|
|If growth and con...| we encourage you...|
|Unlock the possib...|                NULL|
|We are an equal o...|            religion|
|           Show more|                NULL|
|          Show less"|                NULL|
|https://uk.linked...|Commercial accoun...|
|https://www.linke...|Address:\nUSA-CT-...|
|https://www.linke...|Description\nOur\...|
|https://www.linke...|Company Descripti...|
|https://uk.linked...|An exciting oppor...|
|https://www.linke...|Job Details:\nJob...|
|https://www.linke...|Our\nRestaurant T...|
|https://www.linke...|Our General Manag...|
|https://www.linke...|Earning potential...|
|https://www.linke...|Dollar General Co...|
|https://au.linked...|Restaurant
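The preview above contains rows whose `job_link` is plain text rather than a URL (for example `HCA Healthcare Co...` and `Show less"`), which suggests that some quoted summaries are being split into several records. One possible cause, stated here as an assumption and not verified against the file, is that quotes inside a summary are escaped by doubling (`""`) rather than with a backslash. The sketch below uses Python's standard `csv` module on a made-up sample to show how such a field parses as a single record under that convention.

```python
import csv
import io

# Hypothetical sample (not taken from the dataset): the second field of the
# first record contains a newline, a comma and doubled quotes.
sample = (
    'job_link,job_summary\n'
    'https://example.com/1,"Line one\nsaid ""hello"", then left"\n'
    'https://example.com/2,Short summary\n'
)

# doublequote=True (the default) reads "" inside a quoted field as one literal quote,
# so the embedded newline and comma stay inside the same field.
rows = list(csv.reader(io.StringIO(sample), quotechar='"', doublequote=True))

for row in rows:
    print(len(row), row)
```

If the file does follow this convention, the analogous setting in `spark.read.csv` would be `escape='"'` alongside `multiLine=True`. The outputs recorded in the rest of this notebook come from the original read.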

In [None]:
spark_df = spark_df.dropna(subset=['job_summary'])

In [None]:
spark_df = spark_df.withColumn('job_summary', regexp_replace(col('job_summary'), '\n', ' '))


In [None]:
spark_df = spark_df.withColumn('job_words', split(col('job_summary'), ' '))

In [None]:
spark_df = spark_df.filter(size(col('job_words')) > 5)

In [None]:
spark_df = spark_df.select('job_words').withColumn('doc_id', monotonically_increasing_id())


In [None]:
spark_df.show()

+--------------------+------+
|           job_words|doc_id|
+--------------------+------+
|[Rock, N, Roll, S...|     0|
|[Schedule, :, PRN...|     1|
|["Description, In...|     2|
|[, we, encourage,...|     3|
|[Commercial, acco...|     4|
|[Address:, USA-CT...|     5|
|[Description, Our...|     6|
|[Company, Descrip...|     7|
|[An, exciting, op...|     8|
|[Job, Details:, J...|     9|
|[Our, Restaurant,...|    10|
|[Our, General, Ma...|    11|
|[Earning, potenti...|    12|
|[Dollar, General,...|    13|
|[Restaurant, Desc...|    14|
|[Who, We, Are, We...|    15|
|[A, Place, Where,...|    16|
|[Description, The...|    17|
|["Overview, Descr...|    18|
|[, the, Team, Mem...|    19|
+--------------------+------+
only showing top 20 rows



In [None]:
df_scaled = spark_df.sample(withReplacement=False, fraction=0.00005, seed=42)


In [None]:
df_scaled = df_scaled.withColumn('job_words', concat_ws(' ', col('job_words')))


**Preprocessing**

In [None]:
nltk.download('stopwords')
nltk.download('punkt')
stopwords = set(nltk.corpus.stopwords.words('english'))

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.


In [None]:
def preprocess_text(text):
    text = text.lower()
    text = re.sub(r'\s+', ' ', text)  # Removal of extra whitespace
    text = re.sub(r'[^\w\s]', '', text)  # Removal of punctuation
    tokens = nltk.word_tokenize(text)
    tokens = [word for word in tokens if word not in stopwords]  # Removal of stopwords
    return ' '.join(tokens)

In [None]:
df_udf = udf(preprocess_text, StringType())

In [None]:
rdd = df_scaled.select('doc_id', 'job_words').rdd.map(lambda row: (row['doc_id'], preprocess_text(row['job_words'])))


In [None]:
rdd.toDF().show()

+------+--------------------+
|    _1|                  _2|
+------+--------------------+
| 71617|position function...|
|106539|weve made lot pro...|
|117032|responsibilities ...|
|125621|employee may expo...|
|129527|atlas nationwide ...|
|226369|requisition id 16...|
|249104|dont give resilie...|
|254225|overview job desc...|
|376747|build internal ex...|
|381044|fulltime opportun...|
|384188|company descripti...|
|438781|technical support...|
|443170|description pleas...|
|444387|company descripti...|
|465163|company descripti...|
|473922|job description j...|
|474810|seasoned recruitm...|
|483497|job posting posit...|
|491555|job summary excit...|
|504687|practice area eri...|
+------+--------------------+
only showing top 20 rows



**Shingles**

In [None]:
import binascii

In [None]:
k = 7

In [None]:
shingles_rdd = rdd.flatMap(lambda doc: [(doc[0], doc[1][i:i+k]) for i in range(len(doc[1]) - k + 1)])


In [None]:
shingles_rdd.take(7)

[(71617, 'positio'),
 (71617, 'osition'),
 (71617, 'sition '),
 (71617, 'ition f'),
 (71617, 'tion fu'),
 (71617, 'ion fun'),
 (71617, 'on func')]
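As a plain-Python illustration of what the `flatMap` above produces, the sketch below builds the set of character 7-shingles for two short, made-up strings and compares them with exact Jaccard similarity. The helper names `char_shingles` and `jaccard_similarity` are hypothetical and are not used elsewhere in the notebook.

```python
def char_shingles(text, k=7):
    """Return the set of all k-character substrings of text (empty if text is shorter than k)."""
    return {text[i:i + k] for i in range(len(text) - k + 1)}


def jaccard_similarity(a, b):
    """Exact Jaccard similarity |a & b| / |a | b| of two sets."""
    if not a and not b:
        return 1.0  # convention chosen here for two empty sets
    return len(a & b) / len(a | b)


# Made-up examples, already lowercased and stripped of punctuation
doc_a = "position function manager"
doc_b = "position function director"

shingles_a = char_shingles(doc_a)
shingles_b = char_shingles(doc_b)

print(sorted(shingles_a)[:3])
print(round(jaccard_similarity(shingles_a, shingles_b), 3))
```

Using sets drops repeated shingles within a document, which corresponds to the `.distinct()` call applied later to the hashed shingles.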

In [None]:
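# Note: this groups the preprocessed documents in `rdd`, so each value is the
# full text wrapped in a list; group `shingles_rdd` instead to collect the
# k-character shingles per document.
shingles_list = (rdd
                 .groupByKey()
                 .map(lambda x: (x[0], list(x[1]))))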

In [None]:
shingles_list.toDF().show()

+------+--------------------+
|    _1|                  _2|
+------+--------------------+
| 71617|[position functio...|
|106539|[weve made lot pr...|
|117032|[responsibilities...|
|125621|[employee may exp...|
|129527|[atlas nationwide...|
|226369|[requisition id 1...|
|249104|[dont give resili...|
|254225|[overview job des...|
|376747|[build internal e...|
|381044|[fulltime opportu...|
|384188|[company descript...|
|438781|[technical suppor...|
|443170|[description plea...|
|444387|[company descript...|
|465163|[company descript...|
|473922|[job description ...|
|474810|[seasoned recruit...|
|483497|[job posting posi...|
|491555|[job summary exci...|
|504687|[practice area er...|
+------+--------------------+
only showing top 20 rows



Characteristic matrix

In [None]:
def hash_shingle(shingle):
  return binascii.crc32(shingle.encode('utf-8')) & 0xffffffff

In [None]:
hash_rdd = shingles_rdd.mapValues(hash_shingle).distinct()

In [None]:
hash_rdd.take(7)

In [None]:
df = hash_rdd.toDF(["doc_id", "hashed_shingle"]) \
    .groupBy("doc_id") \
    .agg(collect_list("hashed_shingle").alias("hashed_shingles"))

In [None]:
df.show()

In [None]:
def collect_shingles(a, b):
    return a + b

In [None]:
hashed_shingles_rdd = hash_rdd.map(lambda x: (x[0], [x[1]])).reduceByKey(collect_shingles)


In [None]:
hashed_shingles_rdd.take(7)

In [None]:
hashed_shingles_list = hashed_shingles_rdd.flatMap(lambda x: x[1]).collect()


**MinHash**

In [None]:
h_functions = 100
b_bands = 8

In [None]:
def multiple(x, a, b, c):
    return (a * x + b) % c

In [None]:
params =[]

for _ in range(h_functions):
  a = random.randint(1, 10000)
  b = random.randint(1, 10000)
  max_ab = max(a, b)
  c = nextprime(max_ab + 1)
  params.append({"a": a, "b": b, "c": c})

params[:7]

In [None]:
def enum_shingles(e, hashed_shingles_list):
    doc_id, shingle = e
    return [((doc_id), (h), (hashed_shingles_list)) for h in range(h_functions)]


In [None]:
minhash_matrix = hash_rdd.flatMap(lambda e: enum_shingles(e, hashed_shingles_list))


In [None]:
minhash_matrix.toDF().show()

+-----+---+--------------------+
|   _1| _2|                  _3|
+-----+---+--------------------+
|71617|  0|[1244724455, 3052...|
|71617|  1|[1244724455, 3052...|
|71617|  2|[1244724455, 3052...|
|71617|  3|[1244724455, 3052...|
|71617|  4|[1244724455, 3052...|
|71617|  5|[1244724455, 3052...|
|71617|  6|[1244724455, 3052...|
|71617|  7|[1244724455, 3052...|
|71617|  8|[1244724455, 3052...|
|71617|  9|[1244724455, 3052...|
|71617| 10|[1244724455, 3052...|
|71617| 11|[1244724455, 3052...|
|71617| 12|[1244724455, 3052...|
|71617| 13|[1244724455, 3052...|
|71617| 14|[1244724455, 3052...|
|71617| 15|[1244724455, 3052...|
|71617| 16|[1244724455, 3052...|
|71617| 17|[1244724455, 3052...|
|71617| 18|[1244724455, 3052...|
|71617| 19|[1244724455, 3052...|
+-----+---+--------------------+
only showing top 20 rows



In [None]:
def minhash_map(docId_hashedShingles):
    doc_id, hashed_shingles = docId_hashedShingles
    minhashes = []
    for h in range(h_functions):
        min_h = math.inf
        for shingle in hashed_shingles:
            hash_value = multiple(shingle, **params[h])
            if hash_value < min_h:
                min_h = hash_value
        minhashes.append(min_h)
    return (doc_id, minhashes)
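The signature computation above can be sketched without Spark as follows. One deliberate difference, which is an assumption of this sketch and not the notebook's setting: the modulus is a fixed prime far larger than any 32-bit CRC value (the Mersenne prime 2^61 - 1), whereas `params` above draws each `c` just above `max(a, b)`, which keeps hash values below roughly 10,000. The function names and the `item{i}` test data are hypothetical.

```python
import random
from binascii import crc32

MERSENNE_PRIME = (1 << 61) - 1  # prime modulus well above the 32-bit range of crc32


def make_hash_params(n, seed=0):
    """Draw n (a, b) pairs for hash functions h(x) = (a * x + b) mod MERSENNE_PRIME."""
    rng = random.Random(seed)
    return [(rng.randrange(1, MERSENNE_PRIME), rng.randrange(0, MERSENNE_PRIME))
            for _ in range(n)]


def minhash_signature(hashed_shingles, hash_params):
    """For each hash function, keep the minimum hash value over a non-empty set."""
    return [min((a * x + b) % MERSENNE_PRIME for x in hashed_shingles)
            for a, b in hash_params]


def estimate_jaccard(sig_a, sig_b):
    """Fraction of signature positions on which the two signatures agree."""
    matches = sum(1 for x, y in zip(sig_a, sig_b) if x == y)
    return matches / len(sig_a)


# Two made-up sets of hashed "shingles" that overlap on about one third of their union
set_a = {crc32(f"item{i}".encode("utf-8")) for i in range(0, 1000)}
set_b = {crc32(f"item{i}".encode("utf-8")) for i in range(500, 1500)}

demo_params = make_hash_params(256)
sig_a = minhash_signature(set_a, demo_params)
sig_b = minhash_signature(set_b, demo_params)

true_jaccard = len(set_a & set_b) / len(set_a | set_b)
print("true:", round(true_jaccard, 3), "estimated:", round(estimate_jaccard(sig_a, sig_b), 3))
```

The estimate is only approximate; its spread shrinks as the number of hash functions grows.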

In [None]:
sig_matrix_rdd = hashed_shingles_rdd.map(minhash_map)

In [None]:
signature_df = sig_matrix_rdd.toDF(["doc_id", "minhashes"])

In [None]:
signature_df.show()

+------+--------------------+
|doc_id|           minhashes|
+------+--------------------+
| 71617|[5, 10, 12, 15, 3...|
|106539|[1, 4, 3, 0, 3, 2...|
|117032|[2, 4, 1, 6, 4, 1...|
|125621|[221, 325, 68, 49...|
|129527|[2, 1, 5, 1, 1, 4...|
|226369|[1, 4, 7, 3, 0, 2...|
|249104|[136, 75, 314, 34...|
|254225|[1, 0, 4, 4, 2, 1...|
|376747|[3, 105, 80, 151,...|
|381044|[3, 4, 5, 0, 3, 0...|
|384188|[2, 0, 10, 2, 3, ...|
|438781|[7, 1, 2, 12, 3, ...|
|443170|[1, 4, 4, 3, 2, 0...|
|444387|[2, 4, 5, 5, 4, 0...|
|465163|[0, 0, 6, 0, 3, 0...|
|473922|[1, 1, 1, 0, 0, 1...|
|474810|[3, 4, 5, 4, 0, 4...|
|483497|[1, 1, 5, 0, 4, 2...|
|491555|[2, 4, 7, 14, 3, ...|
|504687|[19, 4, 7, 16, 4,...|
+------+--------------------+
only showing top 20 rows



**Locality-Sensitive Hashing**

Setting the threshold

In [None]:
n_per_bands = h_functions // b_bands
threshold = (1/b_bands) ** (1/n_per_bands)

In [None]:
print("Threshold for candidate pairs: ", threshold)  # Rule-of-thumb threshold (1/b)^(1/r) for b bands of r rows

Threshold for candidate pairs:  0.8408964152537145
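The value above follows from the banding rule of thumb: with b bands of r rows each, two documents with Jaccard similarity s share at least one identical band with probability 1 - (1 - s^r)^b, and (1/b)^(1/r) approximates the similarity at which that curve rises most steeply. The sketch below evaluates both for the notebook's settings; the function names are hypothetical.

```python
def lsh_threshold(bands, rows):
    """Approximate similarity at which the banding S-curve rises most steeply."""
    return (1 / bands) ** (1 / rows)


def candidate_probability(similarity, bands, rows):
    """Probability that two signatures agree on all rows of at least one band."""
    return 1 - (1 - similarity ** rows) ** bands


bands = 8
rows = 100 // bands  # 12 rows per band, so 96 of the 100 hash values are used

print("threshold:", lsh_threshold(bands, rows))
for s in (0.4, 0.6, 0.8, 0.9):
    print(s, round(candidate_probability(s, bands, rows), 4))
```

Because 100 is not a multiple of 8, integer division leaves four hash values unused in this banding scheme.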


In [None]:
threshold = 0.6  # Overriding the threshold manually to get more relevant results

Using MinHashLSH from PySpark

In [None]:
array_to_vector_udf = udf(lambda x: DenseVector(x), VectorUDT())
signature_df = signature_df.withColumn("minhash_vector", array_to_vector_udf(col("minhashes"))) # converted with dense vector


In [None]:
signature_df = signature_df.drop("minhashes")

In [None]:
mh = MinHashLSH(inputCol="minhash_vector", outputCol="hashes", numHashTables=b_bands*n_per_bands, seed=56)


In [None]:
model = mh.fit(signature_df)

In [None]:
lsh_df = model.transform(signature_df)
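Spark's documentation describes the input to `MinHashLSH` as a binary vector whose non-zero indices mark the elements of a set, and the distance it reports is the Jaccard distance between those index sets. The plain-Python sketch below mimics that distance for two made-up dense vectors to show what it measures when the vectors hold MinHash values rather than set membership: only the positions of zeros matter, not the values. This is an illustration under that reading of the API, not a reproduction of Spark's code, and the function names are hypothetical.

```python
def nonzero_indices(vector):
    """Indices of the non-zero entries, i.e. the set a binary vector would encode."""
    return {i for i, value in enumerate(vector) if value != 0}


def jaccard_distance_on_indices(u, v):
    """Jaccard distance between the non-zero index sets of two vectors."""
    a, b = nonzero_indices(u), nonzero_indices(v)
    union = a | b
    if not union:
        raise ValueError("both vectors are all zeros")
    return 1 - len(a & b) / len(union)


# Two made-up 100-value signatures that share no values at all;
# the second one happens to contain a single zero.
sig_a = [5, 10, 12] + [7] * 97
sig_b = [0, 3, 44] + [9] * 97

print(jaccard_distance_on_indices(sig_a, sig_b))
```

Under this reading, encoding each document as a sparse binary vector over its shingles (for example with `HashingTF`, which is imported above but not used) would let `MinHashLSH` compare the shingle sets themselves.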

**Finding similar vacancies**

In [None]:
def find_similar_jobs(lsh_df, model, threshold):
    # Self-join to find candidate pairs; approxSimilarityJoin treats `threshold` as a maximum Jaccard distance
    similar_items = model.approxSimilarityJoin(lsh_df, lsh_df, threshold, distCol="JaccardDistance")

    # Keep each unordered pair once (doc_id_A < doc_id_B) and only pairs with JaccardDistance at or below the threshold
    similar_pairs = (similar_items
                     .select(
                         col("datasetA.doc_id").alias("doc_id_A"),
                         col("datasetB.doc_id").alias("doc_id_B"),
                         col("JaccardDistance"))
                     .filter(col("doc_id_A") < col("doc_id_B"))
                     .filter(col("JaccardDistance") <= threshold)
                     .rdd
                     .map(lambda row: (row["doc_id_A"], row["doc_id_B"], row["JaccardDistance"]))
                     .collect())

    return similar_pairs

In [None]:
similar_pairs = find_similar_jobs(lsh_df, model, threshold)

In [None]:
similar_pairs

[(249104, 832967, 0.010000000000000009),
 (465163, 1499220, 0.5),
 (619564, 964835, 0.29000000000000004),
 (619564, 1367282, 0.24),
 (832967, 1445985, 0.3737373737373737),
 (1057604, 1513641, 0.28865979381443296),
 (1452354, 1691604, 0.4545454545454546),
 (611352, 906430, 0.3950617283950617),
 (706750, 906430, 0.43529411764705883),
 (106539, 473922, 0.4736842105263158),
 (384188, 883786, 0.43333333333333335),
 (483497, 491555, 0.4382022471910112),
 (491555, 1215669, 0.25),
 (721965, 947030, 0.16000000000000003),
 (806204, 1505655, 0.18999999999999995),
 (947030, 1178559, 0.37),
 (964835, 1505655, 0.3191489361702128),
 (1222814, 1370809, 0.5612244897959184),
 (483497, 508737, 0.5060240963855422),
 (620863, 1691604, 0.5714285714285714),
 (804301, 1057604, 0.5111111111111111),
 (905711, 1109007, 0.4591836734693877),
 (576867, 1361632, 0.5384615384615384),
 (226369, 1109007, 0.505050505050505),
 (381044, 964835, 0.3085106382978723),
 (990545, 1452354, 0.5595238095238095),
 (473922, 906430,

**Results**

In [None]:
similar_df = pd.DataFrame(similar_pairs, columns=['doc_id1', 'doc_id2', 'jaccard_distance'])


In [None]:
similar_df

Unnamed: 0,doc_id1,doc_id2,jaccard_distance
0,249104,832967,0.010000
1,465163,1499220,0.500000
2,619564,964835,0.290000
3,619564,1367282,0.240000
4,832967,1445985,0.373737
...,...,...,...
3922,508737,1084112,0.472527
3923,721965,973215,0.278351
3924,752561,1481251,0.330000
3925,1367282,1488599,0.276596


Example of two pairs

In [70]:
for pair in similar_pairs[:2]:
    doc1 = pair[0]
    doc2 = pair[1]
    dist = pair[2]

    # Filter DataFrame to retrieve text for doc1 and doc2
    doc1_text = df_scaled.filter(col('doc_id') == doc1).select('job_words').first()[0]
    doc2_text = df_scaled.filter(col('doc_id') == doc2).select('job_words').first()[0]

    # Job summaries for doc1 and doc2
    print("Document 1:")
    print(doc1_text)
    print("\nDocument 2:")
    print(doc2_text)
    print("\nJaccard Distance: {:.4f}".format(dist))
    print("\n---")

Document 1:
 you don't give up. You are resilient and only get stronger from the ups and downs

Document 2:
 from captivating performances to vibrant art exhibitions

Jaccard Distance: 0.0100

---
Document 1:
Company Description Work with Us. Change the World. At AECOM, we're delivering a better world. Whether improving your commute, keeping the lights on, providing access to clean water, or transforming skylines, our work helps people and communities thrive. We are the world's trusted infrastructure consulting firm, partnering with clients to solve the world’s most complex challenges and build legacies for future generations. There has never been a better time to be at AECOM. With accelerating infrastructure investment worldwide, our services are in great demand. We invite you to bring your bold ideas and big dreams and become part of a global team of nearly 50,000 planners, designers, engineers, scientists, digital innovators, program and construction managers and other professionals