```text
# Copyright (c) 2020 NVIDIA CORPORATION. All rights reserved.
# Modifications copyright Intel
# 
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#       http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
```

# An implementation of a deep learning recommendation model (DLRM)
This is a modified version of an existing DLRM implementation; see the license header above for attribution.

## Data Preprocessing with Apache Spark
The following functions are used for data preprocessing.

In [None]:
import os
# Remove the proxy variables from this process's environment.  ``pop`` with a
# default is used instead of ``del`` so the cell also runs when a variable is
# not set (``del os.environ[...]`` raises KeyError in that case).
os.environ.pop("http_proxy", None)
os.environ.pop("https_proxy", None)

from contextlib import contextmanager
from operator import itemgetter
from time import time

from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [None]:
# Column indices of the raw input; a column with index N is named ``_c<N>``.
# Index 0 is the label, 1..13 are the integer features and 14..39 are the
# categorical features.
LABEL_COL = 0
INT_COLS = [*range(1, 14)]
CAT_COLS = [*range(14, 40)]

In [None]:
# Elapsed wall-clock time in seconds of each timed step, keyed by step name.
_benchmark = {}


@contextmanager
def _timed(step):
    """Time the body of a ``with`` block and store the result in ``_benchmark``.

    Args:
        step: Key under which the elapsed time (in seconds, measured with
            ``time()``) is stored. Reusing a key overwrites the earlier value.

    The measurement is written in a ``finally`` clause, so a step that raises
    is still recorded; the exception then propagates unchanged.
    """
    start = time()
    try:
        yield
    finally:
        _benchmark[step] = time() - start

In [None]:
def get_column_counts_with_frequency_limit(df, frequency_limit=None):
    """Count how often each value occurs in each categorical column.

    Args:
        df: Spark DataFrame containing a column ``_c<i>`` for every ``i`` in
            ``CAT_COLS``.
        frequency_limit: Optional comma-separated specification of minimum
            counts. An entry ``"N"`` sets the default minimum count (the last
            such entry wins). An entry ``"C:N"`` sets the minimum count for
            the column with absolute index ``C`` and exempts that column from
            the default. Entries with more than one ``:`` are ignored. A bare
            ``int`` is also accepted and treated like ``"N"``. A falsy value
            disables filtering.

    Returns:
        DataFrame with the columns ``column_id`` (position of the column
        within ``CAT_COLS``), ``data`` (the value) and ``count``. Null values
        are not counted, and rows below the applicable minimum are removed.
    """
    cat_col_names = ['_c%d' % i for i in CAT_COLS]
    # posexplode over the array of categorical columns yields one
    # (pos, col) row per cell, which is then grouped and counted.
    counts = (df
              .select(posexplode(array(*cat_col_names)))
              .withColumnRenamed('pos', 'column_id')
              .withColumnRenamed('col', 'data')
              .filter('data is not null')
              .groupBy('column_id', 'data')
              .count())

    if not frequency_limit:
        return counts

    first_cat_col = CAT_COLS[0]
    explicit_columns = []
    default_limit = None
    # str() lets callers pass a plain integer limit; calling .split on an int
    # would raise AttributeError.
    for spec in str(frequency_limit).split(","):
        parts = spec.split(":")
        if len(parts) == 1:
            default_limit = int(parts[0])
        elif len(parts) == 2:
            column, limit = int(parts[0]), int(parts[1])
            # Keep every row of other columns; for this column keep only
            # values seen at least `limit` times.
            counts = counts.filter((col('column_id') != column - first_cat_col)
                                   | (col('count') >= limit))
            explicit_columns.append(column)

    if default_limit:
        # column_id is zero-based, so shift the absolute indices.
        remaining_ids = [c - first_cat_col for c in CAT_COLS if c not in explicit_columns]
        counts = counts.filter((~col('column_id').isin(remaining_ids))
                               | (col('count') >= default_limit))
    return counts


def assign_id_with_window(df):
    """Give every value a zero-based id within its column, most frequent first.

    Args:
        df: DataFrame with the columns ``column_id``, ``data`` and ``count``.

    Returns:
        DataFrame with the columns ``column_id``, ``data``, ``model_count``
        (the former ``count``) and ``id``, where ``id`` is the row number
        within ``column_id`` ordered by descending ``count``, minus one.
    """
    by_frequency = Window.partitionBy('column_id').orderBy(desc('count'))
    zero_based_rank = row_number().over(by_frequency) - 1
    ranked = df.withColumn('id', zero_based_rank)
    return ranked.withColumnRenamed('count', 'model_count')


def get_column_models(combined_model):
    """Split the combined dictionary into one DataFrame per categorical column.

    Args:
        combined_model: DataFrame with a ``column_id`` column holding the
            position of each categorical column within ``CAT_COLS``.

    Yields:
        ``(i, model)`` for every ``i`` in ``CAT_COLS``, where ``model`` holds
        the rows of that column with ``column_id`` dropped.
    """
    for i in CAT_COLS:
        condition = 'column_id == %d' % (i - CAT_COLS[0])
        rows_of_column = combined_model.filter(condition)
        yield i, rows_of_column.drop('column_id')


def col_of_rand_long():
    """Return a column expression: ``rand()`` scaled by 2**52 and cast to long."""
    scaled = rand() * (1 << 52)
    return scaled.cast(LongType())


def apply_models(df, models):
    """Replace the categorical values in ``df`` by their ids from ``models``.

    Args:
        df: DataFrame containing the categorical columns ``_c<i>``.
        models: Iterable of ``(i, model)`` pairs; each ``model`` has the
            columns ``data``, ``model_count`` and ``id``.

    Returns:
        ``df`` where each listed ``_c<i>`` now holds the matching ``id``.
        Values without a match in the model (left join) become null and are
        then filled with 0, as are nulls in any column of ``CAT_COLS``.
        NOTE(review): 0 is also a regular id when the models come from
        ``assign_id_with_window`` — confirm this sharing is intended.
    """
    for i, model in models:
        target = '_c%d' % i
        lookup = model.drop('model_count').withColumnRenamed('data', target)
        joined = df.join(lookup, target, how='left')
        # The join key column is dropped and the id takes over its name.
        df = joined.drop(target).withColumnRenamed('id', target)
    categorical_names = ['_c%d' % i for i in CAT_COLS]
    return df.fillna(0, categorical_names)


def transform_log(df, transform_log=False):
    """Optionally log-transform the integer columns, then fill nulls with 0.

    Args:
        df: DataFrame containing a column ``_c<i>`` for every ``i`` in
            ``INT_COLS``.
        transform_log: When true, each such column is replaced by
            ``log(value + 3)``.

    Returns:
        The DataFrame with nulls in the integer columns replaced by 0.
    """
    numeric_names = ['_c%d' % i for i in INT_COLS]
    if not transform_log:
        return df.fillna(0, numeric_names)

    for name in numeric_names:
        shifted = df[name] + 3
        df = df.withColumn(name, log(shifted))
    return df.fillna(0, numeric_names)


def rand_ordinal(df):
    """Return ``df`` with an added ``ordinal`` column of random long values."""
    # The random long is derived from a double. A double has a 52-bit
    # fraction, so the helper tries to keep as many of those bits as possible.
    random_key = col_of_rand_long()
    return df.withColumn('ordinal', random_key)


def process_column_models(column_models):
    """Attach the row count to every per-column model.

    Args:
        column_models: Iterable of ``(i, model)`` pairs.

    Yields:
        ``(i, model, row)`` where ``row.size`` is the number of rows in
        ``model``. Each model is aggregated and collected to obtain it.
    """
    for i, column in column_models:
        size_expr = count('*').alias('size')
        rows = column.agg(size_expr).collect()
        yield i, column, rows[0]

In [None]:
def pre_process(df,
                frequency_limit: str = "3",
                output_ordering: str = "total_random",
                no_numeric_log_col: bool = False):
    """Encode the categorical columns as ids and prepare the numeric columns.

    Args:
        df: Raw DataFrame with the columns ``_c<i>``.
        frequency_limit: Minimum-count specification passed on to
            ``get_column_counts_with_frequency_limit``. The default is the
            string ``"3"`` because that function splits the value on ``,``.
        output_ordering: ``"total_random"`` repartitions and sorts the rows by
            a random ordinal; ``"input"`` sorts each partition by a
            monotonically increasing id; any other value leaves the ordering
            untouched.
        no_numeric_log_col: When false, the integer columns are
            log-transformed by ``transform_log``.

    Returns:
        Tuple ``(df, categorical_feature_sizes)``: the transformed DataFrame
        and the number of distinct kept values of each categorical column, in
        the order produced by ``get_column_models``.

    The durations of the two phases are recorded by ``_timed`` under the keys
    ``"generate models"`` and ``"transform"``.
    """
    with _timed("generate models"):
        # column_id, data, count
        col_counts = get_column_counts_with_frequency_limit(df, frequency_limit=frequency_limit)
        # column_id, data, model_count, id
        col_counts_with_id = assign_id_with_window(col_counts)
        # data, model_count, id
        columns_model = list(get_column_models(col_counts_with_id))

    with _timed('transform'):
        # Add the helper column that defines the output order, if any.
        if output_ordering == 'total_random':
            df = rand_ordinal(df)
        elif output_ordering == 'input':
            df = df.withColumn('ordinal', monotonically_increasing_id())
        # Any other ordering needs no helper column.

        sized_models = list(process_column_models(columns_model))
        categorical_feature_sizes = [agg.size for _, _, agg in sized_models]
        # Distinct loop names so the comprehension does not reuse the name of
        # the outer DataFrame.
        models = [(i, model) for i, model, _ in sized_models]

        df = apply_models(df, models)
        df = transform_log(df, not no_numeric_log_col)

        if output_ordering == 'total_random':
            # A full sort is expensive. The order is random anyway, so
            # repartitioning by the random key and sorting locally is enough.
            df = df.repartition('ordinal').sortWithinPartitions('ordinal')
            df = df.drop('ordinal')
        elif output_ordering == 'input':
            # Applying the dictionary happened within a single task, so the
            # rows are already close to the right order; a sort within each
            # partition finishes the job.
            df = df.sortWithinPartitions('ordinal')
            df = df.drop('ordinal')
            df = df.drop("day")
        # Any other ordering: nothing to do.

    return df, categorical_feature_sizes

## DLRM Model
Define the DLRM model

In [None]:
import copy
import math
from typing import List, Sequence, Optional, Tuple

import torch
from torch import nn


class DotInteraction:
    """Pairwise dot-product interaction between the bottom feature vectors."""

    def __init__(self, embedding_num: int, embedding_dim: int):
        """
        Interactions are among the outputs of all the embedding tables and the
        bottom MLP, i.e. ``embedding_num + 1`` vectors of size
        ``embedding_dim``. The ``dot`` interaction computes the dot product of
        every pair of distinct vectors.

        :param embedding_num: number of embedding tables (0 is allowed)
        :param embedding_dim: size of each feature vector
        """
        self._num_interaction_inputs = embedding_num + 1
        self._embedding_dim = embedding_dim
        # Row/column indices of the strictly lower triangle, in row-major
        # order: (1, 0), (2, 0), (2, 1), ...  torch.tril_indices always returns
        # an integer tensor, even when there are no pairs, so it stays usable
        # as an index for a single interaction input.
        self._tril_indices = torch.tril_indices(
            self._num_interaction_inputs, self._num_interaction_inputs, offset=-1)

    @property
    def num_interactions(self) -> int:
        """Width of the tensor returned by :meth:`interact`.

        This is the number of vector pairs, plus ``embedding_dim`` for the
        bottom MLP output, plus one zero padding column. The padding was
        originally described as making the width a multiple of 8; that only
        holds for some configurations.
        """
        n = (self._num_interaction_inputs * (self._num_interaction_inputs - 1)) // 2 + self._embedding_dim
        return n + 1  # one column of zero padding

    def interact(self, bottom_output, bottom_mlp_output):
        """
        :param bottom_output: [batch_size, 1 + #embeddings, embedding_dim]
        :param bottom_mlp_output: tensor concatenated in front of the
            interactions along dim 1; its width is expected to be
            embedding_dim for the result to match ``num_interactions``
        :return: [batch_size, num_interactions] tensor made of
            bottom_mlp_output, the pairwise dot products and one zero column
        """
        batch_size = bottom_output.size()[0]

        # Gram matrix of the feature vectors: [batch, n, n].
        interaction = torch.bmm(bottom_output, torch.transpose(bottom_output, 1, 2))
        # Keep each unordered pair once (strictly lower triangle).
        interaction_flat = interaction[:, self._tril_indices[0], self._tril_indices[1]]

        # concatenate dense features and interactions
        zeros_padding = torch.zeros(batch_size, 1, dtype=bottom_output.dtype, device=bottom_output.device)
        interaction_output = torch.cat(
            (bottom_mlp_output, interaction_flat, zeros_padding), dim=1)

        return interaction_output


class TorchMlp(nn.Module):
    """Multi-layer perceptron: a ``Linear`` followed by ``ReLU`` per entry of ``sizes``."""

    def __init__(self, input_dim: int, sizes: Sequence[int]):
        """
        Args:
            input_dim: number of input features of the first layer
            sizes: output width of each successive linear layer
        """
        super().__init__()

        stack = []
        in_features = input_dim
        for out_features in sizes:
            stack += [nn.Linear(in_features, out_features), nn.ReLU(inplace=True)]
            in_features = out_features

        self.layers = nn.Sequential(*stack)

        self._initialize_weights()

    def _initialize_weights(self):
        """Draw weights and biases of every linear layer from zero-mean normals."""
        linears = (m for m in self.modules() if isinstance(m, nn.Linear))
        for linear in linears:
            weight_std = math.sqrt(2. / (linear.in_features + linear.out_features))
            bias_std = math.sqrt(1. / linear.out_features)
            nn.init.normal_(linear.weight.data, 0., weight_std)
            nn.init.normal_(linear.bias.data, 0., bias_std)

    @property
    def weights(self):
        """Weight parameters of the linear layers, in layer order."""
        linears = filter(lambda layer: isinstance(layer, nn.Linear), self.layers)
        return [linear.weight for linear in linears]

    @property
    def biases(self):
        """Bias parameters of the linear layers, in layer order."""
        linears = filter(lambda layer: isinstance(layer, nn.Linear), self.layers)
        return [linear.bias for linear in linears]

    def forward(self, mlp_input: torch.Tensor) -> torch.Tensor:
        """
        Args:
            mlp_input (Tensor): with shape [batch_size, num_features]

        Returns:
            Tensor: Mlp output in shape [batch_size, num_output_features]
        """
        hidden = self.layers(mlp_input)
        return hidden


class MultiTableEmbeddings(nn.Module):
    """One sparse, trainable embedding table per categorical feature."""

    def __init__(
            self,
            categorical_feature_sizes: Sequence[int],
            embedding_dim: int):
        """
        Args:
            categorical_feature_sizes: number of rows of each embedding table
            embedding_dim: width of every embedding vector

        The tables are created from ``torch.empty`` storage, so their values
        are uninitialized until something fills them in.
        """
        super().__init__()
        self._categorical_feature_sizes = copy.copy(categorical_feature_sizes)

        # Each embedding table has size [num_features, embedding_dim]
        tables = [
            nn.Embedding.from_pretrained(
                torch.empty((num_features, embedding_dim)), freeze=False, sparse=True)
            for num_features in categorical_feature_sizes
        ]

        self.embeddings = nn.ModuleList(tables)
        self.embedding_dim = embedding_dim

    def forward(self, categorical_inputs) -> List[torch.Tensor]:
        """
        Args:
            categorical_inputs (Tensor): with shape [batch_size, num_categorical_features]

        Returns:
            List[Tensor]: one tensor per embedding table (26 in the case of
            Criteo), each of shape [batch_size, 1, embedding_dim]
        """
        return [
            table(categorical_inputs[:, table_id]).unsqueeze(1)
            for table_id, table in enumerate(self.embeddings)
        ]

    @property
    def weights(self):
        """Raw weight tensors (``.data``) of the tables, in table order."""
        return [table.weight.data for table in self.embeddings]


class DlrmBottom(nn.Module):
    """Bottom half of DLRM: the embedding tables plus an optional bottom MLP."""

    def __init__(
            self,
            num_numerical_features: int,
            categorical_feature_sizes: Sequence[int],
            bottom_mlp_sizes: Optional[Sequence[int]] = None,
            embedding_dim: int = 128):
        """
        Args:
            num_numerical_features: input width of the bottom MLP
            categorical_feature_sizes: number of rows of each embedding table
            bottom_mlp_sizes: layer widths of the bottom MLP; ``None`` or an
                empty sequence means no bottom MLP. When given, the last
                width must equal ``embedding_dim``.
            embedding_dim: width of every embedding vector
        """
        super().__init__()
        # ``not bottom_mlp_sizes`` covers both None and an empty sequence;
        # indexing [-1] on an empty sequence would raise IndexError instead of
        # skipping the check.
        assert not bottom_mlp_sizes or embedding_dim == bottom_mlp_sizes[-1], (
            "The last bottom MLP layer must"
            " have same size as embedding.")
        self._embedding_dim = embedding_dim
        self._categorical_feature_sizes = copy.copy(categorical_feature_sizes)

        self.embeddings = MultiTableEmbeddings(categorical_feature_sizes, embedding_dim)
        # Without a bottom MLP an empty ModuleList is stored; it is falsy,
        # which is what ``forward`` and ``num_feature_vectors`` test for.
        self.mlp = (TorchMlp(num_numerical_features, bottom_mlp_sizes)
                    if bottom_mlp_sizes else torch.nn.ModuleList())

        self._initialize_embeddings_weights(self.embeddings, categorical_feature_sizes)

    def _initialize_embeddings_weights(self, embeddings, categorical_feature_sizes: Sequence[int]):
        """Fill each table uniformly in ``[-sqrt(1/size), sqrt(1/size)]``."""
        assert len(embeddings.weights) == len(categorical_feature_sizes)

        for size, weight in zip(categorical_feature_sizes, embeddings.weights):
            bound = math.sqrt(1. / size)
            nn.init.uniform_(weight, -bound, bound)

    @property
    def num_categorical_features(self) -> int:
        """Number of categorical features (one embedding table each)."""
        return len(self._categorical_feature_sizes)

    @property
    def num_feature_vectors(self) -> int:
        """Number of vectors ``forward`` stacks: the tables plus the MLP if present."""
        # ``self.mlp`` is never None: a missing MLP is an empty ModuleList.
        # Use the same truthiness test as ``forward`` so the count matches.
        return self.num_categorical_features + int(bool(self.mlp))

    def forward(self, numerical_input, categorical_inputs) -> Tuple[torch.Tensor, Optional[torch.Tensor]]:
        """
        Args:
            numerical_input (Tensor): with shape [batch_size, num_numerical_features]
            categorical_inputs (Tensor): with shape [batch_size, num_categorical_features]

        Returns:
            Tuple of the concatenated bottom MLP and embedding output in shape
            [batch, 1 + #embedding, embedding_dim] (without the leading 1 when
            there is no bottom MLP), and the raw bottom MLP output, which is
            ``None`` when there is no bottom MLP.
        """
        batch_size = len(numerical_input) if numerical_input is not None else len(categorical_inputs)
        bottom_output = []
        bottom_mlp_output = None

        if self.mlp:
            bottom_mlp_output = self.mlp(numerical_input)
            # reshape bottom mlp to concatenate with embeddings
            bottom_output.append(bottom_mlp_output.view(batch_size, 1, -1))

        if self.num_categorical_features > 0:
            bottom_output += self.embeddings(categorical_inputs)

        # A single vector needs no concatenation.
        if len(bottom_output) == 1:
            return bottom_output[0], bottom_mlp_output

        return torch.cat(bottom_output, dim=1), bottom_mlp_output


class DlrmTop(nn.Module):
    """Top half of DLRM: feature interaction, an MLP and a final linear layer."""

    def __init__(self, top_mlp_sizes: Sequence[int], interaction: DotInteraction):
        """
        Args:
            top_mlp_sizes: layer widths; all but the last form the MLP, the
                last one is the width of the final linear output layer
            interaction: object providing ``num_interactions`` and ``interact``
        """
        super().__init__()

        hidden_sizes = top_mlp_sizes[:-1]

        self.interaction = interaction
        self.mlp = TorchMlp(interaction.num_interactions, hidden_sizes)
        self.out = nn.Linear(top_mlp_sizes[-2], top_mlp_sizes[-1])

        self._initialize_weights()

    def _initialize_weights(self):
        """Zero the first-layer weights that read the padding column."""
        # The last input column of the first layer is the zero padding added
        # by the interaction. Its weights are set to 0 explicitly and will
        # stay 0 throughout training, so the padding adds no model capacity.
        # An assert at the end of training could verify this.
        first_layer_weight = self.mlp.weights[0]
        padding_column = first_layer_weight[:, -1]
        nn.init.zeros_(padding_column.data)

    def forward(self, bottom_output, bottom_mlp_output):
        """
        Args:
            bottom_output (Tensor): with shape [batch_size, 1 + #embeddings, embedding_dim]
            bottom_mlp_output (Tensor): with shape [batch_size, embedding_dim]

        Returns:
            Tensor: output of the final linear layer
        """
        features = self.interaction.interact(bottom_output, bottom_mlp_output)
        hidden = self.mlp(features)
        return self.out(hidden)


class Dlrm(nn.Module):
    """Reimplementation of Facebook's DLRM model.

    The original implementation is at https://github.com/facebookresearch/dlrm.

    """

    def __init__(
            self,
            num_numerical_features: int,
            categorical_feature_sizes: Sequence[int],
            bottom_mlp_sizes: Sequence[int],
            top_mlp_sizes: Sequence[int],
            embedding_dim: int = 32):
        """
        Args:
            num_numerical_features: number of numerical input tensors
            categorical_feature_sizes: number of rows of each embedding table
            bottom_mlp_sizes: layer widths of the bottom MLP; the last one
                must equal ``embedding_dim``
            top_mlp_sizes: layer widths of the top model
            embedding_dim: width of every embedding vector
        """
        super().__init__()
        assert embedding_dim == bottom_mlp_sizes[-1], "The last bottom MLP layer must have same size as embedding."

        self.num_numerical_features = num_numerical_features

        num_tables = len(categorical_feature_sizes)
        interaction = DotInteraction(num_tables, embedding_dim)

        self.bottom_model = DlrmBottom(
            num_numerical_features=num_numerical_features,
            categorical_feature_sizes=categorical_feature_sizes,
            bottom_mlp_sizes=bottom_mlp_sizes,
            embedding_dim=embedding_dim)
        self.top_model = DlrmTop(top_mlp_sizes, interaction)

    def forward(self, *inputs):
        """Run the model on per-feature input tensors.

        Args:
            *inputs: the first ``num_numerical_features`` tensors are the
                numerical features, the rest the categorical ones; each group
                is concatenated along dim 1.

        Returns:
            The output of the top model.
        """
        split_at = self.num_numerical_features
        columns = list(inputs)
        numerical_input = torch.cat(columns[:split_at], dim=1)
        categorical_inputs = torch.cat(columns[split_at:], dim=1)
        bottom_output, bottom_mlp_output = self.bottom_model(numerical_input, categorical_inputs)
        return self.top_model(bottom_output, bottom_mlp_output)

## Combine data preprocessing and model training with RayDP

In [None]:
import ray
import raydp
from raydp.utils import random_split
from raydp.torch import TorchEstimator

In [None]:
# Connect to a Ray cluster or initialize Ray.
# NOTE(review): the Redis password is hard-coded here; consider taking it
# from configuration instead. Confirm before sharing this notebook.
ray.init(_redis_password="123")
# Start Spark through RayDP with the application name and executor settings
# given below.
spark = raydp.init_spark(app_name="dlrm",
                         num_executors=2,
                         executor_cores=4,
                         executor_memory="8GB")
# Bare expression so that a notebook displays the session object.
spark

In [None]:
# The data can be downloaded from: https://www.kaggle.com/c/criteo-display-ad-challenge/
# NOTE(review): this is a machine-specific absolute path; adjust it to where
# the dataset is stored.
file_path = "/Users/xianyang/datasets/criteo-small/train_1m.txt"

# Define the DataFrame schema: the label and the integer feature columns are
# read as integers, the categorical columns as strings.
label_fields = [StructField('_c%d' % LABEL_COL, IntegerType())]
int_fields = [StructField('_c%d' % i, IntegerType()) for i in INT_COLS]
str_fields = [StructField('_c%d' % i, StringType()) for i in CAT_COLS]

schema = StructType(label_fields + int_fields + str_fields)
# Read the file as tab-separated values using the explicit schema.
df = spark.read.schema(schema).option("sep", "\t").csv(file_path)
df.cache()
# pre_process returns the transformed DataFrame together with the number of
# distinct kept values per categorical column.
df, categorical_feature_sizes = pre_process(df=df,
                                            frequency_limit="3",
                                            output_ordering="total_random",
                                            no_numeric_log_col=False)
# Split into training and evaluation data with weights 0.9 and 0.1.
train_df, test_df = random_split(df, [0.9, 0.1])

In [None]:
# Build the model; the last bottom MLP width (32) matches Dlrm's default
# embedding_dim.
model = Dlrm(num_numerical_features=len(INT_COLS),
             categorical_feature_sizes=categorical_feature_sizes,
             bottom_mlp_sizes=[512, 128, 32],
             top_mlp_sizes=[1024, 1024, 512, 256, 1])
loss_fn = torch.nn.BCEWithLogitsLoss(reduction="mean")
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

# Derive the column names from the same constants as the feature types, so
# the two lists cannot drift apart: integer features first, then categorical
# ones, which is the order Dlrm.forward splits its inputs in.
feature_columns = ['_c%d' % i for i in INT_COLS + CAT_COLS]
feature_types = [torch.float32] * len(INT_COLS)
feature_types += [torch.long] * len(CAT_COLS)

estimator = TorchEstimator(num_workers=2,
                           model=model,
                           optimizer=optimizer,
                           loss=loss_fn,
                           feature_columns=feature_columns,
                           feature_types=feature_types,
                           label_column='_c%d' % LABEL_COL,
                           label_type=torch.float,
                           batch_size=128,
                           num_epochs=2,
                           use_gpu=False)

# Train on train_df and evaluate on test_df.
estimator.fit_on_spark(train_df, test_df)

In [None]:
# Stop the Spark session started through RayDP first, then shut down Ray.
raydp.stop_spark()
ray.shutdown()