# Notes

`Course Instructor`: **John Chiasson**

`Author (TA)`: **Ruthvik Vaila**

* In this notebook we shall see how to parallelize "tfrecordization", i.e. writing a dataset out as `.tfrecords` files.
* Tested on `Python 3.7.5` with `Tensorflow 1.15.0` and `Keras 2.2.4`. 
* Tested on `Python 2.7.17` with `Tensorflow 1.15.3` and `Keras 2.2.4`. 

# Imports

In [1]:
import tensorflow as tf
import numpy as np
import h5py, pickle, inspect, functools, os, shutil, sys
import time
import multiprocessing as mp
import tfrecords_worker as tfwk

In [2]:
# Display the version string of the interpreter running this kernel.
sys.version

'3.7.5 (default, Nov  7 2019, 10:50:52) \n[GCC 8.3.0]'

# Load a large `NumPy` array

In [3]:
# Read the 'pool1_spike_features' dataset of the train and test HDF5 files
# fully into memory (the trailing [:] copies the dataset out of the file
# before the file is closed).
loaded_features = []
for filename in ('data/emnist_train_x.h5', 'data/emnist_test_x.h5'):
    with h5py.File(filename, 'r') as hf:
        loaded_features.append(hf['pool1_spike_features'][:])
train_x, test_x = loaded_features

for split_name, split_array in (('Train', train_x), ('Test', test_x)):
    print('{} data shape:{}'.format(split_name, split_array.shape))

# Turn each 2D array into a list with one 1D row array per sample.
train_x, test_x = list(train_x), list(test_x)

Train data shape:(112799, 3630)
Test data shape:(18800, 3630)


In [4]:
# Load the pickled label arrays. The `with` blocks guarantee the file handles
# are closed even if unpickling raises.
# NOTE: pickle.load can execute arbitrary code; only use it on trusted files.
filename = 'data/emnist_train_y.pkl'
with open(filename, 'rb') as filehandle:
    train_y = pickle.load(filehandle)

filename = 'data/emnist_test_y.pkl'
with open(filename, 'rb') as filehandle:
    test_y = pickle.load(filehandle)

print('Train labels shape:{}'.format(train_y.shape))
print('Test labels shape:{}'.format(test_y.shape))

# Convert the 1D label arrays to plain Python lists (one scalar per sample).
train_y = train_y.tolist()
test_y = test_y.tolist()

Train labels shape:(112799,)
Test labels shape:(18800,)


# Use `inspect` to examine the arguments of `tfwk.worker` 

In [5]:
# List the positional argument names of tfwk.worker.
# inspect.getargspec is deprecated on Python 3 (and removed in 3.11), so use
# getfullargspec when it exists; Python 2 only provides getargspec. The
# conditional expression keeps the fallback from being evaluated on Python 3.
_get_argspec = inspect.getfullargspec if hasattr(inspect, 'getfullargspec') else inspect.getargspec
_get_argspec(tfwk.worker)[0]

  """Entry point for launching an IPython kernel.


['partitions', 'train_x', 'train_y', 'filename']

In [6]:
# Split the sample indices into `num_partitions` contiguous [start, end]
# ranges and tag each range with its partition number.
num_partitions = 20

# num_partitions + 1 evenly spaced boundaries from 0 to len(train_x); the
# last boundary is pulled back by one, then all are truncated to integers.
# NOTE(review): whether the final sample and the shared boundary samples are
# written exactly once depends on how tfwk.worker interprets [start, end]
# (half-open vs inclusive) -- confirm against tfwk.worker.
boundaries = np.linspace(0, len(train_x), num_partitions + 1)
boundaries[-1] -= 1
boundaries = boundaries.astype(np.int32).tolist()

# Consecutive boundaries form the (start, end) pair of each partition.
bounds = list(zip(boundaries[:-1], boundaries[1:]))

# Flattened boundary sequence: start0, end0, start1, end1, ...
partitions = [edge for pair in bounds for edge in pair]

# One (partition_index, [start, end]) item per partition.
iter_lists = [(index, [start, end]) for index, (start, end) in enumerate(bounds)]
print(iter_lists)

[(0, [0, 5639]), (1, [5639, 11279]), (2, [11279, 16919]), (3, [16919, 22559]), (4, [22559, 28199]), (5, [28199, 33839]), (6, [33839, 39479]), (7, [39479, 45119]), (8, [45119, 50759]), (9, [50759, 56399]), (10, [56399, 62039]), (11, [62039, 67679]), (12, [67679, 73319]), (13, [73319, 78959]), (14, [78959, 84599]), (15, [84599, 90239]), (16, [90239, 95879]), (17, [95879, 101519]), (18, [101519, 107159]), (19, [107159, 112798])]


# Generating `.tfrecords` serially

In [7]:
# Baseline: process every partition one after another in this process and
# time the whole run, including re-creating the output folder.
start_time = time.time()

# Always start from an empty output folder.
folder_name = 'parallel_tfrecords/'
if os.path.exists(folder_name):
    shutil.rmtree(folder_name)
os.mkdir(folder_name)

# Prefix handed to the worker for its output files.
filename = folder_name + 'EMNIST_train_strings_'

# Each item is (partition_index, [start, end]); the data and the filename
# prefix are the same for every call.
for partition in iter_lists:
    tfwk.worker(partition, train_x=train_x, train_y=train_y, filename=filename)

elapsed = time.time() - start_time
print('Time taken for serial process:{}'.format(elapsed))

Time taken for serial process:83.82524633407593


# Generating `.tfrecords` in parallel

In [8]:
if __name__ == '__main__':
    # Process the partitions with a pool of worker processes and time the
    # whole run, including re-creating the output folder.
    t1 = time.time()

    # Always start from an empty output folder.
    folder_name = 'parallel_tfrecords/'
    if os.path.exists(folder_name):
        shutil.rmtree(folder_name)
    os.mkdir(folder_name)
    filename=folder_name+'EMNIST_train_strings_'

    # Bind the data and the filename prefix so that the pool only has to
    # supply the (partition_index, [start, end]) item for each task.
    worker = functools.partial(tfwk.worker, train_x=train_x, train_y=train_y, filename=filename)
    num_process = 6
    pool = mp.Pool(num_process)
    try:
        pool_handle = pool.map_async(worker, iter_lists)
        # Block until every partition is done; re-raises a worker's exception.
        pool_handle.get()
    finally:
        # Shut the pool down even when a worker failed, so no worker
        # processes are left behind. (try/finally rather than `with`, because
        # Pool is not a context manager on Python 2.)
        pool.close()
        pool.join()
    print('Time taken for parallel process:{}'.format(time.time()-t1))
    print('Multiprocessing Done!')
    

Time taken for serial process:44.98908305168152
Multiprocessing Done!
