In [1]:
%matplotlib inline
import os, sys
import numpy as np
import pandas as pd
from pathlib import Path
np.set_printoptions(threshold=sys.maxsize)

from matplotlib import pyplot as plt
from matplotlib import animation
from matplotlib import cm
from IPython.core.display import display, HTML

## Define the dataset class
ref: [clear-nus/vtsnn](https://bamsumit.github.io/slayerPytorch/build/html/_modules/slayerSNN/spikeFileIO.html#spikeArrayToEvent)

In [2]:
def read_letter_file(in_dir, label, sampleNum):
    """Reads a tactile file from path. Returns a pandas dataframe."""
    obj_path = in_dir + "{}/{}_{}.csv".format(label, label, sampleNum)
    df = pd.read_csv(
        obj_path,
        header=0,
#         names=["isNeg", "taxel_id", "timestamp"],
        dtype={"timestamp": float, "isNeg": int, "taxel_id": int, "adc": int}
    )
    return df

In [3]:
class LetterData:
    def __init__(self, label, sampleNum, in_dir="/home/wang/Desktop/sorted/", threshold = 1):
        self.label = label
        self.df = read_letter_file(in_dir, label, sampleNum)
        timestamps = self.df["timestamp"]
        self.start_t = timestamps.min()
        self.end_t = timestamps.max()
        self.T = self.end_t - self.start_t
        self.threshold = threshold
        
    
    def binarize(self, bin_duration):
        bin_number = int(np.floor(self.T/bin_duration))
        data_matrix = np.zeros([80, 2, bin_number])
        
        pos_df = self.df[self.df.isNeg == 0]
        neg_df = self.df[self.df.isNeg == 1]
        
#         print("pos_df", pos_df.shape)
#         print(pos_df)
#         print("neg_df", neg_df.shape)
#         print(neg_df)
        
        count = 0
        
        init_t = self.start_t
        end_t = init_t + bin_duration
        
        while end_t <= self.T + init_t:
            _pos_count = pos_df[
                (
                    (pos_df.timestamp >= self.start_t)
                    & (pos_df.timestamp < end_t)
                )
            ]
            _pos_selective_cells = (
                _pos_count.taxel_id.value_counts() > self.threshold
            )
            if len(_pos_selective_cells):
                
#                 print("start_t {}, _pos_sel_cells {}".format(self.start_t, len(_pos_selective_cells)))
#                 print("_pos_selective_cells")
#                 print(_pos_selective_cells)
                data_matrix[
                    _pos_selective_cells[_pos_selective_cells].index.values-1,
                    0,
                    count,
                ] = 1

            _neg_count = neg_df[
                (
                    (neg_df.timestamp >= self.start_t)
                    & (neg_df.timestamp < end_t)
                )
            ]
            _neg_selective_cells = (
                _neg_count.taxel_id.value_counts() > self.threshold
            )
            if len(_neg_selective_cells):
#                 print("start_t {}, _neg_sel_cells {}".format(self.start_t, len(_neg_selective_cells)))
                data_matrix[
                    _neg_selective_cells[_neg_selective_cells].index.values-1,
                    1, 
                    count,
                ] = 1
            
            self.start_t = end_t
            end_t += bin_duration
            count += 1
        
#         print("before delete", data_matrix.shape) # (80, 2, 35)
#         data_matrix = np.delete(data_matrix, [16, 48], 0)
        return data_matrix
                

## process the spikeArray and visualize it as animated Image
ref: [slayerPytorch](https://bamsumit.github.io/slayerPytorch/build/html/_modules/slayerSNN/spikeFileIO.html#spikeArrayToEvent)

In [4]:
class event():
    '''
    This class provides a way to store, read, write and visualize spike event.

    Members:
        * ``x`` (numpy ``int`` array): `x` index of spike event.
        * ``y`` (numpy ``int`` array): `y` index of spike event (not used if the spatial dimension is 1).
        * ``p`` (numpy ``int`` array): `polarity` or `channel` index of spike event.
        * ``t`` (numpy ``double`` array): `timestamp` of spike event. Time is assumend to be in ms.

    Usage:

    >>> TD = spikeFileIO.event(xEvent, yEvent, pEvent, tEvent)
    '''
    def __init__(self, xEvent, yEvent, pEvent, tEvent):
        if yEvent is None:
            self.dim = 1
        else:
            self.dim = 2

        self.x = xEvent if type(xEvent) is np.ndarray else np.asarray(xEvent) # x spatial dimension
        self.y = yEvent if type(yEvent) is np.ndarray else np.asarray(yEvent) # y spatial dimension
        self.p = pEvent if type(pEvent) is np.ndarray else np.asarray(pEvent) # spike polarity
        self.t = tEvent if type(tEvent) is np.ndarray else np.asarray(tEvent) # time stamp in ms

        if not issubclass(self.x.dtype.type, np.integer): self.x = self.x.astype('int')
        if not issubclass(self.p.dtype.type, np.integer): self.p = self.p.astype('int')

        if self.dim == 2:
            if not issubclass(self.y.dtype.type, np.integer): self.y = self.y.astype('int')

        self.p -= self.p.min()

    def toSpikeArray(self, samplingTime=1, dim=None):	# Sampling time in ms
            '''
            Returns a numpy tensor that contains the spike events sampled in bins of `samplingTime`.
            The array is of dimension (channels, height, time) or``CHT`` for 1D data.
            The array is of dimension (channels, height, width, time) or``CHWT`` for 2D data.

            Arguments:
                * ``samplingTime``: the width of time bin to use.
                * ``dim``: the dimension of the desired tensor. Assignes dimension itself if not provided.

            Usage:

            >>> spike = TD.toSpikeArray()
            '''
            if self.dim == 1:
                if dim is None: dim = ( np.round(max(self.p)+1).astype(int),
                                        np.round(max(self.x)+1).astype(int), 
                                        np.round(max(self.t)/samplingTime+1).astype(int) )
                frame = np.zeros((dim[0], 1, dim[1], dim[2]))
            elif self.dim == 2:
                if dim is None: dim = ( np.round(max(self.p)+1).astype(int), 
                                        np.round(max(self.y)+1).astype(int), 
                                        np.round(max(self.x)+1).astype(int), 
                                        np.round(max(self.t)/samplingTime+1).astype(int) )
                frame = np.zeros((dim[0], dim[1], dim[2], dim[3]))
            return self.toSpikeTensor(frame, samplingTime).reshape(dim)


    def toSpikeTensor(self, emptyTensor, samplingTime=1, randomShift=False, binningMode='OR'):	# Sampling time in ms
            '''
            Returns a numpy tensor that contains the spike events sampled in bins of `samplingTime`.
            The tensor is of dimension (channels, height, width, time) or``CHWT``.

            Arguments:
                * ``emptyTensor`` (``numpy or torch tensor``): an empty tensor to hold spike data 
                * ``samplingTime``: the width of time bin to use.
                * ``randomShift``: flag to shift the sample in time or not. Default: False.
                * ``binningMode``: the way spikes are binned. 'SUM' or 'OR' are supported. Default: 'OR'

            Usage:

            >>> spike = TD.toSpikeTensor( torch.zeros((2, 240, 180, 5000)) )
            '''

            if randomShift is True:
                tSt = np.random.randint(
                    max(
                        int(self.t.min() / samplingTime), 
                        int(self.t.max() / samplingTime) - emptyTensor.shape[3],
                        emptyTensor.shape[3] - int(self.t.max() / samplingTime),
                        1,
                    )
                )
            else:
                tSt = 0

            xEvent = np.round(self.x).astype(int)
            pEvent = np.round(self.p).astype(int)
            tEvent = np.round(self.t/samplingTime).astype(int) - tSt

            # print('shifted sequence by', tSt)

            if self.dim == 1:
                validInd = np.argwhere((xEvent < emptyTensor.shape[2]) &
                                       (pEvent < emptyTensor.shape[0]) &
                                       (tEvent < emptyTensor.shape[3]) &
                                       (xEvent >= 0) &
                                       (pEvent >= 0) &
                                       (tEvent >= 0))
                if binningMode.upper() == 'OR':
                    emptyTensor[pEvent[validInd],
                                0, 
                                xEvent[validInd],
                                tEvent[validInd]] = 1/samplingTime
                elif binningMode.upper() == 'SUM':
                    emptyTensor[pEvent[validInd],
                                0, 
                                xEvent[validInd],
                                tEvent[validInd]] += 1/samplingTime
                else:
                    raise Exception('Unsupported binningMode. It was {}'.format(binningMode))

            elif self.dim == 2:
                yEvent = np.round(self.y).astype(int)
                validInd = np.argwhere((xEvent < emptyTensor.shape[2]) &
                                       (yEvent < emptyTensor.shape[1]) & 
                                       (pEvent < emptyTensor.shape[0]) &
                                       (tEvent < emptyTensor.shape[3]) &
                                       (xEvent >= 0) &
                                       (yEvent >= 0) & 
                                       (pEvent >= 0) &
                                       (tEvent >= 0))

                if binningMode.upper() == 'OR':
                    emptyTensor[pEvent[validInd], 
                                yEvent[validInd],
                                xEvent[validInd],
                                tEvent[validInd]] = 1/samplingTime
                elif binningMode.upper() == 'SUM':
                    emptyTensor[pEvent[validInd], 
                                yEvent[validInd],
                                xEvent[validInd],
                                tEvent[validInd]] += 1/samplingTime
                else:
                    raise Exception('Unsupported binningMode. It was {}'.format(binningMode))

            return emptyTensor

In [5]:
def spikeArrayToEvent(spikeMat, samplingTime=1):
    '''
    Returns TD event from a numpy array (of dimension 3 or 4).
    The numpy array must be of dimension (channels, height, time) or``CHT`` for 1D data.
    The numpy array must be of dimension (channels, height, width, time) or``CHWT`` for 2D data.

    Arguments:
        * ``spikeMat``: numpy array with spike information.
        * ``samplingTime``: time width of each time bin.

    Usage:

    >>> TD = spikeFileIO.spikeArrayToEvent(spike)
    '''
    if spikeMat.ndim == 3:
        spikeEvent = np.argwhere(spikeMat > 0)
        xEvent = spikeEvent[:,1]
        yEvent = None
        pEvent = spikeEvent[:,0]
        tEvent = spikeEvent[:,2]
    elif spikeMat.ndim == 4:
        spikeEvent = np.argwhere(spikeMat > 0)
        xEvent = spikeEvent[:,2]
        yEvent = spikeEvent[:,1]
        pEvent = spikeEvent[:,0]
        tEvent = spikeEvent[:,3]
    else:
        raise Exception('Expected numpy array of 3 or 4 dimension. It was {}'.format(spikeMat.ndim))
    print("check spikeEvent", spikeEvent.shape)
    print(spikeEvent)
    return event(xEvent, yEvent, pEvent, tEvent * samplingTime) 

In [6]:
def _showTD1D(TD, fig=None, frameRate=24, preComputeFrames=True, repeat=False, plot=True):
    if TD.dim !=1:	raise Exception('Expected Td dimension to be 1. It was: {}'.format(TD.dim))
    if fig is None:	fig = plt.figure()
    interval = 1e3 / frameRate					# in ms
    xDim = TD.x.max()+1
    tMax = TD.t.max()
    tMin = TD.t.min()
    pMax = TD.p.max()+1
    minFrame = int(np.floor(tMin / interval))
    maxFrame = int(np.ceil(tMax / interval )) + 1

    # ignore preComputeFrames

    raster,   = plt.plot([], [], '.')
    scanLine, = plt.plot([], [])
    plt.axis((tMin -0.1*tMax, 1.1*tMax, -0.1*xDim, 1.1*xDim))

    def animate(i):
        tEnd = (i + minFrame + 1) * interval
        ind  = (TD.t < tEnd)
        # update raster
        raster.set_data(TD.t[ind], TD.x[ind])
        # update raster scan line
        scanLine.set_data([tEnd + interval, tEnd + interval], [0, xDim])


    anim = animation.FuncAnimation(fig, animate, frames=maxFrame, interval=interval, repeat=repeat)

    if plot is True:
        plt.show()
    return anim


def _showTD2D(TD, fig=None, frameRate=24, preComputeFrames=True, repeat=False, plot=True, use_ms=True, xDim=None, yDim=None, speed=1):
    """
    added parameter by RH:
    use_ms: True if spikeEvent use [ms] as unit, otherwise use [s]. Default is True
    xDim, yDim: use input parameter to define 2D image size, infer from TD readings if not specified
    speed: accelerate or slow down the animation, default to be 1
    """
    if TD.dim != 2: 
        raise Exception('Expected Td dimension to be 2. It was: {}'.format(TD.dim))
    if fig is None:	
        fig = plt.figure()
    interval = 1e3 / frameRate if use_ms else 1/frameRate
    
    if xDim is None:
        xDim = TD.x.max()+1 
    if yDim is None:
        yDim = TD.y.max()+1 
    print("check in _showTD2D")
    print("xDim {}, yDim {}, tmin {}, tmax {}".format(xDim, yDim, TD.t.min(), TD.t.max()))

    if preComputeFrames is True:
        minFrame = int(np.floor(TD.t.min() / interval))
        maxFrame = int(np.ceil(TD.t.max() / interval ))
#         print("minFrame {}, maxFrame {}".format(minFrame, maxFrame))
        image    = plt.imshow(np.zeros((yDim, xDim, 3)))
        frames   = np.zeros( (maxFrame-minFrame, yDim, xDim, 3))

        # precompute frames
        for i in range(len(frames)):
            tStart = (i + minFrame) * interval
            tEnd = (i + minFrame + 1) * interval
            timeMask = (TD.t >= tStart) & (TD.t < tEnd)
            rInd = (timeMask & (TD.p == 1))
            gInd = (timeMask & (TD.p == 2))
            bInd = (timeMask & (TD.p == 0))
#             print("frame {}, rInd {}, gInd {}, bInd {}".format(i, rInd, gInd, bInd))
            frames[i, TD.y[rInd], TD.x[rInd], 0] = 1
            frames[i, TD.y[gInd], TD.x[gInd], 1] = 1
            frames[i, TD.y[bInd], TD.x[bInd], 2] = 1

        def animate(frame):
            image.set_data(frame)
            return image
        
        play_interval = interval/speed # accelerate or slow down the animation
        anim = animation.FuncAnimation(fig, animate, frames=frames, interval=play_interval, repeat=repeat)

    else:
        minFrame = int(np.floor(TD.t.min() / interval))
        maxFrame = int(np.ceil(TD.t.max() / interval ))
        image    = plt.imshow(np.zeros((yDim, xDim, 3)))
        def animate(i):
            tStart = (i + minFrame) * interval
            tEnd = (i + minFrame + 1) * interval
            frame  = np.zeros((yDim, xDim, 3))
            timeMask = (TD.t >= tStart) & (TD.t < tEnd)
            rInd = (timeMask & (TD.p == 1))
            gInd = (timeMask & (TD.p == 2))
            bInd = (timeMask & (TD.p == 0))
            frame[TD.y[rInd], TD.x[rInd], 0] = 1
            frame[TD.y[gInd], TD.x[gInd], 1] = 1
            frame[TD.y[bInd], TD.x[bInd], 2] = 1
            image.set_data(frame)
            return image

        anim = animation.FuncAnimation(fig, animate, frames=maxFrame-minFrame, interval=interval, repeat=repeat)

    # # save the animation as an mp4.  This requires ffmpeg or mencoder to be
    # # installed.  The extra_args ensure that the x264 codec is used, so that
    # # the video can be embedded in html5.  You may need to adjust this for
    # # your system: for more information, see
    # # http://matplotlib.sourceforge.net/api/animation_api.html
    # if saveAnimation: anim.save('showTD_animation.mp4', fps=30)

    if plot is True:
        plt.show()
    return anim


In [7]:
def animTD(TD, fig=None, frameRate=24, preComputeFrames=True, repeat=True, use_ms=True, xDim=None, yDim=None, speed=1):
    '''
    Reutrn animation object for TD event.

    Arguments:
    * ``TD``: spike event to visualize.
    * ``fig``: figure to plot animation. Default is ``None``, in which case a figure is created.
    * ``frameRate``: framerate of visualization.
    * ``preComputeFrames``: flag to enable precomputation of frames for faster visualization. Default is ``True``.
    * ``repeat``: flag to enable repeat of animation. Default is ``True``.

    Usage:

    >>> anim = animTD(TD)
    '''
    print("TD", type(TD), TD.dim)
    if fig is None:
        fig = plt.figure()
    if TD.dim == 1:
        anim =  _showTD1D(TD, fig, frameRate=frameRate, preComputeFrames=preComputeFrames, repeat=repeat, plot=False)		
    else:
        anim =  _showTD2D(TD, fig, frameRate=frameRate, preComputeFrames=preComputeFrames, repeat=repeat, plot=False, use_ms=use_ms, xDim=xDim, yDim=yDim, speed=speed)

    plt.close(anim._fig)
    return anim


## Main example

In [8]:
save_dir ="data/"
label = "L"
sampleNum = 45
bin_duration = 0.1

In [9]:
# Binarize the spikeArray and smoothen it with bin_duration
sampleLetterData = LetterData(label=label, sampleNum=sampleNum)
print("sampleLetterData {} sampleNum {}, start_t {}, duration {}".format(label, sampleNum, sampleLetterData.start_t, sampleLetterData.T))
letterData = sampleLetterData.binarize(bin_duration=bin_duration)
fout = save_dir + "{}/{}_{}.npy".format(label, label, sampleNum)
print(f"Writing {fout}...")
np.save(fout, letterData.astype(np.uint8))

FileNotFoundError: [Errno 2] No such file or directory: '/home/wang/Desktop/sorted/L/L_45.csv'

In [None]:
# Load the data back for further processing
data = np.load(save_dir+'{}/{}_{}.npy'.format(label, label, sampleNum))
# (unique, counts) = np.unique(data, return_counts=True)
# frequencies = np.asarray((unique, counts)).T
# print(frequencies)

In [None]:
order = np.array([36, 37, 38, 39, 40, 56, 57,58,59,60,31,32,33,34,35,51,52,53,54,55,26,27,28,29,30,46,47,48,49,50,21,22,23,24,25,41,42,43,44,45,5,4,3,2,1,65,64,63,62,61,10,9,8,7,6,70,69,68,67,66,15,14,13,12,11,75,74,73,72,71,20,19,18,17,16,80,79,78,77,76])
order = order-1
def reshape_taxel(arr, order):
    # customized reshaping for 8x10 NeuTouch sensor
    arr_reshape = arr[order]
    arr_reshape = arr_reshape.reshape((8,10))
    return arr_reshape
# # make a dumb array to check ordering
# arr = np.arange(1,81)
# arr_reshape = reshape_taxel(arr, order)
# print("arr")
# print(arr)
# print("arr_reshape")
# print(arr_reshape)

In [None]:
reshape_data = np.apply_along_axis(reshape_taxel, 0, data, order=order) # check shape, data (80, 2, 17) reshape_data (8, 10, 2, 17)
reshape_data = np.transpose(reshape_data, (2,0,1,3)) # convert to (CHWT)
print("reshape_data", reshape_data.shape)
TD = spikeArrayToEvent(reshape_data, samplingTime=bin_duration) # shape must be (channels,height, (width,) time)
anim = animTD(TD, frameRate=1/bin_duration, use_ms=False, xDim=reshape_data.shape[2], yDim=reshape_data.shape[1], speed=1)
anim_file = save_dir+'{}/{}_{}.gif'.format(label, label, sampleNum)
# anim.save(anim_file, fps=5.0, dpi=200)
writergif = animation.PillowWriter(fps=5)
anim.save(anim_file,writer=writergif)

In [None]:
# visualize the animation
HTML(anim.to_jshtml())