**Project preparation**

- Starting the Spark session
- Installing the libraries
- Downloading the dataset from Kaggle
- Eliminating irrelevant files from the environment

In [2]:
!apt-get update

Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Hit:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Hit:3 https://ppa.launchpadcontent.net/c2d4u.team/c2d4u4.0+/ubuntu jammy InRelease
Hit:4 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Hit:5 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Hit:6 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:7 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Hit:8 http://archive.ubuntu.com/ubuntu jammy InRelease
Get:9 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Hit:10 http://archive.ubuntu.com/ubuntu jammy-backports InRelease
Fetched 261 kB in 3s (76.8 kB/s)
Reading package lists... Done


In [3]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.1-bin-hadoop3.2"

import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()
spark.conf.set("spark.sql.repl.eagerEval.enabled", True)
spark

In [3]:
# Credentials for the `kaggle` command used in the next cell are taken from
# these environment variables. Values already present in the environment are
# kept; otherwise they are asked for interactively so that no secret is ever
# stored in the notebook itself.
from getpass import getpass

if not os.environ.get('KAGGLE_USERNAME'):
    os.environ['KAGGLE_USERNAME'] = input("Kaggle username: ")
if not os.environ.get('KAGGLE_KEY'):
    # getpass hides the key while it is typed and keeps it out of the cell output.
    os.environ['KAGGLE_KEY'] = getpass("Kaggle API key: ")

In [4]:
!kaggle datasets download -d asaniczka/1-3m-linkedin-jobs-and-skills-2024

Dataset URL: https://www.kaggle.com/datasets/asaniczka/1-3m-linkedin-jobs-and-skills-2024
License(s): ODC Attribution License (ODC-By)
Downloading 1-3m-linkedin-jobs-and-skills-2024.zip to /content
 99% 1.86G/1.88G [00:24<00:00, 112MB/s] 
100% 1.88G/1.88G [00:24<00:00, 83.2MB/s]


In [5]:
!unzip 1-3m-linkedin-jobs-and-skills-2024.zip

Archive:  1-3m-linkedin-jobs-and-skills-2024.zip
  inflating: job_skills.csv          
  inflating: job_summary.csv         
  inflating: linkedin_job_postings.csv  


In [6]:
# Files from the archive that the rest of the notebook never reads,
# plus the archive itself.
unused_files = [
    'job_summary.csv',
    'linkedin_job_postings.csv',
    '1-3m-linkedin-jobs-and-skills-2024.zip',
]
for file in unused_files:
    # Report and move on when there is nothing to delete.
    if not os.path.exists(file):
        print(f"{file} does not exist.")
        continue
    os.remove(file)
    print(f"{file} has been removed.")

# Show what is left in the working directory.
print(os.listdir('.'))

job_summary.csv has been removed.
linkedin_job_postings.csv has been removed.
1-3m-linkedin-jobs-and-skills-2024.zip has been removed.
['.config', 'job_skills.csv', 'spark-3.1.1-bin-hadoop3.2', 'spark-3.1.1-bin-hadoop3.2.tgz', 'sample_data']


**Data Preprocessing**
- Importing the libraries used for data preprocessing and visualization
- Loading the dataframe and inspecting its main characteristics
- Preparing the data for further processing

In [4]:
import pandas as pd
import numpy as np
from pyspark.sql.functions import col, lower, split, explode, collect_list, trim, udf
from pyspark.sql.types import StringType
import pyspark.sql.functions as F
import matplotlib.pyplot as plt

In [5]:
from pyspark.sql.functions import regexp_replace
from pyspark.sql.types import ArrayType, StringType
from pyspark.sql.functions import collect_list

In [6]:
job_skills_df = spark.read.options(header=True).csv('job_skills.csv')
job_skills_df = job_skills_df.dropna(subset=["job_skills"])

In [10]:
# Baseline: split the comma-separated skills without any normalisation and
# count how often each distinct skill string occurs.
# The pattern is a raw string so the regex escape \s is passed through as-is
# instead of relying on Python tolerating an invalid string escape.
df_no_prep = job_skills_df.withColumn("job_skills", split(col("job_skills"), r",\s*"))
skills_no_prep = df_no_prep.withColumn("skill", explode(col("job_skills")))
skills_count_no_prep = skills_no_prep.groupBy("skill").count().orderBy("count", ascending=False)
skills_count_no_prep.show(20)
# Number of distinct skill strings.
skills_count_no_prep.count()

+--------------------+------+
|               skill| count|
+--------------------+------+
|       Communication|368202|
|            Teamwork|226205|
|          Leadership|184292|
|    Customer service|166158|
|Communication skills|116169|
|    Customer Service|110400|
|     Problem Solving|102020|
|               Sales| 92718|
|      Problemsolving| 92489|
|             Nursing| 87419|
|       Collaboration| 86774|
|            Training| 83178|
|  Project Management| 81080|
|Communication Skills| 78700|
| Attention to detail| 75448|
|Microsoft Office ...| 73351|
|     Time management| 72460|
|     Time Management| 69752|
|          Scheduling| 64081|
|    Microsoft Office| 60260|
+--------------------+------+
only showing top 20 rows



3298454

In [11]:
# Same count as the baseline, but with the skills lowercased before splitting
# so that case variants collapse into one entry.
# The pattern is a raw string so the regex escape \s is passed through as-is
# instead of relying on Python tolerating an invalid string escape.
df_lowercase = job_skills_df.withColumn("job_skills", split(lower(col("job_skills")), r",\s*"))
skills_lowercase = df_lowercase.withColumn("skill", explode(col("job_skills")))
skills_count_lowercase = skills_lowercase.groupBy("skill").count().orderBy("count", ascending=False)
skills_count_lowercase.show(20)
# Number of distinct skill strings after lowercasing.
skills_count_lowercase.count()

+--------------------+------+
|               skill| count|
+--------------------+------+
|       communication|370052|
|    customer service|278033|
|            teamwork|227548|
|communication skills|195837|
|          leadership|185138|
|     problem solving|148992|
|     time management|142873|
| attention to detail|133929|
|      problemsolving|129299|
|  project management|121525|
|interpersonal skills|100223|
|        patient care| 99912|
|               sales| 92983|
|             nursing| 87949|
|       collaboration| 87086|
|            training| 83639|
|       data analysis| 81949|
|microsoft office ...| 75508|
|organizational sk...| 75257|
|inventory management| 71902|
+--------------------+------+
only showing top 20 rows



2770596

In [12]:
# UDF that drops every space character from each skill in an array of skills.
remove_whitespace_udf = udf(
    lambda skills: ["".join(skill.split(" ")) for skill in skills],
    returnType=ArrayType(StringType()),
)

# Add the space-free variant next to the lowercased skills, then count the
# distinct skills exactly as in the previous cells.
df_no_whitespace = df_lowercase.withColumn(
    "job_skills_no_whitespace",
    remove_whitespace_udf(col("job_skills")),
)
skills_no_whitespace = df_no_whitespace.withColumn(
    "skill",
    explode(col("job_skills_no_whitespace")),
)
skills_count_no_whitespace = (
    skills_no_whitespace
    .groupBy("skill")
    .count()
    .orderBy("count", ascending=False)
)
skills_count_no_whitespace.show(20)
skills_count_no_whitespace.count()

+--------------------+------+
|               skill| count|
+--------------------+------+
|       communication|370052|
|      problemsolving|278295|
|     customerservice|278069|
|            teamwork|245192|
| communicationskills|195844|
|          leadership|185139|
|      timemanagement|143255|
|   attentiontodetail|133971|
|   projectmanagement|121540|
| interpersonalskills|100229|
|         patientcare| 99926|
|               sales| 92983|
|             nursing| 87949|
|       collaboration| 87086|
|            training| 83639|
|        dataanalysis| 81958|
|microsoftofficesuite| 75544|
|organizationalskills| 75261|
| inventorymanagement| 71904|
|   highschooldiploma| 67357|
+--------------------+------+
only showing top 20 rows



2724867

In [7]:
def preprocess_column(c):
    """Build a column expression that normalises the string column named ``c``.

    Every run of whitespace is removed and the result is lowercased.

    Args:
        c: name of the column to normalise.

    Returns:
        The resulting Spark column expression (nothing is evaluated here).
    """
    # Raw string: \s is a regex escape for Spark, not a Python string escape.
    return lower(regexp_replace(col(c), r"\s+", ""))

# Normalise every string column: strip all whitespace and lowercase it.
for column in job_skills_df.columns:
    if isinstance(job_skills_df.schema[column].dataType, StringType):
        job_skills_df = job_skills_df.withColumn(column, preprocess_column(column))

# preprocess_column already lowercases, so this lower() changes nothing; it is
# kept so that df_lowercase stays defined under its existing name.
df_lowercase = job_skills_df.withColumn("job_skills", lower(col("job_skills")))

# Raw string so the regex escape \s is passed through as-is instead of relying
# on Python tolerating an invalid string escape.
df_processed = df_lowercase.withColumn("job_skills", split(col("job_skills"), r",\s*"))

# One basket (list of skill strings) per row of the dataframe.
baskets = df_processed.select("job_skills").rdd.map(lambda row: row[0])

In [8]:
baskets.first()

['buildingcustodialservices',
 'cleaning',
 'janitorialservices',
 'materialshandling',
 'housekeeping',
 'sanitation',
 'wastemanagement',
 'floormaintenance',
 'equipmentmaintenance',
 'safetyprotocols',
 'communicationskills',
 'attentiontodetail',
 'physicalstrength',
 'experienceinhousekeeping']

In [9]:
count = baskets.count()

In [43]:
print(count)

1294374


In [46]:
support_threshold_percentage = 0.01
min_support = int(support_threshold_percentage * count)

In [47]:
print(min_support)

12943


In [11]:
# Flatten all baskets into a single RDD holding one skill string per element.
# NOTE(review): the name `hash` shadows Python's built-in hash(). The cells
# below keep reassigning and reading it, so a rename has to be done across all
# of them together.
hash = baskets.flatMap(lambda line: line)
hash.first()

'buildingcustodialservices'

In [12]:
hash = hash.distinct()
skills = hash.count()
skills

2724866

In [13]:
hash = hash.zipWithIndex()
hash.take(5)

[('professionalliabilityinsurance', 0),
 ('estimating', 1),
 ('planning', 2),
 ('workpackaging', 3),
 ('industryguides', 4)]

In [14]:
hash_index = hash.collectAsMap()

In [15]:
from itertools import islice

# Peek at the first five entries without first copying every item of the
# mapping into a throw-away list.
first = list(islice(hash_index.items(), 5))
for key, value in first:
    print(f"{key}: {value}")

professionalliabilityinsurance: 0
estimating: 1
planning: 2
workpackaging: 3
industryguides: 4


In [16]:
def hashing(basket):
    """Translate a basket of skill names into the set of their ids.

    Each name is looked up in the module-level ``hash_index`` mapping; a name
    that is missing from the mapping raises ``KeyError``.
    """
    return set(map(hash_index.__getitem__, basket))

baskets = baskets.map(hashing)

In [17]:
baskets.first()

{454820,
 454821,
 909292,
 909293,
 909294,
 909295,
 909296,
 909297,
 1363219,
 1363220,
 1817358,
 1817359,
 1817360,
 2270567}

In [18]:
from itertools import combinations

In [19]:
def count_skills(baskets, min_support):
  """Count how often each skill occurs and keep the frequent ones.

  Args:
    baskets: RDD whose elements are iterables of skills.
    min_support: a skill is kept only when its number of occurrences is
      strictly greater than this value.

  Returns:
    An RDD of ``(skill, occurrences)`` pairs.
  """
  def as_unit_pairs(basket):
    # One (skill, 1) pair per skill in the basket.
    pairs = []
    for skill in basket:
      pairs.append((skill, 1))
    return pairs

  def is_frequent(pair):
    _, occurrences = pair
    return occurrences > min_support

  return (
      baskets.flatMap(as_unit_pairs)
      .reduceByKey(lambda left, right: left + right)
      .filter(is_frequent)
  )

In [49]:
# Print how many frequent itemsets there are and the most frequent one,
# translated back to skill names through the hash table.
# Returns the number of frequent itemsets.
def frequent_values(set, table, is_single):
    """Report the size of a frequent-itemset RDD and its most frequent entry.

    Args:
        set: RDD of ``(itemset, count)`` pairs. When ``is_single`` is true the
            itemset is a single id, otherwise an iterable of ids. (The
            parameter name shadows the built-in ``set`` inside this function.)
        table: dict mapping skill name -> id.
        is_single: whether the entries are singletons.

    Returns:
        The number of elements in ``set``.

    Raises:
        ValueError: if an id of the most frequent entry is not in ``table``.
    """
    count = set.count()
    print(f"Count: {count}")
    if count == 0:
        # max() cannot be taken over an empty RDD; there is nothing to decode.
        return count

    most_frequent_hash = set.max(lambda x: x[1])
    ids = [most_frequent_hash[0]] if is_single else list(most_frequent_hash[0])

    # Resolve all ids in one pass over the table instead of rebuilding the
    # full key and value lists for every single lookup.
    wanted = frozenset(ids)
    names = {}
    for name, idx in table.items():
        if idx in wanted:
            names.setdefault(idx, name)  # first match wins, like list.index
    try:
        itemset = [names[idx] for idx in ids]
    except KeyError as exc:
        raise ValueError(f"{exc.args[0]} is not in the hash table") from None

    if is_single:
        print("Most frequent: ", itemset[0])
    else:
        print("Most frequent:", itemset)
    return count

def apriori(baskets, min_support, hash_table):
    """Run the Apriori algorithm over ``baskets`` and print a per-size summary.

    For each itemset size, starting with singletons, the number of frequent
    itemsets and the most frequent one (translated to skill names) are
    printed. The loop stops at the first size with no frequent itemset.

    Args:
        baskets: RDD whose elements are collections of skill ids.
        min_support: an itemset is frequent only when its occurrence count is
            strictly greater than this value.
        hash_table: dict mapping skill name -> id, used to turn ids back into
            names for display.

    Returns:
        None; all results are reported with ``print``.

    Raises:
        ValueError: if an id to display is not in ``hash_table``.
    """
    def decode(ids):
        # Resolve all ids in one pass over the table instead of rebuilding the
        # full key and value lists for every single lookup.
        wanted = frozenset(ids)
        names = {}
        for name, idx in hash_table.items():
            if idx in wanted:
                names.setdefault(idx, name)  # first match wins, like list.index
        try:
            return [names[idx] for idx in ids]
        except KeyError as exc:
            raise ValueError(f"{exc.args[0]} is not in the hash table") from None

    print("Singletons: ")
    # Cached because count(), max() and collect() below would each recompute it.
    frequent_singletons = count_skills(baskets, min_support).cache()
    count = frequent_singletons.count()

    if count == 0:
        frequent_singletons.unpersist()
        print("No singletons found")
        return

    print("Counted singletons:")
    print(count)
    most_frequent_hash = frequent_singletons.max(lambda x: x[1])
    print("Most frequent: ", decode([most_frequent_hash[0]])[0])

    frequent_set = set(frequent_singletons.map(lambda x: (x[0], )).collect())
    frequent_singletons.unpersist()
    k = 2

    while True:
        print(f"Itemsets of size {k}")

        # Only items that occur in some frequent (k-1)-itemset can be part of
        # a candidate whose (k-1)-subsets are all frequent, so every other
        # item is dropped from a basket before its combinations are built.
        frequent_items = {item for itemset in frequent_set for item in itemset}

        # Default arguments pin this iteration's values inside the function.
        def candidate_pairs(basket, size=k, previous=frequent_set, allowed=frequent_items):
            pruned = sorted(item for item in basket if item in allowed)
            # combinations() of a sorted sequence already yields sorted tuples.
            return [
                (element, 1)
                for element in combinations(pruned, size)
                if all(subset in previous for subset in combinations(element, size - 1))
            ]

        itemsets = (
            baskets.flatMap(candidate_pairs)
            .reduceByKey(lambda x, y: x + y)
            .filter(lambda x: x[1] > min_support)
            .cache()  # reused by count(), max() and collect() below
        )

        count = itemsets.count()

        if count == 0:
            itemsets.unpersist()
            print("No more itemsets")
            break

        print(f"Counted itemsets of size {k}:")
        print(count)
        most_frequent_hash = itemsets.max(lambda x: x[1])
        print("Most frequent: ", decode(most_frequent_hash[0]))

        print("Continue counting")
        frequent_set = set(itemsets.map(lambda x: x[0]).collect())
        itemsets.unpersist()
        k += 1

In [21]:
sample_rdd = baskets.sample(False, 0.0001, seed=42)

In [22]:
sample_count = sample_rdd.count()

In [23]:
print(sample_count)

127


In [40]:
apriori(sample_rdd, 5, hash_index)

Singletons: 
Counted singletons:
26
Most frequent:  communication
Itemsets of size 2
Counted itemsets if size 2:
20
Most frequent:  ['customerservice', 'communication']
Continue counting
Itemsets of size 3
Counted itemsets if size 3:
8
Most frequent:  ['teamwork', 'customerservice', 'communication']
Continue counting
Itemsets of size 4
No more itemsets


In [None]:
apriori(baskets, min_support, hash_index)

Singletons: 
Counted singletons:
190
Most frequent:  communication
Itemsets of size 2
Counted itemsets of size 2:
266
Most frequent:  ['problemsolving', 'communication']
Continue counting
Itemsets of size 3
Counted itemsets of size 3:
165
Most frequent:  ['teamwork', 'problemsolving', 'communication']
Continue counting
Itemsets of size 4
