<a href="https://colab.research.google.com/github/javahedi/QuantumStateTomography-ML/blob/main/qst_ml.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [29]:
import glob
import os
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, callbacks
import matplotlib.pyplot as plt
from google.colab import drive
# Mount Google Drive at /content/drive; the data-loading cell below reads its
# CSV files from a folder under this mount point.
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [30]:
# Folder on the mounted Drive that holds one CSV file per sample.
path = "/content/drive/MyDrive/SL_data/"

# glob already returns full paths (the pattern is rooted at `path`), so the
# names can be passed to np.loadtxt directly.  The list is sorted because
# glob's ordering is arbitrary and the sample order inside `dataset` should be
# the same on every run.
cvs_names = sorted(glob.glob(os.path.join(path, '*.csv')))
if not cvs_names:
    # Fail early with a clear message instead of np.stack's
    # "need at least one array to stack".
    raise FileNotFoundError(f"No CSV files found in {path!r}")

data_list = []
for name in cvs_names:
    # skiprows=1 drops the header line; ndmin=2 keeps the array 2-D even when
    # a file has a single data row, so the column slice below is always valid.
    data = np.loadtxt(name, delimiter=',', skiprows=1, ndmin=2)

    # Only the first two columns are kept, unscaled.
    # NOTE(review): earlier notes in this cell labelled them theta and phi
    # and experimented with sin()/range scaling — confirm whether the model
    # expects scaled inputs.
    data_list.append(data[:, :2])

# All files must have the same number of rows for the stack to succeed.
dataset = np.stack(data_list)
print(dataset.shape)  # (number of files, rows per file, 2)


(100, 99, 2)


In [None]:
# Custom probability function used by the loss below
def probability(y1, y2):
    """Combine the two model outputs into a single quantity.

    This is a placeholder: it simply returns ``y1 + y2``.  Replace the body
    with the actual probability expression when it is known.

    Args:
        y1: first output component.
        y2: second output component.

    Returns:
        The sum ``y1 + y2``.
    """
    # Placeholder logic — swap in the real formula here.
    return y1 + y2

# Custom loss: MSE between the probabilities derived from targets and predictions
def custom_loss(y_true, y_pred):
    """Mean squared error between ``probability`` of targets and predictions.

    Both tensors are split into two equal parts along axis 1; each pair is
    passed through ``probability`` and the two results are compared with
    ``tf.keras.losses.mean_squared_error``.

    Args:
        y_true: target tensor (assumed shape ``(batch_size, 2)``).
        y_pred: predicted tensor (assumed shape ``(batch_size, 2)``).

    Returns:
        The tensor returned by ``tf.keras.losses.mean_squared_error``.
    """
    true_first, true_second = tf.split(y_true, num_or_size_splits=2, axis=1)
    pred_first, pred_second = tf.split(y_pred, num_or_size_splits=2, axis=1)

    # Compare in "probability space" rather than on the raw outputs.
    return tf.keras.losses.mean_squared_error(
        probability(true_first, true_second),
        probability(pred_first, pred_second),
    )


# Stage 1: Feature Reduction
class Stage1(tf.keras.Model):
    """First stage offering three interchangeable layers: Dense, Conv1D or LSTM.

    All three layers are created up front; ``call`` applies exactly one of
    them, chosen by ``model_type``.

    NOTE(review): another class named ``Stage1`` is defined further down in
    this notebook; whichever definition runs last is the one the name
    refers to.

    Args:
        units: width of the Dense layer.
        filters: number of Conv1D filters.
        kernel_size: Conv1D kernel size.
        lstm_units: width of the LSTM layer.
    """

    def __init__(self, units, filters, kernel_size, lstm_units):
        super().__init__()

        self.use_dense = layers.Dense(units, activation='relu')
        self.use_cnn = layers.Conv1D(
            filters=filters,
            kernel_size=kernel_size,
            activation='relu',
            padding='same',
        )
        # return_sequences=True keeps one output per input step.
        self.use_lstm = layers.LSTM(lstm_units, return_sequences=True)

    def call(self, inputs, model_type, training=None, mask=None):
        """Apply the layer selected by ``model_type`` to ``inputs``.

        Raises:
            ValueError: if ``model_type`` is not 'dense', 'cnn' or 'lstm'.
        """
        if model_type == 'dense':
            return self.use_dense(inputs)
        if model_type == 'cnn':
            return self.use_cnn(inputs)
        if model_type == 'lstm':
            return self.use_lstm(inputs)
        raise ValueError("Invalid model_type. Choose 'dense', 'cnn', or 'lstm'.")


class CNN(tf.keras.Model):
    """Thin wrapper around a single ReLU ``Conv1D`` layer with 'same' padding.

    Args:
        filters: number of convolution filters.
        kernel_size: size of the 1-D convolution kernel.
    """

    def __init__(self, filters, kernel_size):
        super().__init__()
        self.conv1d = layers.Conv1D(
            filters=filters,
            kernel_size=kernel_size,
            activation='relu',
            padding='same',
        )

    def call(self, inputs, training=None, mask=None):
        """Return the convolution of ``inputs``."""
        return self.conv1d(inputs)

class LSTM(tf.keras.Model):
    """Thin wrapper around a single ``layers.LSTM`` that returns full sequences.

    Args:
        units: width of the LSTM layer.
    """

    def __init__(self, units):
        super().__init__()
        # return_sequences=True: one output vector per input step.
        self.lstm = layers.LSTM(units, return_sequences=True)

    def call(self, inputs, training=None, mask=None):
        """Run ``inputs`` through the LSTM and return the output sequence."""
        return self.lstm(inputs)

class FiLM(tf.keras.Model):
    """Feature-wise Linear Modulation (FiLM).

    A ReLU Dense layer produces features ``x``; a second Dense layer of twice
    the width produces the modulation parameters, which are split in half
    into ``gamma`` and ``beta``.  The output is ``gamma * x + beta``.  Both
    Dense layers read the same ``inputs``.

    Args:
        units: width of the feature layer (the modulation layer has
            ``units * 2`` outputs).
    """

    def __init__(self, units):
        super().__init__()
        self.dense = layers.Dense(units, activation='relu')
        # Twice the units: first half is gamma (scale), second half beta (shift).
        self.film = layers.Dense(units * 2)

    def call(self, inputs, training=None, mask=None):
        """Return the modulated features ``gamma * dense(inputs) + beta``."""
        features = self.dense(inputs)
        scale, shift = tf.split(self.film(inputs), 2, axis=-1)
        return scale * features + shift



class Stage1(tf.keras.Model):
    """First stage of the hybrid model.

    Because this is the last definition bound to the name ``Stage1`` when the
    notebook is run top to bottom, it is the class the model-assembly cell
    actually instantiates.  That cell passes ``units``, ``filters``,
    ``kernel_size`` and ``lstm_units`` to the constructor and ``model_type``
    to the call, so this class accepts them; with no arguments it behaves as
    before (a single linear ``Dense(1)`` projection).

    Args:
        units: width of the optional ReLU Dense branch ('dense').
        filters: number of filters of the optional Conv1D branch ('cnn').
        kernel_size: kernel size of the optional Conv1D branch ('cnn').
        lstm_units: width of the optional LSTM branch ('lstm').
    """

    def __init__(self, units=None, filters=None, kernel_size=None, lstm_units=None):
        super().__init__()
        # Default branch: linear activation for regression.
        self.dense_layer = layers.Dense(1, activation='linear')

        # Optional branches exist only when their hyper-parameters were given.
        self.use_dense = None
        self.use_cnn = None
        self.use_lstm = None
        if units is not None:
            self.use_dense = layers.Dense(units, activation='relu')
        if filters is not None and kernel_size is not None:
            self.use_cnn = layers.Conv1D(filters=filters, kernel_size=kernel_size,
                                         activation='relu', padding='same')
        if lstm_units is not None:
            self.use_lstm = layers.LSTM(lstm_units, return_sequences=True)

    def call(self, inputs, training=None, mask=None, model_type=None):
        """Apply the selected branch to ``inputs``.

        Args:
            inputs: input tensor.
            training: unused; accepted for Keras compatibility.
            mask: unused; accepted for Keras compatibility.
            model_type: ``None`` for the linear Dense(1) projection, or one
                of 'dense', 'cnn', 'lstm' to use the corresponding branch.

        Returns:
            Output tensor of the selected layer.

        Raises:
            ValueError: if ``model_type`` is unknown, or names a branch whose
                hyper-parameters were not passed to the constructor.
        """
        if model_type is None:
            return self.dense_layer(inputs)

        if model_type == 'dense':
            branch = self.use_dense
        elif model_type == 'cnn':
            branch = self.use_cnn
        elif model_type == 'lstm':
            branch = self.use_lstm
        else:
            raise ValueError("Invalid model_type. Choose 'dense', 'cnn', or 'lstm'.")

        if branch is None:
            raise ValueError(
                f"model_type={model_type!r} was requested but its "
                "hyper-parameters were not passed to Stage1()."
            )
        return branch(inputs)


# Stage 2: Permutation Invariance
class Stage2(tf.keras.Model):
    """Self-attention over the sequence followed by global average pooling.

    Args:
        num_heads: number of attention heads.
        d_model: total attention width; each head gets
            ``d_model // num_heads`` key dimensions.  Must be divisible by
            ``num_heads``.

    The hyper-parameters are constructor arguments rather than module-level
    globals, so ``Stage2(num_heads, d_model)`` works and the class no longer
    depends on the order in which notebook cells were executed.  The defaults
    match the values used in the model-assembly cell.
    """

    def __init__(self, num_heads=8, d_model=64):
        super().__init__()

        # Ensure d_model is divisible by num_heads
        assert d_model % num_heads == 0, "d_model must be divisible by num_heads"

        self.attention = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=d_model // num_heads
        )
        self.pooling = layers.GlobalAveragePooling1D()

    def call(self, inputs, training=None, mask=None):
        """Return the sequence-averaged self-attention output of ``inputs``."""
        # Query and value are the same tensor: plain self-attention.
        attention_output = self.attention(inputs, inputs)
        # Averaging over the sequence axis removes that axis from the output.
        pooled_output = self.pooling(attention_output)
        return pooled_output


# Stage 3: Dense Feed-Forward Network
class Stage3(tf.keras.Model):
    """Two-layer regression head: Dense(64, relu) followed by Dense(2, linear)."""

    def __init__(self):
        super().__init__()
        self.dense_layer1 = layers.Dense(64, activation='relu')
        # Two outputs with linear activation for regression.
        self.dense_layer2 = layers.Dense(2, activation='linear')

    def call(self, inputs, training=None, mask=None):
        """Map ``inputs`` through the hidden layer to the two outputs."""
        hidden = self.dense_layer1(inputs)
        return self.dense_layer2(hidden)



In [None]:

# Connect the models
num_samples     = 1500  # total number of samples (train + validation)
sequence_length = 50
input_features  = 3
batch_size      = 32
num_epochs      = 10

# Keras adds the batch axis itself, so `shape` describes ONE sample.  Putting
# num_samples in the shape would make the model expect 4-D batches, which
# neither matches training data of shape
# (num_samples, sequence_length, input_features) nor GlobalAveragePooling1D.
input_data      = tf.keras.Input(shape=(sequence_length, input_features))

# Stage 1
model_type    = 'cnn'  # Change this to 'dense', 'cnn', or 'lstm'
stage1_model  = Stage1(units=64, filters=32, kernel_size=3, lstm_units=64) # Create an instance of Stage1
stage1_output = stage1_model(input_data, model_type=model_type)


# Stage 2
num_heads        = 8
d_model          = 64
stage2_attention = Stage2(num_heads, d_model)  # Create an instance of Stage2
stage2_output    = stage2_attention(stage1_output)


# Stage 3
stage3_output = Stage3()(stage2_output)


# Create the final model
hybrid_model = tf.keras.Model(inputs=input_data, outputs=stage3_output)



In [None]:
# Synthetic stand-in data so the pipeline can be exercised end to end.
# X_* has shape (n, sequence_length, input_features); y_* has shape (n, 2).
# A seeded generator makes every "Restart & Run All" produce the same arrays.
rng = np.random.default_rng(0)

# 80/20 split; the validation size is derived from the training size so the
# two always add up to num_samples, whatever its value.
num_train = int(num_samples * 0.8)
num_val   = num_samples - num_train

X_train = rng.random((num_train, sequence_length, input_features))
y_train = rng.random((num_train, 2))


X_val = rng.random((num_val, sequence_length, input_features))
y_val = rng.random((num_val, 2))

In [None]:

# Create callbacks
# Keep only the weights of the epoch with the lowest validation loss.
# The file name ends in ".weights.h5": Keras 3 rejects any other suffix when
# save_weights_only=True, and older tf.keras still treats it as an HDF5 file.
checkpoint_callback = callbacks.ModelCheckpoint(
    filepath='model_checkpoint.weights.h5',
    monitor='val_loss',  # Choose the metric to monitor
    save_best_only=True,
    save_weights_only=True,
    verbose=1
)


# Stop once the validation loss has not improved for `patience` epochs.
early_stopping_callback = callbacks.EarlyStopping(
    monitor='val_loss',  # Choose the metric to monitor
    patience=5,  # Number of epochs with no improvement after which training will be stopped
    verbose=1
)


# Compile the model with the custom loss function
hybrid_model.compile(optimizer='adam', loss=custom_loss)


# Fit the model with batching and callbacks
history = hybrid_model.fit(
    X_train, y_train,
    epochs=num_epochs,
    batch_size=batch_size,
    validation_data=(X_val, y_val),  # needed: both callbacks monitor val_loss
    callbacks=[checkpoint_callback, early_stopping_callback]
)


In [None]:

# Plot training history: one curve per recorded loss, on a single axes
fig, ax = plt.subplots()
for key, label in (('loss', 'Training Loss'), ('val_loss', 'Validation Loss')):
    ax.plot(history.history[key], label=label)
ax.set_xlabel('Epoch')
ax.set_ylabel('Loss')
ax.legend()
plt.show()