In [1]:
import itertools
from collections import deque

import numpy as np
import pandas as pd

import torchdata.datapipes.iter as pipes
from torchdata.datapipes import functional_datapipe

In [2]:
# Quick look at the raw file with pandas before building the streaming datapipe.
df = pd.read_csv("HistoricalQuotes.csv", sep=",")
df.head(n=5)

Unnamed: 0,Date,Close/Last,Volume,Open,High,Low
0,02/28/2020,$273.36,106721200,$257.26,$278.41,$256.37
1,02/27/2020,$273.52,80151380,$281.1,$286,$272.96
2,02/26/2020,$292.65,49678430,$286.53,$297.88,$286.5
3,02/25/2020,$288.08,57668360,$300.95,$302.53,$286.13
4,02/24/2020,$298.18,55548830,$297.26,$304.18,$289.23


In [3]:
# Stream the same file through torchdata: open it in text mode, then split each
# line on commas, skipping the first (header) line.
# NOTE(review): `csv` shadows the stdlib module name; kept because later cells use it.
datapipe = pipes.IterableWrapper(["HistoricalQuotes.csv"])
csv = (
    pipes.FileOpener(datapipe, mode='rt')
    .parse_csv(delimiter=',', skip_lines=1)
)

In [4]:
# Peek at the first parsed row: a list of strings that keep the space that
# followed each comma in the file (e.g. ' $273.36').
next(iter(csv))

['02/28/2020', ' $273.36', ' 106721200', ' $257.26', ' $278.41', ' $256.37']

In [5]:
def parse_price(dp):
    """Return the closing price of one parsed CSV row as a float.

    Args:
        dp: one row from ``parse_csv`` -- six string fields in the order
            (date, close, volume, open, high, low). Price fields carry a
            leading space and a ``$`` prefix, e.g. ``' $273.36'``.

    Returns:
        The close price with whitespace and ``$`` removed, as ``float``.

    Raises:
        ValueError: if the row does not have exactly six fields, or the close
            field is not numeric once ``$`` is stripped.
    """
    # Unpack all six fields so malformed rows still fail loudly; the unused
    # fields get underscore names rather than shadowing the builtin `open`.
    _date, close, _volume, _open, _high, _low = dp
    return float(close.strip().replace("$", ""))

In [6]:
# Reduce each parsed row to a single float: its closing price.
prices = csv.map(fn=parse_price)

In [7]:
# The first row should now come out as its closing price, a plain float.
next(iter(prices))

273.36

In [8]:
@functional_datapipe("rolling")
class RollingWindow(pipes.IterDataPipe):
    """Sliding windows over a source datapipe, yielded as NumPy arrays.

    Registered as the functional form ``dp.rolling(window_size, step=1)``.
    Windows start at source positions 0, step, 2*step, ...; each holds
    ``window_size`` consecutive elements. A trailing window that cannot be
    completely filled is dropped, and when ``step > window_size`` the
    elements between windows are skipped.

    NOTE(review): re-running this cell registers the name "rolling" again;
    torchdata may reject a duplicate functional name -- restart the kernel
    if that happens.
    """

    def __init__(self, source_dp: pipes.IterDataPipe, window_size: int, step: int = 1) -> None:
        """
        Args:
            source_dp: upstream datapipe whose elements are stacked into windows.
            window_size: number of elements per window; must be >= 1.
            step: how far the window advances between yields; must be >= 1.

        Raises:
            ValueError: if ``window_size`` or ``step`` is less than 1
                (``step=0`` would otherwise yield the same window forever).
        """
        super().__init__()
        if window_size < 1:
            raise ValueError(f"window_size must be >= 1, got {window_size}")
        if step < 1:
            raise ValueError(f"step must be >= 1, got {step}")
        self.source_dp = source_dp
        self.window_size = window_size
        self.step = step

    def __iter__(self):
        # A bounded deque drops its oldest element in O(1) (list.pop(0) is
        # O(window)), so it always holds the latest `window_size` elements.
        window = deque(maxlen=self.window_size)
        for idx, item in enumerate(self.source_dp):
            window.append(item)
            start = idx - self.window_size + 1  # source index of window[0]
            # Emit only full windows whose start lies on the step grid.
            if start >= 0 and start % self.step == 0:
                yield np.array(list(window))

In [24]:
# Sanity-check the custom pipe directly: windows of 5 prices that advance
# two rows at a time; show only the first four.
dp = RollingWindow(source_dp=prices, window_size=5, step=2)
it = iter(dp)
list(itertools.islice(it, 0, 4))

[array([273.36, 273.52, 292.65, 288.08, 298.18]),
 array([292.65, 288.08, 298.18, 313.05, 320.3 ]),
 array([298.18, 313.05, 320.3 , 323.62, 319.  ]),
 array([320.3 , 323.62, 319.  , 324.95, 324.87])]

## Final Pipeline

In [18]:
# Final pipeline: CSV rows -> closing prices -> windows of 5 consecutive rows
# (advancing one row at a time) -> batches of 4 windows.
# NOTE(review): the df.head() output shows the newest date first, so each
# window runs backward in time -- confirm that ordering is what consumers expect.
datapipe = pipes.IterableWrapper(["HistoricalQuotes.csv"])
ds = (
    datapipe
    .open_files(mode='rt')
    .parse_csv(delimiter=',', skip_lines=1)
    .map(parse_price)
    .rolling(window_size=5, step=1)
    .batch(4)
)
            

In [19]:
# One batch: a list of 4 five-price windows, each shifted one row from the last.
next(iter(ds))

[array([273.36, 273.52, 292.65, 288.08, 298.18]),
 array([273.52, 292.65, 288.08, 298.18, 313.05]),
 array([292.65, 288.08, 298.18, 313.05, 320.3 ]),
 array([288.08, 298.18, 313.05, 320.3 , 323.62])]