# Data Processing 

## Hydrating a System 

Hydrating a system with data is an essential part of a well-oiled recommendation system. As datasets grow larger, it becomes beneficial to use specialized computing libraries to handle the data at scale.

**Spark**

A natively distributed computing engine (not a programming language) that is built to scale horizontally and allows for lazy evaluation of expressions: transformations are only executed when a result is requested

Spark employs a driver program that coordinates:
- A cluster manager, which allocates resources across the cluster
- Worker nodes, each with an executor, cache and tasks
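
The lazy evaluation mentioned above can be illustrated without a cluster. The following is only a plain-Python analogy using generators, not the Spark API; the function names `double` and `keep_large` are made up for this sketch. Building the pipeline does no work, and asking for results (the "action") triggers just enough computation to answer the request.

```python
# Plain-Python analogy for lazy evaluation (this is NOT the Spark API).
# Generators describe a pipeline; nothing runs until a result is requested.

calls = []  # records which input rows were actually processed


def double(rows):
    """Lazy transformation: doubles each row only when it is consumed."""
    for row in rows:
        calls.append(row)
        yield row * 2


def keep_large(rows, threshold):
    """Lazy transformation: keeps rows strictly above the threshold."""
    return (row for row in rows if row > threshold)


# Building the pipeline touches no data, even for a million input rows.
pipeline = keep_large(double(range(1_000_000)), threshold=10)
rows_touched_before_action = len(calls)

# The "action": requesting three results triggers only the work needed.
first_three = [next(pipeline) for _ in range(3)]
rows_touched_after_action = len(calls)

print(rows_touched_before_action)  # 0
print(first_three)                 # [12, 14, 16]
print(rows_touched_after_action)   # 9
```

Only 9 of the 1,000,000 input rows are ever processed, which is the kind of saving lazy evaluation makes possible.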

**PySpark**

The Python API for the Spark computing library, used to process and transform large-scale datasets

PySpark provides a convenient SQL API that allows us to write what look like ordinary SQL queries against large datasets

**Lambda Architecture**

Daily jobs combined with smaller, more frequent batch jobs

Batch and speed layers are distinguished by two inversely related properties: how frequently they process data and how much data they see per run
- Speed layers may have varying frequencies
- One may employ multiple speed layers for varying batch cadences
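
A minimal sketch of the idea, in plain Python rather than any streaming framework. The event data, the function names, and the merging `serve` step are all assumptions made for illustration: the batch layer recomputes over a large history infrequently, the speed layer covers only the events that arrived since the last batch run, and queries combine both views.

```python
from collections import Counter

# Hypothetical play events as (track_id, day); values are made up.
historical_events = [("a", 1), ("b", 1), ("a", 2)]  # covered by the last batch run
recent_events = [("a", 3), ("c", 3)]                # arrived since the last batch run


def batch_view(events):
    """Batch layer: infrequent, high-volume recomputation over all history."""
    return Counter(track for track, _ in events)


def speed_view(events):
    """Speed layer: frequent, low-volume pass over only the newest events."""
    return Counter(track for track, _ in events)


def serve(batch, speed):
    """Merge both views so queries see complete, up-to-date counts."""
    return batch + speed


play_counts = serve(batch_view(historical_events), speed_view(recent_events))
print(dict(play_counts))  # {'a': 3, 'b': 1, 'c': 1}
```

With several speed layers, each would produce its own small view at its own cadence, and the merge step would sum all of them.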

**DataLoaders**

Originating from PyTorch but later adopted by other gradient-optimized workflows, DataLoaders allow us to define how data is batched and sent to models for training
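
To make the batching idea concrete, here is a small stand-in written in plain Python. It is not PyTorch's `DataLoader` and omits most of its features (workers, collation, samplers); the class name `SimpleDataLoader` and its parameters are hypothetical. It only shows the core contract: iterate over a dataset and yield fixed-size batches, optionally in shuffled order.

```python
import random


class SimpleDataLoader:
    """Minimal batching iterator; a stand-in for, not a copy of, a real DataLoader."""

    def __init__(self, dataset, batch_size, shuffle=False, seed=0):
        self.dataset = dataset
        self.batch_size = batch_size
        self.shuffle = shuffle
        self.rng = random.Random(seed)

    def __iter__(self):
        indices = list(range(len(self.dataset)))
        if self.shuffle:
            self.rng.shuffle(indices)  # a fresh order on every pass (epoch)
        for start in range(0, len(indices), self.batch_size):
            batch_indices = indices[start:start + self.batch_size]
            yield [self.dataset[i] for i in batch_indices]

    def __len__(self):
        # Number of batches, counting a final partial batch.
        return -(-len(self.dataset) // self.batch_size)


# Ten made-up (feature, label) samples.
samples = [(x, x % 2) for x in range(10)]
loader = SimpleDataLoader(samples, batch_size=4)
batches = list(loader)

print(len(loader))                # 3
print([len(b) for b in batches])  # [4, 4, 2]
```

A training loop would then look like `for batch in loader: ...`, with one full pass over the loader corresponding to one epoch.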

## Data Structure for Learning and Inference

**Vector Search and ANN Index**

Our goal with vectors is to use the latent space and associated similarity metric to quickly retrieve similar items in the space, but how can we make it faster? 

Inverted Indices 
- Carefully construct a large hash map between tokens of the query and candidates
- Good approach for sentences or small-lexicon data
- Speed costs are incurred as the dataset grows larger
    - Requires two steps 
    - The similarity we actually care about may not correlate with token similarity
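
The inverted-index approach above can be sketched as follows. This is a toy under stated assumptions: the documents are made up, tokens are whitespace-split words, and the "two steps" are read here as (1) candidate lookup through the index and (2) scoring the candidates, with plain token overlap standing in for a real similarity measure.

```python
from collections import defaultdict

# Made-up documents keyed by id.
documents = {
    "d1": "late night jazz piano",
    "d2": "upbeat dance pop",
    "d3": "slow jazz ballad",
}


def build_inverted_index(docs):
    """Map each token to the set of document ids that contain it."""
    index = defaultdict(set)
    for doc_id, text in docs.items():
        for token in text.split():
            index[token].add(doc_id)
    return index


def search(index, docs, query):
    query_tokens = set(query.split())
    # Step 1: collect every document sharing at least one token with the query.
    candidates = set()
    for token in query_tokens:
        candidates |= index.get(token, set())
    # Step 2: score candidates by token overlap, best first (ties broken by id).
    scored = [(len(query_tokens & set(docs[d].split())), d) for d in candidates]
    return [d for _, d in sorted(scored, key=lambda pair: (-pair[0], pair[1]))]


index = build_inverted_index(documents)
results = search(index, documents, "jazz piano")
print(results)  # ['d1', 'd3']
```

The second step is where the cost grows with the dataset: every candidate returned by the lookup still has to be scored.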

ANN Lookups 
- Allow us to move away from deterministic paradigms and introduce assumptions that help to prune larger datasets
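
One way to picture this pruning is locality-sensitive hashing with random hyperplanes, sketched below in plain Python. This is one illustrative ANN technique chosen for brevity, not necessarily the one intended here, and the vectors, function names, and parameters are made up. Each vector is hashed to a short bit signature, and a query is compared only against vectors in its own bucket, accepting that some true neighbours may be missed.

```python
import math
import random
from collections import defaultdict


def make_hyperplanes(n_planes, dim, seed=0):
    """Random hyperplanes through the origin, one per signature bit."""
    rng = random.Random(seed)
    return [[rng.gauss(0.0, 1.0) for _ in range(dim)] for _ in range(n_planes)]


def signature(vector, hyperplanes):
    """One bit per hyperplane: which side of the plane the vector falls on."""
    return tuple(
        int(sum(v * h for v, h in zip(vector, plane)) >= 0)
        for plane in hyperplanes
    )


def cosine(a, b):
    dot = sum(x * y for x, y in zip(a, b))
    norm = math.sqrt(sum(x * x for x in a)) * math.sqrt(sum(y * y for y in b))
    return dot / norm


def build_index(vectors, hyperplanes):
    """Group item ids by signature so similar directions tend to share a bucket."""
    buckets = defaultdict(list)
    for item_id, vec in vectors.items():
        buckets[signature(vec, hyperplanes)].append(item_id)
    return buckets


def ann_query(query, vectors, buckets, hyperplanes, k=1):
    """Score only the query's bucket instead of the whole dataset."""
    candidates = buckets.get(signature(query, hyperplanes), [])
    ranked = sorted(candidates, key=lambda i: cosine(query, vectors[i]), reverse=True)
    return ranked[:k]


# Made-up item embeddings.
vectors = {
    "a": [1.0, 0.0, 0.0],
    "b": [0.0, 1.0, 0.0],
    "c": [-1.0, 0.0, 0.0],
    "d": [0.9, 0.1, 0.0],
}
hyperplanes = make_hyperplanes(n_planes=4, dim=3)
buckets = build_index(vectors, hyperplanes)
nearest = ann_query([1.0, 0.0, 0.0], vectors, buckets, hyperplanes)
print(nearest)  # ['a']
```

The result is approximate by design: an item that is close to the query but falls on the other side of one hyperplane lands in a different bucket and is never scored.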


**Bloom Filters**

**Feature Stores**

## Simple PySpark Recommender System 

### Get Familiar with Spark

`SparkSession` serves as the entry point for all Spark functionality.

You *must* start a session if you want to do any building/manipulating in Spark 

In [1]:
from pyspark.sql import SparkSession

# start a spark session 
spark = (
    SparkSession.builder
    .appName("simple-spark-recommender")
    .config("spark.memory.offHeap.enabled", "true")  # allow Spark to use off-heap memory
    .config("spark.memory.offHeap.size", "10g")      # size of the off-heap allocation
    .getOrCreate()
)

24/07/09 11:36:02 WARN Utils: Your hostname, srmarshall-mac.local resolves to a loopback address: 127.0.0.1; using 10.64.6.58 instead (on interface en0)
24/07/09 11:36:02 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/09 11:36:02 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# create a dataframe 
df = spark.read.csv("../data/saved_track_features.csv", header=True, escape="\"")

In [3]:
# show the first 5 rows without truncation (second arg = 0 == no truncation)
df.show(5, 0)

+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+----------------------+------------------------------------+--------------------------------------------------------+----------------------------------------------------------------+-----------+--------------+
|danceability|energy|key|loudness|mode|speechiness|acousticness|instrumentalness|liveness|valence|tempo  |type          |id                    |uri                                 |track_href                                              |analysis_url                                                    |duration_ms|time_signature|
+------------+------+---+--------+----+-----------+------------+----------------+--------+-------+-------+--------------+----------------------+------------------------------------+--------------------------------------------------------+----------------------------------------------------------------+-----------+--------------+
|0.756 

In [4]:
# get n observations
print(f"There are {df.count()} observations in this dataset")

There are 1716 observations in this dataset


In [5]:
# how many unique ids are there in this dataset
n_unique_ids = df.select("id").distinct().count()

print(f"There are {n_unique_ids} unique IDs in this dataset")

There are 1716 unique IDs in this dataset


Get aggregate tables using the `.groupBy()` and `.agg()` DataFrame methods together with the `countDistinct()` function from `pyspark.sql.functions`

In [6]:
# import only what is used; a wildcard import would shadow Python builtins such as sum and max
from pyspark.sql.functions import countDistinct, desc

# what is the most common key -- generate a frequency table
key_freq = (
    df.groupBy("key")
        .agg(countDistinct("id").alias("count"))
        .orderBy(desc("count"))
)

key_freq.show()



+---+-----+
|key|count|
+---+-----+
|  1|  224|
|  0|  210|
|  7|  163|
|  5|  156|
|  6|  141|
|  2|  140|
|  9|  132|
| 11|  131|
|  8|  130|
|  4|  126|
| 10|  104|
|  3|   59|
+---+-----+
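
To make the semantics of this aggregation concrete, the following plain-Python sketch mirrors what `groupBy("key")`, `countDistinct("id")` and `orderBy(desc("count"))` compute. The rows are made up (they are not taken from the track features file) and include a duplicated id to show why the count is distinct.

```python
from collections import defaultdict

# Made-up (id, key) rows; "t1" appears twice on purpose.
rows = [("t1", 1), ("t1", 1), ("t2", 1), ("t3", 0), ("t4", 7), ("t5", 0), ("t6", 1)]

# groupBy("key") + countDistinct("id"): collect ids per key in a set,
# so repeated ids are counted once.
ids_per_key = defaultdict(set)
for track_id, key in rows:
    ids_per_key[key].add(track_id)

# orderBy(desc("count")): sort the (key, count) pairs by count, largest first.
key_freq = sorted(
    ((key, len(ids)) for key, ids in ids_per_key.items()),
    key=lambda pair: pair[1],
    reverse=True,
)

print(key_freq)  # [(1, 3), (0, 2), (7, 1)]
```

Key 1 has four rows but only three distinct ids, so its count is 3 rather than 4.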



### Build a Recommender

Generate a sample dataset from Spotify listening history to emulate an RFM (recency, frequency, monetary) dataset

In [13]:
from pyspark.sql import SparkSession

# start a spark session (getOrCreate reuses the active session if one already exists)
spark = (
    SparkSession.builder
    .appName("simple-spark-recommender")
    .config("spark.memory.offHeap.enabled", "true")
    .config("spark.memory.offHeap.size", "10g")
    .getOrCreate()
)

24/07/09 13:37:32 WARN Utils: Your hostname, srmarshall-mac.local resolves to a loopback address: 127.0.0.1; using 10.64.6.58 instead (on interface en0)
24/07/09 13:37:32 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/07/09 13:37:32 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [22]:
# create a dataframe 
df = spark.read.csv("../data/listening_history.csv", header=True, escape="\"")

In [23]:
# preview it
df.show(5, 0)

+----------------------+------------------------+
|id                    |timestamp               |
+----------------------+------------------------+
|5cT3Zcnx3DKezVFAYUPpqg|2024-07-09T14:30:45.920Z|
|3l3JdIcEn1lZ6mwnZSO2BV|2024-07-09T14:28:17.056Z|
|1CoW9K4Sabt7H8bspY6dI1|2024-07-09T14:24:43.086Z|
|5TR5odtJghbnXb9bQv6ubl|2024-07-09T14:21:41.051Z|
|2BxwnvgDYhBP3LZbd8tkDu|2024-07-09T14:18:17.057Z|
+----------------------+------------------------+
only showing top 5 rows



In [28]:
# convert string time to timestamp 
df = df.withColumn("timestamp", df["timestamp"].cast("timestamp"))

# confirm
df.dtypes

[('id', 'string'), ('timestamp', 'timestamp'), ('base_date', 'string')]

In [38]:
from pyspark.sql import functions as F
from pyspark.sql.functions import lit, col


# get base date
base_date = df.select(F.min('timestamp').alias('min_date')).collect()[0]["min_date"]

# set base date 
df = df.withColumn("base_date", lit(base_date))

# calculate recency: whole seconds elapsed between the base date and each play
df_r = df.withColumn("recency", col("timestamp").cast("long") - col("base_date").cast("long"))

In [39]:
df_r.show(5, 0)

+----------------------+-----------------------+-----------------------+-------+
|id                    |timestamp              |base_date              |recency|
+----------------------+-----------------------+-----------------------+-------+
|5cT3Zcnx3DKezVFAYUPpqg|2024-07-09 10:30:45.92 |2024-07-02 14:44:14.382|589591 |
|3l3JdIcEn1lZ6mwnZSO2BV|2024-07-09 10:28:17.056|2024-07-02 14:44:14.382|589443 |
|1CoW9K4Sabt7H8bspY6dI1|2024-07-09 10:24:43.086|2024-07-02 14:44:14.382|589229 |
|5TR5odtJghbnXb9bQv6ubl|2024-07-09 10:21:41.051|2024-07-02 14:44:14.382|589047 |
|2BxwnvgDYhBP3LZbd8tkDu|2024-07-09 10:18:17.057|2024-07-02 14:44:14.382|588843 |
+----------------------+-----------------------+-----------------------+-------+
only showing top 5 rows



In [40]:
df_r.printSchema()

root
 |-- id: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- base_date: timestamp (nullable = false)
 |-- recency: long (nullable = true)
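
The recency values can be checked by hand. Casting a timestamp to `long` yields whole seconds since the Unix epoch, so recency is a difference in seconds. The sketch below reproduces the first row of the table with the standard library only. It assumes the table is displayed in a UTC-4 session time zone (inferred from the 14:30 UTC play being shown as 10:30), so the base date is written here as 18:44:14.382 UTC; `epoch_seconds` is a helper name made up for this example.

```python
from datetime import datetime, timezone


def epoch_seconds(ts):
    """Whole seconds since the Unix epoch, mirroring a timestamp -> long cast."""
    return int(ts.timestamp())


# First row of the listening history, as stored in the CSV (UTC).
latest_play = datetime(2024, 7, 9, 14, 30, 45, 920000, tzinfo=timezone.utc)

# Earliest timestamp in the data, converted to UTC under the assumption above.
base_date = datetime(2024, 7, 2, 18, 44, 14, 382000, tzinfo=timezone.utc)

recency = epoch_seconds(latest_play) - epoch_seconds(base_date)

print(recency)                    # 589591
print(round(recency / 86400, 2))  # 6.82 (days)
```

Note that with the earliest timestamp as the base date, a larger recency value means a more recent play.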



In [42]:
from pyspark.sql.functions import count 

df_rf = df_r.groupBy("id").agg(count("timestamp").alias("frequency"))

In [43]:
df_rf.show()

+--------------------+---------+
|                  id|frequency|
+--------------------+---------+
|5TR5odtJghbnXb9bQ...|        1|
|2DbwsaYDvznsSkh4c...|        1|
|4D32XPyh0L8GX07mU...|        1|
|1zZIGnXIcDKU8QIUk...|        1|
|7uTqmYA0sSjmDVpQN...|        1|
|4KyZLFDRAVvs2naDw...|        1|
|4v9Iq4LANThJsxnMN...|        1|
|3ecLObZD4KDhSTokl...|        1|
|1CoW9K4Sabt7H8bsp...|        1|
|3rgTS3933lMWoPiN6...|        1|
|26DCohYR7X4PJGRQo...|        1|
|1k2pQc5i348DCHwbn...|        1|
|1rqqCSm0Qe4I9rUvW...|        1|
|0Qwn9VwFUlhrhsNGB...|        1|
|3l3JdIcEn1lZ6mwnZ...|        1|
|5cT3Zcnx3DKezVFAY...|        1|
|4o4y2ZK2pso1lHYAG...|        1|
|6u0x5ad9ewHvs3z6u...|        1|
|7qTaDOcld0VmBWXnk...|        1|
|1SjsVdSXpwm1kTdYE...|        1|
+--------------------+---------+
only showing top 20 rows

