## Imports

In [0]:
from pyspark.sql.types import *
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import IntegerType, FloatType, StringType
from pyspark.ml.feature import Tokenizer, Word2Vec
import pandas as pd
from pyspark.ml.linalg import Vectors, VectorUDT
from scipy.spatial.distance import euclidean
import numpy as np
from pyspark.sql.functions import (
    udf, size, col, avg, expr, explode, countDistinct, count, length, 
    monotonically_increasing_id, concat_ws, split, row_number, when, pandas_udf, 
    min, sqrt, desc, regexp_replace, lower, lit, regexp_extract, first, broadcast
    )
from pyspark.sql.window import Window
from tqdm import tqdm
import pickle
import os
pkl_file_path = "/dbfs/FileStore/clusters_stats_results_v7_all_data_percentage.pkl"
sample_size = 1

pd.set_option('display.max_columns', None)
pd.set_option('display.width', 1000)

spark = SparkSession.builder.getOrCreate()


## Load Data

In [0]:
profiles = spark.read.parquet('/linkedin/people')

## Profiles Data Preprocess

In [0]:
df_profiles = (
    profiles

    # get specific values
    .withColumnRenamed("id", 'profile_id')
    .withColumn('certifications_titles', col('certifications.title'))
    .withColumn('industry', col('current_company.industry'))
    .withColumn('education_degree', col('education.degree'))
    .withColumn('education_field', col('education.field'))
    .withColumn('education_establishment', col('education.title'))
    .withColumn('volunteer_causes', col('volunteer_experience.cause'))
    .withColumn('other_experience', col('experience.title'))

    # leave only relevant cols
    .select(
        "profile_id",
        "certifications_titles",
        "country_code", 
        "industry", 
        "current_company:name",
        "education_degree", 
        "education_field", 
        "education_establishment",
        "experience",
        "other_experience",
        "followers",
        "position", # (thats the job title)
        "recommendations_count",
        "volunteer_causes"
    )

    # add experience_months col
    # Extract the first experience's duration, then parse years and months into separate columns
    .withColumn("duration_short", col("experience")[0]["duration_short"]) \
    .withColumn("years", when(col("duration_short").contains("year"), 
                              expr("CAST(SPLIT(duration_short, ' ')[0] AS INT)")).otherwise(0)) \
    .withColumn("months", when(col("duration_short").contains("month"),
                               expr("""
                                    CAST(SPLIT(duration_short, ' ')[
                                        CASE
                                            WHEN duration_short LIKE '%year%' THEN 2
                                            ELSE 0
                                        END
                                    ] AS INT)
                                """)).otherwise(0)) \
    .withColumn("experience_months", col("years") * 12 + col("months")) \

    .dropna(subset=["position", "industry", "experience_months"])
    
    # add seniority level (huristically)
    # .withColumn("seniority_level", seniority_udf("experience_months"))
    .withColumn("seniority_level",
        when((col("experience_months") == 0), "Internship")
        .when((col("experience_months") > 0) & (col("experience_months") <= 24), "Entry Level")
        .when((col("experience_months") > 24) & (col("experience_months") <= 60), "Associate")
        .when((col("experience_months") > 60) & (col("experience_months") <= 120), "Mid-Senior Level")
        .when((col("experience_months") > 120) & (col("experience_months") <= 180), "Director")
        .when(col("experience_months") > 180, "Executive")
        .otherwise("Not Applicable")
    )

    # leave only relevant cols
    .select(
        "profile_id",
        "certifications_titles",
        "country_code", 
        "industry", 
        "current_company:name",
        "education_degree", 
        "education_field", 
        "education_establishment",
        "other_experience",
        "followers",
        "position", # (thats the job title)
        "recommendations_count",
        "volunteer_causes",
        'experience_months',
        'seniority_level'
    )

    # cache this df
    .cache()
)
# for embedding: position, industry, seniority_level

In [0]:
# sample
# TODO: this is temporary for debug. in the end remove and run all.
if sample_size < 1:
    df_profiles = df_profiles.sample(False, sample_size, seed=42)


## Jobs Embedding

In [0]:
# embedding

def add_embedding_to_df(df, job_title_col_name='position', industry_col_name='industry', seniority_level_col_name='seniority_level'):

    # Combine the relevant columns into a single column
    df = df.withColumn("combined", concat_ws(" ", job_title_col_name, industry_col_name, seniority_level_col_name))

    # Tokenize the combined column
    tokenizer = Tokenizer(inputCol="combined", outputCol="tokens")
    df = tokenizer.transform(df)

    # Create a Word2Vec model
    word2Vec = Word2Vec(vectorSize=100, minCount=5, inputCol='tokens', outputCol='embedding')

    # Cache the DataFrame as fitting the Word2Vec model is expensive
    df.cache()

    # Fit the Word2Vec model
    model = word2Vec.fit(df)

    # Transform the DataFrame to add the embeddings
    result_df = model.transform(df)

    # Don't forget to unpersist the DataFrame after the transformation
    df.unpersist()

    return result_df

In [0]:
df_profiles_emb = add_embedding_to_df(df=df_profiles)

## Set Centroids

In [0]:
# get centroids 
# File location and type
path_to_centroids = "dbfs:/FileStore/tables/jobs_clusterd.csv"
file_type = "csv"

# CSV options
infer_schema = "false"
first_row_is_header = "true"
delimiter = ","

# The applied options are for CSV files. For other file types, these will be ignored.
df_clusters = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(path_to_centroids) \
  .drop("embedding") 

df_centroids = df_clusters\
  .filter(col('median') == 1)\
  .filter(col('cluster') != -1)\
  .withColumnRenamed("cluster", 'centroid_id')

In [0]:
# add embeding to centroids
df_centroids_emb = add_embedding_to_df(df=df_centroids, job_title_col_name='job title', industry_col_name='industries', seniority_level_col_name='seniority level')

## Matching Centroids to Profiles

In [0]:
@udf(FloatType())
def euclidean_distance_udf(vec1, vec2):
    # Convert vectors to numpy arrays
    vec1 = np.array(vec1)
    vec2 = np.array(vec2)
    # Calculate Euclidean distance
    return float(np.linalg.norm(vec1 - vec2))

# Assuming df_profiles_emb and df_centroids_emb have been properly generated and cached
df_distances = (
    df_profiles_emb.withColumnRenamed("id", 'profile_id')
    .crossJoin(broadcast(df_centroids_emb.withColumnRenamed("embedding", "embedding_centroid").withColumnRenamed("id", 'centroid_id')))
    # Apply the UDF to calculate the Euclidean distance between the embeddings
    .withColumn("euclidean_distance", euclidean_distance_udf("embedding", "embedding_centroid"))
)


In [0]:

# Define a window spec partitioned by profile ID (assuming "id" is the profile ID in df_profiles_emb) and ordered by distance
windowSpec = Window.partitionBy(df_profiles_emb["profile_id"]).orderBy("euclidean_distance")

# Rank centroids by distance for each profile and filter to keep only the closest centroid
df_profiles_w_centroid = (
    df_distances
    .withColumn("rank", row_number().over(windowSpec))
    .filter("rank = 1")
    # .select(df_profiles_emb["profile_id"], "centroid_id", "euclidean_distance")  # Adjust column names as necessary

    .select(
        "profile_id",
        "certifications_titles",
        "country_code", 
        "industry", 
        "current_company:name",
        "education_degree", 
        "education_field", 
        "education_establishment",
        "other_experience",
        "followers",
        "position", # (thats the job title)
        "recommendations_count",
        "volunteer_causes",
        'experience_months',
        'seniority_level',
        'profile_id',
        'centroid_id'
    )
)

df_profiles_w_centroid = df_profiles_w_centroid.cache()

## Statistics Functionality

In [0]:
# consts
TOP_NUM = 5
LIST_FIELDS = [
    'certifications_titles',
    # 'education_degree',
    'education_field',
    'education_establishment',
    'other_experience',
    'volunteer_causes',
]
INT_FIELDS = [
    'followers',
    'recommendations_count',
    # 'experience_months',
]
STR_FIELDS = [
    'profile_id',
    'country_code',
    'industry',
    'current_company:name',
    'position',
    'seniority_level'
]

# List of degree variations and their normalized forms
degree_variations = [
    ("BS", "Bachelor's"),
    ("Bachelor", "Bachelor's"),
    ("M.S", "Master's"),
    ("Master", "Master's"),
    ("Masters", "Master's"),
    ("M.Eng", "Master's"),
    ("MS", "Master's"),
    ("PhD", "PhD"),
    ("Doctorate", "PhD"),
    ("Doctor", "PhD"),
    ("MBA", "MBA")
]
# Patterns for identifying degrees
patterns = [x[0].lower() for x in degree_variations]
regex_pattern = "(" + "|".join(patterns) + ")"

# Define a UDF to map degrees to their normalized forms
@udf("string")
def normalize_degree(degree):
    degree_lower = str(degree).lower()
    for pattern, normalized in degree_variations:
        if pattern.lower() in degree_lower:
            return normalized
    return None

In [0]:
# histogram dict
def get_histogram_dict_of_field(df, field):
    # Collecting all follower counts
    field_counts = df.dropna(subset=[field]).select(field).rdd.flatMap(lambda x: x).collect()

    # Generating the histogram as a dictionary
    hist_dict = {value: field_counts.count(value) for value in set(field_counts)}
    return hist_dict

In [0]:
# Get distinct cluster_ids with minimal data movement
clusters_ids_l = df_profiles_w_centroid.select('centroid_id').distinct().rdd.map(lambda row: row['centroid_id']).collect()
df_profiles.unpersist()


Out[13]: DataFrame[profile_id: string, certifications_titles: array<string>, country_code: string, industry: string, current_company:name: string, education_degree: array<string>, education_field: array<string>, education_establishment: array<string>, other_experience: array<string>, followers: bigint, position: string, recommendations_count: bigint, volunteer_causes: array<string>, experience_months: int, seniority_level: string]

In [0]:
df_profiles_w_centroid = df_profiles_w_centroid.cache()

In [0]:
# #  reset saved stats
# clusters_stats_results = {}
# with open(pkl_file_path, 'wb') as file:
#     # Use pickle.dump to write the serialized data to the file
#     pickle.dump(clusters_stats_results, file)

In [0]:
clusters_stats_results = {}
if os.path.isfile(pkl_file_path):
    with open(pkl_file_path, 'rb') as file:
        clusters_stats_results = pickle.load(file)

for cluster_id in tqdm(clusters_ids_l):
    if cluster_id in clusters_stats_results:
        continue

    df_cluster = df_profiles_w_centroid.filter(df_profiles_w_centroid['centroid_id'] == cluster_id)
    df_cluster.cache()  # Cache the filtered DataFrame

    count_similar = df_cluster.count()
    cluster_dict = {"CountSimilar": count_similar}

    # Use Spark SQL functions more effectively for aggregations
    for list_field in LIST_FIELDS:
        cluster_dict[f"MostCommon_{list_field}"] = (
            df_cluster
            .withColumn(list_field, explode(list_field))
            .dropna(subset=[list_field])
            .groupBy(list_field).count()
            .orderBy(desc("count"))
            .limit(TOP_NUM)
            .withColumn("percentage", (100 * col("count") / count_similar).cast("float"))
            .rdd.map(lambda row: row.asDict()).collect()
        )
    
    # Explode the education_degree array into separate rows and lowercase the degree names
    cluster_dict['MostCommon_degree_type'] = (
        df_cluster
        .dropna(subset=['education_degree'])
        .withColumn("degree_type", explode("education_degree"))
        .withColumn("normalized_degree", normalize_degree(col("degree_type")))
        .filter(col("normalized_degree").isNotNull())
        .groupBy("normalized_degree")
        .count()
        .orderBy(desc("count"))
        .limit(TOP_NUM)
        .withColumn("percentage", (100 * col("count") / count_similar).cast("float"))
        .rdd.map(lambda row: row.asDict()).collect()
    )

    #-----------------------------------------------

    for str_field in STR_FIELDS:
        cluster_dict[f"MostCommon_{str_field}"] = (
            df_cluster
            .dropna(subset=[list_field])
            .groupBy(str_field).count()
            .orderBy(desc("count"))
            .limit(TOP_NUM)
            .withColumn("percentage", (100 * col("count") / count_similar).cast("float"))
            .rdd.map(lambda row: row.asDict()).collect()
        )

    for int_field in INT_FIELDS:
        cluster_dict[f"HistDict_{int_field}"] = get_histogram_dict_of_field(df=df_cluster, field=int_field)

    # experience_years cast to years
    df_cluster = df_cluster.withColumn("experience_years", (col("experience_months") / 12).cast("int"))
    cluster_dict[f"HistDict_experience_years"] = get_histogram_dict_of_field(df=df_cluster, field="experience_years")

    # add cluster dict to all dicts
    clusters_stats_results[cluster_id] = cluster_dict
    df_cluster.unpersist()  # Clean up cache when done with this cluster's DataFrame

    with open(pkl_file_path, 'wb') as file:
        # Use pickle.dump to write the serialized data to the file
        pickle.dump(clusters_stats_results, file)



  0%|          | 0/89 [00:00<?, ?it/s]  1%|          | 1/89 [00:39<58:18, 39.76s/it]  2%|▏         | 2/89 [01:09<49:04, 33.85s/it]  3%|▎         | 3/89 [01:38<45:33, 31.79s/it]  4%|▍         | 4/89 [02:04<41:33, 29.33s/it]  6%|▌         | 5/89 [02:35<41:55, 29.95s/it]  7%|▋         | 6/89 [03:06<41:48, 30.22s/it]  8%|▊         | 7/89 [03:33<40:03, 29.32s/it]  9%|▉         | 8/89 [04:07<41:32, 30.77s/it] 10%|█         | 9/89 [04:49<45:43, 34.30s/it] 11%|█         | 10/89 [05:17<42:43, 32.45s/it] 12%|█▏        | 11/89 [05:50<42:18, 32.54s/it] 13%|█▎        | 12/89 [06:27<43:31, 33.92s/it] 15%|█▍        | 13/89 [06:58<41:40, 32.90s/it] 16%|█▌        | 14/89 [07:26<39:15, 31.41s/it] 17%|█▋        | 15/89 [08:00<39:53, 32.35s/it] 18%|█▊        | 16/89 [08:31<38:45, 31.86s/it] 19%|█▉        | 17/89 [09:01<37:24, 31.17s/it] 20%|██        | 18/89 [09:46<41:56, 35.45s/it] 21%|██▏       | 19/89 [10:20<40:52, 35.04s/it] 22%|██▏       | 20/89 [10:46<37:18, 32.45s/it] 24%|██▎ 

In [0]:
df_profiles_w_centroid.unpersist()  # Clean up the initial cache

Out[17]: DataFrame[profile_id: string, certifications_titles: array<string>, country_code: string, industry: string, current_company:name: string, education_degree: array<string>, education_field: array<string>, education_establishment: array<string>, other_experience: array<string>, followers: bigint, position: string, recommendations_count: bigint, volunteer_causes: array<string>, experience_months: int, seniority_level: string, profile_id: string, centroid_id: string]

## Examine Results

In [0]:
clusters_stats_results = {}
pkl_file_path = "/dbfs/FileStore/clusters_stats_results_v7_all_data_percentage.pkl"
# pkl_file_path = "/dbfs/FileStore/clusters_stats_results_v6_all_data.pkl"
if os.path.isfile(pkl_file_path):
    with open(pkl_file_path, 'rb') as file:
        clusters_stats_results = pickle.load(file)
        

In [0]:
i = 0
gap=3

In [0]:
i = i+gap
[i, i+gap]

Out[124]: [90, 93]

In [0]:
# c_key = '2'
for c_key in list(clusters_stats_results.keys())[i:i+gap]:
# for c_key in list(clusters_stats_results.keys())[87:88]:
    print(c_key + ": ", clusters_stats_results[c_key], ",")