In [None]:
import tqdm
import numpy as np
import cv2
from  train import make_set
import matplotlib.pyplot as plt
import os
import keras
import tensorflow as tf

In [None]:
def diff_for_one(y_true, y_pred):   
    y_true = tf.math.floormod(y_true, 720)
    y_pred = tf.math.floormod(y_pred, 720)
    return (y_pred - y_true + 360) % 720 - 360

@keras.saving.register_keras_serializable()
def diff(y_true, y_pred):
    y_true = tf.convert_to_tensor(y_true)
    y_pred = tf.convert_to_tensor(y_pred)
    y_true = tf.cast(y_true, dtype=tf.float32)
    y_pred = tf.cast(y_pred, dtype=tf.float32)
    y_true = tf.math.floormod(y_true, 720)
    y_pred = tf.math.floormod(y_pred, 720)

    return (y_pred - y_true + 360) % 720 - 360

@keras.saving.register_keras_serializable()
def custom_loss(y_true, y_pred):
    return tf.reduce_mean(tf.square(diff(y_true, y_pred)))

@keras.saving.register_keras_serializable()
def custom_accuracy(y_true, y_pred):
    return tf.reduce_mean(tf.cast(tf.abs(diff(y_true, y_pred)) <= 20, tf.float32))

In [None]:
train, valid = make_set("c:\\Users\\mymri\\repos\\clock_set\\")

In [None]:
img_paths = []

def prepare_data(data_list, output_dir, N, aug_sequence=None):
    os.makedirs(output_dir, exist_ok=True)
    
    for num, i in enumerate(tqdm.tqdm(range(0, len(data_list), N))):
        batch_data = []
        batch_labels = []
        
        for j in range(i, min(i + N, len(data_list))):
            image_path, label = data_list[j]
            img_paths.append(image_path)
            image = cv2.imread(image_path)
            if image is None:
                print(f"Warning: Could not read image: {image_path}")
                continue
            
            image = cv2.resize(image, (224, 224))
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
            batch_data.append(image)
            batch_labels.append(label)
        
        if aug_sequence is not None:
            batch_data = aug_sequence(images=np.array(batch_data))

        np.save(os.path.join(output_dir, f"data_{num}.npy"), np.array(batch_data))
        np.save(os.path.join(output_dir, f"labels_{num}.npy"), np.array(batch_labels))

In [None]:
prepare_data(train, "train_data_dir", N = 4096)

In [None]:
prepare_data(valid, "valid_data_dir", N = 4096)

In [None]:
def data_generator_from_folder(data_dir, batch_size):
    data_files = sorted([f for f in os.listdir(data_dir) if f.startswith('data_') and f.endswith('.npy')])
    label_files = sorted([f for f in os.listdir(data_dir) if f.startswith('labels_') and f.endswith('.npy')])

    indices = list(range(len(data_files)))

    batch_images = []  
    batch_labels_slice = []
    for ind in indices:
        data_file = data_files[ind]
        label_file = label_files[ind]
        data_path = os.path.join(data_dir, data_file)
        labels_path = os.path.join(data_dir, label_file)
            
        batch_data = np.load(data_path)
        batch_labels = np.load(labels_path)
            
        for i in range(0, len(batch_data), batch_size):
            batch_labels_slice.append(batch_labels[i:i + batch_size])
            batch_images.append(tf.keras.applications.mobilenet.preprocess_input(batch_data[i:i + batch_size]))

    return  batch_images, batch_labels_slice
data, labels = data_generator_from_folder("train_data_dir", 1)

In [None]:
for i in range(0,5):
    im = data[i][0]
    label = labels[i][0]
    
    im_from_f = cv2.imread(img_paths[i])

    fig, axes = plt.subplots(1, 2, figsize=(10, 5))

    axes[0].imshow(cv2.cvtColor(im_from_f, cv2.COLOR_BGR2RGB))
    axes[0].set_title(f"just path: {img_paths[i]}")
    axes[0].axis('off')

    axes[1].imshow(im)
    axes[1].set_title(f"From Array: {label}")
    axes[1].axis('off')

    plt.tight_layout()
    plt.show()