In [None]:
## Import

import tensorflow as tf
import numpy as np
import random
import os

from tensorflow import keras
from tensorflow.keras.datasets import mnist
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense
from tensorflow.keras.layers import Dropout
from tensorflow.keras.utils import to_categorical

In [None]:
## Helper functions

def read_from_file(file_name):
    """Load the contents of ``file_name`` with ``np.load`` and return them.

    Args:
        file_name: Path of the file to read, passed to ``np.load`` unchanged.

    Returns:
        Whatever ``np.load`` produces for that file.
    """
    loaded = np.load(file_name)
    return loaded
    
def generate_data_labels(labels, label, length):
    """Build a one-hot label matrix made of ``length`` identical rows.

    Every row has one entry per element of ``labels``: 1 where that element
    equals ``label`` and 0 everywhere else.

    Args:
        labels: Sequence of all class labels; its order fixes the column order.
        label: The class that every generated row marks with a 1.
        length: Number of rows to generate. Zero or negative values yield
            zero rows.

    Returns:
        A NumPy array of shape ``(length, len(labels))``. The array stays
        two-dimensional even when ``length`` is 0, so it can always be
        concatenated along axis 0 with other label matrices.
    """
    # The row is the same for every sample, so build it once instead of
    # recomputing it inside a per-sample loop.
    one_hot_row = np.array([1 if candidate == label else 0 for candidate in labels])
    # Repeat the row along a new leading axis; a repetition count of 0 gives
    # an empty (0, len(labels)) matrix.
    return np.tile(one_hot_row, (max(length, 0), 1))

def read_from_batch(data_directory, chosen_labels, label, batch_num):
    """Read the training and validation arrays of one batch and label them.

    The file names are formed by appending ``data_batch_<batch_num>.npy`` and
    ``validation_batch_<batch_num>.npy`` directly to ``data_directory``, so
    the directory string must already end with a path separator.

    Args:
        data_directory: Prefix (directory with trailing separator) of the files.
        chosen_labels: All class labels, passed on to ``generate_data_labels``.
        label: The class assigned to every sample of this batch.
        batch_num: Number embedded in the two file names.

    Returns:
        A tuple ``(data, data_labels, test, test_labels)``.
    """
    # Note carried over from the original author: the loaded object may later
    # need ``list(read_from_file(...).values())``.
    def batch_path(kind):
        # Plain string joining, exactly like the original ``+`` concatenation.
        return "".join((data_directory, kind, "_batch_", str(batch_num), ".npy"))

    print("start reading data...")

    train_samples = read_from_file(batch_path("data"))
    train_targets = generate_data_labels(chosen_labels, label, len(train_samples))
    print("read data")

    validation_samples = read_from_file(batch_path("validation"))
    validation_targets = generate_data_labels(chosen_labels, label, len(validation_samples))
    print("read validation")

    return train_samples, train_targets, validation_samples, validation_targets

def append_or_copy_array(base_array, new_array):
    """Append ``new_array`` to ``base_array`` along axis 0.

    Both shapes are printed first. When ``base_array`` has no elements it is
    treated as an empty accumulator and ``new_array`` itself is returned
    (no copy is made); otherwise the two arrays are concatenated.

    Args:
        base_array: Array accumulated so far; may be empty.
        new_array: Array to add.

    Returns:
        ``new_array`` if ``base_array`` is empty, else the concatenation.
    """
    print(base_array.shape, new_array.shape)
    if base_array.size != 0:
        return np.concatenate((base_array, new_array), axis=0)
    # Nothing accumulated yet: the new array becomes the accumulator.
    return new_array

In [1]:
## Model architecture

# 3-D CNN classifier: three Conv3D stages (the first two followed by max
# pooling and dropout), then a dense head ending in a 2-way softmax.
model = tf.keras.Sequential([
    # Stage 1: 32 filters over inputs of shape (37, 100, 176, 3)
    tf.keras.layers.Conv3D(filters=32, kernel_size=(3, 3, 3), activation='relu',
                           input_shape=(37, 100, 176, 3)),
    tf.keras.layers.MaxPooling3D(pool_size=(2, 2, 2)),
    tf.keras.layers.Dropout(rate=0.25),  # drop 25% of activations while training
    # Stage 2: 64 filters
    tf.keras.layers.Conv3D(filters=64, kernel_size=(3, 3, 3), activation='relu'),
    tf.keras.layers.MaxPooling3D(pool_size=(2, 2, 2)),
    tf.keras.layers.Dropout(rate=0.25),  # same dropout rate as stage 1
    # Stage 3: 64 filters, no pooling or dropout
    tf.keras.layers.Conv3D(filters=64, kernel_size=(3, 3, 3), activation='relu'),
    # Classification head
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(units=128, activation='relu'),
    tf.keras.layers.Dense(units=2, activation='softmax'),
])

In [None]:
## Training on batch

# Location and layout of the pre-batched dataset.
# NOTE(review): this is an absolute, machine-specific path; consider making it configurable.
data_directory = 'D:\\Magshimim\\Dataset\\batches 5 multiplier\\'
chosen_labels = [14, 15]
batch_multiplier = 5

# Define loss function and optimizer
loss_fn = 'categorical_crossentropy'
optimizer = tf.keras.optimizers.Adam()

# Validate the loss name once, up front, rather than on every training step.
if loss_fn != 'categorical_crossentropy':
    raise ValueError("Invalid loss function specified.")

# Training loop using an explicit GradientTape step
num_epochs = 10
batch_size = 30  # Adjust batch size as needed
print("starting...")

for batch in range(batch_multiplier):
    # Start every file batch with empty accumulators; append_or_copy_array
    # replaces an empty accumulator with the first array it receives.
    data = np.array([])
    data_labels = np.array([])
    test = np.array([])
    test_labels = np.array([])

    # Gather this round's file for every chosen label into one train/validation set.
    for label in chosen_labels:
        data_batch, data_labels_batch, test_batch, test_labels_batch = read_from_batch(data_directory, chosen_labels, label, (label * batch_multiplier + batch))
        print(label * batch_multiplier + batch)

        data = append_or_copy_array(data, data_batch)
        data_labels = append_or_copy_array(data_labels, data_labels_batch)
        test = append_or_copy_array(test, test_batch)
        test_labels = append_or_copy_array(test_labels, test_labels_batch)

    data = tf.convert_to_tensor(data)
    data_labels = tf.convert_to_tensor(data_labels)
    test = tf.convert_to_tensor(test)
    test_labels = tf.convert_to_tensor(test_labels)

    print(data.shape, data_labels.shape, test.shape, test_labels.shape)

    num_train = len(data)
    num_val = len(test)

    for epoch in range(num_epochs):
        # Sum of per-sample losses and number of correct predictions this epoch.
        total_loss = 0.0
        correct_predictions = 0

        for i in range(0, num_train, batch_size):
            print(i)
            batch_data = data[i:i+batch_size]
            batch_labels = data_labels[i:i+batch_size]

            with tf.GradientTape() as tape:
                # training=True is required for the Dropout layers to be
                # active when the model is called directly.
                predictions = model(batch_data, training=True)
                # categorical_crossentropy yields one loss value per sample;
                # reduce it to a scalar mean for the gradient step.
                per_sample_loss = tf.keras.losses.categorical_crossentropy(batch_labels, predictions)
                loss = tf.reduce_mean(per_sample_loss)

            gradients = tape.gradient(loss, model.trainable_variables)
            optimizer.apply_gradients(zip(gradients, model.trainable_variables))

            # Accumulate a plain scalar so a smaller final mini-batch cannot
            # cause a shape mismatch.
            total_loss += float(tf.reduce_sum(per_sample_loss))
            correct_predictions += int(np.sum(np.argmax(predictions, axis=1) == np.argmax(batch_labels, axis=1)))

        # Per-sample averages; guarded so an empty training set does not divide by zero.
        average_loss = total_loss / num_train if num_train else float('nan')
        accuracy = correct_predictions / num_train if num_train else float('nan')

        # Validation
        if num_val:
            # Run inference in mini-batches (dropout disabled) so the whole
            # validation set never has to pass through the network at once.
            val_predictions = tf.concat(
                [model(test[j:j+batch_size], training=False) for j in range(0, num_val, batch_size)],
                axis=0,
            )
            val_loss = float(tf.reduce_mean(tf.keras.losses.categorical_crossentropy(test_labels, val_predictions)))
            val_accuracy = np.sum(np.argmax(val_predictions, axis=1) == np.argmax(test_labels, axis=1)) / num_val
        else:
            val_loss = float('nan')
            val_accuracy = float('nan')

        print(f'Epoch {epoch+1}, Loss: {average_loss}, Accuracy: {accuracy}, Val Loss: {val_loss}, Val Accuracy: {val_accuracy}')