In [None]:
import matplotlib.pyplot as plt
import librosa
import pickle
import numpy as np
import pickle
import os

import tensorflow as tf
from keras.utils.vis_utils import plot_model
from keras import backend as K
import cv2


In [None]:
def trial_file_processor(file_addr, trial):
    """Turn one trial of a pickled recording into stacked STFT "images".

    The file at ``file_addr`` is unpickled and must provide ``"data"``
    (indexed as ``[trial, channel, sample]``) and ``"labels"`` (indexed by
    trial).  Six window start points (3, 8, ..., 28) are used; for each one,
    three windows beginning at ``start``, ``start + 10`` and ``start + 20``
    become the three colour planes of one output image.  Offsets are
    multiplied by 128 to obtain sample indices (presumably a 128 Hz sampling
    rate -- confirm against the dataset description).

    Inside a colour plane, channels 0..39 are each transformed with an STFT
    (``n_fft=256``, ``hop_length=125``), converted to dB, min-max scaled to
    ``[-1, 1]`` and laid side by side along the time axis.

    Parameters
    ----------
    file_addr : str
        Path of the pickled subject file.
    trial : int
        Index of the trial to process.

    Returns
    -------
    tuple
        ``(images, labels)`` where ``images`` is a float array of shape
        ``(6, 129, 440, 3)`` and ``labels`` is ``np.array`` of the label
        entry stored for ``trial``.
    """
    start_offset = 3
    end_of_record_starts = 33
    window_length = 10
    n_color_channels = 3
    n_channels = 40

    # Each entry holds the start offsets of the three windows of one image.
    start_points_list = [
        [start_point + plane * window_length for plane in range(n_color_channels)]
        for start_point in range(start_offset, end_of_record_starts, 5)
    ]

    # NOTE(review): pickle.load can execute arbitrary code; only open trusted files.
    with open(file_addr, 'rb') as file:
        subject_file = pickle.load(file, encoding='latin1')

    max_after_normalize = 1
    min_after_normalize = -1

    # 129 = n_fft // 2 + 1 frequency bins.  440 assumes every channel yields
    # 11 STFT frames (1280 samples, hop 125, librosa's default centred
    # framing) -- TODO confirm if the STFT parameters are ever changed.
    full_norm_stft = np.zeros(shape=(len(start_points_list), 129, 440, 3))

    for start_point_number, start_points in enumerate(start_points_list):
        for color_channel in range(n_color_channels):
            first_sample = 128 * start_points[color_channel]
            last_sample = 128 * (start_points[color_channel] + window_length)

            channel_stfts = []
            for channel in range(n_channels):
                stft = librosa.stft(subject_file["data"][trial, channel, first_sample:last_sample],
                                    n_fft=256, hop_length=125)
                stft_db = librosa.amplitude_to_db(abs(stft))

                db_min = stft_db.min()
                db_range = stft_db.max() - db_min
                if db_range == 0:
                    # A flat spectrogram (e.g. an all-zero channel) would give
                    # 0/0 = NaN; map it to the bottom of the output range.
                    norm_stft = np.zeros_like(stft_db)
                else:
                    norm_stft = (stft_db - db_min) / db_range
                norm_stft = norm_stft*(max_after_normalize - min_after_normalize) + min_after_normalize

                channel_stfts.append(norm_stft)

            # Join all channels along the time axis in a single copy.
            full_norm_stft[start_point_number, :, :, color_channel] = np.concatenate(channel_stfts, axis=1)

    return full_norm_stft, np.array(subject_file["labels"][trial])


# Convert every raw subject file into one pickle per (trial, sample) image.
raw_files_addr = "E:/Hamavar/DEAP_Dataset/data_preprocessed_python/"
transformed_files_addr = "A:/Hamavar/DEAP_Dataset/augmented_transformed_data_resized/"
file_names = os.listdir(raw_files_addr)

# cv2.resize takes (width, height), so the stored arrays are 128 rows x 512 columns.
dsize = (512, 128)

for file_name in file_names:
    # Text before the first "." is used as the subject identifier.
    subject_name = file_name.split(".")[0]
    for trial in range(40):
        data, labels = trial_file_processor(raw_files_addr + file_name, trial)
        for sample, sample_image in enumerate(data):
            data_label_dict = {
                "data": cv2.resize(sample_image, dsize),
                "labels": labels,
            }

            # Output name pattern: <subject>t<trial>s<sample>, no extension.
            output_path = f"{transformed_files_addr}{subject_name}t{trial}s{sample}"
            with open(output_path, 'wb') as handle:
                pickle.dump(data_label_dict, handle, protocol=pickle.HIGHEST_PROTOCOL)



In [None]:
# Spot-check one transformed sample (subject s01, trial 1, sample 4).
transformed_files_addr = "A:/Hamavar/DEAP_Dataset/augmented_transformed_data_resized/"
with open(transformed_files_addr+"s01t1s4", 'rb') as handle:
    test = pickle.load(handle)

# The stored spectrograms were scaled to [-1, 1], but imshow expects RGB
# floats in [0, 1] and would clip everything below zero; rescale for display
# only (the same mapping generate_images applies to generator output).
plt.imshow((test["data"]+1)/2)
print(test["data"].shape)

In [None]:
def Disc_Model(input_shape=(128, 512, 3), n_classes=10):
    """Build and compile the conditional discriminator.

    The integer class label is embedded (50 values), projected by a Dense
    layer to ``input_shape[0] * input_shape[1]`` values and reshaped into one
    extra image plane, which is concatenated to the input image along the
    last axis.  Two stride-2 convolutions with LeakyReLU follow, then
    Flatten, Dropout(0.4) and a single sigmoid unit.

    Parameters
    ----------
    input_shape : tuple
        ``(height, width, channels)`` of the images to classify.
    n_classes : int
        Number of distinct label values accepted by the embedding.

    Returns
    -------
    tf.keras.Model
        Model taking ``[image, label]`` and returning one sigmoid output per
        sample, compiled with binary cross-entropy, Adam(0.0002, beta_1=0.5)
        and an accuracy metric.
    """
    input_label = tf.keras.layers.Input(shape=(1, ))
    embedded_label = tf.keras.layers.Embedding(n_classes, 50)(input_label)

    # One value per pixel so the label can be stacked as an extra plane.
    n_nodes = input_shape[0]*input_shape[1]

    embedded_label = tf.keras.layers.Dense(n_nodes)(embedded_label)
    embedded_label = tf.keras.layers.Reshape((input_shape[0], input_shape[1], 1))(embedded_label)

    input_img = tf.keras.layers.Input(shape=input_shape)

    merge = tf.keras.layers.Concatenate()([input_img, embedded_label])

    disc = tf.keras.layers.Conv2D(128, kernel_size=(3, 3), strides=(2, 2), padding="same")(merge)
    disc = tf.keras.layers.LeakyReLU(alpha=0.2)(disc)

    disc = tf.keras.layers.Conv2D(128, kernel_size=(3, 3), strides=(2, 2), padding="same")(disc)
    disc = tf.keras.layers.LeakyReLU(alpha=0.2)(disc)

    disc = tf.keras.layers.Flatten()(disc)
    disc = tf.keras.layers.Dropout(0.4)(disc)
    disc = tf.keras.layers.Dense(1, activation="sigmoid")(disc)

    model = tf.keras.models.Model([input_img, input_label], disc)

    # `learning_rate` is the supported keyword; the old `lr` alias is
    # deprecated and rejected by newer Keras releases.
    opt = tf.keras.optimizers.Adam(learning_rate=0.0002, beta_1=0.5)
    model.compile(loss='binary_crossentropy', optimizer=opt, metrics=['accuracy'])

    return model


def Generator_Model(latent_dim, n_classes=10):
    """Build the conditional generator (returned uncompiled).

    The label is embedded and projected to an 8x32x1 map; the latent vector
    is projected to an 8x32x128 map.  Both are concatenated and passed
    through four stride-2 transposed convolutions (8x32 -> 128x512), then a
    7x7 convolution with tanh produces a 3-channel image.

    Parameters
    ----------
    latent_dim : int
        Length of the latent noise vector.
    n_classes : int
        Number of distinct label values accepted by the embedding.

    Returns
    -------
    tf.keras.Model
        Model taking ``[latent, label]`` and returning the generated image.
    """
    layers = tf.keras.layers
    base_rows, base_cols = 8, 32

    # Label branch: embedding -> dense -> single low-resolution plane.
    label_in = layers.Input(shape=(1, ))
    label_plane = layers.Embedding(n_classes, 50)(label_in)
    label_plane = layers.Dense(base_rows * base_cols)(label_plane)
    label_plane = layers.Reshape((base_rows, base_cols, 1))(label_plane)

    # Latent branch: dense -> 128 low-resolution feature maps.
    latent_in = layers.Input(shape=(latent_dim,))
    features = layers.Dense(128 * base_rows * base_cols)(latent_in)
    features = layers.LeakyReLU(alpha=0.2)(features)
    features = layers.Reshape((base_rows, base_cols, 128))(features)

    features = layers.Concatenate()([features, label_plane])

    # Four identical upsampling stages, each doubling height and width.
    for _ in range(4):
        features = layers.Conv2DTranspose(128, kernel_size=(4, 4), strides=(2, 2), padding="same")(features)
        features = layers.LeakyReLU(alpha=0.2)(features)

    image_out = layers.Conv2D(3, kernel_size=(7, 7), activation='tanh', padding="same")(features)

    return tf.keras.models.Model([latent_in, label_in], image_out)
  
    
def GAN_Model(gen_model, disc_model):
    """Chain generator and discriminator into the model used to train the generator.

    ``disc_model.trainable`` is set to ``False`` on the passed-in object
    before the combined model is compiled, so the caller's discriminator
    instance is modified as a side effect.

    Parameters
    ----------
    gen_model : tf.keras.Model
        Generator with inputs ``[noise, label]``.
    disc_model : tf.keras.Model
        Discriminator taking ``[image, label]``.

    Returns
    -------
    tf.keras.Model
        Model mapping ``[noise, label]`` to the discriminator output,
        compiled with binary cross-entropy and Adam(0.0002, beta_1=0.5).
    """
    disc_model.trainable = False

    gen_noise, gen_label = gen_model.input
    gen_output = gen_model.output

    # The same label tensor conditions both the generator and the discriminator.
    gan_output = disc_model([gen_output, gen_label])

    GAN = tf.keras.models.Model([gen_noise, gen_label], gan_output)

    # `learning_rate` is the supported keyword; the old `lr` alias is
    # deprecated and rejected by newer Keras releases.
    opt = tf.keras.optimizers.Adam(learning_rate=0.0002, beta_1=0.5)
    GAN.compile(loss='binary_crossentropy', optimizer=opt)

    return GAN
    


In [None]:
# Directory holding the transformed sample pickles; generate_real_samples
# prepends this string to each file name, so the trailing "/" matters.
transformed_files_addr = "A:/Hamavar/DEAP_Dataset/augmented_transformed_data_resized/"
# Bare file names (not full paths) of every entry in that directory.
dataset_addr = os.listdir(transformed_files_addr)


def generate_real_samples(dataset_addr, n_samples, data_dir=None):
    """Load a random batch of stored samples and mark them as real.

    Files are drawn uniformly at random with replacement, so the same file
    may appear more than once in a batch.

    Parameters
    ----------
    dataset_addr : sequence of str
        File names (relative to ``data_dir``) of the pickled samples.  Each
        pickle must be a dict with ``"data"`` and ``"labels"`` entries.
    n_samples : int
        Number of samples to draw.
    data_dir : str, optional
        Directory containing the files.  When omitted, the notebook-level
        ``transformed_files_addr`` is used, which is what this function
        always did before the parameter existed.

    Returns
    -------
    tuple
        ``([X, labels], y)`` where ``X`` stacks the ``"data"`` arrays,
        ``labels`` holds ``int(labels[0])`` of each file and ``y`` is an
        ``(n_samples, 1)`` array of ones.

    Raises
    ------
    ValueError
        If ``dataset_addr`` is empty.
    """
    if data_dir is None:
        data_dir = transformed_files_addr
    if len(dataset_addr) == 0:
        raise ValueError("dataset_addr is empty; there are no real samples to draw from")

    X = []
    labels = []

    rand_idx = np.random.randint(0, len(dataset_addr), n_samples)
    for idx in rand_idx:
        # NOTE(review): pickle.load can execute arbitrary code; only read trusted files.
        with open(os.path.join(data_dir, dataset_addr[idx]), 'rb') as handle:
            file_info = pickle.load(handle)
        X.append(file_info["data"])
        # Only the first label entry is used, truncated to an integer class id.
        labels.append(int(file_info["labels"][0]))

    y = np.ones((n_samples, 1))

    return [np.array(X), np.array(labels)], y


def generate_latent_points(latent_dim, n_samples, n_classes=10):
    """Draw generator inputs: Gaussian noise plus random class ids.

    Parameters
    ----------
    latent_dim : int
        Length of each noise vector.
    n_samples : int
        Number of noise vectors / labels to draw.
    n_classes : int
        Labels are drawn uniformly from ``0 .. n_classes - 1``.

    Returns
    -------
    list
        ``[noise, class_ids]`` with shapes ``(n_samples, latent_dim)`` and
        ``(n_samples,)``.
    """
    # Noise is drawn first, then labels, both from NumPy's global RNG.
    noise = np.random.standard_normal(size=(n_samples, latent_dim))
    class_ids = np.random.randint(0, n_classes, n_samples)
    return [noise, class_ids]


def generate_fake_samples(generator, latent_dim, n_samples):
    """Run the generator on random inputs and mark the results as fake.

    Parameters
    ----------
    generator : object
        Anything with a ``predict([noise, labels])`` method.
    latent_dim : int
        Length of each noise vector.
    n_samples : int
        Number of images to generate.

    Returns
    -------
    tuple
        ``([images, labels], y)`` where ``y`` is an ``(n_samples, 1)`` array
        of zeros.
    """
    noise, class_ids = generate_latent_points(latent_dim, n_samples)
    fake_images = generator.predict([noise, class_ids])
    fake_targets = np.zeros((n_samples, 1))

    return [fake_images, class_ids], fake_targets


def generate_images(generator, latent_dim, n_samples, epoch):
    """Generate ``n_samples`` images and save them as one grid figure.

    The figure is written to
    ``A:/Hamavar/DEAP_Dataset/cGAN/generated_img/plot_<epoch>.png``; the
    directory is created if it does not exist yet.

    Parameters
    ----------
    generator : object
        Anything with a ``predict([noise, labels])`` method.
    latent_dim : int
        Length of each noise vector.
    n_samples : int
        Number of images to generate and plot.
    epoch : int
        Value inserted into the output file name.
    """
    z_input, labels_input = generate_latent_points(latent_dim, n_samples)
    images = generator.predict([z_input, labels_input])

    # Square grid large enough for every image.  It never shrinks below the
    # original fixed 2x2 layout, so calls with up to four images look the same.
    grid_size = max(2, int(np.ceil(np.sqrt(n_samples))))

    for img_idx in range(n_samples):
        plt.subplot(grid_size, grid_size, img_idx + 1)
        plt.axis('off')
        # Shift the images from [-1, 1] to the [0, 1] range imshow expects.
        plt.imshow((images[img_idx]+1)/2)

    filename1 = f"A:/Hamavar/DEAP_Dataset/cGAN/generated_img/plot_{epoch}.png"
    os.makedirs(os.path.dirname(filename1), exist_ok=True)
    plt.savefig(filename1)
    plt.close()
    


In [None]:
def train(g_model, d_model, gan_model, dataset_addr, latent_dim, n_epochs=100, n_batch=32):
    """Alternate discriminator and generator updates, then save the generator.

    Every step trains the discriminator on half a batch of real samples and
    half a batch of generated ones, then trains the generator through
    ``gan_model`` on a full batch labelled as real.  After each epoch a grid
    of generated images is saved and the most recent losses are printed.

    Parameters
    ----------
    g_model, d_model, gan_model
        Generator, compiled discriminator and compiled combined model.
    dataset_addr : sequence of str
        File names of the stored real samples.
    latent_dim : int
        Length of the generator's noise vector.
    n_epochs : int
        Number of epochs.
    n_batch : int
        Full batch size; the discriminator sees ``n_batch // 2`` samples of
        each kind per step.

    Raises
    ------
    ValueError
        If the dataset holds fewer than ``n_batch`` files or ``n_batch`` is
        smaller than 2.  Previously this ran zero steps and then failed on
        undefined loss variables.
    """
    bat_per_epo = int(len(dataset_addr) / n_batch)
    half_batch = int(n_batch / 2)

    if bat_per_epo < 1 or half_batch < 1:
        raise ValueError(
            f"cannot train: {len(dataset_addr)} sample file(s) with n_batch={n_batch} "
            "gives no usable batches"
        )

    # Make sure the save location exists now rather than discovering it is
    # missing only after the final epoch.
    model_path = 'A:/Hamavar/DEAP_Dataset/cGAN/Model/cgan_generator.h5'
    os.makedirs(os.path.dirname(model_path), exist_ok=True)

    for i in range(n_epochs):
        # Two half-batches of real data per step, hence twice the step count.
        for j in range(2*bat_per_epo):

            [X_real, labels_real], y_real = generate_real_samples(dataset_addr, half_batch)
            d_loss1, _ = d_model.train_on_batch([X_real, labels_real], y_real)

            [X_fake, labels], y_fake = generate_fake_samples(g_model, latent_dim, half_batch)
            d_loss2, _ = d_model.train_on_batch([X_fake, labels], y_fake)

            # Generator step: fakes are labelled 1 so the generator is
            # rewarded when the discriminator accepts them.
            [z_input, labels_input] = generate_latent_points(latent_dim, n_batch)
            y_gan = np.ones((n_batch, 1))
            g_loss = gan_model.train_on_batch([z_input, labels_input], y_gan)

        generate_images(g_model, latent_dim, 4, i+1)
        # Reported losses are those of the last step of the epoch.
        print(f"Epoch : {i+1}, D_Loss : {(d_loss1+d_loss2)/2:.2f}, G_Loss : {g_loss:.2f}")
    g_model.save(model_path)

In [None]:
# Length of the noise vector fed to the generator.
latent_dim = 100

# Build the compiled discriminator, the generator, and the combined model
# (GAN_Model sets d_model.trainable = False on the object passed in).
d_model = Disc_Model()
g_model = Generator_Model(latent_dim)
gan_model = GAN_Model(g_model, d_model)

# Train with train()'s defaults (n_epochs=100, n_batch=32); dataset_addr is
# the file listing created in an earlier cell.
train(g_model, d_model, gan_model, dataset_addr, latent_dim)