In [5]:
import tensorflow as tf
import os
from tensorflow.keras.layers import Dense, Conv2D, Flatten, Dropout, MaxPooling2D
import numpy as np
from pathlib import Path
test=False

In [122]:
class_num = 10
val_percentage = 0.1
test_percentage = 0.1

#helper functions for the features.
def _int64_feature(value):
#Wrapper for inserting int64 features into Example proto.
    if not isinstance(value, list):
        value = [value]
    return tf.train.Feature(int64_list=tf.train.Int64List(value=value))

def _bytes_feature(value):
#Wrapper for inserting bytes features into Example proto.
    return tf.train.Feature(bytes_list=tf.train.BytesList(value=[value]))



def image_example(image_string, label):
#creates the example to write in the TFrecords file
    image_shape = tf.image.decode_image(image_string)
    feature = {
      'height': _int64_feature(image_shape.shape[0]),
      'width': _int64_feature(image_shape.shape[1]),
      'depth': _int64_feature(image_shape.shape[2]),
      'label': _int64_feature(label),
      'image_raw': _bytes_feature(image_string),
  }
    return tf.train.Example(features=tf.train.Features(feature=feature))


def randomize_data(folder_path, record_file, val_percentage = 0.1, test_percentage = 0.1, class_num = 10):
#creates a randomised vector to choose files for validation and test data
    file_num = 0
    for i in range(class_num):
        folder = i
        data_folder = Path(folder_path + "/%d" %folder)
        directory = os.fsencode(data_folder) 
        for file in os.listdir(directory):
            file_num +=1
    np.random.seed(126)
    randomized = np.random.choice(file_num, int(file_num * (val_percentage + test_percentage)), replace = False)
    to_val = np.sort(np.random.choice(randomized, int(file_num * val_percentage), replace = False))
    to_test = np.sort([x for x in randomized if x not in to_val])
    lim_val = len(to_val) - 1
    lim_test = len(to_test) - 1 
    return to_val, lim_val, to_test, lim_test, file_num

def create_val(folder_path, record_file, to_val, lim_val, class_num = 10):
#creates the validation data tfrecords file
    idx = 0
    buffer_size=0
    file_name = 'val_' + record_file
    with tf.io.TFRecordWriter(file_name) as writer_val:
        for i in range(class_num):
            folder = i
            data_folder = Path(folder_path + "/%d" %folder)
            directory = os.fsencode(data_folder) 
            for file in os.listdir(directory):
                filename = data_folder / os.fsdecode(file)
                image_string = open(filename,'rb').read()
                label = folder
                tf_example = image_example(image_string, label)
                if buffer_size == to_val[idx]:
                    writer_val.write(tf_example.SerializeToString())
                    if idx!=lim_val:
                        idx +=1
                buffer_size +=1

def create_train(folder_path, record_file, to_val, lim_val,to_test, lim_test, class_num = 10):
#creats the training data tfrecords file
    idx_val = 0
    idx_test = 0
    buffer_size=0
    file_name = 'train_' + record_file                    
    with tf.io.TFRecordWriter(file_name) as writer_train:
        for i in range(class_num):
            folder = i
            data_folder = Path(folder_path + "/%d" %folder)
            directory = os.fsencode(data_folder) 
            for file in os.listdir(directory):
                filename = data_folder / os.fsdecode(file)
                image_string = open(filename,'rb').read()
                label = folder
                tf_example = image_example(image_string, label)
                if buffer_size != to_val[idx_val] and buffer_size!=to_test[idx_test]:
                    writer_train.write(tf_example.SerializeToString())
                elif buffer_size == to_val[idx_val]:
                    if idx_val!=lim_val:
                        idx_val +=1
                else:
                    if idx_test!=lim_test:
                        idx_test +=1
                buffer_size +=1

def create_test(folder_path, record_file, to_test, lim_test, class_num = 10, test_percentage = 0.1):
#creats the test data tfrecords file
    idx = 0
    buffer_size=0
    file_name = 'test_' + record_file                    
    with tf.io.TFRecordWriter(file_name) as writer_test:
        for i in range(class_num):
            folder = i
            data_folder = Path(folder_path + "/%d" %folder)
            directory = os.fsencode(data_folder) 
            for file in os.listdir(directory):
                filename = data_folder / os.fsdecode(file)
                image_string = open(filename,'rb').read()
                label = folder
                tf_example = image_example(image_string, label)
                if buffer_size == to_test[idx]:
                    writer_test.write(tf_example.SerializeToString())
                    if idx!=lim_test:
                        idx +=1
                buffer_size +=1                
        
                
        

record_file = 'minst.tfrecords'
folder_path = 'mnist data/trainingSet/trainingSet'
to_val, lim_val , to_test, lim_test = randomize_data(folder_path, record_file, val_percentage, test_percentage ,class_num)
create_val(folder_path, record_file, to_val, lim_val, class_num)
create_train(folder_path, record_file, to_val, lim_val,to_test, lim_test, class_num)
create_test(folder_path, record_file, to_test, lim_test, class_num, test_percentage)


In [154]:
def convert_back(data_type,buffer_size,record_file,val_percentage=0.1, test_percentage = 0.1, channels =1, img_size = (28,28)):
# converts the tfrecords files to images and labels and returns the parsed dataset
    def _parse_image_function(example_proto):
        # Parse the input tf.Example proto using the dictionary above.
        # Create a dictionary describing the features.
        image_feature_description = {
            'height': tf.io.FixedLenFeature([], tf.int64),
            'width': tf.io.FixedLenFeature([], tf.int64),
            'depth': tf.io.FixedLenFeature([], tf.int64),
            'label': tf.io.FixedLenFeature([], tf.int64),
            'image_raw': tf.io.FixedLenFeature([], tf.string),
        }
        image_features = tf.io.parse_single_example(example_proto,image_feature_description)
        image_buffer = image_features['image_raw']
        image = tf.image.decode_jpeg(image_buffer,channels = channels)
        image = tf.image.convert_image_dtype(image,dtype=tf.float32)*(1. / 255)
        image_shape = tf.stack([img_size[0],img_size[1],channels])
        image = tf.reshape(image,image_shape)
        label = tf.cast(image_features['label'],tf.uint8)
        label = tf.squeeze(label)
        return image,label
    
    batch_size = 32
    num_parallel_batches = 2
    if data_type == 'val':
        buffer = int(buffer_size * val_percentage)
    elif data_type == 'test':
        buffer = int(buffer_size * test_percentage)
    else:
        buffer = buffer_size
    raw_image_dataset = tf.data.TFRecordDataset(data_type+ '_' + record_file)
    raw_image_dataset = raw_image_dataset.shuffle(buffer)
    parsed_image_dataset = raw_image_dataset.map(_parse_image_function, num_parallel_calls = num_parallel_batches)
    parsed_image_dataset = parsed_image_dataset.batch(batch_size)
    parsed_image_dataset = parsed_image_dataset.prefetch(1)
    return parsed_image_dataset


record_file = 'minst.tfrecords'
folder_path = 'mnist data/trainingSet/trainingSet'
_, _,_, buffer_size = randomize_data(folder_path, record_file, val_percentage, test_percentage, class_num)
val_ds = convert_back('val',buffer_size = buffer_size, record_file = record_file)
train_ds = convert_back('train',buffer_size = buffer_size, record_file = record_file)
test_ds = convert_back('test',buffer_size = buffer_size, record_file = record_file)

#sanity check
if test:
    for image, label in test_ds.take(1):
        print(image.shape, label.shape)

In [149]:
filters = 32
kernels = (3,3)
pools = (3,3)
dense1 = 128
dense2 = 64
last_dense = 10
dropout1 = 0.4
dropout2 = 0.4

#creating the model
model = tf.keras.Sequential()
model.add(Conv2D(filters, kernel_size=kernels, padding='same', input_shape=(28,28,1)))
model.add(MaxPooling2D(pool_size=pools, padding='same'))
model.add(Flatten())
model.add(Dense(dense1, activation='relu'))
model.add(Dropout(dropout1))
model.add(Dense(dense2, activation='relu'))
model.add(Dropout(dropout2))
model.add(Dense(last_dense, activation='softmax'))

In [150]:
model.compile(optimizer='adam', metrics=['sparse_categorical_accuracy'], loss='sparse_categorical_crossentropy')

In [151]:
model.fit(train_ds, epochs=10, validation_data = val_ds)

Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10


<tensorflow.python.keras.callbacks.History at 0x273bbcf1d48>

In [152]:
model.evaluate(test_ds)

    132/Unknown - 0s 4ms/step - loss: 0.4767 - sparse_categorical_accuracy: 0.8595

[0.4766711139430602, 0.85952383]