In [1]:
import warnings
warnings.filterwarnings("ignore")

In [2]:
import joblib
import numpy as np
import pandas as pd
import lightgbm as lgb


import torch
import torch.nn as nn

import gym
from stable_baselines3 import PPO, A2C, DQN, SAC, TD3
from stable_baselines3.common.vec_env import DummyVecEnv
from stable_baselines3.common.evaluation import evaluate_policy

import altair as alt
import seaborn as sns
import matplotlib.pyplot as plt
# Single seed shared by the stochastic parts of this notebook.
random_state = 6
np.random.seed(random_state)
# Observations are drawn with torch.randn and the PPO policy is a torch
# module, so torch's global generator has to be seeded as well.
torch.manual_seed(random_state)

In [3]:
# Use the GPU when torch can see one; otherwise fall back to the CPU.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(device)

cuda


### Load data

In [4]:
# Read the three space-separated rupture-model splits; the files have no header row.
train_file = "data/rupturemodel_train.txt"
val_file = "data/rupturemodel_validate.txt"
test_file = "data/rupturemodel_test.txt"

df_train, df_val, df_test = (
    pd.read_csv(path, sep=" ", header=None)
    for path in (train_file, val_file, test_file)
)

# Eight input columns followed by 'label'.
columns = ['height', 'width', 'sxx', 'sxy', 'syy', 'sdrop', 'mud', 'dc', 'label']
for split_df in (df_train, df_val, df_test):
    split_df.columns = columns

# Fold the validation rows into the training frame.
frames = [df_train, df_val]
df_train = pd.concat(frames)
print('train data shape {} and test data shape {}'.format(df_train.shape, df_test.shape))

train data shape (1600, 9) and test data shape (400, 9)


### Feature Engineering

In [5]:
def create_new_features(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` extended with seven derived columns.

    The input frame is not modified. It must contain the columns ``height``,
    ``width``, ``sxx``, ``sxy``, ``syy``, ``sdrop``, ``mud`` and ``dc``.

    Columns appended, in this order:
        height_width_ratio            height / width
        normal_stress_diff            sxx - syy
        friction_product              mud * sdrop
        stress_ratio                  sxy / syy
        static_dynamic_friction_diff  (mud + sdrop) - mud
        stress_diff_dynamic_strength  sxy - syy * mud
        normalized_dc                 dc / width
    """
    enriched = df.copy()
    mud = enriched['mud']
    sdrop = enriched['sdrop']
    syy = enriched['syy']
    width = enriched['width']

    # None of the derived columns depends on another derived column, so they
    # can all be computed up front and attached afterwards in a fixed order.
    derived = {
        'height_width_ratio': enriched['height'] / width,
        'normal_stress_diff': enriched['sxx'] - syy,
        'friction_product': mud * (sdrop),
        'stress_ratio': enriched['sxy'] / syy,
        'static_dynamic_friction_diff': (mud + sdrop) - mud,
        'stress_diff_dynamic_strength': enriched['sxy'] - (syy * mud),
        'normalized_dc': enriched['dc'] / width,
    }
    for feature_name, feature_values in derived.items():
        enriched[feature_name] = feature_values
    return enriched

In [6]:
# Re-read the raw training split and label its nine columns.
# NOTE(review): this rebinds df_train to the training file only, replacing the
# train+validation frame built in the "Load data" cell — confirm this is intended.
train_file = "data/rupturemodel_train.txt"
columns = ['height', 'width', 'sxx', 'sxy',
           'syy', 'sdrop', 'mud', 'dc', 'label']
df_train = pd.read_csv(train_file, sep=" ", header=None)
df_train.columns = columns

In [14]:
# Define your reinforcement learning environment
from typing import List

class GeneratorEnv(gym.Env):
    """Gym environment whose reward is a supervised model's prediction.

    Every observation is a fresh vector of standard-normal noise of length
    ``input_size``. The agent answers with 8 values in ``[0, 1]``; these are
    passed through the saved scaler's ``inverse_transform``, expanded with
    ``create_new_features`` and scored by ``supervised_model``. The score is
    returned as the reward. ``done`` is always ``False``, so episodes never
    end on their own.
    """

    def __init__(self, supervised_model):
        """Store the reward model, load the scaler and declare the spaces.

        Args:
            supervised_model: Object exposing ``predict(2-D array)``; its
                output for a single row is used as the reward.
        """
        super(GeneratorEnv, self).__init__()
        self.supervised_model = supervised_model
        self.input_size = 100
        self.scaler = joblib.load('./models/scaler.pkl')
        self.state = self._sample_state()
        self.action_space = gym.spaces.Box(
            low=0, high=1, shape=(8,), dtype=np.float32)
        self.observation_space = gym.spaces.Box(
            low=-np.inf, high=np.inf, shape=(self.input_size,), dtype=np.float32)

    def _sample_state(self):
        """Draw a new noise vector of shape ``(input_size,)`` on ``device``."""
        return torch.randn(self.input_size).to(device)

    def step(self, action):
        """Score ``action`` and move to a new random observation.

        Args:
            action: Array-like of 8 scaled parameter values.

        Returns:
            tuple: ``(observation, reward, done, info)`` where ``observation``
            has shape ``(input_size,)`` to match ``observation_space``,
            ``reward`` is a Python float, ``done`` is ``False`` and ``info``
            is an empty dict.
        """
        # Same shape as reset() and observation_space; a (1, input_size)
        # tensor here would disagree with the declared space.
        self.state = self._sample_state()
        processed_data = self.process_for_supervised_model(np.array(action))
        prediction = self.supervised_model.predict(processed_data)
        # predict() is given exactly one row; hand back its score as a scalar.
        reward = float(prediction[0])
        done = False
        info = {}
        return self.state.cpu().numpy(), reward, done, info

    def reset(self):
        """Start a new episode and return the initial noise observation."""
        self.state = self._sample_state()
        return self.state.cpu().numpy()

    def process_for_supervised_model(self, generated_data: np.ndarray) -> np.ndarray:
        """Turn one generated action into the supervised model's input row.

        Args:
            generated_data: Array holding the 8 scaled parameter values.

        Returns:
            np.ndarray: A one-row 2-D array with the 8 inverse-transformed
            parameters followed by the columns added by ``create_new_features``.
        """
        columns = ['height', 'width', 'sxx',
                   'sxy', 'syy', 'sdrop', 'mud', 'dc']
        # The scaler works on 2-D input, so the single sample becomes one row.
        de_normalized = self.scaler.inverse_transform(
            generated_data.reshape(1, -1))
        df = pd.DataFrame(de_normalized, columns=columns)
        df = create_new_features(df)
        return df.values

In [15]:
# Load the saved LightGBM booster used for scoring, then wrap a single
# GeneratorEnv in the vectorised interface that stable-baselines3 works with.
supervised_model = lgb.Booster(model_file='./models/best_supervised_model.txt')


def make_generator_env():
    """Build one GeneratorEnv scored by the loaded supervised model."""
    return GeneratorEnv(supervised_model)


env = DummyVecEnv([make_generator_env])

In [16]:
# Train the generator using PPO.
# The notebook-wide seed is passed through so that policy initialisation and
# action sampling are seeded rather than left to the global RNG state.
model_name = 'rl_model_ppo'
model = PPO('MlpPolicy', env,
            verbose=0,
            seed=random_state,
            tensorboard_log="./logs/rl_logs/")
model.learn(total_timesteps=100000)

<stable_baselines3.ppo.ppo.PPO at 0x7f654c4fda30>

In [17]:
# Same value as in the training cell; presumably repeated so the save/load
# cells below can be run without re-running training — confirm.
model_name = 'rl_model_ppo'

In [18]:
### Save the model
model.save(f'./models/{model_name}')

### Save the environment
# The whole vectorised env (the wrapped GeneratorEnv together with the
# supervised model and scaler it holds) is serialised with joblib.
joblib.dump(env, f'./models/{model_name}_env.joblib')

['./models/rl_model_ppo_env.joblib']

In [19]:
### Load the model
loaded_model = PPO.load(f'./models/{model_name}')

### Load the environment
# joblib.load unpickles the file, so only load environment dumps that were
# written by this notebook.
loaded_env = joblib.load(f'./models/{model_name}_env.joblib')
# Attach the restored environment to the loaded policy.
loaded_model.set_env(loaded_env)

In [22]:
# loaded_model.policy

ActorCriticPolicy(
  (features_extractor): FlattenExtractor(
    (flatten): Flatten(start_dim=1, end_dim=-1)
  )
  (pi_features_extractor): FlattenExtractor(
    (flatten): Flatten(start_dim=1, end_dim=-1)
  )
  (vf_features_extractor): FlattenExtractor(
    (flatten): Flatten(start_dim=1, end_dim=-1)
  )
  (mlp_extractor): MlpExtractor(
    (policy_net): Sequential(
      (0): Linear(in_features=100, out_features=64, bias=True)
      (1): Tanh()
      (2): Linear(in_features=64, out_features=64, bias=True)
      (3): Tanh()
    )
    (value_net): Sequential(
      (0): Linear(in_features=100, out_features=64, bias=True)
      (1): Tanh()
      (2): Linear(in_features=64, out_features=64, bias=True)
      (3): Tanh()
    )
  )
  (action_net): Linear(in_features=64, out_features=8, bias=True)
  (value_net): Linear(in_features=64, out_features=1, bias=True)
)

In [None]:
# Roll the loaded policy forward for 1000 steps, recording each action it
# emits and the reward the environment returns for it.
generated_data, rewards_array = [], []
obs = loaded_env.reset()
for step_idx in range(1000):
    action, _states = loaded_model.predict(obs)
    obs, rewards, dones, info = loaded_env.step(action)
    # Index 0 selects the only environment inside the vectorised wrapper.
    rewards_array.append(rewards[0])
    generated_data.append(list(action[0]))

In [None]:
## Process the generated data to make it compatible with the supervised model
scaler = joblib.load('./models/scaler.pkl')
def process_for_supervised_model(generated_data):
    """Convert scaled generator output into the supervised model's features.

    Args:
        generated_data: Array-like of scaled samples with 8 values each,
            either 2-D ``(n_samples, 8)`` or a single 1-D sample of length 8.

    Returns:
        pd.DataFrame: One row per sample holding the 8 inverse-transformed
        parameters plus the columns added by ``create_new_features``.
    """
    columns = ['height', 'width', 'sxx',
               'sxy', 'syy', 'sdrop', 'mud', 'dc']
    # The scaler needs 2-D input: a lone 1-D sample is promoted to one row,
    # while input that is already 2-D passes through unchanged.
    samples = np.atleast_2d(generated_data)
    de_normalized = scaler.inverse_transform(samples)
    df = pd.DataFrame(de_normalized, columns=columns)
    df = create_new_features(df)
    return df

data = np.array(generated_data)
df_generated = process_for_supervised_model(data)

In [None]:
# Show five random generated rows, restricted to the eight input parameters.
# Note: this rebinds the notebook-level `columns` list to a version without 'label'.
columns = ['height', 'width', 'sxx',
           'sxy', 'syy', 'sdrop', 'mud', 'dc']
df_generated[columns].sample(5)

In [None]:
# Summary statistics of the training data, for comparison with the generated data.
df_train.describe()

In [None]:
# Summary statistics of the generated samples, including the derived feature columns.
df_generated.describe()

In [None]:
# One row per rollout step, holding the reward recorded for that step.
df_rewards = pd.DataFrame(rewards_array, columns=['reward'])
df_rewards.describe()

In [None]:
# Histogram of the rollout rewards, drawn with Altair (up to 100 bins).
alt.Chart(df_rewards).mark_bar().encode(
    alt.X('reward', bin=alt.Bin(maxbins=100)),
    y='count()',
).properties(
    width=600,
    height=400
).interactive()

In [None]:
# Stack the training rows on top of the generated rows and tag each row with
# its origin, so both sources can be compared in a single chart.
origin_labels = ['train'] * len(df_train)
origin_labels += ['generated'] * len(df_generated)
combined_df = pd.concat([df_train, df_generated], ignore_index=True)
combined_df['data_type'] = origin_labels

In [None]:
# Histogram of 'width' drawn with Altair, coloured by data_type (train vs generated).
alt.Chart(combined_df).mark_bar().encode(
    alt.X('width', bin=alt.Bin(maxbins=100)),
    y='count()',
    color='data_type'
).properties(
    width=600,
    height=400
).interactive()
 

In [None]:
# Attach each generated sample's reward; rows are matched on the index.
# Note: this adds a column to df_generated in place, so cells above that read
# df_generated will see the extra column if they are re-run afterwards.
df_generated['rewards']  = df_rewards['reward']

In [None]:
# Binned sdrop-by-width grid of the generated samples, coloured by reward.
alt.Chart(df_generated).mark_rect().encode(
    alt.X('sdrop', bin=alt.Bin(maxbins=100)),
    alt.Y('width', bin=alt.Bin(maxbins=100)),
    alt.Color('rewards', scale=alt.Scale(scheme='redyellowblue'))
).properties(
    width=600,
    height=400
).interactive()


### Understand the parameters of the generator

In [None]:
import optuna
import numpy as np

# Define the objective function to optimize


def objective(trial):
    """Score one randomly generated sample with the supervised model.

    A fresh standard-normal noise vector is given to the trained PPO policy
    (``loaded_model``); the resulting action is converted to model features
    with ``process_for_supervised_model`` and scored by ``supervised_model``.

    Args:
        trial: The Optuna trial. No parameters are suggested from it, so
            every trial is an independent random draw.

    Returns:
        float: The supervised model's prediction for the generated sample.
    """
    # The generator in this notebook is the trained PPO policy; no separate
    # `generator_model` is defined anywhere in it.
    input_noise = torch.randn(1, 100).numpy()
    generator_output, _ = loaded_model.predict(input_noise)
    df_generated = process_for_supervised_model(generator_output.reshape(1, -1))
    reward = supervised_model.predict(df_generated)
    # Optuna expects a plain float rather than a length-1 array.
    return float(reward[0])

In [None]:
# Define the optimization study; 'maximize' means higher objective values are better.
study = optuna.create_study(direction='maximize')

# Optimize the objective function for a fixed number of trials
n_trials = 1000
study.optimize(objective, n_trials=n_trials, show_progress_bar=False)


In [None]:
# Print the best parameter settings and reward found
# NOTE(review): objective() never calls trial.suggest_*, so best_params is
# expected to be empty and only best_value is informative — confirm.
best_params = study.best_params
best_reward = study.best_value
print(f"Best parameter settings: {best_params}")
print(f"Best reward: {best_reward}")