In [19]:
!pip install pyspark


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m25.2[0m[39;49m -> [0m[32;49m25.3[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m


In [34]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import split, col

spark = (
    SparkSession.builder
    .appName("Datacamp Pyspark Tutorial")
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "10g")
    .getOrCreate()
)

In [21]:
file_path_job_skills = "../resources/job_skills.csv"
file_path_job_summary = "../resources/job_summary.csv"
file_path_job_postings = "../resources/linkedin_job_postings.csv"

df_job_skills = spark.read.csv(file_path_job_skills, header=True, escape='"', inferSchema=True)
df_job_summary = spark.read.csv(file_path_job_summary, header=True, escape='"', inferSchema=True)
df_job_postings = spark.read.csv(file_path_job_postings, header=True, escape='"', inferSchema=True)


                                                                                

In [22]:
df_job_skills.show(5,0)
df_job_summary.show(5,0)
df_job_postings.show(5,0)

+-------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|job_link                                                                                                                       |job_skills                                              

In [23]:
# Display schema of the dataframes
print("Schema of df_job_skills:")
df_job_skills.printSchema()

print("Schema of df_job_summary:")
df_job_summary.printSchema()

print("Schema of df_job_postings:")
df_job_postings.printSchema()

Schema of df_job_skills:
root
 |-- job_link: string (nullable = true)
 |-- job_skills: string (nullable = true)

Schema of df_job_summary:
root
 |-- job_link: string (nullable = true)
 |-- job_summary: string (nullable = true)

Schema of df_job_postings:
root
 |-- job_link: string (nullable = true)
 |-- last_processed_time: string (nullable = true)
 |-- got_summary: string (nullable = true)
 |-- got_ner: string (nullable = true)
 |-- is_being_worked: string (nullable = true)
 |-- job_title: string (nullable = true)
 |-- company: string (nullable = true)
 |-- job_location: string (nullable = true)
 |-- first_seen: string (nullable = true)
 |-- search_city: string (nullable = true)
 |-- search_country: string (nullable = true)
 |-- search_position: string (nullable = true)
 |-- job_level: string (nullable = true)
 |-- job_type: string (nullable = true)



In [24]:
# Print the number of rows and columns
print(f"df_job_skills: {df_job_skills.count()} rows, {len(df_job_skills.columns)} columns")
print(f"df_job_summary: {df_job_summary.count()} rows, {len(df_job_summary.columns)} columns")
print(f"df_job_postings: {df_job_postings.count()} rows, {len(df_job_postings.columns)} columns")

df_job_skills: 1296381 rows, 2 columns


                                                                                

df_job_summary: 48219735 rows, 2 columns
df_job_postings: 1348488 rows, 14 columns


In [25]:
# Obtain descriptive statistics
print("Descriptive statistics for df_job_postings:")
df_job_postings.describe().show()

Descriptive statistics for df_job_postings:


25/12/09 19:47:49 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

+-------+--------------------+--------------------+--------------------+----------+---------------+--------------------+--------------------+--------------------+----------+-----------+--------------+----------------+----------+--------+
|summary|            job_link| last_processed_time|         got_summary|   got_ner|is_being_worked|           job_title|             company|        job_location|first_seen|search_city|search_country| search_position| job_level|job_type|
+-------+--------------------+--------------------+--------------------+----------+---------------+--------------------+--------------------+--------------------+----------+-----------+--------------+----------------+----------+--------+
|  count|             1348488|             1348488|             1348488|   1348488|        1348488|             1348488|             1348443|             1348435|   1348454|    1348420|       1348420|         1348420|   1348420| 1348420|
|   mean|                NULL|                NU

                                                                                

In [35]:
# Create a new column with the primary skill
df_job_skills_with_primary = df_job_skills.withColumn("primary_skill", split(col("job_skills"), ",")[0])
df_job_skills_with_primary.show(5, 0)

+-------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+-------------------------------+
|job_link                                                                                                                       |job_skills              

In [28]:
# Aggregate functions
# Count of jobs by job level
df_job_postings.groupBy("job_level").count().show()

# Count of jobs by job type
df_job_postings.groupBy("job_type").count().show()

                                                                                

+----------+-------+
| job_level|  count|
+----------+-------+
|      NULL|     68|
| Associate| 144009|
|Mid senior|1204411|
+----------+-------+

+--------+-------+
|job_type|  count|
+--------+-------+
|  Remote|   4259|
|  Onsite|1337599|
|    NULL|     68|
|  Hybrid|   6562|
+--------+-------+



In [29]:
# Grouping and Sorting
# Group by company and count job postings, then sort
company_counts = df_job_postings.groupBy("company").count().sort("count", ascending=False)
company_counts.show(10)

+--------------------+-----+
|             company|count|
+--------------------+-----+
|     Health eCareers|41598|
|   Jobs for Humanity|27680|
|   TravelNurseSource|16142|
|      Dollar General|14815|
|        PracticeLink| 9738|
|      Energy Jobline| 9365|
|Gotham Enterprise...| 8935|
|               Jobot| 8713|
|       ClearanceJobs| 8599|
|          McDonald's| 8125|
+--------------------+-----+
only showing top 10 rows


In [30]:
# Join df_job_postings and df_job_skills
df_joined = df_job_postings.join(df_job_skills, "job_link", "inner")
df_joined.show(5)

[Stage 53:=====>                                                   (1 + 9) / 10]

+--------------------+--------------------+-----------+-------+---------------+--------------------+--------------------+--------------------+----------+------------+--------------+------------------+----------+--------+--------------------+
|            job_link| last_processed_time|got_summary|got_ner|is_being_worked|           job_title|             company|        job_location|first_seen| search_city|search_country|   search_position| job_level|job_type|          job_skills|
+--------------------+--------------------+-----------+-------+---------------+--------------------+--------------------+--------------------+----------+------------+--------------+------------------+----------+--------+--------------------+
|https://ae.linked...|2024-01-21 07:21:...|          t|      t|              f|        EVS Operator|              Sundus|Abu Dhabi, Abu Dh...|2024-01-17|Saint Joseph| United States|     Unit Operator|Mid senior|  Onsite|EVS, EVS Operator...|
|https://ae.linked...|2024-01-21

                                                                                

In [32]:
from pyspark.sql.functions import count, sum, lit

# Aggregate Window Functions
windowSpecAgg = Window.partitionBy("job_level")

# Count number of postings per job level
df_agg_window = df_job_postings.withColumn("postings_per_level", count("*").over(windowSpecAgg))
df_agg_window.select("job_title", "job_level", "postings_per_level").show(10)

# Sum of postings per job level (same as count here, but for demonstration)
df_agg_window_sum = df_job_postings.withColumn("sum_postings_per_level", sum(lit(1)).over(windowSpecAgg))
df_agg_window_sum.select("job_title", "job_level", "sum_postings_per_level").show(10)

+--------------------+---------+------------------+
|           job_title|job_level|postings_per_level|
+--------------------+---------+------------------+
|Senior Electrical...|     NULL|                68|
|      United Kingdom|     NULL|                68|
|      Parts Advisor |     NULL|                68|
|      United Kingdom|     NULL|                68|
|      Parts Advisor |     NULL|                68|
|      United Kingdom|     NULL|                68|
|      Parts Advisor |     NULL|                68|
|      United Kingdom|     NULL|                68|
|    Service Advisor |     NULL|                68|
|      United Kingdom|     NULL|                68|
+--------------------+---------+------------------+
only showing top 10 rows
+--------------------+---------+----------------------+
|           job_title|job_level|sum_postings_per_level|
+--------------------+---------+----------------------+
|Senior Electrical...|     NULL|                    68|
|      United Kingdom| 