In [3]:
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml.linalg import Vectors
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, desc, lit, max as max_fn, min as min_fn
from pyspark.sql.functions import udf
from pyspark.sql.types import DoubleType

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [4]:
# Obtain the SparkSession for this notebook, registered under the app name "recsys".
spark = (
    SparkSession.builder
    .appName("recsys")
    .getOrCreate()
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [5]:
# S3 bucket and key prefixes of the two CSV inputs.
bucket = "recsys-aws"
key_ads_content_prefix = "silver_data/ads_content_transformed/"
key_user_ad_matrix_prefix = "silver_data/user_ad_matrix/"

# Ad content: read with multiline parsing enabled and '"' used as both the quote
# and the escape character; header row and schema inference are switched on.
ads_content_df = (
    spark.read
    .options(multiline="true", header=True, inferSchema=True, quote='"', escape='"')
    .csv(f"s3://{bucket}/{key_ads_content_prefix}")
)

# User/ad interaction matrix: plain CSV with a header row and inferred column types.
user_ad_interactions_df = (
    spark.read
    .options(header=True, inferSchema=True)
    .csv(f"s3://{bucket}/{key_user_ad_matrix_prefix}")
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [6]:
# Preview the first five rows of the ad content frame.
ads_content_df.show(n=5)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------------+--------------------+--------+--------------------+--------------------+
|                adid|               title|         description|category|                tags|    content_combined|
+--------------------+--------------------+--------------------+--------+--------------------+--------------------+
|6fd47814-93f5-47f...|Stand-alone solut...|Specific argue yo...|    Tech|     plan,same,start|Stand-alone solut...|
|acceb9fa-4711-4ef...|Managed secondary...|State they least ...| Fashion|audience,single,p...|Managed secondary...|
|4f2a159b-606f-4e2...|Extended high-lev...|Important goal te...| Fashion|identify,relation...|Extended high-lev...|
|1601cd49-e527-4df...|Public-key non-vo...|Carry together wh...|  Travel| campaign,stuff,over|Public-key non-vo...|
|446bccdd-5984-4da...|Phased non-volati...|Pay other door al...|    Tech|  dog,thousand,seven|Phased non-volati...|
+--------------------+--------------------+--------------------+--------

In [7]:
# Tokenize ``content_combined`` into a new ``words`` array column.
tokenizer = Tokenizer(inputCol="content_combined", outputCol="words")
# The result is assigned back to the same name, so a second run of this cell would
# feed the already-tokenized frame to the tokenizer and fail with
# "Output column words already exists". Dropping "words" first is a no-op on the
# first run and makes the cell safe to re-run.
ads_content_df = tokenizer.transform(ads_content_df.drop("words"))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
# Map each ``words`` array to a hashed term-frequency vector in ``rawFeatures``.
hashingTF = HashingTF().setInputCol("words").setOutputCol("rawFeatures")
tf = hashingTF.transform(ads_content_df)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [10]:
# Fit IDF weights on the term-frequency vectors and apply them, producing the
# ``features`` column used for similarity scoring further down.
idf = IDF().setInputCol("rawFeatures").setOutputCol("features")
idfModel = idf.fit(tf)
tfidf_matrix = idfModel.transform(tf)

# The frame is reused by every recommendation call, so mark it for caching.
# Left as the cell's last expression so the DataFrame repr is still displayed.
tfidf_matrix.cache()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

DataFrame[adid: string, title: string, description: string, category: string, tags: string, content_combined: string, words: array<string>, rawFeatures: vector, features: vector]

In [11]:
def get_user_profile(user_id, interactions_df, tfidf_matrix):
    """Build a user's content profile as the mean TF-IDF vector of their interacted ads.

    Args:
        user_id: Value matched against the ``userId`` column of ``interactions_df``.
        interactions_df: Wide DataFrame with a ``userId`` column; every other column
            name is treated as an ad id and its value as that user's interaction score.
        tfidf_matrix: DataFrame with an ``adId`` column and a ``features`` vector column.

    Returns:
        A ``DenseVector`` holding the element-wise mean of the ``features`` vectors of
        all ads with a positive score that are present in ``tfidf_matrix``, or ``None``
        when the user is not in ``interactions_df``, has no positive-score ads, or none
        of those ads appear in ``tfidf_matrix``.
    """
    # first() yields None for an unknown user instead of raising IndexError.
    user_row = interactions_df.filter(col("userId") == user_id).first()
    if user_row is None:
        return None

    user_interactions = user_row.asDict()
    user_interactions.pop("userId", None)

    # Null scores (missing cells in the matrix) count as "no interaction".
    interacted_ads = [
        ad_id
        for ad_id, score in user_interactions.items()
        if score is not None and float(score) > 0
    ]
    if not interacted_ads:
        return None

    # Only the feature vectors are needed on the driver, so project before collecting.
    ad_vectors = [
        row.features
        for row in tfidf_matrix.filter(col("adId").isin(interacted_ads))
        .select("features")
        .collect()
    ]
    if not ad_vectors:
        return None

    # toArray() gives a dense ndarray for both sparse and dense vectors, so the sum is
    # a vectorised numpy add rather than an element-by-element walk of a sparse vector.
    feature_sum = sum(vector.toArray() for vector in ad_vectors)

    # Average over the vectors actually found: an interacted ad that is missing from
    # tfidf_matrix contributes nothing to the sum and must not dilute the mean.
    return Vectors.dense(feature_sum / len(ad_vectors))

def content_based_recommendation(user_id, user_interactions_df, tfidf_matrix, top_n=10):
    """Rank ads by similarity between their TF-IDF vector and the user's content profile.

    Each ad is scored with the dot product of its ``features`` vector and the profile
    returned by ``get_user_profile``; scores are then min-max normalised over all ads.

    Args:
        user_id: User to recommend for (matched against ``userId``).
        user_interactions_df: User/ad interaction DataFrame passed to ``get_user_profile``.
        tfidf_matrix: DataFrame with ``adId`` and ``features`` columns.
        top_n: Maximum number of ads to return.

    Returns:
        A DataFrame with columns ``adId`` and ``normalized_similarity`` holding the
        ``top_n`` highest-scoring ads in descending order, or ``None`` when no profile
        could be built for the user. When all ads share the same raw score, every
        ``normalized_similarity`` is 0.5.
    """
    user_profile = get_user_profile(user_id, user_interactions_df, tfidf_matrix)
    if user_profile is None:
        return None

    # The dot product works on ML vector objects, so it has to stay a Python UDF.
    dot_product = udf(lambda features: float(features.dot(user_profile)), DoubleType())
    similarity_scores = tfidf_matrix.withColumn("similarity_score", dot_product(col("features")))

    # Fetch both bounds in one aggregation: each separate agg() job would re-run the
    # Python UDF over the whole frame.
    score_bounds = similarity_scores.agg(
        min_fn("similarity_score").alias("min_score"),
        max_fn("similarity_score").alias("max_score"),
    ).collect()[0]
    min_score = score_bounds["min_score"]
    max_score = score_bounds["max_score"]

    # Min-max normalisation is plain arithmetic, so express it as a native column
    # expression instead of a second Python UDF. A degenerate range (no rows, or all
    # scores equal) maps to the constant 0.5.
    if min_score is None or max_score == min_score:
        normalized_similarity = lit(0.5)
    else:
        normalized_similarity = (col("similarity_score") - lit(min_score)) / lit(max_score - min_score)

    return (
        similarity_scores
        .withColumn("normalized_similarity", normalized_similarity)
        .orderBy(desc("normalized_similarity"))
        .limit(top_n)
        .select("adId", "normalized_similarity")
    )

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
# Produce recommendations for one example user, using the function's default top_n.
user_id_to_recommend = "4f3aecdc-f7d8-4718-925c-96d81c3765f3"
recommended_ads_df = content_based_recommendation(
    user_id=user_id_to_recommend,
    user_interactions_df=user_ad_interactions_df,
    tfidf_matrix=tfidf_matrix,
)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [13]:
# content_based_recommendation returns None when no user profile can be built,
# so guard the display instead of failing with AttributeError on None.show().
if recommended_ads_df is None:
    print(f"No recommendations available for user {user_id_to_recommend}")
else:
    recommended_ads_df.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+---------------------+
|                adId|normalized_similarity|
+--------------------+---------------------+
|a4596d16-e59d-40c...|                  1.0|
|023d7aba-d5df-459...|   0.9212657713315731|
|bc4d3e39-7e06-4c3...|   0.8916957595411507|
|d7f30c52-c3c5-410...|  0.15562010244980043|
|e123eaef-e554-46a...|  0.14181959445060083|
|60e6c683-e7da-4c5...|  0.13230808450847303|
|1601cd49-e527-4df...|  0.13169043193727614|
|f8931240-4505-4d3...|  0.12726839598649586|
|9b8b83b2-36da-4ce...|   0.1268438520136554|
|9f15ae6f-2c64-4d9...|  0.11723234333812588|
+--------------------+---------------------+