In [0]:
from pyspark.sql.types import *
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
import pandas as pd
from pyspark.ml.feature import CountVectorizer, Tokenizer as Tokenizer_feature, StringIndexer, VectorAssembler, OneHotEncoder, Word2Vec, HashingTF, IndexToString
from pyspark.ml.linalg import SparseVector, Vectors
import numpy as np
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.sql.window import Window
from datetime import datetime
import re
import matplotlib.pyplot as plt
from pyspark.sql.functions import col, concat_ws, udf, lit
from pyspark.sql.types import StringType
from pyspark.sql import functions as F
from pyspark.sql.functions import broadcast
from pyspark.ml.functions import vector_to_array
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number
from huggingface_hub import InferenceClient
from huggingface_hub import login
import time
from pyspark.ml.linalg import VectorUDT, DenseVector
import numpy as np
import sparknlp
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.types import ArrayType, FloatType
from pyspark.sql.functions import udf
from sparknlp.base import DocumentAssembler, Finisher
from sparknlp.annotator import Tokenizer, StopWordsCleaner, WordEmbeddingsModel, SentenceEmbeddings, BertEmbeddings, Word2VecModel
from pyspark.ml.classification import MultilayerPerceptronClassificationModel

# Show every column and allow wide rows when pandas frames are rendered.
pd.set_option('display.max_columns', None)
pd.set_option('display.width', 1000)

# Reuse the active Spark session if one exists; otherwise create one.
spark = SparkSession.builder.getOrCreate()

### Preprocess good profile data

In [0]:
def strip_and_choose_first(str_lst):
    """Return the first element of a stringified list such as ``"[a, b, c]"``.

    Args:
        str_lst: String rendering of a list (elements separated by ``", "``,
            optionally wrapped in square brackets), or ``None``.

    Returns:
        The text before the first ``", "`` after leading/trailing ``[`` and
        ``]`` characters are removed; ``""`` when ``str_lst`` is ``None``.
    """
    # This helper runs inside a Spark UDF, where null column values arrive as
    # None; treat them as "no value" instead of failing on .strip().
    if str_lst is None:
        return ""
    return str_lst.strip("[]").split(", ")[0]


def process_education(degree, field, title):
    """Build a one-line education summary from three stringified lists.

    Each argument is reduced to its first element with
    ``strip_and_choose_first``; the pieces are then combined as
    "<degree> in <field> from <title>".
    """
    first_degree, first_field, first_school = (
        strip_and_choose_first(raw_value) for raw_value in (degree, field, title)
    )
    return f"{first_degree} in {first_field} from {first_school}"


def preprocess_profiles(df):
    """
    Reduce raw profiles to the text columns used for matching.

    df: profiles dataframe, such that name, id, city, experience, position and education
        are in the correct format (experience and education are accessed as arrays of
        structs exposing `title`, and `degree` / `field` / `title`, respectively).
    returns pre processed dataframe with the columns
        id, processed_education, processed_title, name, city.
    """
    process_education_udf = udf(process_education, StringType())

    # Lower-cased title of the first experience entry; fall back to `position`, then ''.
    first_job_title = f.col('experience')[0].getField('title')
    processed_title = (
        f.when(first_job_title.isNotNull(), f.lower(first_job_title))
        .otherwise(f.when(f.col('position').isNotNull(), f.lower(f.col('position'))).otherwise(f.lit('')))
    )
    # Add the column directly instead of self-joining on `id`: no shuffle is needed and
    # rows are not multiplied when an id appears more than once.
    df = df.withColumn('processed_title', processed_title)

    # A profile "has education" when the array is present and non-empty. size() is used
    # instead of comparing against an empty-list literal, which not every PySpark
    # version can build. The isNotNull() guard keeps the predicate non-null for null arrays.
    has_education = f.col('education').isNotNull() & (f.size(f.col('education')) > 0)
    output_columns = ['id', 'processed_education', 'processed_title', 'name', 'city']

    # Stringify the per-entry degree/field/school arrays so the UDF can pick the first entry.
    edu_filtered_df = (
        df.filter(has_education)
        .withColumn('degree', col('education').getField('degree').cast('string'))
        .withColumn('field', col('education').getField('field').cast('string'))
        .withColumn('school', col('education').getField('title').cast('string'))
        .withColumn('processed_education',
                    process_education_udf(col('degree'), col('field'), col('school')))
        .select(output_columns)
    )
    # Profiles without education get an empty description.
    no_edu_df = (
        df.filter(~has_education)
        .withColumn('processed_education', lit(''))
        .select(output_columns)
    )
    return edu_filtered_df.union(no_edu_df)

def generate_small_good_sample(
    scores_path="/Workspace/Users/lihi.kaspi@campus.technion.ac.il/user_profiles_with_scores.parquet",
    output_path="/Workspace/Users/lihi.kaspi@campus.technion.ac.il/sample_good_profile_data.parquet",
    sample_size=10000,
    min_score=15,
):
    """Write a small parquet sample of well-scored profiles for later matching.

    Args:
        scores_path: Parquet dataset of profiles that carries `profile_score`
            and `about` next to the columns `preprocess_profiles` needs.
        output_path: Destination parquet path; overwritten on every call.
        sample_size: Maximum number of rows to keep.
        min_score: Lowest `profile_score` still counted as a good profile.
            The default of 15 corresponds to labels 3 and 4 of the 5-bucket
            scheme (<5, <10, <15, <20, otherwise) used elsewhere in this notebook.

    Returns:
        None. The sample (id, processed_education, processed_title, about) is
        written to `output_path`.
    """
    profiles_with_scores = spark.read.parquet(scores_path)

    df = preprocess_profiles(profiles_with_scores)
    df = df.join(profiles_with_scores, on='id')

    # Filter on the score itself rather than on a when/otherwise label: with the label
    # chain a null score matches no `when` branch and lands in `otherwise(4)`, so
    # unscored profiles were sampled as "good". A null score fails `>=` and is excluded.
    good_profiles_df = (
        df.filter(col('profile_score') >= min_score)
        .select(['id', 'processed_education', 'processed_title', 'about'])
        # No ordering is applied before limit, so which rows are kept is up to Spark.
        .limit(sample_size)
    )
    good_profiles_df.write.mode("overwrite").parquet(output_path)
generate_small_good_sample()

In [0]:
def df_to_vector(df, good_profiles_df):
    """Embed job titles and education text with Word2Vec.

    A tokenizer + Word2Vec pair is built for `processed_title` and for
    `processed_education`. The pipeline is fitted on `df` only and then
    applied to both frames, so both share the same vocabulary.

    Returns:
        (df_with_vectors, good_with_vectors): the two inputs with the added
        `tokened_title`, `vector_title`, `tokened_edu` and `vector_edu` columns.
    """
    # (text column, token column, vector column) for each embedded field.
    embedded_fields = (
        ("processed_title", "tokened_title", "vector_title"),
        ("processed_education", "tokened_edu", "vector_edu"),
    )

    stages = []
    for text_col, token_col, vector_col in embedded_fields:
        stages.append(Tokenizer_feature(inputCol=text_col, outputCol=token_col))
        stages.append(Word2Vec(inputCol=token_col, outputCol=vector_col, vectorSize=200, minCount=1))

    fitted_pipeline = Pipeline(stages=stages).fit(df)

    # NOTE(review): the model never sees good_profiles_df during fit; presumably words
    # that appear only there contribute nothing to its vectors — confirm.
    return fitted_pipeline.transform(df), fitted_pipeline.transform(good_profiles_df)

In [0]:
def cross_dfs(df_with_vectors, good_with_vectors):
    """Pair every profile with every good profile.

    Args:
        df_with_vectors: Profiles with `id`, `vector_title` and `vector_edu`.
        good_with_vectors: Good profiles with the same three columns.

    Returns:
        The Cartesian product with columns profiles_id, pos_embed, edu_embed,
        good_profile_id, pos_embed_good, edu_embed_good. The embeddings are
        converted from ML vectors to arrays.
    """
    profiles = df_with_vectors.select(
        col("id").alias("profiles_id"),
        vector_to_array(col("vector_title")).alias("pos_embed"),
        vector_to_array(col("vector_edu")).alias("edu_embed"),
    )
    good_profiles = good_with_vectors.select(
        col("id").alias("good_profile_id"),
        vector_to_array(col("vector_title")).alias("pos_embed_good"),
        vector_to_array(col("vector_edu")).alias("edu_embed_good"),
    )

    # State the Cartesian product explicitly: a condition-less inner join only works
    # when implicit cross joins are enabled in the Spark configuration. The good-profile
    # side is broadcast so it is shipped to every executor rather than shuffled.
    return profiles.crossJoin(broadcast(good_profiles))

In [0]:

def dot_product(vec1, vec2):
    """Return a Column expression for the dot product of two array columns.

    `vec1` and `vec2` are column names; they are interpolated into a Spark SQL
    expression that multiplies the arrays element by element (indexing `vec2`
    by the position in `vec1`) and sums the products starting from 0D.
    """
    sql_text = f"""
        aggregate(transform({vec1}, (x, i) -> x * {vec2}[i]), 0D, (acc, x) -> acc + x)
    """
    return F.expr(sql_text)

def vector_norm(vec):
    """Return a Column expression for the Euclidean norm of the array column named `vec`."""
    sum_of_squares = F.expr(f"aggregate(transform({vec}, x -> x * x), 0D, (acc, x) -> acc + x)")
    return F.sqrt(sum_of_squares)
def compute_sim(profiles_cross):
    """Add cosine similarities between each profile and its paired good profile.

    Args:
        profiles_cross: Frame with the array columns edu_embed, edu_embed_good,
            pos_embed and pos_embed_good.

    Returns:
        The input frame with three added columns: `edu_sim` and `pos_sim`
        (cosine similarity of the education / position embeddings) and
        `total_sim` (their sum).
    """
    def _safe_cosine(dot, norm_a, norm_b):
        # A zero (or null) norm would make the division fail or yield null, which in
        # turn nulls total_sim and throws away the other, valid similarity. Report 0.0.
        denominator = norm_a * norm_b
        return F.when(denominator > 0, dot / denominator).otherwise(F.lit(0.0))

    edu_sim = _safe_cosine(
        dot_product("edu_embed", "edu_embed_good"),
        vector_norm("edu_embed"),
        vector_norm("edu_embed_good"),
    )
    pos_sim = _safe_cosine(
        dot_product("pos_embed", "pos_embed_good"),
        vector_norm("pos_embed"),
        vector_norm("pos_embed_good"),
    )

    return (
        profiles_cross
        .withColumn("edu_sim", edu_sim)
        .withColumn("pos_sim", pos_sim)
        .withColumn("total_sim", F.col("edu_sim") + F.col("pos_sim"))
    )

In [0]:

def get_best_matches(profiles_cross):
    """Keep, for every profile, the single good profile with the highest `total_sim`.

    Returns:
        A frame with columns profiles_id, matched_good_profile_id, total_sim.
    """
    # Within each profile, number the candidates from most to least similar.
    best_first_per_profile = Window.partitionBy("profiles_id").orderBy(col("total_sim").desc())
    ranked = profiles_cross.withColumn("rank", row_number().over(best_first_per_profile))

    # Rank 1 is the best candidate; row_number() yields exactly one such row per profile.
    top_ranked = ranked.filter(col("rank") == 1)

    return top_ranked.select(
        col("profiles_id"),
        col("good_profile_id").alias("matched_good_profile_id"),
        col("total_sim"),
    )

In [0]:
def get_match_df(best_matches):
    """Attach the matched good profile's `about` text to each best match.

    Good profiles are re-read from the scored-profiles parquet, bucketed into
    labels 0-4 by `profile_score`, and kept when the label is 3 or 4 and both
    `id` and `about` are non-null. The inner join on `matched_good_profile_id`
    drops matches that have no such good profile.
    """
    scored_profiles = spark.read.parquet("/Workspace/Users/lihi.kaspi@campus.technion.ac.il/user_profiles_with_scores.parquet")

    score = f.col('profile_score')
    # NOTE(review): a score that satisfies none of the comparisons falls through to
    # label 4; presumably that includes null scores — confirm that is intended.
    label = (
        f.when(score < 5, 0)
        .when(score < 10, 1)
        .when(score < 15, 2)
        .when(score < 20, 3)
        .otherwise(4)
    )

    good_abouts = (
        scored_profiles
        .withColumn('label', label)
        .filter(col('label').isin([3,4]))
        .select(['id','about'])
        .dropna()
        .withColumnRenamed('id', "matched_good_profile_id")
    )

    return best_matches.join(good_abouts, on="matched_good_profile_id")

In [0]:
def generate_sections(bad_profile_df, match_df, access_token=None, max_profiles=400, delay_seconds=2):
    """Generate a new "about" section for each matched profile with a hosted LLM.

    Args:
        bad_profile_df: Pre-processed profiles with id, name, city,
            processed_education and processed_title.
        match_df: Best matches with profiles_id and the matched good profile's `about`.
        access_token: Hugging Face token. When omitted it is read from the
            HF_TOKEN environment variable; it must never be written into the notebook.
        max_profiles: Upper bound on the number of profiles sent to the model
            (None means no bound).
        delay_seconds: Pause after each request, to stay under the API rate limit.

    Returns:
        Spark DataFrame with columns `id` and `about` (the generated text).

    Raises:
        RuntimeError: if no access token is supplied or configured.
    """
    import os  # local import so this cell does not depend on edits to the import cell

    if access_token is None:
        access_token = os.environ.get("HF_TOKEN")
    if not access_token:
        raise RuntimeError(
            "Hugging Face access token missing: pass access_token or set the HF_TOKEN environment variable"
        )
    login(access_token)
    # One client serves every request; there is no need to rebuild it per row.
    client = InferenceClient(token=access_token)

    df = match_df.withColumnRenamed('profiles_id', 'id').join(bad_profile_df, on="id")
    # Cap the rows before collecting so that only what will be processed reaches the driver.
    if max_profiles is not None:
        df = df.limit(max_profiles)
    pd_df = df.toPandas()

    def create_section(user_data, processed_edu, city, name, processed_title):
        input_prompt = f"This is an about section of a user similar to me:{user_data}. build an about section for me. my name is {name}, I live in {city}. my education details are {processed_edu} and my job title is {processed_title}.  Do not use things like [Assuming a similar role as Fleet Account Manager based on Josh's profession] Business Development Specialist at [Assuming a company similar to Knapheide Manufacturing], it should look like a real about section"
        return client.text_generation(
            model="mistralai/Mistral-7B-Instruct-v0.3",
            prompt=input_prompt,
            max_new_tokens=500
        )

    abouts = []
    for i, (_, row) in enumerate(pd_df.iterrows()):
        print(i)  # progress counter
        completion = create_section(
            row['about'],
            row['processed_education'],
            row['city'],
            row['name'],
            row['processed_title'],
        )
        time.sleep(delay_seconds)
        abouts.append((row["id"], completion))

    if not abouts:
        # createDataFrame cannot infer a schema from an empty list; return an empty
        # frame that keeps the column types of the input instead.
        return match_df.select(col('profiles_id').alias('id'), col('about')).limit(0)

    generated_abouts_df = spark.createDataFrame(abouts, ["id", "about"])

    return generated_abouts_df

In [0]:
def optimize_profiles(df,good_profiles=None):
    """
    Optimize the profiles based on the good profiles provided.
    If no good_profiles are provided, default to using the sample good profiles we defined.

    df: raw profiles dataframe in the format expected by preprocess_profiles.
    good_profiles: dataframe with id, processed_title and processed_education, and
        ideally `about` (the text shown to the language model as an example).
    returns dataframe with columns id and about (the generated about section).
    """
    if good_profiles is None:
        good_profiles = spark.read.parquet("/Workspace/Users/lihi.kaspi@campus.technion.ac.il/sample_good_profile_data.parquet")

    processed_df = preprocess_profiles(df)
    df_vector, good_profiles_vector = df_to_vector(processed_df, good_profiles)
    profiles_cross = cross_dfs(df_vector, good_profiles_vector)
    sim_cross = compute_sim(profiles_cross)
    best_matches = get_best_matches(sim_cross)

    if 'about' in good_profiles.columns:
        # Take the example text from the same good profiles that were matched against,
        # so a caller-supplied good_profiles frame is honoured instead of being replaced
        # by a lookup in the scored-profiles file.
        good_abouts = good_profiles.select(
            col('id').alias('matched_good_profile_id'), col('about')
        ).dropna()
        match_df = best_matches.join(good_abouts, on='matched_good_profile_id')
    else:
        # No `about` column supplied: fall back to the scored-profiles lookup.
        match_df = get_match_df(best_matches)

    gen_df = generate_sections(processed_df, match_df)
    return gen_df
# Smoke-test the optimisation flow on 10 training profiles.
# limit() is applied without an ordering, so the 10 rows are whichever Spark returns first.
profiles = spark.read.parquet('/dbfs/linkedin_people_train_data')
test_df = profiles.limit(10)
test_df.display()   
# gen_about_df holds the generated text in `about`, keyed by `id`; later cells rely on it.
gen_about_df = optimize_profiles(test_df)
gen_about_df.display()

In [0]:
# Rename the generated text to `about_after` so that it does not collide with the
# original `about` column when joined back to the raw profiles.
gen_about_df = gen_about_df.withColumnRenamed('about', 'about_after')

In [0]:
# Re-read the raw profiles and keep only those that received a generated section
# (join on `id` with the default join type).
profiles = spark.read.parquet('/dbfs/linkedin_people_train_data')
profiles = gen_about_df.join(profiles, on="id")
# Text inputs: "<about> <position>" built once from the generated text (after) and
# once from the original text (before).
profiles = profiles.withColumn("about_position_after", f.concat_ws(" ", f.col("about_after"), f.col("position")))
profiles = profiles.withColumn("about_position_before", f.concat_ws(" ", f.col("about"), f.col("position")))


In [0]:
# Embed the generated "about + position" text with Spark NLP.
# Only `about_position_after` is fed to this pipeline; in the visible notebook
# `about_position_before` is never embedded.
document_assembler = DocumentAssembler() \
    .setInputCol("about_position_after") \
    .setOutputCol("ap_document")

# Split each document into tokens.
tokenizer = Tokenizer() \
    .setInputCols(["ap_document"]) \
    .setOutputCol("ap_token")

# Drop stop words from the token stream.
stopwords_cleaner = StopWordsCleaner() \
    .setInputCols(["ap_token"]) \
    .setOutputCol("ap_clean_tokens")

# Token-level embeddings from the pretrained "small_bert_L2_128" model
# (fetched by name through `pretrained`).
embeddings = BertEmbeddings.pretrained("small_bert_L2_128") \
    .setInputCols(["ap_document", "ap_clean_tokens"]) \
    .setOutputCol("ap_embeddings_bert")

# Combine the token embeddings into document-level embeddings.
# No pooling strategy is set here, so the annotator's default applies.
sentence_embeddings = SentenceEmbeddings() \
    .setInputCols(["ap_document", "ap_embeddings_bert"]) \
    .setOutputCol("about_position_embeddings")

nlp_pipeline_about = Pipeline(stages=[document_assembler, tokenizer, stopwords_cleaner, embeddings, sentence_embeddings])

# Apply NLP Pipeline
nlp_model_about = nlp_pipeline_about.fit(profiles)
processed_data = nlp_model_about.transform(profiles)
display(processed_data.limit(100))

In [0]:
# Load the previously saved multilayer-perceptron classifier used to score profiles.
# NOTE(review): the model is not trained in this notebook; its expected feature layout
# must match the `features` vector assembled further down — confirm against the training code.
model_path = 'dbfs:/Workspace/Users/lihi.kaspi@campus.technion.ac.il/mlp_model'
model = MultilayerPerceptronClassificationModel.load(model_path)

In [0]:
# Numerical Features
# Sizes of the education / experience / languages arrays plus follower and
# recommendation counts, each with a fallback of 0.
# NOTE(review): for the three array columns the null test is applied to f.size(...)
# rather than to the column itself. Per the Spark documentation, size() of a null array
# returns -1 unless spark.sql.legacy.sizeOfNull is false or ANSI mode is enabled, so null
# arrays may produce -1 here instead of 0 — verify, and keep whatever convention the
# loaded classifier was trained with.
processed_data = processed_data \
    .withColumn("num_education", f.when(f.size(f.col('education')).isNull(), 0).otherwise(f.size(f.col('education')))) \
    .withColumn("num_experience", f.when(f.size(f.col('experience')).isNull(), 0).otherwise(f.size(f.col('experience')))) \
    .withColumn("num_languages", f.when(f.size(f.col('languages')).isNull(), 0).otherwise(f.size(f.col('languages')))) \
    .withColumn("total_followers", f.when(f.col("followers").isNull(), 0).otherwise(f.col("followers"))) \
    .withColumn("num_recommendations", f.when(f.col("recommendations_count").isNull(), 0).otherwise(f.col("recommendations_count")))

In [0]:
def to_dense_vector(embeddings_array):
    """Wrap `embeddings_array` in a dense Spark ML vector via `Vectors.dense`."""
    dense_vector = Vectors.dense(embeddings_array)
    return dense_vector

# UDF wrapper so the conversion can run on a column; it returns a Spark ML vector (VectorUDT).
to_dense_udf = udf(lambda x: to_dense_vector(x), VectorUDT())

# Take the embedding of the first annotation in `about_position_embeddings` and
# convert it to a dense vector.
# NOTE(review): assumes every row has at least one annotation; a row without one would
# presumably pass None to the UDF — confirm for empty input text.
processed_data = processed_data.withColumn(
    "about_position_embeddings_dense", 
    to_dense_udf(f.expr("about_position_embeddings.embeddings[0]"))
)

# Assemble features
# The text embedding and the five numeric features are concatenated, in this order,
# into a single `features` vector.
assembler = VectorAssembler(inputCols=[
    "about_position_embeddings_dense", "num_education", "num_experience", "num_languages",
    "total_followers", "num_recommendations",
], outputCol="features")

final_data = assembler.transform(processed_data)

# Keep only the key and the model input.
final_data = final_data.select('id', "features")

display(final_data)

In [0]:
# Score the assembled feature vectors with the loaded classifier.
mlp_predictions = model.transform(final_data)
# Raise every prediction below 2 to 2, and record a constant 2 as the "initial" prediction.
# NOTE(review): both the floor and the constant baseline are hard-coded here rather than
# computed from the original text; presumably the input profiles are assumed to start at
# class 2 — confirm, since these two columns drive the before/after comparison.
mlp_predictions = mlp_predictions.withColumn('prediction', f.when(f.col("prediction") < 2, 2).otherwise(f.col("prediction")))\
    .withColumn('initial_prediction', f.lit(2))

In [0]:
# Collect the two prediction columns to the driver for plotting.
sample = mlp_predictions.select('prediction', 'initial_prediction').toPandas()

# Human-readable name for each of the five prediction classes.
category_mapping = {0: 'bad', 1: 'below average', 2: 'average', 3: 'above average', 4: 'good'}

sample['prediction_category'] = sample['prediction'].map(category_mapping)
sample['initial_prediction_category'] = sample['initial_prediction'].map(category_mapping)

# Count profiles per category; reindex so that all five categories appear, in mapping
# order, with 0 for categories that have no rows.
initial_category_counts = sample['initial_prediction_category'].value_counts().reindex(category_mapping.values(), fill_value=0)
pred_category_counts = sample['prediction_category'].value_counts().reindex(category_mapping.values(), fill_value=0)
# One x position per category; the two bar series are offset by half a bar width each.
x = np.arange(len(category_mapping))
bar_width = 0.4

# Grouped bar chart: "Before" (initial) counts next to "After" (predicted) counts.
plt.figure(figsize=(8, 6))
plt.bar(x - bar_width/2, initial_category_counts.values, color='#a2d5f2', width=bar_width, label='Before', edgecolor='black')
plt.bar(x + bar_width/2, pred_category_counts.values, color='#f2aac7', width=bar_width, label='After', edgecolor='black')
plt.title('Histogram of Profile Scores Before and After Optimization')
plt.xlabel('Profile Score')
plt.ylabel('Frequency')
plt.xticks(x, category_mapping.values())
plt.legend()
plt.grid(True)
plt.show()

In [0]:
# Attach the predictions to the processed profiles by `id` so that text and score
# can be inspected side by side.
data_with_pred = processed_data.join(mlp_predictions, on='id')

In [0]:
# Show the original text, the generated text and the predicted class for each profile.
display(data_with_pred.select("id", 'about', 'about_after', 'prediction'))