In [1]:
import pyspark
import pandas as pd
from pyspark.sql import SparkSession

In [2]:
spark = SparkSession.builder.appName('Test').getOrCreate()
spark

In [3]:
df_pyspark=spark.read.parquet('cleaned_postings.parquet')
df_pyspark=spark.read.option('header','true').parquet('cleaned_postings.parquet',inferSchema=True)


In [4]:
type(df_pyspark)
df_pyspark.printSchema()

root
 |-- job_id: long (nullable = true)
 |-- company_name: string (nullable = true)
 |-- title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- skills_desc: string (nullable = true)
 |-- normalized_salary: double (nullable = true)
 |-- formatted_experience_level: string (nullable = true)
 |-- formatted_work_type: string (nullable = true)
 |-- remote_allowed: boolean (nullable = true)
 |-- posting_domain: string (nullable = true)
 |-- location: string (nullable = true)
 |-- listed_time: double (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- extracted_skills: string (nullable = true)



In [5]:
from pyspark.sql import functions as F
df_rows = df_pyspark.withColumn("extracted_individual_skill", F.explode(F.split(F.col("extracted_skills"), "\|")))
df_rows.show(100)

  df_rows = df_pyspark.withColumn("extracted_individual_skill", F.explode(F.split(F.col("extracted_skills"), "\|")))


+----------+--------------------+--------------------+--------------------+--------------------+-----------------+--------------------------+-------------------+--------------+--------------------+--------------------+-----------+--------+--------------------+--------------------------+
|    job_id|        company_name|               title|         description|         skills_desc|normalized_salary|formatted_experience_level|formatted_work_type|remote_allowed|      posting_domain|            location|listed_time|zip_code|    extracted_skills|extracted_individual_skill|
+----------+--------------------+--------------------+--------------------+--------------------+-----------------+--------------------------+-------------------+--------------+--------------------+--------------------+-----------+--------+--------------------+--------------------------+
|    921716|Corcoran Sawyer S...|Marketing Coordin...|Job descriptionA ...|Requirements: \n\...|          38480.0|                      

In [20]:
(
    df_rows
    .filter(F.col("extracted_individual_skill").isNotNull())
    .filter(F.trim(F.col("extracted_individual_skill")) != "")
    .groupBy("extracted_individual_skill")
    .count()
    .orderBy(F.desc("count"))
    .show(1000, truncate=False)
)


+------------------------------+-----+
|extracted_individual_skill    |count|
+------------------------------+-----+
|excel                         |17851|
|go                            |8762 |
|agile                         |5932 |
|c                             |5406 |
|sql                           |5039 |
|python                        |4536 |
|aws                           |3034 |
|forecasting                   |2956 |
|azure                         |2825 |
|rest                          |2712 |
|java                          |2631 |
|oracle                        |2301 |
|statistics                    |1943 |
|javascript                    |1904 |
|express                       |1791 |
|machine learning              |1704 |
|scrum                         |1527 |
|jira                          |1480 |
|r                             |1479 |
|react                         |1358 |
|ci/cd                         |1325 |
|html                          |1237 |
|c++                     

In [8]:
skill_map = {
    "node.js": "nodejs",
    "ci / cd": "ci/cd",
    "amazon web services": "aws",
    "natural language processing": "nlp",
    "golang": "go",
    "postgres": "postgresql",
    "c #": "c#",
    "apache spark": "spark",
    "ab testing": "a/b testing"
    #add more if needed
}
mapping_expr = F.create_map(
    *[F.lit(x) for pair in skill_map.items() for x in pair]
)

df_norm = df_rows.withColumn(
    "skill",
    F.coalesce(mapping_expr[F.col("extracted_individual_skill")], F.col("extracted_individual_skill"))
)

In [18]:
(
    df_norm
    .filter(F.col("skill").isNotNull())
    .filter(F.trim(F.col("skill")) != "")
    .dropDuplicates(["job_id", "skill"])
    .groupBy("skill")
    .count()
    .orderBy(F.desc("count"))
    .show(1000, truncate=False)
)

+------------------------------+-----+
|skill                         |count|
+------------------------------+-----+
|excel                         |17851|
|go                            |8893 |
|agile                         |5932 |
|c                             |5406 |
|sql                           |5039 |
|python                        |4536 |
|aws                           |3118 |
|forecasting                   |2956 |
|azure                         |2825 |
|rest                          |2712 |
|java                          |2631 |
|oracle                        |2301 |
|statistics                    |1943 |
|javascript                    |1904 |
|express                       |1791 |
|machine learning              |1704 |
|scrum                         |1527 |
|jira                          |1480 |
|r                             |1479 |
|react                         |1358 |
|ci/cd                         |1339 |
|html                          |1237 |
|c++                     

In [10]:
skills_by_level = (
    df_norm
    .filter(F.col("skill").isNotNull())
    .filter(F.trim(F.col("skill")) != "")
    .fillna({"formatted_experience_level": "Unknown"})
    .groupBy("formatted_experience_level", "skill")
    .agg(F.countDistinct("job_id").alias("job_count"))
    .orderBy("formatted_experience_level", F.desc("job_count"))
)

skills_by_level.show(1000, truncate=False)

+--------------------------+------------------------------+---------+
|formatted_experience_level|skill                         |job_count|
+--------------------------+------------------------------+---------+
|Associate                 |excel                         |1961     |
|Associate                 |go                            |614      |
|Associate                 |agile                         |429      |
|Associate                 |sql                           |351      |
|Associate                 |c                             |303      |
|Associate                 |python                        |267      |
|Associate                 |forecasting                   |244      |
|Associate                 |statistics                    |181      |
|Associate                 |oracle                        |158      |
|Associate                 |azure                         |148      |
|Associate                 |aws                           |127      |
|Associate          

In [None]:
from pyspark.sql.functions import expr
job_skills = (
    df_norm
    .filter(F.col("skill").isNotNull())
    .filter(F.trim(F.col("skill")) != "")
    .groupBy("job_id")
    .agg(F.collect_set("skill").alias("skills"))
)
skill_pairs = (
    job_skills
    .withColumn(
        "skill_pairs",
        expr("""
            transform(
                filter(
                    sequence(0, size(skills) - 1),
                    i -> i < size(skills) - 1
                ),
                i -> transform(
                    sequence(i + 1, size(skills) - 1),
                    j -> struct(
                        skills[i] as skill_a,
                        skills[j] as skill_b
                    )
                )
            )
        """)
    )
    .select("job_id", F.explode("skill_pairs").alias("pairs"))
    .select("job_id", F.explode("pairs").alias("pair"))
    .select("job_id","pair.skill_a", "pair.skill_b")
)
skill_bundle_counts = (
    skill_pairs
    .groupBy("skill_a", "skill_b")
    .agg(F.countDistinct("job_id").alias("job_count"))
    .orderBy(F.desc("job_count"))
)
skill_bundle_counts.show(1000, truncate=False)

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `job_id` cannot be resolved. Did you mean one of the following? [`skill_a`, `skill_b`].;
'Aggregate [skill_a#971, skill_b#972], [skill_a#971, skill_b#972, 'count(distinct 'job_id) AS job_count#977]
+- Project [pair#968.skill_a AS skill_a#971, pair#968.skill_b AS skill_b#972]
   +- Project [job_id#28L, pair#968]
      +- Generate explode(pairs#964), false, [pair#968]
         +- Project [job_id#28L, pairs#964]
            +- Generate explode(skill_pairs#956), false, [pairs#964]
               +- Project [job_id#28L, skills#951, transform(filter(sequence(0, (size(skills#951, true) - 1), None, Some(America/New_York)), lambdafunction((lambda i#957 < (size(skills#951, true) - 1)), lambda i#957, false)), lambdafunction(transform(sequence((lambda i#958 + 1), (size(skills#951, true) - 1), None, Some(America/New_York)), lambdafunction(struct(skill_a, skills#951[lambda i#958], skill_b, skills#951[lambda j#959]), lambda j#959, false)), lambda i#958, false)) AS skill_pairs#956]
                  +- Aggregate [job_id#28L], [job_id#28L, collect_set(skill#154, 0, 0) AS skills#951]
                     +- Filter NOT (trim(skill#154, None) = )
                        +- Filter isnotnull(skill#154)
                           +- Project [job_id#28L, company_name#29, title#30, description#31, skills_desc#32, normalized_salary#33, formatted_experience_level#34, formatted_work_type#35, remote_allowed#36, posting_domain#37, location#38, listed_time#39, zip_code#40, extracted_skills#41, extracted_individual_skill#57, coalesce(map(node.js, nodejs, ci / cd, ci/cd, amazon web services, aws, natural language processing, nlp, golang, go, postgres, postgresql, c #, c#, apache spark, spark, ab testing, a/b testing)[extracted_individual_skill#57], extracted_individual_skill#57) AS skill#154]
                              +- Project [job_id#28L, company_name#29, title#30, description#31, skills_desc#32, normalized_salary#33, formatted_experience_level#34, formatted_work_type#35, remote_allowed#36, posting_domain#37, location#38, listed_time#39, zip_code#40, extracted_skills#41, extracted_individual_skill#57]
                                 +- Generate explode(split(extracted_skills#41, \|, -1)), false, [extracted_individual_skill#57]
                                    +- Relation [job_id#28L,company_name#29,title#30,description#31,skills_desc#32,normalized_salary#33,formatted_experience_level#34,formatted_work_type#35,remote_allowed#36,posting_domain#37,location#38,listed_time#39,zip_code#40,extracted_skills#41] parquet
