In [1]:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import matplotlib.pyplot as plt
from pyspark.sql.types import *
import pandas as pd

conf = SparkConf().set("spark.jars.packages", "com.linkedin.sparktfrecord:spark-tfrecord_2.12:0.6.0")
sc = SparkContext(conf=conf)
spark = SparkSession(sc)
print(f'The PySpark {spark.version} version is running...')

:: loading settings :: url = jar:file:/opt/homebrew/lib/python3.11/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /Users/dhaneshbaalaji/.ivy2/cache
The jars for the packages stored in: /Users/dhaneshbaalaji/.ivy2/jars
com.linkedin.sparktfrecord#spark-tfrecord_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-1f88d641-4ac1-4e3f-b468-67c9a3d74c78;1.0
	confs: [default]
	found com.linkedin.sparktfrecord#spark-tfrecord_2.12;0.6.0 in central
:: resolution report :: resolve 73ms :: artifacts dl 1ms
	:: modules in use:
	com.linkedin.sparktfrecord#spark-tfrecord_2.12;0.6.0 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------------------------------------
	|      default     |   1   |   0   |   0   |   0   ||   1   |   0   |
	---------------------------------------------------------------------
:: retrieving :: org.a

The PySpark 3.5.0 version is running...


In [2]:
# Directory containing TFRecord files and the JSON file
tfrecord_dir = "my_dataset/*"

reddit_data = spark.read.format("tfrecord") \
    .option("recordType", "Example") \
    .option("pathGlobFilter", "*.tfrecord-*") \
    .load(tfrecord_dir)

reddit_data.printSchema()

                                                                                

root
 |-- normalizedBody: string (nullable = true)
 |-- subreddit_id: string (nullable = true)
 |-- subreddit: string (nullable = true)
 |-- summary: string (nullable = true)
 |-- body: string (nullable = true)
 |-- content: string (nullable = true)
 |-- author: string (nullable = true)
 |-- id: string (nullable = true)



### Using Summary field of the dataset to do Content matching based on user input

#### Pipeline: TF IDF Vectorization + Cosine Similarity

In [3]:
# All Imports
from pyspark.ml.feature import Tokenizer, StopWordsRemover
from pyspark.sql.functions import udf, col, expr
from pyspark.sql.types import IntegerType, FloatType
from pyspark.ml.feature import HashingTF, IDF
from pyspark.ml.linalg import SparseVector
import numpy as np

# Text Normalization - Convert text to lowercase and remove punctuation
reddit_data = reddit_data.withColumn("summary", lower(col("summary")))
reddit_data = reddit_data.withColumn("summary", regexp_replace(col("summary"), "[^a-zA-Z\\s]", ""))

In [4]:
# Text Tokenization
tokenizer = Tokenizer(inputCol="summary", outputCol="summary_words")
reddit_data = tokenizer.transform(reddit_data)

In [5]:
# Remove stop words - set of commonly used words in a language('a', 'the')
remover = StopWordsRemover(inputCol="summary_words", outputCol="filtered_summary")
reddit_data = remover.transform(reddit_data)

In [6]:
# Displaying Dataset after removing 
reddit_data.select("filtered_summary").show(10, truncate = False)

+----------------------------------------------------------------------------------------------------------------------------------------------+
|filtered_summary                                                                                                                              |
+----------------------------------------------------------------------------------------------------------------------------------------------+
|[cheated, long, time, platonic, friend, married, wife, found, told, three, years, later, still, dealing, fallout, many, regrets]              |
|[kids, swim, make, sound, go, , , edit, zomg, reddit, gold, holy, crap, thanks]                                                               |
|[opiates, give, powerful, burst, reward, chemicals, hence, rendering, non, drug, accomplishments, obsolete]                                   |
|[sure, im, overreacting, want, bf, show, desire, see, hes, leavingdo, something, hes, talking, sure, end]                        

In [7]:
# Removing NAs and Duplicates
reddit_data = reddit_data.na.drop(subset=["filtered_summary"])
reddit_data = reddit_data.dropDuplicates(["filtered_summary"])

# Selecting only the required fields
reddit_data = reddit_data.select("subreddit","summary","filtered_summary")

# Cleaning the final output array after preprocessing, removing any empty strings/ nulls
reddit_data = reddit_data.withColumn("cleaned_array", expr("filter(filtered_summary, x -> x is not null and trim(x) <> '')"))

# Selecting only the required fields
cleaned_reddit_data = reddit_data.select("subreddit","cleaned_array")

In [8]:
# Python function to return count of an array
def word_count(textarray):
    return len(textarray)

# Creating a Spark UDF based on the python function
word_count_udf = udf(word_count, IntegerType())

# Removing Empty arrays from the dataset
cleaned_reddit_data = cleaned_reddit_data.withColumn("word_count", word_count_udf(cleaned_reddit_data["cleaned_array"]))
cleaned_reddit_data = cleaned_reddit_data.filter(cleaned_reddit_data["word_count"] > 0)
cleaned_reddit_data = cleaned_reddit_data.dropDuplicates(["cleaned_array"])
cleaned_reddit_data.show(truncate=False)

23/12/21 14:36:15 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors

+--------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+
|subreddit           |cleaned_array                                                                                                                                                                                             |word_count|
+--------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+
|brisbane            |[aa, credit, rating, scared, current, debt, forecasts, placed, full, economic, context]                                                                                                                   |11        |
|seduction           |[aa, disappearing, conversatio

                                                                                

In [9]:
# Create 100k features out of the data - this number was selected based on the scale of the whole dataset
hashingTF = HashingTF(inputCol="cleaned_array", outputCol="rawFeatures_summary", numFeatures=100000)
featurizedData = hashingTF.transform(cleaned_reddit_data)

In [10]:
# Create IDF vectors based on the features 
idf = IDF(inputCol="rawFeatures_summary", outputCol="features_summary")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

                                                                                

In [12]:
# Display the rescaled data after creating TF-IDF vectors from the features
rescaledData.select("subreddit", "cleaned_array", "features_summary").show(1, truncate=False)

23/12/21 14:40:35 WARN DAGScheduler: Broadcasting large task binary with size 1636.9 KiB


+---------+---------------------------------------------------------------------------------------+--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|subreddit|cleaned_array                                                                          |features_summary                                                                                                                                                                                                                                                                      |
+---------+---------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------------------------------------------

                                                                                

In [25]:
# User Query
user_query = "Mclaren F1"
query_df = spark.createDataFrame([(user_query,)], ["summary"])

# Preprocessing steps for the user query
query_wordsData = tokenizer.transform(query_df)
query_wordsData_rem = remover.transform(query_wordsData)
query_wordsData_rem = query_wordsData_rem.withColumn("cleaned_array", col('filtered_summary'))
query_wordsData_rem.show()

+----------+-------------+----------------+-------------+
|   summary|summary_words|filtered_summary|cleaned_array|
+----------+-------------+----------------+-------------+
|Mclaren F1|[mclaren, f1]|   [mclaren, f1]|[mclaren, f1]|
+----------+-------------+----------------+-------------+



In [26]:
# Create TF-IDF vectors from the input user query
featurizedData_user = hashingTF.transform(query_wordsData_rem)
rescaledData_user = idfModel.transform(featurizedData_user)
rescaledData_user.show()

+----------+-------------+----------------+-------------+--------------------+--------------------+
|   summary|summary_words|filtered_summary|cleaned_array| rawFeatures_summary|    features_summary|
+----------+-------------+----------------+-------------+--------------------+--------------------+
|Mclaren F1|[mclaren, f1]|   [mclaren, f1]|[mclaren, f1]|(100000,[39908,89...|(100000,[39908,89...|
+----------+-------------+----------------+-------------+--------------------+--------------------+



23/12/21 15:54:38 WARN DAGScheduler: Broadcasting large task binary with size 1617.3 KiB
23/12/21 15:54:38 WARN DAGScheduler: Broadcasting large task binary with size 1617.3 KiB
23/12/21 15:54:38 WARN DAGScheduler: Broadcasting large task binary with size 1617.3 KiB


In [27]:
# Fetching the input query vector
input_features = rescaledData_user.select("features_summary").collect()[0]["features_summary"]

# Broadcast this vector to all Spark nodes for efficient processing while calculating cosine similarity
broadcast_input_features = spark.sparkContext.broadcast(input_features)

23/12/21 15:54:39 WARN DAGScheduler: Broadcasting large task binary with size 1612.0 KiB


In [28]:
# Defining python function to calculate Cosine similarity
def cosine_similarity(vec1):
    # Fetch the broadcasted input feature vector
    vec2 = broadcast_input_features.value

    # Convert vectors to arrays for cosine similarity calculation
    vec1_array = np.array(vec1.toArray()) if isinstance(vec1, SparseVector) else vec1
    vec2_array = np.array(vec2.toArray()) if isinstance(vec2, SparseVector) else vec2

    # Compute cosine similarity
    return float(np.dot(vec1_array, vec2_array) / (np.linalg.norm(vec1_array) * np.linalg.norm(vec2_array)))

# Spark UDF for Cosine similarity
cosine_similarity_udf = udf(cosine_similarity, FloatType())

In [29]:
# Apply the cosine similarity UDF to the Reddit DataFrame
reddit_data_with_similarity = rescaledData.withColumn("cosine_similarity", cosine_similarity_udf(col("features_summary")))
top_n = 10  # Number of top reddit content matches across various subreddits
top_recommendations = reddit_data_with_similarity.orderBy("cosine_similarity", ascending=False).limit(top_n)
top_recommendations.select("subreddit", "cosine_similarity").show(truncate=False)

23/12/21 15:55:27 WARN DAGScheduler: Broadcasting large task binary with size 1643.0 KiB

+------------+-----------------+
|subreddit   |cosine_similarity|
+------------+-----------------+
|formula1    |0.5814076        |
|formula1    |0.5437984        |
|cars        |0.5227868        |
|formula1    |0.5049425        |
|formula1    |0.48110324       |
|gaming      |0.4601885        |
|VirtualWDCPC|0.44697332       |
|formula1    |0.43953624       |
|spikes      |0.43941042       |
|formula1    |0.43439627       |
+------------+-----------------+



                                                                                

In [30]:
# Group Subreddits by content with highest cosine similarity(Greater than 0.7 similarity)
tp_reco_df = reddit_data_with_similarity.select("subreddit", "cleaned_array", "cosine_similarity").filter(reddit_data_with_similarity.cosine_similarity>0.5).groupBy("subreddit").count().orderBy("count",ascending=False)
tp_reco_df.show(5, truncate=False)

23/12/21 15:59:28 WARN DAGScheduler: Broadcasting large task binary with size 1639.1 KiB
23/12/21 16:03:05 WARN DAGScheduler: Broadcasting large task binary with size 1638.4 KiB


+---------+-----+
|subreddit|count|
+---------+-----+
|formula1 |3    |
|cars     |1    |
+---------+-----+



23/12/21 16:03:05 WARN DAGScheduler: Broadcasting large task binary with size 1655.8 KiB
23/12/21 16:03:05 WARN DAGScheduler: Broadcasting large task binary with size 1654.0 KiB
                                                                                

In [38]:
# Stop the Spark session
spark.stop()