In [1]:
import numpy as np
import os
import csv

from PIL import Image

import unittest

In [2]:
from PIL.JpegImagePlugin import JpegImageFile
TYPE_PIL_JPG = JpegImageFile
TYPE_NP_ARRAY = np.ndarray

In [3]:
class Data():
    @staticmethod
    def lidar_array():
        arr = np.random.randint(0,2, size=365)
        return arr
    
    @staticmethod
    def camera_img():
        img = Image.open('/home/wroscoe/mydonkey/sessions/1115am/frame_00001_ttl_0.42665756334488203_agl_0.09659284488019566_mil_0.0.jpg')
        return img
    
    def camera_arr(self):
        img = self.camera_img()
        return np.array(img)
    
    @staticmethod
    def steering_angle():
        return np.random.random() * 2 - 1
    
    @staticmethod
    def throttle():
        return np.random.random() * 1.2 - .2

In [4]:
import time

In [5]:
class Tub():
    """
    A datastore to store sensor data in a key, value format.
    
    Accepts str, int, float, image_array, image, and array data types.
    
    For example:
    
    #Create a tub to store speed values.
    t=Tub(inputs={'speed':'float'})
    
    #Save a speed value.
    t.put('speed', .345)
    
    #Get the last speed value.
    t.get('speed')
    
    #Get a dataframe of all the records. 
    t.records()
    t.records(interval=.1, how='last') 
    
    #Show the statistics about the Tub
    t.summary()
    
    #create a generator that will return a dictionary of the 
    #saved key, value pairs. 
    t.replay(keys=['speed'], y=interval=.1, how='last', shuffle=True)
    
    #zip the tub folder
    t.zip(filepath='first_race.zip')
    """
    
    
    
    def __init__(self, 
                 path=None,
                 inputs = {},
                 overwrite=False):
               
        self.record_id = 0
        self.start_time = time.time()
        self.key_type_map = key_type_map
        
        self.path = os.path.expanduser(path)
        self.log_path = os.path.join(self.path, 'log.txt')
        
        if overwrite:
            if os.path.exists(self.path):
                self.delete()
            self.log = self.load(new=True)
        else:
            self.log = self.load()

            
    def load(self, new=False):
        #create a folder
        if not os.path.exists(self.path):
            os.makedirs(self.path)    
            
        log = self._open_log(self.log_path)
        return log
            
        
    @staticmethod
    def _open_log(log_path):
        if os.path.isfile(log_path):
            write_mode = 'a' #append
        else: 
            write_mode = 'w'
        
        #create text file to hold log of variables
        log = open(log_path, write_mode)

        if write_mode is 'w':
            headers = 'id, key, val, type'
            log.write(headers + '\n')
    
        return log
    
    
    def write_record(self, key, val):
        """
        Write a record to the log.
        """
        line = ','.join([str(self.record_id), key, str(val)])
        self.log.write(line + '\n')
        self.log.flush()
        
    def save_image(self, key, image):
            file_path = self.make_file_path(key)
            image.save(file_path)
            return file_path
        
    def save_array(self, key, array):
            file_path = self.make_file_path(key, ext='.npy')
            array.tofile(file_path)
            return file_path
    
    def put_image_array(self, key, arr):

    
    
    def put(self, key, val):
        
        ''' Save the key and value to disk.'''
        
        assert key in self.key_type_map.keys()
        
        self.record_id += 1
        self.record_time = int(time.time() - self.start_time)
        
        val_type = self.key_type_map.get(key)
        
        if val_type in ['str', 'float', 'int']:
            self.write_record(key, val)        
            
        elif val_type is 'image_array':
            image = Image.fromarray(val)
            val = self.save_image(key, image)
            self.write_record(key, val)
            
        elif val_type is 'image':
            val = save_image(key, val)
            self.write_record(key, val)
            
        elif val_type is 'array':
            val = save_array(key, val)
            self.write_record(key, val)

    
    def get(self, keys):
        ''' 
        
        '''
    
    
    def make_file_path(self, key, ext='.jpg'):
        name = '_'.join([str(self.record_id), key, str(self.record_time), ext])
        file_path = os.path.join(self.path, name)
        return file_path
    
    def close(self):
        try:
            self.log.close()
        except AttributeError as e:
            pass
        
    def delete(self):
        """ Delete the folder and files for this tub. """
        self.close()
        import shutil
        shutil.rmtree(self.path)

    
    

In [6]:
class TestTubPut(unittest.TestCase):
    def setUp(self):
        
        key_type_map = {'speed': 'float',
                        'lidar': 'lidar_arr',
                        'camera': 'camera_arr'}
        
        self.tub = Tub('~/tubtest', 
                       key_type_map=key_type_map,
                       overwrite=True)
        self.data = Data()
    
    
    def get_log_last_line(self):
        f = open(self.tub.log_path, 'r')
        last_line = f.read().split('\n')[-2]
        print(last_line)
        i, key, val = last_line.split(',')
        return i, key, val
    
    def test_put_float(self):
        self.tub.put('speed', .234)
        i, key, val = self.get_log_last_line()
        self.assertEqual(key, 'speed')
        
    def test_put_lidar(self):
        val = self.data.lidar_array()
        self.tub.put('lidar', val)
        i, key, val = self.get_log_last_line()
        self.assertEqual(i, '1')
        self.assertEqual(key, 'lidar')
        assert os.path.isfile(self.tub.log_path)
        assert os.path.isfile(val)
        
    def test_put_camera_img(self):
        val = self.data.camera_arr()
        self.tub.put('camera', val)
        i, key, val = self.get_log_last_line()
        self.assertEqual(key, 'camera')
        assert os.path.isfile(val)
        
    def test_force_keys(self):
        self.assertRaises(AssertionError, self.tub.put, 'wrong_key_name', 'some_val')
        
    def tearDown(self):
        self.tub.delete()

In [7]:
a = TestTubPut()
suite = unittest.TestLoader().loadTestsFromModule(a)
unittest.TextTestRunner().run(suite)

....

1,camera,/home/wroscoe/tubtest/1_camera_0_.jpg
1,speed,0.234
1,lidar,/home/wroscoe/tubtest/1_lidar_0_.jpg



----------------------------------------------------------------------
Ran 4 tests in 0.074s

OK


<unittest.runner.TextTestResult run=4 errors=0 failures=0>

In [23]:
data = Data()

In [69]:
arr.shape + (1, )

(365, 1)

In [45]:
arr = (data.lidar_array() * 255.0).astype(np.uint8)
arr = arr.reshape((1,) + arr.shape)
#arr1 = np.concatenate([arr, arr], axis=1)

arr1.shape

NameError: name 'data' is not defined

In [46]:
img = Image.fromarray(arr)
img.save('~/lidar_test_img.jpg')

NameError: name 'arr' is not defined

In [None]:
bag.file.close()

In [None]:
f = open('~/test_bag/log.txt', 'r')


In [None]:
f.read()

In [None]:
#save image
arr = np.zeros(shape=(120,160))
bag.put('cam', arr, val_type=np.ndarray)

In [14]:
float.__name__

'float'

In [None]:
data = Data()

In [None]:
[data.steering_angle() for _ in range(10)]

In [76]:
data = Data()
t = Tub(path='~/test_tub', overwrite=True)

In [77]:
img = data.camera_img()

for i in range(100):
    path = os.path.join(t.path, str(i) + '.nda')
    arr = np.array(img)
    arr = arr.astype(np.uint8)
    arr.tofile(path)