In [9]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, split, udf, regexp_replace, lit, from_unixtime
from pyspark.sql.types import ArrayType, StringType, StructType, StructField, IntegerType, StringType, MapType
from pyspark.sql.functions import split
from pyspark.sql.functions import udf
from pyspark.sql.types import ArrayType, MapType, StringType
from pyspark.sql.functions import explode


from pyspark.sql.functions import split, explode, regexp_extract, col, collect_list, udf, broadcast
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, DoubleType, FloatType
from pyspark.ml.linalg import VectorUDT, Vectors
import numpy as np
import os

In [10]:
# Shut down a session left over from an earlier run of this cell, if one exists
if "spark" in globals():
    spark.stop()

# Start a fresh session, giving the executors and the driver 8 GB each
spark = (
    SparkSession.builder
    .appName("BehaviorsProcessing")
    .config("spark.executor.memory", "8g")
    .config("spark.driver.memory", "8g")
    .getOrCreate()
)


In [11]:
# Point PySpark at the local Java, Spark and Python installations.
# NOTE(review): in this notebook the Spark session is created in the preceding
# cell, so these values may not influence that already-running session —
# confirm the intended cell order.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",  # Replace with your Java path
    "SPARK_HOME": "/path/to/spark",                    # Replace with your Spark installation path
    "PYSPARK_PYTHON": "/usr/bin/python3",              # Replace with your Python path
    "PYSPARK_DRIVER_PYTHON": "jupyter",
})

In [12]:
# Schema for behaviors.tsv: five nullable string columns, listed in file order
behaviors_schema = StructType([
    StructField(column_name, StringType(), True)
    for column_name in ("ImpressionID", "UserID", "Time", "History", "Impressions")
])

# Read the tab-separated, header-less behaviors log with the explicit schema
behaviors_df = (
    spark.read
    .schema(behaviors_schema)
    .option("sep", "\t")
    .option("header", False)
    .csv("data/mind/MINDsmall_train/behaviors.tsv")
)

# Print the resulting schema and one untruncated sample row
behaviors_df.printSchema()
behaviors_df.show(n=1, truncate=False)

root
 |-- ImpressionID: string (nullable = true)
 |-- UserID: string (nullable = true)
 |-- Time: string (nullable = true)
 |-- History: string (nullable = true)
 |-- Impressions: string (nullable = true)

+------------+------+---------------------+--------------------------------------------------------------+-----------------+
|ImpressionID|UserID|Time                 |History                                                       |Impressions      |
+------------+------+---------------------+--------------------------------------------------------------+-----------------+
|1           |U13740|11/11/2019 9:05:58 AM|N55189 N42782 N34694 N45794 N18445 N63302 N10414 N19347 N31801|N55689-1 N35729-0|
+------------+------+---------------------+--------------------------------------------------------------+-----------------+
only showing top 1 row



In [13]:
# Turn the space-separated History string into an array column (HistoryList)
# and discard the raw string column in the same step
behaviors_df = (
    behaviors_df
    .withColumn("HistoryList", split("History", " "))
    .drop("History")
)

# Spot-check one row of the result
behaviors_df.select("ImpressionID", "UserID", "HistoryList").show(n=1, truncate=False)

+------------+------+------------------------------------------------------------------------+
|ImpressionID|UserID|HistoryList                                                             |
+------------+------+------------------------------------------------------------------------+
|1           |U13740|[N55189, N42782, N34694, N45794, N18445, N63302, N10414, N19347, N31801]|
+------------+------+------------------------------------------------------------------------+
only showing top 1 row



In [14]:
# Turn the space-separated Impressions string into an array column (ImpressionsList)
# and discard the raw string column in the same step
behaviors_df = (
    behaviors_df
    .withColumn("ImpressionsList", split("Impressions", " "))
    .drop("Impressions")
)

# Spot-check one row of the result
behaviors_df.select("ImpressionID", "ImpressionsList").show(n=1, truncate=False)

+------------+--------------------+
|ImpressionID|ImpressionsList     |
+------------+--------------------+
|1           |[N55689-1, N35729-0]|
+------------+--------------------+
only showing top 1 row



In [15]:
# Produce one row per impression item, then parse each "<NewsID>-<label>" token:
#   CandidateNewsID – the article id in front of the dash
#   ClickLabel      – the digits after the dash, cast to integer
# The raw ImpressionItem token is not carried into the final projection.
impressions_exploded = (
    behaviors_df
    .select(
        "ImpressionID",
        "UserID",
        "Time",
        "HistoryList",
        explode("ImpressionsList").alias("ImpressionItem"),
    )
    .select(
        "ImpressionID",
        "UserID",
        "Time",
        "HistoryList",
        regexp_extract("ImpressionItem", r"^(N\d+)-\d+$", 1).alias("CandidateNewsID"),
        regexp_extract("ImpressionItem", r"^N\d+-(\d+)$", 1).cast("integer").alias("ClickLabel"),
    )
)

# Spot-check the first few parsed rows
impressions_exploded.select("ImpressionID", "UserID", "CandidateNewsID", "ClickLabel").show(n=5, truncate=False)

+------------+------+---------------+----------+
|ImpressionID|UserID|CandidateNewsID|ClickLabel|
+------------+------+---------------+----------+
|1           |U13740|N55689         |1         |
|1           |U13740|N35729         |0         |
|2           |U91836|N20678         |0         |
|2           |U91836|N39317         |0         |
|2           |U91836|N58114         |0         |
+------------+------+---------------+----------+
only showing top 5 rows



In [16]:
# Parquet dataset holding one TF-IDF feature vector per NewsID
news_tfidf_path = "news_tfidf.parquet"
news_features_df = spark.read.format("parquet").load(news_tfidf_path)

In [17]:
# Preview the first 20 rows with long cell values truncated (the defaults, spelled out)
news_features_df.show(n=20, truncate=True)

+------+--------------------+
|NewsID|       TFIDFFeatures|
+------+--------------------+
| N5727|(10000,[1,9,87,90...|
|N25908|(10000,[165,280,4...|
| N2490|(10000,[1,218,262...|
|  N192|(10000,[71,2006,2...|
| N1298|(10000,[3,26,42,4...|
|N57313|(10000,[36,61,109...|
|N36185|(10000,[6,21,74,8...|
|N33743|(10000,[2,13,16,3...|
|N58255|(10000,[0,10,14,3...|
|N44291|(10000,[0,4,15,84...|
|N38233|(10000,[5,7,11,69...|
| N1970|(10000,[4,26,30,3...|
|N41692|(10000,[4,12,31,1...|
|N31209|(10000,[4,34,59,1...|
|N60452|(10000,[154,159,2...|
|N22043|(10000,[36,119,25...|
|N30368|(10000,[45,815,12...|
| N4233|(10000,[11,90,121...|
|N51387|(10000,[111,865,1...|
|N22126|(10000,[4,18,34,3...|
+------+--------------------+
only showing top 20 rows



In [18]:
# Attach the TF-IDF vector of each candidate article. The left join keeps
# impressions whose article has no feature row (TFIDFFeatures is null there);
# the right-hand NewsID key is dropped because CandidateNewsID already holds it.
impressions_with_features = (
    impressions_exploded
    .join(
        news_features_df,
        on=impressions_exploded["CandidateNewsID"] == news_features_df["NewsID"],
        how="left",
    )
    .drop(news_features_df["NewsID"])
)


In [19]:
# Sanity-check the join by showing one row with its attached feature vector
impressions_with_features.select(
    "ImpressionID",
    "UserID",
    "CandidateNewsID",
    "ClickLabel",
    "TFIDFFeatures",
).show(n=1, truncate=True)

                                                                                

+------------+------+---------------+----------+--------------------+
|ImpressionID|UserID|CandidateNewsID|ClickLabel|       TFIDFFeatures|
+------------+------+---------------+----------+--------------------+
|           1|U13740|         N55689|         1|(10000,[8,30,51,9...|
+------------+------+---------------+----------+--------------------+
only showing top 1 row



In [20]:
# Keep only the impressions the user actually clicked (ClickLabel equal to 1)
clicked_news_df = impressions_with_features.where(col("ClickLabel") == 1)

# Preview a handful of the clicked rows
clicked_news_df.show(n=5, truncate=True)

+------------+------+--------------------+--------------------+---------------+----------+--------------------+
|ImpressionID|UserID|                Time|         HistoryList|CandidateNewsID|ClickLabel|       TFIDFFeatures|
+------------+------+--------------------+--------------------+---------------+----------+--------------------+
|           1|U13740|11/11/2019 9:05:5...|[N55189, N42782, ...|         N55689|         1|(10000,[8,30,51,9...|
|           2|U91836|11/12/2019 6:11:3...|[N31739, N6072, N...|         N17059|         1|(10000,[25,38,108...|
|           3|U73700|11/14/2019 7:01:4...|[N10732, N25792, ...|         N23814|         1|(10000,[43,100,14...|
|           4|U34670|11/11/2019 5:28:0...|[N45729, N2203, N...|         N49685|         1|(10000,[128,170,2...|
|           5| U8125|11/12/2019 4:11:2...|[N10078, N56514, ...|          N8400|         1|(10000,[1,63,79,1...|
+------------+------+--------------------+--------------------+---------------+----------+--------------

In [21]:
from pyspark.sql.functions import avg, collect_list, udf
from pyspark.ml.linalg import Vectors, VectorUDT
from pyspark.sql.functions import size

# Average a list of SparseVectors or DenseVectors into one dense vector
def average_vectors(sum_vec, count):
    """Return the element-wise mean of a list of Spark ML vectors.

    Args:
        sum_vec: List of vectors exposing ``toArray()`` (sparse or dense).
            May be empty or None when a user has no usable feature vectors.
        count: Number of vectors in ``sum_vec``; used as the divisor.

    Returns:
        A dense vector with the per-dimension mean, or None when there is
        nothing to average (a UDF returning None yields a null column value).
    """
    # The previous guard evaluated len(sum_vec[0]) on an empty list, so the
    # "handle empty input" path itself raised IndexError. With no vectors the
    # dimensionality is unknown, so a null profile is the honest result.
    if not sum_vec or not count:
        return None
    # Stack into an (n_vectors, n_dims) array and reduce along the vector axis
    stacked = np.stack([vec.toArray() for vec in sum_vec])
    return Vectors.dense(stacked.sum(axis=0) / count)

# Wrap the averaging helper as a Spark UDF that returns an ML vector column
avg_vector_udf = udf(average_vectors, returnType=VectorUDT())

# User profiles are built from clicked impressions only
clicked_news_df = impressions_with_features.where(col("ClickLabel") == 1)

# Per user: gather the clicked TF-IDF vectors together with their count,
# average them into UserProfile, then drop the two helper columns
user_profiles = (
    clicked_news_df
    .groupBy("UserID")
    .agg(
        collect_list("TFIDFFeatures").alias("UserVectors"),
        size(collect_list("TFIDFFeatures")).alias("CountVectors"),
    )
    .withColumn("UserProfile", avg_vector_udf("UserVectors", "CountVectors"))
    .drop("UserVectors", "CountVectors")
)

In [22]:
# Preview a single averaged user profile
user_profiles.show(n=1)

[Stage 13:>                                                         (0 + 1) / 1]

+------+--------------------+
|UserID|         UserProfile|
+------+--------------------+
|U10022|[0.53004950030122...|
+------+--------------------+
only showing top 1 row



                                                                                

In [8]:
def cosine_similarity_udf(u_vec, i_vec):
    """Cosine similarity between a user profile vector and an article vector.

    Args:
        u_vec: User profile vector exposing ``toArray()``, or None.
        i_vec: Article feature vector exposing ``toArray()``, or None.

    Returns:
        The cosine similarity as a Python float; 0.0 when either vector has
        zero norm; None when either input is None.
    """
    # A null on either side cannot be scored; propagate the null
    if u_vec is None or i_vec is None:
        return None
    # Convert Spark vectors to numpy
    u_arr = u_vec.toArray()
    i_arr = i_vec.toArray()
    denom = np.linalg.norm(u_arr) * np.linalg.norm(i_arr)
    # An all-zero vector would otherwise produce 0/0 = NaN, and Spark orders
    # NaN above every number, which would put such pairs at the top of a
    # descending similarity ranking
    if denom == 0:
        return 0.0
    return float(np.dot(u_arr, i_arr) / denom)

# Expose the similarity helper to Spark as a UDF producing a float column
cosine_udf = udf(cosine_similarity_udf, returnType=FloatType())

# Score every (user, article) pair: the article features are broadcast and
# cross-joined against the user profiles.
# (Be aware: this can be huge for large datasets!)
user_recs = (
    user_profiles
    .crossJoin(broadcast(news_features_df))
    .withColumn("similarity", cosine_udf("UserProfile", "TFIDFFeatures"))
)
user_recs.show()

NameError: name 'user_profiles' is not defined

In [None]:
#user_recs.show(2)
!pip show py4j

In [None]:
from pyspark.sql import Window
import pyspark.sql.functions as F

# Rank each user's candidate articles from most to least similar.
# NewsID is a secondary sort key: row_number() assigns arbitrary positions to
# rows with equal similarity, so without a tie-break the selected top-N can
# differ between runs.
windowSpec = Window.partitionBy("UserID").orderBy(F.desc("similarity"), F.asc("NewsID"))

# Select top-N (here: top-10) articles per user; the helper rank column is
# not part of the output
top_n = (
    user_recs
    .withColumn("rank", F.row_number().over(windowSpec))
    .filter(F.col("rank") <= 10)
    .select("UserID", "NewsID", "similarity")
)

In [None]:
# Preview a single recommendation row
top_n.show(n=1)