![](../header.jpg)

# Streaming Pickle Data to Disk

Kevin J. Walchko, Phd

19 July 2020

---

In [1]:
import pickle
from slurm.files import rm
from slurm.rate import Rate
import datetime
from collections import defaultdict
import os
import attr
import atexit
import gzip
import numpy as np
from pathlib import Path
import time

In [2]:
# import multiprocessing as mp
# from colorama import Fore
# import attr


# @attr.s(slots=True)
# class SimpleProcess(object):
#     """
#     A simple class to help processes start/stop easily. It is main intended for
#     testing and some simple things.
#     p = SimpleProcess()       # create process
#     p.start(function)         # start the process, runs function
#     ...                       # stuff happens
#     p.join()                  # time to go ... bye!
#     """
#     ps = attr.ib(init=False, default=None)

#     def __del__(self):
#         if self.ps:
#             self.join(0.1)

#     @property
#     def name(self):
#         return self.ps.name

#     @property
#     def pid(self):
#         return self.ps.pid

#     def is_alive(self):
#         """Check if the process is still running"""
#         if self.ps:
#             return self.ps.is_alive()
#         else:
#             return False

#     def terminate(self):
#         """Terminate the process"""
#         if self.ps:
#             self.ps.terminate()

#     def start(self, func, name='simple_process', **kwargs):
#         """Starts the process
#           func: function for multi-process
#           name: what to call the process
#           kwargs: args to pass function
#         """
#         if kwargs:
#             kwargs = kwargs['kwargs']  # WTF???
#             self.ps = mp.Process(name=name, target=func, kwargs=kwargs)
#         else:
#             self.ps = mp.Process(name=name, target=func)

#         self.ps.start()
#         print(f'>> {Fore.GREEN}Started{Fore.RESET}: {self.ps.name}[{self.ps.pid}]')

#     def join(self, timeout=1.0):
#         """
#         Attempts to join() the process with the given timeout. If that fails, it calls
#         terminate().
#         timeout: how long to wait for join() in seconds.
#         """
#         print(f'>> {Fore.RED}Stopping{Fore.RESET}: {self.ps.name}[{self.ps.pid}] ...')
#         if self.ps:
#             self.ps.join(timeout)
#             if self.ps.is_alive():
#                 self.ps.terminate()
#         self.ps = None

In [2]:
@attr.s(slots=True)
class Pickle:
    proto = attr.ib(init=False, default="pickle")

    def pack(self, data, fd):
        return pickle.dump(data, fd)

    def unpack(self, fd):
        return pickle.load(fd)

@attr.s(slots=True)
class PickleGz:
    proto = attr.ib(init=False, default="pickle-gz")
    compressionlevel = attr.ib(default=9)

    def pack(self, data, fd):
        d = pickle.dumps(data)
        d = gzip.compress(d, compresslevel=self.compressionlevel)
        pickle.dump(d, fd)

    def unpack(self, fd):
        d = pickle.load(fd)
        d = gzip.decompress(d)
        return pickle.loads(d)

In [3]:
from threading import Thread

class BagItStream:
    """
    """
#     __slots__ = ('buffer', 'packer', 'maxlen', 'counter', 'filename')

    def __init__(self, packer, maxlen=1000):
        """

        """
        self.buffer = defaultdict(list)
        self.packer = packer
        self.counter = 0
        self.maxlen = maxlen
        self.filename = None
        atexit.register(self.flush)
        
#         self.run = True
#         self.ps = Thread(target=self.__loop)
#         self.ps.daemon = True
#         self.ps.start()
        
#     def __del__(self):
#         self.run = False
        
#     def __loop(self):
#         while self.run:
#             print("hi")
#             time.sleep(1)
            
#     def stop(self):
#         self.run = False

    def flush(self):
        if self.counter > 0:
            self.__write()

    def clear(self):
        self.counter = 0
        self.buffer.clear()
        self.filename = None

    def push(self, key, msg, incr=1):
        """
        Push another message and a key into the buffer. Once the buffer limit
        is reached it is written to a file.
        """
        if self.counter == self.maxlen:
            self.__write()

        self.buffer[key].append(msg)
        self.counter += incr

    def set_file(self, filename, timestamp=True):
        filename = os.path.expanduser(filename)

        if timestamp:
            dt = str(datetime.datetime.now()).replace(' ', '-')
            filename = f"{filename}.{dt}.s-{self.packer.proto}.bag"
        else:
            filename = f"{filename}.s-{self.packer.proto}.bag"

        self.clear()

        self.filename = filename
        rm(filename)

        return filename

    def __write(self):
        if self.filename is None:
            raise Exception("Call set_file first")

        with open(self.filename, 'ab+') as fd:
            self.packer.pack(self.buffer, fd)
        self.buffer.clear()
        self.counter = 0

    def read(self, filename):
        """
        Given a filename, it opens it and read all data into memory and return
        Inputs:
          filename - name of file
        Return:
          dict() with keys for each recorded data stream and a list/tuple of
          data points
        """
        filename = os.path.expanduser(filename)
        t = filename.split('.')
        p = t[-2]

        if p in ['pickle', 'pickle-gz']:
            print(f">> Reading[{p}]: {filename}")
            if p != "s-" + self.packer.proto:
                raise Exception(f"File is {p} protocol, this Bagit is {self.packer.proto}")

        dd = defaultdict(list)
        with open(filename, 'rb') as fr:
            try:
                while True:
                    f = self.packer.unpack(fr)
                    for k,v in f.items():
                        dd[k] += v
            except EOFError:
                pass

        return dd

In [11]:
def test(pack, maxlen):
    bag = BagItStream(pack,maxlen)
    bag.set_file("bob", False)
    rate = Rate(20)

    for i in range(100):
        bag.push("a", np.ones((720,1280))*127)
        bag.push("b", i/2)
        bag.push("c", i/2)
        bag.push("d", i/2)
        rate.sleep()

#     bag.flush()
#     d = bag.read(f"bob.s-{bag.packer.proto}.bag")
#     return d
#     bag.stop()
#     print(">> done ...")

In [9]:
%%timeit
d = test(Pickle(),10)

5.43 s ± 73.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [10]:
%%timeit
d = test(PickleGz(3),10)

5.74 s ± 26 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [12]:
test(Pickle(),10)
test(PickleGz(),10)

gz_sz = os.stat('bob.s-pickle-gz.bag').st_size
pk_sz = os.stat('bob.s-pickle.bag').st_size

print(f"Pickle: {pk_sz//(1024**2)}Mb    PickleGz: {gz_sz//1024**2}Mb")

Pickle: 689Mb    PickleGz: 1Mb
