# Criteo data

In [1]:
import pandas as pd
import numpy as np
import torch

In [2]:
# Number of dense (DENSE_i) and sparse (SPARSE_i) feature columns; used to
# generate the column names.
NUM_DENSE = 13
NUM_SPARSE = 26

In [3]:
# Column names DENSE_0..DENSE_{NUM_DENSE-1} and SPARSE_0..SPARSE_{NUM_SPARSE-1}.
dense_features = [f"DENSE_{i}" for i in range(NUM_DENSE)]
sparse_features = [f"SPARSE_{i}" for i in range(NUM_SPARSE)]

In [4]:
# Column order handed to read_csv via `names`: label, then dense, then sparse.
columns = ["labels"] + dense_features + sparse_features

In [5]:
# Per-column dtypes: float32 for the label and dense columns, str for the
# sparse columns.
column_types = {
    **{col: 'float32' for col in ["labels"] + dense_features},
    **{col: 'str' for col in sparse_features},
}

In [6]:
# Day index that is interpolated into the input file name.
day_to_load = 0

In [7]:
# Path of the input file for the selected day (read with compression='gzip').
file_path = f"/data/day_{day_to_load}.gz"

In [8]:
# Function to convert hexadecimal string to integer
def hex_to_int(hex_string):
    """Convert a hexadecimal string to an integer, mapping missing values to -1.

    Args:
        hex_string: A hexadecimal string such as ``'05db9164'``, or a missing
            value marker (the string ``'nan'``, ``None`` or a float NaN).

    Returns:
        The integer value of the hex string, or ``-1`` for a missing value.

    Raises:
        ValueError: If a string is given that is not valid hexadecimal.
        TypeError: If a non-string, non-missing value is given.
    """
    if isinstance(hex_string, str):
        # 'nan' is what a missing value looks like after `astype(str)`.
        return -1 if hex_string == 'nan' else int(hex_string, 16)
    # A missing value that was never stringified: None or a float NaN
    # (NaN is the only float that is not equal to itself).
    if hex_string is None or (isinstance(hex_string, float) and hex_string != hex_string):
        return -1
    # Anything else is rejected by int() exactly as before.
    return int(hex_string, 16)

def clean_chunk(chunk):
    """Normalise one raw chunk in place and return it.

    Dense columns get missing values replaced by 0 and are cast to float32,
    sparse columns are stringified and converted with ``hex_to_int``, and the
    ``labels`` column is cast to float64.
    """
    dense_block = chunk[dense_features].fillna(0)
    chunk[dense_features] = dense_block.astype(np.float32)

    for column in sparse_features:
        as_text = chunk[column].astype(str)
        chunk[column] = as_text.apply(hex_to_int)

    chunk["labels"] = chunk["labels"].astype(np.float64)
    return chunk

In [9]:
chunk_size = 1000000  # Adjust this value based on your system's memory constraints
total_rows = 0
total_size = 0
# Stream the whole file once to measure how many rows it has and how much
# memory the cleaned data needs.
# dtype=column_types forces the hex-encoded sparse columns to be read as
# strings; without it pandas infers a dtype per chunk, and a column whose
# values happen to look numeric would no longer round-trip through hex_to_int.
# The reader is used as a context manager so the file handle is always closed.
with pd.read_csv(file_path, sep='\t', header=None, names=columns, dtype=column_types,
                 compression='gzip', chunksize=chunk_size) as reader:
    for chunk in reader:
        total_rows += len(chunk)
        chunk = clean_chunk(chunk)
        # memory_usage reports bytes; accumulate in units of 10**9 bytes.
        total_size += chunk.memory_usage(deep=True).sum() / 10**9
        print("done with ", total_rows, total_size, "gb")

In [254]:
# all_data = pd.read_csv(file_path, sep='\t', header=None, names=columns, dtype=column_types, compression='gzip')

In [257]:
chunk_size = 1000  # Adjust this value based on your system's memory constraints
all_dfs = []
num_chunks = 100000  # chunks still to read; counts down to 0
original_chunks = 100000  # total chunk budget, used for progress reporting

# dtype=column_types forces the hex-encoded sparse columns to be read as
# strings instead of relying on per-chunk dtype inference.
# The loop stops early once the chunk budget is used up, so the reader is
# opened as a context manager to make sure the file handle is closed.
with pd.read_csv(file_path, sep='\t', header=None, names=columns, dtype=column_types,
                 compression='gzip', chunksize=chunk_size) as reader:
    for chunk in reader:
        if num_chunks % 100 == 0:
            print(f"Done with {original_chunks - num_chunks}")
        chunk = clean_chunk(chunk)
        all_dfs.append(chunk)
        num_chunks -= 1
        if num_chunks == 0:
            break
result_df = pd.concat(all_dfs)
# Number of rows that were loaded.
print(len(result_df))

In [258]:
# In-memory size of result_df in units of 10**9 bytes.
result_df.memory_usage(deep=True).sum() / 10**9

In [196]:
# For every sparse feature, map each distinct value found in result_df to its
# position in the sorted array returned by np.unique.
category_reverse_map = {
    feature_name: {
        raw_value: token
        for token, raw_value in enumerate(np.unique(result_df[feature_name].values))
    }
    for feature_name in sparse_features
}

In [197]:
def tokenize_sparse(df):
    """Replace sparse column values with their indices from ``category_reverse_map``.

    The frame is modified in place and also returned. Each sparse column is
    mapped through its per-feature lookup table and cast to int64.
    """
    for column in sparse_features:
        lookup = category_reverse_map[column]
        tokens = df[column].map(lookup)
        df[column] = tokens.astype(np.int64)
    return df

In [198]:
# Replace the raw sparse values in result_df with their vocabulary indices.
result_df = tokenize_sparse(result_df)

In [199]:
# In-memory size of result_df (units of 10**9 bytes) after tokenization.
result_df.memory_usage(deep=True).sum() / 10**9

# All Data and Dataset

In [200]:
# Dense columns as one tensor (rows x len(dense_features)); any remaining NaN
# is replaced by 0 before conversion. The second line shows the shape.
dense_tensor_data = torch.from_numpy(result_df[dense_features].fillna(0).values)
dense_tensor_data.size()

In [201]:
# One index tensor per sparse feature, keyed by feature name.
sparse_tensor_data = {fn: torch.from_numpy(result_df[fn].values) for fn in sparse_features}

In [251]:
# Labels as a float64 tensor; the second line displays its dtype.
labels_tensor_data = torch.from_numpy(result_df["labels"].values.astype(np.float64))
labels_tensor_data.dtype

In [203]:
from typing import NamedTuple, Mapping, Tuple
import torch

class ModelInput(NamedTuple):
    """Immutable pair holding the dense and sparse inputs of the model."""

    # Tensor of dense feature values.
    dense_features: torch.Tensor
    # Mapping from sparse feature name to that feature's tensor.
    sparse_features: Mapping[str, torch.Tensor]

# Model Architecture

An implementation of a deep learning recommendation model (DLRM). The model input consists of dense and sparse features. The former is a vector of floating-point values. The latter is a list of sparse indices into embedding tables, which consist of vectors of floating-point values. The selected vectors are passed to MLP networks, denoted by triangles in the diagram below; in some cases the vectors are combined through interaction operators (Ops).

In [204]:
# output:
#                     probability of a click
# model:                        |
#                              /\
#                             /__\
#                               |
#       _____________________> Op  <___________________
#     /                         |                      \
#    /\                        /\                      /\
#   /__\                      /__\           ...      /__\
#    |                          |                       |
#    |                         Op                      Op
#    |                    ____/__\_____           ____/__\____
#    |                   |_Emb_|____|__|    ...  |_Emb_|__|___|
# input:
# [ dense features ]     [sparse indices] , ..., [sparse indices]

In [205]:
import torch.nn as nn

# MLP

In [206]:
# Define the MLP model
class MLP(nn.Module):
    """Two-layer perceptron: Linear -> ReLU -> Linear."""

    def __init__(self, input_size: int, hidden_size: int, output_size: int):
        super().__init__()
        # Sub-modules are registered in the order in which they are applied.
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.relu = nn.ReLU()
        self.fc2 = nn.Linear(hidden_size, output_size)

    def forward(self, x):
        """Apply fc1, the ReLU activation and fc2 to ``x`` and return the result."""
        hidden = self.relu(self.fc1(x))
        return self.fc2(hidden)

# Dense Arch

In [207]:
import torch.nn as nn

class DenseArch(nn.Module):
    """Dense branch: a single MLP applied to the dense feature tensor."""

    def __init__(self, dense_feature_count: int, output_size: int) -> None:
        super().__init__()
        # The hidden layer is twice as wide as the output.
        hidden_width = output_size * 2
        self.mlp = MLP(
            input_size=dense_feature_count,  # D
            hidden_size=hidden_width,
            output_size=output_size,  # O
        )

    def forward(self, inputs: torch.Tensor) -> torch.Tensor:
        """Map a B x D input to a B x O output through the MLP."""
        projected = self.mlp(inputs)
        return projected

In [208]:
# Let's test it

In [209]:
# Use the dense data tensor as the example input for the module checks below.
dense_tensor = dense_tensor_data

In [210]:
output_size = 16

# Build the dense branch from output_size instead of repeating the literal 16,
# so the two values cannot drift apart, then run it on the example input.
dense_arch = DenseArch(len(dense_features), output_size)
dense_out = dense_arch(dense_tensor)

In [211]:
# Shape of the dense branch output.
dense_out.size()

In [212]:
# Display the dense branch output values.
dense_out

# Sparse Arch

In [213]:
from typing import List, Dict

class SparseArch(nn.Module):
    """Sparse branch: one embedding table and one MLP per sparse feature."""

    def __init__(self, embedding_dimensions: Mapping[str, Tuple[int, int]], output_size: int) -> None:
        super().__init__()
        hidden_width = output_size * 2

        # All embedding tables are built first, then all MLPs, keyed by feature name.
        embedding_tables = {
            name: nn.Embedding(table_rows, table_width)
            for name, (table_rows, table_width) in embedding_dimensions.items()
        }
        self.embeddings = nn.ModuleDict(embedding_tables)

        projections = {
            name: MLP(input_size=table_width, hidden_size=hidden_width, output_size=output_size)
            for name, (_, table_width) in embedding_dimensions.items()
        }
        self.mlps = nn.ModuleDict(projections)

    def forward(self, inputs: Dict[str, torch.Tensor]) -> List[torch.Tensor]:
        """Embed each feature's indices and project them through that feature's MLP.

        Returns one tensor per entry of ``inputs``, in the iteration order of ``inputs``.
        """
        return [
            self.mlps[feature](self.embeddings[feature](indices))
            for feature, indices in inputs.items()
        ]

In [214]:
embedding_size = 16

# Size every embedding table from the vocabulary built in category_reverse_map
# (the previously referenced `num_categories` is not defined anywhere in this
# notebook). The original "+ 1" spare row is kept.
embedding_dimensions = {fn: (len(category_reverse_map[fn]) + 1, embedding_size) for fn in sparse_features}
print(embedding_dimensions)

In [215]:
# Output width used for the sparse branch and the prediction layer set-up below.
output_size = 16

In [216]:
# Build the sparse branch from the per-feature embedding table sizes.
sparse_arch = SparseArch(embedding_dimensions=embedding_dimensions, output_size=output_size)

In [217]:
# Shape of one sparse feature's index tensor.
sparse_tensor_data["SPARSE_0"].size()

In [218]:
# Run the sparse branch on all sparse features; the result is a list with one
# output tensor per feature.
sparse_out = sparse_arch(sparse_tensor_data)
sparse_out

# Dense Sparse Interaction

In [219]:
class DenseSparseInteractionLayer(nn.Module):
    """Combine the dense and sparse branch outputs into flattened pairwise products."""

    def forward(self, dense_out: torch.Tensor, sparse_out: List[torch.Tensor]) -> torch.Tensor:
        """Return the flattened outer product of the concatenated branch outputs.

        Args:
            dense_out: Dense branch output. It must be 2-D (batch x features),
                because ``torch.bmm`` needs a 3-D tensor after the unsqueeze.
            sparse_out: Sequence of sparse branch outputs (list or tuple), each
                with the same batch size as ``dense_out``.

        Returns:
            A ``B x (C * C)`` tensor, where ``C`` is the total concatenated
            feature width.
        """
        # Unpacking accepts any sequence of tensors, not only a list.
        concat = torch.cat([dense_out, *sparse_out], dim=-1).unsqueeze(2)  # B x C x 1
        # Outer product of the concatenated vector with itself: B x C x C.
        out = torch.bmm(concat, concat.transpose(1, 2))
        return torch.flatten(out, 1)

In [220]:
class PredictionLayer(nn.Module):
    """Prediction head: an MLP with one output unit followed by a sigmoid."""

    def __init__(self, dense_out_size: int, sparse_out_sizes: List[int], hidden_size: int):
        """Size the MLP input for a flattened ``concat_size x concat_size`` matrix.

        Args:
            dense_out_size: Width of the dense branch output.
            sparse_out_sizes: Width of each sparse branch output.
            hidden_size: Width of the MLP hidden layer.
        """
        super().__init__()
        concat_size = sum(sparse_out_sizes) + dense_out_size
        self.mlp = MLP(input_size=concat_size * concat_size, hidden_size=hidden_size, output_size=1)
        self.sigmoid = nn.Sigmoid()

    def forward(self, inputs: torch.Tensor) -> torch.Tensor:
        """Return the sigmoid of the MLP output; the result is a tensor, not a float."""
        return self.sigmoid(self.mlp(inputs))

In [221]:
# Combine the dense and sparse branch outputs with the interaction layer.
dense_sparse_interaction_layer = DenseSparseInteractionLayer()
ds_out = dense_sparse_interaction_layer(dense_out, sparse_out)

In [222]:
# Shape of the interaction output.
ds_out.size()

In [223]:
# One output width for the dense branch plus one per sparse feature.
prediction_layer = PredictionLayer(dense_out_size=output_size, sparse_out_sizes=[output_size]*len(sparse_features), hidden_size=16)

In [224]:
# Apply the prediction head and inspect the output shape.
pred_out = prediction_layer(ds_out)
pred_out.size()

# Model

In [225]:
from dataclasses import dataclass

# parameters
# dense_input_feature_size
# sparse_embedding_dimenstions
# dense_output_size
# sparse_output_size
# dense_hidden_size
# sparse_hidden_size
# prediction_hidden_size

@dataclass
class Parameters:
    """Hyper-parameters read by ``DLRM.__init__``."""

    # Number of dense input features (passed to DenseArch as dense_feature_count).
    dense_input_feature_size: int
    # Feature name -> (num_embeddings, embedding_dim), passed to SparseArch.
    sparse_embedding_dimenstions: Mapping[str, Tuple[int, int]]
    # Output width of the dense branch.
    dense_output_size: int
    # Output width of each per-feature MLP in the sparse branch.
    sparse_output_size: int
    # NOTE(review): the two hidden sizes below are not read by DLRM.__init__ in
    # this file; DenseArch and SparseArch use output_size * 2 instead.
    dense_hidden_size: int
    sparse_hidden_size: int
    # Hidden width of the prediction layer's MLP.
    prediction_hidden_size: int

class DLRM(nn.Module):
    """Full model: dense branch, sparse branch, interaction layer and prediction head."""

    def __init__(self, parameters: Parameters):
        """Build the four sub-modules from ``parameters``.

        ``parameters.dense_hidden_size`` and ``parameters.sparse_hidden_size``
        are not used here; DenseArch and SparseArch derive their hidden width
        from the output size.
        """
        super().__init__()
        self.dense_layer = DenseArch(
            dense_feature_count=parameters.dense_input_feature_size,
            output_size=parameters.dense_output_size,
        )
        self.sparse_layer = SparseArch(
            embedding_dimensions=parameters.sparse_embedding_dimenstions,
            output_size=parameters.sparse_output_size,
        )
        self.interaction_layer = DenseSparseInteractionLayer()
        self.prediction_layer = PredictionLayer(
            dense_out_size=parameters.dense_output_size,
            # One entry per sparse feature, all with the same output width.
            sparse_out_sizes=[parameters.sparse_output_size] * len(parameters.sparse_embedding_dimenstions),
            hidden_size=parameters.prediction_hidden_size,
        )

    def forward(self, dense_features: torch.Tensor, sparse_features: Mapping[str, torch.Tensor]) -> torch.Tensor:
        """Run both branches, combine them and return the prediction head output.

        Args:
            dense_features: Input tensor for the dense branch.
            sparse_features: Mapping from sparse feature name to its index tensor.

        Returns:
            The tensor produced by the prediction layer (sigmoid output).
        """
        dense_out = self.dense_layer(dense_features)
        sparse_out = self.sparse_layer(sparse_features)
        ds_out = self.interaction_layer(dense_out, sparse_out)
        return self.prediction_layer(ds_out)

In [228]:
# Example configuration: the dense input width comes from the dense column
# list and the embedding table sizes from embedding_dimensions.
parameters = Parameters(
    dense_input_feature_size=len(dense_features),
    sparse_embedding_dimenstions=embedding_dimensions,
    dense_output_size=16,
    sparse_output_size=16,
    dense_hidden_size=32,
    sparse_hidden_size=32,
    prediction_hidden_size=32)

In [229]:
# Instantiate the model from the configuration.
dlrm = DLRM(parameters=parameters)

In [231]:
# Forward pass on the dense tensor and the per-feature sparse tensors.
prediction = dlrm(dense_tensor, sparse_tensor_data)

In [232]:
from torchviz import make_dot

In [233]:
# Build the autograd graph of the mean prediction (shown as the cell result).
make_dot(prediction.mean(), params=dict(dlrm.named_parameters()), show_attrs=True, show_saved=True)

In [234]:
# Same graph, kept in a variable so it can be rendered to a file.
out = make_dot(prediction.mean(), params=dict(dlrm.named_parameters()), show_attrs=True, show_saved=True)

In [235]:
# Render the graph in PDF format without opening a viewer (view=False).
out.render("dlrm.pdf",format="pdf", view=False)

In [237]:
# Bundle the example inputs into a ModelInput tuple.
model_input = ModelInput(dense_features=dense_tensor, sparse_features=sparse_tensor_data)

In [241]:
# Trace the model with the example dense and sparse inputs.
traced_model = torch.jit.trace(dlrm, [dense_tensor, sparse_tensor_data])

In [242]:
# Inspect the graph recorded by the trace.
traced_model.graph

In [243]:
# Print the code generated for the traced module.
print(traced_model.code)

In [244]:
# Wrap the model with torch.compile using the 'max-autotune' mode.
compiled = torch.compile(dlrm, mode='max-autotune')

In [246]:
# Reset Dynamo state, then ask it to explain the compiled model on the example
# inputs and print the report.
# NOTE(review): torch._dynamo is an underscore-prefixed (private) namespace —
# confirm these calls against the installed torch version.
torch._dynamo.reset()
explain_output = torch._dynamo.explain(compiled)(dense_tensor, sparse_tensor_data)
print(explain_output)

# Dataset

# Train Loop