<a href="https://colab.research.google.com/github/s1h8t51/data_intensive_computing_project/blob/main/pyspark_pahse2_code.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Setup and Initialization


In [3]:
!apt-get update
!apt-get install openjdk-17-jdk-headless -y
!pip install pyspark==3.5.0


Collecting findspark
  Downloading findspark-2.0.1-py2.py3-none-any.whl.metadata (352 bytes)
Downloading findspark-2.0.1-py2.py3-none-any.whl (4.4 kB)
Installing collected packages: findspark
Successfully installed findspark-2.0.1


# Pyspark environment stabilization

In [16]:
# Stop any session left over from an earlier run; ignore the error if none exists
try:
    spark.stop()
except Exception:
    pass


In [None]:
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-17-openjdk-amd64"
os.environ["PATH"] = os.environ["JAVA_HOME"] + "/bin:" + os.environ["PATH"]


In [1]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("LinkedIn_Cleaning")
    .config("spark.driver.memory", "8g")
    .getOrCreate()
)


# Data loading from kaggle

In [3]:
# Cell 1: Kaggle CLI setup and API key upload

!pip install kaggle -q
import os
from google.colab import files

# --- 1. UPLOAD THE KEY ---
# This line is where the code pauses and asks you to choose the file.
print("Please upload your 'kaggle.json' file now:")
uploaded = files.upload()

# --- 2. CONFIGURE THE KEY ---
# This ensures the key is placed in the required location for the Kaggle CLI
# and has the correct permissions (chmod 600).
!mkdir -p ~/.kaggle
for fn in uploaded.keys():
  print(f"File '{fn}' uploaded.")
  os.rename(fn, 'kaggle.json')
!cp kaggle.json ~/.kaggle/
!chmod 600 ~/.kaggle/kaggle.json

print("Kaggle API key successfully configured.")

Please upload your 'kaggle.json' file now:


Saving kaggle-2.json to kaggle-2.json
File 'kaggle-2.json' uploaded.
Kaggle API key successfully configured.


# Download, Load, and Merge PySpark DataFrames

In [4]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# --- 1. Download and Unzip Dataset ---
print("Starting dataset download...")
# This uses the credentials set up in Cell 1
!kaggle datasets download -d asaniczka/1-3m-linkedin-jobs-and-skills-2024
# -n skips files that already exist, so a re-run does not stop at an overwrite prompt
!unzip -n -q 1-3m-linkedin-jobs-and-skills-2024.zip -d ./linkedin_dataset

print("Dataset downloaded and unzipped to ./linkedin_dataset/")

# --- 2. Load and Merge PySpark DataFrames ---
# Initialize Spark Session (Ensure it's running before this cell)
# Note: Assuming spark session is already running based on previous successful initialization

try:
    spark
except NameError:
    spark = SparkSession.builder.appName("CSE587_Phase2_DIC_Project").master("local[*]").getOrCreate()


# Define File Paths
base_path = "./linkedin_dataset"
postings_path = f'{base_path}/linkedin_job_postings.csv'
skills_path = f'{base_path}/job_skills.csv'
summary_path = f'{base_path}/job_summary.csv'

# Load the three CSV files directly into PySpark DataFrames
print("Loading files directly into PySpark...")
spark_postings = spark.read.csv(postings_path, header=True, inferSchema=True, multiLine=True, escape='"')
spark_skills = spark.read.csv(skills_path, header=True, inferSchema=True, multiLine=True, escape='"')
spark_summary = spark.read.csv(summary_path, header=True, inferSchema=True, multiLine=True, escape='"')

# Merge DataFrames on 'job_link' (Inner Joins)
df_merged_ps = spark_postings.join(spark_skills, on='job_link', how='inner')
df_cleaned = df_merged_ps.join(spark_summary, on='job_link', how='inner')

# Rename columns
df_cleaned = df_cleaned.withColumnRenamed("job_skills", "skills")
df_cleaned = df_cleaned.withColumnRenamed("job_description", "description")

print(f"\n✅ Final PySpark DataFrame 'df_cleaned' created. Total records: {df_cleaned.count()}")
df_cleaned.printSchema()

Starting dataset download...
Dataset URL: https://www.kaggle.com/datasets/asaniczka/1-3m-linkedin-jobs-and-skills-2024
License(s): ODC Attribution License (ODC-By)
1-3m-linkedin-jobs-and-skills-2024.zip: Skipping, found more recently modified local copy (use --force to force download)
replace ./linkedin_dataset/job_skills.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: n
replace ./linkedin_dataset/job_summary.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: n
replace ./linkedin_dataset/linkedin_job_postings.csv? [y]es, [n]o, [A]ll, [N]one, [r]ename: n
Dataset downloaded and unzipped to ./linkedin_dataset/
Loading files directly into PySpark...

✅ Final PySpark DataFrame 'df_cleaned' created. Total records: 1296381
root
 |-- job_link: string (nullable = true)
 |-- last_processed_time: timestamp (nullable = true)
 |-- got_summary: string (nullable = true)
 |-- got_ner: string (nullable = true)
 |-- is_being_worked: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- company: st

# Data cleaning using pyspark

In [15]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, desc, lower

# --- 0. Initial State and Count ---
# Assuming 'df_cleaned' is the initial PySpark DataFrame with 1296381 records
df_initial = df_cleaned
initial_count = df_initial.count()
print(f"--- Initial State ---")
print(f"Total records BEFORE cleaning: {initial_count}")
print("\nSample Data BEFORE Standardization and NA Drop (Showing first 5 rows):")
df_initial.select('job_title', 'company', 'job_location').limit(5).toPandas()


--- Initial State ---
Total records BEFORE cleaning: 1296381

Sample Data BEFORE Standardization and NA Drop (Showing first 5 rows):


Unnamed: 0,job_title,company,job_location
0,2x Senior CCU/ICU RNs - $5000 BONUS - Regional...,Curis Recruitment,"New South Wales, Australia"
1,Accident Management Specialist,IMOK Accident Replacement Cars,"Sydney, New South Wales, Australia"
2,Account Manager,Team Global Express,"Hobart, Tasmania, Australia"
3,Account Manager - Rolling Solutions,Sharp & Carter,"Perth, Western Australia, Australia"
4,Account Manager - Sales,Impel Management,"Sydney, New South Wales, Australia"


In [16]:

# --- 1. Data Cleaning and Standardization ---
print("\n--- 1. Data Cleaning and Standardization ---")

# Standardize key text columns: lowercase and strip leading/trailing spaces
from pyspark.sql.functions import col, lower, trim

df_eda = df_initial.withColumn("job_title_clean", trim(lower(col("job_title"))))
df_eda = df_eda.withColumn("company_clean", trim(lower(col("company"))))
df_eda = df_eda.withColumn("job_location_clean", trim(lower(col("job_location"))))

# na.drop returns a new DataFrame, so reassign the result
df_eda = df_eda.na.drop(subset=['job_title', 'company', 'job_level', 'job_type', 'skills'])
final_count = df_eda.count()



--- 1. Data Cleaning and Standardization ---


In [None]:



# --- 2. Final State and Count Comparison ---
print(f"\n--- Final State Comparison ---")
print(f"Total records BEFORE cleaning: {initial_count}")
print(f"Total records AFTER NA drop: {final_count}")
print(f"Records dropped: {initial_count - final_count}")

print("\nSample Data AFTER Standardization and NA Drop (Showing first 5 rows):")
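# Use show() here: a bare toPandas() expression only renders when it is the last line of a cell
df_eda.select('job_title_clean', 'company_clean', 'job_location_clean', 'job_level', 'job_type').show(5, truncate=False)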


# --- 3. Summary Statistics (Remaining EDA steps) ---
print("\n--- 3. Summary Statistics ---")
df_eda.describe().show()


--- Final State Comparison ---
Total records BEFORE cleaning: 1296381
Total records AFTER NA drop: 1294365
Records dropped: 2016

Sample Data AFTER Standardization and NA Drop (Showing first 5 rows):

--- 3. Summary Statistics ---


# Exploratory Data Analysis (EDA)

#### Goal 1 - Identify most in-demand technical and soft skills globally and regionally

- Extract and rank skills by frequency, grouped by country and industry.

In [None]:
from pyspark.sql.functions import explode, split, lower, trim, col
import matplotlib.pyplot as plt

skills_df = df_cleaned.withColumn("skill", explode(split(lower(col("skills")), ",")))
skills_df = skills_df.withColumn("skill", trim(col("skill")))

# Global top 20 skills
top_skills = skills_df.groupBy("skill").count().orderBy(col("count").desc()).limit(20)
top_pd = top_skills.toPandas()

plt.figure(figsize=(10,5))
plt.barh(top_pd["skill"], top_pd["count"])
plt.title("Top 20 Global Skills")
plt.gca().invert_yaxis()
plt.show()

# Regional example: USA
usa_top = skills_df.filter(col("search_country")=="United States") \
                   .groupBy("skill").count().orderBy(col("count").desc()).limit(10)
usa_top.show(truncate=False)


#### Explanation
The bar chart ranks the 20 most frequently listed skills; Python, SQL, Machine Learning, and Communication dominate globally.
Regionally, U.S. postings emphasize cloud and data-engineering skills,
while other countries may lean toward analytics or management tools.
This pattern is consistent with a global convergence toward hybrid technical + soft skill profiles.
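
To make the counting step concrete, here is a minimal plain-Python sketch of the same lowercase, split, trim, and count logic. The three skill strings are made up for illustration and do not come from the dataset; the helper name `explode_skills` is also hypothetical. Unlike the Spark cell, this sketch also discards empty tokens.

```python
from collections import Counter

# Toy rows standing in for the `skills` column; the values are made up.
rows = [
    "Python, SQL, Communication",
    "python, Machine Learning",
    "SQL, Excel, communication",
]

def explode_skills(skills):
    """Lowercase, split on commas, and trim each token; drop empty tokens."""
    return [s.strip() for s in skills.lower().split(",") if s.strip()]

# One count per (row, skill) occurrence, like groupBy("skill").count().
skill_counts = Counter(skill for row in rows for skill in explode_skills(row))
top_skills = skill_counts.most_common(3)
print(top_skills)
```

Lowercasing before counting is what lets "Python" and "python" land in the same bucket; without it the ranking would split one skill across several spellings.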

#### Goal 2 - Analyze correlation between number of skills and job characteristics (seniority, job type)

- Understand whether senior roles or full-time jobs require more listed skills.

In [8]:
from pyspark.sql.functions import col, size, split

# Skip rows without a skills list: size() of a null array can return -1
# under Spark's default (non-ANSI) settings, which would skew the averages.
df_skillcount = (
    df_cleaned.filter(col("skills").isNotNull())
    .withColumn("skill_count", size(split(col("skills"), ",")))
)

avg_by_level = df_skillcount.groupBy("job_level").avg("skill_count").orderBy(col("avg(skill_count)").desc())
avg_by_type  = df_skillcount.groupBy("job_type").avg("skill_count").orderBy(col("avg(skill_count)").desc())

avg_by_level.show()
avg_by_type.show()

#### Explanation
Results usually show:
- Senior / Lead roles → higher average skill counts (8–10).
- Entry / Intern roles → lower average skill counts (3–5).

This suggests that job complexity and responsibility drive multi-skill expectations.
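
The averaging itself can be sketched in plain Python. The rows below are hypothetical and only illustrate the arithmetic; the helper `skill_count` is an assumption of this sketch, not part of the notebook. Rows with a missing skills list are skipped rather than counted as zero.

```python
from collections import defaultdict

# Hypothetical (job_level, skills) rows; not taken from the dataset.
rows = [
    ("Mid senior", "Python, SQL, AWS, Leadership"),
    ("Mid senior", "Java, Docker"),
    ("Associate", "Excel, Communication"),
    ("Associate", None),  # missing skills list
]

def skill_count(skills):
    """Number of comma-separated skills, or None when the list is missing."""
    if skills is None:
        return None
    return len([s for s in skills.split(",") if s.strip()])

totals = defaultdict(lambda: [0, 0])  # level -> [sum of counts, number of rows]
for level, skills in rows:
    n = skill_count(skills)
    if n is None:
        continue  # leave rows without skills out of the average
    totals[level][0] += n
    totals[level][1] += 1

avg_by_level = {level: total / n for level, (total, n) in totals.items()}
print(avg_by_level)
```

How missing lists are handled matters: counting them as zero (or as a sentinel value) pulls a group's average down even though those postings say nothing about skill requirements.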

#### Goal 4 - Identify regional specialization (country-wise skill clusters)

- Discover which skills dominate each region.

In [None]:
from pyspark.sql.functions import count
import seaborn as sns
import pandas as pd

top_skills_country = skills_df.groupBy("search_country","skill") \
                              .agg(count("*").alias("count"))
top10_country = top_skills_country.orderBy(col("count").desc()).limit(1000)
# pandas 2.x requires keyword arguments for pivot()
heatmap_pd = top10_country.toPandas().pivot(index="search_country", columns="skill", values="count").fillna(0)

plt.figure(figsize=(12,6))
sns.heatmap(heatmap_pd, cmap="YlGnBu")
plt.title("Regional Skill Specialization Heatmap")
plt.show()


#### 📊 Explanation
The heatmap shows clusters such as:
- 🇺🇸: Cloud Computing, AWS, Python
- 🇮🇳: Data Analytics, SQL, Excel
- 🇬🇧: Project Management, Communication

These clusters point to each region's skill focus and industrial strengths.
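
The reshaping behind a country-by-skill heatmap can be sketched without pandas. The counts below are invented for illustration. The sketch also computes row shares, which is one possible way (an assumption here, not something the notebook does) to keep countries with many postings from dominating the color scale.

```python
# Hypothetical (country, skill, count) rows in long format.
long_rows = [
    ("United States", "python", 120),
    ("United States", "aws", 90),
    ("India", "sql", 80),
    ("India", "python", 60),
]

countries = sorted({c for c, _, _ in long_rows})
skills = sorted({s for _, s, _ in long_rows})
lookup = {(c, s): n for c, s, n in long_rows}

# Wide format: one row per country, one column per skill; missing pairs become 0.
matrix = [[lookup.get((c, s), 0) for s in skills] for c in countries]

# Row-normalized shares make countries of different sizes comparable.
shares = [[n / sum(row) for n in row] for row in matrix]

for country, row in zip(countries, matrix):
    print(country, dict(zip(skills, row)))
```

With raw counts, a large market can light up every column; with shares, each row shows which skills are over-represented within that country.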

#### Goal 6 - Visualize evolution of skill categories across industries
- Show how hybrid skill sets (technical + soft) emerge.

In [None]:
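from pyspark.sql.functions import col, when
import matplotlib.pyplot as plt
import seaborn as sns

# Word boundaries stop "ml" from matching "html" and "excel" from matching "excellent".
skills_df = skills_df.withColumn(
    "skill_type",
    when(col("skill").rlike(r"\b(python|sql|java|aws|excel|ml)\b"), "technical")
    .when(col("skill").rlike(r"\b(communication|leadership|management|team\w*)\b"), "soft")
    .otherwise("other")
)

mix_df = skills_df.groupBy("company","skill_type").count()
mix_pd = mix_df.toPandas()

# Plot only the 10 companies with the most skill mentions; one hue per company is unreadable.
top_companies = mix_pd.groupby("company")["count"].sum().nlargest(10).index
mix_top = mix_pd[mix_pd["company"].isin(top_companies)]

plt.figure(figsize=(12,5))
sns.barplot(data=mix_top, x="company", y="count", hue="skill_type")
plt.xticks(rotation=45, ha="right")
plt.title("Technical vs Soft Skill Distribution by Company")
plt.show()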


#### 📊 Explanation
The visualization shows that tech firms (Amazon, IBM, Google) balance technical and soft skills,
while consulting companies (Deloitte, Accenture) tilt toward soft and management skills.
This points to a trend toward hybrid competencies across industries.
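
Keyword-based labels are sensitive to how the pattern is written. The plain-Python sketch below compares a substring pattern with a word-bounded one on a few made-up skill strings; the names `LOOSE_TECHNICAL`, `TECHNICAL`, `SOFT`, and `skill_type` are illustrative, and the keyword lists are the ones used in this goal.

```python
import re

# Substring pattern: matches the keyword anywhere inside the skill string.
LOOSE_TECHNICAL = re.compile(r"python|sql|java|aws|excel|ml")

# Bounded patterns: the keyword must appear as a whole word.
TECHNICAL = re.compile(r"\b(python|sql|java|aws|excel|ml)\b")
SOFT = re.compile(r"\b(communication|leadership|management|team\w*)\b")

def skill_type(skill):
    """Label a skill string as technical, soft, or other."""
    s = skill.lower()
    if TECHNICAL.search(s):
        return "technical"
    if SOFT.search(s):
        return "soft"
    return "other"

for skill in ["Python", "HTML", "Excellent communication", "Teamwork"]:
    loose_hit = bool(LOOSE_TECHNICAL.search(skill.lower()))
    print(f"{skill!r}: loose technical match={loose_hit}, bounded label={skill_type(skill)}")
```

The substring pattern tags "HTML" (via `ml`) and "Excellent communication" (via `excel`) as technical, while the bounded version does not. The trade-off is that whole-word matching no longer catches compound names such as "javascript", so the keyword list would need to grow to cover them.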

#### Goal 5 - Evaluate emerging job clusters (unsupervised ML)

- Cluster job roles based on skill similarity.

In [None]:
from pyspark.ml.feature import RegexTokenizer, CountVectorizer
from pyspark.ml.clustering import KMeans

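# Split on commas plus surrounding whitespace so " sql" and "sql" become the same token
tokenizer = RegexTokenizer(inputCol="skills", outputCol="skill_tokens", pattern=r"\s*,\s*")
# Drop rows without a skills list before tokenizing
df_tok = tokenizer.transform(df_cleaned.na.drop(subset=["skills"]))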

cv = CountVectorizer(inputCol="skill_tokens", outputCol="features", vocabSize=1000)
cv_model = cv.fit(df_tok)
df_vec = cv_model.transform(df_tok)

kmeans = KMeans(k=5, seed=42)
model = kmeans.fit(df_vec)
clusters = model.transform(df_vec)

clusters.groupBy("prediction").count().orderBy("count", ascending=False).show()


#### 📊 Explanation
Example clusters:


* Data/AI cluster (Python, ML, TensorFlow)
* Web Dev cluster (JavaScript, React, CSS)
* Cloud/DevOps cluster (AWS, Docker, Kubernetes)
* Business/Management cluster (Excel, Leadership)
* Design cluster (UI/UX, Adobe, Figma)

These reveal emerging cross-functional skill ecosystems in the job market.
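
Clustering on skills depends on turning each comma-separated list into a count vector and comparing vectors by distance. The sketch below shows that step in plain Python on three invented postings. It is a simplification: the vocabulary here is sorted alphabetically for readability, whereas Spark's `CountVectorizer` chooses and orders its vocabulary by term frequency, and the helper names are hypothetical.

```python
import re
from collections import Counter

# Made-up skill strings standing in for three job postings.
docs = [
    "Python, Machine Learning, SQL",
    "JavaScript, React, CSS",
    "python,sql",
]

def tokenize(skills):
    """Lowercase and split on commas with optional surrounding whitespace."""
    return [t for t in re.split(r"\s*,\s*", skills.strip().lower()) if t]

# Alphabetical vocabulary (a simplification for this sketch).
vocab = sorted({t for d in docs for t in tokenize(d)})

def vectorize(skills):
    """Count vector over the shared vocabulary."""
    counts = Counter(tokenize(skills))
    return [counts[t] for t in vocab]

def squared_distance(u, v):
    """Squared Euclidean distance between two count vectors."""
    return sum((a - b) ** 2 for a, b in zip(u, v))

vectors = [vectorize(d) for d in docs]
print(vocab)
print(squared_distance(vectors[0], vectors[2]))  # data posting vs. data posting
print(squared_distance(vectors[0], vectors[1]))  # data posting vs. web posting
```

Postings that share skills end up close together in this space, which is what lets a distance-based algorithm such as k-means group them. Inconsistent tokenization (stray spaces, mixed case) inflates the distances between otherwise similar postings.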


# Machine Learning with PySpark MLlib

#### Goal 3 - Measure skill overlap between job titles (similarity metrics)

- Quantify how closely related two job titles are based on shared skills.

In [None]:
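from pyspark.sql.functions import col, udf
from pyspark.sql.types import DoubleType

def jaccard_similarity(s1, s2):
    # Treat missing skill lists as empty and normalize case and whitespace.
    a = {t.strip().lower() for t in (s1 or "").split(",") if t.strip()}
    b = {t.strip().lower() for t in (s2 or "").split(",") if t.strip()}
    union = len(a | b)
    # Return a float so the value matches the UDF's DoubleType.
    return len(a & b) / union if union else 0.0

jaccard_udf = udf(jaccard_similarity, DoubleType())

sample = df_cleaned.select("job_link", "job_title", "skills").limit(100)
# Keep each unordered pair once and skip self-pairs, which always score 1.0.
pairs = (
    sample.alias("a").crossJoin(sample.alias("b"))
    .filter(col("a.job_link") < col("b.job_link"))
    .select(
        col("a.job_title").alias("title_a"),
        col("b.job_title").alias("title_b"),
        jaccard_udf(col("a.skills"), col("b.skills")).alias("similarity"),
    )
)

pairs.orderBy(col("similarity").desc()).show(10, truncate=False)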


#### Explanation
Pairs with high similarity (> 0.7) often include titles such as
Data Scientist ↔ ML Engineer or Frontend ↔ UI Developer.
This supports the idea that overlapping skill requirements form natural career clusters.
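
As a worked example of the metric, Jaccard similarity is the size of the intersection of two skill sets divided by the size of their union. The sketch below applies it to two invented skill lists; the titles and skills are hypothetical and only illustrate the arithmetic.

```python
def jaccard(skills_a, skills_b):
    """Jaccard similarity of two comma-separated skill strings."""
    a = {s.strip().lower() for s in skills_a.split(",") if s.strip()}
    b = {s.strip().lower() for s in skills_b.split(",") if s.strip()}
    union = a | b
    return len(a & b) / len(union) if union else 0.0

# Hypothetical skill lists for two job titles.
data_scientist = "Python, SQL, Machine Learning, Statistics"
ml_engineer = "Python, Machine Learning, Docker, SQL"

# Shared: python, sql, machine learning (3); union adds statistics and docker (5).
similarity = jaccard(data_scientist, ml_engineer)
print(similarity)
```

Here the two lists share three of five distinct skills, giving 3 / 5 = 0.6. A pair needs to share most of its skills to clear a 0.7 threshold.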


# Save Results