<a href="https://colab.research.google.com/github/mariam7084/Analog-Clock/blob/main/Copy_of_cGAN_implementation_in_kaggle.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

This is the notebook used to generate the fake (synthetic) data. It was run on Kaggle, in version 2 of the notebook with the same name; refer to that run for any results. Otherwise, the code here is final and complete.

In [None]:
import pandas as pd
import numpy as np
import tensorflow as tf
import random
import matplotlib.pyplot as plt
import seaborn as sns

In [None]:
# Seed every random number generator used in this notebook so runs are repeatable

def set_seed(seed_value=42):
    """Seed Python's `random`, NumPy and TensorFlow with the same value.

    Args:
        seed_value: Integer seed handed unchanged to each library's seeding function.
    """
    # Each library keeps its own global generator, so each needs its own call.
    seeders = (random.seed, np.random.seed, tf.random.set_seed)
    for seed_fn in seeders:
        seed_fn(seed_value)

set_seed(42)


In [None]:
# Pollution readings CSV, read from the Kaggle input directory
pollution_csv_path = "/kaggle/input/iot-air-pollution-data/pollutionData204273.csv"
df = pd.read_csv(pollution_csv_path)

In [None]:
# Show the freshly loaded DataFrame as this cell's output
df

In [None]:
# Function to calculate AQI based on WHO standards
def calculate_aqi(concentration, breakpoints, aqi_values):
    """Map a pollutant concentration onto an index scale by piecewise-linear interpolation.

    `breakpoints` and `aqi_values` are parallel sequences: each pair of consecutive
    breakpoints delimits a segment, and the matching pair of `aqi_values` gives the index
    at the two ends of that segment.

    Args:
        concentration: Measured concentration of one pollutant.
        breakpoints: Ascending concentration thresholds.
        aqi_values: Index value at each threshold (same length as `breakpoints`).

    Returns:
        The interpolated index when `concentration` lies inside the breakpoint range;
        `aqi_values[-1]` when it exceeds the last breakpoint; 0 when it is below the
        first breakpoint, is NaN, or `breakpoints` is empty.
    """
    for i in range(len(breakpoints) - 1):
        low, high = breakpoints[i], breakpoints[i + 1]
        if low <= concentration <= high:
            aqi = ((aqi_values[i + 1] - aqi_values[i]) / (high - low)) * (concentration - low) + aqi_values[i]
            return aqi

    # Saturate at the top of the scale for readings beyond the last breakpoint. Returning 0
    # here would rank an extreme reading below every in-range one as soon as the
    # per-pollutant indices are combined with max().
    if len(breakpoints) > 0 and concentration > breakpoints[-1]:
        return aqi_values[-1]

    # Below the first breakpoint, NaN (fails every comparison) or no breakpoints at all.
    return 0

# Function to calculate overall AQI for a row
def calculate_overall_aqi(row):
    """Return the overall AQI of one row: the largest of its five per-pollutant indices.

    Args:
        row: Mapping-like record (e.g. a DataFrame row) with the keys 'ozone',
            'particullate_matter', 'carbon_monoxide', 'sulfure_dioxide' and
            'nitrogen_dioxide'.

    Returns:
        The maximum value returned by `calculate_aqi` across the five pollutants.
    """
    # Every pollutant is mapped onto the same index scale.
    index_scale = [0, 50, 100, 150, 200, 300, 400, 500]

    # Column name -> concentration breakpoints (placeholder standards, for illustration).
    breakpoint_table = {
        'ozone': [0, 50, 100, 150, 200, 300, 400, 500],
        'particullate_matter': [0, 12, 35.4, 55.4, 150.4, 250.4, 350.4, 500.4],
        'carbon_monoxide': [0, 4.4, 9.4, 12.4, 15.4, 30.4, 40.4, 50.4],
        'sulfure_dioxide': [0, 35, 75, 185, 304, 604, 804, 1004],
        'nitrogen_dioxide': [0, 53, 100, 360, 649, 1249, 1649, 2049],
    }

    # One sub-index per pollutant; the worst (highest) one is the overall AQI.
    sub_indices = [
        calculate_aqi(row[column], cutoffs, index_scale)
        for column, cutoffs in breakpoint_table.items()
    ]
    return max(sub_indices)

# Compute the overall AQI row by row, then store the result in a new 'AQI' column
aqi_per_row = df.apply(calculate_overall_aqi, axis=1)
df['AQI'] = aqi_per_row

In [None]:
# Show the DataFrame again, now including the computed 'AQI' column
df

In [None]:
# Print the DataFrame summary produced by pandas' DataFrame.info()
df.info()

In [None]:
from tensorflow import keras
from tensorflow.keras import layers
from sklearn.model_selection import train_test_split

# Conditional GAN

In [None]:
# Extract features and labels
features = df[['ozone', 'particullate_matter', 'carbon_monoxide', 'sulfure_dioxide', 'nitrogen_dioxide']]

# Min-max scale the AQI labels into [0, 1]. Every cell that conditions the generator draws
# its labels from np.random.uniform(0, 1), so the real labels shown to the discriminator
# have to live on that same scale; otherwise real and fake rows can be told apart by the
# label magnitude alone.
# To map a scaled label back to AQI units: label * (aqi_max - aqi_min) + aqi_min
aqi_min = df['AQI'].min()
aqi_max = df['AQI'].max()
aqi_span = aqi_max - aqi_min
if aqi_span > 0:
    labels = (df['AQI'] - aqi_min) / aqi_span
else:
    # Constant AQI column: avoid dividing by zero and map every label to 0.
    labels = df['AQI'] * 0.0

In [None]:
# Feature normalization is currently switched off; the min-max line is kept for reference
# features = (features - features.min()) / (features.max() - features.min())

# Hold out 20% of the rows as a test split; the fixed random_state keeps the split repeatable
(features_train, features_test,
 labels_train, labels_test) = train_test_split(
    features,
    labels,
    test_size=0.2,
    random_state=42,
)


In [None]:
# Set hyperparameters
latent_dim = 100  # Size of the random noise vector
num_features = len(features.columns)

# Build the generator model: (noise vector + 1 label) -> one synthetic feature row
generator = keras.Sequential([
    layers.Input(shape=(latent_dim + 1,)),  # Additional input for the label
    layers.Dense(128, activation='relu'),
    layers.Dense(256, activation='relu'),
    layers.Dense(512, activation='relu'),
    layers.Dense(num_features, activation='linear') # change activation to ReLu or sigmoid and then train again from linear
])

# Build the discriminator model: (feature row + 1 label) -> probability that the row is real
discriminator = keras.Sequential([
    layers.Input(shape=(num_features + 1,)),  # Additional input for the label
    layers.Dense(512, activation='relu'),
    layers.Dense(256, activation='relu'),
    layers.Dense(128, activation='relu'),
    layers.Dense(1, activation='sigmoid')  # Using sigmoid for binary classification
])

# Compile the discriminator model while its weights are still trainable
discriminator.compile(optimizer=keras.optimizers.Adam(learning_rate=0.0002, beta_1=0.5),
                      loss='binary_crossentropy', metrics=['accuracy'])

# Freeze the discriminator before it is embedded in (and compiled as part of) the combined
# model. The combined model is trained with "real" targets on generated rows; if the
# discriminator weights were also updated in that step, it would be taught to accept fakes
# instead of the generator being taught to produce better ones.
discriminator.trainable = False

# Build and compile the CGAN model: generator followed by the (frozen) discriminator
z = layers.Input(shape=(latent_dim,))
label = layers.Input(shape=(1,), dtype='float32')
concatenated_gen = layers.concatenate([z, label])

fake_data = generator(concatenated_gen)
concatenated_disc = layers.concatenate([fake_data, label])

validity = discriminator(concatenated_disc)

cgan = keras.Model(inputs=[z, label], outputs=validity)

# Compile the CGAN model after compiling the generator and discriminator
cgan.compile(optimizer=keras.optimizers.Adam(learning_rate=0.0002, beta_1=0.5),
             loss='binary_crossentropy', metrics=['accuracy'])

# Training the CGAN
# NOTE: each "epoch" below is a single update on one random batch, not a pass over the data.
epochs = 1000
batch_size = 64

for epoch in range(epochs):
    # Generate random noise and random labels
    noise = np.random.normal(0, 1, size=(batch_size, latent_dim))
    random_labels = np.random.uniform(0, 1, size=(batch_size, 1))  # Generate random float labels

    # Generate fake samples with the generator. The model is called directly rather than
    # through predict(): predict() is meant for large inputs and prints a progress bar on
    # every call, which is wasteful for one small batch per loop iteration.
    generator_input = np.concatenate([noise, random_labels], axis=1).astype('float32')
    generated_data = np.asarray(generator(generator_input, training=False))

    # Sample real data
    real_samples_indices = np.random.randint(0, features_train.shape[0], batch_size)
    real_samples = features_train.iloc[real_samples_indices].values
    real_labels = labels_train.iloc[real_samples_indices].values.reshape(-1, 1)

    # Train the discriminator: real rows are labelled 1, generated rows 0.
    # NOTE(review): Keras versions appear to differ in whether `trainable` is captured at
    # compile() time or when the train step is first built, so the flag is also set
    # explicitly around each update - confirm against the installed Keras version.
    discriminator.trainable = True
    d_loss_real = discriminator.train_on_batch(np.concatenate([real_samples, real_labels], axis=1), np.ones((batch_size, 1)))
    d_loss_fake = discriminator.train_on_batch(np.concatenate([generated_data, random_labels], axis=1), np.zeros((batch_size, 1)))
    # Element-wise mean of the two [loss, accuracy] results
    d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)

    # Train the generator (via the CGAN model) with the discriminator frozen
    discriminator.trainable = False
    noise = np.random.normal(0, 1, size=(batch_size, latent_dim))
    random_labels = np.random.uniform(0, 1, size=(batch_size, 1))  # Generate random float labels
    labels_gan = np.ones((batch_size, 1))  # Labels for the generator are set to be real
    g_loss = cgan.train_on_batch([noise, random_labels], labels_gan)

    # Print progress
    if epoch % 100 == 0:
        print(f"Epoch {epoch}, D Loss: {d_loss[0]}, D Accuracy: {d_loss[1]}, G Loss: {g_loss[0]}")


In [None]:
# Report the metrics left over from the last training iteration

final_d_loss, final_d_accuracy = d_loss[0], d_loss[1]
final_g_loss = g_loss[0]

report_lines = [
    'Final Training Results',
    '---'*20,
    f"Discriminator Loss: {final_d_loss}",
    f"Discriminator Accuracy: {final_d_accuracy}",
    f"Generator Loss: {final_g_loss}",
]
for report_line in report_lines:
    print(report_line)


In [None]:
# Optionally, evaluate the generator on the test set:
# produce one synthetic row per row of the test split
num_test_rows = features_test.shape[0]

# Noise is drawn before the labels so the global NumPy RNG is consumed in the same order
noise_test = np.random.normal(0, 1, size=(num_test_rows, latent_dim))
random_labels_test = np.random.uniform(0, 1, size=(num_test_rows, 1))

generator_input_test = np.hstack([noise_test, random_labels_test])
generated_data_test = generator.predict(generator_input_test)


In [None]:
# Score the discriminator and the combined model on the held-out test split

num_test_rows = features_test.shape[0]
real_samples_test = features_test.values
real_labels_test = labels_test.values.reshape(-1, 1)

# Discriminator inputs are feature rows with the conditioning label appended as a last column
real_batch_test = np.hstack([real_samples_test, real_labels_test])
fake_batch_test = np.hstack([generated_data_test, random_labels_test])
real_targets_test = np.ones((num_test_rows, 1))
fake_targets_test = np.zeros((num_test_rows, 1))

# Discriminator performance: real rows should be classified 1, generated rows 0
d_loss_real_test = discriminator.evaluate(real_batch_test, real_targets_test, verbose=0)
d_loss_fake_test = discriminator.evaluate(fake_batch_test, fake_targets_test, verbose=0)

print(f"Test Set - Discriminator Real Loss: {d_loss_real_test[0]}, Discriminator Real Accuracy: {d_loss_real_test[1]}")
print(f"Test Set - Discriminator Fake Loss: {d_loss_fake_test[0]}, Discriminator Fake Accuracy: {d_loss_fake_test[1]}")

# Generator performance: loss of the combined model when generated rows are targeted as "real"
g_loss_test = cgan.evaluate([noise_test, random_labels_test], real_targets_test, verbose=0)
print(f"Test Set - Generator Loss: {g_loss_test[0]}")


## Generating the fake data


In [None]:
# How many synthetic rows to produce
num_samples = 10000

# Generator inputs: Gaussian noise plus one uniform(0, 1) conditioning label per row
noise_gen = np.random.normal(0, 1, size=(num_samples, latent_dim))
random_labels_gen = np.random.uniform(0, 1, size=(num_samples, 1))
generator_input_gen = np.hstack([noise_gen, random_labels_gen])

# Run the generator once over the whole batch of inputs
generated_data_gen = generator.predict(generator_input_gen)

# Collect the output under the original feature column names,
# with the conditioning label stored alongside as 'AQI'
generated_df = pd.DataFrame(generated_data_gen, columns=features.columns)
generated_df['AQI'] = random_labels_gen

# Show the first few generated rows
print(generated_df.head())


### Generating unnormalized data

In [None]:
# Number of samples to generate
num_samples = 10000

# Generate random noise and random labels
noise_gen = np.random.normal(0, 1, size=(num_samples, latent_dim))
random_labels_gen = np.random.uniform(0, 1, size=(num_samples, 1))

# Generate fake data using the generator
generated_data_gen = generator.predict(np.concatenate([noise_gen, random_labels_gen], axis=1))

# Per-column range of the raw (un-scaled) features, as needed to invert
# (features - min) / (max - min)
raw_features = df[['ozone', 'particullate_matter', 'carbon_monoxide', 'sulfure_dioxide', 'nitrogen_dioxide']]
features_min = raw_features.min()
features_max = raw_features.max()

# Only undo the min-max scaling if it was actually applied. `features` is what the model
# was trained on; when it still equals the raw columns (the normalization line in this
# notebook is commented out) the generator already outputs raw units, and rescaling them
# a second time would distort every value.
features_were_normalized = not features.equals(raw_features)
if features_were_normalized:
    generated_data_unnorm = generated_data_gen * (features_max.values - features_min.values) + features_min.values
else:
    generated_data_unnorm = generated_data_gen

# Create a DataFrame to store the unnormalized generated data
generated_df_unnorm = pd.DataFrame(generated_data_unnorm, columns=features.columns)

# The 'AQI' column holds the conditioning labels fed to the generator, unchanged
generated_df_unnorm['AQI'] = random_labels_gen.ravel()

# Display the first few rows of the unnormalized generated DataFrame
print(generated_df_unnorm.head())

# Save the unnormalized generated data to a file if needed
# generated_df_unnorm.to_csv('generated_data_unnormalized.csv', index=False)


In [None]:
# Write the generated rows (feature columns plus 'AQI') to a CSV file, without the index column
output_csv_path = 'generated_data.csv'
generated_df.to_csv(output_csv_path, index=False)