
# RL Simulator



In [None]:
# Install Gymnasium, which supplies the `Env` base class and the `spaces` module
# used by the environment defined later in this notebook.
# `%pip` is an IPython magic, so this line only works inside a notebook kernel.
%pip install gymnasium



In [None]:
# Authenticate this session with Google Cloud so that BigQuery calls can be made.
# `google.colab` is only importable inside a Colab runtime.
from google.colab import auth
auth.authenticate_user()

# Import the BigQuery client library.
from google.cloud import bigquery
# Create the client bound to our own project ("rl-semester-project").
# Queries issued through `client` are submitted under this project, even when
# they read tables that live in a different (public) project.
client = bigquery.Client(project="rl-semester-project")

In [None]:
# Import the BigQuery client library.
# NOTE(review): this import and the client creation below repeat the previous
# cell; they are redundant when the cells are run in order.
from google.cloud import bigquery
# Initialize the BigQuery client with the correct project ID
client = bigquery.Client(project="rl-semester-project")

# Fully-qualified location of the public table whose schema we want to inspect.
# NOTE(review): this inspects `clusterdata_2019_h`, while the aggregation query
# later in the notebook reads `clusterdata_2019_a` — confirm both datasets share
# the same `instance_usage` schema, or point both cells at the same dataset.
public_proj = "google.com:google-cluster-data"
dataset     = "clusterdata_2019_h"
table       = "instance_usage"

# Look up the column names and types of `table` via the dataset's
# INFORMATION_SCHEMA.COLUMNS view and load the result into a pandas DataFrame.
# The identifiers interpolated into the f-string are the constants defined above.
schema_df = client.query(f"""
SELECT
  column_name,  -- Select the column name
  data_type     -- Select the data type of the column
FROM `{public_proj}.{dataset}.INFORMATION_SCHEMA.COLUMNS`
WHERE table_name = '{table}' -- Filter for the specified table
ORDER BY ordinal_position   -- Order by the position of the column in the table
""").to_dataframe()

# Display the schema information (one row per column) as the cell's output.
schema_df

Unnamed: 0,column_name,data_type
0,start_time,INT64
1,end_time,INT64
2,collection_id,INT64
3,instance_index,INT64
4,machine_id,INT64
5,alloc_collection_id,INT64
6,alloc_instance_index,INT64
7,collection_type,INT64
8,average_usage,"STRUCT<cpus FLOAT64, memory FLOAT64>"
9,maximum_usage,"STRUCT<cpus FLOAT64, memory FLOAT64>"


  return datetime.utcnow().replace(tzinfo=utc)


Aggregate CPU and memory load from instance_usage:

In [None]:
# Build a per-minute workload time series from the 'instance_usage' table:
# one row per minute with mean CPU, mean memory and the number of distinct machines.
# Because of ORDER BY + LIMIT, only the 10000 earliest minute windows are returned.
# NOTE(review): the resulting time_window values fall on 1970-01-01 (see the cell
# output below), which suggests `start_time` is a microsecond offset rather than a
# wall-clock timestamp — confirm against the trace documentation.
# NOTE(review): this reads `clusterdata_2019_a`, whereas the schema cell above
# inspects `clusterdata_2019_h` — confirm the difference is intentional.
sql = """
SELECT
  # Truncate the start time to the minute to create time windows
  TIMESTAMP_TRUNC(TIMESTAMP_MICROS(start_time), MINUTE) AS time_window,
  # Calculate the average CPU usage for each time window
  AVG(average_usage.cpus) AS avg_cpu,
  # Calculate the average memory usage for each time window
  AVG(average_usage.memory) AS avg_mem,
  # Count the number of distinct machines active in each time window
  COUNT(DISTINCT machine_id) AS active_machines
FROM `google.com:google-cluster-data.clusterdata_2019_a.instance_usage`
# Group the results by the time window
GROUP BY time_window
# Order the results by the time window
ORDER BY time_window
# Limit the results to the first 10000 rows
LIMIT 10000
"""
# Run the query through the `client` created in an earlier cell and load the
# result into a pandas DataFrame with columns
# time_window, avg_cpu, avg_mem and active_machines.
df_usage = client.query(sql).to_dataframe()
# Display the first few rows of the DataFrame
df_usage.head()

  return datetime.utcnow().replace(tzinfo=utc)
  return datetime.utcnow().replace(tzinfo=utc)


Unnamed: 0,time_window,avg_cpu,avg_mem,active_machines
0,1970-01-01 00:05:00+00:00,0.006623,0.004912,9525
1,1970-01-01 00:06:00+00:00,0.003254,0.002733,3805
2,1970-01-01 00:07:00+00:00,0.00307,0.00277,4167
3,1970-01-01 00:08:00+00:00,0.00195,0.001823,4338
4,1970-01-01 00:09:00+00:00,0.001689,0.001468,5545


  return datetime.utcnow().replace(tzinfo=utc)


In [None]:
# Sanity-check the query result: (rows, columns) on the first line,
# the list of column names on the second.
print(df_usage.shape, df_usage.columns.tolist(), sep="\n")
# Preview the first three rows as the cell's rendered value.
df_usage.head(3)

(10000, 4)
['time_window', 'avg_cpu', 'avg_mem', 'active_machines']


  return datetime.utcnow().replace(tzinfo=utc)


Unnamed: 0,time_window,avg_cpu,avg_mem,active_machines
0,1970-01-01 00:05:00+00:00,0.006623,0.004912,9525
1,1970-01-01 00:06:00+00:00,0.003254,0.002733,3805
2,1970-01-01 00:07:00+00:00,0.00307,0.00277,4167


In [None]:
# RL transitions are built from consecutive rows, so enforce chronological order
# and renumber the index 0..n-1 in the same call.
df_usage = df_usage.sort_values("time_window", ignore_index=True)
df_usage.head(3)

Unnamed: 0,time_window,avg_cpu,avg_mem,active_machines
0,1970-01-01 00:05:00+00:00,0.006623,0.004912,9525
1,1970-01-01 00:06:00+00:00,0.003254,0.002733,3805
2,1970-01-01 00:07:00+00:00,0.00307,0.00277,4167


  return datetime.utcnow().replace(tzinfo=utc)


## Building the Simulated RL Environment

Based on the EDA, we will now build a simulated reinforcement learning environment for autoscaling.



### Approach to Building the Simulated Environment

The `df_usage` DataFrame, containing the time-series data of aggregated CPU and memory usage and the number of active machines, serves as the driving force for our simulated autoscaling environment. Here's the approach:

1.  **Time-Based Simulation:** The simulator progresses through the `df_usage` DataFrame minute by minute, with each row representing a step in the simulation.
2.  **Workload Input:** At each step, the total CPU demand on the cluster is estimated as `avg_cpu * active_machines`, using the values from the current minute in `df_usage`.
3.  **Agent's Capacity:** The reinforcement learning agent controls a simulated cluster capacity (`self.current_capacity` in the `AutoScalingEnv`).
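4.  **Action Impact:** The agent's scaling actions (scale down, hold, or scale up, encoded as 0, 1, and 2) change this simulated capacity by one machine at a time, within the configured minimum and maximum. After a scaling action, a cooldown period applies during which further actions are ignored.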
5.  **Utilization Calculation:** The key to the simulation is calculating the cluster utilization by comparing the estimated workload demand to the agent's current simulated capacity.
6.  **Reward Calculation:** A reward is calculated based on the utilization and the cost of the simulated capacity. The reward function encourages the agent to balance cost minimization with maintaining acceptable utilization to avoid performance issues and SLA violations.
7.  **Next State:** The simulator moves to the next minute in the `df_usage` DataFrame to provide the workload data for the subsequent state observation.

This process creates a dynamic environment where the agent learns to make optimal scaling decisions in response to realistic workload patterns provided by the historical data.

In [None]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import pandas as pd

# Define the RL Environment
class AutoScalingEnv(gym.Env):
    """Trace-driven autoscaling environment.

    The environment replays ``usage_dataframe`` one row per step. Each row must
    provide the columns ``avg_cpu``, ``avg_mem`` and ``active_machines``. The
    agent controls ``current_capacity`` (a machine count) and is rewarded for
    keeping utilization near ``target_utilization`` at low cost.

    Actions (``spaces.Discrete(3)``):
        0 -- scale down by one machine
        1 -- hold
        2 -- scale up by one machine

    Observation: ``[avg_cpu, avg_mem, current_capacity]`` as ``float32``.

    Reward (computed for every non-terminal step)::

        -(capacity * cost_per_machine_per_minute)
        - max(0, utilization - sla_threshold) * sla_penalty_weight
        - abs(utilization - target_utilization)

    where ``utilization = avg_cpu * active_machines / capacity``.

    NOTE: ``reset()`` returns only the observation and ``step()`` returns the
    4-tuple ``(obs, reward, done, info)``. Gymnasium's own convention is
    ``(obs, info)`` and a 5-tuple, so tools that expect that convention need an
    adapter. The return shapes are kept as-is for existing callers.
    """

    def __init__(
        self,
        usage_dataframe,
        initial_capacity=None,
        target_utilization=0.6,
        sla_threshold=0.8,
        cost_per_machine_per_minute=0.002,
        sla_penalty_weight=10.0,
        min_capacity=1,
        max_capacity=50,
        cooldown_steps=5,
    ):
        """Configure the simulation.

        Args:
            usage_dataframe: Time-ordered DataFrame with ``avg_cpu``,
                ``avg_mem`` and ``active_machines`` columns; one row per step.
            initial_capacity: Starting machine count; defaults to 10 when None.
            target_utilization: Utilization the agent is encouraged to hold.
            sla_threshold: Utilization above which the SLA penalty applies.
            cost_per_machine_per_minute: Cost charged per machine per step.
            sla_penalty_weight: Multiplier on utilization above the threshold.
            min_capacity: Lower bound for the managed capacity.
            max_capacity: Upper bound for the managed capacity.
            cooldown_steps: Number of subsequent steps during which actions are
                ignored after a scale-up or scale-down action.
        """
        super().__init__()

        self.df_usage = usage_dataframe
        self.current_step = 0
        # Index of the last valid row; stepping past it ends the episode.
        self.max_steps = len(self.df_usage) - 1

        # Simulation parameters.
        # The starting capacity is deliberately independent of the trace's
        # 'active_machines' column.
        self.initial_capacity = initial_capacity if initial_capacity is not None else 10
        self.current_capacity = self.initial_capacity
        self.target_utilization = target_utilization
        self.sla_threshold = sla_threshold
        self.cost_per_machine_per_minute = cost_per_machine_per_minute
        self.sla_penalty_weight = sla_penalty_weight
        self.min_capacity = min_capacity
        self.max_capacity = max_capacity
        self.cooldown_steps = cooldown_steps
        self.cooldown = 0  # Remaining steps during which actions are ignored.

        # Action encoding: 0 = scale down, 1 = hold, 2 = scale up.
        self.action_space = spaces.Discrete(3)

        # Observation: [avg_cpu, avg_mem, current_capacity].
        # Lagged values and trends may be added later.
        self.observation_space = spaces.Box(low=0, high=np.inf, shape=(3,), dtype=np.float32)

        # Initial state.
        self.state = self._get_obs()

    def _get_obs(self):
        """Return the observation for ``current_step``, or None past the last row."""
        if self.current_step > self.max_steps:
            return None

        row = self.df_usage.iloc[self.current_step]
        # Current workload (avg_cpu, avg_mem) plus the agent-managed capacity.
        return np.array([row['avg_cpu'], row['avg_mem'], self.current_capacity], dtype=np.float32)

    def _apply_action(self, action):
        """Update capacity and cooldown for one step.

        While a cooldown is active the action is ignored and the cooldown is
        decremented. Otherwise 2 scales up, 0 scales down (both clamped to
        [min_capacity, max_capacity] and both starting a new cooldown), and any
        other value leaves the capacity unchanged.
        """
        if self.cooldown == 0:
            if action == 2:  # Scale up
                self.current_capacity = min(self.current_capacity + 1, self.max_capacity)
                self.cooldown = self.cooldown_steps
            elif action == 0:  # Scale down
                self.current_capacity = max(self.current_capacity - 1, self.min_capacity)
                self.cooldown = self.cooldown_steps
            # Action 1 is hold: capacity does not change and no cooldown starts.
        else:
            self.cooldown -= 1

    def step(self, action):
        """Apply ``action``, advance one row and score the row just served.

        Args:
            action: 0 (scale down), 1 (hold) or 2 (scale up).

        Returns:
            ``(next_obs, reward, done, info)``. On the step that moves past the
            last row, ``done`` is True, ``next_obs`` is None, ``reward`` is 0
            and all numeric ``info`` fields other than ``current_capacity``
            are 0.
        """
        self._apply_action(action)
        self.current_step += 1

        # The episode ends once the step counter has moved past the last row.
        done = self.current_step > self.max_steps

        # Defaults for the terminal step. Defining every component up front
        # keeps the `info` dict below from raising UnboundLocalError when the
        # reward branch is skipped.
        reward = 0
        utilization = 0
        estimated_total_cpu_load = 0
        cost_penalty = 0
        sla_penalty = 0
        util_deviation_penalty = 0

        if not done:
            # Score against the row that was current *before* the counter advanced.
            row = self.df_usage.iloc[self.current_step - 1]
            # Treat each machine as one nominal CPU unit, so the total demand is
            # the mean per-row CPU usage times the number of machines in the trace.
            estimated_total_cpu_load = row['avg_cpu'] * row['active_machines']

            # Utilization of the simulated (agent-managed) capacity.
            utilization = estimated_total_cpu_load / self.current_capacity if self.current_capacity > 0 else 0

            # Cost grows linearly with the capacity the agent keeps provisioned.
            cost_penalty = self.current_capacity * self.cost_per_machine_per_minute

            # Only utilization above the SLA threshold is penalized.
            if utilization > self.sla_threshold:
                sla_penalty = (utilization - self.sla_threshold) * self.sla_penalty_weight

            # Already negative: the further from the target, the lower the reward.
            util_deviation_penalty = -abs(utilization - self.target_utilization)

            # All components are penalties, so the reward is at most 0.
            reward = -cost_penalty - sla_penalty + util_deviation_penalty

        next_obs = self._get_obs() if not done else None

        # Diagnostics; reward components are reported with the sign they
        # contribute to the reward.
        info = {
            'current_capacity': self.current_capacity,
            'utilization': utilization,
            'estimated_total_cpu_load': estimated_total_cpu_load,
            'reward_components': {
                'cost_penalty': -cost_penalty,
                'sla_penalty': -sla_penalty,
                'util_deviation_penalty': util_deviation_penalty
            }
        }

        return next_obs, reward, done, info

    def reset(self):
        """Rewind to the first row, restore the initial capacity and return the observation."""
        self.current_step = 0
        self.current_capacity = self.initial_capacity
        self.cooldown = 0
        self.state = self._get_obs()
        return self.state

    def render(self, mode='human'):
        """No-op placeholder; rendering is not implemented."""
        pass

    def close(self):
        """No-op; the environment holds no external resources."""
        pass

  return datetime.utcnow().replace(tzinfo=utc)


In [None]:
# Example usage:
# Make sure to run the preceding cells that define the 'df_usage' DataFrame first.
env = AutoScalingEnv(df_usage)
obs = env.reset()
print("Initial Observation:", obs)

# Take a sample action. Action encoding used by AutoScalingEnv.step:
# 0 = scale down, 1 = hold, 2 = scale up.
action = 1 # Hold: the capacity stays unchanged for this step
# step() returns a 4-tuple: (next observation, reward, done flag, info dict).
next_obs, reward, done, info = env.step(action)
print("Next Observation:", next_obs, "Reward:", reward, "Done:", done, "Info:", info)

Initial Observation: [6.6225836e-03 4.9124165e-03 1.0000000e+01]
Next Observation: [3.2542923e-03 2.7331815e-03 1.0000000e+01] Reward: -60.80811775319608 Done: False Info: {'current_capacity': 10, 'utilization': np.float64(6.308010704836007), 'estimated_total_cpu_load': np.float64(63.08010704836007), 'reward_components': {'cost_penalty': -0.02, 'sla_penalty': np.float64(-55.08010704836007), 'util_deviation_penalty': np.float64(-5.708010704836007)}}
