# Algorithms for Massive Data


Project: Finding Similar Items


Michela Mazzaglia, academic year 2023/2024

## Importing libraries

In [1]:
pip install pyspark



In [2]:
pip install findspark



In [3]:
import pyspark
import findspark  # only needed when Spark is not pip-installed; then call findspark.init() before importing pyspark
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").getOrCreate()
spark

In [4]:
! pip install -q kaggle

In [5]:
pip install nltk



In [6]:
import numpy as np
import pandas as pd
import re, math
import nltk
from pyspark.sql.functions import udf, length, expr, regexp_extract, collect_list
from pyspark.sql.types import StructType, StructField, StringType, LongType
from pyspark.sql.functions import monotonically_increasing_id, regexp_replace, col, split, size, concat_ws
from pyspark.ml.linalg import Vectors, DenseVector, VectorUDT
from pyspark.ml.feature import MinHashLSH, HashingTF
import random
from sympy import nextprime

## Uploading the dataset

In [None]:
from google.colab import files

files.upload()  # upload kaggle.json (your Kaggle API token)

In [None]:
! mkdir ~/.kaggle

! cp kaggle.json ~/.kaggle/

In [9]:
! chmod 600 ~/.kaggle/kaggle.json

In [10]:
!kaggle datasets download -d asaniczka/1-3m-linkedin-jobs-and-skills-2024

Dataset URL: https://www.kaggle.com/datasets/asaniczka/1-3m-linkedin-jobs-and-skills-2024
License(s): ODC Attribution License (ODC-By)
1-3m-linkedin-jobs-and-skills-2024.zip: Skipping, found more recently modified local copy (use --force to force download)


In [None]:
!unzip -q ./1-3m-linkedin-jobs-and-skills-2024.zip -d .

In [12]:
sc = spark.sparkContext

## Data Cleaning & Preprocessing

In [None]:
df_0 = pd.read_csv('/content/job_summary.csv')

In [None]:
df_0

| | job_link | job_summary |
|---|---|---|
| 0 | https://www.linkedin.com/jobs/view/restaurant-... | Rock N Roll Sushi is hiring a Restaurant Manag... |
| 1 | https://www.linkedin.com/jobs/view/med-surg-re... | Schedule\n: PRN is required minimum 12 hours p... |
| 2 | https://www.linkedin.com/jobs/view/registered-... | Description\nIntroduction\nAre you looking for... |
| 3 | https://uk.linkedin.com/jobs/view/commercial-a... | Commercial account executive\nSheffield\nFull ... |
| 4 | https://www.linkedin.com/jobs/view/store-manag... | Address:\nUSA-CT-Newington-44 Fenn Road\nStore... |
| ... | ... | ... |
| 1297327 | https://www.linkedin.com/jobs/view/roofing-sup... | We are currently seeking experienced commercia... |
| 1297328 | https://www.linkedin.com/jobs/view/service-cen... | Overview\nStable and growing organization\nCom... |
| 1297329 | https://www.linkedin.com/jobs/view/flight-qual... | Rôle et responsabilités\nJob Description:\nFli... |
| 1297330 | https://www.linkedin.com/jobs/view/global-sour... | Job Description\nAre You Ready to Make It Happ... |


In [None]:
df_0['job_summary'][2]

'Description\nIntroduction\nAre you looking for a place to deliver excellent care patients deserve? At StoneSprings Hospital Center we support our colleagues in their positions. Join our Team as a(an) Registered Nurse Cath Lab and access programs to assist with every stage of your career.\nBenefits\nStoneSprings Hospital Center, offers a total rewards package that supports the health, life, career and retirement of our colleagues. The available plans and programs include:\nComprehensive medical coverage that covers many common services at no cost or for a low copay. Plans include prescription drug and behavioral health coverage as well as free telemedicine services and free AirMed medical transportation.\nAdditional options for dental and vision benefits, life and disability coverage, flexible spending accounts, supplemental health protection plans (accident, critical illness, hospital indemnity), auto and home insurance, identity theft protection, legal counseling, long-term care cove

Loading the same CSV with Spark

In [13]:
schema = StructType([
    StructField("job_link", StringType(), True),
    StructField("job_summary", StringType(), True)
])

In [14]:
spark_df = spark.read.csv(
    '/content/job_summary.csv',
    header=True,
    schema=schema,
    sep=',',       # Specify the delimiter
    quote='"',     # Handle quotes properly
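    # NOTE: CSV files that escape embedded quotes by doubling them ("") need escape='"';
    # '\\' is a likely cause of the split records (text fragments in job_link) shown below.
    escape='\\',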
    multiLine=True # Handle multiline fields
)

spark_df.show()
spark_df.printSchema()

+--------------------+--------------------+
|            job_link|         job_summary|
+--------------------+--------------------+
|https://www.linke...|Rock N Roll Sushi...|
|https://www.linke...|Schedule\n: PRN i...|
|https://www.linke...|"Description\nInt...|
|HCA Healthcare Co...|                NULL|
|If growth and con...| we encourage you...|
|Unlock the possib...|                NULL|
|We are an equal o...|            religion|
|           Show more|                NULL|
|          Show less"|                NULL|
|https://uk.linked...|Commercial accoun...|
|https://www.linke...|Address:\nUSA-CT-...|
|https://www.linke...|Description\nOur\...|
|https://www.linke...|Company Descripti...|
|https://uk.linked...|An exciting oppor...|
|https://www.linke...|Job Details:\nJob...|
|https://www.linke...|Our\nRestaurant T...|
|https://www.linke...|Our General Manag...|
|https://www.linke...|Earning potential...|
|https://www.linke...|Dollar General Co...|
|https://au.linked...|Restaurant
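The misaligned rows above (fragments such as `HCA Healthcare Co...` landing in `job_link`) are what one would expect if the file escapes embedded quotes RFC 4180 style, by doubling them (`""`), rather than with a backslash; that is an assumption about the file, not something verified here. Python's standard `csv` module, whose default dialect follows that convention, shows how such a record should be parsed. The sample record and URL are made up:

```python
import csv
import io

# Hypothetical record: the summary contains a doubled quote ("") and an
# embedded newline, both inside a quoted field.
raw = (
    'job_link,job_summary\n'
    'https://example.com/jobs/1,"Hiring a ""Data"" Engineer\nRemote"\n'
)

# The default dialect has doublequote=True: "" inside a quoted field
# is read as one literal quote character, and the newline stays in the field.
rows = list(csv.reader(io.StringIO(raw)))

header, record = rows
print(header)
print(record)
```

In Spark's CSV reader the corresponding setting is `escape='"'` together with `quote='"'` and `multiLine=True`.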

In [15]:
spark_df = spark_df.dropna(subset=['job_summary'])

In [16]:
spark_df = spark_df.withColumn('job_summary', regexp_replace(col('job_summary'), '\n', ' '))

In [17]:
spark_df = spark_df.withColumn('job_words', split(col('job_summary'), ' '))

In [18]:
spark_df = spark_df.filter(size(col('job_words')) > 4)

In [19]:
spark_df = spark_df.select('job_words').withColumn('doc_id', monotonically_increasing_id())

In [20]:
spark_df.show()

+--------------------+------+
|           job_words|doc_id|
+--------------------+------+
|[Rock, N, Roll, S...|     0|
|[Schedule, :, PRN...|     1|
|["Description, In...|     2|
|[, we, encourage,...|     3|
|[Commercial, acco...|     4|
|[Address:, USA-CT...|     5|
|[Description, Our...|     6|
|[Company, Descrip...|     7|
|[An, exciting, op...|     8|
|[Job, Details:, J...|     9|
|[Our, Restaurant,...|    10|
|[Our, General, Ma...|    11|
|[Earning, potenti...|    12|
|[Dollar, General,...|    13|
|[Restaurant, Desc...|    14|
|[Who, We, Are, We...|    15|
|[A, Place, Where,...|    16|
|[Description, The...|    17|
|["Overview, Descr...|    18|
|[, seat, them, at...|    19|
+--------------------+------+
only showing top 20 rows



In [21]:
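# Small random sample (0.005% of the rows). With withReplacement=True a row can be drawn
# more than once; duplicates share a doc_id and collapse in the later distinct/groupBy steps.
df_scaled = spark_df.sample(withReplacement=True, fraction=0.00005, seed=42)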

In [22]:
df_scaled = df_scaled.withColumn('job_words', concat_ws(' ', col('job_words')))

## Preprocessing

In [23]:
nltk.download('stopwords')
nltk.download('punkt')  # recent NLTK releases may also need nltk.download('punkt_tab') for word_tokenize
stopwords = set(nltk.corpus.stopwords.words('english'))

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


In [24]:
def preprocess_text(text):
    text = text.lower()
    text = re.sub(r'\s+', ' ', text)  # Remove extra whitespace
    text = re.sub(r'[^\w\s]', '', text)  # Remove punctuation
    tokens = nltk.word_tokenize(text)
    tokens = [word for word in tokens if word not in stopwords]  # Remove stopwords
    return ' '.join(tokens)
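To see what `preprocess_text` does without downloading NLTK data, here is a dependency-free sketch of the same steps. It assumes a small hypothetical stopword set and uses `str.split` in place of `nltk.word_tokenize`, so its output can differ slightly from the notebook's:

```python
import re

# Hypothetical stopword set; the notebook uses NLTK's English list instead.
STOPWORDS = {"a", "an", "and", "is", "of", "the", "to"}

def preprocess_text_simple(text: str) -> str:
    """Lower-case, collapse whitespace, strip punctuation, drop stopwords."""
    text = text.lower()
    text = re.sub(r"\s+", " ", text)     # collapse runs of whitespace (incl. newlines)
    text = re.sub(r"[^\w\s]", "", text)  # keep only word characters and spaces
    tokens = text.split()                 # whitespace split instead of nltk.word_tokenize
    return " ".join(t for t in tokens if t not in STOPWORDS)

print(preprocess_text_simple("Rock N Roll Sushi is hiring a Restaurant Manager!\nApply now."))
```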

In [25]:
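# UDF version of preprocess_text (not used below: the next cell applies the function through an RDD map)
df_udf = udf(preprocess_text, StringType())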

In [26]:
rdd = df_scaled.select('doc_id', 'job_words').rdd.map(lambda row: (row['doc_id'], preprocess_text(row['job_words'])))

In [27]:
rdd.toDF().show()

+------+--------------------+
|    _1|                  _2|
+------+--------------------+
| 13395|grass greener sec...|
| 34993|client childrens ...|
| 47677|possess valid cla...|
| 64146|demonstrates inte...|
| 69905|new adventure awa...|
|104317|details signon bo...|
|143336|parttime intermit...|
|152380|conference center...|
|188603|scheduled nclex w...|
|218886|northern tier hig...|
|242951|experience corps ...|
|252827|role sr serviceno...|
|273921|us join us winco ...|
|278072|win sport school ...|
|304884|job details job l...|
|328483|department econom...|
|451330|zoom take great c...|
|455372|immediate start e...|
|459748|northrop grumman ...|
|459967|position interest...|
+------+--------------------+
only showing top 20 rows



### Shingles

In [28]:
import binascii

In [29]:
k = 3

In [30]:
shingles_rdd = rdd.flatMap(lambda doc: [(doc[0], doc[1][i:i+k]) for i in range(len(doc[1]) - k + 1)])

In [31]:
shingles_rdd.take(7)

[(13395, 'gra'),
 (13395, 'ras'),
 (13395, 'ass'),
 (13395, 'ss '),
 (13395, 's g'),
 (13395, ' gr'),
 (13395, 'gre')]
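The `flatMap` above emits one record per position, so a shingle that occurs several times in a document is repeated until `.distinct()` is applied further down. A plain-Python sketch of a document's shingle *set* (toy text; `char_shingles` is a hypothetical helper):

```python
def char_shingles(text: str, k: int = 3) -> set[str]:
    """Return the set of overlapping character k-shingles of `text`."""
    return {text[i:i + k] for i in range(len(text) - k + 1)}

doc = "grass greener grass"
positions = [doc[i:i + 3] for i in range(len(doc) - 3 + 1)]  # one entry per position, like the flatMap
print(len(positions), len(char_shingles(doc)))
```

For `"grass greener grass"` the position list has 17 entries but only 13 distinct shingles, because `gra`, `ras`, `ass` and ` gr` each occur twice.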

In [32]:
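# NOTE: this groups `rdd` (the preprocessed texts), so each value is a one-element list
# holding the whole text, as the output below shows; grouping `shingles_rdd` instead
# would give each document's list of 3-shingles.
shingles_list = (rdd
                 .groupByKey()
                 .map(lambda x: (x[0], list(x[1]))))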

In [33]:
shingles_list.toDF().show()

+------+--------------------+
|    _1|                  _2|
+------+--------------------+
| 13395|[grass greener se...|
| 34993|[client childrens...|
| 47677|[possess valid cl...|
| 64146|[demonstrates int...|
| 69905|[new adventure aw...|
|104317|[details signon b...|
|143336|[parttime intermi...|
|152380|[conference cente...|
|188603|[scheduled nclex ...|
|218886|[northern tier hi...|
|242951|[experience corps...|
|252827|[role sr servicen...|
|273921|[us join us winco...|
|278072|[win sport school...|
|304884|[job details job ...|
|328483|[department econo...|
|451330|[zoom take great ...|
|455372|[immediate start ...|
|459748|[northrop grumman...|
|459967|[position interes...|
+------+--------------------+
only showing top 20 rows



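Characteristic matrix: each shingle is hashed to a 32-bit integer with CRC32, and each document is stored sparsely as the list of its distinct hashed shingles, i.e. the rows in which its column of the characteristic matrix holds a 1.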

In [34]:
def hash_shingle(shingle):
  return binascii.crc32(shingle.encode('utf-8')) & 0xffffffff

In [35]:
hash_rdd = shingles_rdd.mapValues(hash_shingle).distinct()

In [36]:
hash_rdd.take(7)

[(13395, 2506444301),
 (13395, 501096268),
 (13395, 2068476598),
 (13395, 2525627878),
 (13395, 4020559682),
 (13395, 1404793474),
 (13395, 2450033172)]

In [37]:
df = hash_rdd.toDF(["doc_id", "hashed_shingle"]) \
    .groupBy("doc_id") \
    .agg(collect_list("hashed_shingle").alias("hashed_shingles"))

In [38]:
df.show()

+-------+--------------------+
| doc_id|     hashed_shingles|
+-------+--------------------+
| 608814|[2743074591, 9616...|
| 304884|[4225294584, 3496...|
|1663266|[210272711, 33932...|
|1618038|[1811742144, 3119...|
|1152253|[2189397276, 1432...|
|  64146|[2601924841, 3154...|
| 773878|[4225294584, 3496...|
| 218886|[3781580864, 4138...|
|  69905|[1810056261, 3930...|
| 909604|[1767766964, 4231...|
| 872160|[2068476598, 2576...|
| 684947|[2179260663, 2228...|
|1460189|[595022058, 58230...|
|1388218|[4006809354, 4168...|
| 898066|[2244290826, 3530...|
| 273921|[2449963348, 2434...|
|  47677|[2161764012, 1909...|
| 525916|[4225294584, 3496...|
|1450243|[2434037902, 3329...|
|1621468|[1629029770, 1643...|
+-------+--------------------+
only showing top 20 rows
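The `hashed_shingles` column is a sparse encoding of the characteristic matrix: each document keeps only the rows (shingles) where its column has a 1. For intuition, the dense matrix for two toy documents can be built as follows (toy data and the `characteristic_matrix` helper are illustrative only):

```python
def characteristic_matrix(docs: dict[str, str], k: int = 3):
    """Rows = shingles (sorted), columns = documents; entry is 1 if the shingle occurs."""
    sets = {d: {t[i:i + k] for i in range(len(t) - k + 1)} for d, t in docs.items()}
    shingles = sorted(set().union(*sets.values()))
    doc_ids = sorted(sets)
    matrix = [[int(s in sets[d]) for d in doc_ids] for s in shingles]
    return shingles, doc_ids, matrix

shingles, doc_ids, matrix = characteristic_matrix({"A": "abcd", "B": "bcde"})
print(doc_ids)
for s, row in zip(shingles, matrix):
    print(s, row)
```

Storing only the 1-entries, as the notebook does, avoids materializing a matrix whose row count is the number of distinct shingles in the whole collection.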



In [39]:
def collect_shingles(a, b):
    return a + b

In [40]:
hashed_shingles_rdd = hash_rdd.map(lambda x: (x[0], [x[1]])).reduceByKey(collect_shingles)

In [None]:
hashed_shingles_rdd.take(7)

In [42]:
# Flat list of every hashed shingle of every document, collected to the driver
hashed_shingles_list = hashed_shingles_rdd.flatMap(lambda x: x[1]).collect()

### MinHash


---
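MinHash gives a fast approximation of the Jaccard similarity $J(A, B) = \frac{|A \cap B|}{|A \cup B|}$ between two finite sets. For a random permutation $h$ of the shingle universe, $\Pr[\min h(A) = \min h(B)] = J(A, B)$, so the fraction of hash functions on which two signatures agree is an estimate of $J(A, B)$. Below, each permutation is approximated by a random hash function $h(x) = (a x + b) \bmod c$.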
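The permutation property can be checked empirically. The sketch below (toy sets; seed and trial count chosen arbitrarily) shuffles a 20-element universe many times and counts how often the element of $A$ that comes first in the permutation ties with the first element of $B$; that happens exactly when the first element of $A \cup B$ lies in $A \cap B$:

```python
import random

def minhash_agreement(a: set[int], b: set[int], universe: list[int],
                      trials: int = 2000, seed: int = 0) -> float:
    """Fraction of random permutations for which min rank over A equals min rank over B."""
    rng = random.Random(seed)
    hits = 0
    for _ in range(trials):
        perm = universe[:]
        rng.shuffle(perm)
        rank = {x: r for r, x in enumerate(perm)}  # position of each element in this permutation
        if min(rank[x] for x in a) == min(rank[x] for x in b):
            hits += 1
    return hits / trials

A = set(range(0, 10))
B = set(range(5, 15))
jaccard = len(A & B) / len(A | B)  # 5 / 15
estimate = minhash_agreement(A, B, list(range(20)))
print(round(jaccard, 3), round(estimate, 3))
```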




In [43]:
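h_functions = 120  # signature length: number of MinHash functions
b_bands = 10       # number of LSH bands (120 / 10 = 12 rows per band)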

In [44]:
def multiple(x, a, b, c):
    # hash function of the form h(x) = (a*x + b) mod c
    return (a * x + b) % c

In [45]:
params =[]

for _ in range(h_functions):
  a = random.randint(1, 10000)
  b = random.randint(1, 10000)
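  max_ab = max(a, b)
  # NOTE: c is only slightly larger than a and b (at most about 10,000), far below the
  # 32-bit shingle ids, so every hash value falls in [0, c) and the minima cluster near 0.
  c = nextprime(max_ab + 1)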
  params.append({"a": a, "b": b, "c": c})

params[:7]

[{'a': 2953, 'b': 1394, 'c': 2957},
 {'a': 8416, 'b': 2453, 'c': 8419},
 {'a': 1980, 'b': 8895, 'c': 8923},
 {'a': 46, 'b': 6695, 'c': 6701},
 {'a': 4691, 'b': 4298, 'c': 4703},
 {'a': 7594, 'b': 157, 'c': 7603},
 {'a': 6890, 'b': 5622, 'c': 6899}]
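For $h(x) = (a x + b) \bmod c$ to behave like a random permutation of the shingle ids, the prime $c$ has to exceed every id it hashes. Here the ids are 32-bit CRC values while `c` stays below about 10,000, so every document's minimum tends to be a tiny number. The sketch below compares the estimated similarity of two *disjoint* random sets of 32-bit ids under a small modulus (2957, the first `c` printed above) and under 4294967311, the smallest prime above $2^{32}$; set sizes, seed and helper names are illustrative assumptions:

```python
import random

BIG_PRIME = 4294967311  # smallest prime above 2**32

def signature(ids, params, c):
    """MinHash signature: for each (a, b), the minimum of (a*x + b) mod c over the set."""
    return [min((a * x + b) % c for x in ids) for a, b in params]

def agreement(sig1, sig2):
    """Fraction of positions where two signatures agree (MinHash Jaccard estimate)."""
    return sum(u == v for u, v in zip(sig1, sig2)) / len(sig1)

rng = random.Random(42)
ids = rng.sample(range(2**32), 1000)  # 1000 distinct 32-bit "shingle ids"
set_a, set_b = ids[:500], ids[500:]   # disjoint halves: true Jaccard similarity is 0

# Small modulus, with a and b drawn like the notebook's random.randint(1, 10000)
small = [(rng.randint(1, 10000), rng.randint(1, 10000)) for _ in range(120)]
est_small = agreement(signature(set_a, small, 2957), signature(set_b, small, 2957))

# Modulus larger than every 32-bit id
big = [(rng.randint(1, BIG_PRIME - 1), rng.randint(0, BIG_PRIME - 1)) for _ in range(120)]
est_big = agreement(signature(set_a, big, BIG_PRIME), signature(set_b, big, BIG_PRIME))

print(f"c = 2957: estimated similarity {est_small:.3f}")
print(f"c = {BIG_PRIME}: estimated similarity {est_big:.3f}")
```

Because $c$ is prime and $0 < a < c$, the map $x \mapsto (a x + b) \bmod c$ is a bijection on $\{0, \dots, c-1\}$. With $c > 2^{32}$, distinct shingle ids therefore never collide, so the second estimate for the disjoint sets is exactly 0, while the small modulus reports spurious agreement.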

In [46]:
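def enum_shingles(e, hashed_shingles_list):
    # Emits one row per hash function for each (doc_id, shingle) record; the shingle itself
    # is ignored and every row carries the global list of all documents' hashed shingles.
    doc_id, shingle = e
    return [((doc_id), (h), (hashed_shingles_list)) for h in range(h_functions)]

In [47]:
# Not used by the signature computation below, which maps hashed_shingles_rdd instead
minhash_matrix = hash_rdd.flatMap(lambda e: enum_shingles(e, hashed_shingles_list))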

In [48]:
minhash_matrix.toDF().show()

+-----+---+--------------------+
|   _1| _2|                  _3|
+-----+---+--------------------+
|13395|  0|[2506444301, 5010...|
|13395|  1|[2506444301, 5010...|
|13395|  2|[2506444301, 5010...|
|13395|  3|[2506444301, 5010...|
|13395|  4|[2506444301, 5010...|
|13395|  5|[2506444301, 5010...|
|13395|  6|[2506444301, 5010...|
|13395|  7|[2506444301, 5010...|
|13395|  8|[2506444301, 5010...|
|13395|  9|[2506444301, 5010...|
|13395| 10|[2506444301, 5010...|
|13395| 11|[2506444301, 5010...|
|13395| 12|[2506444301, 5010...|
|13395| 13|[2506444301, 5010...|
|13395| 14|[2506444301, 5010...|
|13395| 15|[2506444301, 5010...|
|13395| 16|[2506444301, 5010...|
|13395| 17|[2506444301, 5010...|
|13395| 18|[2506444301, 5010...|
|13395| 19|[2506444301, 5010...|
+-----+---+--------------------+
only showing top 20 rows



In [49]:
def minhash_map(docId_hashedShingles):
    doc_id, hashed_shingles = docId_hashedShingles
    minhashes = []
    for h in range(h_functions):
        min_h = math.inf
        for shingle in hashed_shingles:
            hash_value = multiple(shingle, **params[h])
            if hash_value < min_h:
                min_h = hash_value
        minhashes.append(min_h)
    return (doc_id, minhashes)

In [50]:
sig_matrix_rdd = hashed_shingles_rdd.map(minhash_map)

In [51]:
signature_df = sig_matrix_rdd.toDF(["doc_id", "minhashes"])

In [52]:
signature_df.show()

+------+--------------------+
|doc_id|           minhashes|
+------+--------------------+
| 13395|[4, 7, 2, 9, 4, 4...|
| 34993|[4, 7, 2, 2, 1, 4...|
| 47677|[9, 39, 121, 38, ...|
| 64146|[42, 65, 72, 57, ...|
| 69905|[0, 7, 2, 9, 4, 4...|
|104317|[1, 7, 2, 0, 5, 4...|
|143336|[42, 65, 2, 82, 1...|
|152380|[168, 131, 34, 87...|
|188603|[111, 131, 2, 187...|
|218886|[4, 7, 2, 9, 4, 4...|
|242951|[9, 7, 7, 2, 4, 7...|
|252827|[4, 7, 2, 14, 4, ...|
|273921|[1, 7, 2, 9, 2, 4...|
|278072|[3, 7, 2, 52, 4, ...|
|304884|[0, 7, 2, 2, 4, 4...|
|328483|[4, 7, 2, 2, 4, 4...|
|451330|[9, 7, 2, 26, 8, ...|
|455372|[4, 7, 2, 9, 4, 4...|
|459748|[9, 7, 2, 1, 4, 4...|
|459967|[0, 7, 2, 2, 4, 4...|
+------+--------------------+
only showing top 20 rows



### Locality-Sensitive Hashing

Define the threshold

In [53]:
# with h_functions = 120 and b_bands = 10, each band has r = 12 rows
n_per_bands = h_functions // b_bands
# approximate similarity threshold of the banding scheme: t ≈ (1/b)^(1/r)
threshold = (1/b_bands) ** (1/n_per_bands)

In [54]:
print("Threshold for candidate pairs: ", threshold)

Threshold for candidate pairs:  0.8254041852680184
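The threshold comes from the banding S-curve: with $b$ bands of $r$ rows, a pair with Jaccard similarity $s$ becomes a candidate with probability $1 - (1 - s^r)^b$, and $(1/b)^{1/r}$ is the usual approximation of where that curve jumps from low to high. A short sketch evaluating it for $r = 12$, $b = 10$ (the sample values of $s$ are arbitrary):

```python
def candidate_probability(s: float, rows: int, bands: int) -> float:
    """P(a pair with Jaccard similarity s is identical in at least one band)."""
    return 1 - (1 - s ** rows) ** bands

h_functions, b_bands = 120, 10
rows = h_functions // b_bands            # 12 rows per band
threshold = (1 / b_bands) ** (1 / rows)  # ≈ 0.8254, as printed above

for s in (0.5, 0.7, threshold, 0.9, 0.95):
    print(f"s = {s:.3f} -> P(candidate) = {candidate_probability(s, rows, b_bands):.3f}")
```

At $s = t$ we have $s^r = 1/b = 0.1$, so the probability is $1 - 0.9^{10} \approx 0.65$; pairs well below the threshold are rarely proposed and pairs well above it almost always are.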


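Using PySpark's `MinHashLSH`. Its documentation describes the input as binary vectors: the indices of the non-zero entries are the set elements, the stored values are ignored, and the model computes its own MinHash values from those sets. Passing the 120-value signature as a dense vector therefore compares *which positions are non-zero*, not whether the MinHash values agree; sparse binary shingle vectors (for example from `HashingTF` with `binary=True`) match its intended input.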

In [55]:
array_to_vector_udf = udf(lambda x: DenseVector(x), VectorUDT())
signature_df = signature_df.withColumn("minhash_vector", array_to_vector_udf(col("minhashes"))) # converted with dense vector

In [56]:
signature_df = signature_df.drop("minhashes")

In [57]:
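# MinHashLSH builds numHashTables independent tables, each holding one MinHash value, and
# treats a pair as a candidate if any table matches (OR-amplification); 120 tables is
# therefore not the same scheme as 10 bands of 12 rows.
mh = MinHashLSH(inputCol="minhash_vector", outputCol="hashes", numHashTables=b_bands*n_per_bands, seed=56)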

In [58]:
model = mh.fit(signature_df)

In [59]:
lsh_df = model.transform(signature_df)
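`MinHashLSH` works on the set of non-zero indices of each vector. The toy comparison below (made-up 5-position signatures in the style of the signature table; helper names are hypothetical) contrasts that with the estimate the signatures were built for, the fraction of positions that agree:

```python
def signature_agreement(sig1, sig2):
    """MinHash estimate of Jaccard similarity: fraction of equal positions."""
    return sum(u == v for u, v in zip(sig1, sig2)) / len(sig1)

def nonzero_index_jaccard_distance(v1, v2):
    """Jaccard distance between the sets of non-zero indices of two vectors,
    i.e. the way MinHashLSH interprets its input vectors."""
    s1 = {i for i, x in enumerate(v1) if x != 0}
    s2 = {i for i, x in enumerate(v2) if x != 0}
    return 1 - len(s1 & s2) / len(s1 | s2)

sig_a = [4, 7, 2, 9, 4]
sig_b = [0, 7, 2, 2, 1]

print(signature_agreement(sig_a, sig_b))                       # 2 of 5 positions agree -> 0.4
print(round(nonzero_index_jaccard_distance(sig_a, sig_b), 3))  # {0..4} vs {1..4} -> 0.2
```

The MinHash estimate says these two documents share roughly 40% of their shingles, while the index-set distance is small simply because almost every position is non-zero. This is consistent with the very small distances, such as 1/120 ≈ 0.0083, in the results below.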

## Finding similar jobs

In [60]:
def find_similar_jobs(lsh_df, model, threshold):
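    # Self-join. approxSimilarityJoin returns pairs whose Jaccard *distance* is below
    # `threshold`; the call below passes the similarity threshold (~0.825), so pairs with
    # similarity >= ~0.175 are kept. Passing 1 - threshold would require similarity >= threshold.
    similar_items = model.approxSimilarityJoin(lsh_df, lsh_df, threshold, distCol="JaccardDistance")

    # Keep each unordered pair once (doc_id_A < doc_id_B drops self-pairs and mirrored
    # duplicates) and collect (doc_id_A, doc_id_B, JaccardDistance) tuples to the driver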
    similar_pairs = (similar_items
                     .select(
                         col("datasetA.doc_id").alias("doc_id_A"),
                         col("datasetB.doc_id").alias("doc_id_B"),
                         col("JaccardDistance"))
                     .filter(col("doc_id_A") < col("doc_id_B"))
                     .filter(col("JaccardDistance") <= threshold)
                     .rdd
                     .map(lambda row: (row["doc_id_A"], row["doc_id_B"], row["JaccardDistance"]))
                     .collect())

    return similar_pairs
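Since the threshold above was derived for $b$ bands of $r$ rows, an alternative to `MinHashLSH` is to band the notebook's own signatures directly: split each signature into bands, bucket documents by each band's values, and treat documents sharing any bucket as candidates. A plain-Python sketch on toy signatures (2 bands of 2 rows; `lsh_candidates` and `agreement` are hypothetical helpers), followed by a filter on estimated *similarity* rather than distance:

```python
from collections import defaultdict
from itertools import combinations

def lsh_candidates(signatures: dict[int, list[int]], bands: int) -> set[tuple[int, int]]:
    """Banding: documents whose signatures are identical in at least one band become candidates."""
    length = len(next(iter(signatures.values())))
    rows = length // bands
    candidates = set()
    for band in range(bands):
        buckets = defaultdict(list)
        for doc_id, sig in signatures.items():
            key = tuple(sig[band * rows:(band + 1) * rows])  # this band's slice of the signature
            buckets[key].append(doc_id)
        for docs in buckets.values():
            candidates.update(combinations(sorted(docs), 2))
    return candidates

def agreement(sig1, sig2):
    """Estimated Jaccard similarity: fraction of equal signature positions."""
    return sum(u == v for u, v in zip(sig1, sig2)) / len(sig1)

sigs = {0: [1, 2, 3, 4], 1: [1, 2, 9, 9], 2: [5, 6, 7, 8]}
cands = lsh_candidates(sigs, bands=2)
threshold = 0.5
similar = [(a, b, agreement(sigs[a], sigs[b])) for a, b in sorted(cands)
           if agreement(sigs[a], sigs[b]) >= threshold]
print(cands)
print(similar)
```

On the RDD the same idea could be expressed by emitting `((band, band_values), doc_id)` records with `flatMap` and grouping them with `groupByKey`; that is a design suggestion, not something the notebook does.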

In [61]:
similar_pairs = find_similar_jobs(lsh_df, model, threshold)

In [62]:
similar_pairs

[(47677, 624623, 0.008333333333333304),
 (152380, 1750168, 0.050000000000000044),
 (278072, 1087080, 0.09734513274336287),
 (451330, 513391, 0.08771929824561409),
 (472496, 1388218, 0.1228070175438597),
 (513391, 1320765, 0.1166666666666667),
 (544349, 1038199, 0.18333333333333335),
 (579745, 1415018, 0.06837606837606836),
 (624623, 642420, 0.19166666666666665),
 (773878, 1087080, 0.08181818181818179),
 (1038199, 1388218, 0.14414414414414412),
 (1087080, 1415018, 0.0854700854700855),
 (1145791, 1388218, 0.125),
 (1331267, 1559819, 0.03389830508474578),
 (1415018, 1618038, 0.1282051282051282),
 (1371266, 1559819, 0.1271186440677966),
 (1390884, 1559819, 0.06779661016949157),
 (13395, 64146, 0.1512605042016807),
 (152380, 546299, 0.22499999999999998),
 (252827, 1087080, 0.10909090909090913),
 (459967, 1373757, 0.1188118811881188),
 (507283, 756249, 0.1351351351351351),
 (756249, 1199836, 0.0847457627118644),
 (34993, 1331267, 0.11304347826086958),
 (242951, 950765, 0.11504424778761058),


### Some results

In [63]:
similar_df = pd.DataFrame(similar_pairs, columns=['doc_id1', 'doc_id2', 'jaccard_dist'])  # third value is JaccardDistance, not similarity

In [64]:
similar_df

| | doc_id1 | doc_id2 | jaccard_dist |
|---|---|---|---|
| 0 | 47677 | 624623 | 0.008333 |
| 1 | 152380 | 1750168 | 0.050000 |
| 2 | 278072 | 1087080 | 0.097345 |
| 3 | 451330 | 513391 | 0.087719 |
| 4 | 472496 | 1388218 | 0.122807 |
| ... | ... | ... | ... |
| 2770 | 218886 | 624623 | 0.116667 |
| 2771 | 218886 | 898066 | 0.117647 |
| 2772 | 451330 | 689406 | 0.120690 |
| 2773 | 507283 | 878756 | 0.182609 |


In [65]:
for pair in similar_pairs[:2]:
    doc1 = pair[0]
    doc2 = pair[1]

    # Filter DataFrame to retrieve text for doc1 and doc2
    doc1_text = df_scaled.filter(col('doc_id') == doc1).select('job_words').first()[0]
    doc2_text = df_scaled.filter(col('doc_id') == doc2).select('job_words').first()[0]

    # Display job summaries for doc1 and doc2
    print("Document 1:")
    print(doc1_text)
    print("\nDocument 2:")
    print(doc2_text)
    print("\n---")

Document 1:
 AND possess a valid Class C California driver's license with a safe driving record or driver's license from another state with a safe driving record.

Document 2:
 people and ideas. We are source of possibilities since 1964. Cascades have been providing sustainable

---
Document 1:
 and conference center clients.

Document 2:
Job Description This position will be scheduled approximately 40 hours per week at our Lindner Research Center located at 2123 Auburn Ave, Suite 424, Cincinnati, OH 45219. Accurately and professionally works with Operations Coordinators and RN’s to assist in the execution all clinical projects assigned to him/her: assists in project initiation, maintains patient continuance, assures protocol compliance, reports to Coordinators and nurses on patient/progress status. Needs phlebotomy training and experience. Contributes to the smooth patient flow during patient office visitations. Show more Show less

---
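The two pairs printed above share little visible wording, so it can be worth recomputing the exact Jaccard similarity of each candidate's shingle sets before accepting it. A minimal sketch, assuming the texts are first passed through the same preprocessing and shingled with k = 3 as above (`exact_jaccard` is a hypothetical helper):

```python
def shingle_set(text: str, k: int = 3) -> set[str]:
    """Set of overlapping character k-shingles."""
    return {text[i:i + k] for i in range(len(text) - k + 1)}

def exact_jaccard(text1: str, text2: str, k: int = 3) -> float:
    """Exact Jaccard similarity of the character k-shingle sets of two texts."""
    s1, s2 = shingle_set(text1, k), shingle_set(text2, k)
    if not s1 and not s2:
        return 1.0  # two texts too short to shingle are treated as identical
    return len(s1 & s2) / len(s1 | s2)

print(exact_jaccard("conference center clients", "conference center staff"))
```

Because this only runs on the (few) candidate pairs, the exact check stays cheap while removing false positives introduced by the approximation.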
