In [0]:
## initialize the PySpark session

# import relevant libraries and column functions
from pyspark.sql.types import *
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import pandas as pd
from pyspark.sql.functions import array, size, udf,col, explode, countDistinct, col, size, avg, col, log, lit, coalesce, expr, array, col, when
from pyspark.sql.types import IntegerType

# Widen pandas' text rendering: show every column and allow long lines.
for option_name, option_value in (('display.max_columns', None), ('display.width', 1000)):
    pd.set_option(option_name, option_value)

# Reuse the notebook's active Spark session if one exists, otherwise start one.
spark = SparkSession.builder.getOrCreate()

In [0]:
# Load the static LinkedIn people and companies tables from parquet.
profiles, companies = (spark.read.parquet(path) for path in ('/linkedin/people', '/linkedin/companies'))

In [0]:
## keep only profiles that have a current position, past positions and at least one education entry


def _array_length(column_name):
    """Length of the array column ``column_name``, with a null array counted as 0.

    Native replacement for the former Python UDF: the same values, computed
    without shipping every row to a Python worker.
    """
    return when(col(column_name).isNull(), 0).otherwise(size(col(column_name)))


# a current position must be present and must not be the literal '--'
profiles = profiles.filter(col('position').isNotNull() & (col('position') != '--'))

# require more than one experience entry
# NOTE(review): presumably the current job is one of the entries, so > 1 means
# at least one past position — confirm against the data.
profiles = profiles.withColumn('len', _array_length('experience')).filter(col('len') > 1)

# require at least one education entry
profiles = profiles.withColumn('len2', _array_length('education')).filter(col('len2') > 0)

print(profiles.count())
profiles.display(10)

In [0]:
## pre processing for experience

def experience_preprocessing(profiles_df):
    jobs_table = profiles_df.select(['id', 'experience', 'current_company:name', 'position'])
    #jobs_table.cache()
    # jobs_table.display(10)


    ## companies' names
    def extract_companies(experience, current_company):
        companies = [current_company.lower()] if current_company else []
        for company_dict in experience:
            company = company_dict['company']
            if company:
                company_lower = company.lower()
                # if company_lower not in companies:
                companies.append(company_lower)
        return companies

    # turn the data about companies to document for bm25
    companiesUdf = udf(extract_companies, ArrayType(StringType()))
    jobs_table = jobs_table.\
            withColumn('companies', companiesUdf(jobs_table['experience'], jobs_table['current_company:name']))

    ## positions
    def extract_positions(experience, current_position):
        positions = []
        if current_position:
            current_position = current_position.lower()
            if ' at ' in current_position:
                current_position = current_position[0:current_position.index(' at ')]
            if ' . ' in current_position:
                current_position = current_position[0:current_position.index(' . ')]
            positions.append(current_position)
            
        if experience:
            for company_dict in experience:
                if 'positions' in company_dict and company_dict['positions']:
                    for position_dict in company_dict['positions']:
                        position_title = position_dict['title']
                        if position_title:
                            position = position_title.lower()
                            # if position not in positions:
                            positions.append(position)
        return positions

    # turn the data about positions to document for bm25
    positionsUdf = udf(extract_positions, ArrayType(StringType()))
    jobs_table = jobs_table.\
            withColumn('positions', positionsUdf(jobs_table['experience'], jobs_table['position']))

    # combine them
    mergeCols = udf(lambda companies, positions: companies + positions, ArrayType(StringType()))
    jobs_table = jobs_table.withColumn("full_experience", mergeCols(col("companies"), col("positions")))

    # jobs_table.display(10)
    return jobs_table

In [0]:
## pre processing for education

def education_preprocessing(profiles_df):
    study_table = profiles_df.select(['id', 'education', 'educations_details', 'сourses'])
    # study_table.display(10)

    ## education column
    def extract_degrees(educations):
        degrees = []
        for education_dict in educations:
            degree = education_dict['degree']
            if degree:
                degree_lower = degree.lower()
                # if degree_lower not in degrees:
                degrees.append(degree_lower)
        return degrees

    # turn the data about degrees to document for bm25
    degreesUdf = udf(extract_degrees, ArrayType(StringType()))
    study_table = study_table.\
            withColumn('degrees', degreesUdf(study_table['education']))

    def extract_fields(educations):
        fields = []
        for education_dict in educations:
            field = education_dict['field']
            if field:
                field_lower = field.lower()
                # if degree_lower not in degrees:
                fields.append(field_lower)
        return fields

    # turn the data about fields to document for bm25
    fieldsUdf = udf(extract_fields, ArrayType(StringType()))
    study_table = study_table.\
            withColumn('fields', fieldsUdf(study_table['education']))

    def extract_titles(educations):
        titles = []
        for education_dict in educations:
            title = education_dict['title']
            if title:
                title_lower = title.lower()
                # if degree_lower not in degrees:
                titles.append(title_lower)
        return titles

    # turn the data about titles to document for bm25
    titlesUdf = udf(extract_titles, ArrayType(StringType()))
    study_table = study_table.\
            withColumn('titles', titlesUdf(study_table['education']))

    ## details columns
    titlesUdf = udf(lambda x: [x.lower()] if x else [], ArrayType(StringType()))
    study_table = study_table.\
            withColumn('details', titlesUdf(study_table['educations_details']))

    ## courses column
    def extract_titles(courses):
        titles = []
        for course_dict in courses:
            title = course_dict['title']
            if title:
                title_lower = title.lower()
                # if degree_lower not in degrees:
                titles.append(title_lower)
        return titles

    # turn the data about courses to document for bm25
    titlesUdf = udf(extract_titles, ArrayType(StringType()))
    study_table = study_table.\
            withColumn('courses_titles', titlesUdf(study_table['сourses']))

    def extract_subtitles(courses):
        subtitles = []
        for course_dict in courses:
            subtitle = course_dict['subtitle'].replace('-','')
            if subtitle:
                subtitle_lower = subtitle.lower()
                # if degree_lower not in degrees:
                subtitles.append(subtitle_lower)
        return subtitles

    # turn the data about subtitles to document for bm25
    subtitlesUdf = udf(extract_subtitles, ArrayType(StringType()))
    study_table = study_table.\
            withColumn('courses_subtitles', titlesUdf(study_table['сourses']))

    # combine them
    mergeCols = udf(lambda x: [item for sublist in x for item in sublist], ArrayType(StringType()))
    study_table = study_table.withColumn("full_education", mergeCols(array(col("degrees"), col("fields"), col("titles"), col("details"), col("courses_titles"), col("courses_subtitles"))))

    #study_table.display(10)
    return study_table


In [0]:
## pre processing for general information

def information_preprocessing(profiles_df):
    info_table = profiles_df.select(['id', 'about', 'certifications'])
    info_table.display(10)

    ## about column
    infoUdf = udf(lambda x: x.split(' ') if x else '', ArrayType(StringType()))
    info_table = info_table.\
            withColumn('abouts', infoUdf(info_table['about']))

    ## certifications column
    def extract_metas(certifications):
        metas = []
        for certification_dict in certifications:
            meta = certification_dict['meta']
            if meta:
                meta_lower = meta.lower()
                # if degree_lower not in degrees:
                metas.append(meta_lower)
        return metas

    # turn the data about metas to document for bm25
    metasUdf = udf(extract_metas, ArrayType(StringType()))
    info_table = info_table.\
            withColumn('metas', metasUdf(info_table['certifications']))

    def extract_titles(certifications):
        titles = []
        for certification_dict in certifications:
            title = certification_dict['title']
            if title:
                title_lower = title.lower()
                # if degree_lower not in degrees:
                titles.append(title_lower)
        return titles

    # turn the data about certifications titles to document for bm25
    titlesUdf = udf(extract_titles, ArrayType(StringType()))
    info_table = info_table.\
            withColumn('certifications_titles', titlesUdf(info_table['certifications']))

    def extract_subtitles(certifications):
        subtitles = []
        for certification_dict in certifications:
            subtitle = certification_dict['subtitle'].replace('-','')
            if subtitle:
                subtitle_lower = subtitle.lower()
                # if degree_lower not in degrees:
                subtitles.append(subtitle_lower)
        return subtitles

    # turn the data about certifications subtitles to document for bm25
    subtitlesUdf = udf(extract_subtitles, ArrayType(StringType()))
    info_table = info_table.\
            withColumn('certifications_subtitles', titlesUdf(info_table['certifications']))

    # combine them
    mergeCols = udf(lambda x: [item for sublist in x if sublist and all(sublist) for item in sublist], ArrayType(StringType()))
    info_table = info_table.withColumn("information", mergeCols(array(col("abouts"), col("metas"), col("certifications_titles"), col("certifications_subtitles"))))

    #info_table.display(10)
    return info_table

In [0]:
# Build the per-profile documents for each segment scored below:
# experience, education and general information.
jobs_table = experience_preprocessing(profiles)
study_table = education_preprocessing(profiles)
info_table = information_preprocessing(profiles)

In [0]:
## prepare a table for BM25 on one of its array columns (term/document frequencies and lengths)

def preper_bm25(jobs_table, column):
    """Compute the per-token statistics BM25 needs for the array column ``column``.

    Args:
        jobs_table: DataFrame with an ``id`` column and the array column ``column``.
        column: name of the array column holding each document's tokens.

    Returns:
        (joined, N, avg_doc_len):
          joined      -- one row per (id, token) with ``term_freq`` (occurrences of
                         the token in that document), ``doc_freq`` (distinct ids
                         containing the token) and ``doc_length`` (array size);
          N           -- number of distinct ids in ``jobs_table``;
          avg_doc_len -- mean of ``size(column)`` over all rows of ``jobs_table``.
    """
    tokens = jobs_table.withColumn("token", explode(column))

    doc_freq = tokens.groupBy("token").agg(countDistinct("id").alias("doc_freq"))
    term_freq = tokens.groupBy("id", "token").count().withColumnRenamed("count", "term_freq")
    doc_length = jobs_table.select("id", size(column).alias("doc_length"))

    n_docs = jobs_table.select(countDistinct("id")).first()[0]
    mean_length = doc_length.agg(avg("doc_length")).first()[0]

    joined = term_freq.join(doc_freq, "token").join(doc_length, "id")
    return joined, n_docs, mean_length

In [0]:
# Prepare every segment for BM25 (term-frequency, document-frequency and length tables).
# NOTE: each call rebinds the shared `N` and `avg_doc_len`, so once this cell has run
# they hold the statistics of the information segment, the last one prepared.
jobs_table_joined, N, avg_doc_len = preper_bm25(jobs_table, column='full_experience')   # experience
study_table_joined, N, avg_doc_len = preper_bm25(study_table, column='full_education')  # education
info_table_joined, N, avg_doc_len = preper_bm25(info_table, column='information')       # general information

In [0]:
# ## interact with new profile not from profiles table

# profile_table


# # pre processing for education 
# new_study_table = education_preprocessing(profile_table)

# # pre processing
# new_jobs_table = experience_preprocessing(profile_table)

# #pre processing for imformation
# new_info_table = information_preprocessing(profile_table)


# # get the experience query for the new profile
# specific_profile_experience = new_jobs_table.first()['full_experience']

# # get the education query for the new profile
# specific_profile_education = new_study_table.first()['full_education']

# # get the query for the new profile
# specific_profile_info = new_info_table.first()['information']


In [0]:
## use an existing profile from the profiles table as the query

QUERY_PROFILE_ID = 'denise-rathburn-9138a961'

# `first()` returns None when no row matches (e.g. the profile was removed by the
# filtering cell), which would otherwise surface as an opaque TypeError
query_row = profiles.filter(col('id') == QUERY_PROFILE_ID).select('id').first()
if query_row is None:
    raise ValueError(f"profile {QUERY_PROFILE_ID!r} not found in the filtered profiles table")
specific_profile_id = query_row['id']
print(specific_profile_id)

# query document for the experience segment
specific_profile_experience = jobs_table.filter(col('id') == specific_profile_id).first()['full_experience']

# query document for the education segment
specific_profile_education = study_table.filter(col('id') == specific_profile_id).first()['full_education']

# query document for the general-information segment
specific_profile_info = info_table.filter(col('id') == specific_profile_id).first()['information']

In [0]:
## given a table of BM25 statistics, score every profile against a query profile

def get_bm25(joined, N, avg_doc_len, specific_profile_values, k1=2.0, b=0.75):
    """Score every document in ``joined`` against a query using Okapi BM25.

    Args:
        joined: DataFrame with one row per (id, token) and columns ``term_freq``,
            ``doc_freq`` and ``doc_length`` (as built by ``preper_bm25``).
        N: number of documents in the segment.
        avg_doc_len: average document length of the segment.
        specific_profile_values: list of query tokens, or None. A token repeated
            in the query contributes once per occurrence, as in the previous
            per-term implementation.
        k1: term-frequency saturation parameter (default 2.0).
        b: document-length normalisation parameter (default 0.75).

    Returns:
        DataFrame with columns ``id`` and ``bm25_score``, one row per id present in
        ``joined``, sorted by descending score. Ids matching no query token, and
        every id when the query is empty, score 0.

    Note:
        IDF is log((N - df + 0.5) / (df + 0.5)), which is negative for tokens that
        occur in more than half of the documents; matching such a token lowers
        the score.
    """
    from collections import Counter

    # multiplicity of each query token; None tokens can never equal a token and are dropped
    query_counts = Counter(t for t in (specific_profile_values or []) if t is not None)
    all_ids = joined.select('id').distinct()

    if not query_counts:
        # previously this crashed building expr('') from an empty list of score columns
        no_scores = all_ids.withColumn('bm25_score', lit(0.0))
        return no_scores.orderBy(no_scores['bm25_score'].desc())

    # token -> number of times it occurs in the query
    query_weight = F.create_map(*[lit(v) for pair in query_counts.items() for v in pair])

    idf = log((lit(N) - col('doc_freq') + 0.5) / (col('doc_freq') + 0.5))
    length_norm = k1 * (1 - b + b * col('doc_length') / avg_doc_len)
    tf_part = col('term_freq') * (k1 + 1) / (col('term_freq') + length_norm)

    # one filter + aggregation instead of one join per query term
    term_scores = (
        joined
        .filter(col('token').isin(list(query_counts)))
        .withColumn('term_score', idf * tf_part * F.element_at(query_weight, col('token')))
        .groupBy('id')
        .agg(F.sum('term_score').alias('bm25_score'))
    )

    ranked_profiles = all_ids.join(term_scores, 'id', 'left_outer').fillna(0, subset=['bm25_score'])
    return ranked_profiles.orderBy(ranked_profiles['bm25_score'].desc())

In [0]:
## rank the profiles based on experience

# Experience-specific BM25 statistics. The shared `N` / `avg_doc_len` names are
# rebound by every `preper_bm25` call, so they describe whichever segment was
# prepared last; recompute them here the same way `preper_bm25` does.
experience_N, experience_avg_doc_len = jobs_table.agg(
    countDistinct('id'), avg(size('full_experience'))
).first()

# calculate bm25 scores
ranked_profiles_experience = get_bm25(
    jobs_table_joined, experience_N, experience_avg_doc_len, specific_profile_experience)

# normalize the scores by the best one; a missing, zero or negative maximum would
# produce nulls or invert the ranking, so leave the scores unscaled in that case
max_value = ranked_profiles_experience.selectExpr("max(bm25_score)").collect()[0][0]
if max_value is not None and max_value > 0:
    ranked_profiles_experience = ranked_profiles_experience.withColumn("bm25_score", col('bm25_score') / max_value)

# display and save
ranked_profiles_experience.display(10)
ranked_profiles_experience.write.csv("/mnt/lab94290/results/experience_table.csv", header=True, mode='overwrite')

In [0]:
## rank the profiles based on education

# Education-specific BM25 statistics. The shared `N` / `avg_doc_len` names are
# rebound by every `preper_bm25` call, so they describe whichever segment was
# prepared last; recompute them here the same way `preper_bm25` does.
education_N, education_avg_doc_len = study_table.agg(
    countDistinct('id'), avg(size('full_education'))
).first()

# calculate bm25 scores
ranked_profiles_education = get_bm25(
    study_table_joined, education_N, education_avg_doc_len, specific_profile_education)

# normalize the scores by the best one; a missing, zero or negative maximum would
# produce nulls or invert the ranking, so leave the scores unscaled in that case
max_value = ranked_profiles_education.selectExpr("max(bm25_score)").collect()[0][0]
if max_value is not None and max_value > 0:
    ranked_profiles_education = ranked_profiles_education.withColumn("bm25_score", col('bm25_score') / max_value)

# display and save
ranked_profiles_education.display(10)
ranked_profiles_education.write.csv("/mnt/lab94290/results/education_table.csv", header=True, mode='overwrite')

In [0]:
## rank the profiles based on general information

# Information-specific BM25 statistics, computed here the same way `preper_bm25`
# does so this cell does not depend on which segment last rebound the shared
# `N` / `avg_doc_len` names.
information_N, information_avg_doc_len = info_table.agg(
    countDistinct('id'), avg(size('information'))
).first()

# calculate bm25 scores
ranked_profiles_info = get_bm25(
    info_table_joined, information_N, information_avg_doc_len, specific_profile_info)

# normalize the scores by the best one; a missing, zero or negative maximum would
# produce nulls or invert the ranking, so leave the scores unscaled in that case
max_value = ranked_profiles_info.selectExpr("max(bm25_score)").collect()[0][0]
if max_value is not None and max_value > 0:
    ranked_profiles_info = ranked_profiles_info.withColumn("bm25_score", col('bm25_score') / max_value)

# display and save
ranked_profiles_info.display(10)
ranked_profiles_info.write.csv("/mnt/lab94290/results/information_table.csv", header=True, mode='overwrite')

In [0]:
## combine the per-segment rankings into one weighted score

top_k = 200                # only each segment's best-scoring profiles contribute
wanted_weights = [3, 1, 1]  # experience, education, information

# Read each saved ranking back and keep its top_k rows. Reading CSV part files
# gives no row-order guarantee, so sort explicitly before `limit` instead of
# relying on the order the files were written in.
experience = (spark.read.csv("/mnt/lab94290/results/experience_table.csv", header=True, inferSchema=True)
              .orderBy(col('bm25_score').desc()).limit(top_k))
education = (spark.read.csv("/mnt/lab94290/results/education_table.csv", header=True, inferSchema=True)
             .orderBy(col('bm25_score').desc()).limit(top_k))
information = (spark.read.csv("/mnt/lab94290/results/information_table.csv", header=True, inferSchema=True)
               .orderBy(col('bm25_score').desc()).limit(top_k))

all_score = (
    profiles
    .join(experience, "id", "left_outer").withColumnRenamed('bm25_score', 'experience_score')
    .join(education, "id", "left_outer").withColumnRenamed('bm25_score', 'education_score')
    .join(information, "id", "left_outer").withColumnRenamed('bm25_score', 'information_score')
)

score_columns = ['experience_score', 'education_score', 'information_score']
for score_column, weight in zip(score_columns, wanted_weights):
    # profiles outside a segment's top_k have a null score there; count it as 0
    all_score = all_score.withColumn(score_column, coalesce(col(score_column) * weight, lit(0)))

total_score = all_score.withColumn('total_score', expr(' + '.join(score_columns)))

ranked_profiles = total_score.orderBy(total_score['total_score'].desc())
ranked_profiles.display(10)

In [0]:
## used for visualization - not runnable as-is (`text_list` is never defined; the list built above is `word_list`)

# top20 = ranked_profiles.limit(20).select('id')
# top20 = top20.join(jobs_table.select('id','full_experience'), "id", "left_outer")
# top20 = top20.join(study_table.select('id','full_education'), "id", "left_outer")
# top20 = top20.join(info_table.select('id','information'), "id", "left_outer")
# mergeCols = udf(lambda x: [item for sublist in x for item in sublist], ArrayType(StringType()))
# top20 = top20.withColumn('full_data', mergeCols(array(col('full_experience'),col('full_education'),col('information'))))
# top20.display()

# top20_exploded = top20.withColumn("word", explode('full_data'))
# top20_exploded.display()

# from pyspark.sql.functions import collect_list
# word_list = top20_exploded.select(collect_list("word")).collect()[0][0]
# word_string = ", ".join(word_list)

# def removing(list, word):
#     while word in list:
#         list.remove(word)
#     return list

# text_list = removing(text_list, 'and')
# text_list = removing(text_list, 'to')
# text_list = removing(text_list, 'I')
# text_list = removing(text_list, 'a')
# text_list = removing(text_list, 'on')
# text_list = removing(text_list, 'with')
# text_list = removing(text_list, 'of')
# text_list = removing(text_list, 'that')
# text_list = removing(text_list, 'in')
# text_list = removing(text_list, 'of')
# text_list = removing(text_list, 'any')
# text_list = removing(text_list, 'the')
# text_list = removing(text_list, 'am')
# text_list = removing(text_list, 'his')
# text_list = removing(text_list, 'for')

# text_list