# Timeseries Transformer

In [1]:
import os
# Set TensorFlow's native (C++) log threshold via the environment. This cell
# runs before the cell that imports tensorflow; presumably the variable has to
# be in place at import time to take effect (see the linked answer).
os.environ['TF_CPP_MIN_LOG_LEVEL'] = '2' # https://stackoverflow.com/a/64438413

In [2]:
from __future__ import annotations
import glob
import inspect
import json
import logging
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from pathlib import Path
import seaborn as sns
import sys
import tensorflow as tf
import tensorflow.keras as keras
from typing import Callable

In [3]:
# Switch plotting defaults to seaborn's theme (called with no arguments, so
# seaborn's own default settings are used).
sns.set() # Use seaborn themes.

## Environment Setup

This section contains code that configures output path locations, the random seed, and logging.

In [4]:
# Location of the raw datasets; this must already exist on disk.
DATASET_ROOT = Path('/Users/derieux/research/makassar/datasets').expanduser()
if not DATASET_ROOT.exists():
    raise ValueError(f"Dataset root directory does not exist at {DATASET_ROOT}")

# Project directory and the output folders derived from it.
PROJECT_ROOT = Path('/Users/derieux/research/makassar').expanduser()
CHECKPOINT_ROOT = PROJECT_ROOT / 'checkpoints'
IMAGE_ROOT = PROJECT_ROOT / 'images'

# Create the project/output directories (no-op when they already exist).
for _required_dir in (PROJECT_ROOT, CHECKPOINT_ROOT, IMAGE_ROOT):
    _required_dir.mkdir(parents=True, exist_ok=True)
del _required_dir # Do not leave the loop variable behind in the notebook namespace.

In [5]:
# Set random seeds.
SEED = 0
# Seed NumPy's global RNG as well: Keras' `timeseries_dataset_from_array`
# (used below with `shuffle=True` and no explicit seed) is believed to draw its
# shuffle seed from NumPy when none is given -- verify for the installed version.
np.random.seed(SEED)
tf.random.set_seed(SEED) # Only this works on ARC (since tensorflow==2.4).

In [6]:
# Setup logging (useful for ARC systems).
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG) # Must be lowest of all handlers listed below.

# Clear handlers attached directly to this logger (e.g. left over from
# re-running this cell). The loop tests `logger.handlers` rather than
# `logger.hasHandlers()`: the latter also reports handlers that live on ancestor
# loggers, in which case `logger.handlers` is empty and indexing `[0]` raises
# an IndexError.
while logger.handlers: logger.removeHandler(logger.handlers[0])

# Custom log formatting.
# NOTE: this formatter is defined but not attached to the stream handler below.
formatter = logging.Formatter('%(asctime)s | %(levelname)s | %(message)s')

# Log to STDOUT (uses default formatting).
sh = logging.StreamHandler(stream=sys.stdout)
sh.setLevel(logging.INFO)
logger.addHandler(sh)

# Set Tensorflow logging level.
tf.get_logger().setLevel('ERROR') # 'INFO'

In [7]:
# Report how many GPU devices TensorFlow can see, then describe each one.
gpus = tf.config.list_physical_devices('GPU')
logger.info("Num GPUs Available: %d", len(gpus))
for gpu in gpus:
    logger.info("Name: %s, Type: %s", gpu.name, gpu.device_type)

Num GPUs Available: 0


## Window Subset Loader

The `WindowGenerator` class operates on `pandas.DataFrame` objects. It accomplishes several tasks:

1. Split the data into contiguous windows.
2. Convert the data into a `tf.data.Dataset` object for batch loading.

In [None]:
# Inspiration: https://www.tensorflow.org/tutorials/structured_data/time_series#data_windowing
class WindowGenerator:
    """Sliding-window batches over pre-split train/val/test data frames.

    Each window spans `in_seq_len + shift` consecutive rows. The first
    `in_seq_len` rows form the input sequence and the last `out_seq_len`
    rows form the output (target) sequence.
    """

    def __init__(self,
        in_seq_len: int,
        out_seq_len: int,
        shift: int,
        train_df: pd.DataFrame,
        val_df: pd.DataFrame,
        test_df: pd.DataFrame,
        in_feat: list[str] | None = None,
        out_feat: list[str] | None = None,
        batch_size: int = 32,
        shuffle: bool = True,
        ):
        """Constructs sliding windows of sequential data.

        Data must already be split into train/val/test subsets,
        and provided as `pandas.DataFrame` objects.

        Args:
            in_seq_len (int): Input sequence length.
            out_seq_len (int): Output (target) sequence length.
            shift (int): Offset added to `in_seq_len` to obtain the total window length.
                The same value is passed as `sequence_stride` when windows are extracted.
            train_df (pd.DataFrame): Training data frame. Its columns define the feature-to-index mapping.
            val_df (pd.DataFrame): Validation data frame.
            test_df (pd.DataFrame): Testing data frame.
            in_feat (list[str], optional): Desired subset of input features for window. Defaults to None (all columns).
            out_feat (list[str], optional): Desired subset of output features for window. Defaults to None (all columns).
            batch_size (int, optional): Batch size. Defaults to 32.
            shuffle (bool, optional): Shuffle windows prior to batching. Defaults to True.

        Raises:
            ValueError: If `out_seq_len` is negative or larger than `in_seq_len + shift`.
            KeyError: If a requested feature is not a column of `train_df`.
        """
        total_window_len = in_seq_len + shift

        # Fail early: an output sequence longer than the window would produce a
        # negative slice start and an output segment of the wrong length, which
        # only surfaces later inside `split_window`.
        if not 0 <= out_seq_len <= total_window_len:
            raise ValueError(
                f"out_seq_len must be between 0 and in_seq_len + shift "
                f"({total_window_len}), got {out_seq_len}"
            )

        self.batch_size = batch_size
        self.shuffle = shuffle

        # Preserve dataframes.
        self.train_df = train_df
        self.val_df = val_df
        self.test_df = test_df

        # Preserve sequence information.
        self.in_seq_len = in_seq_len
        self.out_seq_len = out_seq_len
        self.shift = shift
        self.total_window_len = total_window_len

        # Setup indexing slices for window extraction.
        self.in_slice = slice(0, self.in_seq_len)
        self.out_slice = slice(self.total_window_len - self.out_seq_len, None)
        self.in_idx = np.arange(self.total_window_len)[self.in_slice]
        self.out_idx = np.arange(self.total_window_len)[self.out_slice]

        # Setup train/val/test column extractors.
        self.col2idx = {name: i for i, name in enumerate(train_df.columns)}
        self.in_feat, self.in_col_idx = self._resolve_features(in_feat, train_df.columns)
        self.out_feat, self.out_col_idx = self._resolve_features(out_feat, train_df.columns)

    def _resolve_features(self,
        feat: list[str] | None,
        columns: pd.Index,
        ) -> tuple[list[str], list[int]]:
        """Maps a feature selection onto (names, column indices); `None` selects every column."""
        if feat is None:
            return list(columns), list(range(len(columns)))
        return feat, [self.col2idx[col] for col in feat]

    def __repr__(self):
        """String representation of class."""
        return '\n'.join([
            f"Total window length: {self.total_window_len}",
            f"Input indices: {self.in_idx}",
            f"Output indices: {self.out_idx}",
            f"Input features: {self.in_feat}",
            f"Output features: {self.out_feat}",
        ])

    def split_window(self, 
        window: tf.Tensor, # window shape is (batch, seq, feat)
        ) -> tuple[tf.Tensor, tf.Tensor]:
        """Splits a batch of windows into input/output segments.

        Args:
            window (tf.Tensor): Tensor of window data with shape (batch, seq, feat).

        Returns:
            tuple[tf.Tensor, tf.Tensor]: 2-tuple of input/output data segments, where the shapes are:
                - Input window: (batch, in_seq_len, in_feat)
                - Output window: (batch, out_seq_len, out_feat)
        """
        # Decompose input/output sequence from given input window:
        # slice the time axis, then stack the selected feature columns last.
        in_seq = tf.stack([window[:, self.in_slice, i] for i in self.in_col_idx], axis=-1)
        out_seq = tf.stack([window[:, self.out_slice, i] for i in self.out_col_idx], axis=-1)

        # Set shape for input/output sequences.
        # Note that dimensions set to `None` are not updated.
        in_seq = tf.ensure_shape(in_seq, (None, self.in_seq_len, None))
        out_seq = tf.ensure_shape(out_seq, (None, self.out_seq_len, None))

        return in_seq, out_seq

    def make_dataset(self, 
        df: pd.DataFrame,
        batch_size: int = 32,
        shuffle: bool = True,
        ) -> tf.data.Dataset:
        """Construct a TensorFlow Dataset from given input data frame.

        Datasets load tuples of batched input/output windows with shapes:
            - Input window: (batch, in_seq_len, in_feat)
            - Output window: (batch, out_seq_len, out_feat)

        Note that output windows are generally target sequences.

        Args:
            df (pd.DataFrame): Source data frame.
            batch_size (int, optional): Batch size. Defaults to 32.
            shuffle (bool, optional): Shuffle windows prior to batching. Defaults to True.

        Returns:
            tf.data.Dataset: Dataset object.
        """

        # Convert data frame into numpy matrix.
        data = df.to_numpy()

        # Convert data matrix into TensorFlow dataset.
        # NOTE(review): `keras.utils.timeseries_dataset_from_array` appears to be the
        # newer location of this function; the `preprocessing` path is kept here.
        dataset = keras.preprocessing.timeseries_dataset_from_array(
            data=data,
            targets=None,
            sequence_length=self.total_window_len,
            sequence_stride=self.shift,
            shuffle=shuffle,
            batch_size=batch_size,
        )

        # Pipe the raw dataset into the window splitting function.
        dataset = dataset.map(self.split_window)

        # Return the dataset.
        return dataset

    @property
    def train(self):
        """Training dataset (rebuilt on every access)."""
        return self.make_dataset(
            df=self.train_df,
            batch_size=self.batch_size,
            shuffle=self.shuffle,
        )

    @property
    def val(self):
        """Validation dataset (rebuilt on every access; uses the same `shuffle` setting as training)."""
        return self.make_dataset(
            df=self.val_df,
            batch_size=self.batch_size,
            shuffle=self.shuffle,
        )

    @property
    def test(self):
        """Testing dataset (rebuilt on every access; uses the same `shuffle` setting as training)."""
        return self.make_dataset(
            df=self.test_df,
            batch_size=self.batch_size,
            shuffle=self.shuffle,
        )

## Beijing PM2.5 Dataset

In [19]:
def partition_dataset_df(
    df: pd.DataFrame,
    split: tuple[float, float, float] = (0.8, 0.1, 0.1),
    ) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
    """Split a Pandas dataframe into train, validation, and test subsets.

    Rows are taken in order (no shuffling): the first block is the training
    set, the last block is the test set, and the validation set is whatever
    lies between them.

    Args:
        df (pd.DataFrame): Source dataframe.
        split (tuple[float, float, float], optional): Tuple of split ratios. Must sum to 1. Defaults to (0.8, 0.1, 0.1).

    Returns:
        tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: Tuple of train, validation, and test dataframes (independent copies).

    Raises:
        AssertionError: If `split` is not a list/tuple of three ratios summing to 1.
    """
    assert isinstance(split, (list, tuple)), "split must be a list or tuple"
    assert len(split) == 3, "split must contain exactly three ratios"
    # Compare with a tolerance: ratios such as (0.7, 0.2, 0.1) sum to
    # 0.9999999999999999 in binary floating point and would fail an exact `== 1.0`.
    assert abs(sum(split) - 1.0) <= 1e-9, "split ratios must sum to 1"

    # Split dataframe into train/val/test dataframes.
    # Inspiration: https://www.tensorflow.org/tutorials/structured_data/time_series#split_the_data
    train_split, _, test_split = split # Validation size is implied by the other two.
    n = len(df.index) # Total number of data records.
    train_end = int(n * train_split) # First row after the training block.
    val_end = int(n * (1 - test_split)) # First row of the test block.
    df_train = df[:train_end].copy()
    df_val = df[train_end:val_end].copy()
    df_test = df[val_end:].copy()
    return df_train, df_val, df_test

In [None]:
def load_beijingpm25_df(
    path: str | Path | None = None,
    ) -> pd.DataFrame:
    """Loads Beijing PM2.5 dataset as a pandas dataframe.

    The CSV file is fetched with `keras.utils.get_file` into `path` and then
    read with pandas.

    https://archive-beta.ics.uci.edu/ml/datasets/beijing+pm2+5+data

    Dataset features:
        - `No`: (NOT USED) row number
        - `year`: year of data in this row
        - `month`: month of data in this row
        - `day`: day of data in this row
        - `hour`: hour of data in this row
        - `pm2.5`: PM2.5 concentration (ug/m^3)
        - `DEWP`: Dew Point (℃)
        - `TEMP`: Temperature (℃)
        - `PRES`: Pressure (hPa)
        - `cbwd`: (NOT USED) Combined wind direction
        - `Iws`: Cumulated wind speed (m/s)
        - `Is`: Cumulated hours of snow
        - `Ir`: Cumulated hours of rain
        - `datetime`: (NOT USED) dynamically generated datetime string

    Args:
        path (str | Path, optional): Directory in which the CSV file is stored.
            Created if missing. Defaults to None (current working directory).

    Returns:
        pd.DataFrame: Dataset contents plus the generated `datetime` column.
    """
    url = 'https://archive.ics.uci.edu/ml/machine-learning-databases/00381/PRSA_data_2010.1.1-2014.12.31.csv'

    # Set path to current directory if necessary.
    if path is None:
        path = os.getcwd()

    # Expand `~` and make the path absolute *before* creating it, so that the
    # directory created here is the same one the file is downloaded into.
    path = Path(path).expanduser().resolve()

    # Create path if necessary.
    path.mkdir(parents=True, exist_ok=True)

    # Download the dataset if necessary.
    filename = url.rsplit('/', 1)[1] # Get name of file.
    filepath = path / filename
    filepath = keras.utils.get_file(
        fname=str(filepath), # Plain string, in case path-like objects are not accepted.
        origin=url,
        )

    # Load as Pandas DataFrame.
    df = pd.read_csv(filepath)

    # Create single datetime column from independent year/month/day/hour columns.
    df['datetime'] = pd.to_datetime(df[['year','month','day','hour']])

    return df


# Load the full dataset and summarize it.
# `DataFrame.info()` prints its summary itself and returns None, so it is called
# directly; wrapping it in `print()` would emit an extra "None" line.
df = load_beijingpm25_df(DATASET_ROOT/'beijing_pm25')
df.info()

# Partition into train (a), validation (b), and test (c) subsets and summarize each.
a, b, c = partition_dataset_df(df)
a.info()
b.info()
c.info()