In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np


# Report whether CUDA is usable and how many GPUs are visible, then select
# the device the rest of the notebook runs on.
cuda_available = torch.cuda.is_available()
print(cuda_available)

print(torch.cuda.device_count())
device = torch.device("cuda") if cuda_available else torch.device("cpu")
print(device)

import numpy as np
import random

def set_seed(seed):
    """Seed every random number generator this notebook relies on.

    Seeds Python's `random`, NumPy's global RNG and PyTorch (CPU plus all
    CUDA devices), then puts cuDNN in deterministic mode with autotuning off.

    Parameters:
    - seed (int): Seed value handed to every generator.
    """
    seeders = (
        random.seed,
        np.random.seed,
        torch.manual_seed,
        torch.cuda.manual_seed_all,  # covers multi-GPU setups, if any
    )
    for seeder in seeders:
        seeder(seed)

    cudnn = torch.backends.cudnn
    cudnn.deterministic = True
    cudnn.benchmark = False

# Seed all random number generators once, before any data or model is created.
set_seed(42)  # Replace 42 with your chosen seed

# Phase index; it is embedded in the file name of the saved normalized dataset.
phase = 1
# Reference paper: https://ieeexplore.ieee.org/document/10245477

In [None]:
#!pip install torchviz

In [None]:
import pandas as pd
import torch
from sklearn.preprocessing import MinMaxScaler

def normalizaData(df):
    """
    Normalize a robot-path DataFrame for training.

    Angular columns are expanded into `<col>_cos` / `<col>_sin` pairs and the
    linear columns are min-max scaled to [0, 1]. The input frame is copied
    first, so the caller's DataFrame is left untouched.

    Parameters:
    - df (pd.DataFrame): Raw data containing the angular columns listed in
      `rad_cols` and the linear columns listed in `linear_cols`.

    Returns:
    - (pd.DataFrame, MinMaxScaler): The normalized copy and the scaler fitted
      on the linear columns (needed later to denormalize).
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Work on a copy: callers that pass their raw frame directly must not have
    # it silently overwritten with normalized values.
    df = df.copy()

    # Normalizing Angular Data (Robotic Joints): encode each angle as (cos, sin)
    rad_cols = ['Tool_rx', 'Tool_ry', 'Tool_rz', 'Joint_base', 'Joint_Shoulder', 'Joint_Elbow', 'Joint_W1', 'Joint_W2', 'Joint_W3']
    for col in rad_cols:
        rad_tensor = torch.tensor(df[col].values, device=device, dtype=torch.float32)
        # Results are moved back to the CPU and stored as NumPy arrays.
        df[col + '_cos'] = torch.cos(rad_tensor).cpu().numpy()
        df[col + '_sin'] = torch.sin(rad_tensor).cpu().numpy()

    # Normalizing Linear Data (Speed, Acceleration, Energy Consumed, and Time)
    # Min-Max Scaling: scales each column to the range 0..1
    scaler = MinMaxScaler()
    linear_cols = ['Speed', 'Acceleration', 'Time', 'Energy_Consumped']
    df[linear_cols] = scaler.fit_transform(df[linear_cols])

    return df, scaler

In [None]:
def denormalize_data(arr, scaler, features):
    """
    Denormalizes the data from a normalized array using the provided scaler.

    Parameters:
    - arr (numpy.ndarray): The normalized 3D array. The last axis must hold the
      features (it is matched against `features`); all leading axes are
      flattened into rows, e.g. (samples, timesteps, features).
    - scaler: Object exposing `inverse_transform`, fitted on the columns
      ['Speed', 'Acceleration', 'Time', 'Energy_Consumped'] in that order.
    - features (list): Column names for the last axis of `arr`.

    Returns:
    - pd.DataFrame: One row per (sample, timestep). Every `<base>_cos` /
      `<base>_sin` pair is replaced by a single `<base>` angle column in
      radians (appended after the remaining columns), and the linear columns
      are mapped back to their original scale.

    Raises:
    - KeyError: If a `<base>_cos` column has no matching `<base>_sin` column.
    """
    # Flatten (samples, timesteps) into rows; keep the feature axis as columns
    frame = pd.DataFrame(arr.reshape(-1, arr.shape[2]), columns=features)

    # Denormalizing Angular Data
    # Match on the suffix only, so an unrelated name that merely contains
    # '_cos' (e.g. 'x_cosine') is not mistaken for an angle encoding.
    cos_suffix = '_cos'
    cos_columns = [col for col in frame.columns if col.endswith(cos_suffix)]
    for cos_col in cos_columns:
        base_col = cos_col[:-len(cos_suffix)]
        sin_col = base_col + '_sin'
        # Combine cosine and sine to recover the angle in (-pi, pi]
        frame[base_col] = np.arctan2(frame[sin_col], frame[cos_col])
        # Remove the encoded pair now that the angle is restored
        frame = frame.drop(columns=[sin_col, cos_col])

    # Denormalizing Linear Data
    linear_columns = ['Speed', 'Acceleration', 'Time', 'Energy_Consumped']
    available_linear_columns = [col for col in linear_columns if col in frame.columns]

    # Skip the scaler on an empty frame: there is nothing to transform and
    # scikit-learn scalers reject zero-row input.
    if available_linear_columns and not frame.empty:
        frame[available_linear_columns] = scaler.inverse_transform(frame[available_linear_columns])
    return frame

In [None]:
import pandas as pd
import torch

# Load the recorded paths
real_data = pd.read_csv('phase RLHF dataset/remaining_paths.csv')

# Numeric codes for the categorical columns; both capitalisations of the
# movement commands share one code.
points = {'pick': 1, 'approach': 2, 'start_point': 3, 'box1': 4, 'way_point': 5}
movement_types = {'MoveL': 1, 'MoveJ': 2, 'Movel': 1, 'Movej': 2}

# Correct the misspelt 'approuch' label, then encode both categorical columns
real_data['point_type'] = real_data['point_type'].replace('approuch', 'approach').map(points)
real_data['Movement Type'] = real_data['Movement Type'].map(movement_types)

# Drop the 'id' column
real_data = real_data.drop(columns=['id'])

# Snapshot of the numeric table as a float tensor on the GPU when available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
tensor_data = torch.tensor(real_data.values, dtype=torch.float, device=device)

# Print number of columns in the DataFrame
print(real_data.shape[1])

In [None]:
# Normalize a copy of the raw data so `real_data` keeps its original values
df_normalized, scaler = normalizaData(real_data.copy())

# Model features, in the exact column order the networks are trained on.
# Angles appear as (cos, sin) pairs; 'Time' and 'Energy_Consumped' are last.
features = [
    'point_type', 'Tool_x', 'Tool_y', 'Tool_z', 'Tool_rx_cos', 'Tool_rx_sin', 'Tool_ry_cos',
    'Tool_ry_sin', 'Tool_rz_cos', 'Tool_rz_sin', 'Joint_base_cos',
    'Joint_base_sin', 'Joint_Shoulder_cos', 'Joint_Shoulder_sin',
    'Joint_Elbow_cos', 'Joint_Elbow_sin', 'Joint_W1_cos', 'Joint_W1_sin',
    'Joint_W2_cos', 'Joint_W2_sin', 'Joint_W3_cos', 'Joint_W3_sin', 'Speed', 'Acceleration', 'Movement Type', 'Time', 'Energy_Consumped'
]

# Calculate the number of features
features_num = len(features)

# Select the specified features. The explicit copy makes this an independent
# frame, so adding columns to it later does not operate on a slice of the
# normalized frame (which pandas flags with SettingWithCopyWarning).
df_normalized = df_normalized[features].copy()

# Display the first few rows of the selected features
df_normalized.head()

In [None]:
import numpy as np
import pandas as pd

# Reshape the flat table into (num_paths, 5, features_num): every 5 consecutive
# rows form one path. Rows are taken in their existing order.
points_per_path = 5
num_paths = len(df_normalized) // points_per_path  # Use integer division
usable_rows = num_paths * points_per_path

# A trailing partial path cannot be reshaped into (5, features_num); drop it
# explicitly instead of failing on a length mismatch.
if usable_rows < len(df_normalized):
    print(f"Dropping {len(df_normalized) - usable_rows} trailing row(s) that do not form a complete path")

reshaped_data = (
    df_normalized.iloc[:usable_rows]
    .to_numpy()
    .reshape(num_paths, points_per_path, features_num)
)
print(reshaped_data.shape)
# Save the reshaped data to a file. `phase` is set in the first cell.
np.save(f"TGANreal_normalized_dataset_shape_3D_ph{phase}.npy", reshaped_data)

In [None]:
import torch
import torch.nn as nn
from torch.nn.utils.parametrizations import weight_norm

#The Chomp1d module in the context of Temporal Convolutional Networks (TCNs) is a custom PyTorch module designed to manage
#the issue of padding in convolutional layers that are used for time series data. 
#In convolutional operations, padding is often added to ensure that the convolution output has the same length as the input, 
#which is particularly important for building deep networks that do not reduce the temporal dimension until desired.

class Chomp1d(nn.Module):
    """Trim the last `chomp_size` steps from the final (time) axis.

    Parameters:
    - chomp_size (int): Number of trailing time steps to remove. Zero leaves
      the input length unchanged.
    """

    def __init__(self, chomp_size):
        super(Chomp1d, self).__init__()
        self.chomp_size = chomp_size

    def forward(self, x):
        """Return `x` without its last `chomp_size` time steps, as a contiguous tensor."""
        # x[..., :-0] would be an empty slice, so zero padding needs its own path.
        if self.chomp_size == 0:
            return x.contiguous()
        return x[:, :, :-self.chomp_size].contiguous()

class TemporalBlock(nn.Module):
    """Residual block of two weight-normalised dilated Conv1d layers.

    Each convolution is followed by Chomp1d (removing `padding` trailing
    steps), ReLU and dropout. The block input is added to the result (through
    a 1x1 convolution when the channel counts differ) and passed through a
    final ReLU.

    Parameters:
    - n_inputs (int): Input channels.
    - n_outputs (int): Output channels.
    - kernel_size (int): Convolution kernel size.
    - stride (int): Convolution stride.
    - dilation (int): Convolution dilation.
    - padding (int): Padding applied by each convolution and trimmed again by Chomp1d.
    - dropout (float): Dropout probability after each convolution.
    """

    def __init__(self, n_inputs, n_outputs, kernel_size, stride, dilation, padding, dropout=0.2):
        super(TemporalBlock, self).__init__()
        self.conv1 = weight_norm(nn.Conv1d(n_inputs, n_outputs, kernel_size,
                                           stride=stride, padding=padding, dilation=dilation))
        self.chomp1 = Chomp1d(padding)
        self.relu1 = nn.ReLU()
        self.dropout1 = nn.Dropout(dropout)

        self.conv2 = weight_norm(nn.Conv1d(n_outputs, n_outputs, kernel_size,
                                           stride=stride, padding=padding, dilation=dilation))
        self.chomp2 = Chomp1d(padding)
        self.relu2 = nn.ReLU()
        self.dropout2 = nn.Dropout(dropout)

        self.net = nn.Sequential(self.conv1, self.chomp1, self.relu1, self.dropout1,
                                 self.conv2, self.chomp2, self.relu2, self.dropout2)
        # 1x1 convolution so the residual matches the output channel count
        self.downsample = nn.Conv1d(n_inputs, n_outputs, 1) if n_inputs != n_outputs else None
        self.relu = nn.ReLU()

        self.init_weights()

    def init_weights(self):
        """Draw all convolution weights from N(0, 0.01)."""
        from torch.nn.utils import parametrize

        with torch.no_grad():
            for conv in (self.conv1, self.conv2):
                new_weight = torch.empty_like(conv.weight).normal_(0, 0.01)
                if parametrize.is_parametrized(conv, "weight"):
                    # With parametrized weight_norm, `conv.weight` is recomputed
                    # on every access, so an in-place edit of it is discarded.
                    # Assigning routes the value through the parametrization
                    # and updates the stored magnitude/direction tensors.
                    conv.weight = new_weight
                else:
                    conv.weight.copy_(new_weight)
            if self.downsample is not None:
                self.downsample.weight.normal_(0, 0.01)

    def forward(self, x):
        """Apply the two-convolution stack and add the residual connection."""
        out = self.net(x)
        res = x if self.downsample is None else self.downsample(x)
        return self.relu(out + res)

class TemporalConvNet(nn.Module):
    """Stack of TemporalBlocks whose dilation doubles at every level.

    Parameters:
    - num_inputs (int): Channels of the input sequence.
    - num_channels (list[int]): Output channels of each level, in order.
    - kernel_size (int): Kernel size shared by all levels.
    - dropout (float): Dropout probability shared by all levels.
    """

    def __init__(self, num_inputs, num_channels, kernel_size=2, dropout=0.2):
        super().__init__()
        blocks = []
        prev_channels = num_inputs
        for level, out_channels in enumerate(num_channels):
            dilation = 2 ** level
            blocks.append(
                TemporalBlock(
                    prev_channels,
                    out_channels,
                    kernel_size,
                    stride=1,
                    dilation=dilation,
                    # padding equal to the receptive-field growth of this level
                    padding=(kernel_size - 1) * dilation,
                    dropout=dropout,
                )
            )
            prev_channels = out_channels

        self.network = nn.Sequential(*blocks)

    def forward(self, x):
        """Run `x` through every level in order."""
        return self.network(x)

class Generator(nn.Module):
    """TCN generator mapping a noise sequence to a synthetic path.

    Parameters:
    - num_features (int): Features per time step (default 27).
    - seq_len (int): Time steps per path (default 5).

    Input and output tensors both have shape (batch, seq_len, num_features).
    """

    def __init__(self, num_features=27, seq_len=5):
        super(Generator, self).__init__()
        self.num_features = num_features
        self.seq_len = seq_len
        self.tcn = TemporalConvNet(num_inputs=num_features, num_channels=[50, 50, num_features],
                                   kernel_size=2, dropout=0.3)
        flat_size = num_features * seq_len
        self.linear = nn.Linear(flat_size, flat_size)

    def forward(self, x):
        """Generate paths from noise `x` of shape (batch, seq_len, num_features)."""
        x = x.transpose(1, 2)  # Convert to (batch, channels, length)
        x = self.tcn(x)
        x = x.transpose(1, 2)  # Convert back to (batch, length, channels)
        x = x.reshape(x.size(0), -1)  # Flatten each path for the linear layer
        x = self.linear(x)
        return x.reshape(x.size(0), self.seq_len, self.num_features)

class Discriminator(nn.Module):
    """1D-CNN critic producing one probability per input path.

    Expects input of shape (batch, length, 27) and returns (batch, 1) values
    squashed by a sigmoid.
    """

    def __init__(self):
        super().__init__()
        layers = [
            nn.Conv1d(27, 50, kernel_size=3, stride=1, padding=1),
            nn.LeakyReLU(0.2),
            nn.Conv1d(50, 50, kernel_size=3, stride=1, padding=1),
            nn.LeakyReLU(0.2),
            nn.Conv1d(50, 1, kernel_size=3, stride=1, padding=1),
            nn.AdaptiveAvgPool1d(1),  # average over the time axis
            nn.Flatten(),
            nn.Sigmoid(),
        ]
        self.cnn = nn.Sequential(*layers)

    def forward(self, x):
        """Score a batch of paths; channels are moved in front of the time axis first."""
        return self.cnn(x.transpose(1, 2))

# Select the compute device: the first CUDA GPU when available, otherwise the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

# Build both networks with their default constructors and move their parameters to `device`.
generator = Generator().to(device)
discriminator = Discriminator().to(device)
# Bare expression: as the last line of the cell it displays the generator's module structure.
generator

In [None]:
import torch

def generate_noise(batch_size, noise_dim, seed=None):
    """
    Draw standard-normal noise shaped like a batch of paths.

    Parameters:
    - batch_size (int): Number of noise sequences to draw.
    - noise_dim (int): Accepted for interface compatibility but not used; the
      noise always has shape (batch_size, 5, 27), which is what the generator
      consumes.
    - seed (int, optional): When given, the noise is drawn from a dedicated
      generator seeded with this value, so the same seed yields the same
      noise. When None, the global PyTorch RNG is used.

    Returns:
    - torch.Tensor: Noise of shape (batch_size, 5, 27) on the notebook's `device`.
    """
    rng = None
    if seed is not None:
        # A private generator keeps seeded draws from disturbing the global RNG state.
        rng = torch.Generator(device=device)
        rng.manual_seed(seed)
    return torch.randn(batch_size, 5, 27, device=device, generator=rng)


In [None]:
def physics_loss(fake_data):
    """
    Count physical-constraint violations in generated data.

    The batch is denormalized with the notebook-level `scaler` and `features`,
    then every row that breaks a constraint adds 1 to the result. A row can be
    counted once per constraint it violates.

    Parameters:
    - fake_data (torch.Tensor): Generated data with shape (batch_size, seq_len, num_features).

    Returns:
    - int: Total number of violations. This is a plain Python count computed
      on detached data, so it carries no gradient: adding it to a loss shifts
      the reported value but does not influence the generator update.
    """
    fake_data_numpy = fake_data.detach().cpu().numpy()

    # Columns after denormalization: 'point_type', 'Tool_x', 'Tool_y', 'Tool_z',
    # 'Speed', 'Acceleration', 'Movement Type', 'Time', 'Energy_Consumped',
    # followed by the restored angle columns (Tool_r*, Joint_*).
    df = denormalize_data(fake_data_numpy, scaler, features)

    def count_out_of_range(column, upper, rows):
        """Number of selected rows whose `column` lies outside [0, upper]."""
        outside = (df[column] < 0) | (df[column] > upper)
        return int((outside & rows).sum())

    # Movement type 1 -> MoveL, 2 -> MoveJ.
    # NOTE(review): the two masks overlap at exactly 1 and generated values are
    # continuous; confirm the intended split point (1.5 would separate them).
    is_move_l = df['Movement Type'] <= 1
    is_move_j = df['Movement Type'] >= 1

    # MoveL limits: speed 0-3 m/s (0-3000 mm/s), acceleration 0-150 m/s^2 (0-150000 mm/s^2).
    # The penalty counts rows OUTSIDE the limits; counting rows inside them
    # would reward violations.
    speed_loss_l = count_out_of_range('Speed', 3, is_move_l)
    acc_loss_l = count_out_of_range('Acceleration', 150, is_move_l)

    # MoveJ limits: speed 0-180 deg/s (3.14159 rad/s), acceleration 0-2292 deg/s^2 (40 rad/s^2)
    speed_loss_j = count_out_of_range('Speed', 3.14159, is_move_j)
    acc_loss_j = count_out_of_range('Acceleration', 40, is_move_j)

    # Joint angle limits: -363..363 degrees -> -6.33555..6.33555 rad.
    # NOTE(review): angles restored via arctan2 always lie in [-pi, pi], so this
    # term cannot fire for data that went through denormalize_data.
    joint_columns = ['Joint_base', 'Joint_Shoulder', 'Joint_Elbow', 'Joint_W1', 'Joint_W2', 'Joint_W3']
    joints = df[joint_columns]
    joints_loss = int(((joints < -6.33555) | (joints > 6.33555)).any(axis=1).sum())

    # Time and energy must not be negative
    time_energy_loss = int(((df['Time'] < 0) | (df['Energy_Consumped'] < 0)).sum())

    total_loss = speed_loss_l + acc_loss_l + speed_loss_j + acc_loss_j + joints_loss + time_energy_loss
    return total_loss


In [None]:
import torch
from torch.utils.data import TensorDataset, DataLoader
import torch.optim as optim
import numpy as np

# Assuming 'data' is your dataset available as a NumPy array
# data shape should be (num_samples, time_steps, features) = (any, 5, 27)
   
# Training configuration
num_epochs = 5000  # You can adjust the number of epochs
noise_dim = 1000  # forwarded to generate_noise

def train(param_grid):
    """
    Train the notebook-level `generator` and `discriminator` as a GAN.

    The models are the global instances and are updated in place, so a second
    call continues from the weights left by the first one.

    Parameters:
    - param_grid (float): Weight applied to `physics_loss` in the generator loss.

    Returns:
    - (generator, discriminator, float, list[dict]): The trained models, the
      mean of the per-epoch generator losses, and one record per epoch with
      the keys "epoch", "D Loss" and "G Loss" (values of the epoch's last batch).
    """
    # Convert data to PyTorch tensors
    tensor_data = torch.tensor(reshaped_data, dtype=torch.float32)
    data_loader = DataLoader(TensorDataset(tensor_data), batch_size=33, shuffle=True)

    # Loss function
    criterion = nn.BCELoss()

    # Optimizers
    optimizer_G = optim.Adam(generator.parameters(), lr=0.0002)
    optimizer_D = optim.Adam(discriminator.parameters(), lr=0.0002)

    phy_factor = param_grid
    losses = []

    for epoch in range(num_epochs):
        for real_data_tensor, in data_loader:
            real_data_tensor = real_data_tensor.to(device)

            # Train Discriminator
            # One fake sample per real sample in this batch, so both halves of
            # the discriminator loss are computed over equally sized batches.
            noise = generate_noise(len(real_data_tensor), noise_dim)
            fake_data = generator(noise)

            # Get discriminator results for real and fake data
            D_real = discriminator(real_data_tensor).view(-1)
            # detach(): the discriminator step must not backpropagate into the generator
            D_fake = discriminator(fake_data.detach()).view(-1)

            # Calculate loss on real and fake data
            loss_D_real = criterion(D_real, torch.ones_like(D_real))
            loss_D_fake = criterion(D_fake, torch.zeros_like(D_fake))
            loss_D = (loss_D_real + loss_D_fake) / 2

            # Update discriminator
            optimizer_D.zero_grad()
            loss_D.backward()
            optimizer_D.step()

            # Train Generator
            # Re-score the fakes with the freshly updated discriminator
            output = discriminator(fake_data).view(-1)

            # Adversarial loss plus the weighted physics penalty
            loss_G = criterion(output, torch.ones_like(output)) + phy_factor * physics_loss(fake_data)

            # Update generator
            optimizer_G.zero_grad()
            loss_G.backward()
            optimizer_G.step()

        # Record the losses of the epoch's last batch
        losses.append({"epoch": epoch, "D Loss": loss_D.item(), "G Loss": loss_G.item()})
        if (epoch+1) % 50 == 0:
            print(f"Epoch [{epoch+1}/{num_epochs}] Loss discriminator: {loss_D.item():.4f}, Loss generator: {loss_G.item():.4f}")

    g_losses = [entry['G Loss'] for entry in losses]
    return generator, discriminator, np.mean(g_losses), losses




In [None]:

# Grid search over the physics-loss weight; the run with the lowest mean
# generator loss is kept in `best_params`.
best_loss = float('inf')
best_params = {}
phy_factor_param = [0.2]  # candidates tried so far: [0.2, 0.5, 1, 2]
for value in phy_factor_param:
    print(value)
    generator, discriminator, gloss, losses = train(value)
    if not gloss < best_loss:
        continue
    best_loss = gloss
    best_params = dict(
        phy_factor_param=value,
        generator_model=generator,
        discriminator_model=discriminator,
        gloss=gloss,
        losses=losses,
    )

In [None]:
# Show the selected physics-loss weight and its mean generator loss. Only this
# summary is displayed: echoing the whole `best_params` dict would print both
# models and every per-epoch loss record.
best_params['phy_factor_param'], best_params['gloss']

In [None]:
# Pull the winning run out of the grid-search result and persist both models'
# weights (state dicts only).
generator, discriminator, losses = (
    best_params[key] for key in ('generator_model', 'discriminator_model', 'losses')
)
for model, path in ((generator, 'generatorTGAN.pth'), (discriminator, 'discriminatorTGAN.pth')):
    torch.save(model.state_dict(), path)

In [None]:
from torchviz import make_dot 

# Generate fake data from random noise
noise = generate_noise(len(real_data), noise_dim)
fake_data = generator(noise)

# Render the autograd graph of that forward pass. The existing output is
# reused instead of running the generator a second time.
dot = make_dot(fake_data, params=dict(generator.named_parameters()))
dot.render('generator_graph_TGAN', format='png')  # Saves the graph as a PNG file



In [None]:
import matplotlib.pyplot as plt

# `losses` is a list of dictionaries containing 'epoch', 'D Loss', and 'G Loss'
d_losses = [loss['D Loss'] for loss in losses]
g_losses = [loss['G Loss'] for loss in losses]
# Derive the x axis from the recorded losses rather than from `num_epochs`,
# so the plot stays valid if `num_epochs` is changed after training.
epochs_ls = list(range(1, len(losses) + 1))
print(np.mean(d_losses))
print(np.mean(g_losses))
# Plotting
plt.figure(figsize=(10, 5))
plt.plot(epochs_ls, d_losses, label='Discriminator Loss')
plt.plot(epochs_ls, g_losses, label='Generator Loss')
plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training Losses Over Epochs')
plt.legend()
plt.savefig('Training_Losses_Over_EpochsTCN.pdf')
plt.show()


In [None]:
def generate_samples(generator, sample_count, noise_dim,seed=None):
    """
    Draw `sample_count` paths from the generator, batch by batch.

    Batches have `len(reshaped_data)` samples each; a final smaller batch
    covers whatever is left over.

    Parameters:
    - generator (nn.Module): The generator model.
    - sample_count (int): Total number of samples to generate.
    - noise_dim (int): Forwarded to `generate_noise`.
    - seed (int, optional): Forwarded to `generate_noise` for the final
      partial batch only; full batches are drawn without a seed.

    Returns:
    - torch.Tensor: All generated samples on the CPU, concatenated along dim 0.
    """
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    batch_size = len(reshaped_data)
    full_batches, remainder = divmod(sample_count, batch_size)

    # (size, seed) for every batch to draw, in order
    plan = [(batch_size, None)] * full_batches
    if remainder > 0:
        plan.append((remainder, seed))

    all_samples = []
    for size, batch_seed in plan:
        noise = generate_noise(size, noise_dim, seed=batch_seed).to(device)
        with torch.no_grad():  # inference only, no gradients needed
            batch = generator(noise).cpu()  # collect on the CPU for concatenation
        all_samples.append(batch)

    return torch.cat(all_samples, dim=0)

In [None]:
# Generate samples
noise_dim = 1000  # forwarded to generate_noise
sample_count = 1000

# generate_samples already batches internally, so one call replaces
# `sample_count` single-sample calls. The result stays wrapped in a list
# because the next cell concatenates `generated_samples`.
generated_samples = [generate_samples(generator, sample_count, noise_dim)]


In [None]:
# Concatenate the generated samples
concatenated_tensor = torch.cat(generated_samples, dim=0)

# Convert to a NumPy array so it can be saved as a .npy file
concatenated_array = concatenated_tensor.cpu().numpy()
# Save to file
np.save("syntheticData_100Samplewgan.npy", concatenated_array)
concatenated_array.shape

In [None]:
import torch
import matplotlib.pyplot as plt
import seaborn as sns


# Scatter of per-path totals for the (still normalized) generated samples.
plt.figure(figsize=(10, 6))

# In the feature layout 'Time' is the second-to-last column and
# 'Energy_Consumped' the last one.
time_col, energy_col = concatenated_array[:, :, -2], concatenated_array[:, :, -1]

# Sum over the time steps of every path
time_sums = time_col.sum(axis=1)
energy_sums = energy_col.sum(axis=1)

plt.scatter(time_sums, energy_sums, alpha=0.6)
plt.title('Synthetic Paths - Distribution of Cycle Time and Energy')
plt.xlabel('Time (seconds)')
plt.ylabel('Energy')
plt.savefig('Synthetic Data - Distribution of Cycle Time and Energywgan.pdf')
plt.show()


In [None]:

# Denormalize the generated samples and export them
denormalize_generated_data = denormalize_data(concatenated_array, scaler, features)
denormalize_generated_data.to_csv("denormalized_sythData_phase1_waed_wgan.csv")

time_denorm = denormalize_generated_data['Time']
energy_denorm = denormalize_generated_data['Energy_Consumped']

# Rows come in groups of 5 (one group per path). Trim to a multiple of 5 so
# the reshape below cannot fail, and apply the SAME trim to both series.
n = 5
length = len(time_denorm)
trimmed_length = length - (length % n)

# Per-path totals: reshape to (paths, 5) and sum each row
time_denorm_sums = time_denorm.to_numpy()[:trimmed_length].reshape(-1, n).sum(axis=1)
energy_denorm_sums = energy_denorm.to_numpy()[:trimmed_length].reshape(-1, n).sum(axis=1)

# Plotting
plt.scatter(time_denorm_sums, energy_denorm_sums, color='blue')

# Set title and labels
plt.title('Synthetic Paths - Distribution of Cycle Time and Energy')
plt.xlabel('Time (seconds)')
plt.ylabel('Energy')

# Save and show the plot
plt.savefig('denormalize-Synthetic Datawgan.pdf')
plt.show()



In [None]:
# Write the denormalized synthetic data to CSV (default to_csv options, so the index is included).
denormalize_generated_data.to_csv("denormalize_generated_data_tgan_adam.csv")

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt 
sns.set_style('whitegrid')

# Every 5 consecutive rows of the real data form one path; this integer index
# assigns each row to its path.
path_index = np.arange(len(real_data)) // 5

# Per-path totals, taken by column position.
# NOTE(review): positions 17 and 16 are presumably 'Energy_Consumped' and
# 'Time' — confirm against the CSV column order.
energy_real = real_data.iloc[:, 17].groupby(path_index).sum()
time_real = real_data.iloc[:, 16].groupby(path_index).sum()

plt.scatter(time_denorm_sums, energy_denorm_sums, color='blue',label='Synthetic Psths')
plt.scatter(time_real, energy_real, color='green',label='Real Paths')

plt.title('Synthetic & Real Paths- Distribution of Cycle time and Energy')
plt.xlabel('Time (seconds)')
plt.ylabel('Energy')
plt.legend()
plt.savefig('Synthetic & Real Paths- Distribution of Cycle time and Energy tgan.pdf')

plt.show()

In [None]:
# Histogram (with KDE overlay) of the synthetic per-path cycle times.
col_16 = time_denorm_sums.flatten()
fig, ax = plt.subplots(figsize=(10, 6))
sns.histplot(col_16, kde=True, bins=10, label="Time", ax=ax)
ax.set_title('Synthetic Data - Distribution of Cycle time')
ax.set_xlabel('Time (seconds)')
ax.set_ylabel('Frequency')
ax.legend()
fig.savefig("denormalized_data- Sythetic Data - Distribution of Cycle time tgan.pdf")
plt.show()

In [None]:

import numpy as np
import matplotlib.pyplot as plt

# Objectives for the Pareto analysis: per-path totals of the denormalized synthetic time and energy.
# NOTE(review): no random numbers are drawn in this cell, so the seed call below looks vestigial — confirm before removing.
np.random.seed(0)  # For reproducibility
time = time_denorm_sums
energy = energy_denorm_sums

# Function to identify Pareto optimal points
def identify_pareto(scores):
    """
    Flag the Pareto-optimal rows of `scores` (lower is better in every column).

    A row is dominated when some other row is no worse in every column and
    strictly better in at least one. Identical rows do not dominate each other.

    Parameters:
    - scores (array-like): 2D data of shape (n_points, n_objectives).

    Returns:
    - numpy.ndarray: Boolean array of length n_points, True for non-dominated rows.
    """
    scores = np.asarray(scores)
    is_pareto = np.ones(scores.shape[0], dtype=bool)
    for i in range(scores.shape[0]):
        # Compare row i against all rows at once instead of a nested Python loop
        no_worse = np.all(scores <= scores[i], axis=1)
        strictly_better = np.any(scores < scores[i], axis=1)
        is_pareto[i] = not np.any(no_worse & strictly_better)
    return is_pareto

# Identify Pareto front
pareto_points = identify_pareto(np.c_[time, energy])
pareto_time = time[pareto_points]
pareto_energy = energy[pareto_points]

# Ensure there are Pareto points before proceeding
if pareto_time.size > 0 and pareto_energy.size > 0:
    # Select the best point based on the shortest Euclidean distance from the origin
    distances = np.sqrt(pareto_time**2 + pareto_energy**2)
    best_index = np.argmin(distances)
    best_time = pareto_time[best_index]
    best_energy = pareto_energy[best_index]

    # Plotting
    plt.figure(figsize=(10, 6))
    plt.scatter(time, energy, color='gray', label='Sythetic Paths')
    plt.scatter(time_real, energy_real, color='green', label='Real Paths')

    plt.scatter(pareto_time, pareto_energy, color='blue', label='Pareto Front')
    plt.scatter(best_time, best_energy, color='red', label='Best Point')
    plt.plot([0, best_time], [best_energy, best_energy], 'k--', lw=1)  # Horizontal line to Best Point
    plt.plot([best_time, best_time], [0, best_energy], 'k--', lw=1)  # Vertical line to Best Point
    plt.text(best_time, best_energy, ' Best Point', verticalalignment='bottom')
    plt.xlabel('Time')
    plt.ylabel('Energy')
    plt.title('Pareto Front with Best Selected Point')
    plt.legend()
    plt.grid(True)
    plt.savefig("pareto_front_bsp_tgan_2.pdf")
    #plt.show()

    # Printed inside this branch: best_time/best_energy only exist when a
    # Pareto point was found.
    print(best_time,best_energy)
else:
    print("No Pareto optimal points were identified.")


In [None]:
import math


# Improvement calculation
def improvement_ratio(real, generated):
    """
    Relative improvement of a generated (time, energy) point over a real one.

    Parameters:
    - real (sequence): Baseline values; index 0 is time, index 1 is energy.
    - generated (sequence): Candidate values in the same order.

    Returns:
    - tuple: (time_improvement, energy_improvement), each computed as
      (real - generated) / real, or 0 when the real value is 0.
    """
    def relative_gain(baseline, candidate):
        # A zero baseline has no meaningful ratio; report no improvement.
        if baseline == 0:
            return 0
        return (baseline - candidate) / baseline

    return relative_gain(real[0], generated[0]), relative_gain(real[1], generated[1])

# Baseline (time, energy) point the best synthetic path is compared against.
# NOTE(review): these values are hard-coded; presumably taken from the real data — confirm the source.
real_point = (3.18, 58.5091)
generated_point = (best_time, best_energy)


# Relative improvements; positive values mean the generated point is lower than the baseline.
time_improvement, energy_improvement = improvement_ratio(real_point, generated_point)



print(f"Time Improvement Ratio: {time_improvement:.2%}")
print(f"Energy Improvement Ratio: {energy_improvement:.2%}")



# Phase 2: take the engineer's preferred (most accurate) points, then fine-tune the generator on them

In [None]:
# Load the trained model weights. map_location remaps the stored tensors onto
# the current device, so a checkpoint written on a GPU machine also loads on a
# CPU-only one.
generator.load_state_dict(torch.load('generatorTGAN.pth', map_location=device))
discriminator.load_state_dict(torch.load('discriminatorTGAN.pth', map_location=device))

In [None]:

def collect_human_feedback(samples):
    """
    Score each sample with a weighted sum of its summed last two features.

    For every sample the second-to-last and last feature columns are summed
    over the sequence and combined with weights alpha and (1 - alpha), where
    alpha = 0.5. Lower ratings are better.

    Parameters:
    - samples (torch.Tensor): Batch of samples; each sample is indexed as [step, feature].

    Returns:
    - torch.Tensor: One float rating per sample, on the notebook's `device`.
      Ratings are built from Python floats, so they carry no gradient.
    """
    alpha = 0.5
    # Make sure the samples live on the compute device
    samples = samples.to(device)
    feedback = [
        float(sample[:, -2].sum() * alpha + sample[:, -1].sum() * (1 - alpha))
        for sample in samples
    ]
    return torch.tensor(feedback, dtype=torch.float).to(device)
#feedback -> i want to be small value , mse 

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim

# The generator doubles as the policy network; this binds a second name to the same module (no copy is made).
policy_network = generator

# Define the value network in PyTorch
class ValueNetwork(nn.Module):
    """Two-layer MLP that maps a flattened sample to a single scalar.

    Parameters:
    - input_features (int): Size of each sample once flattened.
    """

    def __init__(self, input_features):
        super().__init__()
        self.flatten = nn.Flatten()
        self.dense1 = nn.Linear(input_features, 128)
        self.relu = nn.ReLU()
        self.dense2 = nn.Linear(128, 1)

    def forward(self, x):
        """Return one value per sample, shape (batch, 1)."""
        hidden = self.relu(self.dense1(self.flatten(x)))
        return self.dense2(hidden)

# Flattened input size of one path: 5 time steps times the number of entries in `features`.
input_dim = 5 * len(features)  # Update with the actual size of 'features'
value_network = ValueNetwork(input_dim).to(device)

# Separate Adam optimizer for the value network only.
optimizer_value = optim.Adam(value_network.parameters(), lr=0.0001)


In [None]:
import torch
from torch.utils.data import TensorDataset, DataLoader
import torch.optim as optim
import numpy as np

# Assuming 'data' is your dataset available as a NumPy array
# data shape should be (num_samples, time_steps, features) = (any, 5, 27)

hfrl_paths = pd.read_csv("phase RLHF dataset/least_energy_time_paths.csv")

# Fix a typo in the 'point_type' column. The correction must be written to
# hfrl_paths itself: writing it into real_data would overwrite the phase-1
# data and leave 'approuch' here, which then maps to NaN below.
hfrl_paths['point_type'] = hfrl_paths['point_type'].replace('approuch', 'approach')

# Map 'point_type' and 'Movement Type' to numeric values
points = {'pick': 1, 'approach': 2, 'start_point': 3, 'box1': 4, 'way_point': 5}
movement_types = {'MoveL': 1, 'MoveJ': 2, 'Movel': 1, 'Movej': 2}
hfrl_paths['point_type'] = hfrl_paths['point_type'].map(points)
hfrl_paths['Movement Type'] = hfrl_paths['Movement Type'].map(movement_types)

# Drop the 'id' column
hfrl_paths.drop(['id'], axis=1, inplace=True)


hfrl_paths_normalized, scaler_ph2 = normalizaData(hfrl_paths)

# Keep only the model features (angles as sin/cos pairs), as an independent copy
hfrl_paths_normalized = hfrl_paths_normalized[features].copy()
num_paths = len(hfrl_paths) // 5
batch_size = num_paths #all data

# Every 5 consecutive rows form one path. Rows that do not complete a path are
# left out so the reshape cannot fail on a partial group.
hfrl_paths_normalized_3d = (
    hfrl_paths_normalized.iloc[:num_paths * 5]
    .to_numpy()
    .reshape(num_paths, 5, features_num)
)



# Convert data to PyTorch tensors
tensor_data = torch.tensor(hfrl_paths_normalized_3d, dtype=torch.float32)
dataset = TensorDataset(tensor_data)
data_loader = DataLoader(dataset, batch_size=33, shuffle=True)

# Binary cross-entropy for the adversarial losses (the discriminator ends in a sigmoid).
criterion = nn.BCELoss()

# Fresh Adam optimizers for the phase-2 fine-tuning of both networks.
optimizer_G = optim.Adam(generator.parameters(), lr=0.0002)
optimizer_D = optim.Adam(discriminator.parameters(), lr=0.0002)

# Training loop settings
num_epochs = 5000  # You can adjust the number of epochs
# Passed to generate_noise, which does not use it: the noise shape is fixed there.
noise_dim = 100
# One record per epoch is appended to this list during training.
losses_ph = [] 

# Loop-invariant pieces, created once instead of on every batch
mse_loss = nn.MSELoss()
phy_factor = best_params['phy_factor_param']
val_factor = 1

for epoch in range(num_epochs):
    for real_data_tensor, in data_loader:
        real_data_tensor = real_data_tensor.to(device)

        # Train Discriminator
        # One fake sample per real sample in the batch
        noise = generate_noise(len(real_data_tensor), noise_dim)
        fake_data = generator(noise)

        # Get discriminator results for real and fake data
        D_real = discriminator(real_data_tensor).view(-1)
        D_fake = discriminator(fake_data.detach()).view(-1)

        # Calculate loss on real and fake data
        loss_D_real = criterion(D_real, torch.ones_like(D_real))
        loss_D_fake = criterion(D_fake, torch.zeros_like(D_fake))
        loss_D = (loss_D_real + loss_D_fake) / 2

        # Update discriminator
        optimizer_D.zero_grad()
        loss_D.backward()
        optimizer_D.step()

        # Update Value Network: regress the feedback score from the fake samples.
        # The samples are detached, so this step trains only the value network
        # and does not need to keep the generator's graph alive.
        feedback = collect_human_feedback(fake_data)
        optimizer_value.zero_grad()
        predicted_feedback = value_network(fake_data.detach())
        feedback = feedback.view(predicted_feedback.shape)
        value_loss = mse_loss(predicted_feedback, feedback)
        value_loss.backward()
        optimizer_value.step()

        # Train Generator
        # Get discriminator results for fake data
        output = discriminator(fake_data).view(-1)

        # Generator loss.
        # NOTE(review): value_loss is detached and physics_loss returns a plain
        # count, so only the adversarial term produces generator gradients; the
        # other two terms only shift the reported loss value.
        loss_G = criterion(output, torch.ones_like(output)) - val_factor * value_loss.detach()  + phy_factor * physics_loss(fake_data)

        # Update generator
        optimizer_G.zero_grad()
        loss_G.backward()
        optimizer_G.step()

    # Record the losses of the epoch's last batch
    losses_ph.append({"epoch": epoch, "D Loss": loss_D.item(), "G Loss": loss_G.item() , "Value Loss": value_loss.item()})
    if (epoch+1) % 10 == 0:
        print(f"Epoch [{epoch+1}/{num_epochs}] Loss discriminator: {loss_D.item():.4f}, Loss generator: {loss_G.item():.4f}")



In [None]:
# Extracting the data from the per-epoch records in `losses_ph`

#Epoch: 41, Batch: 34, D Loss: 0.7058187127113342, G Loss: 0.6761125326156616, Value Loss: 9.605087280273438

epochs_ls = [loss['epoch'] for loss in losses_ph]
d_losses = [loss['D Loss'] for loss in losses_ph]
# Means are taken over the number of recorded epochs rather than a hard-coded
# 5000, so they stay correct when num_epochs changes.
print(sum(d_losses)/len(d_losses),"d_losses")

g_losses = [loss['G Loss'] for loss in losses_ph]
print(sum(g_losses)/len(g_losses),"g_losses")
value_losses =  [loss['Value Loss'] for loss in losses_ph]
print(np.mean(value_losses), "value_losses")
d_losses_real = []
d_losses_fake = []



# Plotting
plt.figure(figsize=(10, 5))
plt.plot(epochs_ls, d_losses, label='Discriminator Loss')

plt.plot(epochs_ls, g_losses, label='Generator Loss')
plt.plot(epochs_ls, value_losses, label='Reward Loss')

plt.xlabel('Epoch')
plt.ylabel('Loss')
plt.title('Training Losses Over Epochs')
plt.legend()
plt.savefig('Training Losses Over Epochs tcn.pdf')
plt.show()

In [None]:
# Generate samples
noise_dim = 1000  # This should match the dimension used during training
sample_count = 50000

# Draw the samples one call at a time (batch size 1 per call), stack them
# along the first axis, then move the result to host memory as a NumPy array
# (e.g. for saving as a .npy file).
samples_syth = torch.cat(
    [generate_samples(generator, 1, noise_dim) for _ in range(sample_count)],
    dim=0,
).cpu().numpy()

In [None]:
# Denormalize the generated samples with `scaler_ph2`
# (presumably the scaler fitted for phase 2 — confirm against the cell that creates it).
samples_syth_ph2 = denormalize_data(samples_syth, scaler_ph2,features)
# Preview the first two rows of the result
samples_syth_ph2.head(2)

In [None]:

# Denormalize the concatenated generated data and persist it as CSV
denormalize_generated_data = denormalize_data(concatenated_array, scaler, features)
denormalize_generated_data.to_csv("denormalize_generated_data_ph2.csv")

# Per-sample time and energy, selected by column name from the denormalized samples
time_denorm = samples_syth_ph2['Time']
energy_denorm = samples_syth_ph2['Energy_Consumped']

# Aggregate consecutive samples in groups of n
# (presumably n samples make up one path — confirm against the data layout).
n = 5
length = len(time_denorm)
# Drop the trailing remainder so the length is a multiple of n
trimmed_length = length - (length % n)

# Trim BOTH series to the same length before reshaping. The energy column was
# previously reshaped untrimmed, which raises whenever the length is not a
# multiple of n and would leave the two sums arrays with mismatched pairing.
time_denorm_sums = time_denorm.iloc[:trimmed_length].to_numpy().reshape(-1, n).sum(axis=1)
energy_denorm_sums = energy_denorm.iloc[:trimmed_length].to_numpy().reshape(-1, n).sum(axis=1)

# Plotting
plt.scatter(time_denorm_sums, energy_denorm_sums, color='blue')

# Set title and labels
plt.title('Synthetic Paths - Distribution of Cycle Time and Energy')
plt.xlabel('Time (seconds)')
plt.ylabel('Energy')

# Save and show the plot
plt.savefig('denormalize-Synthetic Data.pdf')
plt.show()

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Input data: per-group summed time and energy of the synthetic samples
time, energy = time_denorm_sums, energy_denorm_sums

# Function to determine the Pareto front for minimizing both dimensions
def pareto_frontier_minimize_both(Xs, Ys):
    """Return the Pareto front of the points (Xs[i], Ys[i]) when minimizing both axes.

    Points are sorted by X (ties broken by Y), then scanned once; a point joins
    the front only if its Y is strictly lower than the Y of the last front point.

    Args:
        Xs: Sequence of first-objective values.
        Ys: Sequence of second-objective values, paired element-wise with Xs.

    Returns:
        A tuple (front_xs, front_ys) of two lists, ordered by increasing X.
        Both lists are empty when no points are given.
    """
    sorted_points = sorted(zip(Xs, Ys), key=lambda point: (point[0], point[1]))

    front = []
    for x, y in sorted_points:
        # The first point (smallest X, then smallest Y) is always on the front;
        # afterwards only a strict improvement in Y extends it.
        if not front or y < front[-1][1]:
            front.append((x, y))

    return [x for x, _ in front], [y for _, y in front]

# Pareto front of the synthetic (time, energy) points
pareto_X, pareto_Y = pareto_frontier_minimize_both(time, energy)

# Overlay synthetic points, real points and the Pareto front in one figure
plt.scatter(time, energy, color='blue', label='Sythetic Paths')
plt.scatter(time_real, energy_real, color='green', label='Real Paths')
plt.plot(pareto_X, pareto_Y, color='red', linewidth=2.0, label='Pareto Front')

plt.title('Synthetic & Real Paths - Distribution of Cycle Time and Energy')
plt.xlabel('Time (seconds)')
plt.ylabel('Energy')
plt.legend()
plt.savefig('Synthetic & Real Paths- Pareto Visulization tgan.pdf')
plt.show()

# Report the front point(s) with the lowest energy, then the whole front
min_energy = min(pareto_Y)
optimal_points = [point for point in zip(pareto_X, pareto_Y) if point[1] == min_energy]
print("Optimal Points (Min Energy and then Min  Time):", optimal_points)
print(pareto_X, pareto_Y)

In [None]:
# Work on the per-group summed time and energy of the synthetic samples
time, energy = time_denorm_sums, energy_denorm_sums



# Function to identify Pareto optimal points
def identify_pareto(scores):
    """Return a boolean mask of the Pareto-optimal rows of `scores` (all objectives minimized).

    Row i is dominated when some row j is <= row i in every column and < row i
    in at least one column. Identical rows do not dominate each other.

    Args:
        scores: 2-D array-like with one row per point and one column per objective.

    Returns:
        np.ndarray of dtype bool with one entry per row; True marks a
        non-dominated row. Empty when `scores` has no rows.
    """
    scores = np.asarray(scores)
    is_pareto = np.ones(scores.shape[0], dtype=bool)
    for i in range(scores.shape[0]):
        # Compare row i against every row at once instead of an inner Python loop
        no_worse = np.all(scores <= scores[i], axis=1)
        strictly_better = np.any(scores < scores[i], axis=1)
        if np.any(no_worse & strictly_better):
            is_pareto[i] = False
    return is_pareto

# Identify Pareto front
pareto_points = identify_pareto(np.c_[time, energy])
pareto_time = time[pareto_points]
pareto_energy = energy[pareto_points]
print(pareto_time, pareto_energy)

# Ensure there are Pareto points before proceeding
if pareto_time.size > 0 and pareto_energy.size > 0:
    # Select the best point based on the shortest Euclidean distance from the origin.
    # NOTE(review): the distance uses the raw time and energy values, so the axis
    # with the larger magnitude dominates the choice — confirm this is intended.
    distances = np.sqrt(pareto_time**2 + pareto_energy**2)
    best_index = np.argmin(distances)
    best_time = pareto_time[best_index]
    best_energy = pareto_energy[best_index]

    # Plotting
    plt.figure(figsize=(10, 6))
    plt.scatter(time, energy, color='gray', label='Sythetic Paths')
    plt.scatter(time_real, energy_real, color='green', label='Real Paths')

    plt.scatter(pareto_time, pareto_energy, color='blue', label='Pareto Front')
    plt.scatter(best_time, best_energy, color='red', label='Best Point')
    plt.plot([0, best_time], [best_energy, best_energy], 'k--', lw=1)  # Horizontal line to Best Point
    plt.plot([best_time, best_time], [0, best_energy], 'k--', lw=1)  # Vertical line to Best Point
    plt.text(best_time, best_energy, ' Best Point', verticalalignment='bottom')
    plt.xlabel('Time')
    plt.ylabel('Energy')
    plt.title('Pareto Front with Best Selected Point')
    plt.legend()
    plt.grid(True)
    plt.savefig('Synthetic & Real Paths- Pareto Visulization tgan best point.pdf')

    # Report the selection only when one exists; outside this branch the names
    # would be undefined (or stale from an earlier kernel run).
    print(best_time,best_energy)

else:
    print("No Pareto optimal points were identified.")
    # Make the absence explicit so later cells cannot silently reuse stale values
    best_time = best_energy = None

In [None]:
import torch
import torch.nn as nn

class SimpleCNN(nn.Module):
    """Small convolutional network: two conv + ReLU + max-pool stages, then two linear layers.

    `fc1` expects 320 input features. For a 1x28x28 input the two 5x5
    convolutions and 2x2 poolings leave 20 channels of 4x4, i.e. 320 values.
    The final layer produces 10 outputs per sample.
    """

    def __init__(self):
        super().__init__()
        # Layers are created in the same order as before so that parameter
        # initialisation consumes the random stream identically.
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.pool = nn.MaxPool2d(2, 2)
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        """Run the network on `x` and return the raw outputs of `fc2`."""
        # Both convolutional stages share the same ReLU + pooling treatment
        for conv in (self.conv1, self.conv2):
            x = self.pool(torch.relu(conv(x)))
        # Flatten everything except the batch dimension
        features = x.flatten(start_dim=1)
        hidden = torch.relu(self.fc1(features))
        return self.fc2(hidden)
# torchinfo is a third-party package; the notebook's `pip install torchinfo`
# cell appears further down and must have been run before this import works.
from torchinfo import summary

# Instantiate the network and display its summary for a single 1x28x28 input
model = SimpleCNN()
summary(model, input_size=(1, 1, 28, 28))


In [None]:
!pip install torchinfo
!pip install torchsummary



In [None]:
import math


# Improvement calculation
def improvement_ratio(real, generated):
    """Return the relative improvement of `generated` over `real` for time and energy.

    Args:
        real: Indexable whose items 0 and 1 are the baseline time and energy.
        generated: Indexable whose items 0 and 1 are the candidate time and energy.

    Returns:
        Tuple (time_improvement, energy_improvement), each computed as
        (baseline - candidate) / baseline, or 0 when the baseline is zero.
    """
    def _relative_gain(baseline, candidate):
        # A zero baseline would divide by zero; report no improvement instead
        if baseline == 0:
            return 0
        return (baseline - candidate) / baseline

    return _relative_gain(real[0], generated[0]), _relative_gain(real[1], generated[1])

# Baseline (time, energy) pair to compare against
# (presumably taken from the real paths — confirm where these values come from).
real_point = (3.18, 58.5091)
# Candidate: the best point selected from the synthetic Pareto front
generated_point = (best_time, best_energy)

# Relative improvement of the candidate over the baseline, per objective
time_improvement, energy_improvement = improvement_ratio(
    real=real_point,
    generated=generated_point,
)

print(f"Time Improvement Ratio: {time_improvement:.2%}")
print(f"Energy Improvement Ratio: {energy_improvement:.2%}")

In [None]:
# Display the selected (best_time, best_energy) pair as the cell output
generated_point