In [10]:
import numpy as np
import PIL.ImageDraw

import tensorflow as tf

# Create a configuration object
config = tf.compat.v1.ConfigProto()
config.gpu_options.allow_growth = True

# Start a session with the configuration
sess = tf.compat.v1.Session(config=config)

import tensorflow_probability as tfp
from acme import wrappers
import imageio

from flybody.fly_envs import (
    flight_imitation,
    walk_imitation,
    vision_guided_flight,
    walk_on_ball,
)
from flybody.basic_rodent_2020 import rodent_run_gaps, rodent_escape_bowl, rodent_maze_forage, rodent_two_touch

from flybody.tasks.task_utils import (
    get_random_policy,
    real2canonical,
)
from flybody.agents.utils_tf import TestPolicyWrapper
from flybody.utils import (
    display_video,
    rollout_and_render
)

# Helper Function
def blow(x, repeats=2):
    """Repeat columns and rows requested number of times."""
    return np.repeat(np.repeat(x, repeats, axis=0), repeats, axis=1)


def vision_rollout_and_render(env, policy, camera_id=1,
                              eye_blow_factor=5, **render_kwargs):
    """Run vision-guided flight episode and render frames, including eyes."""
    frames = []
    timestep = env.reset()
    # Run full episode until it ends.
    i=0
    while timestep.step_type != 2 and i < 500:
        i+=1
        # Render eyes and scene.
        pixels = env.physics.render(camera_id=camera_id, **render_kwargs)
        eyes = eye_pixels_from_observation(
            timestep, blow_factor=eye_blow_factor)
        # Add eye pixels to scene.
        pixels[0:eyes.shape[0], 0:eyes.shape[1], :] = eyes
        frames.append(pixels)
        # Step environment.
        action = policy(timestep.observation)
        timestep = env.step(action)
    return frames


def eye_pixels_from_observation(timestep, blow_factor=4):
    """Get current eye view from timestep.observation."""
    # In the actual task, the averaging over axis=-1 is done by the visual
    # network as a pre-processing step, so effectively the visual observations
    # are gray-scale.
    egocentric_camera = timestep.observation['walker/egocentric_camera'].mean(axis=-1)

    pixels = egocentric_camera
    pixels = np.tile(pixels[:, :, None], reps=(1, 1, 3))
    pixels = blow(pixels, blow_factor)
    pixels = pixels.astype('uint8')
    return pixels


def eye_pixels_from_cameras(physics, **render_kwargs):
    """Render two-eye view, assuming eye cameras have particular names."""
    for i in range(physics.model.ncam):
        name = physics.model.id2name(i, 'camera')
        if 'eye_left' in name:
            left_eye = physics.render(camera_id=i, **render_kwargs)
        if 'eye_right' in name:
            right_eye = physics.render(camera_id=i, **render_kwargs)
    pixels = np.hstack((left_eye, right_eye))
    return pixels


# Frame width and height for rendering.
render_kwargs = {'width': 640, 'height': 480}

In [11]:
env = rodent_escape_bowl()
env = wrappers.SinglePrecisionWrapper(env)
env = wrappers.CanonicalSpecWrapper(env, clip=False)

policy_path = "/root/vast/scott-yang/flybody/training/ray-rodent-escape-bowl-ckpts/3a455682-5372-11ef-a703-2ae1cc4cdff5/snapshots/policy-1"
walking_policy = tf.saved_model.load(policy_path)
walking_policy = TestPolicyWrapper(walking_policy)

frames = vision_rollout_and_render(env, walking_policy, camera_id=2, eye_blow_factor=3, **render_kwargs)
display_video(frames)

In [5]:
with imageio.get_writer("./videos/from_bowl_escape/run_gaps_from_bowl_escape-2.mp4", fps=30) as video:
    for f in frames:
        video.append_data(f)

In [15]:
env = rodent_two_touch()
env = wrappers.SinglePrecisionWrapper(env)
env = wrappers.CanonicalSpecWrapper(env, clip=False)

policy_path = "/root/vast/scott-yang/training/ray-rodent-general-ckpts/7c96a9ce-4f9d-11ef-8cec-2ae1cc4cdff5/snapshots/policy-80"
walking_policy = tf.saved_model.load(policy_path)
walking_policy = TestPolicyWrapper(walking_policy)

frames = vision_rollout_and_render(env, walking_policy, camera_id=2, eye_blow_factor=3, **render_kwargs)
display_video(frames)

with imageio.get_writer("two_touch_genearlist_policy_80.mp4", fps=30) as video:
    for f in frames:
        video.append_data(f)

In [16]:
env = rodent_run_gaps()
env = wrappers.SinglePrecisionWrapper(env)
env = wrappers.CanonicalSpecWrapper(env, clip=False)

policy_path = "/root/vast/scott-yang/training/ray-rodent-general-ckpts/7c96a9ce-4f9d-11ef-8cec-2ae1cc4cdff5/snapshots/policy-80"
walking_policy = tf.saved_model.load(policy_path)
walking_policy = TestPolicyWrapper(walking_policy)

frames = vision_rollout_and_render(env, walking_policy, camera_id=2, eye_blow_factor=3, **render_kwargs)
display_video(frames)

with imageio.get_writer("run_gaps_genearlist_policy_80.mp4", fps=30) as video:
    for f in frames:
        video.append_data(f)

: 

In [2]:
import tensorflow as tf

# ... (Your model definition or optimizer setup)

# Specify the checkpoint directory
checkpoint_dir = "/root/vast/scott-yang/flybody/training/ray-rodent-general-ckpts/feca92ee-5357-11ef-887b-2ae1cc4cdff5/checkpoints/dmpo_learner"  # Adjust if needed

# Create the checkpoint object
checkpoint = tf.train.Checkpoint(model=your_model, optimizer=your_optimizer)  # Adjust to your objects

# Restore from the latest checkpoint
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
checkpoint.restore(latest_checkpoint)

# Access variables by name (example)
var_value = your_model.trainable_variables[0].numpy()
print(f"Variable value: {var_value}")

NameError: name 'your_model' is not defined