In [1]:
pip install pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m2.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=54d3384809fa03357714bde5ffcbb2669c20510ab05262a832869c93e7551948
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


In [2]:
# Importing Labraries
import pandas as pd
import numpy as np
import nltk
import pyspark
from pyspark.sql import functions as F
from pyspark.sql.functions import col, udf
from pyspark.sql import SparkSession
from pyspark.sql.types import ArrayType, StringType, FloatType,IntegerType
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from pyspark.sql.functions import monotonically_increasing_id
from pyspark.sql.functions import lower
nltk.download('punkt')
nltk.download('stopwords')
# Data Download
from google.colab import files

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


In [3]:
# Using Kaggle API to Download the Dataset

uploaded = files.upload()

for fn in uploaded.keys():
  print('User uploaded file "{name}" with length {length} bytes'.format(
      name=fn, length=len(uploaded[fn])))

# Then move kaggle.json into the folder where the API expects to find it.
!mkdir -p ~/.kaggle/ && mv kaggle.json ~/.kaggle/ && chmod 600 ~/.kaggle/kaggle.json

Saving kaggle.json to kaggle.json
User uploaded file "kaggle.json" with length 72 bytes


In [4]:
# Download the Dataset
!kaggle datasets download -d asaniczka/1-3m-linkedin-jobs-and-skills-2024

Downloading 1-3m-linkedin-jobs-and-skills-2024.zip to /content
100% 1.88G/1.88G [00:46<00:00, 43.5MB/s]
100% 1.88G/1.88G [00:46<00:00, 43.0MB/s]


In [5]:
# Unzip and extracting data into content folder
from zipfile import ZipFile
file_name = "/content/1-3m-linkedin-jobs-and-skills-2024.zip"
with ZipFile(file_name,'r') as zip:
  zip.extractall()
  print('Done')
  zip.extractall(path="/content")

Done


In [6]:
# create spark session
spark = SparkSession.builder \
    .appName("similaritems") \
    .getOrCreate()

In [7]:
# Creating Dataframe
df = spark.read \
    .format("csv") \
    .option("header", "true")\
    .load("/content/job_summary.csv").select('job_summary')

In [13]:
# Counting number of rows
print(df.count())

48219735


In [8]:
# Drop rows with null values in either of the columns
df_dropna = df.select("job_summary")
df_dropna = df_dropna.dropna()

In [14]:
# Counting number of rows after removing NaN
print(df_dropna.count())

15572837


In [9]:
# Define a UDF to count sentences using NLTK
def count_sentences(text):
    return len(nltk.sent_tokenize(text)) if text else 0

count_sentences_udf = udf(count_sentences, IntegerType())
# Add a new column with sentence count
df_filtered = df_dropna.withColumn("sentence_count", count_sentences_udf("job_summary"))


# Filter rows with more than sentence count more then 10 sentence and only 500rows
df_filtered = df_filtered.filter(col("sentence_count") > 10)
df_filtered = df_filtered.limit(500)

In [10]:
# Add an index column
df_index = df_filtered.withColumn("Index", monotonically_increasing_id() + 1)

In [11]:
 # Creating lower case text column
lower_df = df_index.withColumn("lower", lower(df_index["job_summary"]))

# stopwords from NLTK
stop_words = set(stopwords.words('english'))

# Define a UDF to remove stop words from a job_summary
def stop_words_removal(job_summary):
    words = job_summary.split()
    stopwords = [word for word in words if word.lower() not in stop_words]
    return ' '.join(stopwords)

# Register the UDF
stop_words_removal_udf = udf(stop_words_removal, StringType())

# Apply stop words removal UDF to the 'text' column
filtered_df = lower_df.withColumn("filtered_text", stop_words_removal_udf("lower"))
filtered_df = filtered_df.select("Index","job_summary","sentence_count","lower","filtered_text" )

In [12]:
# Show the result
filtered_df.show()

+-----+--------------------+--------------+--------------------+--------------------+
|Index|         job_summary|sentence_count|               lower|       filtered_text|
+-----+--------------------+--------------+--------------------+--------------------+
|    1|Job Title: Senior...|            31|job title: senior...|job title: senior...|
|    2|POSITION SUMMARYT...|            25|position summaryt...|position summaryt...|
|    3|At Dominion Energ...|            11|at dominion energ...|dominion energy l...|
|    4|Overview Discover...|            24|overview discover...|overview discover...|
|    5|POSITION ELIGIBLE...|            13|position eligible...|position eligible...|
|    6|CaroMont Health i...|            16|caromont health i...|caromont health h...|
|    7|Hello, nurses! We...|            22|hello, nurses! we...|hello, nurses! re...|
|    8|Hello, nurses! We...|            21|hello, nurses! we...|hello, nurses! re...|
|    9|Overview Home Hea...|            11|overview ho

In [15]:
# for verifying the number of sentence
print(df_filtered.select('job_summary').limit(10).collect()[7][0])

Hello, nurses! We are Reliance Home Health Caregivers! Immediate Hire $40.00 RN/ $ 33.00 LPN.We now need a SCHOOL NURSE to care for a 18-year-old male who lives in Country Club hills,IL 60478 covering Monday-Friday 6am-6pm.We are a home health care company that specializes in long-term 1-on-1 private duty nursing for disabled pediatric and adult patients on g-tubes, trachs, and vents. We have immediate openings for licensed nurses who want to truly make a difference in their patients' lives! We want to show you how a company that cares about their nurses and families can be! We have amazing schedulers who work hard to match the perfect nurse in specific cases with our precious children. We have amazing case managers and office staff that care about our employees. It is our mission that we strive to be the best pediatric nursing home health agency available. Nurses without pediatric experience will be considered. Training is available as well for pediatrics, home health and various skil

In [16]:
# Function to generate shingles from text
def generate_shingles(text, k=4):
    tokens = text.split()
    shingles = set()
    for i in range(len(tokens) - k + 1):
        shingle = " ".join(tokens[i:i+k])
        shingles.add(shingle)
    return list(shingles)
# Define a UDF for shingle generation
generate_shingles_udf = F.udf(generate_shingles, ArrayType(StringType()))
# Apply shingle generation UDF to filtertext
shingled_df = filtered_df.withColumn("shingles", generate_shingles_udf(col("filtered_text")))


In [17]:
# Define a function to calculate Jaccard similarity
def jaccard_similarity(s1, s2):
    set1 = set(s1)
    set2 = set(s2)
    intersection = len(set1.intersection(set2))
    union = len(set1.union(set2))
    return intersection / union if union != 0 else 0.0

# Define a UDF for Jaccard similarity calculation
jaccard_udf = udf(jaccard_similarity, FloatType())

In [18]:
# Cross join to create combinations
cross_joined_df2 = shingled_df.crossJoin(shingled_df.withColumnRenamed("Index", "Index2").withColumnRenamed("shingles", "shingles2"))
# Calculate Jaccard similarity for each pair of shingles
jaccard_df1 = cross_joined_df2.withColumn("jaccard_similarity_onshingles", jaccard_udf(col("shingles"), col("shingles2"))) \
                             .select("Index", "Index2", "jaccard_similarity_onshingles")
# Filter out rows where Index is not equal to Index2
jaccard_df1 = jaccard_df1.filter(col("Index") != col("Index2"))
jaccard_df1 = jaccard_df1.filter(col("jaccard_similarity_onshingles") > 0.20)


In [19]:
jaccard_df1.show()

+-----+------+-----------------------------+
|Index|Index2|jaccard_similarity_onshingles|
+-----+------+-----------------------------+
|    2|    32|                   0.94860816|
|    7|     8|                    0.7690355|
|    7|    15|                   0.77692306|
|    8|     7|                    0.7690355|
|    8|    15|                        0.872|
|   10|    74|                   0.25268817|
|   10|    82|                   0.28592163|
|   10|   102|                   0.31753555|
|   10|   167|                   0.30733946|
|   10|   214|                     0.284127|
|   10|   316|                    0.2976378|
|   10|   338|                    0.3170347|
|   10|   400|                   0.36983472|
|   10|   425|                   0.30500758|
|   10|   450|                    0.3013468|
|   10|   459|                   0.32194245|
|   10|   464|                   0.32545453|
|   10|   470|                   0.32798573|
|   12|    22|                    0.8290993|
|   12|   

In [22]:
# Function to compute MinHash signatures
def minhash_signature(shingles, num_perm=128):
    np.random.seed(42)
    signature = np.inf * np.ones(num_perm, dtype=np.uint32)
    for shingle in shingles:
        hash_vals = np.array([hash(f"{shingle}_{i}") for i in range(num_perm)])
        signature = np.minimum(signature, hash_vals)
    return signature.tolist()

# Define a UDF for MinHash signature generation
minhash_signature_udf = udf(minhash_signature, ArrayType(StringType()))

# Apply MinHash signature UDF to your DataFrame
signature_df = shingled_df.withColumn("minhash_signature", minhash_signature_udf(col("shingles")))


# Cross join to create combinations
cross_joined_signature_df = signature_df.crossJoin(signature_df.withColumnRenamed("Index", "Index2")
                                                    .withColumnRenamed("minhash_signature", "minhash_signature2"))

# Calculate Jaccard similarity on signature matrix
jaccard_signature_df = cross_joined_signature_df.withColumn("jaccard_similarity_signature",
                                                            jaccard_udf(col("minhash_signature"),
                                                                        col("minhash_signature2"))) \
                                                                        .select("Index", "Index2","jaccard_similarity_signature")

# Filter out rows where Index is not equal to Index2
jaccard_signature_df = jaccard_signature_df.filter(col("Index") != col("Index2"))
jaccard_signature_df = jaccard_signature_df.filter(col("jaccard_similarity_signature") > 0.20)


In [23]:
jaccard_signature_df.show()

+-----+------+----------------------------+
|Index|Index2|jaccard_similarity_signature|
+-----+------+----------------------------+
|    2|    32|                    0.924812|
|    7|     8|                   0.6516129|
|    7|    15|                  0.68421054|
|    8|     7|                   0.6516129|
|    8|    15|                   0.7777778|
|   10|   102|                  0.21904762|
|   10|   167|                  0.21327014|
|   10|   400|                  0.27363184|
|   10|   425|                  0.23671497|
|   10|   450|                  0.20754717|
|   10|   459|                  0.22488038|
|   10|   464|                  0.23671497|
|   10|   470|                  0.24271844|
|   12|    22|                  0.69536424|
|   12|    36|                  0.52380955|
|   13|    29|                   0.5802469|
|   13|    35|                   0.9541985|
|   13|    98|                   0.5802469|
|   13|    99|                   0.9541985|
|   15|     7|                  

In [41]:
# for verifying the result
print(filtered_df.select('filtered_text').limit(10).collect()[1][0])

position summarythe registered nurse assesses patients, coordinates plan care, implements orders, evaluates nursing care provided oncology patients physician practice. nurse leads nursing team members care directed attending physician/app collaboration health care providers accordance philosophy policies promedica cancer institute, promedica physicians group, promedica outpatient medical oncology infusion centers. actively participates creation environment fosters patient, family, physician, employee satisfaction. ensures decisions made based patient family centered care philosophy utilizing evidenced based practices, focused safety, customer satisfaction, quality outcomes.scope service: registered nurse assesses provides nursing care individuals groups require specialized knowledge, judgment, skill derived principles biological, physical, behavioral, social, spiritual/cultural, nursing sciences. registered nurse functions within full scope nursing practice noted board nursing designat

In [42]:
print(filtered_df.select('filtered_text').collect()[31][0])

position summarythe registered nurse assesses patients, coordinates plan care, implements orders, evaluates nursing care provided oncology patients physician practice. nurse leads nursing team members care directed attending physician/app collaboration health care providers accordance philosophy policies promedica cancer institute, promedica physicians group, promedica outpatient medical oncology infusion centers. actively participates creation environment fosters patient, family, physician, employee satisfaction. ensures decisions made based patient family centered care philosophy utilizing evidenced based practices, focused safety, customer satisfaction, quality outcomes.scope service: registered nurse assesses provides nursing care individuals groups require specialized knowledge, judgment, skill derived principles biological, physical, behavioral, social, spiritual/cultural, nursing sciences. registered nurse functions within full scope nursing practice noted board nursing designat

After checking the results with the actual data, we can say our algorithm is working well. The 2nd job description is about nursing and hospitals & 32nd is also for same as we can see above. This shows that our algorithm is doing a good job of finding similar job descriptions.
