# A custom timeseries `DataPipe`

author: Thomas Capelle (tcapelle@wandb.com)

We will build a custom time series `DataPipe` that slices data with a rolling window. As an example, let's grab a [Kaggle dataset](https://www.kaggle.com/tarunpaparaju/apple-aapl-historical-stock-data) containing 10 years of historical stock prices for Apple Inc. ($AAPL).

In [1]:
import numpy as np
import pandas as pd

from torchdata.datapipes import functional_datapipe
import torchdata.datapipes.iter as pipes

In [2]:
df = pd.read_csv("data/HistoricalQuotes.csv")
df.head(10)

|   | Date | Close/Last | Volume | Open | High | Low |
|---|------|------------|--------|------|------|-----|
| 0 | 02/28/2020 | $273.36 | 106721200 | $257.26 | $278.41 | $256.37 |
| 1 | 02/27/2020 | $273.52 | 80151380 | $281.1 | $286 | $272.96 |
| 2 | 02/26/2020 | $292.65 | 49678430 | $286.53 | $297.88 | $286.5 |
| 3 | 02/25/2020 | $288.08 | 57668360 | $300.95 | $302.53 | $286.13 |
| 4 | 02/24/2020 | $298.18 | 55548830 | $297.26 | $304.18 | $289.23 |
| 5 | 02/21/2020 | $313.05 | 32426420 | $318.62 | $320.45 | $310.5 |
| 6 | 02/20/2020 | $320.3 | 25141490 | $322.63 | $324.65 | $318.21 |
| 7 | 02/19/2020 | $323.62 | 23495990 | $320 | $324.57 | $320 |
| 8 | 02/18/2020 | $319 | 38190550 | $315.36 | $319.75 | $314.61 |
| 9 | 02/14/2020 | $324.95 | 20028450 | $324.74 | $325.98 | $322.85 |


We can parse the CSV file with `pipes.CSVParser`, which is also available in functional form as `.parse_csv`:

In [3]:
datapipe = pipes.IterableWrapper(["data/HistoricalQuotes.csv"])

# we will skip the header
csv = pipes.FileOpener(datapipe, mode='rt').parse_csv(delimiter=',', skip_lines=1)

Every iteration returns one row of data as a list of strings:

In [4]:
next(iter(csv))

['02/28/2020', ' $273.36', ' 106721200', ' $257.26', ' $278.41', ' $256.37']
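To make the row format easier to inspect without the dataset on disk, here is a minimal sketch using only the standard library's `csv` module on an in-memory sample. The header line in `sample` is an assumption based on the column names shown earlier; the data row is the first row of the file. Skipping one line by hand plays the role of `skip_lines=1`.

```python
import csv
import io

# In-memory stand-in for the file: an assumed header line plus the first data row.
sample = (
    "Date, Close/Last, Volume, Open, High, Low\n"
    "02/28/2020, $273.36, 106721200, $257.26, $278.41, $256.37\n"
)

reader = csv.reader(io.StringIO(sample), delimiter=",")
next(reader)              # skip the header, similar in spirit to skip_lines=1
first_row = next(reader)  # every field is still a string
print(first_row)
```

Note that the fields keep their leading space and the `$` sign, so they need cleaning before they can be cast to numbers.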

Let's create a function that converts the `Close/Last` string to a float:

In [5]:
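def parse_price(row):
    "Select the `Close/Last` column and cast it to `float`."
    # `open_` avoids shadowing the built-in `open`
    date, close, vol, open_, high, low = row
    # values look like ' $273.36': drop the padding and the dollar sign
    return float(close.strip().replace("$", ""))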

We can apply the function to every row with the `pipes.Mapper` class, available in functional form as `.map`:

In [6]:
prices = csv.map(parse_price)

Let's check that we get a float. Everything looks fine 😃

In [7]:
next(iter(prices))

273.36
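If more than the closing price is needed later, the same cleanup can be applied to every column. The helper below is a hypothetical sketch (`parse_row` and its dictionary keys are not part of the original notebook); it assumes each row has exactly the six fields shown above.

```python
def parse_row(row):
    """Hypothetical helper: convert one raw CSV row into typed values."""
    def to_float(text):
        # ' $273.36' -> 273.36
        return float(text.strip().replace("$", ""))

    date, close, volume, open_, high, low = row
    return {
        "date": date.strip(),
        "close": to_float(close),
        "volume": int(volume.strip()),
        "open": to_float(open_),
        "high": to_float(high),
        "low": to_float(low),
    }

row = ['02/28/2020', ' $273.36', ' 106721200', ' $257.26', ' $278.41', ' $256.37']
parsed = parse_row(row)
print(parsed["close"], parsed["volume"])
```

Such a function could be passed to `.map` in the same way as `parse_price`.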

## Custom Rolling Window
> We want to slice multiple values at the same time. This is useful to train a forecasting model afterwards.

![](data/df_window.png)

For example, we can slice 5 values at a time with a step of 2:

In [8]:
import itertools

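@functional_datapipe("rolling")
class RollingWindow(pipes.IterDataPipe):
    "Yield arrays of `window_size` consecutive items, advancing `step` items between windows."
    def __init__(self, source_dp: pipes.IterDataPipe, window_size: int, step: int = 1) -> None:
        super().__init__()
        # a step of 0 would yield the same window forever
        if window_size < 1 or step < 1:
            raise ValueError("window_size and step must be positive integers")
        self.source_dp = source_dp
        self.window_size = window_size
        self.step = step

    def __iter__(self):
        it = iter(self.source_dp)
        cur = []
        while True:
            try:
                # fill the buffer until it holds a full window
                while len(cur) < self.window_size:
                    cur.append(next(it))
                yield np.array(cur)
                # advance by `step`: drop buffered items first, then
                # discard items from the source if step > window_size
                for _ in range(self.step):
                    if cur:
                        cur.pop(0)
                    else:
                        next(it)
            except StopIteration:
                # source exhausted: an incomplete trailing window is dropped
                return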

In [9]:
dp = RollingWindow(prices, 5, step=2)
it = iter(dp)
list(itertools.islice(it, 4))

[array([273.36, 273.52, 292.65, 288.08, 298.18]),
 array([292.65, 288.08, 298.18, 313.05, 320.3 ]),
 array([298.18, 313.05, 320.3 , 323.62, 319.  ]),
 array([320.3 , 323.62, 319.  , 324.95, 324.87])]
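To check the windowing logic independently of `torchdata`, here is a rough pure-Python equivalent. It is a sketch, not the implementation used above: `rolling_windows` is a hypothetical name, it yields plain lists instead of NumPy arrays, and it uses a bounded `deque` as the buffer. The sample prices are the first 11 closing values seen in the outputs above.

```python
from collections import deque
from itertools import islice

def rolling_windows(iterable, window_size, step=1):
    """Yield lists of `window_size` consecutive items, advancing by `step`."""
    it = iter(iterable)
    # a deque with maxlen discards the oldest items automatically
    window = deque(islice(it, window_size), maxlen=window_size)
    if len(window) < window_size:
        return  # not enough data for even one window
    yield list(window)
    while True:
        new_items = list(islice(it, step))
        if len(new_items) < step:
            return  # incomplete trailing window is dropped
        window.extend(new_items)
        yield list(window)

prices = [273.36, 273.52, 292.65, 288.08, 298.18, 313.05,
          320.3, 323.62, 319.0, 324.95, 324.87]
windows = list(rolling_windows(prices, 5, step=2))
print(len(windows))  # 4
```

With `n` items, this produces `(n - window_size) // step + 1` full windows when `n >= window_size`, which gives 4 for the 11 prices here and matches the four arrays shown above.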

## Final Pipeline
> Putting everything together

In [10]:
datapipe = pipes.IterableWrapper(["data/HistoricalQuotes.csv"])
ds = (
    pipes.FileOpener(datapipe, mode='rt')
    .parse_csv(delimiter=',', skip_lines=1)
    .map(parse_price)
    .rolling(window_size=5, step=1)  # registered by @functional_datapipe("rolling")
    .batch(4)
)

In [11]:
next(iter(ds))

[array([273.36, 273.52, 292.65, 288.08, 298.18]),
 array([273.52, 292.65, 288.08, 298.18, 313.05]),
 array([292.65, 288.08, 298.18, 313.05, 320.3 ]),
 array([288.08, 298.18, 313.05, 320.3 , 323.62])]
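Since the windows are meant to feed a forecasting model, one possible next step is to split each window into input values and the value(s) to predict. The sketch below is an assumption about how that could look, not part of the original pipeline: `split_window` and `horizon` are hypothetical names, and the batch is written out as plain lists so the example runs without the dataset. The same slicing works on the NumPy arrays produced by the pipeline.

```python
def split_window(window, horizon=1):
    """Hypothetical helper: split a window into (inputs, targets).

    The last `horizon` values become the targets; `horizon` must be >= 1.
    """
    return window[:-horizon], window[-horizon:]

# The batch shown above, written as plain lists for illustration.
batch = [
    [273.36, 273.52, 292.65, 288.08, 298.18],
    [273.52, 292.65, 288.08, 298.18, 313.05],
    [292.65, 288.08, 298.18, 313.05, 320.3],
    [288.08, 298.18, 313.05, 320.3, 323.62],
]

pairs = [split_window(window) for window in batch]
inputs, targets = pairs[0]
print(inputs, targets)
```

A function like this could be applied with `.map` before `.batch(4)`, so that each batch element is already an `(inputs, targets)` pair.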

> This article was originally published on the [Weights and Biases blog](http://wandb.me/torchdata).