In [1]:
#
# a Python function that creates subfolders within a given folder
#

import os

def create_subfolders(parent_folder, subfolder_names):
    """Create each named subfolder underneath ``parent_folder``.

    Missing intermediate directories (including ``parent_folder`` itself) are
    created as needed. Folders that already exist are left untouched, so the
    call can be repeated safely.

    :param parent_folder: Directory that will contain the subfolders.
    :param subfolder_names: Iterable of subfolder names (or relative paths).
    :raises FileExistsError: If a target path exists but is not a directory.
    """
    for subfolder in subfolder_names:
        subfolder_path = os.path.join(parent_folder, subfolder)
        # exist_ok=True replaces the separate os.path.exists() check, which
        # could race with another process creating the same directory.
        os.makedirs(subfolder_path, exist_ok=True)
            
# Folders created under the scratch workspace.
parent_folder = '../../scratch'
subfolder_names = [
    'models',
    'real',
    'train',
    'tune',
    'src',
    '.raw',
]

create_subfolders(parent_folder=parent_folder, subfolder_names=subfolder_names)

In [3]:
%%writefile ../../scratch/src/scratch_env.py

#
# set environment variables using the os module
#

import os

def set_environment_variable(variable_name, variable_value):
    """Store ``variable_value`` under ``variable_name`` in ``os.environ``."""
    os.environ.update({variable_name: variable_value})
    
# Publish the scratch location as the SCRATCH environment variable.
variable_name, variable_value = 'SCRATCH', '../../scratch'
set_environment_variable(variable_name, variable_value)

# Read the value back; './scratch' is only used if SCRATCH is not set.
scratch_path = os.getenv('SCRATCH', './scratch')

Writing ../../scratch/src/scratch_env.py


In [4]:
%%writefile ../../scratch/src/__init__.py

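#
# __init__.py marks this directory as a Python package.
# Importing the package imports each step module below, in order, which runs
# that module's top-level code. These are absolute imports, so this directory
# must be on sys.path for them to resolve.
#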

import scratch_env
import install_requirements
import git_clone
import extract_tar_files
import create_image_dataset
import model_builder

Writing ../../scratch/src/__init__.py


In [5]:
%%writefile ../../scratch/src/install_requirements.py

#
# a Python function to install requirements.
#

import subprocess
from IPython.display import clear_output

def install_requirements(file_path):
    """Upgrade pip, then install every requirement listed in ``file_path``.

    Each requirement line is installed with its own pip invocation, so one
    failing package does not stop the others (the per-package exit status is
    ignored). Blank lines and ``#`` comments are skipped.

    :param file_path: Path to a pip requirements file.
    :raises subprocess.CalledProcessError: If upgrading pip itself fails.
    :raises OSError: If ``file_path`` cannot be opened.
    """
    import re
    import shlex
    import sys

    # Run pip through the current interpreter so packages land in the
    # environment this code is running in, not whichever "pip" is on PATH.
    pip_cmd = [sys.executable, "-m", "pip"]
    subprocess.run(pip_cmd + ["install", "--upgrade", "pip"], check=True)

    with open(file_path) as f:
        lines = f.read().splitlines()

    for line in lines:
        # Drop comments: '#' at line start or preceded by whitespace.
        # A '#' inside a URL fragment (e.g. '#egg=name') is preserved.
        requirement = re.split(r"(?:^|\s)#", line, maxsplit=1)[0].strip()
        if not requirement:
            continue

        if requirement.startswith("-"):
            # Option lines such as "-r other.txt" need to be split into words.
            args = shlex.split(requirement)
        else:
            # Pass the specifier as ONE argument; without a shell, characters
            # like '>' or '<' in "pkg>=1.0" are no longer treated as redirections.
            args = [requirement]

        subprocess.call(pip_cmd + ["install", *args, "-q"])
        
# Install the project requirements, then clear the installer output from the cell.
install_requirements("../../requirements.txt")
clear_output()

Writing ../../scratch/src/install_requirements.py


In [6]:
%%writefile ../../scratch/src/git_clone.py

#
# use the subprocess module in Python to run shell commands for a git clone 
#


import subprocess

def git_clone(repository_url, target_directory):
    """Clone ``repository_url`` into ``target_directory`` with the ``git`` CLI.

    The command runs with ``check=False``, so a non-zero exit status from git
    does not raise an exception.
    """
    clone_command = ["git", "clone"]
    clone_command += [repository_url, target_directory]
    subprocess.run(clone_command, check=False)
    
# Clone the fingerprint data repository into the scratch .raw folder.
repository_url = "https://github.com/redhat-na-ssa/demo-datasci-fingerprint-data.git"
target_directory = "../../scratch/.raw/"
git_clone(repository_url=repository_url, target_directory=target_directory)

Writing ../../scratch/src/git_clone.py


In [7]:
%%writefile ../../scratch/src/extract_tar_files.py

#
# a function that extracts the contents of multiple tar archives
#

import tarfile

def extract_tar_files(tar_files, extract_path):
    """Extract every archive in ``tar_files`` into ``extract_path``.

    :param tar_files: Iterable of paths to tar archives (compression is
        detected by ``tarfile.open``).
    :param extract_path: Directory that receives the extracted members.
    :raises tarfile.TarError: If an archive is unreadable, or (where extraction
        filters are available) contains a member rejected by the "data" filter,
        such as a path that escapes ``extract_path``.
    :raises OSError: If an archive cannot be opened.
    """
    # Extraction filters exist only on Python versions that define
    # tarfile.data_filter; older interpreters fall back to plain extraction.
    has_filters = hasattr(tarfile, "data_filter")

    for tar_file in tar_files:
        with tarfile.open(tar_file) as tar:
            if has_filters:
                # The archives are downloaded content: the "data" filter blocks
                # path traversal, absolute paths, and special files.
                tar.extractall(extract_path, filter="data")
            else:
                tar.extractall(extract_path)
            
# Unpack the left and right archives from .raw into the train folder.
extract_path = "../../scratch/train/"
tar_files = [
    "../../scratch/.raw/left.tar.xz",
    "../../scratch/.raw/right.tar.xz",
]
extract_tar_files(tar_files, extract_path)

Writing ../../scratch/src/extract_tar_files.py


In [8]:
%%writefile ../../scratch/src/create_image_dataset.py

#
# a function that creates a dataset
#

import tensorflow as tf

# Usage example:
#   dataset = create_image_dataset(scratch_path + '/train', 32, 'training')
def create_image_dataset(data_dir, batch_size, subset):
    """
    Creates a dataset of image data from a directory.

    Labels are inferred from the 'left' and 'right' subdirectories and encoded
    as categorical vectors. Images are loaded as 96x96 grayscale. 30% of the
    data is reserved for validation using a fixed seed (42), so calls for the
    "training" and "validation" subsets use the same split settings.
    No augmentation is applied here.

    :param data_dir: The directory containing the image data.
    :param batch_size: The batch size to use when training.
    :param subset: The data subset to create ("training" or "validation").
    :return: A batched, prefetched dataset of image data.
    """
    # Load the data from the directory
    dataset = tf.keras.preprocessing.image_dataset_from_directory(
        data_dir,
        labels='inferred',
        label_mode="categorical",
        class_names=['left', 'right'],
        color_mode="grayscale",
        batch_size=batch_size,
        image_size=(96, 96),
        seed=42,
        validation_split=0.3,
        subset=subset,
        interpolation='nearest'
    )

    # Prefetch with AUTOTUNE so tf.data chooses the buffer size itself.
    return dataset.prefetch(buffer_size=tf.data.AUTOTUNE)

# Build the training and validation datasets from the same directory, in that order.
train_ds, val_ds = (
    create_image_dataset(data_dir="../../scratch/train/", batch_size=32, subset=split)
    for split in ("training", "validation")
)

Writing ../../scratch/src/create_image_dataset.py


In [9]:
%%writefile ../../scratch/src/model_builder.py

#
# a function that defines, builds and compiles a model
#

import tensorflow as tf

# Model input shape and the Keras data_format string used by the conv/pool layers.
inputShape, data_format = (96, 96, 1), "channels_last"

def model_builder(hp, input_shape=inputShape, num_classes=2, data_augmentation=None):
    """
    Builds and compiles a simple sequential CNN whose layer sizes and learning
    rate are sampled from ``hp``.

    :param hp: Hyperparameter source exposing ``Int`` and ``Choice`` (the
        object a tuner passes to a model-building function).
    :param input_shape: The shape of the input data. Defaults to the
        module-level ``inputShape`` so the function can be called as
        ``model_builder(hp)``.
    :param num_classes: Number of softmax outputs (default 2).
    :param data_augmentation: Optional Keras layer inserted right after the
        input. ``None`` (default) builds the model without augmentation.
    :return: The compiled model.
    """
    layers = tf.keras.layers
    # BatchNormalization must normalise over the channel axis.
    chan_dim = -1 if data_format == "channels_last" else 1

    model = tf.keras.Sequential(name="fingerprint_prediction")
    model.add(tf.keras.Input(shape=input_shape))
    if data_augmentation is not None:
        model.add(data_augmentation)

    # three CONV => RELU => BN => POOL layer sets; (name, min, max) filter ranges
    conv_ranges = (
        ("conv_1", 32, 96),
        ("conv_2", 64, 128),
        ("conv_3", 96, 256),
    )
    for hp_name, low, high in conv_ranges:
        model.add(layers.Conv2D(
            hp.Int(hp_name, min_value=low, max_value=high, step=32),
            (3, 3), padding="same", data_format=data_format))
        model.add(layers.Activation("relu"))
        model.add(layers.BatchNormalization(axis=chan_dim))
        model.add(layers.MaxPooling2D(pool_size=(2, 2), data_format=data_format))

    # first (and only) set of FC => RELU layers
    model.add(layers.Flatten())
    model.add(layers.Dense(hp.Int("dense_units", min_value=256,
                                  max_value=768, step=256)))
    model.add(layers.Activation("relu"))
    model.add(layers.BatchNormalization())
    model.add(layers.Dropout(0.5))

    # softmax classifier
    model.add(layers.Dense(num_classes))
    model.add(layers.Activation("softmax"))

    # learning rate choices for the optimizer
    lr = hp.Choice("learning_rate", values=[1e-1, 1e-2, 1e-3])

    # Compile once, with the sampled learning rate. The model ends in a
    # softmax, so the loss receives probabilities (from_logits=False).
    model.compile(
        optimizer=tf.keras.optimizers.Adam(learning_rate=lr),
        loss=tf.keras.losses.CategoricalCrossentropy(from_logits=False),
        metrics=['accuracy'],
    )
    return model

Writing ../../scratch/src/model_builder.py


In [30]:
%%writefile ../../scratch/src/run_hyperband.py

from kerastuner import Hyperband
import tensorflow as tf

# AUTO leaves the choice of collective-communication implementation to TensorFlow.
communication_options = tf.distribute.experimental.CommunicationOptions(
    implementation=tf.distribute.experimental.CommunicationImplementation.AUTO,
)
# Distribution strategy handed to the tuner in run_hyperband.
strategy = tf.distribute.MultiWorkerMirroredStrategy(
    communication_options=communication_options,
)

def run_hyperband(model_builder, x_train, y_train, x_val, y_val, project_name, max_epochs=5):
    """Run a Hyperband search and score the best model on the validation data.

    :param model_builder: Callable passed to the tuner to build each trial model.
    :param x_train: Training inputs forwarded to ``tuner.search``.
    :param y_train: Training targets forwarded to ``tuner.search``.
    :param x_val: Validation inputs.
    :param y_val: Validation targets.
    :param project_name: Name of the tuner project inside the tuning directory.
        A falsy value falls back to ``'hypertune'``.
    :param max_epochs: Maximum epochs per trial (default 5). Used both for the
        tuner's budget and for the ``epochs`` argument of the search.
    :return: The second value returned by ``best_model.evaluate`` (accuracy
        when the model is compiled with a single accuracy metric).
    """
    # Initialize the Hyperband tuner (Hyperband is imported at module level)
    tuner = Hyperband(
        model_builder,
        objective='val_accuracy',
        max_epochs=max_epochs,
        factor=3,
        project_name=project_name or 'hypertune',
        distribution_strategy=strategy,
        directory='../../scratch/tune/model_hp',
        overwrite=True,
    )

    # Fit the tuner to the training data
    tuner.search(x_train, y_train, epochs=max_epochs, validation_data=(x_val, y_val))

    # Get the best model from the search
    best_model = tuner.get_best_models(num_models=1)[0]

    # Evaluate the best model on the validation data
    _, acc = best_model.evaluate(x_val, y_val, verbose=0)

    return acc

Writing ../../scratch/src/run_hyperband.py


In [None]:
%%writefile src/train_model.py

def train_model(model, X_train, y_train, batch_size=32, epochs=10, validation_split=0.2,
                optimizer='adam', loss='binary_crossentropy'):
    """Compile ``model`` and fit it on the given training data.

    :param model: Object exposing Keras-style ``compile`` and ``fit`` methods.
    :param X_train: Training inputs passed to ``model.fit``.
    :param y_train: Training targets passed to ``model.fit``.
    :param batch_size: Batch size for fitting.
    :param epochs: Number of epochs to train for.
    :param validation_split: Fraction of the training data held out for validation.
    :param optimizer: Optimizer passed to ``model.compile``.
    :param loss: Loss passed to ``model.compile``. Pass
        ``'categorical_crossentropy'`` for one-hot (categorical) labels.
    :return: Whatever ``model.fit`` returns (the training history for Keras models).
    """
    # NOTE: compiling here replaces any optimizer/loss the model was compiled with earlier.
    model.compile(optimizer=optimizer, loss=loss, metrics=['accuracy'])
    history = model.fit(X_train, y_train, batch_size=batch_size, epochs=epochs, validation_split=validation_split)
    return history

In [None]:
import tensorflow as tf

def train_model(model, dataset, epochs, communication_options=None, batch_size=32):
    """
    Trains a model using data parallelism with the specified communication options.
    :param model: The model to train.
    :param dataset: The dataset to use for training.
    :param epochs: The number of epochs to train the model for.
    :param communication_options: The communication options for the distribution
        strategy. ``None`` (default) uses the AUTO implementation.
    :param batch_size: Batch size applied to ``dataset`` (default 32). Pass
        ``None`` when ``dataset`` is already batched.
    :return: The value returned by ``model.fit``.
    """
    if communication_options is None:
        communication_options = tf.distribute.experimental.CommunicationOptions(
            # AUTO defers the choice to Tensorflow.
            implementation=tf.distribute.experimental.CommunicationImplementation.AUTO)

    # Use MultiWorkerMirroredStrategy for data parallelism
    strategy = tf.distribute.MultiWorkerMirroredStrategy(communication_options=communication_options)

    # Communication options belong to the strategy; Dataset.prefetch only
    # takes a buffer size, so they are not passed to the input pipeline.
    if batch_size is not None:
        dataset = dataset.batch(batch_size)
    dataset = dataset.prefetch(buffer_size=tf.data.AUTOTUNE)

    with strategy.scope():
        # NOTE(review): the model is created by the caller, outside this scope;
        # confirm it was built under a compatible strategy before compiling here.
        model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

        # Train the model
        history = model.fit(dataset, epochs=epochs)

    return history