In [None]:
!pip install pyspark




In [None]:
from pyspark.sql import SparkSession

# Build the Spark session used by the DataFrame operations in this notebook.
spark = (
    SparkSession.builder
    .appName("JobRecommender")
    .getOrCreate()
)

# Bare expression so the session object is shown as the cell's output.
spark


In [None]:
# Colab-only step: prompt for a file upload into the runtime.
# The CSV read later in the notebook expects "naukri_com-job_sample.csv"
# to be the file uploaded here.
from google.colab import files
# NOTE(review): `uploaded` is not referenced by any other visible cell; the
# data is presumably consumed via the copy saved in the working directory.
uploaded = files.upload()



Saving naukri_com-job_sample.csv to naukri_com-job_sample.csv


In [None]:
from pyspark.sql import SparkSession

# Request a session under the "JobRecommenderSafe" application name.
# NOTE(review): an earlier cell already requests a session as "JobRecommender";
# confirm whether both cells are needed.
spark = (
    SparkSession.builder
    .appName("JobRecommenderSafe")
    .getOrCreate()
)

# Bare expression so the session object is shown as the cell's output.
spark


In [None]:
# Load the uploaded job postings.
# Free-text columns (e.g. the job description) can contain commas, doubled
# quotes and line breaks inside quoted fields. Spark's CSV reader defaults to
# backslash escaping and one record per line, which can spill such text into
# neighbouring columns, so read with standard CSV quoting ("" escapes a quote)
# and allow records to span lines.
# NOTE(review): assumes the export follows standard CSV quoting - confirm
# against the raw file.
df = spark.read.csv(
    "naukri_com-job_sample.csv",
    header=True,
    inferSchema=True,
    quote='"',
    escape='"',
    multiLine=True,
)


In [None]:
# Print the row count first, then END the cell with the preview: only a cell's
# last expression is rendered, so a preview placed before print() is computed
# and then thrown away.
print("Approx rows:", df.count())
df.limit(5).toPandas()


Approx rows: 22000


In [None]:
# Upper bound on the number of rows carried into the rest of the pipeline.
ROW_CAP = 40000   # safe cap

df_small = df.limit(ROW_CAP)
print("Using rows:", df_small.count())


Using rows: 22000


In [None]:
from pyspark.sql.functions import col, concat_ws, lower, regexp_replace, trim


def _normalize_text(column_name):
    """Return a Column holding `column_name` as lowercase, space-separated words.

    Every run of non-letter characters (punctuation, digits, whitespace) is
    replaced by a single space and the result is trimmed. Replacing with a
    space rather than deleting keeps delimiters such as "," or "/" from gluing
    neighbouring words together ("python,sql" -> "python sql", not "pythonsql"),
    and collapsing runs avoids empty tokens when the text is split on whitespace.
    """
    return lower(trim(regexp_replace(col(column_name), "[^a-zA-Z]+", " ")))


# Keep only the columns used downstream (under shorter names) and drop
# postings that have no skills or no description.
data = df_small.select(
    col("jobtitle").alias("title"),
    col("skills"),
    col("jobdescription").alias("description"),
    col("industry"),
).na.drop(subset=["skills", "description"])

cleaned = (
    data
    .withColumn("clean_desc", _normalize_text("description"))
    .withColumn("clean_skills", _normalize_text("skills"))
)

# Combine description + skills into the single text column that gets vectorised.
# clean_skills is appended twice (presumably to give skill terms extra weight
# relative to the description - confirm intent).
cleaned = cleaned.withColumn(
    "full_text",
    concat_ws(" ", col("clean_desc"), col("clean_skills"), col("clean_skills")),
)

cleaned.show(5)


+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|               title|              skills|         description|            industry|          clean_desc|        clean_skills|           full_text|
+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|Walkin Data Entry...|                ITES|Job Description  ...|Media / Entertain...|job description  ...|                ites|job description  ...|
|Work Based Onhome...|           Marketing|Job Description  ...|Advertising / PR ...|job description  ...|           marketing|job description  ...|
|Pl/sql Developer ...|IT Software - App...|Job Description  ...|IT-Software / Sof...|job description  ...|it software  appl...|job description  ...|
|Manager/ad/partne...|            Accounts|Job Description  ...|Banking / Financi...|job description  ...|

In [None]:
from pyspark.ml.feature import Tokenizer, StopWordsRemover

# Tokenise the combined "full_text" column (description + skills), not just
# "clean_desc", then strip stop words from the resulting token lists.
tokenizer = Tokenizer(outputCol="desc_words", inputCol="full_text")
remover = StopWordsRemover(outputCol="filtered_desc", inputCol="desc_words")

tokenized = tokenizer.transform(cleaned)
processed = remover.transform(tokenized)

# Spot-check a few titles next to their filtered tokens.
(
    processed
    .select("title", "filtered_desc")
    .show(n=5, truncate=False)
)



+----------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [None]:
from pyspark.ml.feature import HashingTF, IDF
from pyspark.sql.functions import when, length, trim, split, slice, concat_ws

# TF stage: hash each posting's filtered tokens into a 20000-wide term vector.
tf = HashingTF(inputCol="filtered_desc", outputCol="raw_features", numFeatures=20000)
tf_data = tf.transform(processed)

# IDF stage: fit on the postings and rescale the raw term vectors into "features".
# idf_model is reused later to vectorise the user's skills.
idf = IDF(inputCol="raw_features", outputCol="features")
idf_model = idf.fit(tf_data)
tfidf_df = idf_model.transform(tf_data)

# -------- CLEANING STARTS HERE --------

# 1. Clean job titles: null, empty or very short titles become a placeholder.
#    The explicit isNull() check matters: for a null title the length test is
#    null, so the row would fall through to otherwise(), keep a null
#    clean_title, and then be silently dropped by the filter in step 3.
_trimmed_title = trim(col("title"))
cleaned_tfidf = tfidf_df.withColumn(
    "clean_title",
    when(
        col("title").isNull() | (length(_trimmed_title) < 5),
        "Unknown Job Title",
    ).otherwise(_trimmed_title)
)

# 2. Clean skills (keep first 10 space-separated words of the raw skills text).
#    This overwrites the "clean_skills" column produced during text cleaning.
cleaned_tfidf = cleaned_tfidf.withColumn(
    "clean_skills",
    concat_ws(" ", slice(split(col("skills"), " "), 1, 10))
)

# 3. Remove numeric-only titles
cleaned_tfidf = cleaned_tfidf.filter(
    ~col("clean_title").rlike("^[0-9]+$")
)

# Cache cleaned TF-IDF dataset; count() forces evaluation so the cache is
# populated now, and its value is shown as the cell output.
cleaned_tfidf = cleaned_tfidf.cache()
print("Caching cleaned TF-IDF data…")
cleaned_tfidf.count()


Caching cleaned TF-IDF data…


21279

In [None]:
from pyspark.sql.functions import regexp_replace, when

# keywords that usually indicate a job description, not a title
bad_words = ["skills", "experience", "years", "responsibilities", "requirements", "knowledge"]

# Collapse newlines / runs of whitespace in the title into single spaces.
# The pattern is a raw string: "\s" in an ordinary Python literal is an
# invalid escape sequence and triggers a warning when the cell is compiled.
cleaned_tfidf = cleaned_tfidf.withColumn(
    "clean_title", regexp_replace("clean_title", r"\s+", " ")
)

# Detect titles that look like descriptions -> replace with "Unknown Job Title".
# A title is treated as a description if it contains any bad word or is longer
# than 40 characters.
# NOTE(review): the keyword match is case-sensitive, so "Experience" or
# "Skills" at the start of a title are not caught - confirm whether intended.
cleaned_tfidf = cleaned_tfidf.withColumn(
    "clean_title_final",
    when(
        col("clean_title").rlike("|".join(bad_words)) |
        (length(col("clean_title")) > 40),          # too long -> description
        "Unknown Job Title"
    ).otherwise(col("clean_title"))
)


  "clean_title", regexp_replace("clean_title", "\s+", " ")


In [None]:
from pyspark.ml.feature import BucketedRandomProjectionLSH

# Locality-sensitive hashing over the TF-IDF "features" vectors, used later for
# approximate nearest-neighbour search. The projections are random, so a fixed
# seed is set to make the hash tables (and thus the neighbours returned)
# reproducible across kernel restarts.
brp = BucketedRandomProjectionLSH(
    inputCol="features",
    outputCol="hashes",
    bucketLength=2.0,
    numHashTables=3,
    seed=42,
)

brp_model = brp.fit(cleaned_tfidf)


In [None]:
# One-row DataFrame describing the user, with the skills text lower-cased and
# stripped of everything except letters and spaces.
user_skills = [("1", "python sql machine learning data analysis big data")]
user_df = (
    spark.createDataFrame(user_skills, ["user_id", "user_skills"])
    .withColumn("clean_user", lower(regexp_replace("user_skills", "[^a-zA-Z ]", "")))
)

# Transformers for the user text: tokenise -> drop stop words -> hash to term
# counts. numFeatures matches the 20000 used for the job postings so the
# fitted IDF model can be applied to the result.
tokenizer_user = Tokenizer(outputCol="user_words", inputCol="clean_user")
remover_user = StopWordsRemover(outputCol="filtered_user", inputCol="user_words")
tf_user = HashingTF(numFeatures=20000, outputCol="raw_features", inputCol="filtered_user")

user_tokens = tokenizer_user.transform(user_df)
user_clean = remover_user.transform(user_tokens)
user_tf = tf_user.transform(user_clean)

# Reuse the IDF weights fitted on the postings, then pull the single feature
# vector out of the one-row result.
user_tfidf = idf_model.transform(user_tf)
user_vec = user_tfidf.select("features").first()["features"]


In [None]:
# Keywords a posting's skills text must mention (any one of them) to be kept
# as a candidate for the nearest-neighbour search.
skill_keywords = ["python", "sql", "machine", "data"]

# "(?i)" makes the regex case-insensitive. The raw skills text is mixed-case
# (e.g. "Marketing", "ITES"), so a case-sensitive contains("python") misses
# spellings such as "Python" or "SQL" and leaves only a handful of candidates.
filtered_jobs = cleaned_tfidf.filter(
    col("skills").rlike("(?i)" + "|".join(skill_keywords))
)

print("Filtered job count:", filtered_jobs.count())


Filtered job count: 15


In [None]:
# Ask the fitted LSH model for up to k postings (from the pre-filtered set)
# that are approximately closest to the user's feature vector.
k = 100
neighbors = brp_model.approxNearestNeighbors(filtered_jobs, user_vec, k)

# Preview the ten closest matches, smallest distance first.
(
    neighbors
    .select("clean_title", "industry", "clean_skills", "distCol")
    .orderBy("distCol")
    .show(10, truncate=False)
)


+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------+---------------------------------------------------------------------------------------------+------------------+
|clean_title                                                                                                                                                                                                                                                                                                                                         

In [None]:
# Normalize similarity: scale each distance by the largest distance among the
# neighbours and flip it, so the closest posting scores highest and the
# farthest scores 0.
maxd = neighbors.agg({"distCol": "max"}).collect()[0][0]

# max() comes back as None when there are no neighbours at all, and as 0 when
# every neighbour sits exactly on the query vector. Either would break the
# division below (float(None) raises, dividing by 0 yields nulls), so fall
# back to a neutral divisor.
if not maxd:
    maxd = 1.0

neighbors = neighbors.withColumn(
    "similarity",
    1.0 - (col("distCol") / float(maxd))
)


In [None]:
# Keep the 20 highest-scoring neighbours, with only the columns that are reported.
top_jobs = (
    neighbors
    .select("clean_title_final", "industry", "clean_skills", "similarity")
    .sort(col("similarity").desc())
    .limit(20)
)

top_jobs.show(truncate=False)


+-----------------+-------------------------------+---------------------------------------------------------------------------------------------+------------------+
|clean_title_final|industry                       |clean_skills                                                                                 |similarity        |
+-----------------+-------------------------------+---------------------------------------------------------------------------------------------+------------------+
|Unknown Job Title|IT-Software / Software Services| with strong competencies in data structures                                                 |0.8368703556558038|
|Government       |IT-Software / Software Services| Maintenance Role Category:Programming & Design Role:Software Developer Keyskills development|0.8037379231476828|
|Unknown Job Title|IT-Software / Software Services| Mobile Device Development. Experience with version control systems. Good                    |0.7642326764887811|
|Unknown J

In [None]:
from pyspark.sql.functions import when, col, lower

# Lower-cased skills snippet that every relabelling rule is matched against.
_skills_lc = lower(col("clean_skills"))


def _mentions(word):
    """Return a boolean Column: does the lower-cased skills text contain `word`?"""
    return _skills_lc.like(f"%{word}%")


# Ordered (condition, title) rules; the FIRST matching rule wins, so more
# specific roles are listed before generic ones.
_title_rules = [
    # Machine Learning
    (_mentions("machine") & _mentions("learning"), "Machine Learning Engineer"),
    # Data Analysis / Analytics
    (_mentions("analysis") | _mentions("analyst"), "Data Analyst"),
    # Python + SQL + Data
    (_mentions("python") & _mentions("sql"), "Data Engineer"),
    # Big Data stack
    (_mentions("big") & _mentions("data"), "Big Data Engineer"),
    # Software roles
    (_mentions("developer") | _mentions("programming"), "Software Developer"),
    # Shell / scripts / ETL
    (_mentions("scripts") | _mentions("script"), "ETL Developer"),
    # Mobile development
    (_mentions("mobile") | _mentions("android"), "Mobile Application Developer"),
]

# Fold the rules into a single CASE WHEN expression, preserving their order.
(_first_condition, _first_title), *_remaining_rules = _title_rules
_title_expr = when(_first_condition, _first_title)
for _condition, _title in _remaining_rules:
    _title_expr = _title_expr.when(_condition, _title)

# If nothing matches — fallback
final_output = top_jobs.withColumn(
    "final_title",
    _title_expr.otherwise("Technical Specialist"),
)


In [None]:
# Columns reported for each recommendation, in display order.
_report_columns = ["final_title", "industry", "clean_skills", "similarity"]

# Print the relabelled recommendations without truncating long cell values.
final_output.select(*_report_columns).show(truncate=False)


+----------------------------+-------------------------------+---------------------------------------------------------------------------------------------+------------------+
|final_title                 |industry                       |clean_skills                                                                                 |similarity        |
+----------------------------+-------------------------------+---------------------------------------------------------------------------------------------+------------------+
|Technical Specialist        |IT-Software / Software Services| with strong competencies in data structures                                                 |0.8368703556558038|
|Software Developer          |IT-Software / Software Services| Maintenance Role Category:Programming & Design Role:Software Developer Keyskills development|0.8037379231476828|
|Mobile Application Developer|IT-Software / Software Services| Mobile Device Development. Experience with version contro