# Environments

In reinforcement learning, agents interact with environments to improve their performance through trial and error. This tutorial explores how Tianshou handles environments, from basic single-environment setups to advanced vectorized and parallel configurations.

<div style="text-align: center; padding: 1rem;">
<img src="../_static/images/rl-loop.jpg" style="width: 60%; padding-bottom: 1rem;"><br>
The agent-environment interaction loop
</div>

Tianshou maintains full compatibility with the [Gymnasium](https://gymnasium.farama.org/) API (formerly OpenAI Gym), making it easy to use any Gymnasium-compatible environment.

## The Bottleneck Problem

In a standard Gymnasium environment, each interaction follows a sequential pattern:

1. Agent selects an action
2. Environment processes the action and returns observation and reward
3. Repeat

This sequential process can become a significant bottleneck in deep reinforcement learning experiments, especially when:
- The environment simulation is computationally intensive
- Network training is fast but data collection is slow
- You have multiple CPU cores available but aren't using them

Tianshou addresses this bottleneck through **vectorized environments**, which allow parallel sampling across multiple CPU cores.

## Vectorized Environments

Vectorized environments let you run multiple environment instances side by side and step them with a single call, which speeds up data collection when environment stepping is the bottleneck. Let's see this in action.

In [19]:
import time

import gymnasium as gym
import numpy as np

from tianshou.env import DummyVectorEnv, SubprocVectorEnv

### Performance Comparison

Let's compare the sampling speed with different numbers of parallel environments:

In [20]:
num_cpus = [1, 2, 5]

for num_cpu in num_cpus:
    # Create vectorized environment with multiple processes
    env = SubprocVectorEnv([lambda: gym.make("CartPole-v1") for _ in range(num_cpu)])
    env.reset()

    sampled_steps = 0
    time_start = time.time()

    # Sample 1000 steps
    while sampled_steps < 1000:
        act = np.random.choice(2, size=num_cpu)
        obs, rew, terminated, truncated, info = env.step(act)

        # Reset environments whose episode ended (terminated or truncated)
        done = np.logical_or(terminated, truncated)
        if done.any():
            env.reset(np.where(done)[0])

        sampled_steps += num_cpu

    time_used = time.time() - time_start
    env.close()  # Shut down the worker processes before the next run
    print(f"Sampled 1000 steps in {time_used:.3f}s using {num_cpu} CPU(s)")
    print(f"  → Speed: {1000 / time_used:.1f} steps/second")

Sampled 1000 steps in 0.235s using 1 CPU(s)
  → Speed: 4249.3 steps/second
Sampled 1000 steps in 0.138s using 2 CPU(s)
  → Speed: 7272.0 steps/second
Sampled 1000 steps in 0.068s using 5 CPU(s)
  → Speed: 14705.6 steps/second
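
To put these timings in perspective, speedup and parallel efficiency can be computed directly from them. The sketch below hard-codes the three wall-clock times printed above; `scaling_report` is an illustrative helper, and the numbers will differ on other machines.

```python
# Wall-clock seconds for 1000 steps, copied from the output above.
timings = {1: 0.235, 2: 0.138, 5: 0.068}


def scaling_report(timings):
    """Map each worker count to (speedup, efficiency) relative to one worker."""
    baseline = timings[1]
    report = {}
    for num_cpu, seconds in timings.items():
        speedup = baseline / seconds    # how many times faster than 1 worker
        efficiency = speedup / num_cpu  # 1.0 would be perfectly linear scaling
        report[num_cpu] = (speedup, efficiency)
    return report


report = scaling_report(timings)
for num_cpu, (speedup, efficiency) in report.items():
    print(f"{num_cpu} CPU(s): speedup {speedup:.2f}x, efficiency {efficiency:.0%}")
```

For this run, five workers give roughly a 3.5x speedup, which is about 69% of the ideal 5x.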


### Understanding the Results

You might notice that the speedup isn't perfectly linear with the number of CPUs. Several factors contribute to this:

1. **Straggler Effect**: In synchronous mode, all environments must complete before the next batch begins. Slower environments hold back faster ones.
2. **Communication Overhead**: Inter-process communication has costs, especially for fast environments.
3. **Environment Complexity**: For simple environments like CartPole, the overhead may outweigh the benefits.

> **Important**: `SubprocVectorEnv` should only be used when environment execution is slow. For simple, fast environments like CartPole, `DummyVectorEnv` (or even raw Gymnasium environments) can be more efficient because they avoid both the straggler effect and inter-process communication overhead.

## Types of Vectorized Environments

Tianshou provides several vectorized environment implementations, each optimized for different scenarios:

### 1. DummyVectorEnv
**Pseudo-parallel simulation using a for-loop**
- Best for: Simple/fast environments, debugging
- Pros: No inter-process overhead, deterministic execution
- Cons: No actual parallelization
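
To make the "for-loop" idea concrete, here is a minimal sketch in plain Python. `CounterEnv` and `LoopVectorEnv` are hypothetical stand-ins, not Tianshou classes; the real `DummyVectorEnv` additionally stacks results into NumPy arrays and handles resets by index, seeding, and rendering.

```python
class CounterEnv:
    """Toy environment: the observation is a running sum of actions."""

    def __init__(self):
        self.total = 0

    def reset(self):
        self.total = 0
        return self.total, {}

    def step(self, action):
        self.total += action
        terminated = self.total >= 3
        return self.total, 1.0, terminated, False, {}


class LoopVectorEnv:
    """Steps each sub-environment one after another in a for-loop."""

    def __init__(self, env_fns):
        self.envs = [fn() for fn in env_fns]

    def reset(self):
        results = [env.reset() for env in self.envs]
        obs, infos = zip(*results)
        return list(obs), list(infos)

    def step(self, actions):
        results = [env.step(a) for env, a in zip(self.envs, actions)]
        # Transpose [(obs, rew, ...), ...] into (all_obs, all_rew, ...)
        obs, rew, terminated, truncated, infos = (list(x) for x in zip(*results))
        return obs, rew, terminated, truncated, infos


venv = LoopVectorEnv([CounterEnv for _ in range(3)])
venv.reset()
obs, rew, terminated, truncated, infos = venv.step([0, 1, 3])
print(obs, terminated)  # [0, 1, 3] [False, False, True]
```

Because every sub-environment runs in the calling process, the time per vectorized step is the sum of the individual step times.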

### 2. SubprocVectorEnv
**Multiple processes for true parallel simulation**
- Best for: Most parallel simulation scenarios
- Pros: True parallelization, good balance
- Cons: Inter-process communication overhead

### 3. ShmemVectorEnv
**Shared memory optimization of SubprocVectorEnv**
- Best for: Environments with large observations (e.g., images)
- Pros: Observations are exchanged through shared memory instead of pipes, which is faster for large states
- Cons: More complex implementation

### 4. RayVectorEnv
**Ray-based distributed simulation**
- Best for: Cluster computing with multiple machines
- Pros: Scales to multiple machines
- Cons: Requires Ray installation and setup

All these classes share the same API through their base class `BaseVectorEnv`, making it easy to switch between them.

## Basic Usage

### Creating a Vectorized Environment

In [21]:
# Standard Gymnasium environment
gym_env = gym.make("CartPole-v1")


# Tianshou vectorized environment
def create_cartpole_env() -> gym.Env:
    return gym.make("CartPole-v1")


# Create 5 parallel environments
vector_env = DummyVectorEnv([create_cartpole_env for _ in range(5)])

print(f"Created vectorized environment with {vector_env.env_num} environments")

Created vectorized environment with 5 environments


### Environment Interaction

The key difference from standard Gymnasium is that actions, observations, and rewards are all vectorized:

In [22]:
# Standard Gymnasium: reset() returns a single observation
print("Standard Gymnasium reset:")
single_obs, info = gym_env.reset()
print(f"  Shape: {single_obs.shape}")
print(f"  Value: {single_obs}")

print("\n" + "=" * 50 + "\n")

# Vectorized environment: reset() returns stacked observations
print("Vectorized environment reset:")
vector_obs, info = vector_env.reset()
print(f"  Shape: {vector_obs.shape}")
print(f"  Value:\n{vector_obs}")

Standard Gymnasium reset:
  Shape: (4,)
  Value: [-0.01712769  0.01610273 -0.0433123  -0.00370169]


Vectorized environment reset:
  Shape: (5, 4)
  Value:
[[-0.04719098  0.04896798 -0.00020475  0.03126983]
 [-0.00214985  0.02361996  0.03003382  0.03810668]
 [-0.03148088 -0.01171132  0.037396    0.00564856]
 [ 0.00979226 -0.00392394 -0.0263445  -0.04695772]
 [-0.01733919  0.04152646 -0.00372542  0.04193886]]


### Taking Vectorized Steps

In [23]:
# Take random actions in all environments
actions = np.random.choice(2, size=vector_env.env_num)
obs, rew, terminated, truncated, info = vector_env.step(actions)

print(f"Actions taken: {actions}")
print(f"Rewards received: {rew}")
print(f"Terminated flags: {terminated}")
print("Info", info)

Actions taken: [1 0 0 0 1]
Rewards received: [1. 1. 1. 1. 1.]
Terminated flags: [False False False False False]
Info [{'env_id': 0} {'env_id': 1} {'env_id': 2} {'env_id': 3} {'env_id': 4}]


### Selective Environment Execution

You can interact with specific environments using the `id` parameter:

In [24]:
# Execute only environments 0, 1, and 3
selected_actions = np.random.choice(2, size=3)
obs, rew, terminated, truncated, info = vector_env.step(selected_actions, id=[0, 1, 3])

print("Executed actions in environments [0, 1, 3]")
print(f"Received {len(rew)} results")

Executed actions in environments [0, 1, 3]
Received 3 results
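
When only a subset of environments is stepped, the results contain one entry per stepped environment rather than one per environment in the pool. The earlier step output shows that each `info` entry carries an `env_id`, which can be used to write results back into per-environment storage. The sketch below uses plain lists with made-up reward values to stay self-contained; `scatter_by_env_id` is a hypothetical helper, not part of Tianshou.

```python
def scatter_by_env_id(values, infos, storage):
    """Write each value into the slot of the environment that produced it."""
    for value, info in zip(values, infos):
        storage[info["env_id"]] = value
    return storage


# Made-up results of stepping environments 0, 1, and 3 out of 5.
rewards = [1.0, 0.5, 2.0]
infos = [{"env_id": 0}, {"env_id": 1}, {"env_id": 3}]

last_reward = [None] * 5  # one slot per environment in the pool
scatter_by_env_id(rewards, infos, last_reward)
print(last_reward)  # [1.0, 0.5, None, 2.0, None]
```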


## Parallel Sampling: Synchronous vs Asynchronous

### Synchronous Mode (Default)

By default, vectorized environments operate synchronously: a step completes only after **all** environments finish their step. This works well when all environments take roughly the same time per step.

### Asynchronous Mode

When environment step times vary significantly (e.g., 90% of steps take 1s, but 10% take 10s), asynchronous mode can help. It allows faster environments to continue without waiting for slower ones.
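
The cost of waiting can be estimated with a little arithmetic. Assuming step times are independent across environments and follow the 90%/10% split from the example above, a synchronous step takes 10s whenever at least one of the `n` environments is slow, which happens with probability `1 - 0.9**n`. The function below is a back-of-the-envelope model under those assumptions, not a measurement.

```python
def expected_sync_step_time(n_envs, fast=1.0, slow=10.0, p_slow=0.1):
    """Expected wall-clock time of one synchronous step over n_envs environments."""
    p_any_slow = 1.0 - (1.0 - p_slow) ** n_envs
    return fast * (1.0 - p_any_slow) + slow * p_any_slow


for n in (1, 4, 8, 16):
    print(f"{n:2d} envs: {expected_sync_step_time(n):.2f}s per synchronous step")
```

Under this model a single environment averages 1.9s per step, while eight synchronized environments average about 6.1s per step, because most steps end up waiting on at least one slow environment.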

<div style="text-align: center; padding: 1rem;">
<img src="../_static/images/async.png" style="width: 70%; padding-bottom: 1rem;"><br>
Comparison of synchronous and asynchronous vectorized environments<br>
(Steps with the same color are processed together)
</div>

### Enabling Asynchronous Mode

Use the `wait_num` or `timeout` parameters (or both):

In [28]:
from functools import partial


# Create environments with varying step times
class SlowEnv(gym.Env):
    """Environment with variable step duration."""

    def __init__(self, sleep_time):
        self.sleep_time = sleep_time
        self.observation_space = gym.spaces.Box(low=0, high=1, shape=(4,))
        self.action_space = gym.spaces.Discrete(2)
        super().__init__()

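    def reset(self, seed=None, options=None):
        super().reset(seed=seed)  # Seeds self.np_random
        return self.np_random.random(4, dtype=np.float32), {}

    def step(self, action):
        time.sleep(self.sleep_time)  # Simulate slow computation
        # Draw from the seeded generator and match the Box dtype (float32)
        return self.np_random.random(4, dtype=np.float32), 0.0, False, False, {}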


# Create async vectorized environment
env_fns = [partial(SlowEnv, sleep_time=0.01 * i) for i in [1, 2, 3, 4]]
async_env = SubprocVectorEnv(env_fns, wait_num=3, timeout=0.1)

print("Asynchronous environment created")
print("  wait_num=3: Returns after 3 environments complete")
print("  timeout=0.1: Or after 0.1 seconds, whichever comes first")

Asynchronous environment created
  wait_num=3: Returns after 3 environments complete
  timeout=0.1: Or after 0.1 seconds, whichever comes first


### How Async Parameters Work

- **`wait_num`**: Minimum number of environments to wait for (e.g., `wait_num=3` means each step returns results from at least 3 environments)
- **`timeout`**: Maximum time to wait in seconds. It acts as a dynamic `wait_num`: each step returns whatever is ready when the timeout expires
- If no environment finishes within the timeout, the system waits until at least one completes
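
The interplay of the two parameters can be modeled in a few lines. The sketch below is a simplified model of the rules listed above, not Tianshou's implementation; `ready_envs` is a hypothetical function that takes the time at which each environment would finish its step.

```python
def ready_envs(finish_times, wait_num, timeout):
    """Return (ids of environments that are returned, time the step returns).

    Rules modeled:
    - return once `wait_num` environments have finished, or at `timeout`,
      whichever comes first;
    - if nothing has finished by `timeout`, wait for the first finisher.
    """
    order = sorted(range(len(finish_times)), key=lambda i: finish_times[i])
    t_wait_num = finish_times[order[wait_num - 1]]
    t_return = min(t_wait_num, timeout)
    ready = [i for i in order if finish_times[i] <= t_return]
    if not ready:  # nobody made the timeout: block until the fastest is done
        ready = [order[0]]
        t_return = finish_times[order[0]]
    return ready, t_return


step_times = [0.01, 0.02, 0.03, 0.04]  # same durations as SlowEnv above
print(ready_envs(step_times, wait_num=3, timeout=0.1))    # ([0, 1, 2], 0.03)
print(ready_envs(step_times, wait_num=3, timeout=0.015))  # ([0], 0.015)
print(ready_envs(step_times, wait_num=3, timeout=0.005))  # ([0], 0.01)
```

With the settings used earlier (`wait_num=3`, `timeout=0.1`), the three fastest environments determine when the step returns, and the slowest one no longer holds the batch back.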

> **Warning**: Asynchronous collectors can cause exceptions when used as `test_collector` in trainers. Always use synchronous mode for test collectors.

## EnvPool Integration

[EnvPool](https://github.com/sail-sg/envpool/) is a C++-based vectorized environment library that provides significant performance improvements over Python-based solutions. Tianshou fully supports EnvPool with minimal code changes.

### Why EnvPool?

- **Performance**: 10x-100x faster than standard vectorized environments for supported environments
- **Memory Efficient**: Optimized memory usage through shared buffers
- **Drop-in Replacement**: Nearly identical API to Tianshou's vectorized environments

### Supported Environments

EnvPool currently supports:
- Atari games
- MuJoCo physics simulations
- VizDoom 3D environments
- Classic control environments
- Toy text environments

### Using EnvPool

First, install EnvPool:

```bash
pip install envpool
```

Then use it directly with Tianshou:

In [None]:
# Uncomment to install: !pip install envpool

# NOTE: envpool is only supported on Linux
import platform

if platform.system() != "Linux":
    print("EnvPool is only supported on Linux. Skipping EnvPool example.")
else:
    import envpool

    # Create EnvPool vectorized environment
    envs = envpool.make_gymnasium("CartPole-v1", num_envs=10)

    # Use directly with Tianshou collector
    # collector = Collector(policy, envs, buffer)

    print(f"Created EnvPool environment with {envs.spec.config.num_envs} environments")
    print("Ready to use with Tianshou collectors!")

### EnvPool Examples

For complete examples of using EnvPool with Tianshou:
- [Atari with EnvPool](https://github.com/thu-ml/tianshou/tree/master/examples/atari#envpool)
- [MuJoCo with EnvPool](https://github.com/thu-ml/tianshou/tree/master/examples/mujoco#envpool)
- [VizDoom with EnvPool](https://github.com/thu-ml/tianshou/tree/master/examples/vizdoom#envpool)
- [More EnvPool Examples](https://github.com/sail-sg/envpool/tree/master/examples/tianshou_examples)

## Custom Environments and State Representations

Tianshou works seamlessly with custom environments as long as they follow the Gymnasium API. Let's explore how to handle different state representations.

### Required Gymnasium API

Your custom environment must implement:

```python
from typing import Any

import gymnasium as gym


class MyEnv(gym.Env):
    # Required spaces
    observation_space: gym.Space
    action_space: gym.Space

    def reset(self, seed=None, options=None) -> tuple[Any, dict]:
        """Reset to an initial state; return (observation, info)."""
        super().reset(seed=seed)  # Seeds self.np_random
        ...

    def step(self, action) -> tuple[Any, float, bool, bool, dict]:
        """Execute one step; return (observation, reward, terminated, truncated, info)."""
        ...

    def seed(self, seed: int) -> list[int]:
        """Legacy Gym-style seeding hook; Gymnasium seeds through reset(seed=...)."""
        ...

    def render(self) -> Any:
        """Render according to the render_mode chosen at construction."""
        ...

    def close(self) -> None:
        """Clean up resources."""
        ...
```

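> **Important**: Make sure seeding is implemented correctly. Gymnasium seeds environments through `reset(seed=...)`, so call `super().reset(seed=seed)` and draw randomness from `self.np_random`. If your environment also keeps a legacy `seed()` method, it must seed every generator the environment uses:
> ```python
> def seed(self, seed):
>     np.random.seed(seed)
>     # Also seed other random generators used in your environment
> ```
> Without proper seeding, parallel environments may produce identical outputs!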
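
The failure mode is easy to reproduce with Python's standard `random` module. In the sketch below, `make_rngs` is a hypothetical helper: giving every environment the same seed yields identical random streams, while offsetting the seed by the environment index (one common convention, assumed here for illustration) keeps them distinct.

```python
import random


def make_rngs(base_seed, num_envs, offset_by_index):
    """Create one random generator per environment."""
    return [
        random.Random(base_seed + i if offset_by_index else base_seed)
        for i in range(num_envs)
    ]


same = [rng.random() for rng in make_rngs(0, 3, offset_by_index=False)]
distinct = [rng.random() for rng in make_rngs(0, 3, offset_by_index=True)]

print(len(set(same)))      # 1: every environment drew the same number
print(len(set(distinct)))  # 3: each environment has its own stream
```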

### Dictionary Observations

Many environments return observations as dictionaries rather than simple arrays. Tianshou's `Batch` class handles this elegantly.

Example with a mock observation shaped like the one returned by the FetchReach environment:

In [None]:
from tianshou.data import Batch, ReplayBuffer

# Example: Creating a mock observation similar to FetchReach
observation = {
    "observation": np.array([1.34, 0.75, 0.53, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0, 0.0]),
    "achieved_goal": np.array([1.34, 0.75, 0.53]),
    "desired_goal": np.array([1.24, 0.78, 0.63]),
}

# Store in replay buffer
buffer = ReplayBuffer(size=10)
buffer.add(Batch(obs=observation, act=0, rew=0.0, terminated=False, truncated=False))

print("Stored observation structure:")
print(buffer.obs)

### Accessing Dictionary Observations

When sampling from the buffer, you can access nested dictionary values in multiple ways:

In [None]:
# Sample a batch
batch, indices = buffer.sample(batch_size=1)

print("Batch keys:", list(batch.keys()))
print("\nAccessing nested observation:")

# Recommended way: access through batch first
print("batch.obs.desired_goal[0]:", batch.obs.desired_goal[0])

# Alternative ways (not recommended: indexing a Batch first builds a new Batch object)
print("batch.obs[0].desired_goal:", batch.obs[0].desired_goal)
print("batch[0].obs.desired_goal:", batch[0].obs.desired_goal)
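
One way to picture why the key-first form is preferred is to think of the batch as storing one stacked column per observation key. The sketch below mimics that layout with plain dicts and lists rather than Tianshou's `Batch`; `stack_observations` is a hypothetical helper, and the second observation is made up for illustration.

```python
def stack_observations(observations):
    """Turn a list of dict observations into a dict of per-key lists."""
    keys = observations[0].keys()
    return {key: [obs[key] for obs in observations] for key in keys}


observations = [
    {"achieved_goal": [1.34, 0.75, 0.53], "desired_goal": [1.24, 0.78, 0.63]},
    {"achieved_goal": [1.30, 0.70, 0.50], "desired_goal": [1.24, 0.78, 0.63]},
]
stacked = stack_observations(observations)

# Key first, then index: one lookup into an already-stacked column.
print(stacked["desired_goal"][0])  # [1.24, 0.78, 0.63]
```

In this layout, selecting the key first touches a single column, whereas selecting the row first has to assemble a new per-row record from every column.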

### Using Dictionary Observations in Networks

When designing networks for environments with dictionary observations:

In [None]:
import torch
import torch.nn as nn


class CustomNetwork(nn.Module):
    """Network that processes dictionary observations."""

    def __init__(self, obs_dim, goal_dim, hidden_dim, action_dim):
        super().__init__()

        # Separate processing for different observation components
        self.obs_encoder = nn.Linear(obs_dim, hidden_dim)
        self.goal_encoder = nn.Linear(goal_dim * 2, hidden_dim)  # achieved + desired

        # Combined processing
        self.fc = nn.Sequential(
            nn.Linear(hidden_dim * 2, hidden_dim), nn.ReLU(), nn.Linear(hidden_dim, action_dim)
        )

    def forward(self, obs_batch, **kwargs):
        # Extract components from the batch. The buffer stores NumPy arrays,
        # so convert them to float32 tensors before the linear layers.
        observation = torch.as_tensor(obs_batch.observation, dtype=torch.float32)
        achieved_goal = torch.as_tensor(obs_batch.achieved_goal, dtype=torch.float32)
        desired_goal = torch.as_tensor(obs_batch.desired_goal, dtype=torch.float32)

        # Process each component
        obs_feat = self.obs_encoder(observation)
        goal_feat = self.goal_encoder(torch.cat([achieved_goal, desired_goal], dim=-1))

        # Combine and output
        combined = torch.cat([obs_feat, goal_feat], dim=-1)
        return self.fc(combined)


# Example usage
net = CustomNetwork(obs_dim=10, goal_dim=3, hidden_dim=64, action_dim=4)
print("Network created for dictionary observations")
print("  Input: observation (10D) + achieved_goal (3D) + desired_goal (3D)")
print("  Output: actions (4D)")

### Custom Object States

For more complex state representations (e.g., graphs, custom objects), Tianshou stores references in numpy arrays. However, you must ensure deep copies to avoid state aliasing:

In [None]:
import copy

import networkx as nx


class GraphEnv(gym.Env):
    """Example environment with graph-based states."""

    def __init__(self):
        super().__init__()
        self.graph = nx.Graph()
        self.action_space = gym.spaces.Discrete(5)
        # Placeholder space: the actual observation returned below is a graph object
        self.observation_space = gym.spaces.Box(low=0, high=1, shape=(10,))

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        self.graph = nx.erdos_renyi_graph(10, 0.3)
        # IMPORTANT: Return deep copy to avoid reference issues
        return copy.deepcopy(self.graph), {}

    def step(self, action):
        # Modify graph based on action
        if action < 4 and len(self.graph.nodes) > 0:
            nodes = list(self.graph.nodes)
            if len(nodes) >= 2:
                self.graph.add_edge(nodes[0], nodes[1])

        # IMPORTANT: Return deep copy
        return copy.deepcopy(self.graph), 0.0, False, False, {}


# Test storing graph objects
graph_buffer = ReplayBuffer(size=5)
env = GraphEnv()
obs, _ = env.reset()
graph_buffer.add(Batch(obs=obs, act=0, rew=0.0, terminated=False, truncated=False))

print("Graph objects stored in buffer:")
print(graph_buffer.obs)

> **Important**: When using custom objects as states:
> 1. Always return `copy.deepcopy(state)` in both `reset()` and `step()`
> 2. Ensure the object is numpy-compatible: `np.array([your_object])` should not result in an empty array
> 3. The object may be stored as a shallow copy in the buffer, so deep copying is what prevents state aliasing
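
The aliasing problem can be demonstrated without any RL machinery. The sketch below uses a plain dict as a stand-in for a mutable state object and two Python lists as stand-ins for a buffer; none of these names come from Tianshou.

```python
import copy

state = {"edges": [(0, 1)]}  # stand-in for a mutable state such as a graph

aliased_buffer = []
copied_buffer = []

for step in range(2):
    state["edges"].append((step + 1, step + 2))  # environment mutates its state
    aliased_buffer.append(state)                 # stores a reference
    copied_buffer.append(copy.deepcopy(state))   # stores a snapshot

print([len(s["edges"]) for s in aliased_buffer])  # [3, 3]: history was overwritten
print([len(s["edges"]) for s in copied_buffer])   # [2, 3]: each step preserved
```

Every entry in the aliased buffer points to the same object, so later mutations silently rewrite earlier transitions.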

## Best Practices Summary

### Choosing the Right Environment Wrapper

| Scenario | Recommended Wrapper | Why |
|----------|-------------------|-----|
| Simple/fast environments | `DummyVectorEnv` or a raw Gymnasium environment | Minimal overhead |
| Most parallel scenarios | `SubprocVectorEnv` | Good balance of speed and simplicity |
| Large observations (images) | `ShmemVectorEnv` | Shared memory avoids sending large observations through pipes |
| Multi-machine clusters | `RayVectorEnv` | Distributed computing support |
| Maximum performance | EnvPool | C++-based, 10x-100x speedup |

### Performance Tips

1. **Profile First**: Measure whether environment or training is your bottleneck before optimizing
2. **Start Simple**: Begin with `DummyVectorEnv` for debugging, then upgrade to parallel versions
3. **Use EnvPool**: If your environment is supported, EnvPool offers the best performance
4. **Async for Variable Times**: Use asynchronous mode only when environment step times vary significantly
5. **Proper Seeding**: Always seed custom environments correctly, through `reset(seed=...)` and any legacy `seed()` method they define
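
For the first tip, a rough split between environment time and training time is often enough to decide where to optimize. The sketch below is one simple way to measure it with `time.perf_counter`; `time_share` is a hypothetical helper, and the two `time.sleep` calls are placeholders for a real environment step and a real gradient update.

```python
import time


def time_share(env_step, train_step, iterations=20):
    """Return the fraction of wall-clock time spent in env_step vs. train_step."""
    env_time = train_time = 0.0
    for _ in range(iterations):
        start = time.perf_counter()
        env_step()
        mid = time.perf_counter()
        train_step()
        end = time.perf_counter()
        env_time += mid - start
        train_time += end - mid
    total = env_time + train_time
    return env_time / total, train_time / total


# Placeholder workloads: a "slow" environment and a "fast" update.
env_share, train_share = time_share(
    env_step=lambda: time.sleep(0.005),
    train_step=lambda: time.sleep(0.001),
)
print(f"environment: {env_share:.0%}, training: {train_share:.0%}")
```

If the environment share dominates, parallel or asynchronous sampling is worth trying; if training dominates, faster environments will not help much.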

### Common Pitfalls

- ❌ Using `SubprocVectorEnv` for fast environments → Use `DummyVectorEnv` instead
- ❌ Forgetting to deep-copy custom states → States will be aliased in the buffer
- ❌ Not seeding environments properly → Parallel environments produce identical results
- ❌ Using async collectors for testing → Causes exceptions in trainers
- ❌ Assuming linear speedup → Account for communication overhead and straggler effects

## Further Reading

- **Tianshou Documentation**: [Environment API Reference](https://tianshou.org/en/master/03_api/env/venvs.html)
- **EnvPool**: [Official Documentation](https://envpool.readthedocs.io/)
- **Gymnasium**: [Environment Creation Tutorial](https://gymnasium.farama.org/tutorials/gymnasium_basics/environment_creation/)
- **Ray**: [Distributed RL with Ray](https://docs.ray.io/en/latest/rllib/index.html)