In [None]:
import numpy as np
import tensorflow as tf
from tensorflow.keras.layers import Input, Dense, LeakyReLU, ELU
from tensorflow.keras.models import Model
from tensorflow.keras.callbacks import ModelCheckpoint, EarlyStopping
from tensorflow.keras.regularizers import l1, l2
from tensorflow.keras.preprocessing.sequence import pad_sequences
from sklearn.model_selection import train_test_split
from scipy.stats import gamma
import os
from sklearn.metrics import mean_squared_error
import matplotlib.pyplot as plt
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
np.random.seed(70)

# Function to generate synthetic data for Gamma distribution parameters (shape and scale)
def generate_gamma_data(num_shapes=20, num_scales=20):
    a_values = np.random.uniform(1, 5, num_shapes)
    b_values = np.random.uniform(0.5, 2, num_scales)

    x = []
    y_shape = []
    y_scale = []
    sample_sizes = np.random.randint(10, 100, 25)

    for a in a_values:
        for b in b_values:
            for num_samples in sample_sizes:
                for _ in range(5):
                    samples = np.random.gamma(shape=a, scale=b, size=num_samples)
                    samples = np.sort(samples)
                    x.append(np.insert(samples, 0, num_samples))
                    y_shape.append([a])
                    y_scale.append([b])

    max_length = 101
    x = pad_sequences(x, maxlen=max_length, dtype='float32', padding='post', truncating='post')

    x = np.array(x)
    y_shape = np.array(y_shape)
    y_scale = np.array(y_scale)

    return x, y_shape, y_scale

# Generate synthetic data for Gamma distribution
x, y_shape, y_scale = generate_gamma_data()

# Split the data into training and testing sets
x_train, x_test, y_train_shape, y_test_shape, y_train_scale, y_test_scale = train_test_split(
    x, y_shape, y_scale, test_size=0.2, random_state=42
)

# Save data to Google Drive (adjust the path as needed)
data_dir = "/content/drive/MyDrive/Parameter_estimation/gamma_distribution"
data_path = os.path.join(data_dir, "data_large.npz")

os.makedirs(data_dir, exist_ok=True)

np.savez(data_path,
         x_train=x_train,
         x_test=x_test,
         y_train_shape=y_train_shape,
         y_train_scale=y_train_scale,
         y_test_shape=y_test_shape,
         y_test_scale=y_test_scale)
print(f"Training and test data saved to {data_path}")



Training and test data saved to /content/drive/MyDrive/Parameter_estimation/gamma_distribution/data_large.npz


In [None]:
data_path = "./drive/MyDrive/Parameter_estimation/gamma_distribution/data.npz"
# Function to load data from file
def load_data(data_path):
    with np.load(data_path) as data:
        x_train = data['x_train']
        x_test = data['x_test']
        y_train_shape = data['y_train_shape']
        y_train_scale = data['y_train_scale']
        y_test_shape = data['y_test_shape']
        y_test_scale = data['y_test_scale']
        return x_train, x_test, y_train_shape, y_train_scale, y_test_shape, y_test_scale

# Load data
x_train, x_test, y_train_shape, y_train_scale, y_test_shape, y_test_scale = load_data(data_path)

In [None]:
# Define the neural network architecture
input_layer = Input(shape=(101,))
hidden1 = Dense(101, kernel_regularizer=l1(0.01))(input_layer)
hidden1 = ELU(alpha=1.0)(hidden1)
hidden2 = Dense(101, kernel_regularizer=l2(0.01))(hidden1)
hidden2 = LeakyReLU(alpha=0.01)(hidden2)
hidden3 = Dense(101, kernel_regularizer=l1(0.01))(hidden2)
hidden3 = ELU(alpha=1.0)(hidden3)
hidden4 = Dense(101, kernel_regularizer=l2(0.01))(hidden3)
hidden4 = LeakyReLU(alpha=0.01)(hidden4)

# Branch for shape (a)
shape_hidden5 = Dense(101, kernel_regularizer=l1(0.01))(hidden4)
shape_hidden5 = ELU(alpha=1.0)(shape_hidden5)
shape_hidden6 = Dense(101, kernel_regularizer=l2(0.01))(shape_hidden5)
shape_hidden6 = LeakyReLU(alpha=0.01)(shape_hidden6)
shape_hidden7 = Dense(10, kernel_regularizer=l1(0.01))(shape_hidden6)
shape_hidden7 = ELU(alpha=1.0)(shape_hidden7)
shape_hidden8 = Dense(10, kernel_regularizer=l2(0.01))(shape_hidden7)
shape_hidden8 = LeakyReLU(alpha=0.01)(shape_hidden8)
shape_hidden9 = Dense(10, kernel_regularizer=l1(0.01))(shape_hidden8)
shape_hidden9 = ELU(alpha=1.0)(shape_hidden9)
shape_hidden10 = Dense(10, kernel_regularizer=l2(0.01))(shape_hidden9)
shape_hidden10 = LeakyReLU(alpha=0.01)(shape_hidden10)
shape_hidden11 = Dense(10, kernel_regularizer=l1(0.01))(shape_hidden10)
shape_hidden11 = ELU(alpha=1.0)(shape_hidden11)
shape_hidden12 = Dense(10, kernel_regularizer=l2(0.01))(shape_hidden11)
shape_hidden12 = LeakyReLU(alpha=0.01)(shape_hidden12)
shape_hidden13 = Dense(10, kernel_regularizer=l1(0.01))(shape_hidden12)
shape_hidden13 = ELU(alpha=1.0)(shape_hidden13)
shape_hidden14 = Dense(10, kernel_regularizer=l2(0.01))(shape_hidden13)
shape_hidden14 = LeakyReLU(alpha=0.01)(shape_hidden14)
shape_output = Dense(1, activation='linear', kernel_regularizer=l2(0.01), name='shape_output')(shape_hidden14)

# Branch for scale (b)
scale_hidden5 = Dense(101, kernel_regularizer=l1(0.01))(hidden4)
scale_hidden5 = ELU(alpha=1.0)(scale_hidden5)
scale_hidden6 = Dense(101, kernel_regularizer=l2(0.01))(scale_hidden5)
scale_hidden6 = LeakyReLU(alpha=0.01)(scale_hidden6)
scale_hidden7 = Dense(10, kernel_regularizer=l1(0.01))(scale_hidden6)
scale_hidden7 = ELU(alpha=1.0)(scale_hidden7)
scale_hidden8 = Dense(10, kernel_regularizer=l2(0.01))(scale_hidden7)
scale_hidden8 = LeakyReLU(alpha=0.01)(scale_hidden8)
scale_output = Dense(1, activation='linear', kernel_regularizer=l2(0.01), name='scale_output')(scale_hidden8)

# Define the model
model = Model(inputs=input_layer, outputs=[shape_output, scale_output])

# Compile the model with separate losses for each output
model.compile(
    optimizer='adam',
    loss={'shape_output': 'mean_squared_error', 'scale_output': 'mean_squared_error'},
    metrics={'shape_output': ['mean_squared_error'], 'scale_output': ['mean_squared_error']}
)

# Define callbacks
data_dir = "/content/drive/MyDrive/Parameter_estimation/gamma_distribution"
checkpoint_path = os.path.join(data_dir, "model_checkpoint.h5")
checkpoint_callback = ModelCheckpoint(
    filepath=checkpoint_path,
    save_weights_only=True,
    save_best_only=True,
    save_freq='epoch'
)
early_stopping_callback = EarlyStopping(
    monitor='val_loss',
    patience=5,
    restore_best_weights=True
)
if os.path.exists(checkpoint_path):
    model.load_weights(checkpoint_path)
    print(f"Loaded weights from {checkpoint_path}")
else:
    print(f"No checkpoint found at {checkpoint_path}. Training from scratch.")



Loaded weights from /content/drive/MyDrive/Parameter_estimation/gamma_distribution/model_checkpoint.h5


In [None]:
# Train the model
history = model.fit(
    x_train, {'shape_output': y_train_shape, 'scale_output': y_train_scale},
    validation_data=(x_test, {'shape_output': y_test_shape, 'scale_output': y_test_scale}),
    epochs=1000, batch_size=32,
    callbacks=[checkpoint_callback, early_stopping_callback]
)

# Save the final model weights
model.save_weights(checkpoint_path)
print(f"Model weights saved to {checkpoint_path}")

In [None]:
# Evaluate the model
evaluation = model.evaluate(x_test, {'shape_output': y_test_shape, 'scale_output': y_test_scale})
loss, mse_shape, mse_scale = evaluation[0], evaluation[1], evaluation[2]
print(f"Test Loss: {loss}, Test MSE for Shape: {mse_shape}, Test MSE for Scale: {mse_scale}")

# Predictions
predictions = model.predict(x_test)
predicted_shapes = predictions[0]
predicted_scales = predictions[1]

print("Sample predictions:")
for i in range(5):
    print(f"Predicted Shape (a): {predicted_shapes[i][0]}, Actual Shape (a): {y_test_shape[i][0]}")
    print(f"Predicted Scale (b): {predicted_scales[i][0]}, Actual Scale (b): {y_test_scale[i][0]}")

# Compare with MLE
def gamma_mle(samples):
    shape, loc, scale = gamma.fit(samples, floc=0)
    return shape, scale

print("\nComparison with MLE:")
for i in range(5):
    sample = x_test[i][1:1 + int(x_test[i][0])]
    mle_shape, mle_scale = gamma_mle(sample)
    print(f"Sample {i+1}: MLE Shape: {mle_shape:.4f}, Predicted Shape: {predicted_shapes[i][0]:.4f}")
    print(f"Sample {i+1}: MLE Scale: {mle_scale:.4f}, Predicted Scale: {predicted_scales[i][0]:.4f}")


Test Loss: 1.1021833419799805, Test MSE for Shape: 0.636649489402771, Test MSE for Scale: 0.20896796882152557
Sample predictions:
Predicted Shape (a): 2.7356138229370117, Actual Shape (a): 2.2400439130193637
Predicted Scale (b): 1.0634686946868896, Actual Scale (b): 1.0164581573382039
Predicted Shape (a): 3.6975035667419434, Actual Shape (a): 4.620680694895188
Predicted Scale (b): 1.2403087615966797, Actual Scale (b): 0.7805620624327825
Predicted Shape (a): 4.984776496887207, Actual Shape (a): 4.709922154475835
Predicted Scale (b): 1.8807728290557861, Actual Scale (b): 1.901226514381733
Predicted Shape (a): 2.825852870941162, Actual Shape (a): 2.267330284762958
Predicted Scale (b): 1.0725339651107788, Actual Scale (b): 1.049074787546425
Predicted Shape (a): 4.049734115600586, Actual Shape (a): 3.8325211006902915
Predicted Scale (b): 1.374035358428955, Actual Scale (b): 0.7805620624327825

Comparison with MLE:
Sample 1: MLE Shape: 2.3750, Predicted Shape: 2.7356
Sample 1: MLE Scale: 0.9

In [None]:
mle_shapes = []
mle_scales = []
for i in range(len(x_test)):
    sample = x_test[i][1:1 + int(x_test[i][0])]
    mle_shape, mle_scale = gamma_mle(sample)
    mle_shapes.append(mle_shape)
    mle_scales.append(mle_scale)

mle_shapes = np.array(mle_shapes)
mle_scales = np.array(mle_scales)

mse_shape_mle = mean_squared_error(y_test_shape, mle_shapes)
mse_scale_mle = mean_squared_error(y_test_scale, mle_scales)

print(f"MSE for Shape (a) - MLE: {mse_shape_mle:.4f}, Model: {mse_shape:.4f}")
print(f"MSE for Scale (b) - MLE: {mse_scale_mle:.4f}, Model: {mse_scale:.4f}")

MSE for Shape (a) - MLE: 1.1726, Model: 0.6366
MSE for Scale (b) - MLE: 0.1128, Model: 0.2090


In [None]:
if os.path.exists(checkpoint_path):
    model.load_weights(checkpoint_path)
    print("Loaded model weights from checkpoint.path")