# Envlogger and TFDS

In [10]:
#@title Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

In [11]:
# python=3.10.12

In [12]:
#@title Install Pip packages
# !pip install tensorflow[and-cuda]==2.15.0
# !pip install rlds[tensorflow]
# !pip install envlogger[tfds]
# !apt-get install libgmp-dev
# !pip install numpy
# !pip install dm-env
# !pip install tqdm
# !pip install joblib

In [None]:
# pip install tensorflow[and-cuda]==2.13.1
# pip install rlds[tensorflow]
# pip install envlogger[tfds]
# sudo apt-get install libgmp-dev
# pip install numpy
# pip install dm-env
# pip install tqdm
# pip install joblib

In [13]:
#@title Imports
import os
import rlds
import envlogger
from envlogger.backends import rlds_utils
from envlogger.backends import tfds_backend_writer
from envlogger.testing import catch_env
import numpy as np
import tensorflow as tf
import tensorflow_datasets as tfds
import time
from typing import Optional, List

# Generate a dataset

In this example, we use the local TFDS backend.

In [14]:
#@title load custom data

from tqdm import tqdm
import joblib
from os import listdir
from os.path import isfile, join

load_dir = "../data/teleop"
exp = "pick_red"
version = "0.0.1" # debug 0.0.1 full 0.0.2
load_data_dir = os.path.join(load_dir, exp)
filenames = [f for f in listdir(load_data_dir) if isfile(join(load_data_dir, f)) and f.split('_')[0] == "traj" and f.split('.')[-1] == "gz"]

# maximum number of episodes to include per file (episodes will be stored in multiple files and then read as a single dataset).
max_episodes_per_shard = 1000
save_dir = "../data/rlds"
generate_data_dir = os.path.join(save_dir, exp, version)
os.makedirs(generate_data_dir, exist_ok=True)

num_episodes = len(filenames)
obs_keys = ['lowdim_ee', 'lowdim_qpos', '207322251049_rgb']
key = obs_keys[2]

In [15]:
#@title create pass through env

import dm_env
from dm_env import specs, TimeStep, StepType

class DataGym(dm_env.Environment):

  def __init__(self, obs):
    self.img = obs["207322251049_rgb"]
    self.joint_state = obs["lowdim_qpos"]
    self.state = obs["lowdim_ee"]
    
    self.timestep = 0

  def get_obs(self, timestep):
    obs_dict = {
      "image": self.img[self.timestep].astype(self.observation_spec()["image"].dtype),
      "joint_state": self.joint_state[self.timestep].astype(self.observation_spec()["joint_state"].dtype),
      "state": self.state[self.timestep].astype(self.observation_spec()["state"].dtype)
    }
    return obs_dict
  
  def reset(self):
    self.timestep = 0

    reward = 0.
    discount = 0.
    step_type = StepType.FIRST

    return TimeStep(step_type, reward, discount, self.get_obs(self.timestep))

  def step(self, act):
    self.timestep += 1

    reward = 0.
    discount = 0.
    step_type = StepType.MID if self.timestep < len(self.img)-1 else StepType.LAST

    return TimeStep(step_type, reward, discount, self.get_obs(self.timestep))

  def observation_spec(self):
    """Returns the observation spec."""
    return {
      "image": specs.BoundedArray(shape=self.img[0].shape, dtype=np.uint8, name="image", minimum=0, maximum=255),
      "joint_state": specs.BoundedArray(shape=self.joint_state[0].shape, dtype=np.float32, name="joint_state", minimum=-1., maximum=1.),
      "state": specs.BoundedArray(shape=self.state[0].shape, dtype=np.float32, name="state", minimum=-1., maximum=1.),
    }
  
  def action_spec(self):
    """Returns the action spec."""
    return specs.BoundedArray(
        shape=(7,), dtype=np.float32, name="action", minimum=-1., maximum=1.)
    # return {
    #   "world_vector": specs.BoundedArray(shape=(3,), dtype=np.float32, name="world_vector", minimum=-1., maximum=1.),
    #   "rotation_delta": specs.BoundedArray(shape=(3,), dtype=np.float32, name="rotation_delta", minimum=-1., maximum=1.),
    #   "open_gripper": specs.BoundedArray(shape=(1,), dtype=np.float32, name="open_gripper", minimum=-1., maximum=1.),
    # }
  
  # def preprocess_action(self, action):
  def preprocess_action(self, action, instruction):
    action_dict = {
      # "world_vector": action[:3].astype(self.action_spec()["world_vector"].dtype),
      # "rotation_delta": action[3:6].astype(self.action_spec()["rotation_delta"].dtype),
      # "open_gripper": action[6:].astype(self.action_spec()["open_gripper"].dtype),
      "action": action.astype(self.action_spec().dtype),
      "natural_language_instruction": np.array(instruction.encode('utf-8'), dtype=np.object_)
    }
    return action_dict
  
e = 0
obs, act = joblib.load(os.path.join(load_data_dir, filenames[e]))
env = DataGym(obs)

In [20]:
#@title convert custom data to rlds

from tqdm import trange

def step_fn(unused_timestep, unused_action, unused_env):
  return {'timestamp_ns': time.time_ns()}

ds_config = tfds.rlds.rlds_base.DatasetConfig(
      name=exp,
      observation_info=tfds.features.FeaturesDict({
         "image": tfds.features.Image(
          shape=env.observation_spec()["image"].shape,
          dtype=env.observation_spec()["image"].dtype),
          "joint_state": tfds.features.Tensor(
          shape=env.observation_spec()["joint_state"].shape,
          dtype=env.observation_spec()["joint_state"].dtype,),
          "state": tfds.features.Tensor(
          shape=env.observation_spec()["state"].shape,
          dtype=env.observation_spec()["state"].dtype,),
      }),
      action_info=
      # tfds.features.FeaturesDict({
        #  "world_vector": tfds.features.Tensor(
        #   shape=env.action_spec()["world_vector"].shape,
        #   dtype=env.action_spec()["world_vector"].dtype),
        #   "rotation_delta": tfds.features.Tensor(
        #   shape=env.action_spec()["rotation_delta"].shape,
        #   dtype=env.action_spec()["rotation_delta"].dtype),
        #   "open_gripper": tfds.features.Tensor(
        #   shape=env.action_spec()["open_gripper"].shape,
        #   dtype=env.action_spec()["open_gripper"].dtype,),
        # })
          tfds.features.Tensor(
          shape=env.action_spec().shape,
          dtype=env.action_spec().dtype,),
          # "natural_language_instruction": tfds.features.Tensor(
          # shape=(),
          # dtype=tf.string,),
      reward_info=tf.float64,
      discount_info=tf.float64,
      step_metadata_info={'timestamp_ns': tf.int64})

with envlogger.EnvLogger(
        env,
        backend = tfds_backend_writer.TFDSBackendWriter(
          data_directory=generate_data_dir,
          split_name='train',
          max_episodes_per_file=max_episodes_per_shard,
          ds_config=ds_config),
        step_fn=step_fn) as env:
  
  for e in trange(num_episodes):

      obs, act = joblib.load(os.path.join(load_data_dir, filenames[e]))
      data_env = DataGym(obs)

      env._environment = data_env
      timestep = env.reset()

      for i in range(act.shape[0]-1):
      # while not timestep.last():
      # while not timestep.last():

        # instruction = "pick up the red block"
        # action = env.preprocess_action(act[e], instruction)
        # action.pop("natural_language_instruction")

        action = act[i].astype(np.float32)
        timestep = env.step(action)


100%|██████████| 79/79 [07:59<00:00,  6.06s/it]


In [21]:
b = tfds.builder_from_directory(builder_dir=generate_data_dir)
b.info.features["steps"]

Dataset({
    'action': Tensor(shape=(7,), dtype=float32),
    'discount': float64,
    'is_first': bool,
    'is_last': bool,
    'is_terminal': bool,
    'observation': FeaturesDict({
        'image': Image(shape=(480, 640, 3), dtype=uint8),
        'joint_state': Tensor(shape=(8,), dtype=float32),
        'state': Tensor(shape=(7,), dtype=float32),
    }),
    'reward': float64,
    'timestamp_ns': int64,
})

# Recover a dataset

When the process of generating one dataset didn't finish properly, it is possible for the last shard to be incomplete. Envlogger provides the functionality to recover this last shard.

In [None]:
recover_dataset_path = generate_data_dir

builder = tfds.builder_from_directory(recover_dataset_path)
builder = rlds_utils.maybe_recover_last_shard(builder)

# Load one dataset

Loading one dataset generated with the TFDS backend uses just regular TFDS functionality.



In [None]:
load_dataset_path = generate_data_dir

loaded_dataset = tfds.builder_from_directory(load_dataset_path).as_dataset(split='all')

for e in loaded_dataset:
  print(e)


{'steps': <_VariantDataset element_spec={'action': {'open_gripper': TensorSpec(shape=(1,), dtype=tf.float32, name=None), 'rotation_delta': TensorSpec(shape=(3,), dtype=tf.float32, name=None), 'world_vector': TensorSpec(shape=(3,), dtype=tf.float32, name=None)}, 'discount': TensorSpec(shape=(), dtype=tf.float64, name=None), 'is_first': TensorSpec(shape=(), dtype=tf.bool, name=None), 'is_last': TensorSpec(shape=(), dtype=tf.bool, name=None), 'is_terminal': TensorSpec(shape=(), dtype=tf.bool, name=None), 'observation': {'image': TensorSpec(shape=(480, 640, 3), dtype=tf.uint8, name=None), 'joint_state': TensorSpec(shape=(8,), dtype=tf.float32, name=None), 'state': TensorSpec(shape=(7,), dtype=tf.float32, name=None)}, 'reward': TensorSpec(shape=(), dtype=tf.float64, name=None), 'timestamp_ns': TensorSpec(shape=(), dtype=tf.int64, name=None)}>}
{'steps': <_VariantDataset element_spec={'action': {'open_gripper': TensorSpec(shape=(1,), dtype=tf.float32, name=None), 'rotation_delta': TensorSpec

In [None]:
e["steps"]

<_VariantDataset element_spec={'action': {'open_gripper': TensorSpec(shape=(1,), dtype=tf.float32, name=None), 'rotation_delta': TensorSpec(shape=(3,), dtype=tf.float32, name=None), 'world_vector': TensorSpec(shape=(3,), dtype=tf.float32, name=None)}, 'discount': TensorSpec(shape=(), dtype=tf.float64, name=None), 'is_first': TensorSpec(shape=(), dtype=tf.bool, name=None), 'is_last': TensorSpec(shape=(), dtype=tf.bool, name=None), 'is_terminal': TensorSpec(shape=(), dtype=tf.bool, name=None), 'observation': {'image': TensorSpec(shape=(480, 640, 3), dtype=tf.uint8, name=None), 'joint_state': TensorSpec(shape=(8,), dtype=tf.float32, name=None), 'state': TensorSpec(shape=(7,), dtype=tf.float32, name=None)}, 'reward': TensorSpec(shape=(), dtype=tf.float64, name=None), 'timestamp_ns': TensorSpec(shape=(), dtype=tf.int64, name=None)}>