In [1]:
import logging
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor
import gymnasium as gym
import torch
import torch.nn as nn

# Configure logging
logger = logging.getLogger(__name__)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

class TimeSeriesTransformer(nn.Module):
    """
    A Transformer-based model for time series data.
    This class projects input features to an embedding, adds positional
    encodings, and then processes the inputs using a Transformer encoder.
    Finally, a decoder layer is used to produce the output.
    Args:
        input_size (int): Number of features in the input time series data.
        embed_dim (int): Dimensionality of the learned embedding space.
        num_heads (int): Number of attention heads in each Transformer layer.
        num_layers (int): Number of Transformer encoder layers.
        sequence_length (int): Length of the input sequences (time steps).
        dropout (float, optional): Dropout probability to apply in the
            Transformer encoder layers. Defaults to 0.1.
    Attributes:
        model_type (str): Identifier for the model type ('Transformer').
        embedding (nn.Linear): Linear layer for input feature embedding.
        positional_encoding (torch.nn.Parameter): Parameter storing the
            positional encodings used to retain temporal information.
        transformer_encoder (nn.TransformerEncoder): Stack of Transformer
            encoder layers with optional final LayerNorm.
        decoder (nn.Linear): Linear layer used to produce the final output
            dimensions.
    Forward Inputs:
        src (torch.Tensor): Input tensor of shape (batch_size, sequence_length,
            input_size).
    Forward Returns:
        torch.Tensor: Output tensor of shape (batch_size, embed_dim) from the
            last time step.
    Raises:
        ValueError: If the model output contains NaN or Inf values, indicating
            numerical instability.
    """
    # input_size: Input features အရေအတွက် (ဥပမာ 10၊ price + SMA/RSI indicators စတာ)။
    # embed_dim: Internal embedding အတိုင်းအတာ (ဥပမာ 64၊ data ကို ပိုနက်ရှိုင်း အောင် ပြောင်း)။
    # num_heads: Attention heads အရေအတွက် (multi-head attention အတွက်၊ မတူညီ အနေနဲ့ အာရုံ စိုက်)။
    # num_layers: Encoder layers အရေအတွက် (ဥပမာ 2၊ ရိုးရှင်း ထားတာ)။
    # sequence_length: Input sequence အရှည် (ဥပမာ 20 timesteps)။
    # dropout=0.1: Overfitting ကနေ ကာကွယ် တဲ့ dropout rate။
    def __init__(self, input_size, embed_dim, num_heads, num_layers,sequence_length, dropout=0.1):
        super(TimeSeriesTransformer, self).__init__()
        self.model_type = 'Transformer'
        self.embed_dim = embed_dim

        # Embedding layer to project input features to embed_dim dimensions
        self.embedding = nn.Linear(input_size, embed_dim).to(device)

        # Positional encoding parameter
        self.positional_encoding = nn.Parameter(torch.zeros(1, sequence_length, embed_dim).to(device))

        # Transformer encoder layer
        encoder_layer = nn.TransformerEncoderLayer(
            d_model=embed_dim,
            nhead=num_heads,
            dropout=dropout,
            norm_first=True  # Apply LayerNorm before attention and feedforward
        ).to(device)
        self.transformer_encoder = nn.TransformerEncoder(
            encoder_layer,
            num_layers=num_layers,
            norm=nn.LayerNorm(embed_dim).to(device) # Add LayerNorm at the end of the encoder
        )

        # Decoder layer to produce final output
        self.decoder = nn.Linear(embed_dim, embed_dim).to(device)

    def forward(self, src):
        # Apply embedding layer and add positional encoding
        src = self.embedding(src) + self.positional_encoding

        # Pass through the transformer encoder
        output = self.transformer_encoder(src)

        # Pass through the decoder layer
        output = self.decoder(output)

        # Check for NaN or Inf values for debugging
        if torch.isnan(output).any() or torch.isinf(output).any():
            logger.error("Transformer output contains NaN or Inf values")
            raise ValueError("Transformer output contains NaN or Inf values")

        # Return the output from the last time step
        return output[:, -1, :]


In [2]:
class CustomCombinedExtractor(BaseFeaturesExtractor):
    """
    A custom feature extractor that normalizes input observations and processes them
    using a transformer-based architecture for dimensionality reduction and enhanced
    feature representation.
    Parameters:
        observation_space (gym.spaces.Box): Defines the shape and limits of input data.
        sequence_length (int): The length of the time series to be processed.
    Attributes:
        layernorm_before (nn.LayerNorm): Normalizes input data to improve training stability.
        transformer (TimeSeriesTransformer): Processes normalized input sequences and extracts features.
    Methods:
        forward(observations):
            Applies layer normalization to the incoming observations, then passes them
            through the transformer. Raises a ValueError if invalid values (NaNs or inf)
            are detected in the output.
    """
    
    def __init__(self, observation_space: gym.spaces.Box, sequence_length):
        super(CustomCombinedExtractor, self).__init__(observation_space, features_dim=64)
        num_features = observation_space.shape[1]  # Should be 10 in this case

        # Ensure that embed_dim is divisible by num_heads
        embed_dim = 64
        num_heads = 2

        self.layernorm_before = nn.LayerNorm(num_features) # Added Layer Normalization before transformer

        self.transformer = TimeSeriesTransformer(
            input_size=num_features,
            embed_dim=embed_dim,
            num_heads=num_heads,
            num_layers=2,
            sequence_length =sequence_length
        )

    def forward(self, observations):
        # မူရင်း input tensor ရဲ့ device ကို မှတ်သားထားပါ
        input_device = observations.device
        
        # Apply layer normalization
        # Apply layer normalization, ဝင်လာတဲ့ observations ကို Transformer ရဲ့ device ပေါ်ကို ရွှေ့ပါ
        normalized_observations = self.layernorm_before(observations.float().to(device)) # Ensure float type

        x = self.transformer(normalized_observations)
        if torch.isnan(x).any() or torch.isinf(x).any():
            logger.error("Invalid values in transformer output")
            raise ValueError("Invalid values in transformer output")
        
        # ⚠️ ပြင်ဆင်ချက်: Output tensor ကို မူရင်း input tensor ရဲ့ device သို့ ပြန်ပို့ပါ
        # PPO Agent ရဲ့ Policy/Value Network က အလုပ်လုပ်တဲ့ device ပေါ်ကို ပြန်ပို့ဖို့ လိုပါတယ်။
        # သို့သော်လည်း၊ Stable-Baselines3 က Policy/Value Network ကို နောက်ပိုင်းမှာ to(device) နဲ့ ရွှေ့တဲ့အတွက်
        # ဒီနေရာမှာ အန္တရာယ်ကင်းအောင် မူရင်း input device ကို ပြန်ပို့တာ ဒါမှမဟုတ် Agent သုံးမယ့် device ပေါ်မှာပဲ ထားတာ နှစ်မျိုး လုပ်နိုင်ပါတယ်။
        # အကောင်းဆုံးကတော့ Policy Network တွေက GPU ပေါ်မှာရှိရင် GPU မှာပဲ ထားခဲ့တာပါ။
        
        # သို့သော်လည်း၊ SB3 ရဲ့ စံနှုန်းကို လိုက်နာဖို့၊ CPU ပေါ်ကလာရင် CPU ကို ပြန်ပို့တာ ပိုကောင်းပါတယ်။
        if str(input_device) == 'cpu':
            return x.to(input_device)
        else:
             # Agent က GPU မှာ Run ရင်တော့ GPU မှာပဲ ထားခဲ့ပါ
            return x


In [3]:
# Test အတွက် လိုအပ်တဲ့ parameters 
SEQ_LEN = 20
INPUT_SIZE = 10
EMBED_DIM = 64
NUM_HEADS = 2
NUM_LAYERS = 2
BATCH_SIZE = 4 # တစ်ပြိုင်နက်တည်း ထည့်သွင်းမယ့် sample အရေအတွက်

# Test အတွက် dummy data ကို ပြင်ဆင်ခြင်း
# Shape: (Batch Size, Sequence Length, Input Size)
dummy_observations = torch.randn(BATCH_SIZE, SEQ_LEN, INPUT_SIZE) 

# Dummy observation_space (gym.spaces.Box ကို အတုလုပ်သည်)
class DummyObsSpace:
    def __init__(self, shape):
        self.shape = shape
dummy_observation_space = DummyObsSpace((SEQ_LEN, INPUT_SIZE)) 

print(f"Input Shape: {dummy_observations.shape}")
# Input Shape: torch.Size([4, 20, 10])

Input Shape: torch.Size([4, 20, 10])


In [4]:
# 1. Extractor ကို စတင်တည်ဆောက်
extractor = CustomCombinedExtractor(
    observation_space=dummy_observation_space, 
    sequence_length=SEQ_LEN
).to(device) # Extractor ကို သတ်မှတ်ထားတဲ့ device ပေါ် ရွှေ့ပါ

# 2. Forward Pass လုပ်ခြင်း
# dummy_observations ကို device ပေါ်ကို ရွှေ့စရာမလို၊ extractor ရဲ့ forward ထဲမှာ သူ့ဘာသာ ရွှေ့ပါလိမ့်မယ်။
output_features = extractor(dummy_observations) 

# 3. Output Shape ကို စစ်ဆေးခြင်း
print(f"Output Features Shape: {output_features.shape}")

# မျှော်မှန်းထားသော Output: (Batch Size, Feature Dimension/Embed Dim)
EXPECTED_SHAPE = torch.Size([BATCH_SIZE, EMBED_DIM])

assert output_features.shape == EXPECTED_SHAPE, \
    f"Output shape error! Expected: {EXPECTED_SHAPE}, Got: {output_features.shape}"
print("✅ Output Shape is Correct.")

print(f"Output Shape: {EXPECTED_SHAPE}")


Output Features Shape: torch.Size([4, 64])
✅ Output Shape is Correct.
Output Shape: torch.Size([4, 64])




In [5]:
# Output tensor က မှန်ကန်တဲ့ device ပေါ်မှာ ရှိမရှိ စစ်ဆေးခြင်း
# သင့် code က global 'device' variable ကို သုံးထားတဲ့အတွက်၊ output က 'device' ပေါ်မှာ ရှိရပါမယ်။
assert output_features.device == device, \
    f"Device Error! Expected: {device}, Got: {output_features.device}"
print(f"✅ Output is on the Correct Device: {device}.")

# (စမ်းသပ်ချက် အပို): CPU ပေါ်က input ဝင်လာရင်တောင် GPU ကို ရွှေ့သွားခြင်း ရှိ/မရှိ စစ်ဆေးပြီးသား ဖြစ်ပါတယ်။

✅ Output is on the Correct Device: cpu.


In [6]:
# NaN တန်ဖိုးပါတဲ့ input ကို ဖန်တီးခြင်း
nan_observations = torch.full((BATCH_SIZE, SEQ_LEN, INPUT_SIZE), float('nan'))
nan_observations[0, 0, 0] = 1.0 # NaN မဟုတ်တဲ့ တန်ဖိုးတစ်ခု ထည့်ကြည့်

try:
    extractor(nan_observations)
except ValueError as e:
    # NaN ပါရင် ValueError ထွက်လာရမယ်။
    assert "NaN or Inf values" in str(e) or "Invalid values" in str(e)
    print("✅ NaN/Inf Error Handling is Working (Caught ValueError).")
except Exception as e:
    print(f"⚠️ Unexpected Error Type: {e}")

Transformer output contains NaN or Inf values


✅ NaN/Inf Error Handling is Working (Caught ValueError).


In [7]:
from gymnasium import spaces
import numpy as np
from stable_baselines3 import PPO


# 1. Custom Environment (Env) ကို တည်ဆောက်ခြင်း
# ဒါက test အတွက် အလွယ်ဆုံး environment ပုံစံ ဖြစ်ပါတယ်။
class DummyTimeEnv(gym.Env):
    def __init__(self, seq_len, input_size):
        super().__init__()
        # Observation space ကို CustomCombinedExtractor မျှော်မှန်းထားတဲ့အတိုင်း သတ်မှတ်
        self.observation_space = spaces.Box(
            low=-np.inf, high=np.inf, 
            shape=(seq_len, input_size), 
            dtype=np.float32
        )
        self.action_space = spaces.Discrete(3) # Buy, Sell, Hold
        self.state = np.zeros((seq_len, input_size), dtype=np.float32)

    def step(self, action):
        # State ကို random အနေနဲ့ အနည်းငယ် ပြောင်းလဲ
        self.state = np.roll(self.state, -1, axis=0) 
        self.state[-1] = np.random.randn(self.observation_space.shape[1]).astype(np.float32)
        reward = 0.0
        terminated = False
        truncated = False
        info = {}
        return self.state, reward, terminated, truncated, info

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        self.state = np.random.randn(*self.observation_space.shape).astype(np.float32)
        info = {}
        return self.state, info
    
# 2. Environment နှင့် Agent ကို တည်ဆောက်ခြင်း
env = DummyTimeEnv(SEQ_LEN, INPUT_SIZE)

try:
    # PPO Agent ကို Custom Extractor နဲ့ သတ်မှတ်ပြီး GPU ပေါ်မှာ train ဖို့ စမ်းသပ်
    model = PPO(
        "MlpPolicy", 
        env, 
        policy_kwargs=dict(
            features_extractor_class=CustomCombinedExtractor, 
            features_extractor_kwargs=dict(sequence_length=SEQ_LEN)
        ),
        verbose=0,
        device=device # Agent ကို သတ်မှတ်ထားတဲ့ device (GPU/CPU) ပေါ်မှာ ထားဖို့
    )
    
    # 3. Agent ကို အတိုချုံး Train လုပ်ခြင်း (Test အောင်မြင်ကြောင်း သေချာစေရန်)
    model.learn(total_timesteps=100)
    print(f"✅ PPO Agent Successfully Trained with Custom Extractor on {device}.")

except Exception as e:
    print(f"❌ PPO Integration Failed: {e}")

✅ PPO Agent Successfully Trained with Custom Extractor on cpu.
