# Transformer For Variable Length Sequences


In [1]:
# !conda install -c conda-forge pyspark
# !pip show pyspark
# !java -version

In [2]:
import os
import sys
import datetime
import itertools
from pyspark.sql import  SparkSession, Row, functions as F
from pyspark.sql.types import ArrayType, StringType, IntegerType, BooleanType

import utils
from ts2vec import datautils

In [3]:
# Directory holding the data files, and the sampled Taobao user-behavior CSV inside it
DATA_PATH, TB_DATASET = './data', 'UserBehaviorSample.csv'

## 1. 集成到 `utils.py`

为了方便复用，将上一节的时间序列 Transformer 模型写成一个类 `TSTransformer`，集成到 `utils.py` 中。

In [4]:
# # 分割训练集和验证集
# X_train, y_train, X_test, y_test = datautils.load_UCR('ECG200')

# tst = utils.TSTransformer(input_dim=None,
#                           model_dim=8,
#                           nhead=4,
#                           num_layers=2,
#                           hidden_dim=32,
#                           num_classes=2,
#                           num_epochs=100,
#                           batch_size=25)
# predict, metrics = tst(X_train, y_train, X_test, y_test)

In [5]:
# print(f'Top 5 predict labels: {predict[:5]}')

# for k, v in metrics.items():
#     print(f'{k}: {v:.4f}')

## 2. 不定长序列

上一节，我们生成了定长序列的嵌入表示，本节我们来尝试不定长序列。

淘宝用户行为数据集是一个不定长的行为序列数据集，链接：[TianChi](https://tianchi.aliyun.com/dataset/649) | [Kaggle](https://www.kaggle.com/datasets/marwa80/userbehavior)

从上述链接将数据文件 `UserBehavior.csv` 下载到 `data` 文件夹中。由于数据文件极大，本文只取其前 10000 条数据，另存为 `data/UserBehaviorSample.csv`，用于演示。

各字段含义如下：

|Field|Explanation|
| -- | -- |
|User ID|An integer, the serialized ID that represents a user|
|Item ID|An integer, the serialized ID that represents an item|
|Category ID|An integer, the serialized ID that represents the category which the corresponding item belongs to|
|Behavior type|A string, enum-type from ('pv', 'buy', 'cart', 'fav')|
|Timestamp|An integer, the timestamp of the behavior|

### 2.1 导入数据

In [6]:
# Load the sampled dataset. It is read with header=None (no header row),
# so the column names are assigned explicitly afterwards.
csv_path = utils.gen_abspath(DATA_PATH, TB_DATASET)
df = utils.read_csv(csv_path, header=None)
df.columns = [
    'User ID',
    'Item ID',
    'Category ID',
    'Behavior type',
    'Timestamp',
]

df

Unnamed: 0,User ID,Item ID,Category ID,Behavior type,Timestamp
0,1,2268318,2520377,pv,1511544070
1,1,2333346,2520771,pv,1511561733
2,1,2576651,149192,pv,1511572885
3,1,3830808,4181361,pv,1511593493
4,1,4365585,2520377,pv,1511596146
...,...,...,...,...,...
9996,1000436,1542569,3619575,buy,1512184723
9997,1000436,2279117,405755,pv,1512184785
9998,1000436,3249912,2920476,pv,1512184868
9999,1000436,921462,1888306,fav,1512184965


### 2.2 数据预处理

- 对文本字段进行编码
- 将 Unix 时间戳（秒）转换成日期时间

In [7]:
# Encode the categorical 'Behavior type' strings as integer codes via utils.Convert.
# NOTE: this overwrites the column of `df` in place, so re-running this cell
# without reloading `df` would feed already-encoded values into Convert.
behavior_type = df['Behavior type']
cv = utils.Convert(behavior_type)
df['Behavior type'] = df['Behavior type'].apply(cv.encoder)

# Convert Unix timestamps (seconds) to naive datetimes in UTC+8 (China Standard Time).
# `datetime.fromtimestamp(ts)` without `tz` uses the kernel's local timezone, which
# made the 'Time' column depend on the machine running the notebook. Pinning UTC+8
# reproduces the recorded output (1511544070 -> 2017-11-25 01:21:10) everywhere,
# while dropping tzinfo keeps the column a naive datetime as before.
UTC_PLUS_8 = datetime.timezone(datetime.timedelta(hours=8))
df['Time'] = df['Timestamp'].apply(
    lambda ts: datetime.datetime.fromtimestamp(ts, tz=UTC_PLUS_8).replace(tzinfo=None)
)

df

Unnamed: 0,User ID,Item ID,Category ID,Behavior type,Timestamp,Time
0,1,2268318,2520377,0,1511544070,2017-11-25 01:21:10
1,1,2333346,2520771,0,1511561733,2017-11-25 06:15:33
2,1,2576651,149192,0,1511572885,2017-11-25 09:21:25
3,1,3830808,4181361,0,1511593493,2017-11-25 15:04:53
4,1,4365585,2520377,0,1511596146,2017-11-25 15:49:06
...,...,...,...,...,...,...
9996,1000436,1542569,3619575,3,1512184723,2017-12-02 11:18:43
9997,1000436,2279117,405755,0,1512184785,2017-12-02 11:19:45
9998,1000436,3249912,2920476,0,1512184868,2017-12-02 11:21:08
9999,1000436,921462,1888306,2,1512184965,2017-12-02 11:22:45


### 2.3 生成行为序列数据

对数据进行 reshape 操作，将数据做成 `(样本量, 步长, 特征数)` 的形状。

> 在本地运行 Spark 可能需要一些配置：
> - `spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")` enables the use of Apache Arrow for data exchange between PySpark (Python) and JVM (Java Virtual Machine) in Spark.

In [8]:
# Point the PySpark driver and worker Python at this kernel's own interpreter.
python_path = sys.executable
os.environ['PYSPARK_PYTHON'] = python_path
os.environ['PYSPARK_DRIVER_PYTHON'] = python_path

# Build (or fetch the existing) local SparkSession using all available cores.
# Arrow is enabled for pandas <-> JVM data exchange in createDataFrame.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("App")
    .config("spark.pyspark.python", python_path)
    .config("spark.pyspark.driver.python", python_path)
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .config("spark.executor.cores", 2)
    .getOrCreate()
)

# Load the pandas DataFrame into Spark and preview the first 5 rows untruncated
spark_df = spark.createDataFrame(df)
spark_df.show(5, truncate=False)

+-------+-------+-----------+-------------+----------+-------------------+
|User ID|Item ID|Category ID|Behavior type|Timestamp |Time               |
+-------+-------+-----------+-------------+----------+-------------------+
|1      |2268318|2520377    |0            |1511544070|2017-11-25 01:21:10|
|1      |2333346|2520771    |0            |1511561733|2017-11-25 06:15:33|
|1      |2576651|149192     |0            |1511572885|2017-11-25 09:21:25|
|1      |3830808|4181361    |0            |1511593493|2017-11-25 15:04:53|
|1      |4365585|2520377    |0            |1511596146|2017-11-25 15:49:06|
+-------+-------+-----------+-------------+----------+-------------------+
only showing top 5 rows



In [9]:
# Pre-aggregate each event's features (Item ID, Category ID, Behavior type, Timestamp)
# into one '_'-joined string, then prefix it with the Timestamp as 'ts,features'.
# Then group by User ID, de-duplicate the records with collect_set and join them with ';'.
# collect_set gives no ordering guarantee; ordering is left to `sorted_items`.
# NOTE(review): `sorted_items` calls int() on the part after ',', i.e. on the
# '_'-joined features string. Python's int() accepts '_' as a digit separator, so it
# would silently return one huge concatenated integer (overflowing IntegerType)
# instead of raising; confirm the intended record format before enabling the UDF.
items_df = (
    spark_df
    .withColumn(
        'features',
        F.concat_ws('_', F.col('Item ID'), F.col('Category ID'),
                    F.col('Behavior type'), F.col('Timestamp')),
    )
    .withColumn('item_rec', F.concat_ws(',', F.col('Timestamp'), F.col('features')))
    .groupBy("User ID")
    .agg(F.concat_ws(";", F.collect_set("item_rec")).alias("item_list"))
)

items_df.show(5)

+-------+--------------------+
|User ID|           item_list|
+-------+--------------------+
|1000001|1511881921,158285...|
|      1|1511910242,409206...|
|   1000|1512042775,190767...|
|    100|1511684109,383911...|
|1000004|1511966192,899145...|
+-------+--------------------+
only showing top 5 rows



In [10]:
def sorted_items(items: str,
                 main_delimiter: str = ';',
                 minor_delimiter: str = ','):
    """Order a user's behavior records chronologically.

    ``items`` holds records separated by ``main_delimiter``. Each record is
    ``<sort_key><minor_delimiter><value>``, and records that do not split into
    exactly two parts are skipped. Records are ordered by the sort key compared
    as *strings*, so keys must be fixed-width (e.g. ISO dates or 10-digit Unix
    timestamps) for string order to equal time order. Each value is converted
    with ``int()``, and consecutive duplicate values are collapsed into one.

    Args:
        items: Delimited record string. ``None`` (how Spark passes SQL NULL to a
            Python UDF) and ``""`` are treated as "no records".
        main_delimiter: Separator between records.
        minor_delimiter: Separator between the sort key and the value in a record.

    Returns:
        ``[[v1, v2, ...]]``, i.e. a single-element list wrapping the de-duplicated
        value sequence (nested to fit an ``ArrayType(ArrayType(IntegerType()))``
        UDF return type). Returns ``[]`` when there is no input.

    Raises:
        ValueError: if a record's value is not parseable by ``int()``.
    """
    # `not items` also covers None, which previously crashed on len(None)
    if not items:
        return []

    records = [
        parts
        for parts in (record.split(minor_delimiter) for record in items.split(main_delimiter))
        if len(parts) == 2
    ]
    # list.sort is stable: records with equal keys keep their input order
    records.sort(key=lambda kv: kv[0])
    values = [int(value) for _, value in records]
    # groupby only merges *adjacent* equal values, i.e. collapses repeated behavior
    return [[value for value, _ in itertools.groupby(values)]]

sorted_items('2015-03-15,0;2015-05-27,3;2015-03-16,0;2014-03-16,1')

[[1, 0, 3]]

In [12]:
# # 将函数注册为 Spark UDF，返回值类型为 Array[Array[Integer]]
# sort_udf = F.udf(sorted_items, ArrayType(ArrayType(IntegerType())))

# # 在 DataFrame 上应用此函数
# sorted_items_df = items_df.withColumn("item_list", sort_udf(F.col("item_list"))) \
#     .withColumnRenamed("item_list", "sequence")

# sorted_items_df.show(10, truncate=False)