In [6]:
import os
import torch
import numpy as np
import plotly.express as px
import plotly.graph_objects as go
import plotly.subplots as sp
import wandb

def visualize_distributions(self, num_samples=10000):
    """Compare the qy0 and py0 Gaussians with per-dimension sampled histograms.

    Reads ``self.qy0_mean``, ``self.qy0_std``, ``self.py0_mean`` and
    ``self.py0_std`` (torch tensors), clamps the standard deviations to at
    least 1e-6, and draws ``num_samples`` values per dimension from each
    normal distribution with NumPy's global RNG. Every dimension gets its own
    subplot row containing overlaid density histograms (50 bins) plus a line
    through the bin centres.

    Side effects: creates ``self.train_dir`` if needed, writes
    ``distribution_comparison.html`` into it, logs that file to wandb when
    ``self.log_wandb`` is truthy, and finally calls ``fig.show()``.

    Args:
        num_samples: Number of draws per dimension for each distribution.

    Returns:
        None.
    """
    # exist_ok=True replaces the racy "check, then create" pair and is a no-op
    # when the directory already exists.
    os.makedirs(self.train_dir, exist_ok=True)

    def _flat_numpy(tensor):
        # Flatten so parameters stored as 0-d or (1, D) tensors are handled
        # exactly like plain (D,) vectors.
        return tensor.detach().cpu().numpy().reshape(-1)

    with torch.no_grad():
        mean_q = _flat_numpy(self.qy0_mean)
        std_q = _flat_numpy(torch.clamp(self.qy0_std, min=1e-6))
        mean_p = _flat_numpy(self.py0_mean)
        std_p = _flat_numpy(torch.clamp(self.py0_std, min=1e-6))

    # One column per dimension; a length-1 std broadcasts across all columns.
    qy0_samples = np.random.normal(mean_q, std_q, size=(num_samples, mean_q.size))
    py0_samples = np.random.normal(mean_p, std_p, size=(num_samples, mean_p.size))

    # The number of subplot rows follows the qy0 sample matrix.
    num_dims = qy0_samples.shape[1]

    fig = sp.make_subplots(
        rows=num_dims,
        cols=1,
        subplot_titles=[f'Distribution of Dimension {i+1}' for i in range(num_dims)],
    )

    series = (
        ('qy0 (Posterior)', 'blue', qy0_samples),
        ('py0 (Prior)', 'red', py0_samples),
    )

    for i in range(num_dims):
        row = i + 1

        # Density histogram per distribution, expressed at the bin centres.
        curves = []
        for label, color, samples in series:
            density, edges = np.histogram(samples[:, i], bins=50, density=True)
            centers = (edges[1:] + edges[:-1]) / 2
            curves.append((label, color, centers, density))

        # Bars first (legend entries only on the first row to avoid duplicates)...
        for label, color, centers, density in curves:
            fig.add_trace(
                go.Bar(x=centers, y=density, marker_color=color, opacity=0.75,
                       name=label, showlegend=(i == 0)),
                row=row, col=1,
            )

        # ...then the outline of each histogram drawn on top of the bars.
        for label, color, centers, density in curves:
            fig.add_trace(
                go.Scatter(x=centers, y=density, mode='lines',
                           line=dict(color=color), name=label, showlegend=False),
                row=row, col=1,
            )

    # barmode='overlay' draws the two semi-transparent histograms on top of
    # each other; plotly's default ('group') would place them side by side.
    fig.update_layout(
        height=300*num_dims,
        width=800,
        title_text="Distribution Comparison",
        showlegend=True,
        barmode='overlay',
    )

    # Save the plot as an HTML file
    plot_path = os.path.join(self.train_dir, 'distribution_comparison.html')
    fig.write_html(plot_path)

    # Log to wandb if required
    if self.log_wandb:
        wandb.log({"Distribution Comparison": wandb.Html(plot_path)})

    # Display immediately (useful inside a notebook).
    fig.show()

# Demonstration-only host object for visualize_distributions
class MockClass:
    """Bare-bones stand-in exposing the attributes visualize_distributions reads."""

    # Reuse the module-level function as a method of this class.
    visualize_distributions = visualize_distributions

    # Output / logging settings.
    device = 'cpu'
    log_wandb = False
    train_dir = './train_dir'

    # Example parameters of the qy0 distribution (14 dimensions).
    qy0_mean, qy0_std = torch.zeros(14), torch.ones(14)

    # Example parameters of the py0 distribution (14 dimensions).
    py0_mean, py0_std = torch.ones(14), torch.ones(14)

# Demo: build the mock object and render the distribution comparison.
# With MockClass.train_dir = './train_dir' this writes
# ./train_dir/distribution_comparison.html and then shows the figure.
mock_instance = MockClass()
mock_instance.visualize_distributions()

In [13]:
import plotly.graph_objects as go
from plotly.subplots import make_subplots
import torch
import numpy as np

class MockModel:
    """Small stand-in model used to demonstrate trajectory plotting.

    ``train_dir`` and ``log_wandb`` are stored on the instance; they are not
    read by ``plot_trajectories``.
    """

    def __init__(self, train_dir=".", log_wandb=False):
        """Store the output directory and the wandb-logging flag."""
        self.train_dir = train_dir
        self.log_wandb = log_wandb

    def plot_trajectories(self, predicted_traj, true_traj, k=3):
        """Plot prediction envelopes and true trajectories for random batch items.

        Up to ``k`` distinct batch indices are drawn with NumPy's global RNG.
        Each selected item gets its own subplot row; for every dimension the
        min/max envelope over the predicted samples is drawn as a shaded band
        and the true trajectory as a line with markers.

        Args:
            predicted_traj: Tensor indexed as ``[batch, sample, time, dim]``.
            true_traj: Tensor indexed as ``[batch, time, dim]``.
            k: Number of batch items to show. Values larger than the batch
                size are clamped to the batch size.

        Raises:
            ValueError: If ``k`` is smaller than 1 or the batch is empty.
        """
        num_batches = predicted_traj.shape[0]
        if k < 1 or num_batches < 1:
            raise ValueError(
                f"need k >= 1 and a non-empty batch to plot (k={k}, batch size={num_batches})"
            )
        # Sampling without replacement cannot return more items than exist.
        k = min(k, num_batches)

        idx = np.random.choice(num_batches, k, replace=False)
        num_dims = true_traj.shape[2]

        # Title every row with the batch index actually drawn, matching the
        # 'Batch {b}' naming used for the legend groups below.
        fig = make_subplots(rows=k, cols=1, subplot_titles=[f'Batch {b}' for b in idx])

        for i, b in enumerate(idx):
            for d in range(num_dims):
                pred_samples = predicted_traj[b, :, :, d].detach().cpu().numpy()
                true_values = true_traj[b, :, d].detach().cpu().numpy()
                # Envelope across the sample axis at every time step.
                lower_bound = np.min(pred_samples, axis=0)
                upper_bound = np.max(pred_samples, axis=0)
                times = np.arange(pred_samples.shape[1])

                # Closed polygon: upper bound left-to-right, then lower bound
                # right-to-left, filled to produce the shaded region.
                fig.add_trace(
                    go.Scatter(
                        x=np.concatenate([times, times[::-1]]),
                        y=np.concatenate([upper_bound, lower_bound[::-1]]),
                        fill='toself',
                        fillcolor=f'rgba(0,100,80,{0.2 if d == 0 else 0.1})',  # vary opacity by dimension
                        line=dict(color='rgba(255,255,255,0)'),
                        showlegend=False,
                        legendgroup=f'Batch {b}',
                        name=f'Bounds Dim {d+1}'
                    ),
                    row=i+1, col=1
                )

                # True trajectory
                fig.add_trace(
                    go.Scatter(
                        x=times, y=true_values, mode='lines+markers',
                        line=dict(color='blue', width=2),
                        marker=dict(size=4, color='red'),
                        name=f'True Trajectory Dim {d+1}',
                        legendgroup=f'Batch {b}',
                    ),
                    row=i+1, col=1
                )

        fig.update_layout(
            height=300*k,  # Adjust height based on the number of rows plotted
            title='Trajectories Comparison Across Batches',
            showlegend=True
        )
        fig.show()

# Demo: the instantiation and method call follow the same pattern as before

# Generate mock data
batch, samples, times, dim = 5, 10, 100, 2  # Sizes: batch items, samples per item, time steps, dimensions
t = torch.linspace(0, 6 * torch.pi, times)  # Time axis: `times` points spanning [0, 6*pi]

# Build the true signal sin(t) with shape (batch, times, dim):
# (times,) -> (1, times) -> (1, times, 1) -> repeated over batch and dim.
t_reshaped = torch.sin(t).unsqueeze(0).unsqueeze(-1).repeat(batch, 1, dim)  # Used as true_traj below
# Prediction base cos(t) with shape (1, 1, times, 1) so it broadcasts over batch, samples and dim.
t_reshaped_pred = torch.cos(t).unsqueeze(0).unsqueeze(0).unsqueeze(-1)

# NOTE(review): the predictions are Gaussian noise (scale 0.1) around cos(t),
# while the true trajectory is sin(t) -- they are not noise around the true values.
predicted_traj = t_reshaped_pred + 0.1 * torch.randn(batch, samples, times, dim)  # Shape (batch, samples, times, dim)
true_traj = t_reshaped  # Same tensor as t_reshaped (no copy)

# Example usage: plot 3 randomly chosen batch items
model = MockModel(train_dir='your_directory', log_wandb=False)
model.plot_trajectories(predicted_traj, true_traj, k=3)

In [12]:
import numpy as np
import plotly.graph_objects as go
from plotly.subplots import make_subplots

def plot_trajectories(self, predicted_traj, true_traj, k=3):
    """Plot sampled, mean and true trajectories for random batch items.

    Up to ``k`` distinct batch indices are drawn with NumPy's global RNG and
    each one gets its own subplot row. For every dimension the row shows each
    predicted sample as a faint line, the mean over samples as a solid line,
    and the true trajectory as a line with markers.

    Args:
        predicted_traj: Tensor indexed as ``[batch, sample, time, dim]``.
        true_traj: Tensor indexed as ``[batch, time, dim]``.
        k: Number of batch items to show. Values larger than the batch size
            are clamped to the batch size.

    Raises:
        ValueError: If ``k`` is smaller than 1 or the batch is empty.
    """
    num_batches = predicted_traj.shape[0]
    if k < 1 or num_batches < 1:
        raise ValueError(
            f"need k >= 1 and a non-empty batch to plot (k={k}, batch size={num_batches})"
        )
    # Sampling without replacement cannot return more items than exist.
    k = min(k, num_batches)

    idx = np.random.choice(num_batches, k, replace=False)
    num_dims = true_traj.shape[2]

    # Title every row with the batch index actually drawn, matching the
    # 'Batch {b}' naming used for the legend groups below.
    fig = make_subplots(rows=k, cols=1, subplot_titles=[f'Batch {b}' for b in idx])

    for i, b in enumerate(idx):
        for d in range(num_dims):
            pred_samples = predicted_traj[b, :, :, d].detach().cpu().numpy()
            true_values = true_traj[b, :, d].detach().cpu().numpy()
            times = np.arange(pred_samples.shape[1])
            # Average over the sample axis at every time step.
            mean_pred = np.mean(pred_samples, axis=0)

            # Individual predicted trajectories: faint, and kept out of the
            # legend so it is not flooded with one entry per sample.
            for sample in pred_samples:
                fig.add_trace(
                    go.Scatter(
                        x=times, y=sample, mode='lines',
                        line=dict(color='rgba(255, 182, 193, 0.1)', width=1),
                        showlegend=False,
                        legendgroup=f'Batch {b}',
                        name=f'Predicted Trajectory Dim {d+1}'
                    ),
                    row=i+1, col=1
                )

            # Mean predicted trajectory, fully opaque
            fig.add_trace(
                go.Scatter(
                    x=times, y=mean_pred, mode='lines',
                    line=dict(color='rgba(255, 105, 180, 1)', width=2),
                    name=f'Mean Predicted Trajectory Dim {d+1}',
                    legendgroup=f'Batch {b}',
                ),
                row=i+1, col=1
            )

            # True trajectory
            fig.add_trace(
                go.Scatter(
                    x=times, y=true_values, mode='lines+markers',
                    line=dict(color='rgba(0, 255, 255, 1)', width=2),
                    marker=dict(size=4, color='blue'),
                    name=f'True Trajectory Dim {d+1}',
                    legendgroup=f'Batch {b}',
                ),
                row=i+1, col=1
            )

    fig.update_layout(
        height=300*k,  # Adjust height based on the number of rows plotted
        title='Trajectories Comparison Across Batches',
        showlegend=True
    )
    fig.show()

# Demonstration-only host class for plot_trajectories
class MockClass:
    """Throwaway class used to call plot_trajectories as a bound method."""

    # Reuse the module-level function as a method of this class.
    plot_trajectories = plot_trajectories

    def __init__(self):
        # The only attribute set on instances.
        self.device = 'cpu'

# Instantiate the mock host object
mock_instance = MockClass()

# Sizes of the random test tensors
num_batches = 10
time_steps = 50
num_samples = 10
num_dims = 3

# NOTE(review): `torch` is used here but is not among this cell's own imports;
# it has to be in scope from an earlier cell.
# Standard-normal tensors: predictions are (batch, sample, time, dim),
# true trajectories are (batch, time, dim).
predicted_traj = torch.randn((num_batches, num_samples, time_steps, num_dims))
true_traj = torch.randn((num_batches, time_steps, num_dims))

# Plot 3 randomly chosen batch items
mock_instance.plot_trajectories(predicted_traj, true_traj, k=3)