# Wide & Deep Recommendation for Large-Scale Data - Feature Engineering

This demo uses the [Twitter Recsys Challenge 2021 dataset](http://www.recsyschallenge.com/2021/) and the [Wide & Deep Model](https://arxiv.org/abs/1606.07792). The dataset includes 46 million users and 340 million tweets (items) and each record contains the tweet along with engagement features, user features, and tweet features.

Let's start with a high-level overview of recommendation systems in general. The diagram below shows the common components of a recommendation system, which typically consists of three stages:
- Offline: Perform feature engineering on the raw data and use the preprocessed data to train embeddings and deep learning models.

- Nearline: Retrieve user/item profiles and keep them in the Key-Value store. Make updates to the profiles and fine-tune the deep learning model from time to time.

- Online: Trigger the recommendation process whenever a user request arrives. The recall service generates candidates from millions of items based on embedding similarity, and the ranking service uses the trained deep learning model to re-rank the candidates for the final recommendation results.

<img src="figures/overview-recsys.png" alt="overview-recsys" width="750"/>
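The online stage described above (recall by embedding similarity, then re-ranking) can be sketched in a few lines of plain Python. This is only an illustrative toy, not part of BigDL: the function names `recall` and `rank`, the two-dimensional embeddings, and the `toy_scores` lookup standing in for a trained model are all assumptions made for this example.

```python
import heapq
import math


def cosine(u, v):
    """Cosine similarity between two equal-length vectors."""
    dot = sum(a * b for a, b in zip(u, v))
    norm = math.sqrt(sum(a * a for a in u)) * math.sqrt(sum(b * b for b in v))
    return dot / norm if norm else 0.0


def recall(user_emb, item_embs, k):
    """Return the ids of the k items most similar to the user embedding."""
    return heapq.nlargest(
        k, item_embs, key=lambda item_id: cosine(user_emb, item_embs[item_id])
    )


def rank(candidates, score_fn):
    """Re-rank the recalled candidates by a model score, highest first."""
    return sorted(candidates, key=score_fn, reverse=True)


# Hypothetical item embeddings and user embedding.
item_embs = {"a": [1.0, 0.0], "b": [0.9, 0.1], "c": [0.0, 1.0], "d": [-1.0, 0.0]}
user_emb = [1.0, 0.0]

candidates = recall(user_emb, item_embs, k=2)

# Stand-in for the trained ranking model: a fixed score per candidate.
toy_scores = {"a": 0.2, "b": 0.7}
ranked = rank(candidates, lambda item_id: toy_scores[item_id])
```

In a real system the recall step would typically use an approximate nearest-neighbor index rather than a full scan, and the scores would come from the trained Wide & Deep model.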

This notebook demonstrates some common data preprocessing and feature engineering steps for Wide & Deep Learning on the Twitter Recsys Challenge 2021 dataset.

First of all, we import the necessary packages in BigDL for cluster initialization and built-in recommendation operations.

In [2]:
from bigdl.orca import init_orca_context, stop_orca_context, OrcaContext
from bigdl.friesian.feature import FeatureTable, StringIndex

Initialize the environment on the YARN cluster. You only need to prepare the Python environment on the driver node with [Anaconda](https://www.anaconda.com/products/individual), and BigDL will automatically distribute and prepare the environment across the cluster.
You can also specify the resources allocated to this application during initialization, including the number of nodes, the number of cores, and the amount of memory. BigDL provides detailed guidance for deploying on [Hadoop/YARN](https://bigdl.readthedocs.io/en/latest/doc/UserGuide/hadoop.html) or [K8S](https://bigdl.readthedocs.io/en/latest/doc/UserGuide/k8s.html) clusters.

In [3]:
# To display terminal's stdout and stderr in the Jupyter notebook.
OrcaContext.log_output = True

cluster_mode = "yarn"

executor_cores = 36
num_executor = 6
executor_memory = "96g"
driver_cores = 4
driver_memory = "36g"
conf = {"spark.network.timeout": "10000000",
        "spark.sql.broadcastTimeout": "7200",
        "spark.sql.shuffle.partitions": "2000",
        "spark.locality.wait": "0s",
        "spark.sql.crossJoin.enabled": "true",
        "spark.task.cpus": "1",
        "spark.executor.heartbeatInterval": "200s",
        "spark.driver.maxResultSize": "40G",
        "spark.eventLog.enabled": "true",
        "spark.app.name": "recsys-demo-preprocess",
        "spark.debug.maxToStringFields": "100"}
if cluster_mode == "local":  # For local machine
    sc = init_orca_context(cluster_mode="local",
                           cores=executor_cores, memory=executor_memory)
elif cluster_mode == "yarn":  # For Hadoop/YARN cluster
    sc = init_orca_context(cluster_mode="yarn", cores=executor_cores,
                           num_nodes=num_executor, memory=executor_memory,
                           driver_cores=driver_cores, driver_memory=driver_memory,
                           conf=conf)


Initializing orca context
Current pyspark location is : /root/anaconda3/envs/bigdl/lib/python3.7/site-packages/pyspark/__init__.py
Initializing SparkContext for yarn-client mode
Start to pack current python env
Collecting packages...
Packing environment at '/root/anaconda3/envs/bigdl' to '/tmp/tmp0d6y97wf/python_env.tar.gz'
[########################################] | 100% Completed | 15.0s
Packing has been completed: /tmp/tmp0d6y97wf/python_env.tar.gz
pyspark_submit_args is: --master yarn --deploy-mode client --archives /tmp/tmp0d6y97wf/python_env.tar.gz#python_env --driver-cores 4 --driver-memory 36g --num-executors 6 --executor-cores 36 --executor-memory 96g --driver-class-path /root/anaconda3/envs/bigdl/lib/python3.7/site-packages/bigdl/share/dllib/lib/bigdl-dllib-spark_2.4.6-2.0.0-jar-with-dependencies.jar:/root/anaconda3/envs/bigdl/lib/python3.7/site-packages/bigdl/share/orca/lib/bigdl-orca-spark_2.4.6-2.0.0-jar-with-dependencies.jar:/root/anaconda3/envs/bigdl/lib/python3.7/site-

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


2022-03-21 13:23:42,002 Thread-5 WARN The bufferSize is set to 4000 but bufferedIo is false: false
2022-03-21 13:23:42,005 Thread-5 WARN The bufferSize is set to 4000 but bufferedIo is false: false
2022-03-21 13:23:42,006 Thread-5 WARN The bufferSize is set to 4000 but bufferedIo is false: false
2022-03-21 13:23:42,007 Thread-5 WARN The bufferSize is set to 4000 but bufferedIo is false: false
22-03-21 13:23:42 [Thread-5] INFO  Engine$:121 - Auto detect executor number and executor cores number
22-03-21 13:23:42 [Thread-5] INFO  Engine$:123 - Executor number is 6 and executor cores number is 36



22-03-21 13:23:42 [Thread-5] INFO  ThreadPool$:95 - Set mkl threads to 1 on thread 28
2022-03-21 13:23:42 WARN  SparkContext:66 - Using an existing SparkContext; some configuration may not take effect.
22-03-21 13:23:42 [Thread-5] INFO  Engine$:446 - Find existing spark context. Checking the spark conf...
cls.getname: com.intel.analytics.bigdl.dllib.utils.python.api.Sample
BigDLBasePickler registering: bigdl.dllib.utils.common  Sample
cls.getname: com.intel.analytics.bigdl.dllib.utils.python.api.EvaluatedResult
BigDLBasePickler registering: bigdl.dllib.utils.common  EvaluatedResult
cls.getname: com.intel.analytics.bigdl.dllib.utils.python.api.JTensor
BigDLBasePickler registering: bigdl.dllib.utils.common  JTensor
cls.getname: com.intel.analytics.bigdl.dllib.utils.python.api.JActivity
BigDLBasePickler registering: bigdl.dllib.utils.common  JActivity


Load raw train and validation data as FeatureTables.

In [4]:
train_path = "/path/to/train/data"
valid_path = "/path/to/valid/data"

train_tbl = FeatureTable.read_parquet(train_path)
valid_tbl = FeatureTable.read_parquet(valid_path)

print("Total number of train records: {}".format(train_tbl.size()))
print("Total number of validation records: {}".format(valid_tbl.size()))

                                                                                

Total number of train records: 747694282
Total number of validation records: 14461760

Common features for recommendation include boolean features, categorical features (mostly string) and continuous (numeric) features. Several typical features of these types in this dataset are listed below.

In [5]:
bool_cols = [
    'engaged_with_user_is_verified',
    'enaging_user_is_verified'
]

count_cols = [
    'engaged_with_user_follower_count',
    'engaged_with_user_following_count',
    'enaging_user_follower_count',
    'enaging_user_following_count'
]

cat_cols = [
    'present_media',
    'tweet_type',
    'language'
]

<img src="figures/feature.png" alt="feature" width="750"/>

Now let's start the data preprocessing for both the train and validation datasets! With the built-in high-level preprocessing operations in FeatureTable, you can do this in just a few lines of code. :)

- Fill null with default values.
- For boolean features, we simply cast them to integers (either 0 or 1).
- For categorical features:
    - If all the categories are already known (in this dataset, for example, there are only 13 present_media options and 3 tweet_type options, shown below), you can save computation by assigning an id to each category yourself and using this string-to-id map to encode the features to the corresponding ids.
    - Otherwise, a StringIndex is first generated for the categories that appear in the dataset, and the original features are then encoded to the corresponding ids.
- For continuous features:
    - If the values span a large range (in this dataset, for example, the following/follower counts may vary from 0 to millions), we can put them into discrete bins and assign each bin an index. The resulting features are then effectively categorical. In the following example, 8 bins are used for following/follower counts (range: <1, 1-1e2, 1e2-1e3, 1e3-1e4, 1e4-1e5, 1e5-1e6, 1e6-1e7, >1e7), represented by 0 to 7 respectively.
    - If the values don't vary much, min-max scaling is a common approach to rescale the continuous values to the range [0, 1].

In [7]:
media_map = {
    '': 0,
    'GIF': 1,
    'GIF_GIF': 2,
    'GIF_Photo': 3,
    'GIF_Video': 4,
    'Photo': 5,
    'Photo_GIF': 6,
    'Photo_Photo': 7,
    'Photo_Video': 8,
    'Video': 9,
    'Video_GIF': 10,
    'Video_Photo': 11,
    'Video_Video': 12
}

type_map = {
    'Quote': 0,
    'Retweet': 1,
    'TopLevel': 2,
}

In [8]:
def preprocess(tbl):
    tbl = tbl.fillna("", "present_media")
    tbl = tbl.cast(bool_cols + count_cols, "int")  # cast bool and long to int
    tbl = tbl.cut_bins(columns=count_cols,
                       bins=[1, 1e2, 1e3, 1e4, 1e5, 1e6, 1e7],
                       out_cols=count_cols)
    if "present_media" in cat_cols:
        process_media = lambda x: '_'.join(x.split('\t')[:2])
        tbl = tbl.apply("present_media", "present_media", process_media, "string")
        tbl = tbl.encode_string("present_media", media_map)
    if "tweet_type" in cat_cols:
        tbl = tbl.encode_string("tweet_type", type_map)

    return tbl


train_tbl = preprocess(train_tbl)
valid_tbl = preprocess(valid_tbl)
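To make the binning and the media encoding in `preprocess` concrete, here is a minimal plain-Python sketch of what happens to a single value. It is not BigDL code: `bin_index` and `encode_media` are hypothetical helpers, `MEDIA_MAP_EXCERPT` is a subset of `media_map` above, and the sketch assumes that each bin includes its lower boundary (so a count of exactly 1 lands in bin 1). Check the `cut_bins` documentation for how boundary values are actually handled.

```python
from bisect import bisect_right

# Same boundaries as passed to cut_bins above: 7 boundaries give 8 bins (0..7).
BINS = [1, 1e2, 1e3, 1e4, 1e5, 1e6, 1e7]


def bin_index(count, bins=BINS):
    """Index of the bin holding `count`, assuming left-inclusive bins."""
    return bisect_right(bins, count)


# Excerpt of media_map, for illustration only.
MEDIA_MAP_EXCERPT = {"": 0, "GIF": 1, "Photo": 5, "Photo_Photo": 7, "Video": 9}


def encode_media(raw, mapping=MEDIA_MAP_EXCERPT):
    """Keep the first two tab-separated media types, join with '_', look up the id."""
    key = "_".join(raw.split("\t")[:2])
    return mapping.get(key)


follower_bins = [bin_index(c) for c in (0, 1, 50, 100, 2_500_000, 50_000_000)]
media_ids = [encode_media(m) for m in ("", "Photo", "Photo\tPhoto\tPhoto")]
```

Truncating to the first two media types is what keeps the number of `present_media` categories at 13 regardless of how many attachments a tweet has.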

For the language feature, we first assign each language that appears in the train dataset an id starting from 1; 0 is reserved for unknown languages. Note that for the validation dataset, we use the StringIndex generated from the train dataset to encode its language feature. A new dataset commonly contains unseen categories, so we fill the resulting null values with the reserved index 0.

In [9]:
if "language" in cat_cols:
    train_tbl, language_idx = train_tbl.category_encode("language")
    valid_tbl = valid_tbl.encode_string("language", language_idx)
    valid_tbl = valid_tbl.fillna(0, "language")

    print("The number of languages: {}".format(language_idx.size()))

                                                                                

The number of languages: 66
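The encoding scheme described above (ids starting from 1, with 0 reserved for unknown values) can be sketched in plain Python as follows. This is an illustration only: `build_index` and `encode` are hypothetical helpers, the language codes are made up, and ids are assigned here in order of first appearance, which is not necessarily the order BigDL's `category_encode` uses.

```python
def build_index(values):
    """Assign each distinct non-null value an id starting from 1."""
    index = {}
    for value in values:
        if value is not None and value not in index:
            index[value] = len(index) + 1
    return index


def encode(values, index, unknown_id=0):
    """Encode values with an existing index; unseen or null values get unknown_id."""
    return [index.get(value, unknown_id) for value in values]


train_langs = ["en", "ja", "en", "es"]
valid_langs = ["ja", "xx", None, "en"]  # "xx" never appears in the train data

language_index = build_index(train_langs)
train_ids = encode(train_langs, language_index)
valid_ids = encode(valid_langs, language_index)
```

Building the index from the train data only, and reusing it unchanged for validation, keeps the ids consistent with what the model sees during training.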


Besides the preprocessing steps above, we can also generate new features from the existing features. For example,

- Generating cross columns given multiple categorical columns is a common technique for Wide & Deep learning to memorize the co-appearance of joint features.
- We also count the number of hashtags, present domains and present links and treat them as continuous features.

In [10]:
def generate_features(tbl):
    cross_cols = [['present_media', 'language']]
    cross_dims = [600]
    tbl = tbl.cross_columns(cross_cols, cross_dims)  # The resulting cross column will have name "present_media_language"

    count_func = lambda x: str(x).count('\t') + 1 if x else 0
    tbl = tbl.apply("hashtags", "len_hashtags", count_func, "int") \
        .apply("present_domains", "len_domains", count_func, "int") \
        .apply("present_links", "len_links", count_func, "int")
    return tbl


train_tbl = generate_features(train_tbl)
valid_tbl = generate_features(valid_tbl)
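As a rough illustration of the two feature types generated above, the sketch below hashes a pair of categorical ids into a fixed number of buckets and counts tab-separated entries in a field. It is not BigDL's implementation: `cross_hash` is a hypothetical helper, and `zlib.crc32` is only a stand-in, since the hash function used by `cross_columns` is not stated in this notebook. `count_items` mirrors `count_func` from the cell above.

```python
import zlib


def cross_hash(values, bucket_size):
    """Join the values into one key and hash it into [0, bucket_size)."""
    key = "_".join(str(v) for v in values)
    return zlib.crc32(key.encode("utf-8")) % bucket_size


def count_items(field):
    """Number of tab-separated entries; 0 for a null or empty field."""
    return str(field).count("\t") + 1 if field else 0


# Cross an encoded present_media id with an encoded language id into 600 buckets.
bucket = cross_hash((5, 43), 600)

hashtag_counts = [count_items(f) for f in (None, "", "a", "a\tb\tc")]
```

Because the crossed value is hashed, distinct (present_media, language) pairs can collide in the same bucket; the bucket size trades memory against collision rate.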

For the number of hashtags, present domains and present links, we use min-max scaling to rescale them to the range [0, 1] as described above. As with the language feature, we use the min and max statistics of the train dataset to transform the validation dataset, since a new dataset may contain out-of-range values.

In [11]:
len_cols = ['len_hashtags',
            'len_domains',
            'len_links']

train_tbl, min_max_dict = train_tbl.min_max_scale(len_cols)
valid_tbl = valid_tbl.transform_min_max_scale(len_cols, min_max_dict)
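The effect of reusing the train statistics can be seen in a small plain-Python sketch. `fit_min_max` and `transform` are hypothetical helpers and the numbers are made up. Whether `transform_min_max_scale` clips out-of-range values is not stated in this notebook, so the clipping line at the end is shown only as an optional choice.

```python
def fit_min_max(values):
    """Compute the min and max statistics from the train values."""
    return min(values), max(values)


def transform(values, lo, hi):
    """Rescale values with previously computed statistics: (v - lo) / (hi - lo)."""
    span = hi - lo
    if span == 0:
        return [0.0 for _ in values]  # constant column: map everything to 0
    return [(v - lo) / span for v in values]


train_counts = [0, 2, 4, 10]
valid_counts = [5, 12]  # 12 is larger than anything seen in the train data

lo, hi = fit_min_max(train_counts)
train_scaled = transform(train_counts, lo, hi)
valid_scaled = transform(valid_counts, lo, hi)

# Optional: clip to [0, 1] if the model should never see out-of-range inputs.
valid_clipped = [min(1.0, max(0.0, v)) for v in valid_scaled]
```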

                                                                                

Besides the preprocessing and feature engineering operations described above, BigDL provides many more built-in operations, including target encoding, count encoding, difference lag and negative sampling. See [here](https://bigdl.readthedocs.io/en/latest/doc/PythonAPI/Friesian/feature.html#bigdl.friesian.feature.table.FeatureTable) for more details and API usage.

Finally, we process the label for supervised learning. The four timestamp features indicate whether the engaging user interacted with the tweet, and they are used jointly to produce the label.

If at least one of the timestamps is not null, namely the engaging user has at least one interaction with the tweet, the record is treated as a positive sample (i.e. it has label 1).

In [12]:
timestamp_cols = [
    'reply_timestamp',
    'retweet_timestamp',
    'retweet_with_comment_timestamp',
    'like_timestamp'
]

In [13]:
def transform_label(tbl):
    tbl = tbl.cast(timestamp_cols, "int")
    tbl = tbl.fillna(0, timestamp_cols)
    gen_label = lambda x: 1 if max(x) > 0 else 0
    tbl = tbl.apply(in_col=timestamp_cols, out_col="label", func=gen_label, dtype="int")
    return tbl


train_tbl = transform_label(train_tbl)
valid_tbl = transform_label(valid_tbl)
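For a single record, the label rule above reduces to the following plain-Python sketch. `make_label` is a hypothetical helper that mirrors the fill-with-0 and `max(x) > 0` steps of `transform_label`; the timestamp value is made up.

```python
def make_label(timestamps):
    """Return 1 if any engagement timestamp is present, else 0."""
    filled = [t if t is not None else 0 for t in timestamps]  # null -> 0
    return 1 if max(filled) > 0 else 0


# Order: reply, retweet, retweet_with_comment, like.
no_engagement = make_label([None, None, None, None])
liked_only = make_label([None, None, None, 1614000000])
```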

Now we are all set! We have finished all the preprocessing steps for this dataset.

Let's take a look at the preprocessed dataset and finally save the preprocessed data to be used for Wide & Deep training.

In [14]:
train_tbl.select(bool_cols + cat_cols).show(5)

+-----------------------------+------------------------+-------------+----------+--------+
|engaged_with_user_is_verified|enaging_user_is_verified|present_media|tweet_type|language|
+-----------------------------+------------------------+-------------+----------+--------+
|                            0|                       0|            0|         1|      40|
|                            0|                       0|            5|         1|      43|
|                            0|                       0|            7|         2|      43|
|                            1|                       0|            0|         1|      43|
|                            0|                       0|            0|         1|      43|
+-----------------------------+------------------------+-------------+----------+--------+
only showing top 5 rows



In [15]:
train_tbl.select(count_cols).show(5)

+--------------------------------+---------------------------------+---------------------------+----------------------------+
|engaged_with_user_follower_count|engaged_with_user_following_count|enaging_user_follower_count|enaging_user_following_count|
+--------------------------------+---------------------------------+---------------------------+----------------------------+
|                               2|                                2|                          1|                           1|
|                               2|                                2|                          2|                           2|
|                               4|                                4|                          2|                           2|
|                               5|                                4|                          2|                           2|
|                               2|                                2|                          3|                      



In [16]:
train_tbl.select(len_cols + ["present_media_language", "label"]).show(5)



+------------+-----------+---------+----------------------+-----+
|len_hashtags|len_domains|len_links|present_media_language|label|
+------------+-----------+---------+----------------------+-----+
|         0.0|        0.0|      0.0|                   123|    0|
|         0.0|        0.0|      0.0|                   281|    1|
|         0.0|        0.0|      0.0|                   463|    0|
|         0.0|        0.0|      0.0|                   126|    1|
|         0.0|        0.0|      0.0|                   126|    0|
+------------+-----------+---------+----------------------+-----+
only showing top 5 rows




                                                                                

In [17]:
train_tbl.write_parquet("/path/to/preprocessed/train/data")
valid_tbl.write_parquet("/path/to/preprocessed/valid/data")

                                                                                

Also save the StringIndex for each categorical feature to parquet files. The mapping from each string to its corresponding id will be used in later stages (e.g. to encode new data).

In [18]:
model_path = "/path/to/models/data"

if "language" in cat_cols:
    language_idx.write_parquet(model_path)  # Saved to model_path/language.parquet

if "present_media" in cat_cols:
    media_idx = StringIndex.from_dict(media_map, "present_media")
    media_idx.write_parquet(model_path)  # Saved to model_path/present_media.parquet

if "tweet_type" in cat_cols:
    type_idx = StringIndex.from_dict(type_map, "tweet_type")
    type_idx.write_parquet(model_path)  # Saved to model_path/tweet_type.parquet

    
stop_orca_context()

                                                                                

Stopping orca context
