Such preparation is essential for training **multi-task models** that learn to predict multiple user intentions simultaneously, while preserving the **temporal dependencies** between actions.

## Tasks Performed

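- Load the raw dataset and convert Unix timestamps to datetimes
- Keep records dated 2017-11-25 through 2017-12-03
- Encode the behavior type as four binary columns (`pv`, `fav`, `cart`, `buy`)
- Remove users with fewer than 10 actions
- Enforce the behavioral order pv → fav → cart → buy: within each user's time-ordered history, drop any record whose behavior comes earlier in the funnel than the record immediately before it
- Generate per-user session IDs, starting a new session after more than 30 minutes of inactivity
- Split the data into training, validation, and test sets by timestamp
- Save each set as gzip-compressed CSV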
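
The out-of-order rule is the least obvious step, so a small plain-Python illustration may help. This is only a sketch of the logic, not the PySpark implementation: the function name `drop_out_of_order` and the sample events are hypothetical, and it assumes one user's events are already sorted by time and that each record is compared with the record immediately before it in the unfiltered history.

```python
# Funnel position of each behavior type: pv -> fav -> cart -> buy.
STEP = {"pv": 1, "fav": 2, "cart": 3, "buy": 4}


def drop_out_of_order(behaviors):
    """Keep a record only if its step is >= the step of the record before it.

    `behaviors` is one user's behavior list, already sorted by time.
    The comparison always uses the original (unfiltered) predecessor.
    """
    kept = []
    prev_step = None
    for behavior in behaviors:
        step = STEP.get(behavior, 0)  # unknown behaviors map to step 0
        if prev_step is None or step >= prev_step:
            kept.append(behavior)
        prev_step = step  # the predecessor is the previous raw record, kept or not
    return kept


# Hypothetical history for a single user.
events = ["pv", "cart", "pv", "pv", "buy", "fav"]
print(drop_out_of_order(events))  # ['pv', 'cart', 'pv', 'buy']
```

Note that the rule is applied across a user's whole history rather than per item, so a page view that directly follows a cart event is dropped even when it concerns a different item.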

## Dataset Schema

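After preprocessing, the saved files contain the following columns (the intermediate `Datetime` column is used for sessionization and splitting, then dropped before saving):
- `UserID`: unique identifier for the user
- `ItemID`: unique identifier for the item
- `CategoryID`: identifier for the item's category
- `pv`, `fav`, `cart`, `buy`: binary (0/1) columns indicating the behavior type (page view, favorite, add to cart, purchase)
- `SessionID`: per-user session counter starting at 1; it is unique only in combination with `UserID`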
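
To make the `SessionID` semantics concrete, here is a minimal plain-Python sketch of the 30-minute inactivity rule. It is an illustration under stated assumptions, not the pipeline code: the helper name `assign_session_ids` is hypothetical, and the input is assumed to be one user's timestamps in ascending order. The sample timestamps are the first four rows shown for user `1000015` in the notebook output.

```python
from datetime import datetime, timedelta

# A gap strictly longer than this starts a new session.
SESSION_GAP = timedelta(minutes=30)


def assign_session_ids(timestamps):
    """Return a session number (starting at 1) for each of one user's timestamps."""
    session_ids = []
    current = 0
    previous = None
    for ts in timestamps:
        # The first event, or any event after a long pause, opens a new session.
        if previous is None or ts - previous > SESSION_GAP:
            current += 1
        session_ids.append(current)
        previous = ts
    return session_ids


times = [
    datetime(2017, 11, 26, 0, 46, 0),
    datetime(2017, 11, 26, 8, 16, 21),
    datetime(2017, 11, 26, 8, 17, 48),
    datetime(2017, 11, 28, 2, 26, 50),
]
print(assign_session_ids(times))  # [1, 2, 2, 3]
```

Because the counter restarts for every user, downstream code should group by the pair (`UserID`, `SessionID`) rather than by `SessionID` alone.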

## File Outputs

- `train_taobao_process.csv.gz` (2017-11-25 to 2017-11-28)
- `val_taobao_process.csv.gz` (2017-11-29 to 2017-11-30)
- `test_taobao_process.csv.gz` (2017-12-01 to 2017-12-03)
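
The date ranges above read most naturally as whole days, with the last listed date included. A minimal sketch of that interpretation uses half-open intervals, where each upper bound is midnight of the following day. The names `SPLITS` and `split_for` are hypothetical, and the inclusive reading of the end dates is an assumption.

```python
from datetime import datetime

# Half-open [start, end) ranges; each end is midnight of the day after
# the last date listed for that split.
SPLITS = {
    "train": (datetime(2017, 11, 25), datetime(2017, 11, 29)),
    "val": (datetime(2017, 11, 29), datetime(2017, 12, 1)),
    "test": (datetime(2017, 12, 1), datetime(2017, 12, 4)),
}


def split_for(ts):
    """Return the split name a timestamp falls into, or None if outside all ranges."""
    for name, (start, end) in SPLITS.items():
        if start <= ts < end:
            return name
    return None


print(split_for(datetime(2017, 11, 28, 2, 26, 50)))  # train
print(split_for(datetime(2017, 11, 29, 1, 15, 52)))  # val
```

Half-open ranges guarantee that every timestamp in the overall window lands in exactly one split, with no gap on the last day of each range.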

## Environment

This project uses [PySpark](https://spark.apache.org/docs/latest/api/python/) for distributed preprocessing. Tested in a Kaggle environment with Spark 3.x.

## How to Run

1. Access the dataset in a Kaggle notebook at the following path:

   `/kaggle/input/userbehavior/UserBehavior.csv`

   Dataset link: [marwa80/userbehavior](https://www.kaggle.com/datasets/marwa80/userbehavior)

   Read the CSV file from this path with pandas or PySpark.
2. Run the full preprocessing script (`.ipynb` or `.py`) provided in this repository.
3. The processed files are saved to `/kaggle/working/`.
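
One practical detail when loading the results: Spark's CSV writer treats each output path as a directory and writes one or more `part-*.csv.gz` files inside it, so a path such as `/kaggle/working/train_taobao_process.csv.gz` is expected to be a folder rather than a single file. The following is a minimal standard-library sketch for reading such a folder. The helper name `read_spark_csv_dir` is hypothetical, and it assumes the files were written with `header=True`.

```python
import csv
import glob
import gzip
import os


def read_spark_csv_dir(path):
    """Yield rows (as dicts) from every gzip-compressed part file under `path`."""
    for part in sorted(glob.glob(os.path.join(path, "part-*.csv.gz"))):
        with gzip.open(part, mode="rt", newline="") as handle:
            # With header=True, each part file carries its own header row.
            yield from csv.DictReader(handle)


if __name__ == "__main__":
    out_dir = "/kaggle/working/train_taobao_process.csv.gz"
    # Print the first few rows; prints nothing if the folder does not exist yet.
    for i, row in enumerate(read_spark_csv_dir(out_dir)):
        print(row)
        if i >= 4:
            break
```

All values come back as strings with this reader, so the binary columns and `SessionID` need an explicit `int(...)` conversion before numeric use.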

## Use Cases

This processed dataset is ideal for:
- Sequential recommendation modeling
- Multi-task learning (e.g., joint prediction of clicks and purchases)
- Session-based prediction models
- Contrastive learning and temporal pattern mining

## Citation

If you use this code or the preprocessed dataset, please cite:

Marwa Hamdi El-Sherief, Mohamed Helmy Khafagy and Asmaa Hashem Sweidan, “Multitask Model with an Attention Mechanism for Sequentially Dependent Online User Behaviors to Enhance Audience Targeting,” International Journal of Advanced Computer Science and Applications (IJACSA), 16(4), 2025. http://dx.doi.org/10.14569/IJACSA.2025.01604112

Dataset: Marwa Hamdi. (2025). Preprocessed Taobao User Behavior Dataset for Sequential Modeling. Kaggle. https://www.kaggle.com/datasets/marwa80/userbehavior


## License

This project is for academic and non-commercial use only.



In [1]:
# This Python 3 environment comes with many helpful analytics libraries installed
# It is defined by the kaggle/python Docker image: https://github.com/kaggle/docker-python
# For example, here's several helpful packages to load

import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)

# Input data files are available in the read-only "../input/" directory
# For example, running this (by clicking run or pressing Shift+Enter) will list all files under the input directory

import os
for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

# You can write up to 20GB to the current directory (/kaggle/working/) that gets preserved as output when you create a version using "Save & Run All" 
# You can also write temporary files to /kaggle/temp/, but they won't be saved outside of the current session

/kaggle/input/userbehavior/UserBehavior.csv


In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, when, lag, unix_timestamp, sum as spark_sum
from pyspark.sql.window import Window
from pyspark.sql.types import IntegerType

# Step 1: Create Spark session
spark = SparkSession.builder.appName("TaobaoPreprocessing").getOrCreate()

# Step 2: Load the dataset
# The raw file has no header row; name its five columns in order.
df = spark.read.csv("/kaggle/input/userbehavior/UserBehavior.csv", header=False) \
       .toDF("UserID", "ItemID", "CategoryID", "Behavior", "Timestamp")

# Step 3: Convert timestamp to datetime format
df = df.withColumn("Timestamp", col("Timestamp").cast("long"))
df = df.withColumn("Datetime", to_timestamp(col("Timestamp")))

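# Step 4: Keep records from 2017-11-25 through 2017-12-03 (inclusive).
# The upper bound is the following midnight, exclusive: a bare `<= "2017-12-03"`
# compares against 2017-12-03 00:00:00 and would drop nearly all of that day.
df = df.filter((col("Datetime") >= "2017-11-25") & (col("Datetime") < "2017-12-04"))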

# Step 5: Create binary columns for each behavior type
df = df.withColumn("pv", when(col("Behavior") == "pv", 1).otherwise(0)) \
       .withColumn("fav", when(col("Behavior") == "fav", 1).otherwise(0)) \
       .withColumn("cart", when(col("Behavior") == "cart", 1).otherwise(0)) \
       .withColumn("buy", when(col("Behavior") == "buy", 1).otherwise(0)) \
       .drop("Behavior", "Timestamp")

# Step 6: Keep only users with at least 10 actions
user_counts = df.groupBy("UserID").count()
valid_users = user_counts.filter(col("count") >= 10).select("UserID")
df = df.join(valid_users, on="UserID", how="inner")

# Step 7: Enforce correct behavior sequence (pv → fav → cart → buy)
df = df.withColumn("Step",
                   when(col("pv") == 1, 1)
                   .when(col("fav") == 1, 2)
                   .when(col("cart") == 1, 3)
                   .when(col("buy") == 1, 4)
                   .otherwise(0))

w = Window.partitionBy("UserID").orderBy("Datetime")
df = df.withColumn("PrevStep", lag("Step").over(w))
df = df.withColumn("Valid", when((col("PrevStep").isNull()) | (col("Step") >= col("PrevStep")), 1).otherwise(0))
df = df.filter(col("Valid") == 1).drop("PrevStep", "Valid", "Step")

# Step 8: Generate session IDs based on time difference > 30 minutes
df = df.withColumn("TimeDiff", unix_timestamp(col("Datetime")) - unix_timestamp(lag("Datetime").over(w)))
df = df.withColumn("NewSession", when((col("TimeDiff") > 1800) | col("TimeDiff").isNull(), 1).otherwise(0))
df = df.withColumn("SessionID", spark_sum("NewSession").over(w))
df = df.drop("TimeDiff", "NewSession")


In [4]:
df.show(20)

+-------+-------+----------+-------------------+---+---+----+---+---------+
| UserID| ItemID|CategoryID|           Datetime| pv|fav|cart|buy|SessionID|
+-------+-------+----------+-------------------+---+---+----+---+---------+
|1000015|3243563|   2394030|2017-11-26 00:46:00|  1|  0|   0|  0|        1|
|1000015|4758554|   4339722|2017-11-26 08:16:21|  1|  0|   0|  0|        2|
|1000015|1711102|   2394030|2017-11-26 08:17:48|  1|  0|   0|  0|        2|
|1000015|5116964|   3002561|2017-11-28 02:26:50|  1|  0|   0|  0|        3|
|1000015|4005236|   3002561|2017-11-28 02:29:47|  1|  0|   0|  0|        3|
|1000015|2006192|   3002561|2017-11-28 02:30:28|  1|  0|   0|  0|        3|
|1000015| 592457|   3002561|2017-11-28 02:30:47|  1|  0|   0|  0|        3|
|1000015|5012596|   3607361|2017-11-29 01:15:52|  1|  0|   0|  0|        4|
|1000015| 999099|   3002561|2017-11-29 01:16:44|  1|  0|   0|  0|        4|
|1000015|1317359|   3002561|2017-11-29 01:42:01|  0|  0|   1|  0|        4|
|1000015|131

                                                                                                    

In [5]:
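# Step 9: Split into train, validation, and test sets by date.
# Ranges are half-open [start, end): each end is midnight of the day after the
# split's last date, so that day is fully included. Comparing with
# `<= "2017-11-28"` would stop at 2017-11-28 00:00:00 and leave the rest of
# that day in no split at all.
train_start = "2017-11-25"
val_start   = "2017-11-29"   # also the exclusive end of train (last day: 2017-11-28)
test_start  = "2017-12-01"   # also the exclusive end of val (last day: 2017-11-30)
test_end    = "2017-12-04"   # exclusive end of test (last day: 2017-12-03)

train_df = df.filter((col("Datetime") >= train_start) & (col("Datetime") < val_start)).drop("Datetime")
val_df   = df.filter((col("Datetime") >= val_start)   & (col("Datetime") < test_start)).drop("Datetime")
test_df  = df.filter((col("Datetime") >= test_start)  & (col("Datetime") < test_end)).drop("Datetime")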

# Step 10: Save output files as gzip-compressed CSV.
# Note: Spark treats each path as an output directory and writes one or more
# part-*.csv.gz files inside it, each with its own header row.
train_df.write.csv("/kaggle/working/train_taobao_process.csv.gz", header=True, compression="gzip")
val_df.write.csv("/kaggle/working/val_taobao_process.csv.gz", header=True, compression="gzip")
test_df.write.csv("/kaggle/working/test_taobao_process.csv.gz", header=True, compression="gzip")

                                                                                                    