# U-Shaped Automated Container Terminal Scheduling Experiments

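This notebook runs experiments with the digital-twin-driven, real-time collaborative scheduling system for U-shaped automated container terminals (U-ACT) described in the research paper. The implementation itself (environment, PPO agent, visualization, and rule comparison) is imported from the `u_act_digital_twin` module.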

In [None]:
# Import required libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from u_act_digital_twin import UACTEnvironment, PPOAgent, DigitalTwinVisualization, compare_scheduling_rules

# Seed NumPy's global RNG so the np.random.randint action sampling in later cells is repeatable.
# NOTE(review): this seeds only NumPy's legacy global generator; whether UACTEnvironment or
# PPOAgent draw from other generators (e.g. Python's `random` or a DL framework) is not
# visible from this notebook — confirm and seed those as well if needed.
SEED = 42
np.random.seed(SEED)

## 1. Environment Setup

Initialize the U-shaped automated container terminal environment with different configurations.

In [None]:
# Build three terminal instances of increasing scale (containers / AGVs).
env_small = UACTEnvironment(num_containers=40, num_agvs=3)
env_medium = UACTEnvironment(num_containers=100, num_agvs=5)
env_large = UACTEnvironment(num_containers=200, num_agvs=10)

# Report the size of each instance, one line per configuration.
print("Environment initialized with:")
print("\n".join(
    f"{label}: {env.num_containers} containers, {env.num_agvs} AGVs"
    for label, env in (
        ("Small", env_small),
        ("Medium", env_medium),
        ("Large", env_large),
    )
))

## 2. State Space Analysis

Examine the state representation of the environment.

In [None]:
# Get initial state
# Reads the current state from the medium-scale environment via get_state().
# NOTE(review): no reset() is called on env_medium before this point in the notebook —
# confirm that get_state() is meaningful on a freshly constructed environment.
state = env_medium.get_state()

# Human-readable labels, paired positionally with the state values in the table below:
# two container counts followed by a (Mean, Std) pair for each monitored quantity.
state_features = ["Num Import Containers", "Num Export Containers"] + [
    f"{statistic} {quantity}"
    for quantity in (
        "Import CR", "Export CR",
        "Remaining Time",
        "AGV Load", "YC Load",
        "AGV Queue Length", "AGV Wait Time",
        "ET Queue Length", "ET Wait Time",
    )
    for statistic in ("Mean", "Std")
]

# Tabulate each state value next to its label, then print the table under a heading.
state_df = pd.DataFrame(dict(Feature=state_features, Value=state))

print("Initial State Representation:", state_df, sep="\n")

## 3. PPO Agent Training

Train the PPO agent for dynamic scheduling.

In [None]:
def train_agent(env, num_episodes=500, max_steps=200, action_dim=6):
    """Roll out a freshly constructed PPO agent on ``env`` and log per-episode metrics.

    NOTE(review): the loop below only queries ``agent.get_action`` and steps the
    environment. It never stores transitions or calls an update/learn method on
    the agent, so unless ``get_action`` learns internally the policy is not
    actually optimised here — confirm against the ``PPOAgent`` API.

    Args:
        env: Environment exposing ``state_dim``, ``reset()``, ``step(action)``
            (returning ``(next_state, reward, done, info)``), ``makespan`` and
            ``terminal_congestion_index``.
        num_episodes: Number of episodes to run.
        max_steps: Upper bound on steps per episode (previously hard-coded to 200).
        action_dim: Size of the action space passed to ``PPOAgent``
            (previously hard-coded to 6).

    Returns:
        tuple: ``(agent, metrics_history)``; ``metrics_history`` holds one dict per
        episode with the keys ``episode``, ``avg_reward``, ``makespan`` and ``tci``.
    """
    agent = PPOAgent(state_dim=env.state_dim, action_dim=action_dim)
    metrics_history = []

    for episode in range(num_episodes):
        state = env.reset()
        episode_rewards = []

        for _ in range(max_steps):
            # The second value returned by get_action is not used by this loop.
            action, _unused = agent.get_action(state)
            state, reward, done, _info = env.step(action)
            episode_rewards.append(reward)

            if done:
                break

        # Compute the episode mean once; with max_steps=0 no reward is collected,
        # so report NaN explicitly instead of triggering NumPy's empty-mean warning.
        avg_reward = np.mean(episode_rewards) if episode_rewards else float("nan")

        # makespan / tci are read from the environment after the episode has ended.
        metrics_history.append({
            'episode': episode,
            'avg_reward': avg_reward,
            'makespan': env.makespan,
            'tci': env.terminal_congestion_index
        })

        # Progress line every 100 episodes.
        if episode % 100 == 0:
            print(f"Episode {episode}: Reward = {avg_reward:.4f}")

    return agent, metrics_history

# Train on medium environment
print("Training PPO agent on medium-scale environment...")
trained_agent, training_metrics = train_agent(env_medium, num_episodes=500)

# Plot training progress
metrics_df = pd.DataFrame(training_metrics)

# Use the explicit Figure/Axes interface, matching the other plotting cells in this
# notebook, instead of the implicit pyplot "current axes" state machine.
fig, axes = plt.subplots(1, 3, figsize=(12, 4))

# (metrics_df column, panel title, y-axis label) for each of the three panels.
training_panels = [
    ('avg_reward', 'Average Reward', 'Reward'),
    ('makespan', 'Makespan', 'Time'),
    ('tci', 'Terminal Congestion Index', 'TCI (%)'),
]

for ax, (column, title, ylabel) in zip(axes, training_panels):
    ax.plot(metrics_df['episode'], metrics_df[column])
    ax.set_title(title)
    ax.set_xlabel('Episode')
    ax.set_ylabel(ylabel)

plt.tight_layout()
plt.show()

## 4. Scheduling Rule Comparison

Compare the performance of different scheduling rules.

In [None]:
# Run the rule comparison provided by the project module and print its result.
comparison_results = compare_scheduling_rules()

print("Scheduling Rule Comparison:", comparison_results, sep="\n")

# One bar chart per metric, each with the rule names along the x-axis.
fig, axes = plt.subplots(1, 3, figsize=(15, 5))

# (result column, bar colour, panel title, y-axis label)
bar_panels = (
    ('Makespan', 'skyblue', 'Makespan Comparison', 'Makespan'),
    ('TCI', 'lightcoral', 'Terminal Congestion Index Comparison', 'TCI (%)'),
    ('Total Waiting', 'lightgreen', 'Total Waiting Time Comparison', 'Waiting Time'),
)

for ax, (metric, bar_color, panel_title, y_label) in zip(axes, bar_panels):
    ax.bar(comparison_results['Rule'], comparison_results[metric], color=bar_color)
    ax.set_title(panel_title)
    ax.set_ylabel(y_label)
    # Tilt the rule names so they do not overlap.
    ax.tick_params(axis='x', rotation=45)

plt.tight_layout()
plt.show()

## 5. Impact of AGV Quantity

Analyze how the number of AGVs affects scheduling performance.

In [None]:
def analyze_agv_impact(agv_counts=(3, 5, 8, 10), num_containers=100, max_steps=200, num_actions=6):
    """Measure how the number of AGVs affects makespan, congestion and waiting time.

    For every entry in ``agv_counts`` a fresh ``UACTEnvironment`` is created, reset,
    and driven with uniformly random actions drawn via ``np.random.randint``. No
    trained agent is involved, so the figures are a random-policy baseline.

    Args:
        agv_counts: Iterable of AGV fleet sizes to evaluate.
        num_containers: Number of containers used for every environment.
        max_steps: Maximum number of environment steps per fleet size.
        num_actions: Exclusive upper bound for the sampled action index.

    Returns:
        pandas.DataFrame: One row per fleet size with the columns ``Num_AGVs``,
        ``Makespan``, ``TCI`` and ``Total_Waiting`` (columns are present even when
        ``agv_counts`` is empty).
    """
    columns = ['Num_AGVs', 'Makespan', 'TCI', 'Total_Waiting']
    results = []

    for num_agvs in agv_counts:
        env = UACTEnvironment(num_containers=num_containers, num_agvs=num_agvs)
        env.reset()

        # Random-action rollout (demonstration baseline, not the PPO policy).
        for _ in range(max_steps):
            action = np.random.randint(0, num_actions)
            _, _, done, _ = env.step(action)
            if done:
                break

        # Metrics are read from the environment once the rollout has stopped.
        results.append({
            'Num_AGVs': num_agvs,
            'Makespan': env.makespan,
            'TCI': env.terminal_congestion_index,
            'Total_Waiting': env.total_waiting_time
        })

    return pd.DataFrame(results, columns=columns)

agv_impact_df = analyze_agv_impact()
print("AGV Quantity Impact Analysis:", agv_impact_df, sep="\n")

# Two line charts over the number of AGVs: makespan (left) and congestion (right).
fig, axes = plt.subplots(1, 2, figsize=(12, 5))

# (result column, y-axis label, panel title, extra line keyword arguments)
impact_panels = (
    ('Makespan', 'Makespan', 'Impact of AGV Quantity on Makespan', {}),
    ('TCI', 'TCI (%)', 'Impact of AGV Quantity on Congestion', {'color': 'red'}),
)

for ax, (metric, y_label, panel_title, line_kwargs) in zip(axes, impact_panels):
    ax.plot(agv_impact_df['Num_AGVs'], agv_impact_df[metric], 'o-', linewidth=2, **line_kwargs)
    ax.set_xlabel('Number of AGVs')
    ax.set_ylabel(y_label)
    ax.set_title(panel_title)
    ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

## 6. Digital Twin Visualization

Create a comprehensive visualization of the digital twin environment.

In [None]:
# Build a dedicated environment and attach the digital-twin visualizer to it.
viz_env = UACTEnvironment(num_containers=50, num_agvs=5)
viz = DigitalTwinVisualization(viz_env)

# Drive the environment with random actions, recording metrics after every step.
metrics = []
state = viz_env.reset()

for step in range(100):
    action = np.random.randint(0, 6)
    state, reward, done, _ = viz_env.step(action)

    # Snapshot of the environment's metrics plus this step's reward
    # (stored under the 'avg_reward' key).
    metrics.append(dict(
        makespan=viz_env.makespan,
        tci=viz_env.terminal_congestion_index,
        avg_reward=reward,
    ))

    # Refresh the plots on steps 0, 20, 40, ...
    if not step % 20:
        viz.update_plots(metrics)

    if done:
        break

print("Visualization completed!")
print(f"Final Metrics: Makespan = {viz_env.makespan}, TCI = {viz_env.terminal_congestion_index:.2f}%")

## 7. Real-time Performance Monitoring

Monitor real-time performance metrics during scheduling.

In [None]:
def monitor_real_time_performance(num_containers=80, num_agvs=6, max_steps=150, num_actions=6):
    """Record per-step performance metrics for one random-action rollout.

    A fresh ``UACTEnvironment`` is created and reset, then stepped with actions
    drawn via ``np.random.randint`` until ``done`` is reported or ``max_steps``
    steps have been taken.

    Args:
        num_containers: Number of containers in the environment.
        num_agvs: Number of AGVs in the environment.
        max_steps: Maximum number of environment steps to run.
        num_actions: Exclusive upper bound for the sampled action index.

    Returns:
        pandas.DataFrame: One row per executed step with the columns ``step``,
        ``reward``, ``agv_utilization``, ``yc_utilization``, ``completed_tasks``
        and ``total_waiting`` (columns are present even when no step was run).
    """
    columns = [
        'step', 'reward', 'agv_utilization', 'yc_utilization',
        'completed_tasks', 'total_waiting',
    ]
    env = UACTEnvironment(num_containers=num_containers, num_agvs=num_agvs)
    env.reset()

    metrics_over_time = []

    for step in range(max_steps):
        action = np.random.randint(0, num_actions)
        _, reward, done, _ = env.step(action)

        # Utilization = mean of the 'load_rate' entries over all AGVs / yard cranes.
        agv_utilization = np.mean([agv['load_rate'] for agv in env.agvs])
        yc_utilization = np.mean([yc['load_rate'] for yc in env.ycs])

        metrics_over_time.append({
            'step': step,
            'reward': reward,
            'agv_utilization': agv_utilization,
            'yc_utilization': yc_utilization,
            'completed_tasks': len(env.completed_tasks),
            'total_waiting': env.total_waiting_time
        })

        if done:
            break

    return pd.DataFrame(metrics_over_time, columns=columns)

real_time_metrics = monitor_real_time_performance()

# 2x2 dashboard of the per-step metrics, all plotted against the step index.
fig, axes = plt.subplots(2, 2, figsize=(12, 8))

# (axes, [(column, line keyword arguments), ...], title, y-axis label, show legend?)
dashboard_panels = (
    (axes[0, 0], [('reward', {})], 'Real-time Reward', 'Reward', False),
    (
        axes[0, 1],
        [
            ('agv_utilization', {'label': 'AGV Utilization'}),
            ('yc_utilization', {'label': 'YC Utilization'}),
        ],
        'Equipment Utilization',
        'Utilization Rate',
        True,
    ),
    (axes[1, 0], [('completed_tasks', {})], 'Task Completion Progress', 'Completed Tasks', False),
    (axes[1, 1], [('total_waiting', {})], 'Total Waiting Time', 'Waiting Time', False),
)

for ax, series, panel_title, y_label, with_legend in dashboard_panels:
    for column, line_kwargs in series:
        ax.plot(real_time_metrics['step'], real_time_metrics[column], **line_kwargs)
    ax.set_title(panel_title)
    ax.set_xlabel('Time Step')
    ax.set_ylabel(y_label)
    # Only the utilization panel carries labelled lines.
    if with_legend:
        ax.legend()
    ax.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()