In [2]:
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers

# Generate dummy data with separate lists for X and y
X_full = tf.random.normal(shape=(100, 32, 32, 3))
y_full = [tf.random.uniform(shape=(100,)) for _ in range(3)]  # List of target variables




In [3]:
for i in y_full:
    print(i.shape)
print(X_full.shape)

(100,)
(100,)
(100,)
(100, 32, 32, 3)


In [4]:
from sklearn.model_selection import train_test_split
import numpy as np

# Convert to numpy
X_full_np = X_full.numpy()

# Stack the arrays vertically to create a single array of shape (3, 100)
y_full_stacked = np.vstack(y_full).T  # Transpose to get shape (100, 3)

# Convert y_full_stacked to integers
y_full_stacked = y_full_stacked.astype(int)

# Split the data
X_train, X_test, y_train, y_test = train_test_split(
    X_full_np, y_full_stacked, test_size=0.2, random_state=42
)

# Split y_train and y_test back into three separate arrays
y1_train, y2_train, y3_train = y_train[:, 0], y_train[:, 1], y_train[:, 2]
y1_test, y2_test, y3_test = y_test[:, 0], y_test[:, 1], y_test[:, 2]

# Print the shapes to verify
print(y1_train.shape, y1_test.shape)
print(y2_train.shape, y2_test.shape)
print(y3_train.shape, y3_test.shape)
print(X_train.shape, X_test.shape)


(80,) (20,)
(80,) (20,)
(80,) (20,)
(80, 32, 32, 3) (20, 32, 32, 3)


In [5]:

def create_model():
  inputs = keras.Input(shape=(32, 32, 3))
  x = layers.Conv2D(32, kernel_size=3, activation="relu")(inputs)
  x = layers.MaxPooling2D(pool_size=2)(x)
  x = layers.Flatten()(x)
  outputs = []
  # Define separate heads for each target variable
  for _ in range(3):
    branch = layers.Dense(64, activation="relu")(x)
    branch = layers.Dense(1)(branch)
    outputs.append(branch)
  model = keras.Model(inputs=inputs, outputs=outputs)
  return model


In [6]:
# scenario 1 full dataset and loss function accessing to full y1, y2 and y3 rows
#
# Create the model
model = create_model()

print(model.summary())

# Compile the model
model.compile(optimizer='adam', loss='mse', metrics=['mae'])

# Train the model on the full dataset
model.fit(X_train, [y1_train, y2_train, y3_train], epochs=10, batch_size=32, validation_split=0.2)

# Evaluate the model on the test data
#model.evaluate(X_test, [y1_test, y2_test, y3_test])

# Evaluate the model on the test data
loss_values, *metrics_values = model.evaluate(X_test, [y1_test, y2_test, y3_test])

# Print the loss value
print("Loss:", loss_values)

# Print the metric values (e.g., mean absolute error)
for i, metric_value in enumerate(metrics_values):
    print(f"Metric {i+1}:", metric_value)


Model: "model"
__________________________________________________________________________________________________
 Layer (type)                   Output Shape         Param #     Connected to                     
 input_1 (InputLayer)           [(None, 32, 32, 3)]  0           []                               
                                                                                                  
 conv2d (Conv2D)                (None, 30, 30, 32)   896         ['input_1[0][0]']                
                                                                                                  
 max_pooling2d (MaxPooling2D)   (None, 15, 15, 32)   0           ['conv2d[0][0]']                 
                                                                                                  
 flatten (Flatten)              (None, 7200)         0           ['max_pooling2d[0][0]']          
                                                                                              

In [8]:
# Scenario 2: our target situation: dummy case reducing access to y3 values by with custom loss function in 'head3'

def create_model1():
    inputs = keras.Input(shape=(32, 32, 3))
    x = layers.Conv2D(32, kernel_size=3, activation="relu")(inputs)
    x = layers.MaxPooling2D(pool_size=2)(x)
    x = layers.Flatten()(x)
    outputs = []
    # Define separate heads for each target variable
    for i in range(3):
        branch = layers.Dense(64, activation="relu", name=f'dense_{i}_out')(x)
        branch = layers.Dense(1, name=f'dense_{i}')(branch)
        outputs.append(branch)
    model = keras.Model(inputs=inputs, outputs=outputs)
    return model

# Create the model
model1 = create_model1()

# Define custom loss functions for each output
def custom_loss_0(y_true, y_pred):
    return keras.losses.mean_squared_error(y_true, y_pred)

def custom_loss_1(y_true, y_pred):
    return keras.losses.mean_squared_error(y_true, y_pred)

def custom_loss_2(y_true, y_pred):
    threshold = 0.8 ##example here gor 80%, but this should be iterated as 20, 40, 60, 80% 
    mask = tf.random.uniform(shape=tf.shape(y_true)) < threshold
    masked_true = tf.boolean_mask(y_true, mask)
    masked_pred = tf.boolean_mask(y_pred, mask)
    return keras.losses.mean_squared_error(masked_true, masked_pred)

custom_losses = {
    'dense_0': custom_loss_0,
    'dense_1': custom_loss_1,
    'dense_2': custom_loss_2
}

# Compile the model with custom losses for each output
model1.compile(optimizer='adam', loss=custom_losses, metrics=['mae'])

# Train the model with masked y3_train
model1.fit(X_train, {'dense_0': y1_train, 'dense_1': y2_train, 'dense_2': y3_train}, epochs=10, batch_size=32, validation_split=0.2)

# Evaluate the model on the test data
loss_values, *metrics_values = model1.evaluate(X_test, [y1_test, y2_test, y3_test])

# Print the loss value
print("Loss:", loss_values)

# Print the metric values (e.g., mean absolute error)
for i, metric_value in enumerate(metrics_values):
    print(f"Metric {i+1}:", metric_value)




Epoch 1/10
Epoch 2/10
Epoch 3/10
Epoch 4/10
Epoch 5/10
Epoch 6/10
Epoch 7/10
Epoch 8/10
Epoch 9/10
Epoch 10/10
Loss: 1.875126145023387e-05
Metric 1: 6.566290721821133e-06
Metric 2: 6.792250587750459e-06
Metric 3: 5.392719685914926e-06
Metric 4: 0.0025624774862080812
Metric 5: 0.00260278326459229
Metric 6: 0.002322223037481308
