In [4]:
from pyspark.sql import SparkSession

# Obtain the Spark session for this notebook (an existing one is reused if present).
spark = (
    SparkSession.builder
    .appName("JobPostingsAnalysis")
    .getOrCreate()
)

# Read the job-postings CSV into a Spark DataFrame.
#   header      -> first row supplies the column names
#   inferSchema -> let Spark infer column types from the data
#   multiLine   -> allow a quoted field to span several physical lines
#   escape      -> a double quote is used as the escape character inside quoted fields
df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .option("multiLine", "true")
    .option("escape", "\"")
    .csv("data/lightcast_job_postings.csv")
)

# Print the inferred column names and types.
df.printSchema()

[Stage 3:>                                                          (0 + 1) / 1]

root
 |-- ID: string (nullable = true)
 |-- LAST_UPDATED_DATE: string (nullable = true)
 |-- LAST_UPDATED_TIMESTAMP: timestamp (nullable = true)
 |-- DUPLICATES: integer (nullable = true)
 |-- POSTED: string (nullable = true)
 |-- EXPIRED: string (nullable = true)
 |-- DURATION: integer (nullable = true)
 |-- SOURCE_TYPES: string (nullable = true)
 |-- SOURCES: string (nullable = true)
 |-- URL: string (nullable = true)
 |-- ACTIVE_URLS: string (nullable = true)
 |-- ACTIVE_SOURCES_INFO: string (nullable = true)
 |-- TITLE_RAW: string (nullable = true)
 |-- BODY: string (nullable = true)
 |-- MODELED_EXPIRED: string (nullable = true)
 |-- MODELED_DURATION: integer (nullable = true)
 |-- COMPANY: integer (nullable = true)
 |-- COMPANY_NAME: string (nullable = true)
 |-- COMPANY_RAW: string (nullable = true)
 |-- COMPANY_IS_STAFFING: boolean (nullable = true)
 |-- EDUCATION_LEVELS: string (nullable = true)
 |-- EDUCATION_LEVELS_NAME: string (nullable = true)
 |-- MIN_EDULEVELS: integer (

                                                                                

In [7]:
# Bring at most 5000 rows to the driver as a pandas DataFrame for quick inspection.
pdf = df.limit(5000).toPandas()

# Column count, dtype summary and memory footprint of the sample.
print("Dataset Info:")
pdf.info()

# Leading rows of the sample.
print("First 5 Rows:")
print(pdf.head(5))

# Number of missing entries per column within the sample.
print("Missing Values:")
print(pdf.isna().sum())

                                                                                

Dataset Info:
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 5000 entries, 0 to 4999
Columns: 131 entries, ID to NAICS_2022_6_NAME
dtypes: datetime64[ns](1), float64(38), object(92)
memory usage: 5.0+ MB
First 5 Rows:
                                         ID LAST_UPDATED_DATE  \
0  1f57d95acf4dc67ed2819eb12f049f6a5c11782c          9/6/2024   
1  0cb072af26757b6c4ea9464472a50a443af681ac          8/2/2024   
2  85318b12b3331fa490d32ad014379df01855c557          9/6/2024   
3  1b5c3941e54a1889ef4f8ae55b401a550708a310          9/6/2024   
4  cb5ca25f02bdf25c13edfede7931508bfd9e858f         6/19/2024   

   LAST_UPDATED_TIMESTAMP  DUPLICATES    POSTED    EXPIRED  DURATION  \
0 2024-09-06 20:32:57.352         0.0  6/2/2024   6/8/2024       6.0   
1 2024-08-02 17:08:58.838         0.0  6/2/2024   8/1/2024       NaN   
2 2024-09-06 20:32:57.352         1.0  6/2/2024   7/7/2024      35.0   
3 2024-09-06 20:32:57.352         1.0  6/2/2024  7/20/2024      48.0   
4 2024-06-19 07:00:00.000   

In [6]:
# Row count before de-duplication.
original_count = df.count()

# Drop rows that are exact duplicates across every column.
# NOTE(review): the result is stored in `df_cleaned`; `df` itself is left
# unchanged, and the other cells visible in this notebook keep reading `df`.
# Confirm whether they are meant to use `df_cleaned` instead.
df_cleaned = df.dropDuplicates()

# Row count after de-duplication.
cleaned_count = df_cleaned.count()

print(f"removed duplicates: {original_count - cleaned_count} rows deleted.")
print(f"Remaining rows: {cleaned_count}")



removed duplicates: 21 rows deleted.
Remaining rows: 72477


                                                                                

In [8]:
# Handle missing values in SALARY by imputing the (approximate) median.

from pyspark.sql import functions as F

# Cast SALARY to double so quantiles can be computed; values that cannot be
# cast become NULL.
df = df.withColumn("SALARY", F.col("SALARY").cast("double"))

# approxQuantile ignores NULLs and returns an EMPTY list when the column holds
# no non-null values, so the result must be checked before indexing into it.
# The third argument (0.01) is the allowed relative error of the approximation.
salary_quantiles = df.approxQuantile("SALARY", [0.5], 0.01)

if salary_quantiles:
    median_salary = salary_quantiles[0]
    # NOTE(review): after this fill, aggregates over SALARY on `df` include the
    # imputed values.
    df = df.fillna({"SALARY": median_salary})
    print(f"Filled missing salaries with median: {median_salary}")
else:
    # Nothing to compute a median from; leave the column untouched.
    median_salary = None
    print("SALARY has no non-null numeric values; no imputation performed.")

[Stage 15:>                                                         (0 + 1) / 1]

Filled missing salaries with median: 115024.0


                                                                                

In [None]:
from pyspark.sql import functions as F

# Convert text date columns (formatted like 6/2/2024) to Spark date type.
date_cols = ["LAST_UPDATED_DATE", "POSTED", "EXPIRED"]

for c in date_cols:
    # Re-running this cell must not destroy data: once a column is already a
    # date, its string form (yyyy-MM-dd) no longer matches the M/d/yyyy regex
    # below, and the conversion would overwrite every value with NULL.
    if dict(df.dtypes).get(c) == "date":
        print(f"{c} is already date type; skipping conversion.")
        continue

    df = df.withColumn(
        c,
        # Only values shaped like M/d/yyyy are parsed; anything else becomes NULL.
        F.when(F.col(c).rlike(r"^\d{1,2}/\d{1,2}/\d{4}$"), F.to_date(F.col(c), "M/d/yyyy")).otherwise(None)
    )
    print(f"✅ Safely converted {c} to date type (invalid values set to NULL).")

# Show a few converted values and the resulting schema.
df.select(date_cols).show(5)
df.printSchema()

In [23]:
# Count the total number of job postings (rows) in `df`.
# NOTE(review): this counts `df`, not the de-duplicated `df_cleaned` produced
# by the dropDuplicates() cell, so exact duplicate rows are included here.

total_postings = df.count()
print(f"Total Job Postings: {total_postings}")

[Stage 36:>                                                         (0 + 1) / 1]

Total Job Postings: 72498


                                                                                

In [25]:
# Five most frequent raw job titles, by number of postings.

from pyspark.sql import functions as F

top_job_titles = (
    df.groupBy("TITLE_RAW")
    .count()                          # adds a `count` column per title
    .orderBy(F.col("count").desc())   # most frequent first
    .limit(5)
)
top_job_titles.show()

[Stage 39:>                                                         (0 + 1) / 1]

+--------------------+-----+
|           TITLE_RAW|count|
+--------------------+-----+
|        Data Analyst| 4201|
|Enterprise Architect|  808|
| Senior Data Analyst|  724|
|Business Intellig...|  686|
|        Data Modeler|  281|
+--------------------+-----+



                                                                                

In [27]:
# Mean SALARY per raw job title, highest average first.
avg_salary_by_job_type = (
    df.groupBy("TITLE_RAW")
    .agg(F.avg("SALARY").alias("Average_Salary"))
    .orderBy(F.col("Average_Salary").desc())
)
avg_salary_by_job_type.show()

[Stage 42:>                                                         (0 + 1) / 1]

+--------------------+-----------------+
|           TITLE_RAW|   Average_Salary|
+--------------------+-----------------+
|Pulmonology/Criti...|         500000.0|
|Upcoming Oracle C...|         500000.0|
|                 ENT|         470000.0|
|Svp, Digital Tran...|         437500.0|
|Urgent Care Physi...|         370275.0|
|Urgent Care Physi...|         370275.0|
|Director, Busines...|         353500.0|
|Geriatrics, Palli...|         347075.0|
|AI Portfolio & Ec...|         345350.0|
|Enterprise Princi...|333833.3333333333|
|Managing Principa...|         328600.0|
|Creative Technolo...|323366.6666666667|
|Vice President, P...|         319100.0|
|Security & Privac...|         317000.0|
|Nephrologist - An...|         315175.0|
|I.P. - Patent Pro...|         312500.0|
|(USA) Senior Dist...|         312000.0|
|Dir PMPD Data Ena...|         311600.0|
|Vp, Data Analytic...|         311000.0|
|Oracle ERP Cloud ...|         310050.0|
+--------------------+-----------------+
only showing top

                                                                                