In [1]:
import numpy as np
import scipy as sp
import pandas as pd
import matplotlib.pyplot as plt
import pandas as pd
import seaborn as sns
import re
import time
import random
import sys
import logging
import os
import subprocess
from IPython.display import display
# import cufflinks as cf

# Raise pandas' display limits so frames up to 1000 rows/columns print without truncation.
pd.set_option("display.max_columns", 1000, "display.max_rows", 1000)

import tensorflow as tf

%matplotlib inline

### Queue & Coordinator

In [2]:
N_SAMPLES = 1000
NUM_THREADS = 4

# Synthetic data: N_SAMPLES rows of 4 features drawn from N(1, 10^2)
# (mean 1, standard deviation 10), plus one random 0/1 label per row.
data = 10 * np.random.randn(N_SAMPLES, 4) + 1
label = np.random.randint(0, 2, size=N_SAMPLES)

# Build this demo in its own graph. Otherwise every run of the cell adds ops to the
# shared default graph, so re-running it keeps stacking new queues (and another
# copy of the N_SAMPLES x 4 constant) onto that graph.
with tf.Graph().as_default():
    queue = tf.FIFOQueue(capacity=50, dtypes=[tf.float32, tf.int32], shapes=[[4], []])

    # enqueue_many splits [data, label] along the first axis into individual
    # (features, label) elements; the numpy arrays are converted to the queue's
    # float32 / int32 component dtypes.
    enqueue_op = queue.enqueue_many([data, label])
    dequeue_op = queue.dequeue()

    # NUM_THREADS threads each keep running enqueue_op (re-enqueueing the same rows)
    # until the coordinator requests a stop.
    qr = tf.train.QueueRunner(queue, enqueue_ops=[enqueue_op] * NUM_THREADS)

    with tf.Session() as sess:
        # Create a coordinator and launch the queue runner threads.
        coord = tf.train.Coordinator()
        enqueue_threads = qr.create_threads(sess, coord=coord, start=True)
        try:
            for step in range(10):
                if coord.should_stop():
                    break
                data_batch, label_batch = sess.run(dequeue_op)
                print(step, data_batch, label_batch)
        except Exception as e:
            # Hand the error to the coordinator; coord.join() below re-raises it.
            coord.request_stop(e)
        finally:
            coord.request_stop()
            coord.join(enqueue_threads)

0 [ 12.06169987  28.70277214 -10.63586807  -4.9760704 ] 1
1 [ -3.46140981   3.51335835   0.22240219  18.45738029] 0
2 [-12.2756319    6.17782688   3.51156044  -2.59906912] 1
3 [ 10.12537003   7.38464594  19.89593697  -1.42829859] 1
4 [ -8.45846558 -10.87396812   6.12136126  -1.9430933 ] 1
5 [  9.43890858  22.02574348   5.98641253  14.76107407] 0
6 [  4.83206797  -7.18120527  11.14713287  -9.94617176] 1
7 [ 10.79069138 -19.49463463   4.67529821  12.83173561] 1
8 [-10.77382183   1.86574149  -8.7260437   10.24249554] 1
9 [ 7.15000868  6.95936489 -3.12906528  8.24206448] 1


### Data Reader - CSV

In [None]:
N_FEATURES = 9
BATCH_SIZE = 10

# Location of the CSV. Set the HEART_CSV environment variable to point elsewhere
# instead of editing this machine-specific absolute path.
HEART_CSV_PATH = os.environ.get("HEART_CSV", "/home/ec2-user/data/tf_playground/heart.csv")

# Build in a fresh graph. string_input_producer and shuffle_batch register
# QueueRunners in the graph's QUEUE_RUNNERS collection, and start_queue_runners
# starts every runner in that collection. On the shared default graph, re-running
# this cell would also start the stale runners left over from earlier runs.
with tf.Graph().as_default():
    ############# Read CSV Files ################

    # Create a queue that holds the names of all input files.
    filename_queue = tf.train.string_input_producer([HEART_CSV_PATH])
    # The first line is a header row, not data, so skip it.
    reader = tf.TextLineReader(skip_header_lines=1)

    # Read one line from the file at the head of the queue.
    key, value = reader.read(filename_queue)

    ############# Decoding ################

    # record_defaults fixes each column's dtype (and its value if the field is empty):
    # N_FEATURES float32 columns, except column 4 which is a string,
    # followed by an int32 label column.
    record_defaults = [[np.float32(1.0)] for _ in range(N_FEATURES)]
    record_defaults[4] = [" "]
    record_defaults.append([1])
    content = tf.decode_csv(value, record_defaults=record_defaults)

    ############# Preprocessing ################

    # Convert the 5th column (Present/Absent) to 1.0 / 0.0.
    condition = tf.equal(content[4], tf.constant('Present'))
    content[4] = tf.where(condition, tf.constant(1.0), tf.constant(0.0))

    # Pack all N_FEATURES features into a single tensor.
    features = tf.stack(content[:N_FEATURES])
    # The last column is the label.
    label = content[-1]

    ############# Build Shuffle Batch ################

    # Minimum number of elements left in the queue after a dequeue. A larger value
    # mixes samples more thoroughly at the cost of memory and start-up time.
    min_after_dequeue = 10 * BATCH_SIZE

    # Maximum number of elements in the queue.
    capacity = 20 * BATCH_SIZE

    # Shuffle the data and emit batches of BATCH_SIZE (features, label) pairs.
    feature_batch, label_batch = tf.train.shuffle_batch(
        [features, label], batch_size=BATCH_SIZE,
        capacity=capacity, min_after_dequeue=min_after_dequeue)

    with tf.Session() as sess:
        coord = tf.train.Coordinator()
        threads = tf.train.start_queue_runners(sess=sess, coord=coord)
        try:
            for step in range(10):
                if coord.should_stop():
                    break
                print(sess.run([feature_batch, label_batch]))
        except Exception as e:
            # Hand the error to the coordinator; coord.join() below re-raises it.
            coord.request_stop(e)
        finally:
            coord.request_stop()
            coord.join(threads)


### Data Reader - TFRecord

In [15]:
def write_to_tfrecord(data, label, shape, tfrecord_filename):
    """Write a single (data, label, shape) record to a TFRecord file.

    ``data``, ``label`` and ``shape`` must already be serialized to ``bytes``
    (e.g. via ``ndarray.tobytes()``). Each is stored as a one-element
    ``BytesList`` feature under the keys "data", "label" and "shape". No dtype
    information is recorded, so a reader must know the dtypes to decode the bytes.

    Args:
        data: serialized data payload (bytes).
        label: serialized label payload (bytes).
        shape: serialized shape of ``data`` (bytes).
        tfrecord_filename: path of the TFRecord file to write.
    """
    feature = {
        "label": tf.train.Feature(bytes_list=tf.train.BytesList(value=[label])),
        "data": tf.train.Feature(bytes_list=tf.train.BytesList(value=[data])),
        "shape": tf.train.Feature(bytes_list=tf.train.BytesList(value=[shape]))
    }
    example = tf.train.Example(features=tf.train.Features(feature=feature))
    # Build the Example before opening the file, and let the context manager close
    # the writer. A bad input (e.g. a non-bytes value rejected by BytesList) then
    # cannot leak an open file handle or leave an empty file behind.
    with tf.python_io.TFRecordWriter(tfrecord_filename) as writer:
        writer.write(example.SerializeToString())

In [17]:
data = np.random.randn(3, 2)
label = np.random.randint(0, 2, size=3)

# Serialize integers with an explicit dtype. np.random.randint and np.array(shape)
# default to numpy's platform integer, which is 32-bit on Windows with NumPy < 2,
# so pin them to int64 to get the same byte layout on every platform.
# data is float64 (np.random.randn's output dtype).
write_to_tfrecord(
    data.tobytes(),
    label.astype(np.int64).tobytes(),
    np.array(data.shape, dtype=np.int64).tobytes(),
    "./first_tfrecord.tfr",
)