# Data Ingestion

In [1]:
import zipfile
import os
from recommenders.datasets import mind

# Download the MIND-small archives, then unpack every .zip in the download
# directory into a sibling folder named after the archive
# (e.g. MINDsmall_train.zip -> MINDsmall_train/). Later cells read from
# those extracted folders.
zip_dir = '../data/mind'
mind.download_mind(size='small', dest_path=zip_dir)

# sorted(): os.listdir order is arbitrary; sorting keeps the log deterministic.
for filename in sorted(os.listdir(zip_dir)):
    # splitext strips only the trailing extension; str.replace('.zip', '')
    # would also remove any ".zip" occurring earlier in the file name.
    stem, ext = os.path.splitext(filename)
    if ext != '.zip':
        continue
    zip_path = os.path.join(zip_dir, filename)
    extract_path = os.path.join(zip_dir, stem)
    os.makedirs(extract_path, exist_ok=True)
    with zipfile.ZipFile(zip_path, 'r') as zip_ref:
        zip_ref.extractall(extract_path)
    print(f"Extracted: {filename} → {extract_path}")

  4%|▍         | 1.98k/51.8k [00:01<00:28, 1.75kKB/s]

100%|██████████| 51.8k/51.8k [00:36<00:00, 1.42kKB/s]
100%|██████████| 30.2k/30.2k [00:21<00:00, 1.40kKB/s]


Extracted: MINDsmall_dev.zip → ../data/mind/MINDsmall_dev
Extracted: MINDsmall_train.zip → ../data/mind/MINDsmall_train


# Feature Engineering


In [18]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import array_remove, coalesce, col, lit, split, when

# Input/output locations, relative to the notebook's working directory.
MIND_TRAIN_DIR = "../data/mind/MINDsmall_train"
PROCESSED_DIR = "../data/processed"

# 1. Start Spark (getOrCreate reuses an already-running session if present)
spark = SparkSession.builder \
    .appName("MIND Preprocessing") \
    .getOrCreate()


def read_tsv(path):
    """Read a header-less, tab-separated file with CSV quoting disabled.

    quote="" turns off Spark's CSV quote handling. These TSV files are not
    CSV-quoted, but text fields (titles/abstracts) can contain literal double
    quotes; with Spark's default quote character a field that begins with `"`
    is parsed as a quoted value, which can strip the quotes or mis-split the
    row into the wrong columns.
    """
    return (
        spark.read
        .option("delimiter", "\t")
        .option("quote", "")
        .csv(path)
    )


# 2. Load TSV files (no headers)
news_df = read_tsv(f"{MIND_TRAIN_DIR}/news.tsv")
behaviors_df = read_tsv(f"{MIND_TRAIN_DIR}/behaviors.tsv")

# 3. Assign column names
news_df = news_df.toDF("id", "category", "subcategory", "title", "abstract", "url", "title_entities", "abstract_entities")
behaviors_df = behaviors_df.toDF("impression_id", "user_id", "timestamp", "history", "impressions")

# 4. Transform history: space-separated string (may be null) -> array.
# Splitting "" yields [""] (one bogus empty ID), so null/empty history is
# coalesced to "" and empty tokens are removed, giving [] instead. This also
# drops empty tokens produced by repeated spaces.
behaviors_df = behaviors_df.withColumn(
    "history",
    array_remove(split(coalesce(col("history"), lit("")), " "), ""),
)

# 5. Transform impressions: split into array of item-click pairs
behaviors_df = behaviors_df.withColumn("impressions", split(col("impressions"), " "))

# 6. Save as Parquet
news_df.write.mode("overwrite").parquet(f"{PROCESSED_DIR}/news.parquet")
behaviors_df.write.mode("overwrite").parquet(f"{PROCESSED_DIR}/behaviors.parquet")

25/04/24 16:42:23 WARN Utils: Your hostname, phoenix resolves to a loopback address: 127.0.1.1; using 10.97.160.179 instead (on interface eno1)
25/04/24 16:42:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/24 16:42:24 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/04/24 16:42:35 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers
25/04/24 16:42:35 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 84.44% for 9 writers
25/04/24 16:42:35 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 76.00% for 10 writers
25/04/24 16:42:36 WARN Memory

In [None]:
# A bare Spark DataFrame only renders its schema string (DataFrame[id: string, ...]);
# show a few rows, truncating long text columns, to actually inspect the data.
news_df.show(5, truncate=40)

In [None]:
import numpy as np

def load_embedding(file_path, dtype=np.float64) -> dict:
    """Load a whitespace-separated text embedding file into a dict.

    Each non-blank line has the form ``<key> <v1> <v2> ...``: the first
    token is the key and the remaining tokens are parsed as floats.

    Args:
        file_path: Path to the embedding file (read as UTF-8 text).
        dtype: NumPy dtype of the returned vectors. Defaults to float64,
            matching the previous behaviour; ``np.float32`` halves memory.

    Returns:
        dict mapping each key to a 1-D ``np.ndarray`` of ``dtype``. If a key
        appears on several lines, the last occurrence wins.
    """
    embed_dict = {}
    with open(file_path, 'r', encoding='utf-8') as f:
        for line in f:
            tokens = line.split()
            # Skip blank/whitespace-only lines (e.g. a trailing empty line)
            # instead of failing with IndexError on tokens[0].
            if not tokens:
                continue
            key, *values = tokens
            embed_dict[key] = np.array([float(v) for v in values], dtype=dtype)
    return embed_dict

# Pretrained entity vectors shipped with the MIND-small training split.
ENTITY_EMBEDDING_PATH = "../data/mind/MINDsmall_train/entity_embedding.vec"
entity_embedding = load_embedding(ENTITY_EMBEDDING_PATH)