<a href="https://colab.research.google.com/github/riccardotenuta/market_basket_analysis/blob/main/Market_Basket_Analysis.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Import dataset from Kaggle

In [1]:
from google.colab import files
files.upload()

Saving kaggle.json to kaggle.json


{'kaggle.json': b'{"username":"riccardotenuta0023","key":"9fe14c68d229c6d70a59ac7ce2172410"}'}

In [2]:
!ls -lha kaggle.json
!pip install -q kaggle # installing the kaggle package
!mkdir -p ~/.kaggle # creating .kaggle folder where the key should be placed
!cp kaggle.json ~/.kaggle/ # move the key to the folder
!pwd # checking the present working directory

-rw-r--r-- 1 root root 74 May  1 14:16 kaggle.json
/content


In [3]:
!chmod 600 ~/.kaggle/kaggle.json

In [4]:
!kaggle datasets download -d asaniczka/1-3m-linkedin-jobs-and-skills-2024 -p /content/drive/MyDrive/

Dataset URL: https://www.kaggle.com/datasets/asaniczka/1-3m-linkedin-jobs-and-skills-2024
License(s): ODC Attribution License (ODC-By)
Downloading 1-3m-linkedin-jobs-and-skills-2024.zip to /content/drive/MyDrive
100% 1.87G/1.88G [00:34<00:00, 98.9MB/s]
100% 1.88G/1.88G [00:34<00:00, 57.9MB/s]


In [5]:
!unzip /content/drive/MyDrive/1-3m-linkedin-jobs-and-skills-2024.zip -d /content/drive/MyDrive/

Archive:  /content/drive/MyDrive/1-3m-linkedin-jobs-and-skills-2024.zip
  inflating: /content/drive/MyDrive/job_skills.csv  
  inflating: /content/drive/MyDrive/job_summary.csv  
  inflating: /content/drive/MyDrive/linkedin_job_postings.csv  


In [6]:
import pandas as pd
import numpy as np
import os
!pip install pyspark
from pyspark.sql import SparkSession
import pyspark

Collecting pyspark
  Downloading pyspark-3.5.1.tar.gz (317.0 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m317.0/317.0 MB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.5.1-py2.py3-none-any.whl size=317488491 sha256=4ad287b5b5874d58587b629fe932f753293ef730ab88207fc645bfd46e7610c3
  Stored in directory: /root/.cache/pip/wheels/80/1d/60/2c256ed38dddce2fdd93be545214a63e02fbd8d74fb0b7f3a6
Successfully built pyspark
Installing collected packages: pyspark
Successfully installed pyspark-3.5.1


## Data preparation

In [7]:
spark = SparkSession.builder.appName("linkedin_project_SON").getOrCreate()

In [8]:
# import of the csv and selecting only the job_skills column
rdd_from_csv = spark.read.csv('./drive/MyDrive/job_skills.csv', header=True).dropna()
rdd_from_csv = rdd_from_csv.select(rdd_from_csv['job_skills']).rdd

In [9]:
# splitting every basket to create the item list
rdd_from_csv = rdd_from_csv.map(lambda basket: basket['job_skills'].split(', '))

In [10]:
# example of the first two baskets
rdd_from_csv.take(2)

[['Building Custodial Services',
  'Cleaning',
  'Janitorial Services',
  'Materials Handling',
  'Housekeeping',
  'Sanitation',
  'Waste Management',
  'Floor Maintenance',
  'Equipment Maintenance',
  'Safety Protocols',
  'Communication Skills',
  'Attention to Detail',
  'Physical Strength',
  'Experience in Housekeeping'],
 ['Customer service',
  'Restaurant management',
  'Food safety',
  'Training',
  'Supervision',
  'Scheduling',
  'Inventory',
  'Cost control',
  'Sales',
  'Communication',
  'Problemsolving',
  'Leadership',
  'Motivation',
  'Teamwork',
  'High School Diploma',
  "Bachelor's Degree",
  'ServSafe Certification',
  "Valid Driver's License",
  'Physical ability to perform job duties']]

In [11]:
# sampling the 10% of the whole dataset to compute easily the SON algorithm
rdd_son = rdd_from_csv.sample(withReplacement=False, fraction=0.02)
num_baskets = rdd_son.count()
num_baskets

25597

In [12]:
initial_partitions = rdd_son.getNumPartitions()
"""
Spark suggests to use 2-4 partitions for each CPU on the machine, since running the algorithm only on a single machine
I'll take this value from the cpu_count() function inside the multiprocessing library

https://spark.apache.org/docs/latest/rdd-programming-guide.html#parallelized-collections
"""
from multiprocessing import cpu_count
optimal_partitions = 4*cpu_count()

rdd_son.repartition(numPartitions=optimal_partitions)

print(f'Partitions before -> {initial_partitions}')
print(f'Optimal partitions -> {optimal_partitions}')

Partitions before -> 6
Optimal partitions -> 8


In [13]:
# define the support for each partition

support_threshold = round(0.02*num_baskets)
st_partition = round(support_threshold / rdd_son.getNumPartitions())

print(f'Support threshold for each partition/chunk of data is {st_partition}')

Support threshold for each partition/chunk of data is 85


In [14]:
def first_pass(partition, support: int) -> list:

  item_count = {}

  for basket in partition:
    for item in basket:
      item_count[item] = item_count.get(item, 0) + 1

  frequent_singleton = [(item, count) for item, count in item_count.items() if count >= support]

  return sorted(frequent_singleton, key=lambda x: x[1], reverse=True)

first_pass_rdd = rdd_son.mapPartitions(lambda partition: first_pass(partition, st_partition))

In [15]:
first_pass_rdd.take(10)

[('Communication', 1342),
 ('Teamwork', 839),
 ('Leadership', 640),
 ('Customer service', 582),
 ('Communication skills', 439),
 ('Customer Service', 396),
 ('Problem Solving', 383),
 ('Attention to detail', 339),
 ('Problemsolving', 332),
 ('Communication Skills', 324)]

In [16]:
frequent_singleton = first_pass_rdd.map(lambda item: item[0]).collect()
frequent_singleton[:10]

['Communication',
 'Teamwork',
 'Leadership',
 'Customer service',
 'Communication skills',
 'Customer Service',
 'Problem Solving',
 'Attention to detail',
 'Problemsolving',
 'Communication Skills']

In [17]:
from itertools import combinations

def second_pass(partition, support: int) -> list:

  couples_count = {}

  for basket in partition:
    candidate_couples = list(combinations(basket, 2))
    for c in candidate_couples:

      if all(x in frequent_singleton for x in c):
          couples_count[c] = couples_count.get(c, 0) + 1

  frequent_couples = [(couple, count) for couple, count in couples_count.items() if count >= support]
  return sorted(frequent_couples, reverse=True, key=lambda x: x[1])

second_pass_rdd = rdd_son.mapPartitions(lambda partition: second_pass(partition, st_partition))

In [27]:
frequent_couples = second_pass_rdd.reduceByKey(lambda x,y: x+y).collect()

In [19]:
second_pass_rdd.take(10)

[(('Communication', 'Teamwork'), 410),
 (('Customer service', 'Communication'), 225),
 (('Leadership', 'Communication'), 217),
 (('Communication', 'Leadership'), 206),
 (('Communication', 'Problemsolving'), 194),
 (('Communication', 'Problem Solving'), 186),
 (('Customer service', 'Teamwork'), 163),
 (('Communication', 'Adaptability'), 150),
 (('Communication', 'Attention to detail'), 148),
 (('Communication', 'Time Management'), 145)]

In [20]:
def get_frequent_itemset(partition, support, frequent_itemset, n_pass):

  item_set_count = {}
  for basket in partition:

    candidate_itemset = list(combinations(basket, n_pass))
    for c in candidate_itemset:

      if n_pass > 2: subset = list(combinations(c, n_pass-1))
      else: subset = c

      # check if all the item of the candidate set are in the basket and if its subsets are contained in the previous frequent itemsets
      if all(x in frequent_itemset for x in subset):
        item_set_count[c] = item_set_count.get(c, 0) + 1

  new_frequent_itemset = [(itemset, count) for itemset, count in item_set_count.items() if count >= support]

  return sorted(new_frequent_itemset, reverse=True, key=lambda x: x[1])


In [21]:
# get frequent singleton

rdd_iter = []
n_pass = 1

rdd_frequent_itemset = rdd_son.mapPartitions(lambda partition: first_pass(partition, st_partition))
print(f'Pass {n_pass}')
print(rdd_frequent_itemset.take(10))
rdd_iter.append(rdd_frequent_itemset)

frequent_itemset = set(rdd_frequent_itemset.map(lambda item: item[0]).collect())
frequent_itemset_len = rdd_frequent_itemset.count()

n_pass = 2

while len(frequent_itemset) > 0:

  rdd_frequent_itemset = rdd_son.mapPartitions(lambda partition: get_frequent_itemset(partition, st_partition, frequent_itemset, n_pass)).reduceByKey(lambda x,y: x+y)
  rdd_iter.append(rdd_frequent_itemset)
  print(f'Pass {n_pass}')

  print(rdd_frequent_itemset.take(10))

  frequent_itemset = set(rdd_frequent_itemset.map(lambda item: item[0]).collect())

  n_pass += 1


Pass 1
[('Communication', 1342), ('Teamwork', 839), ('Leadership', 640), ('Customer service', 582), ('Communication skills', 439), ('Customer Service', 396), ('Problem Solving', 383), ('Attention to detail', 339), ('Problemsolving', 332), ('Communication Skills', 324)]
Pass 2
[(('Communication', 'Time management'), 695), (('Communication', 'Flexibility'), 587), (('Customer service', 'Communication skills'), 472), (('Customer service', 'Leadership'), 457), (('Leadership', 'Training'), 499), (('Communication', 'Microsoft Office Suite'), 276), (('Problem Solving', 'Time Management'), 175), (('Training', 'Communication'), 90), (('Communication', 'Attention to detail'), 659), (('Customer service', 'Problemsolving'), 758)]
Pass 3
[(('Leadership', 'Communication', 'Teamwork'), 86), (('Customer service', 'Communication', 'Teamwork'), 393), (('Customer service', 'Communication', 'Problemsolving'), 370), (('Communication', 'Teamwork', 'Problemsolving'), 85)]
Pass 4
[]


In [28]:
def get_couples_support(couple, frequent_couples):
  for c in frequent_couples:
    if list(c[0]) == couple:
      return c[1]

In [29]:
def compute_association_rules(frequent_itemset_support, frequent_couples):
  for frequent_itemset in frequent_itemset_support[:5]:
    for item in frequent_itemset[0]:
      fi = list(frequent_itemset[0])

      fi.remove(item)
      couple_support = get_couples_support(fi, frequent_couples)
      support_with_item = frequent_itemset[1]

      confidence = round((support_with_item / couple_support)*100, 1)

      print(f'{fi} --> {item} with {confidence}%')


In [30]:
# TODO reduce to all frquent couples from all the partitions (maintain unique frequent couples)
frequent_triplets = rdd_iter[2].reduceByKey(lambda x, y: x+y).collect()
compute_association_rules(frequent_triplets, frequent_couples)

['Communication', 'Teamwork'] --> Leadership with 3.9%
['Leadership', 'Teamwork'] --> Communication with 10.4%
['Leadership', 'Communication'] --> Teamwork with 6.7%
['Communication', 'Teamwork'] --> Customer service with 17.7%
['Customer service', 'Teamwork'] --> Communication with 40.4%
['Customer service', 'Communication'] --> Teamwork with 32.1%
['Communication', 'Problemsolving'] --> Customer service with 35.2%
['Customer service', 'Problemsolving'] --> Communication with 48.8%
['Customer service', 'Communication'] --> Problemsolving with 30.2%
['Teamwork', 'Problemsolving'] --> Communication with 15.0%
['Communication', 'Problemsolving'] --> Teamwork with 8.1%
['Communication', 'Teamwork'] --> Problemsolving with 3.8%
