In [3]:
import os
import random
import warnings
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from imblearn.over_sampling import SMOTE, ADASYN
from sklearn.preprocessing import StandardScaler, OneHotEncoder
from sklearn.compose import ColumnTransformer
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers, Model
from tensorflow.keras.utils import to_categorical

def seed_everything(seed:int=42):
    os.environ['PYTHONHASHSEED'] = str(seed)
    random.seed(seed)
    np.random.seed(seed)
    tf.random.set_seed(seed)

warnings.filterwarnings('ignore')
seed_everything()

In [2]:
def build_generator(z_dim, condition_size, output_dim):
    # Latent input and condition input
    z = layers.Input(shape=(z_dim,))
    condition = layers.Input(shape=(condition_size,))
    x = layers.concatenate([z, condition])

    # Hidden layers
    x = layers.Dense(128, activation='relu')(x)
    x = layers.Dense(256, activation='relu')(x)

    # Output layer with output_dim dimensions
    output = layers.Dense(output_dim, activation='tanh')(x)

    # Generator model definition
    generator = Model([z, condition], output, name='generator')
    return generator

In [4]:
def build_discriminator(data_dim, condition_size):
    # Data input and condition input
    data = layers.Input(shape=(data_dim,))
    condition = layers.Input(shape=(condition_size,))
    x = layers.concatenate([data, condition])

    # Hidden layers
    x = layers.Dense(256, activation='relu')(x)
    x = layers.Dense(128, activation='relu')(x)

    # Output layer with single neuron (binary classification)
    output = layers.Dense(1, activation='sigmoid')(x)

    # Discriminator model definition
    discriminator = Model([data, condition], output, name='discriminator')
    return discriminator

In [5]:
class CGAN():
    def __init__(self, z_dim, data_dim, condition_size):
        self.z_dim = z_dim
        self.data_dim = data_dim
        self.condition_size = condition_size

        # Build and compile the discriminator
        self.discriminator = build_discriminator(data_dim, condition_size)
        self.discriminator.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

        # Build and compile the generator
        self.generator = build_generator(z_dim, condition_size, data_dim)

        # Create the combined model
        z = layers.Input(shape=(z_dim,))
        condition = layers.Input(shape=(condition_size,))
        generated_data = self.generator([z, condition])

        # Only train the generator in the combined model
        self.discriminator.trainable = False
        valid = self.discriminator([generated_data, condition])

        # Combined model (stacked generator and discriminator)
        self.combined = Model([z, condition], valid)
        self.combined.compile(loss='binary_crossentropy', optimizer='adam')

In [52]:
def train_cgan(cgan, data, conditions, epochs, batch_size, z_dim):
    half_batch = int(batch_size / 2)

    for epoch in range(epochs):

        # ---------------------
        # Train Discriminator
        # ---------------------
        # Select a random half batch of real data
        idx = np.random.randint(0, len(data), half_batch)
        real_data, real_conditions = data[idx], conditions[idx]

        # Generate a half batch of new data
        noise = np.random.normal(0, 1, (half_batch, z_dim))
        gen_data = cgan.generator.predict([noise, real_conditions])

        # Train the discriminator (real classified as ones and generated as zeros)
        d_loss_real = cgan.discriminator.train_on_batch([real_data, real_conditions], np.ones((half_batch, 1)))
        d_loss_fake = cgan.discriminator.train_on_batch([gen_data, real_conditions], np.zeros((half_batch, 1)))
        d_loss = 0.5 * np.add(d_loss_real, d_loss_fake)

        # ---------------------
        #  Train Generator
        # ---------------------
        # Condition for generator training
        sampled_conditions = conditions[np.random.randint(0, conditions.shape[0], batch_size)]

        # Train the generator (wants discriminator to mistake images as real)
        g_loss = cgan.combined.train_on_batch([np.random.normal(0, 1, (batch_size, z_dim)), sampled_conditions], np.ones((batch_size, 1)))

        # Print progress
        print(f"Epoch {epoch}/{epochs} [D loss: {d_loss[0]}, acc.: {100*d_loss[1]}%] [G loss: {g_loss}]")

In [37]:
df = pd.read_csv('./dataset/ff.csv')
target = 'BE5_1'
idx = df[target].dropna().index
df = df.loc[idx]
df.head(3)

df.drop(['year', 'LQ1_mn'], axis=1, inplace=True)
df = df[df['Total_slp_wk'] <= 24]

def make_targets(targets):
    return 1 if targets >=5 else 0

df[target] = df.apply(
    lambda row: make_targets(row[target]), axis=1
)

df[target].value_counts()

BE5_1
0    3305
1     415
Name: count, dtype: int64

In [38]:
# 1차년도
NUMERIC = ['year', 'HE_FEV1FVC', 'age', 'Total_slp_wk']
CATEGORICAL = ['sex', 'occp', 'EC1_1', 'cfam', 'marri_1', 'BH9_11', 'HE_DM', 'DC6_dg', 'DF2_dg', 'HE_HPdg', 'BP5']
ONE_HOT = ['BS3_1', 'edu', 'BP1', 'D_1_1', 'LQ_1EQL', 'LQ_2EQL', 'LQ_3EQL', 'LQ_4EQL', 'LQ_5EQL', 'BO1_1', 'BO2_1']
# 2차년도
NUMERIC2 = ['DI1_ag', 'DE1_ag', 'LQ1_mn', 'BS6_2_1', 'BS6_3']
CATEGORICAL2 = ['DI1_pt', 'DE1_pt', 'DE1_3', 'BH1', 'BH2_61', 'LQ4_00', 'LQ4_05', 'LQ1_sb', 'MO1_wk', 'HE_cough1', 'HE_sput1', 'BD1', 'BD7_5', 'BP6_10', 'BP6_31']
ONE_HOT2 = ['DI1_2', 'EC_pedu_1', 'EC_pedu_2', 'BS5_1', 'BD7_4']
# 생성변수
NUMERIC3 = ['eq_5d']
CATEGORICAL3 = ['sm_present', 'mh_stress']
ONE_HOT3 = ['cage']
# Targets
TARGETS = ['BE3_31', 'BE5_1']

f_NUMERIC = (NUMERIC + NUMERIC2 + NUMERIC3).copy()
for idx in ['DI1_ag', 'DE1_ag']:
    f_NUMERIC.remove(idx)

f_CATEGORICAL = CATEGORICAL + CATEGORICAL2 + CATEGORICAL3.copy()
for idx in ['DC6_dg', 'DF2_dg', 'DI1_pt', 'DE1_pt', 'DE1_3', 'BP6_31']:
    f_CATEGORICAL.remove(idx)

f_ONE_HOT = ONE_HOT + ONE_HOT2 + ONE_HOT3.copy()
f_ONE_HOT.remove('DI1_2')

In [39]:
df.drop([
    'HE_cough1', 'HE_sput1', 'EC_pedu_1', 'EC_pedu_2', 'HE_DM', 'BD7_4', 'BD7_5', 'BH9_11', 'BP6_10',
    'BP5', 'BH1', 'BS5_1'
], axis=1, inplace=True)

In [40]:
for col in df.columns:
    df[col].fillna(df[col].mode()[0], inplace=True)

In [43]:
for val in f_NUMERIC:
    if val not in list(df.columns):
        f_NUMERIC.remove(val)
        print(val)

for val in f_CATEGORICAL:
    if val not in list(df.columns):
        f_CATEGORICAL.remove(val)
        print(val)

for val in f_ONE_HOT:
    if val not in list(df.columns):
        f_ONE_HOT.remove(val)
        print(val)

BD7_4


In [34]:
standard = StandardScaler()
onehot = OneHotEncoder(handle_unknown='ignore')

preprocessor = ColumnTransformer(
    transformers=[
        ('numeric', standard, f_NUMERIC),
        ('onehot', onehot, f_ONE_HOT)
    ]
)

In [44]:
conditions = df['BE5_1']
df = preprocessor.fit_transform(df)

In [55]:
conditions = conditions.values

In [56]:
conditions.shape

(3720,)

In [57]:
# 모델 파라미터 설정
z_dim = 100  # 잠재 공간의 차원
data_dim = df.shape[1]  # 데이터 차원 (여기서는 49)
condition_size = 1  # 조건 벡터의 크기

# CGAN 모델 초기화
cgan = CGAN(z_dim, data_dim, condition_size)

# 모델 학습
train_cgan(cgan, df, conditions, epochs=1000, batch_size=32, z_dim=z_dim)

Epoch 0/1000 [D loss: 0.7597938776016235, acc.: 37.5%] [G loss: 0.7688602805137634]
Epoch 1/1000 [D loss: 0.651587724685669, acc.: 68.75%] [G loss: 0.7229795455932617]
Epoch 2/1000 [D loss: 0.6325002610683441, acc.: 65.625%] [G loss: 0.7158691883087158]
Epoch 3/1000 [D loss: 0.5938315540552139, acc.: 56.25%] [G loss: 0.6460100412368774]
Epoch 4/1000 [D loss: 0.5922499448060989, acc.: 50.0%] [G loss: 0.6240088939666748]
Epoch 5/1000 [D loss: 0.5749521553516388, acc.: 53.125%] [G loss: 0.5722019076347351]
Epoch 6/1000 [D loss: 0.6225968152284622, acc.: 53.125%] [G loss: 0.5649540424346924]
Epoch 7/1000 [D loss: 0.6364603936672211, acc.: 50.0%] [G loss: 0.5774600505828857]
Epoch 8/1000 [D loss: 0.5867083072662354, acc.: 53.125%] [G loss: 0.5851554274559021]
Epoch 9/1000 [D loss: 0.5811437740921974, acc.: 50.0%] [G loss: 0.6305828094482422]
Epoch 10/1000 [D loss: 0.588242918252945, acc.: 50.0%] [G loss: 0.6665276288986206]
Epoch 11/1000 [D loss: 0.5925745964050293, acc.: 53.125%] [G loss: 

In [58]:
def generate_data(generator, num_samples, z_dim, condition):
    noise = np.random.normal(0, 1, (num_samples, z_dim))
    generated_data = generator.predict([noise, condition])
    return generated_data

# 새로운 데이터 생성
num_samples = 1000  # 생성할 데이터 샘플 수
condition = np.array([[1]] * num_samples)  # 조건 벡터 예시
new_data = generate_data(cgan.generator, num_samples, z_dim, condition)



In [61]:
df.shape

(3720, 49)

In [67]:
df[2]

array([ 0.94474763, -1.730421  ,  0.91005417, -0.64934995, -0.61515284,
        0.6288537 ,  0.        ,  0.        ,  1.        ,  0.        ,
        1.        ,  0.        ,  0.        ,  0.        ,  0.        ,
        0.        ,  0.        ,  1.        ,  0.        ,  0.        ,
        0.        ,  1.        ,  0.        ,  0.        ,  1.        ,
        0.        ,  0.        ,  1.        ,  0.        ,  0.        ,
        1.        ,  0.        ,  0.        ,  1.        ,  0.        ,
        0.        ,  1.        ,  0.        ,  0.        ,  1.        ,
        0.        ,  0.        ,  1.        ,  0.        ,  0.        ,
        1.        ,  0.        ,  0.        ,  0.        ])

In [68]:
new_data[2]

array([ 1.       , -1.       ,  0.9999995,  1.       ,  1.       ,
       -1.       , -1.       , -1.       ,  1.       ,  1.       ,
        1.       ,  1.       ,  1.       ,  1.       ,  1.       ,
       -1.       , -1.       ,  1.       ,  1.       , -1.       ,
       -1.       ,  1.       , -1.       ,  1.       ,  1.       ,
        1.       ,  1.       ,  1.       ,  1.       , -1.       ,
        1.       , -1.       ,  0.9785934,  1.       ,  1.       ,
       -1.       ,  1.       ,  1.       ,  1.       ,  1.       ,
       -1.       ,  1.       ,  1.       , -1.       ,  0.9999975,
       -1.       ,  1.       ,  0.6595622,  1.       ], dtype=float32)