# 1. Data Engineer to Preprocess Data for EDA of Job Market 
The project begins with the meticulous acquisition and preprocessing of job market data sourced from diverse repositories, including CSV files and databases. This involves essential data cleaning steps such as handling missing values, standardizing data types, and removing duplicates to ensure the dataset's integrity and reliability. 

Data:  A big dataset comprising over 1.6 million global records with 23 features spanning 25 months from September 2021 to September 2023 was collected from Kaggle. You can get the data here: https://www.kaggle.com/datasets/ravindrasinghrana/job-description-dataset

PS: I have structured this project into multiple notebooks, each dedicated to a specific task, starting from data engineering and arranged in a sequential manner. This project was developed using the PySpark's Python library, in a macOS, which required installation of java environment. It's important to acknowledge that there might be compatibility issues in Linux and Windows environments, which could require the installation of the Hadoop framework.

## Notebook Breakdown:
- Installing and Importing Required Resources: All necessary resources, including PySpark, will be installed and imported to facilitate the project's subsequent stages. Othern than these you will need to install Java in your system to run this.
- Loading Data: The CSV data file will be loaded to initiate data exploration and preprocessing using PySpark functionalities.
- Removing Unnecessary Features: Only the most valuable features for analysis will be retained, while less significant columns will be discarded.
- Handling Missing Values: Various techniques will be employed to effectively address missing values, ensuring minimal impact on analysis outcomes.
- Splitting Range Data: Ranges such as salary or experience ranges will be segregated into separate columns to facilitate analysis.
- Eliminating Duplicate Features: Duplicate entries and extraneous features will be identified and removed to enhance dataset quality. Any blank entries will be identified and handled appropriately.
- Data Type Conversion: Ensuring accurate data types for each feature is crucial for conducting precise analysis.
- Saving the Processed Data: After completing all preprocessing steps, the new DataFrame will be saved as a CSV file for further analysis in subsequent notebooks.

By meticulously executing these preprocessing steps, the dataset will be prepared for in-depth analysis, ensuring robustness and reliability in uncovering actionable insights into the dynamic job market landscape.

### Installing Required Resources

In [3]:
pip install numpy

Note: you may need to restart the kernel to use updated packages.


In [4]:
pip install pandas

Note: you may need to restart the kernel to use updated packages.


In [41]:
pip install seaborn

Note: you may need to restart the kernel to use updated packages.


In [6]:
pip install matplotlib

Note: you may need to restart the kernel to use updated packages.


In [1]:
pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [2]:
pip install findspark

Note: you may need to restart the kernel to use updated packages.


### Importing Required Resources

In [1]:
import pandas as pd
import findspark
from pyspark.sql.functions import col, when

findspark.init()

from pyspark.sql import SparkSession 
spark = SparkSession.builder.appName('Name').config('config_option', 'value').getOrCreate()

import warnings

warnings.filterwarnings("ignore")

24/04/04 13:55:45 WARN Utils: Your hostname, Pujas-MacBook-Pro.local resolves to a loopback address: 127.0.0.1; using 10.0.0.97 instead (on interface en0)
24/04/04 13:55:45 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/04/04 13:55:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Loading and Visualizing Data

In [2]:
df = spark.read.option('inferSchema', True).csv('job_descriptions.csv', header=True)
df.limit(3).toPandas()

                                                                                

Unnamed: 0,Job Id,Experience,Qualifications,Salary Range,location,Country,latitude,longitude,Work Type,Company Size,...,Contact,Job Title,Role,Job Portal,Job Description,Benefits,skills,Responsibilities,Company,Company Profile
0,1089843540111562,5 to 15 Years,M.Tech,$59K-$99K,Douglas,Isle of Man,54.2361,-4.5481,Intern,26801,...,001-381-930-7517x737,Digital Marketing Specialist,Social Media Manager,Snagajob,Social Media Managers oversee an organizations...,"{'Flexible Spending Accounts (FSAs), Relocatio...","Social media platforms (e.g., Facebook, Twitte...","Manage and grow social media accounts, create ...",Icahn Enterprises,"""{""""Sector"""":""""Diversified"""""
1,398454096642776,2 to 12 Years,BCA,$56K-$116K,Ashgabat,Turkmenistan,38.9697,59.5563,Intern,100340,...,461-509-4216,Web Developer,Frontend Web Developer,Idealist,Frontend Web Developers design and implement u...,"{'Health Insurance, Retirement Plans, Paid Tim...","HTML, CSS, JavaScript Frontend frameworks (e.g...","Design and code user interfaces for websites, ...",PNC Financial Services Group,"""{""""Sector"""":""""Financial Services"""""
2,481640072963533,0 to 12 Years,PhD,$61K-$104K,Macao,"Macao SAR, China",22.1987,113.5439,Temporary,84525,...,9687619505,Operations Manager,Quality Control Manager,Jobs2Careers,Quality Control Managers establish and enforce...,"{'Legal Assistance, Bonuses and Incentive Prog...",Quality control processes and methodologies St...,Establish and enforce quality control standard...,United Services Automobile Assn.,"""{""""Sector"""":""""Insurance"""""


In [3]:
df.printSchema()

root
 |-- Job Id: long (nullable = true)
 |-- Experience: string (nullable = true)
 |-- Qualifications: string (nullable = true)
 |-- Salary Range: string (nullable = true)
 |-- location: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- Work Type: string (nullable = true)
 |-- Company Size: integer (nullable = true)
 |-- Job Posting Date: date (nullable = true)
 |-- Preference: string (nullable = true)
 |-- Contact Person: string (nullable = true)
 |-- Contact: string (nullable = true)
 |-- Job Title: string (nullable = true)
 |-- Role: string (nullable = true)
 |-- Job Portal: string (nullable = true)
 |-- Job Description: string (nullable = true)
 |-- Benefits: string (nullable = true)
 |-- skills: string (nullable = true)
 |-- Responsibilities: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Company Profile: string (nullable = true)



### Removing Unnecessary Features

In [4]:
features_to_exclude = ['Job Id', 'Contact', 'Contact Person']  
selected_features = [col for col in df.columns if col not in features_to_exclude]
df = df.select(selected_features)
df.limit(3).toPandas()

Unnamed: 0,Experience,Qualifications,Salary Range,location,Country,latitude,longitude,Work Type,Company Size,Job Posting Date,Preference,Job Title,Role,Job Portal,Job Description,Benefits,skills,Responsibilities,Company,Company Profile
0,5 to 15 Years,M.Tech,$59K-$99K,Douglas,Isle of Man,54.2361,-4.5481,Intern,26801,2022-04-24,Female,Digital Marketing Specialist,Social Media Manager,Snagajob,Social Media Managers oversee an organizations...,"{'Flexible Spending Accounts (FSAs), Relocatio...","Social media platforms (e.g., Facebook, Twitte...","Manage and grow social media accounts, create ...",Icahn Enterprises,"""{""""Sector"""":""""Diversified"""""
1,2 to 12 Years,BCA,$56K-$116K,Ashgabat,Turkmenistan,38.9697,59.5563,Intern,100340,2022-12-19,Female,Web Developer,Frontend Web Developer,Idealist,Frontend Web Developers design and implement u...,"{'Health Insurance, Retirement Plans, Paid Tim...","HTML, CSS, JavaScript Frontend frameworks (e.g...","Design and code user interfaces for websites, ...",PNC Financial Services Group,"""{""""Sector"""":""""Financial Services"""""
2,0 to 12 Years,PhD,$61K-$104K,Macao,"Macao SAR, China",22.1987,113.5439,Temporary,84525,2022-09-14,Male,Operations Manager,Quality Control Manager,Jobs2Careers,Quality Control Managers establish and enforce...,"{'Legal Assistance, Bonuses and Incentive Prog...",Quality control processes and methodologies St...,Establish and enforce quality control standard...,United Services Automobile Assn.,"""{""""Sector"""":""""Insurance"""""


### Handling Missing Values 

In [5]:
for col_name in df.columns:
    # Counting the number of null values in each column
    missing_count = df.where(col(col_name).isNull()).count()
    # Printing the count of missing values
    print(f"Column '{col_name}' has {missing_count} missing values.")

                                                                                

Column 'Experience' has 0 missing values.


                                                                                

Column 'Qualifications' has 0 missing values.


                                                                                

Column 'Salary Range' has 0 missing values.


                                                                                

Column 'location' has 0 missing values.


                                                                                

Column 'Country' has 0 missing values.


                                                                                

Column 'latitude' has 0 missing values.


                                                                                

Column 'longitude' has 0 missing values.


                                                                                

Column 'Work Type' has 0 missing values.


                                                                                

Column 'Company Size' has 0 missing values.


                                                                                

Column 'Job Posting Date' has 0 missing values.


                                                                                

Column 'Preference' has 0 missing values.


                                                                                

Column 'Job Title' has 0 missing values.


                                                                                

Column 'Role' has 0 missing values.


                                                                                

Column 'Job Portal' has 0 missing values.


                                                                                

Column 'Job Description' has 0 missing values.


                                                                                

Column 'Benefits' has 0 missing values.


                                                                                

Column 'skills' has 0 missing values.


                                                                                

Column 'Responsibilities' has 0 missing values.


                                                                                

Column 'Company' has 0 missing values.




Column 'Company Profile' has 5478 missing values.


                                                                                

Given the absence of missing values in all feature columns except for 'Company Profile', where 5478 values are missing, and considering the size and nature of the dataset, it's reasonable to assume that larger companies are more likely to have a higher frequency of job postings. However, initially using this assumption led to high errors in salary prediction.  Because with such a large number of missing values (5378), filling them with the mode would introduce bias towards the most common value. Therefore, I am reconsidering the data processing steps. This time, I have opted for K-nearest neighbors to fill the gaps as it considers neighborhood data, thus reducing bias. As a result changes are also expected in data analysis as well. 

In [13]:
from pyspark.ml.feature import VectorAssembler, StandardScaler, Imputer, StringIndexer
from pyspark.sql.functions import col

# Defining the categorical column
categorical_col = "Company Profile"

# Creating a StringIndexer for the categorical column
indexer = StringIndexer(inputCol=categorical_col, outputCol=categorical_col+"_index", handleInvalid="keep")
indexer_model = indexer.fit(df)
df_indexed = indexer_model.transform(df)

# Imputing missing values using KNN
imputer = Imputer(inputCols=[categorical_col+"_index"], outputCols=["imputed_" + categorical_col], strategy="mode")
imputer_model = imputer.fit(df_indexed)
df_imputed = imputer_model.transform(df_indexed)

# Assembling the imputed column into a vector
assembler = VectorAssembler(inputCols=["imputed_" + categorical_col], outputCol="imputed_features")
df_assembled = assembler.transform(df_imputed)

# Standardizing the features
scaler = StandardScaler(inputCol="imputed_features", outputCol="scaled_features", withMean=True, withStd=True)
scaler_model = scaler.fit(df_assembled)
df_scaled = scaler_model.transform(df_assembled)

# Updating the 'Company Profile' column with the scaled values
df_scaled = df_scaled.withColumn(categorical_col, col("scaled_features"))

# Checking if there are any null values remaining in the 'Company Profile' column
missing_count = df_scaled.where(col(categorical_col).isNull()).count()

# Printing the result
if missing_count == 0:
    print("Missing values in '{}' column have been successfully filled with KNN imputation.".format(categorical_col))
else:
    print("There are still missing values in '{}' column after filling.".format(categorical_col))




Missing values in 'Company Profile' column have been successfully filled with KNN imputation.


                                                                                

### Splitting Range Data

In [15]:
# Selecting and showing a particular column with range data
df.select("Experience").show(5)
df.select("Salary Range").show(5)

+-------------+
|   Experience|
+-------------+
|5 to 15 Years|
|2 to 12 Years|
|0 to 12 Years|
|4 to 11 Years|
|1 to 12 Years|
+-------------+
only showing top 5 rows

+------------+
|Salary Range|
+------------+
|   $59K-$99K|
|  $56K-$116K|
|  $61K-$104K|
|   $65K-$91K|
|   $64K-$87K|
+------------+
only showing top 5 rows



In [16]:
from pyspark.sql.functions import regexp_extract, split, lower, regexp_replace

df = df.withColumn("Salary Range", lower(regexp_replace(regexp_replace(df["Salary Range"], r"\$", ""), r"K", "000")))

split_col = split(df['Salary Range'], '-')
df_split = df.withColumn('Min Salary', split_col.getItem(0)).withColumn('Max Salary', split_col.getItem(1))

# Removing 'Years' from 'Experience' column and convert to lowercase
df_split = df_split.withColumn("Experience", lower(regexp_replace(df_split["Experience"], r"\s*Years", "")))

# Spliting the 'Experience' column at 'to'
split_col = split(df_split['Experience'], ' to ')
df_split = df_split.withColumn('Min Exp', split_col.getItem(0)).withColumn('Max Exp', split_col.getItem(1))

# Showing the split results
df_split.limit(1).toPandas()

Unnamed: 0,Experience,Qualifications,Salary Range,location,Country,latitude,longitude,Work Type,Company Size,Job Posting Date,...,Job Description,Benefits,skills,Responsibilities,Company,Company Profile,Min Salary,Max Salary,Min Exp,Max Exp
0,5 to 15,M.Tech,59000-99000,Douglas,Isle of Man,54.2361,-4.5481,Intern,26801,2022-04-24,...,Social Media Managers oversee an organizations...,"{'Flexible Spending Accounts (FSAs), Relocatio...","Social media platforms (e.g., Facebook, Twitte...","Manage and grow social media accounts, create ...",Icahn Enterprises,"""{""""Sector"""":""""Diversified""""",59000,99000,5,15


### Elimination of Duplicate Features

In [17]:
features_to_exclude = ['Experience', 'Salary Range']  
selected_features = [col for col in df_split.columns if col not in features_to_exclude]
df = df_split.select(selected_features)

###  Data Types Convertion

In [18]:
#Converting Datatypes
df = df.withColumn("Min Salary", col("Min Salary").cast("integer"))
df = df.withColumn("Max Salary", col("Max Salary").cast("integer"))
df = df.withColumn("Min Exp", col("Min Exp").cast("integer"))
df = df.withColumn("Max Exp", col("Max Exp").cast("integer"))
df.printSchema()
df.limit(1).toPandas()

root
 |-- Qualifications: string (nullable = true)
 |-- location: string (nullable = true)
 |-- Country: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- Work Type: string (nullable = true)
 |-- Company Size: integer (nullable = true)
 |-- Job Posting Date: date (nullable = true)
 |-- Preference: string (nullable = true)
 |-- Job Title: string (nullable = true)
 |-- Role: string (nullable = true)
 |-- Job Portal: string (nullable = true)
 |-- Job Description: string (nullable = true)
 |-- Benefits: string (nullable = true)
 |-- skills: string (nullable = true)
 |-- Responsibilities: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Company Profile: string (nullable = true)
 |-- Min Salary: integer (nullable = true)
 |-- Max Salary: integer (nullable = true)
 |-- Min Exp: integer (nullable = true)
 |-- Max Exp: integer (nullable = true)



Unnamed: 0,Qualifications,location,Country,latitude,longitude,Work Type,Company Size,Job Posting Date,Preference,Job Title,...,Job Description,Benefits,skills,Responsibilities,Company,Company Profile,Min Salary,Max Salary,Min Exp,Max Exp
0,M.Tech,Douglas,Isle of Man,54.2361,-4.5481,Intern,26801,2022-04-24,Female,Digital Marketing Specialist,...,Social Media Managers oversee an organizations...,"{'Flexible Spending Accounts (FSAs), Relocatio...","Social media platforms (e.g., Facebook, Twitte...","Manage and grow social media accounts, create ...",Icahn Enterprises,"""{""""Sector"""":""""Diversified""""",59000,99000,5,15


### Saving the Processed Data 

In [38]:
import os

# Specifying the desired file name for the CSV file
output_filename = "processed_job_descriptions.csv"

# Coalescing the DataFrame into a single partition before saving
df.coalesce(1).write.option("header", "true").csv("data", mode="overwrite")

# Renaming the CSV file to the desired filename
for file in os.listdir("data"):
    if file.startswith("part-") and file.endswith(".csv"):
        os.rename(os.path.join("data", file), output_filename)
        break




In [None]:
The processed data has been saved as "processed_job_descriptions.csv", which will be utilized for further analysis.