<a href="https://colab.research.google.com/github/mrpintime/Constraints_NeuralNet/blob/main/Constraints_NeuralNet.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Session 7  


# Approche 1  
Several Conflict Matrix

In [None]:
import random
import numpy as np

list_of_conflicts = []

while len(list_of_conflicts) < 30:
    pairs_list = set()
    matrix = np.zeros((24, 24), dtype=int)

    while matrix.sum() < 40:
        num1 = np.random.choice(range(24))
        num2 = np.random.choice(range(24))

        if num1 == num2:
            continue

        pair = (num1, num2)
        if pair in pairs_list:
            continue

        pairs_list.add(pair)
        matrix[num1, num2] = 1

    if not any(np.array_equal(matrix, conflict) for conflict in list_of_conflicts):
        list_of_conflicts.append(matrix)

In [None]:
conflicts = np.array(list_of_conflicts)

In [None]:
conflicts.shape

In [None]:
def create_adjacent_mask(n_seats, seats_per_row, seats_per_col):
    adjacent_mask = np.zeros((n_seats, n_seats))
    for i in range(n_seats):
        if i % seats_per_row != 0:
            adjacent_mask[i, i-1] = 1
        if i % seats_per_row != seats_per_row-1:
            adjacent_mask[i, i+1] = 1
        if i >= seats_per_row:
            adjacent_mask[i, i-seats_per_row] = 1
        if i < n_seats-seats_per_row:
            adjacent_mask[i, i+seats_per_row] = 1
    return adjacent_mask

adjacent_mask = create_adjacent_mask(24,6,4)

In [None]:
adjacent_mask.shape

## Tensorflow

In [None]:
import tensorflow as tf
import numpy as np

adjacent_mask = create_adjacent_mask(24, 6, 4)

def calculate_conflict(seating_arrangement, conflict_matrix):
    ca_mul = tf.convert_to_tensor(conflict_matrix * adjacent_mask, tf.float64)
    conflicts = tf.reduce_sum(tf.matmul(tf.cast(seating_arrangement, tf.float64), ca_mul))
    return conflicts

def custom_loss(predicted_seating_arrangement, conflicts_tensor):
    alpha = 0.99
    beta = 1 - alpha
    kl  = tf.keras.losses.KLDivergence(reduction='sum')
    batch_size = predicted_seating_arrangement.shape[0]

    # Ensure the predicted seating arrangement is in float64
    predicted_seating_arrangement = tf.cast(predicted_seating_arrangement, tf.float64)

    # Calculate Conflict in produced seating arrangement
    conflict = calculate_conflict(predicted_seating_arrangement, conflicts_tensor)
    # Ensure each seat is assigned to only one person (columns should sum to 1) (Uniqueness)
    probs = tf.reduce_sum(predicted_seating_arrangement, axis=1) / tf.constant(24, dtype=tf.float64)
    one_like = tf.ones_like(probs, tf.float64) / tf.constant(24, dtype=tf.float64)
    # Calculate KLDivergence
    uniqueness = kl(one_like, probs)

    total_loss =  0.8 * uniqueness + 0.2 * conflict

    return total_loss / batch_size # Normalize

conflicts_tensor = tf.convert_to_tensor(conflicts, tf.float64)

model = tf.keras.Sequential([
    tf.keras.layers.Input(conflicts_tensor.shape[1:]),
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(80, activation='relu'),
    tf.keras.layers.Dense(100, activation='relu'),
    tf.keras.layers.Dense(24*24, activation='relu'),
    tf.keras.layers.Reshape((24, 24)),
    tf.keras.layers.Softmax(axis=2)  # Applying softmax along the last axis
])

optimizer = tf.keras.optimizers.Adam(learning_rate=0.01)

for epoch in range(20):
    with tf.GradientTape() as tape:
        predicted_seating_arrangement = model(conflicts_tensor, training=True)
        loss = custom_loss(predicted_seating_arrangement, conflicts_tensor)
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    print(f'Epoch: {epoch}, Loss: {loss.numpy()}')

In [None]:
# optimizer = tf.keras.optimizers.Adam(learning_rate=0.01)

for epoch in range(200):
    with tf.GradientTape() as tape:
        predicted_seating_arrangement = model(conflicts_tensor, training=True)
        loss = custom_loss(predicted_seating_arrangement, conflicts_tensor)
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))
    print(f'Epoch: {epoch}, Loss: {loss.numpy()}')

In [None]:
out = model(conflicts_tensor)

In [None]:
tf.reshape(tf.argmax(out, 2)[0], (6,4))

In [None]:
np.unique(tf.argmax(out, 2)[0]).size

In [None]:
xo = np.array([np.unique(i).size for i in tf.argmax(out, 2)])
xo

In [None]:
probs = tf.reduce_sum(out, axis=1) / tf.constant(10, dtype=tf.float32)
one_like = tf.ones_like(probs, tf.float32) / tf.constant(10, dtype=tf.float32)

In [None]:
one_like.shape

In [None]:
kl = tf.keras.losses.KLDivergence(reduction='sum')
kl(one_like, probs)

## Pytorch

In [None]:
import torch
from torch import nn
import torch.nn.functional as F
import torch.optim as op

In [None]:
class SeatingArr(nn.Module):
    def __init__(self) -> None:
        super(SeatingArr, self).__init__()
        self.flatten = nn.Flatten()
        self.dense_1 = nn.Linear(24*24, 80, dtype=torch.float64)
        self.dense_2 = nn.Linear(80, 100, dtype=torch.float64)
        self.dense_3 = nn.Linear(100, 24 * 24, dtype=torch.float64)


    def forward(self, x):
        x = self.flatten(x)
        x = F.relu(self.dense_1(x))
        x = F.relu(self.dense_2(x))
        x = F.relu(self.dense_3(x))
        x = x.view(-1,24,24)
        x = F.softmax(x, dim=2)
        return x


In [None]:
net = SeatingArr()
optim = op.Adam(params=net.parameters(), lr=0.01)

In [None]:
torch_conflicts = torch.tensor(conflicts, dtype=torch.float64)
loader = torch.utils.data.DataLoader(torch_conflicts, batch_size=64, pin_memory=True)
examples = enumerate(loader)
batch_idx, example_data = next(examples)

In [None]:
batch_idx, example_data.shape, loader.batch_size, loader.dataset.shape, len(loader.dataset), len(loader)

In [None]:
example_data.type()

In [None]:
adjacent_mask = create_adjacent_mask(n_seats=24, seats_per_row=6, seats_per_col=4)
adjacent_mask_torch = torch.tensor(adjacent_mask, dtype=torch.float64)

def calculate_conflict_torch(seating_arrangement, conflict_matrix):
    ca_mul = conflict_matrix * adjacent_mask_torch
    conflicts = torch.sum(torch.matmul(seating_arrangement.type(torch.float64), ca_mul))
    return conflicts

def custom_loss_torch(predicted_seating_arrangement, conflicts_tensor):
    alpha = 0.99
    beta = 1 - alpha
    kl  = nn.KLDivLoss(reduction="batchmean")
    batch_size = predicted_seating_arrangement.shape[0]
    # Ensure the predicted seating arrangement is in float64
    predicted_seating_arrangement = predicted_seating_arrangement.type(torch.float64)

    # Calculate Conflict in produced seating arrangement
    conflict = calculate_conflict_torch(predicted_seating_arrangement, conflicts_tensor)

    # Ensure each seat is assigned to only one person (columns should sum to 1) (Uniqueness)
    probs = torch.sum(predicted_seating_arrangement, dim=1) / torch.tensor(24, requires_grad=False, dtype=torch.float64)
    one_like = torch.ones_like(probs, dtype=torch.float64) / torch.tensor(24, requires_grad=False, dtype=torch.float64)
    # Calculate KLDivergence
    uniqueness = kl(torch.log(probs), one_like)
    # Total Loss
    total_loss =  alpha * uniqueness + beta * conflict

    return total_loss / batch_size # Normalize

In [None]:
# save model every 10 epochs
interval = 10

def Training(epoch):
    # train
    overal_loss = 0
    net.train()
    for batch_idx, conflict in enumerate(loader):
        optim.zero_grad()
        output = net(conflict)
        loss = custom_loss_torch(output, conflict)
        loss.backward()
        optim.step()
        overal_loss += loss.item()
    if epoch % interval == 0:
        print('Train Epoch: {} Loss: {:.6f}'.format(epoch, overal_loss))
        # save state of model and optimizer
        torch.save(net.state_dict(), '/content/model.pth')
        torch.save(optim.state_dict(), '/content/optimizer.pth')


In [None]:
n_epoch = 800
for i in range(n_epoch):
     Training(i+1)

In [None]:
with torch.no_grad():
    output = net(torch_conflicts)
    loss = custom_loss_torch(output, torch_conflicts)
    loss_val = loss.item()
    model_out_readable = torch.argmax(output, dim=2)

In [None]:
loss_val

In [None]:
model_out_readable[0]

In [None]:
model_out_readable[0].view(6,4)

In [None]:
xo = np.array([np.unique(i).size for i in model_out_readable])
xo