# This project processes and analyzes a large dataset of job postings to extract insights about the job market
## 1. Data Loading

Loaded three primary datasets:
*   LinkedIn job postings
*   Job skills data
*   Job summary information



## 2. Data Cleaning


*   Applied text cleaning to remove emojis, special characters, and any other non-ASCII or non-printable characters
*   Filled missing **job_title**, **company**, and **job_location** values with "Unknown" placeholders
*   Standardized location information
*   Parsed the **first_seen** field into a proper date column (**first_seen_date**)
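
The text-cleaning step above can be sketched in plain Python, outside Spark. This is a minimal illustration under stated assumptions, not the notebook's exact UDF: the helper name `clean_text_sketch` is hypothetical, and it collapses whitespace before stripping non-ASCII characters, so a newline between two words becomes a space instead of disappearing.

```python
import re
from typing import Optional

_WHITESPACE = re.compile(r"\s+")
_NOT_PRINTABLE_ASCII = re.compile(r"[^\x20-\x7E]")


def clean_text_sketch(text: Optional[str]) -> Optional[str]:
    """Hypothetical helper: keep printable ASCII and normalize whitespace."""
    if text is None:
        return None
    # Turn tabs, newlines, and runs of whitespace into single spaces first
    text = _WHITESPACE.sub(" ", text)
    # Drop everything outside printable ASCII (emojis, control chars, accents)
    text = _NOT_PRINTABLE_ASCII.sub("", text)
    # Removals can leave double spaces behind, so collapse once more
    return _WHITESPACE.sub(" ", text).strip()


if __name__ == "__main__":
    print(clean_text_sketch("Senior\tData\nEngineer \U0001F680 \u2013 Montr\u00e9al"))
    # prints: Senior Data Engineer Montral
```

Because the filter keeps only printable ASCII, accented letters are dropped as well (`Montréal` becomes `Montral`), which may matter for non-English postings.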



## 3. Feature Engineering

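*   Location Processing: Split job locations on commas into **city** (first part) and **country** (second part); for US postings the second part is a state code such as CA or TX, so **country** mixes states with regions such as England
*   Skill Indicators: Created binary columns for the 30 most frequent skills, counted on a sample of up to 10,000 postings
*   Job Level Extraction: Identified entry, mid, and senior-level positions based on title analysis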
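
Both the skill indicators and the job-level flags come down to keyword matching. The notebook uses substring checks, which can over-match (for example, `intern` inside `international`, or `excel` inside `excellent`). The sketch below shows a stricter alternative in plain Python, assuming word-boundary matching for titles and whole-entry matching for skills. The function names `job_level` and `skill_flags` are hypothetical, and this is not the matching used to produce the results in this notebook.

```python
import re

# Same keyword lists as the notebook's seniority_keywords dictionary
SENIORITY_KEYWORDS = {
    "entry": ["entry", "junior", "associate", "trainee", "intern"],
    "mid": ["mid", "intermediate", "experienced"],
    "senior": ["senior", "lead", "principal", "manager", "director", "head"],
}

# One compiled pattern per level; \b keeps "intern" from matching "international"
LEVEL_PATTERNS = {
    level: re.compile(r"\b(?:" + "|".join(map(re.escape, words)) + r")\b", re.IGNORECASE)
    for level, words in SENIORITY_KEYWORDS.items()
}


def job_level(title):
    """Return "Entry", "Mid", "Senior", or "Unknown" (first matching level wins)."""
    for level, pattern in LEVEL_PATTERNS.items():
        if title and pattern.search(title):
            return level.capitalize()
    return "Unknown"


def skill_flags(job_skills, top_skills):
    """Return {skill: 0 or 1}, matching whole comma- or pipe-separated entries."""
    listed = {s.strip().lower() for s in re.split(r",|\|", job_skills or "") if s.strip()}
    return {skill: int(skill in listed) for skill in top_skills}


if __name__ == "__main__":
    print(job_level("International Sales Representative"))  # Unknown
    print(job_level("Junior Data Analyst"))                  # Entry
    print(skill_flags("Excellent communication, Teamwork", ["excel", "teamwork"]))
    # {'excel': 0, 'teamwork': 1}
```

The trade-off is recall: whole-entry matching no longer counts `communication skills` as `communication`, so which variant fits depends on how the indicators will be used. In Spark, a similar word-boundary check could be written with `Column.rlike`.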


## 4. Dataset Merging

*   Merged the three datasets using **job_link** as the key
*   Left-joined the skills and summary data onto the LinkedIn postings, so postings without a match keep null **job_skills** or **job_summary** values

## 5. Saving Dataset

*   Saved the merged dataset as CSV (coalesced to one partition) and as Parquet

In [None]:
# Install PySpark
!pip install pyspark

Defaulting to user installation because normal site-packages is not writeable


In [None]:
# Import necessary libraries
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import StringType
import re

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("JobMarketAnalysis") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

# File paths
job_summary_path = "/scratch/stp8232/Big Data/job_summary.csv"
job_skills_path = "/scratch/stp8232/Big Data/job_skills.csv"
linkedin_path = "/scratch/stp8232/Big Data/linkedin_job_postings.csv"

# Load datasets
print("Loading datasets...")
job_summary_df = spark.read.csv(job_summary_path, header=True, inferSchema=True)
job_skills_df = spark.read.csv(job_skills_path, header=True, inferSchema=True)
linkedin_df = spark.read.csv(linkedin_path, header=True, inferSchema=True)

# Define UDF for text cleaning
def clean_text(text):
    if text is None:
        return None
    # Remove emojis and special characters
    emoji_pattern = re.compile("["
                               u"\U0001F600-\U0001F64F"  # emoticons
                               u"\U0001F300-\U0001F5FF"  # symbols & pictographs
                               u"\U0001F680-\U0001F6FF"  # transport & map symbols
                               u"\U0001F700-\U0001F77F"  # alchemical symbols
                               u"\U0001F780-\U0001F7FF"  # Geometric Shapes
                               u"\U0001F800-\U0001F8FF"  # Supplemental Arrows-C
                               u"\U0001F900-\U0001F9FF"  # Supplemental Symbols and Pictographs
                               u"\U0001FA00-\U0001FA6F"  # Chess Symbols
                               u"\U0001FA70-\U0001FAFF"  # Symbols and Pictographs Extended-A
                               u"\U00002702-\U000027B0"  # Dingbats
                               u"\U000024C2-\U0001F251"
                               "]+", flags=re.UNICODE)
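    text = emoji_pattern.sub(r'', text)
    # Turn newlines, tabs, and repeated whitespace into single spaces first,
    # so that stripping control characters does not glue words together
    text = re.sub(r'\s+', ' ', text)
    # Remove everything outside printable ASCII (this also drops accented letters)
    text = re.sub(r'[^\x20-\x7E]', '', text)
    # Collapse any double spaces left behind by the removals
    text = re.sub(r' {2,}', ' ', text)
    return text.strip()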

# Register UDF
clean_text_udf = F.udf(clean_text, StringType())

# Handle null values and clean data
print("Cleaning data...")

# Basic column cleaning function
def clean_column(df, column_name):
    if column_name in df.columns:
        df = df.withColumn(column_name, clean_text_udf(F.col(column_name)))
    return df

# Clean LinkedIn dataset
columns_to_clean = ["job_title", "company", "job_location", "job_description",
                    "salary_from", "salary_to", "salary_currency"]

linkedin_clean = linkedin_df
for column in columns_to_clean:
    linkedin_clean = clean_column(linkedin_clean, column)

# Fill missing values with appropriate defaults
linkedin_clean = linkedin_clean.na.fill({
    "job_title": "Unknown Title",
    "company": "Unknown Company",
    "job_location": "Unknown Location"
})

# Clean numeric fields
if "salary_from" in linkedin_clean.columns:
    linkedin_clean = linkedin_clean.withColumn(
        "salary_from",
        F.when(F.col("salary_from").isNull() |
              ~F.col("salary_from").cast("string").rlike("^[0-9]+\\.?[0-9]*$"),
              None)
        .otherwise(F.col("salary_from").cast("double"))
    )

if "salary_to" in linkedin_clean.columns:
    linkedin_clean = linkedin_clean.withColumn(
        "salary_to",
        F.when(F.col("salary_to").isNull() |
              ~F.col("salary_to").cast("string").rlike("^[0-9]+\\.?[0-9]*$"),
              None)
        .otherwise(F.col("salary_to").cast("double"))
    )

# Process date column
if "first_seen" in linkedin_clean.columns:
    linkedin_clean = linkedin_clean.withColumn(
        "first_seen_date",
        F.to_date(F.col("first_seen"), "yyyy-MM-dd")
    )

# Clean job_skills dataset
job_skills_clean = job_skills_df
for column in job_skills_clean.columns:
    job_skills_clean = clean_column(job_skills_clean, column)

# Clean summary dataset
job_summary_clean = job_summary_df
for column in job_summary_clean.columns:
    job_summary_clean = clean_column(job_summary_clean, column)

# Perform the merge using job_link as the key
print("Merging datasets...")
# First merge linkedin_data with job_skills
merged_df = linkedin_clean.join(
    job_skills_clean,
    F.trim(linkedin_clean.job_link) == F.trim(job_skills_clean.job_link),
    "left"
).drop(job_skills_clean.job_link)

# Then merge with job_summary
final_df = merged_df.join(
    job_summary_clean,
    F.trim(merged_df.job_link) == F.trim(job_summary_clean.job_link),
    "left"
).drop(job_summary_clean.job_link)

# Check the merged dataset
print(f"Final merged dataset record count: {final_df.count()}")
print("Sample of merged data:")
final_df.select("job_title", "company", "job_location", "job_skills", "job_summary").show(5, truncate=True)

# Extract features for analysis
print("Extracting features for analysis...")

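# 1. Location features
# Note: "country" is the second comma-separated part of job_location, so for
# US postings it holds a state code (e.g. "CA", "TX") rather than a country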

final_df = final_df.withColumn(
    "country",
    F.when(F.col("job_location").contains(","),
           F.trim(F.regexp_replace(F.split(F.col("job_location"), ",").getItem(1), "[^a-zA-Z\\s]", "")))
    .otherwise("Unknown")
)

final_df = final_df.withColumn(
    "city",
    F.when(F.col("job_location").contains(","),
           F.trim(F.regexp_replace(F.split(F.col("job_location"), ",").getItem(0), "[^a-zA-Z\\s]", "")))
    .otherwise(F.trim(F.regexp_replace(F.col("job_location"), "[^a-zA-Z\\s,]", "")))
)

# 2. Create skill indicator columns (extract top skills)
# First, extract unique skills from the dataset
if "job_skills" in final_df.columns:
    # Sample a subset of data to identify common skills
    sample_skills = final_df.select("job_skills").na.drop().limit(10000)

    # Collect and process skills
    all_skills_list = []
    for row in sample_skills.collect():
        if row.job_skills:
            # Better skill splitting with proper cleaning
            skills = [s.strip().lower() for s in re.split(r',|\|', row.job_skills) if s.strip()]
            all_skills_list.extend(skills)

    # Count skill frequencies
    from collections import Counter
    skill_counts = Counter(all_skills_list)

    # Get top 30 skills
    top_skills = [skill for skill, count in skill_counts.most_common(30)]

    # Create indicator columns for the top skills
    for skill in top_skills:
        # Create a safe column name
        safe_skill = re.sub(r'[^a-zA-Z0-9_]', '_', skill.lower())
        skill_col = f"has_{safe_skill}"

        # Plain substring check on the lowercased skills string; note that a short
        # skill also matches inside longer ones (e.g. "excel" inside "excellent")
        final_df = final_df.withColumn(
            skill_col,
            F.when(
                F.col("job_skills").isNotNull() &
                F.lower(F.col("job_skills")).contains(skill.lower()),
                F.lit(1)
            ).otherwise(F.lit(0))
        )

# 3. Derive job level flags from keywords in the job title
seniority_keywords = {
    'entry': ['entry', 'junior', 'associate', 'trainee', 'intern'],
    'mid': ['mid', 'intermediate', 'experienced'],
    'senior': ['senior', 'lead', 'principal', 'manager', 'director', 'head']
}

for level, keywords in seniority_keywords.items():
    condition = F.lit(False)

    for keyword in keywords:
        # Substring match on the lowercased title (so "intern" also matches "international")
        condition = condition | F.lower(F.col("job_title")).contains(keyword.lower())

    final_df = final_df.withColumn(
        f"is_{level}_level",
        F.when(condition, F.lit(1)).otherwise(F.lit(0))
    )

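# Create a job_level column for easier analysis
# Note: withColumn replaces an existing column of the same name, so a job_level
# column already present in the source data is overwritten here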
final_df = final_df.withColumn(
    "job_level",
    F.when(F.col("is_entry_level") == 1, "Entry")
     .when(F.col("is_mid_level") == 1, "Mid")
     .when(F.col("is_senior_level") == 1, "Senior")
     .otherwise("Unknown")
)

# 4. Add salary range cleaning and analysis
if all(col in final_df.columns for col in ["salary_from", "salary_to", "salary_currency"]):
    # Clean salary fields to ensure they're numeric
    final_df = final_df.withColumn(
        "salary_from_clean",
        F.when(F.col("salary_from").isNotNull(), F.col("salary_from").cast("double")).otherwise(None)
    )

    final_df = final_df.withColumn(
        "salary_to_clean",
        F.when(F.col("salary_to").isNotNull(), F.col("salary_to").cast("double")).otherwise(None)
    )

    # Create salary midpoint column
    final_df = final_df.withColumn(
        "salary_midpoint",
        F.when(
            F.col("salary_from_clean").isNotNull() & F.col("salary_to_clean").isNotNull(),
            (F.col("salary_from_clean") + F.col("salary_to_clean")) / 2
        ).otherwise(
            F.when(F.col("salary_from_clean").isNotNull(), F.col("salary_from_clean"))
            .when(F.col("salary_to_clean").isNotNull(), F.col("salary_to_clean"))
            .otherwise(None)
        )
    )

    # Clean currency
    final_df = final_df.withColumn(
        "salary_currency_clean",
        F.when(F.col("salary_currency").isNull(), "Unknown")
        .otherwise(F.upper(F.trim(F.col("salary_currency"))))
    )

# Basic statistics and data overview
print("Generating basic statistics and data overview...")
final_df.printSchema()

# Check for any remaining nulls
print("Checking for remaining null values in key columns:")
null_counts = final_df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in final_df.columns])
null_counts.show()

# Check distribution of job titles, companies, locations
print("Top 10 countries by job count:")
final_df.groupBy("country").count().orderBy(F.desc("count")).show(10)

print("Job level distribution:")
final_df.groupBy("job_level").count().orderBy(F.desc("count")).show()

# Save the merged dataset as CSV (using a single file)
print("Saving merged dataset as CSV...")
# coalesce(1) yields a single part file inside the output directory
final_df.coalesce(1).write.csv(
    "/scratch/stp8232/Big Data/merged_jobpredict_data_cleaned_csv",
    mode="overwrite",
    header=True
)

# Also save as Parquet, which preserves column types and loads faster in later analyses
print("Saving merged dataset as Parquet...")
final_df.write.parquet(
    "/scratch/stp8232/Big Data/merged_jobpredict_data_cleaned_parquet",
    mode="overwrite"
)

print("Merge and data preparation complete!")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/05/07 00:54:41 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Loading datasets...
Cleaning data...
Merging datasets...
Final merged dataset record count: 1348488
Sample of merged data:

+--------------------+--------------------+--------------------+--------------------+--------------------+
|           job_title|             company|        job_location|          job_skills|         job_summary|
+--------------------+--------------------+--------------------+--------------------+--------------------+
|Technicien(ne) de...|             Sandvik|Montreal, Quebec,...|                NULL|                NULL|
|Contract Manageme...|    Primary Services|         Houston, TX|Contract Administ...|Join a dynamic te...|
|DOC CORRECTIONAL ...|   State of Arkansas|        Brickeys, AR|Correctional Secu...|Position Number: ...|
|Experienced HVAC ...|Lane Valente Indu...|         Orlando, FL|HVAC, Service Tec...|Lane Valente Indu...|
|Kroger Health: No...|       Kroger Health|Columbus, Ohio Me...|Nursing, Patient ...|    Position Summary|
+--------------------+--------------------+--------------------+--------------------+--------------------+
only showing top 5 rows

Extracting features for analysis...
Generating basic statistics and data overview...
root
 |-- job_link: string (nullable = true)
 |-- last_processed_time: string (nullable = true)
 |-- got_summary: string (nullable = true)
 |-- got_ner: string (nullable = true)
 |-- is_being_worked: string (nullable = true)
 |-- job_title: string (nullable = false)
 |-- company: string (nullable = false)
 |-- job_location: string (nullable = false)
 |-- first_seen: string (nullable = true)
 |-- search_city: string (nullable = true)
 |-- search_country: string (nullable = true)
 |-- search_position: string (nullable = true)
 |-- job_level: string (nullable = false)
 |-- job_type: string (nullable = true)
 |-- first_seen_date: date (nullable = true)
 |-- job_skills: string (nullable = true)
 |-- job_summary: string (nullable = true)
 |-- country: string (nullable = true)
 |-- city: string (nullable = true)
 |-- has_communication: integer (nullable = false)
 |-- has_customer_service: integer (nullable = false)
 |-- has_teamwork: integer (nullable = false)

25/05/07 00:57:22 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

25/05/07 00:59:05 WARN RowBasedKeyValueBatch: Calling spill() on RowBasedKeyValueBatch. Will not spill but return 0.

+-------+------+
|country| count|
+-------+------+
|     CA|117203|
|England|100176|
|     TX| 81914|
|     FL| 69616|
|     NY| 57504|
|     PA| 47042|
|     OH| 45251|
|     IL| 42841|
|     VA| 38098|
|     MA| 36883|
+-------+------+
only showing top 10 rows

Job level distribution:



+---------+------+
|job_level| count|
+---------+------+
|  Unknown|862186|
|   Senior|416299|
|    Entry| 59986|
|      Mid| 10017|
+---------+------+

Saving merged dataset as CSV...
Saving merged dataset as Parquet...


25/05/07 01:04:23 WARN MemoryManager: Total allocation exceeds 95.00% (3,626,971,910 bytes) of heap memory
Scaling row group sizes to 96.51% for 28 writers

Merge and data preparation complete!



# Merged dataset:

*   Total Records: 1,348,488 job listings
*   Key Data Sources: LinkedIn job postings, job skills, and job summaries
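*   Final Structure: 52 columns (16 carried over from the source datasets, plus **first_seen_date**, **country**, **city**, 30 skill indicators, and 3 job-level flags)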




In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("JobDataPreview") \
    .config("spark.driver.memory", "4g") \
    .getOrCreate()

# Path to the cleaned CSV
cleaned_csv_path = "/scratch/stp8232/Big Data/merged_jobpredict_data_cleaned_csv"

# Load the cleaned dataset
print("Loading the cleaned dataset...")
df = spark.read.csv(cleaned_csv_path, header=True, inferSchema=True)

# Get basic information about the dataset
print(f"\nDataset Overview:")
print(f"Total number of records: {df.count()}")
print(f"Number of columns: {len(df.columns)}")
print("\nColumn names:")
for col in df.columns:
    print(f"- {col}")

# Display sample data with better formatting
print("\nSample of cleaned data (5 rows):")
# Select important columns for display
display_columns = ["job_title", "company", "job_location", "country", "city",
                   "job_level", "job_skills", "job_summary"]
# Only include columns that exist in the dataframe
display_columns = [col for col in display_columns if col in df.columns]
df.select(display_columns).show(5, truncate=False)

# Show distribution of job levels
print("\nJob level distribution:")
df.groupBy("job_level").count().orderBy(F.desc("count")).show()

# Show top countries
print("\nTop 10 countries by job count:")
df.groupBy("country").count().orderBy(F.desc("count")).show(10)

# Check for null values in key columns
print("\nNull values in key columns:")
key_columns = ["job_title", "company", "job_location", "job_skills", "job_summary", "job_level"]
key_columns = [col for col in key_columns if col in df.columns]
null_counts = df.select([F.count(F.when(F.col(c).isNull(), c)).alias(c) for c in key_columns])
null_counts.show()

# Show summary statistics for any numeric columns (if present)
numeric_columns = [col_name for col_name, data_type in df.dtypes if data_type in ['int', 'double', 'float']]
if numeric_columns:
    print("\nSummary statistics for numeric columns:")
    df.select(numeric_columns).describe().show()

# If the dataset has skill indicator columns, show the most common skills
skill_columns = [col for col in df.columns if col.startswith("has_")]
if skill_columns:
    print("\nMost common skills (based on indicator columns):")
    skill_counts = []
    for col in skill_columns:
        skill_name = col[4:].replace('_', ' ')  # Remove 'has_' prefix and replace underscores
        count = df.filter(F.col(col) == 1).count()
        skill_counts.append((skill_name, count))

    # Sort by count in descending order (list.sort() takes reverse=, not descending=)
    skill_counts.sort(key=lambda x: x[1], reverse=True)

    # Show top 15 skills
    for i, (skill, count) in enumerate(skill_counts[:15]):
        print(f"{i+1}. {skill}: {count}")

print("\nData preview complete!")

25/05/07 01:05:27 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


Loading the cleaned dataset...

Dataset Overview:
Total number of records: 1348488
Number of columns: 52

Column names:
- job_link
- last_processed_time
- got_summary
- got_ner
- is_being_worked
- job_title
- company
- job_location
- first_seen
- search_city
- search_country
- search_position
- job_level
- job_type
- first_seen_date
- job_skills
- job_summary
- country
- city
- has_communication
- has_customer_service
- has_teamwork
- has_communication_skills
- has_leadership
- has_problem_solving
- has_time_management
- has_attention_to_detail
- has_project_management
- has_problemsolving
- has_sales
- has_interpersonal_skills
- has_data_analysis
- has_microsoft_office_suite
- has_organizational_skills
- has_collaboration
- has_analytical_skills
- has_training
- has_microsoft_office
- has_adaptability
- has_patient_care
- has_multitasking
- has_inventory_management
- has_flexibility
- has_bachelor_s_degree
- has_scheduling
- has_organization
- has_budgeting
- has_high_school_diploma
- has_excel
- is_entry_level
- is_mid_level
- is_senior_level

+-------+------+
|country| count|
+-------+------+
|     CA|117203|
|England|100176|
|     TX| 81914|
|     FL| 69616|
|     NY| 57504|
|     PA| 47042|
|     OH| 45251|
|     IL| 42841|
|     VA| 38098|
|     MA| 36883|
+-------+------+
only showing top 10 rows


Null values in key columns:

+---------+-------+------------+----------+-----------+---------+
|job_title|company|job_location|job_skills|job_summary|job_level|
+---------+-------+------------+----------+-----------+---------+
|       28|     57|           0|     54114|      51637|        0|
+---------+-------+------------+----------+-----------+---------+


Summary statistics for numeric columns:

TypeError: 'descending' is an invalid keyword argument for sort()
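
The `TypeError` above is raised because Python's `list.sort()` accepts `reverse=`, not `descending=`. A minimal sketch of the intended ranking step follows. The helper name `top_skills_by_count` is hypothetical, and the counts in the usage example are made up for illustration; they are not results from the dataset.

```python
from typing import List, Tuple


def top_skills_by_count(skill_counts: List[Tuple[str, int]], top_n: int = 15) -> List[Tuple[str, int]]:
    """Return the top_n (skill, count) pairs, highest count first."""
    # sorted() and list.sort() both take reverse=True for descending order
    return sorted(skill_counts, key=lambda pair: pair[1], reverse=True)[:top_n]


if __name__ == "__main__":
    # Illustrative counts only, not taken from the dataset
    example = [("teamwork", 7), ("communication", 12), ("excel", 3)]
    for rank, (skill, count) in enumerate(top_skills_by_count(example), start=1):
        print(f"{rank}. {skill}: {count}")
```

Returning a new list with `sorted()` leaves the input untouched, which makes the helper easy to reuse on the same counts with a different `top_n`.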