In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Flatten, Reshape, Conv2D, Conv1D, MaxPooling2D, Input, Concatenate
from tensorflow.keras.models import Model
from tensorflow import keras, config
import numpy as np
import matplotlib.pyplot as plt

In [None]:
print("GPUs Available: ", len(config.list_physical_devices('GPU')))

In [None]:
def show_image(img, width):
    plt.figure(figsize=(30,5))
    plt.xlim(0, width)
    plt.imshow(img)
    plt.show()

In [None]:
%load_ext autoreload
%autoreload 2
from image_process.image_preprocessors import shift_randomly_binary_labels
from data_filters import min_n_letters, take_percent
from keras_generators.image_generator import Image_Generator_RAW
# Global settings
TEST_SPLIT_SIZE = 0.80
VALIDATION_SPLIT_SIZE = 0.90
IMAGE_TARGET_SIZE = (5, 1400, 1)
BATCH_SIZE =  128
EPOCH_SIZE = 500
LETTER_END_POSITION = "P1"
ADD_NOISE_RANDOMLY = [-2, 30]
ADD_SIGNAL_INDENT_RANDOMLY = [12860, 12860]
IMAGE_PREPOCESSORS = [
    {"func": shift_randomly_binary_labels, "params" : [-200, -100]},
]

In [None]:
from Image_Generator_helpers import  DataSets, set_paths, global_path, Random_Item
set_obj = DataSets(set_paths, global_path)

def get_binary_labels(random_items):
    return np.zeros(len(random_items))

def position_labels_post_process(labels, set_obj: DataSets, random_items: list[Random_Item]): #
    return np.array(labels)

In [None]:

class Binary_Image_Generator:
    def __init__(self, image_amount, image_prepocessors, noise_range):
        self.generator = Image_Generator_RAW(
            image_amount=image_amount,
            set_obj= set_obj,
            FFT_JUMP=64,
            batch_size=BATCH_SIZE,
            image_target_size=IMAGE_TARGET_SIZE,
            image_prepocessors=image_prepocessors,
            noise_range=noise_range,
            random_signal_indent=ADD_SIGNAL_INDENT_RANDOMLY,
            label_func = get_binary_labels,
            label_post_process = position_labels_post_process
        )

Train_Generator_RAW = Binary_Image_Generator(
    image_amount=BATCH_SIZE * EPOCH_SIZE,
    image_prepocessors=IMAGE_PREPOCESSORS,
    noise_range=ADD_NOISE_RANDOMLY
).generator

# (t,l) = Train_Generator_RAW.__getitem__(0)


# for idx,img in enumerate(t):
#     print("label: ")
#     print(l[idx])
#     show_image(img, 200)
    

In [None]:
def conv_model_velocity(input_layer):
    x = keras.layers.Cropping2D(cropping=((0, 0), (0,1200)), data_format=None)(input_layer)

    x = Conv2D(90,(1,7), padding="same",activation="relu")(x)
    x = MaxPooling2D(pool_size=(2,1),padding="same")(x)

    x = Conv2D(90,(1,7),padding="same",activation="relu")(x)
    x = MaxPooling2D(pool_size=(1,2),padding="same")(x)

    x = Conv2D(90,(1,5),padding="same",activation="relu")(x)
    x = MaxPooling2D(pool_size=(1,2),padding="same")(x)

    x = Conv2D(90,(3,3),padding="same",activation="relu")(x)
    x = MaxPooling2D(pool_size=(1,2),padding="same")(x)

    x = Conv2D(90,(3,3),padding="same",activation="relu")(x)
    x = MaxPooling2D(pool_size=(1,2),padding="same")(x)

    x = Conv2D(90,(3,3),padding="same",activation="relu")(x)
    x = MaxPooling2D(pool_size=(1,2),padding="same")(x)

    x = Conv2D(90,(3,3),padding="same",activation="relu")(x)
    x = MaxPooling2D(pool_size=(1,2),padding="same")(x)

    x = Flatten()(x)

    return x

In [None]:
from tensorflow.keras import layers

input_layer = Input(shape=IMAGE_TARGET_SIZE)
conv_model_position_flattened = conv_model_velocity(input_layer)
output_layer_position = Dense(1, name="regr", activation='sigmoid')(conv_model_position_flattened)

model = Model(inputs=input_layer, outputs=output_layer_position)
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

print(model.summary())

In [None]:
init_epoch = 0

In [None]:

num_epochs = 1

def fit_model(epochs):
	
	global init_epoch
	history = model.fit(
		Train_Generator_RAW,
		steps_per_epoch = EPOCH_SIZE,
		epochs = epochs + init_epoch,
		initial_epoch=init_epoch,
		verbose =1,
		workers=12,
		use_multiprocessing=True
	)
					
	init_epoch += epochs
	return history

history = fit_model(num_epochs)


In [None]:
%%capture cap --no-stderr
from log_helpers.training_log import Training_Data_Log, print_name, json_to_file, get_deviating_predictions
import inspect

data_log = Training_Data_Log()
data_log.model_config = model.to_json()
data_log.model_config_method_string = [inspect.getsource(conv_model_velocity)]
data_log.training_set_size = EPOCH_SIZE
data_log.image_pre_processors = print_name(IMAGE_PREPOCESSORS)
data_log.noise_added = ADD_NOISE_RANDOMLY
data_log.model_summary = model.summary()
data_log.model_optimizer = str(type(model.optimizer))
data_log.model_history = history.history
data_log.model_history_final_epoch = {k: v[-1] for k, v in history.history.items()}
data_log.total_epochs = init_epoch


In [None]:
class Binary_Results:
    total_predictions: int = None
    noise_level: list[int] = None
    image_preprocessors_test = None
    predictions_incorrect: int = None
    predictions_incorrect_percent: int = None
    model_evaluation = None

In [None]:
# Testing parameters:
NOISE_LEVELS_TESTING = [[10, 30], [-2, 30]]
BATCHES_TESTING = 200

In [None]:
from log_helpers.training_log import binary_comparer

result_array = []
for noise_level in NOISE_LEVELS_TESTING:

    image_generator_testing = Binary_Image_Generator(
        image_amount = BATCH_SIZE * BATCHES_TESTING,
        image_prepocessors = IMAGE_PREPOCESSORS,
        noise_range = noise_level
    ).generator

    binary_differences  = get_deviating_predictions(image_generator_testing, model, binary_comparer)

    # Collect results:
    results = Binary_Results()
    results.model_evaluation = model.evaluate(image_generator_testing, verbose = 0)
    results.total_predictions = BATCHES_TESTING * BATCH_SIZE
    results.noise_level = noise_level
    results.image_preprocessors_test = print_name(IMAGE_PREPOCESSORS)
    results.predictions_incorrect = len(binary_differences)
    results.predictions_incorrect_percent = len(binary_differences) / (BATCHES_TESTING * BATCH_SIZE) * 100
    
    # Append results to result array
    result_array.append(results.__dict__)


In [None]:
data_log.results = result_array
data_log.results

In [None]:
json_to_file("logs/binary/binary_data_log", data_log)

In [None]:
for idx, diff in enumerate(binary_differences):

    if idx > 10:
        break

    pred, correct, img = diff

    print('Prediction', round(pred[0]))
    print('Correct', round(correct))
    show_image(img, 300)

    print("----------------------------------------------------------------------------------------")
