<a href="https://colab.research.google.com/github/jiachoi-ds/Reinforcement-Learning/blob/Ji-Hyeon-Yoo/J.H._FinalProject.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [3]:
import numpy as np
import gym
import random


from gym import spaces


In [12]:
class MyEnv(gym.Env):
    """Insurance-market environment in which the agent sets a premium cap.

    Observation (float32 vector of length 7):
        q   fire probability
        L   loss size when a fire breaks out
        C   cost term used in the premium and in both utilities
        T   insurer operating cost
        X   homeowner psychological utility
        xI  1.0 if the insurer entered the market in the last step, else 0.0
        xH  1.0 if the homeowner bought in the last step, else 0.0

    Action (float32 vector of length 1):
        K   government cap on the premium, clipped to [K_min, K_max]

    Episodes never terminate on their own; they are truncated after
    ``max_steps`` steps.

    NOTE(review): ``step`` returns ``(obs, reward, done)`` rather than the
    classic Gym 4-tuple ``(obs, reward, done, info)``. The per-step
    diagnostics are exposed through ``self.last_info`` instead; libraries
    that expect the 4-tuple will need an adapter.
    """

    metadata = {"render.modes": ["human"]}

    def __init__(self, K_min=1000.0, K_max=20000.0, max_steps=200):
        """Create the environment.

        Args:
            K_min: Lower bound of the government cap (action lower bound).
            K_max: Upper bound of the government cap (action upper bound).
            max_steps: Number of steps after which ``done`` becomes True.
        """
        super().__init__()

        # Observation layout: q, L, C, T, X, xI, xH.
        obs_low = np.array([0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0], dtype=np.float32)
        obs_high = np.array(
            [1.0, 50000.0, 50000.0, 20000.0, 50.0, 1.0, 1.0], dtype=np.float32
        )
        self.observation_space = spaces.Box(obs_low, obs_high, dtype=np.float32)

        # Government cap bounds.
        self.K_min = K_min
        self.K_max = K_max

        self.action_space = spaces.Box(
            low=np.array([self.K_min], dtype=np.float32),
            high=np.array([self.K_max], dtype=np.float32),
            dtype=np.float32,
        )

        # Weights used to combine the two utilities into the reward.
        self.alpha_1 = 1.0  # insurance company
        self.alpha_2 = 1.0  # homeowner

        self.m = 500.0  # risk premium added by the insurance company

        # Long-run levels that the mean-reverting state variables drift to.
        self.q_bar = 0.04  # fire probability
        self.L_bar = 10000.0  # loss when a fire breaks out
        self.C_bar = 5000.0  # cost term C

        # Mean-reversion (persistence) coefficients.
        self.phi_q = 0.9
        self.phi_L = 0.9
        self.phi_C = 0.9

        # Noise scales for the transitions; 0 makes them deterministic.
        # Previously tried values: 0.002, 500.0 and 100.0 respectively.
        self.sigma_q = 0
        self.sigma_L = 0
        self.sigma_C = 0

        # Market response of C to the insurer's entry decision.
        self.up_ratio = 0.05  # no entry -> C rises by 5% of its current value
        self.down_ratio = 0.03  # entry -> C falls by 3% of its current value

        # Increase of the insurer's operating cost per step without entry.
        self.gamma_T = 50.0

        # Adjustment rate of the homeowner's psychological utility.
        self.gamma_X = 0.2
        self.rho_bar = 0.8  # target coverage for the homeowner; never set to 1

        self.max_steps = max_steps
        self.step_count = 0

        self.state = None  # populated by reset()
        self.last_info = {}  # diagnostics of the most recent step()

    def seed(self, seed=None):
        """Seed NumPy's global random generator, which reset/step draw from."""
        np.random.seed(seed)

    def reset(self):
        """Start a new episode and return the initial observation."""
        q_0 = np.clip(np.random.normal(self.q_bar, 0.005), 0.001, 0.2)
        L_0 = np.random.normal(self.L_bar, 1000.0)
        C_0 = np.random.normal(self.C_bar, 300.0)
        T_0 = 1000.0
        X_0 = 5.0
        xI_cur = 0.0  # 0.0 or 1.0
        xH_cur = 0.0  # 0.0 or 1.0

        self.state = np.array(
            [q_0, L_0, C_0, T_0, X_0, xI_cur, xH_cur], dtype=np.float32
        )
        self.step_count = 0
        self.last_info = {}
        return self.state

    def step(self, action):
        """Apply the government cap and advance the market by one step.

        Args:
            action: The cap K, either as a length-1 array (as sampled from
                ``action_space``) or as a plain scalar. It is clipped to
                [K_min, K_max].

        Returns:
            Tuple ``(obs, reward, done)``: the next observation, the weighted
            sum of insurer and homeowner utilities, and whether ``max_steps``
            has been reached.

        Raises:
            RuntimeError: If called before ``reset()``.
        """
        if self.state is None:
            raise RuntimeError("step() called before reset()")

        self.step_count += 1

        # Accept both array-like and scalar actions.
        K_raw = np.asarray(action, dtype=np.float64).reshape(-1)[0]
        K_cur = float(np.clip(K_raw, self.K_min, self.K_max))

        # xI_cur / xH_cur are the previous step's decisions; they are part of
        # the observation but do not enter the computations below.
        q_cur, L_cur, C_cur, T_cur, X_cur, xI_cur, xH_cur = self.state

        # Insurer's premium: expected cost plus risk premium, capped at K.
        P_cur = min(K_cur, q_cur * C_cur + self.m)

        # Homeowner wants to buy if the premium does not exceed the expected
        # cost plus the psychological utility.
        desire_buy = 1.0 if P_cur <= (q_cur * C_cur + X_cur) else 0.0

        # Insurer's expected profit if it enters the market.
        exp_profit_if_buy = desire_buy * (P_cur - q_cur * C_cur) - T_cur

        if exp_profit_if_buy >= 0:
            xI_next = 1.0
            xH_next = desire_buy  # homeowner buys if it wanted to
        else:
            xI_next = 0.0
            xH_next = 0.0  # no insurer -> no purchase

        # Utilities and reward.
        U_I = xI_next * (xH_next * (P_cur - q_cur * C_cur) - T_cur)
        U_H = xI_next * xH_next * (q_cur * C_cur - P_cur + X_cur) - q_cur * L_cur
        reward = self.alpha_1 * U_I + self.alpha_2 * U_H

        # --- state transition ---
        # Fire probability: mean reversion plus noise.
        q_next = (
            self.phi_q * q_cur
            + (1 - self.phi_q) * self.q_bar
            + self.sigma_q * np.random.randn()
        )
        q_next = float(np.clip(q_next, 0.001, 0.5))

        # Loss size: mean reversion plus noise, floored at 100.
        L_next = (
            self.phi_L * L_cur
            + (1 - self.phi_L) * self.L_bar
            + self.sigma_L * np.random.randn()
        )
        L_next = float(max(100.0, L_next))

        # Cost C: mean reversion, then a market response to the entry decision.
        C_base = self.phi_C * C_cur + (1 - self.phi_C) * self.C_bar
        if xI_next == 0.0:
            C_next = C_base + self.up_ratio * C_cur
        else:
            C_next = C_base - self.down_ratio * C_cur

        # Optional stochastic noise on C.
        if self.sigma_C > 0.0:
            C_next += self.sigma_C * np.random.randn()

        C_next = float(max(100.0, C_next))

        # Operating cost grows whenever the insurer stays out.
        T_next = T_cur + self.gamma_T * (1.0 - xI_next)

        # Psychological utility moves with the gap between purchase and target.
        X_next = X_cur + self.gamma_X * (xH_next - self.rho_bar)
        X_next = float(np.clip(X_next, 0.0, 50.0))

        self.state = np.array(
            [q_next, L_next, C_next, T_next, X_next, xI_next, xH_next],
            dtype=np.float32,
        )

        # Truncation only; there is no terminal state.
        done = self.step_count >= self.max_steps

        # Keep the intermediate quantities available, since step() itself
        # returns no info dict.
        self.last_info = {
            "K": K_cur,
            "P": float(P_cur),
            "desire_buy": desire_buy,
            "xI": xI_next,
            "xH": xH_next,
            "U_I": float(U_I),
            "U_H": float(U_H),
        }

        return self.state, float(reward), done

    def render(self, mode="human"):
        """Placeholder; rendering is not implemented yet."""
        print("need to be implemented")
        return


In [13]:
# Smoke test: roll the environment forward with uniformly sampled caps and
# print each transition.
env = MyEnv()
obs = env.reset()

num_steps = 10
for t in range(num_steps):
    action = env.action_space.sample()
    next_obs, reward, done = env.step(action)

    # One print call emits the same four lines as separate prints would.
    print(
        f"\n[STEP {t+1}]\n"
        f"Action (K): {action}\n"
        f"Reward: {reward:.3f}\n"
        f"Next state: {next_obs}"
    )

    if not done:
        continue

    # Only reachable when max_steps is smaller than num_steps.
    print("\nEpisode finished early (max_steps reached).")
    break




[STEP 1]
Action (K): [6792.655]
Reward: -413.257
Next state: [3.9705385e-02 1.0375014e+04 5.6196143e+03 1.0500000e+03 4.8400002e+00
 0.0000000e+00 0.0000000e+00]

[STEP 2]
Action (K): [13398.543]
Reward: -411.944
Next state: [3.9734844e-02 1.0337512e+04 5.8386338e+03 1.1000000e+03 4.6800003e+00
 0.0000000e+00 0.0000000e+00]

[STEP 3]
Action (K): [8122.4663]
Reward: -410.759
Next state: [3.9761361e-02 1.0303761e+04 6.0467021e+03 1.1500000e+03 4.5200005e+00
 0.0000000e+00 0.0000000e+00]

[STEP 4]
Action (K): [5348.85]
Reward: -409.692
Next state: [3.9785225e-02 1.0273385e+04 6.2443667e+03 1.2000000e+03 4.3600006e+00
 0.0000000e+00 0.0000000e+00]

[STEP 5]
Action (K): [14809.225]
Reward: -408.729
Next state: [3.9806701e-02 1.0246046e+04 6.4321479e+03 1.2500000e+03 4.2000008e+00
 0.0000000e+00 0.0000000e+00]

[STEP 6]
Action (K): [8515.467]
Reward: -407.861
Next state: [3.9826032e-02 1.0221441e+04 6.6105405e+03 1.3000000e+03 4.0400009e+00
 0.0000000e+00 0.0000000e+00]

[STEP 7]
Action (K)