In [None]:
# -*- coding: utf-8 -*-
"""
Created on Fri Jun  7 16:17:46 2024

@author: User

1a: Each alpha contains only one value.
"""

import os

import cvxpy as cp
import numpy as np

from CFT_lego import CFT_lego
from DG_analytic_v2 import DG
from Where_execute import Where_execute

# Base location of the project's data files; later cells build paths such as
# "LP_data/..." relative to it.
root_dir = Where_execute()
# NOTE(review): `cft` is not referenced in any of the visible cells -- confirm
# it is still needed before keeping this import-time construction.
cft = CFT_lego()

class LP:
    """Linear program that brackets the quantity ``a1_eq @ ope - 2``.

    The unknown ``ope`` is a non-negative vector with one entry per scaling
    dimension in ``delta_list``: the caller-supplied ``low_delta_list``
    followed by an evenly spaced grid that starts one ``delta_spacing`` above
    ``delta_truncation``.  The equality constraints come from the project
    helper ``DG`` (its exact output layout is not visible here; it is used as
    a matrix with one row per constraint and one column per delta).

    Parameters
    ----------
    low_delta_list : list
        Explicit low-lying deltas.  Must be a ``list`` because it is
        concatenated with the grid using ``+``.
    delta_truncation : float
        Value above which the evenly spaced grid starts.
    delta_spacing : float
        Grid step.
    delta_max : float
        Used together with ``max(low_delta_list)`` to size the grid.
    rescale_factor : float
        Every column is damped by ``exp(-rescale_factor * delta)``.
    how_many_constraints : int
        Number of equality constraints requested from ``DG``.
    data :
        Passed through to ``DG`` unchanged.
    """

    def __init__(self, low_delta_list, delta_truncation, delta_spacing, delta_max,
                 rescale_factor, how_many_constraints, data):
        self.low_delta_list = low_delta_list
        self.delta_truncation = delta_truncation
        self.delta_spacing = delta_spacing
        self.delta_max = delta_max
        self.rescale_factor = rescale_factor

        self.num_grid = int((delta_max - max(low_delta_list)) / delta_spacing)
        grid_deltas = [delta_truncation + delta_spacing * i
                       for i in range(1, self.num_grid + 2)]
        self.delta_list = low_delta_list + grid_deltas

        # Constraint matrix, damped per delta so large deltas do not dominate.
        self.derivative_constraints = (
            DG(data, self.delta_list, 2, how_many_constraints)
            * np.exp(-rescale_factor * np.array(self.delta_list))
        )
        # Right-hand side of the equalities: 2 for the first, 0 for the rest.
        self.solver = np.array([2] + [0] * (how_many_constraints - 1))

        # Objective row: first DG row on the low deltas only; grid entries get
        # zero weight so they do not contribute to the bounded quantity.
        low_row = DG(data, self.low_delta_list, 2, 1)[0] * np.exp(
            -rescale_factor * np.array(self.low_delta_list))
        self.a1_eq = np.array(list(low_row) + [0] * len(grid_deltas))

    @staticmethod
    def _is_finite_number(value):
        """Return True when ``value`` is a real float (Python or NumPy) and finite.

        CVXPY reports infeasible/unbounded problems through ``+/-inf`` and a
        failed solve through ``None``; all of those are rejected here.
        """
        return isinstance(value, (float, np.floating)) and bool(np.isfinite(value))

    def a1_bound(self):
        """Solve the minimising and maximising LPs.

        Returns
        -------
        list or None
            ``[lower, upper]`` when both problems have a finite optimum,
            otherwise ``None`` (infeasible, unbounded, or solver error).
        """
        ope = cp.Variable(len(self.delta_list))
        objective = self.a1_eq @ ope - 2
        # Both problems share the same feasible set.
        constraints = [self.derivative_constraints @ ope == self.solver,
                       ope >= 0]

        prob_low = cp.Problem(cp.Minimize(objective), constraints)
        prob_up = cp.Problem(cp.Maximize(objective), constraints)

        try:
            prob_low.solve()
            # No finite lower bound means the answer is None whatever the
            # second problem yields, so skip the second solve.
            if not self._is_finite_number(prob_low.value):
                return None
            prob_up.solve()
        except Exception:
            # Solver failures are treated as "no bound"; KeyboardInterrupt and
            # SystemExit deliberately propagate.
            return None

        if not self._is_finite_number(prob_up.value):
            return None
        return [prob_low.value, prob_up.value]

    def alpha_criteria(self):
        """Describe the outcome of :meth:`a1_bound`.

        Returns
        -------
        str or tuple
            The string ``'No such bound'`` when no finite bound exists,
            otherwise the tuple ``('Exist bound', [lower, upper])``.
        """
        a1_range = self.a1_bound()
        if a1_range is None:
            return 'No such bound'
        return 'Exist bound', a1_range

class LP_data:
    """JSON-backed cache of ``LP.alpha_criteria()`` results.

    Results are stored in
    ``<root_dir>/LP_data/a_region_constraint_num{N}_with_alphabound.json``,
    keyed by the JSON dump of the delta configuration.

    Parameters
    ----------
    low_delta_list, delta_truncation, delta_spacing, delta_max, rescale_factor :
        Forwarded to ``LP`` and used to build the cache key.
    data : optional
        Forwarded to ``LP`` as its ``data`` argument.  Only needed when a
        bound has to be computed (cache miss); cached look-ups work without it.
    """

    def __init__(self, low_delta_list, delta_truncation, delta_spacing, delta_max, rescale_factor,
                 data=None):
        self.low_delta_list = low_delta_list
        self.delta_truncation = delta_truncation
        self.delta_spacing = delta_spacing
        self.delta_max = delta_max
        self.rescale_factor = rescale_factor
        self.data = data

    @staticmethod
    def _as_container(criteria):
        """Convert an ``alpha_criteria()`` result into the stored dict.

        ``alpha_criteria`` returns either a bare string (no bound) or a
        ``(label, bound)`` tuple; indexing the bare string would pick out
        single characters, so the two shapes are handled separately.
        """
        if isinstance(criteria, str):
            return {'property': criteria, 'alpha bound': None}
        label, bound = criteria
        return {'property': label, 'alpha bound': bound}

    def stored_a_bound(self, how_many_constraints):
        """Return the cached bound for this configuration, computing it on a miss.

        Parameters
        ----------
        how_many_constraints : int
            Number of LP constraints; also selects the cache file.

        Returns
        -------
        dict
            ``{'property': <label>, 'alpha bound': <[low, up] or None>}``.

        Raises
        ------
        TypeError
            On a cache miss when no ``data`` was supplied to the constructor.
        """
        import os
        import json
        file_name = f'LP_data/a_region_constraint_num{how_many_constraints}_with_alphabound.json'
        # One path for the existence check, the read and the write.
        file_path = os.path.join(root_dir, file_name)

        d_property = {'delta':self.low_delta_list, 'd_truncate':self.delta_truncation,
                      'd_space':self.delta_spacing, 'd_max':self.delta_max, 'd_rf':self.rescale_factor}
        # The serialised configuration is the dictionary key inside the file.
        target_label_str = json.dumps(d_property)

        if os.path.isfile(file_path):
            with open(file_path, 'r') as file:
                combined_data = json.load(file)
            # Look up the entry stored for this configuration, if any.
            values = combined_data.get(target_label_str)
            if values is not None:
                return values
            print('Building new alpha json element!')
        else:
            print('Building new alpha json file!')
            combined_data = {}

        if self.data is None:
            raise TypeError("LP_data needs `data` to compute a bound that is not cached yet")

        lp = LP(self.low_delta_list, self.delta_truncation,
                self.delta_spacing, self.delta_max, self.rescale_factor,
                how_many_constraints, self.data)
        alpha_container = self._as_container(lp.alpha_criteria())
        # Store the result under the configuration label.
        combined_data[target_label_str] = alpha_container

        # Write to JSON file
        with open(file_path, 'w') as file:
            json.dump(combined_data, file, indent=4)
        return alpha_container

In [None]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np
from stable_baselines3 import PPO, SAC
from stable_baselines3.common.env_checker import check_env
from stable_baselines3.common.env_util import make_vec_env
from get_allowed_2states import get_allowed_2states
import random

# Define the custom environment
class LpEnv(gym.Env):
    """Gymnasium environment that moves a third delta until the LP bound disappears.

    The internal state is ``[delta1, delta2, delta3]``.  ``delta1`` and
    ``delta2`` are drawn from ``deltas_1_2`` on reset and ``delta3`` starts at
    ``delta2``.  Each action shifts ``delta3``; values that fall below
    ``delta2`` are reflected back above it.

    The observation is the state followed by the two numbers returned by
    ``LP.a1_bound()`` for that state, or the placeholder ``(0, -1)`` when no
    bound exists.

    Rewards:
      * ``+rew`` and termination when the current state has a bound but the
        state with ``delta3 + delta_tol`` has none;
      * ``-penalty_not_allowed`` and termination when the current state has no
        bound;
      * ``-step_cost`` otherwise; the episode is truncated after ``max_steps``.

    Parameters
    ----------
    deltas_1_2 : sequence
        Candidate ``[delta1, delta2]`` pairs to sample from on reset.
    data :
        Passed through to ``LP``.
    max_action : float
        Symmetric bound of the 1-D action space.
    delta_spacing, delta_max : float
        Grid parameters forwarded to ``LP``.
    penalty_not_allowed, step_cost, rew : float
        Reward shaping constants (see above).
    nder : int
        Number of LP constraints.
    max_steps : int
        Step limit after which the episode is truncated.
    delta_tol : float
        Offset on ``delta3`` used for the look-ahead LP.
    rescale_factor : float
        Rescale factor forwarded to ``LP`` (previously fixed at 0.8).
    """

    # Appended to the observation in place of a bound when the LP has none.
    _NO_BOUND = (0.0, -1.0)

    def __init__(self, deltas_1_2, data, max_action=.5, delta_spacing=0.1, delta_max=50,
                 penalty_not_allowed=5., step_cost=.1, nder=5, max_steps=100, rew=1,
                 delta_tol=.1, rescale_factor=0.8):
        super().__init__()
        # NOTE(review): the declared range [2, 100] does not contain the
        # (0, -1) placeholder, and LP bounds may fall outside it as well --
        # confirm the intended limits.
        self.observation_space = spaces.Box(low=2, high=100, shape=(5,), dtype=np.float32)
        self.action_space = spaces.Box(low=-max_action, high=max_action, shape=(1,), dtype=np.float32)
        self.delta_spacing = delta_spacing
        self.delta_max = delta_max
        self.penalty_not_allowed = penalty_not_allowed
        self.step_cost = step_cost
        self.nder = nder
        self.deltas_1_2 = deltas_1_2
        self.max_steps = max_steps
        self.current_step = 0
        self.reward = rew
        self.data = data
        self.delta_tol = delta_tol
        self.rescale_factor = rescale_factor
        # Populates self.state with a valid starting point.
        self.reset()

    def _observation(self, bounds):
        """Build the float32 observation ``state + bounds`` (placeholder if no bound)."""
        bounds_arr = np.array(self._NO_BOUND if bounds is None else bounds, dtype=np.float32)
        if bounds_arr.shape != (2,):
            bounds_arr = np.array(self._NO_BOUND, dtype=np.float32)
        return np.concatenate((self.state, bounds_arr)).astype(np.float32)

    def step(self, action):
        """Shift ``delta3`` by ``action`` and evaluate the LP.

        Returns
        -------
        tuple
            ``(observation, reward, terminated, truncated, info)``.
        """
        self.current_step += 1
        # The action arrives as a shape-(1,) array; take its scalar value
        # instead of assigning an array to a single state slot.
        self.state[2] += float(np.asarray(action).reshape(-1)[0])
        if self.state[2] < self.state[1]:
            # Reflect back above delta2.
            self.state[2] = self.state[1] + (self.state[1] - self.state[2])

        bounds_here = self.get_lp_bound(self.state.tolist())

        if bounds_here is None:
            # Current point is not allowed; the look-ahead LP is irrelevant.
            reward = -self.penalty_not_allowed
            terminated = True
            truncated = False
        else:
            shifted_state = (self.state + np.array([0, 0, self.delta_tol])).tolist()
            if self.get_lp_bound(shifted_state) is None:
                # Sitting right at the edge of the allowed region: success.
                reward = self.reward
                terminated = True
                truncated = False
            else:
                reward = -self.step_cost
                # Hitting the step limit is a truncation, not a terminal
                # state, so `terminated` stays False.
                terminated = False
                truncated = self.current_step >= self.max_steps

        return self._observation(bounds_here), reward, terminated, truncated, {}

    def reset(self, seed=None, options=None):
        """Start a new episode from a sampled ``(delta1, delta2)`` pair.

        Returns
        -------
        tuple
            ``(observation, info)``.
        """
        super().reset(seed=seed)
        self.current_step = 0
        delta1, delta2 = self.random_deltas()
        self.state = np.array([delta1, delta2, delta2], dtype=np.float32)
        return self._observation(self.get_lp_bound(self.state.tolist())), {}

    def random_deltas(self):
        """Pick one entry of ``deltas_1_2`` using the environment's seeded RNG."""
        index = int(self.np_random.integers(len(self.deltas_1_2)))
        return self.deltas_1_2[index]

    def render(self, mode='human'):
        """No rendering is implemented."""
        pass

    def close(self):
        """Nothing to release."""
        pass

    def get_lp_bound(self, state):
        """Run the LP for ``state`` (a list of deltas) and return ``LP.a1_bound()``.

        The last entry of ``state`` is used as the truncation delta.
        """
        lp = LP(state, state[-1], self.delta_spacing, self.delta_max,
                self.rescale_factor, self.nder, self.data)
        return lp.a1_bound()

In [None]:
from get_allowed_2states import get_allowed_2states

import random
import pandas as pd

# Build the CSV path with os.path.join so it does not depend on whether
# root_dir ends with a path separator.
path = os.path.join(root_dir, "LP_data", "isingDGn_v5.csv")
data = pd.read_csv(path)

arr = get_allowed_2states()
# Keep only the entries whose second component is greater than 2.
filtered_data = [item for item in arr if item[1] > 2]

In [None]:
import gym
import matplotlib.pyplot as plt
from stable_baselines3 import SAC
from stable_baselines3.common.callbacks import BaseCallback

class RewardCallback(BaseCallback):
    """Record the reward of the final step of every finished episode.

    ``episode_rewards`` receives one entry per finished episode: the reward of
    the step on which the environment reported ``done``.  It is the
    last-step reward, not the sum of rewards over the episode.
    """

    def __init__(self, verbose=0):
        super().__init__(verbose)
        self.episode_rewards = []

    def _on_step(self) -> bool:
        # `dones` and `rewards` hold one entry per sub-environment; iterate
        # instead of taking the truth value of the whole array, which is
        # ambiguous (ValueError) when there is more than one environment.
        for finished, reward in zip(self.locals["dones"], self.locals["rewards"]):
            if finished:
                self.episode_rewards.append(reward)
        # Returning True keeps training going.
        return True



In [None]:
# Tunable settings for this run.
N_TRAINING_DELTAS = 200
TOTAL_TIMESTEPS = 100000
MODEL_PATH = "sac_custom_env"

# Sample without replacement, capped at the number of available candidates so
# a short candidate list does not raise ValueError.
# NOTE: no random seed is set, so the sample and the training run differ
# between executions.
training_deltas = random.sample(filtered_data, min(N_TRAINING_DELTAS, len(filtered_data)))

env = LpEnv(training_deltas, data)

# Optional check that the environment follows the Gym API:
# check_env(env, warn=True)

# Create a vectorized environment
vec_env = make_vec_env(lambda: LpEnv(training_deltas, data), n_envs=1)

# Choose the algorithm: PPO or SAC
model = SAC('MlpPolicy', vec_env, verbose=1)
reward_callback = RewardCallback()
# Train the model
model.learn(total_timesteps=TOTAL_TIMESTEPS, callback=reward_callback)

# Save the model, then reload it from disk
model.save(MODEL_PATH)
model = SAC.load(MODEL_PATH)

# Plot the rewards recorded by the callback (one point per finished episode)
fig, ax = plt.subplots(figsize=(10, 6))
ax.plot(reward_callback.episode_rewards, label='Episode Reward')
ax.set_xlabel('Episode')
ax.set_ylabel('Reward')
ax.set_title('Reward Over Training Duration')
ax.legend()
ax.grid(True)
plt.show()
