# Squirrel for Timeseries data 

Squirrel also handles timeseries data, or any other form of **ordered** data. However, a few modifications are needed to preserve the order once the data is stored.

This notebook shows two possible approaches for storing and loading timeseries. The first uses only Squirrel-native functionality; the second combines **Squirrel** with **Spark**.

In [None]:
!pip install squirrel-core pyspark

In [None]:
import typing as t
import tempfile
import numpy as np

import time
import pickle

from random import randint
from functools import partial

from pyspark.sql import SparkSession
from squirrel.driver import MessagepackDriver
from squirrel.store import SquirrelStore
from squirrel.serialization import MessagepackSerializer
from squirrel.iterstream import IterableSource, Composable

In [None]:
# Some utility functions to generate timeseries samples and verify their ordering


def generate_timeseries_samples(N: int):
    """Generate timeseries"""
    for _ in range(N):
        # randint expects integer bounds; the float 1e6 is rejected by newer Python versions
        yield {"time_stamp_sec": randint(0, int(1e6)), "data": pickle.dumps(np.random.rand(2, 2))}


def is_ordered(li: t.List[t.Dict], key=None) -> bool:
    """Test if the list `li` is in non-decreasing order according to `key`"""
    return all(li[i].get(key) <= li[i + 1].get(key) for i in range(len(li) - 1))


# Constants shared between the experiments
N_SHARDS = 10
N = int(1e4)

## Timeseries with Squirrel-native

The only change here is the use of `zip_index` to obtain a key for storing the data. The integer key is formatted as a string and padded with zeros if it has fewer digits than given by `pad_length`. `zip_index` returns an iterable over tuples, where the first item is the index and the second item is the value.

This may sound cumbersome at first, but it is needed because the shards are sorted by their keys before being yielded, which is what preserves the order. Since shard keys are used as filenames, they are sorted as strings: in ascending order, the key `11` would come before `9`. For this reason, the key is padded with zeros.
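
The effect of zero-padding on string sorting can be checked in plain Python. The snippet below is a minimal sketch that does not call Squirrel: `str.zfill` is used only as a stand-in for the padding described above, and the width of 9 simply mirrors the `pad_length` used in the next cell.

```python
# Integer shard indices rendered as plain strings sort lexicographically,
# so "11" lands before "9".
indices = [9, 10, 11, 100]
unpadded = sorted(str(i) for i in indices)

# Zero-padding every key to the same width makes string order match numeric order.
# str.zfill is a stand-in here, not the Squirrel implementation.
PAD_LENGTH = 9
padded = sorted(str(i).zfill(PAD_LENGTH) for i in indices)

print(unpadded)
print(padded)
```

The first list comes out in an order that does not match the numeric one, while the padded keys sort exactly like the integers they encode.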

In [None]:
# The data must already be sorted before it is written (done here with sorted());
# the steps below only guarantee that it remains sorted
samples_list = sorted(generate_timeseries_samples(N), key=lambda x: x["time_stamp_sec"])
samples = IterableSource(samples_list)

start = time.time()
with tempfile.TemporaryDirectory() as tempdir:
    # Write to a new, cleaned store
    store = SquirrelStore(url=str(tempdir), serializer=MessagepackSerializer(), clean=True)
    # batched(N_SHARDS) puts N_SHARDS samples into each shard; zip_index adds the zero-padded key
    samples.batched(N_SHARDS).zip_index(pad_length=9).map(lambda x: store.set(key=x[0], value=x[1])).join()
    # Read
    driver = MessagepackDriver(url=str(tempdir))
    retrieved = driver.get_iter()
    assert is_ordered(retrieved.collect(), key="time_stamp_sec")
end = time.time()

print(f"Timeseries storing and loading with Squirrel took {end - start:.2f} s")

## Timeseries with Squirrel and Spark 

We leverage Spark here to sort the timeseries. Spark is useful when your data does not fit entirely into memory.

In [None]:
def save_iterable_as_shard(it, store, pad_len=10) -> None:
    """Helper to save a shard into a messagepack store using squirrel."""
    it_list = list(it)
    if len(it_list) > 0:
        # use the earliest time_stamp as key (the partition is already sorted)
        smallest_timestamp = str(it_list[0]["time_stamp_sec"])
        # zero-pad the key, similar to zip_index()
        key = smallest_timestamp.zfill(pad_len)
        store.set(value=it_list, key=key)
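
To see which keys this helper produces without starting Spark, here is a minimal sketch in plain Python. `InMemoryStore` and `shard_key` are hypothetical names introduced for illustration only: the store merely mimics the `set(value=..., key=...)` call used above, and the hand-written partitions stand in for what Spark would pass to the helper.

```python
import typing as t


class InMemoryStore:
    """Hypothetical stand-in for a store; it only mimics set(value=..., key=...)."""

    def __init__(self) -> None:
        self.shards: t.Dict[str, t.List[t.Dict]] = {}

    def set(self, value: t.List[t.Dict], key: str) -> None:
        self.shards[key] = value


def shard_key(partition: t.List[t.Dict], pad_len: int = 10) -> str:
    """Zero-padded key built from the first (smallest) timestamp of a sorted partition."""
    return str(partition[0]["time_stamp_sec"]).zfill(pad_len)


# Three already-sorted partitions in arbitrary arrival order; one of them is empty.
partitions = [
    [{"time_stamp_sec": 120_000}, {"time_stamp_sec": 130_000}],
    [],
    [{"time_stamp_sec": 7}, {"time_stamp_sec": 99}],
]

demo_store = InMemoryStore()
for part in partitions:
    if part:  # empty partitions are skipped, as in save_iterable_as_shard
        demo_store.set(value=part, key=shard_key(part))

# Reading the shards back in sorted key order restores the global time order.
restored = [s["time_stamp_sec"] for k in sorted(demo_store.shards) for s in demo_store.shards[k]]
print(sorted(demo_store.shards))
print(restored)
```

Because each key encodes the smallest timestamp of its partition, sorting the keys as strings is enough to put the shards back in time order, regardless of the order in which the partitions were written.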

In [None]:
samples = IterableSource(generate_timeseries_samples(N))
# Initiate Spark
spark = SparkSession.builder.appName("test").getOrCreate()
rdd = spark.sparkContext.parallelize(samples)
# Sort
rdd = rdd.repartition(N_SHARDS).sortBy(lambda x: x["time_stamp_sec"])

start = time.time()
with tempfile.TemporaryDirectory() as tempdir:
    # Store into a new store
    store = SquirrelStore(url=str(tempdir), serializer=MessagepackSerializer(), clean=True)

    rdd.foreachPartition(partial(save_iterable_as_shard, store=store))

    # Read
    driver = MessagepackDriver(url=str(tempdir))
    retrieved = driver.get_iter()
    assert is_ordered(retrieved.collect(), key="time_stamp_sec")
end = time.time()

print(f"Timeseries storing and loading with Spark took {end - start:.2f} s")

Alternatively, the data can be stored unsorted and sorted with Spark when it is loaded.

In [None]:
class SparkSource(Composable):
    def __init__(self, url: str, sort_callback):
        """Define a helper class to encapsulate an Iterator over Spark contents"""
        self.url = url
        self.sort_callback = sort_callback
        self.spark = SparkSession.builder.appName("test").getOrCreate()

    def __iter__(self):
        # Use the url passed to the constructor instead of relying on a global variable
        store = SquirrelStore(url=str(self.url), serializer=MessagepackSerializer())
        keys = list(store.keys())
        # Here we do the sorting
        rdd = self.spark.sparkContext.parallelize(keys).map(lambda k: list(store.get(k))).flatMap(lambda x: x)
        rdd = rdd.sortBy(self.sort_callback)
        yield from rdd.toLocalIterator()


# unsorted data
samples = IterableSource(generate_timeseries_samples(N))

with tempfile.TemporaryDirectory() as tempdir:
    print(tempdir)
    # Write
    store = SquirrelStore(url=tempdir, serializer=MessagepackSerializer(), clean=True)
    samples.batched(N_SHARDS).map(store.set).join()

    # Read
    spark_iterable = SparkSource(tempdir, lambda x: x["time_stamp_sec"]).collect()
    assert is_ordered(spark_iterable, key="time_stamp_sec")