In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, udf, monotonically_increasing_id
from pyspark.sql.types import IntegerType, DoubleType
import random

In [2]:
spark = SparkSession.builder \
    .appName("Event Recommendation") \
    .getOrCreate()

24/06/05 09:44:49 WARN Utils: Your hostname, wanglinhans-Laptop.local resolves to a loopback address: 127.0.0.1; using 192.168.1.141 instead (on interface en0)
24/06/05 09:44:49 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/06/05 09:44:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/06/05 09:44:49 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [6]:
df = spark.read.orc("data/meetup-20240407.orc")
df.show(1)

+--------------------+--------------------+---------+--------------------+--------------------+--------------+---------------------+--------------------+--------------------+
|                 _id|               title|hosted_by|          event_time|          gmaps_link|vanue_location|vanue_location_detail|         description|              topics|
+--------------------+--------------------+---------+--------------------+--------------------+--------------+---------------------+--------------------+--------------------+
|{65f8d6b76c9cc986...|Coffee Walk & Bea...|  Mary G.|Friday, April 5, ...|https://www.googl...|    Itnig Café| C. de Pujades, 10...|🗓️ FRI, APR 5 • ...|[Coffee, Social, ...|
+--------------------+--------------------+---------+--------------------+--------------------+--------------+---------------------+--------------------+--------------------+
only showing top 1 row



In [7]:
@udf(returnType=DoubleType())
def extract_lat(gmaps_link):
    try:
        lat, lng = gmaps_link.split('query=')[1].split('%2C%20')
        return float(lat)
    except Exception as e:
        return None

# UDF to extract longitude from Google Maps link
@udf(returnType=DoubleType())
def extract_lon(gmaps_link):
    try:
        lat, lng = gmaps_link.split('query=')[1].split('%2C%20')
        return float(lng)
    except Exception as e:
        return None

@udf(returnType=IntegerType())
def random_user_id():
    import random
    return 0 if random.random() < 0.5 else random.randint(1, 20)

In [8]:
df = df.withColumn("lat", extract_lat(col("gmaps_link")))
df = df.withColumn("lon", extract_lon(col("gmaps_link")))

In [9]:
df = df.withColumn("post_id", monotonically_increasing_id() + 1) \
       .withColumn("user_id", random_user_id())

In [10]:
df = df.select(
    col("post_id"),
    col("user_id"),
    col("title"),
    col("description"),
    col("topics"),
    col("event_time"),
    col("lon"),
    col("lat")
)

In [11]:
df.limit(1).show(truncate=False)

+-------+-------+------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------------------------------------------------------+--------------------------------------------+--------+--------+
|post_id|user_id|title                                                 |description                                                                          

In [9]:
df.write.json("data/posts.json")

# Modelling

In [13]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import udf, col, row_number, explode
from pyspark.sql.window import Window
from pyspark.sql.types import StringType, ArrayType, IntegerType
import nltk
from nltk.stem import WordNetLemmatizer
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
from nltk import pos_tag
from pyspark.ml.feature import Tokenizer, CountVectorizer
from pyspark.ml.linalg import Vectors, VectorUDT
import numpy as np
from sklearn.metrics.pairwise import cosine_similarity

  from scipy.stats import fisher_exact


In [14]:
nltk.download('punkt')
nltk.download('averaged_perceptron_tagger')
nltk.download('wordnet')
nltk.download('stopwords')

lemmatizer = WordNetLemmatizer()
stop_words = set(stopwords.words('english'))
VERB_CODES = {'VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ'}

[nltk_data] Downloading package punkt to
[nltk_data]     /Users/wanglinhan/nltk_data...
[nltk_data]   Unzipping tokenizers/punkt.zip.
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /Users/wanglinhan/nltk_data...
[nltk_data]   Unzipping taggers/averaged_perceptron_tagger.zip.
[nltk_data] Downloading package wordnet to
[nltk_data]     /Users/wanglinhan/nltk_data...
[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/wanglinhan/nltk_data...
[nltk_data]   Unzipping corpora/stopwords.zip.


In [15]:
def preprocess_sentences(text):
    text = text.lower()
    temp_sent = []
    words = word_tokenize(text)
    tags = pos_tag(words)
    
    for i, word in enumerate(words):
        if tags[i][1] in VERB_CODES:
            lemmatized = lemmatizer.lemmatize(word, 'v')
        else:
            lemmatized = lemmatizer.lemmatize(word)
        if lemmatized not in stop_words and lemmatized.isalpha():
            temp_sent.append(lemmatized)
    
    finalsent = ' '.join(temp_sent)
    finalsent = finalsent.replace("n't", " not")
    finalsent = finalsent.replace("'m", " am")
    finalsent = finalsent.replace("'s", " is")
    finalsent = finalsent.replace("'re", " are")
    finalsent = finalsent.replace("'ll", " will")
    finalsent = finalsent.replace("'ve", " have")
    finalsent = finalsent.replace("'d", " would")
    
    return finalsent

In [16]:
preprocess_udf = udf(preprocess_sentences, StringType())

In [17]:
df = df.withColumn("description_proc", preprocess_udf(df["description"]))

In [18]:
tokenizer = Tokenizer(inputCol="description_proc", outputCol="description_proc_tokens")
df = tokenizer.transform(df)

In [19]:
cv = CountVectorizer(inputCol="description_proc_tokens", outputCol="features")
cv_model = cv.fit(df)
df = cv_model.transform(df)

  from scipy.stats import fisher_exact
                                                                                

In [20]:
def sparse_to_dense(sparse_vector):
    return Vectors.dense(sparse_vector.toArray())

In [21]:
sparse_to_dense_udf = udf(sparse_to_dense, VectorUDT())
df = df.withColumn("dense_features", sparse_to_dense_udf(df["features"]))

In [22]:
dense_features = df.select("dense_features").collect()
dense_feature_array = np.array([row["dense_features"].toArray() for row in dense_features])
cosine_sim = cosine_similarity(dense_feature_array)

                                                                                

In [23]:
cosine_sim_broadcast = spark.sparkContext.broadcast(cosine_sim)

In [24]:
post_id_mapping = {row["post_id"]: idx for idx, row in enumerate(df.select("post_id").collect())}
post_id_mapping_broadcast = spark.sparkContext.broadcast(post_id_mapping)

In [25]:
window_spec = Window.partitionBy("user_id").orderBy(col("post_id").desc())
df = df.withColumn("rank", row_number().over(window_spec))
last_posts_df = df.filter(col("rank") == 1).select("post_id", "user_id")

In [26]:
def get_recommendations(post_id, num_recommendations=5):
    post_idx = post_id_mapping_broadcast.value.get(post_id, None)
    if post_idx is None:
        return []
    
    sim_scores = cosine_sim_broadcast.value[post_idx]
    sim_scores = list(enumerate(sim_scores))
    sim_scores = sorted(sim_scores, key=lambda x: x[1], reverse=True)
    sim_scores = [post_id_mapping_broadcast.value.get(idx, None) for idx, score in sim_scores if idx != post_idx][:num_recommendations]
    return [pid for pid in sim_scores if pid is not None]

In [27]:
get_recommendations_udf = udf(lambda post_id: get_recommendations(post_id), ArrayType(IntegerType()))

In [28]:
last_posts_df = last_posts_df.withColumn("recommendations", get_recommendations_udf(col("post_id")))

In [29]:
recommendations_df = last_posts_df.select(col("user_id"), col("post_id").alias("last_post_id"), col("recommendations"))
recommendations_df = recommendations_df.withColumn("recommendation_post_id", explode(col("recommendations")))

In [30]:
recommendations_df = recommendations_df.withColumn("recommendation_id", row_number().over(Window.orderBy(monotonically_increasing_id())))

In [31]:
final_df = recommendations_df.select(col("recommendation_id"), col("user_id"), col("recommendation_post_id").alias("post_id"))

In [32]:
final_df.show(10, truncate=False)

24/06/05 09:48:47 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/06/05 09:48:47 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/06/05 09:48:47 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/06/05 09:48:47 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/06/05 09:48:47 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/06/05 09:48:47 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.


+-----------------+-------+-------+
|recommendation_id|user_id|post_id|
+-----------------+-------+-------+
|1                |0      |51     |
|2                |0      |20     |
|3                |0      |45     |
|4                |0      |65     |
|5                |0      |35     |
|6                |1      |142    |
|7                |1      |143    |
|8                |1      |144    |
|9                |1      |145    |
|10               |1      |102    |
+-----------------+-------+-------+
only showing top 10 rows



# Check

In [33]:
post_ids_to_filter = [141, 143, 144, 145, 102] 

filtered_df = df.filter(df.post_id.isin(post_ids_to_filter)).select("post_id", "user_id", "title", "description")
filtered_df.show(truncate=False)

+-------+-------+----------------------------------------------------------------+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|post_id|user_id|title                           

# Export to CSV

In [32]:
final_df.write.csv("data/recommendations.csv", header=True)

24/06/04 23:39:23 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/06/04 23:39:23 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/06/04 23:39:23 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/06/04 23:39:23 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/06/04 23:39:23 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/06/04 23:39:23 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/06/04 2