In [None]:
!pip install kaggle pyspark
!pip install findspark
!pip install pandas==1.5.3
import warnings
warnings.filterwarnings("ignore")


In [None]:
import os
# Kaggle API credentials.
# `setdefault` keeps any real credentials that are already exported in the
# environment; the placeholders below are only used when nothing is set, so a
# committed copy of this notebook never overwrites working credentials.
# Do not commit real values here.
os.environ.setdefault('KAGGLE_USERNAME', 'xxxxxxxxxxxxxxxx')
os.environ.setdefault('KAGGLE_KEY', 'xxxxxxxxxxxxxxxxxxxxxx')

In [None]:
# Download the dataset archive with the Kaggle CLI into the working directory.
# NOTE(review): presumably authenticated through the KAGGLE_USERNAME /
# KAGGLE_KEY environment variables set in the previous cell — confirm.
!kaggle datasets download -d asaniczka/1-3m-linkedin-jobs-and-skills-2024

In [None]:
import zipfile
# Unpack the downloaded archive into ./dataset; the context manager closes the
# archive handle once extraction finishes.
with zipfile.ZipFile(file='1-3m-linkedin-jobs-and-skills-2024.zip', mode='r') as zip_ref:
    zip_ref.extractall(path='dataset')

In [None]:
# Location of the job-summary CSV inside the extracted 'dataset' directory.
# Read later by both the Spark reader and the chunked pandas reader.
file_path = 'dataset/job_summary.csv'

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
import re
import hashlib
import numpy as np
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, MinHashLSH
from pyspark.ml.linalg import Vectors
from pyspark.ml.pipeline import Pipeline
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType
from pyspark.sql.functions import monotonically_increasing_id


In [None]:
from pyspark.sql import SparkSession

# Create the Spark session for this notebook, or reuse one that already exists.
spark = (
    SparkSession.builder
    .master("local")
    .appName("JobDescriptionSimilarity")
    .getOrCreate()
)

In [None]:
# Bare expression: the notebook displays the SparkContext behind `spark`.
spark.sparkContext

In [None]:
from pyspark.sql.types import StructType, StructField, StringType

# Both CSV columns are read as nullable strings.
schema = (
    StructType()
    .add("job_link", StringType(), True)
    .add("job_summary", StringType(), True)
)

# Summaries contain embedded newlines and quotes, hence the multi-line mode and
# the explicit quote / escape characters.
df = (
    spark.read
    .options(header="true", multiLine="true", escape="\"", quote="\"")
    .schema(schema)
    .csv(file_path)
)


In [None]:
# Print 5 rows with column values shown in full (no truncation).
df.show(5, truncate=False)

In [None]:
# Print the column names and types of the loaded DataFrame.
df.printSchema()

In [None]:
# Number of rows parsed from the CSV.
df.count()

In [None]:
# NOTE(review): without .show() this bare expression likely displays only the
# repr of the summary DataFrame, not the statistics; the next cell prints them.
df.describe()

In [None]:
# Compute and print summary statistics for the columns of df.
df.describe().show()

In [None]:
!pip install datasketch

In [None]:
import pandas as pd
from datasketch import MinHash, MinHashLSH


def preprocess_text(text):
    """Lower-case ``text``, split it on whitespace and trim punctuation.

    Args:
        text: Raw job summary. Anything that is not a ``str`` (``None`` from a
            null Spark column, ``NaN`` from pandas) is treated as empty text.

    Returns:
        list[str]: Tokens with leading/trailing ``,``, ``.``, ``!`` and ``?``
        removed. A token made only of those characters is kept as ``""``.
    """
    # The job_summary column is nullable; calling .lower() on a missing value
    # would raise AttributeError and abort the whole Spark job.
    if not isinstance(text, str):
        return []
    return [token.strip(",.!?") for token in text.lower().split()]

In [None]:
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, StringType
from pyspark.ml.feature import Tokenizer


# Wrap the tokenizer as a Spark UDF that returns an array of strings.
preprocess_text_udf = udf(
    preprocess_text,
    returnType=ArrayType(StringType()),
)

# Add the token list for every job summary as a new column.
df = df.withColumn(
    "preprocessed_text",
    preprocess_text_udf("job_summary"),
)


In [None]:

# Preview df now that the preprocessed_text column has been added.
df.show()

In [None]:

def generate_shingles(text, k=3):
    """Build the set of word-level k-shingles for ``text``.

    The text is tokenized with ``preprocess_text``; every run of ``k``
    consecutive tokens is joined with single spaces to form one shingle.

    Args:
        text: Raw text. Non-string values (``None``, pandas ``NaN``) yield an
            empty set instead of raising.
        k: Number of consecutive tokens per shingle.

    Returns:
        set[str]: Distinct shingles; empty when there are fewer than ``k``
        tokens.
    """
    # pandas yields float NaN for missing summaries; guard before tokenizing.
    if not isinstance(text, str):
        return set()
    tokens = preprocess_text(text)
    return {" ".join(tokens[i:i + k]) for i in range(len(tokens) - k + 1)}

In [None]:

# Spark UDF: join the token list back into one string, shingle it, and return
# the shingles as a list (a Python set cannot be returned as a Spark array).
generate_shingles_udf = udf(
    lambda tokens: list(generate_shingles(" ".join(tokens))),
    returnType=ArrayType(StringType()),
)

# Add the shingle list for every row as a new column.
df = df.withColumn(
    "shingles",
    generate_shingles_udf("preprocessed_text"),
)

In [None]:

# Preview df now that the shingles column has been added.
df.show()

In [None]:

def generate_minhash_signature(shingles, num_perm=128, pd=1):
    """Compute a MinHash signature over an iterable of shingle strings.

    Args:
        shingles: Iterable of strings; each one is UTF-8 encoded and fed to
            the MinHash.
        num_perm: Number of permutations passed to ``MinHash``.
        pd: Output selector. ``1`` returns the ``MinHash`` object itself; any
            other value returns its hash values as a plain Python list.
            Note that inside this function the name shadows the pandas alias.

    Returns:
        The ``MinHash`` object, or a list of its hash values.
    """
    signature = MinHash(num_perm=num_perm)
    for item in shingles:
        signature.update(item.encode('utf8'))
    return signature if pd == 1 else signature.hashvalues.tolist()




In [None]:

from pyspark.sql.types import LongType

# Spark UDF returning the raw MinHash hash values (pd=0 selects list output).
# The element type must be LongType: the hash values returned by datasketch
# can exceed the signed 32-bit range, and with IntegerType such values cannot
# be stored and come back as null in the resulting array.
generate_minhash_signature_udf = udf(
    lambda shingles: generate_minhash_signature(shingles, pd=0),
    ArrayType(LongType()),
)

# Add the signature for every row as a new column.
df = df.withColumn("minhash_signature", generate_minhash_signature_udf("shingles"))



In [None]:

# Preview df now that the minhash_signature column has been added.
df.show()

In [None]:

def create_lsh_index(data, num_perm=128, threshold=0.5):
    """Build an LSH index over the ``job_summary`` column of ``data``.

    Args:
        data: pandas DataFrame with a ``job_summary`` column; its index labels
            become the keys stored in the LSH index.
        num_perm: Number of permutations for the index and each signature.
        threshold: Threshold passed to ``MinHashLSH``.

    Returns:
        tuple: ``(lsh, minhashes)`` where ``minhashes`` maps each index label
        to the signature inserted for that row.
    """
    index_lsh = MinHashLSH(threshold=threshold, num_perm=num_perm)
    signatures = {}
    for row_label, summary in data['job_summary'].items():
        signature = generate_minhash_signature(generate_shingles(summary), num_perm)
        signatures[row_label] = signature
        index_lsh.insert(row_label, signature)
    return index_lsh, signatures

In [None]:
def find_similar_jobs(data, lsh, minhashes, num_perm=128, threshold=0.5):
    """Find pairs of similar job descriptions using the LSH index.

    For every row of ``data`` the LSH index is queried for candidates, and a
    candidate is kept when the estimated Jaccard similarity of the two
    signatures is strictly greater than ``threshold``.

    Args:
        data: pandas DataFrame with a ``job_summary`` column.
        lsh: LSH index exposing ``query(minhash)``.
        minhashes: Mapping from index label to signature. Every candidate
            returned by ``lsh`` must have an entry.
        num_perm: Number of permutations; only used when a row has no stored
            signature and one has to be computed.
        threshold: Minimum (exclusive) estimated Jaccard similarity.

    Returns:
        list[tuple]: ``(index, candidate)`` pairs in discovery order. Matching
        is symmetric, so both ``(i, j)`` and ``(j, i)`` appear when both rows
        are part of ``data``.
    """
    similar_pairs = []
    for index, summary in data['job_summary'].items():
        # Reuse the signature computed while indexing; shingling and hashing
        # the text again is the most expensive step and gives the same result.
        minhash = minhashes.get(index)
        if minhash is None:
            minhash = generate_minhash_signature(generate_shingles(summary), num_perm)
        for candidate in lsh.query(minhash):
            if candidate == index:
                continue
            if minhash.jaccard(minhashes[candidate]) > threshold:
                similar_pairs.append((index, candidate))
    return similar_pairs



In [None]:

# Detect near-duplicate job summaries with MinHash + LSH, reading the CSV
# through pandas in chunks of `chunk_size` rows.
chunk_size = 10000
lsh = None
minhashes = {}
similar_pairs = []   # (idx1, idx2) pairs reported so far, in discovery order
seen_pairs = set()   # order-independent keys for O(1) duplicate checks
pair_records = []    # one dict per reported pair; converted to a frame once

for chunk in pd.read_csv(file_path, chunksize=chunk_size):
    # Rows without a summary cannot be shingled; drop them up front so they
    # neither crash the tokenizer nor enter the index.
    chunk = chunk.dropna(subset=['job_summary'])
    if lsh is None:
        # The first chunk creates the index.
        lsh, minhashes = create_lsh_index(chunk)
    else:
        # Later chunks are added to the existing index.
        for index, row in chunk.iterrows():
            shingles = generate_shingles(row['job_summary'])
            minhash = generate_minhash_signature(shingles)
            minhashes[index] = minhash
            lsh.insert(index, minhash)
    for idx1, idx2 in find_similar_jobs(chunk, lsh, minhashes):
        # (i, j) and (j, i) describe the same pair; report it only once.
        pair_key = (idx1, idx2) if idx1 <= idx2 else (idx2, idx1)
        if pair_key in seen_pairs:
            continue
        seen_pairs.add(pair_key)
        similar_pairs.append((idx1, idx2))
        # idx1 / idx2 are index labels, so look them up with .loc (labels),
        # not .iloc (positions).
        # NOTE(review): this assumes both rows are in the current chunk, which
        # holds while only the first chunk is processed (see `break` below).
        pair_records.append({
            'RowNumber1': idx1 + 1,
            'JobSummary1': chunk.loc[idx1, 'job_summary'],
            'RowNumber2': idx2 + 1,
            'JobSummary2': chunk.loc[idx2, 'job_summary'],
        })
    # Only the first chunk is processed.
    break

# Build the result frame once instead of appending row by row (quadratic, and
# DataFrame.append no longer exists in pandas 2.x).
df_similar_pairs = pd.DataFrame(
    pair_records,
    columns=['RowNumber1', 'JobSummary1', 'RowNumber2', 'JobSummary2'],
)

print(df_similar_pairs)



In [None]:
# Show the first rows of the similar-pair table built in the previous cell.
df_similar_pairs.head()