In [None]:
import gymnasium as gym
import math
import random
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple , deque
from itertools import  count

from franka import FrankaRoboticsEnv
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F



In [None]:
# Build the Franka environment. The meaning of every keyword below is defined
# by the project-local FrankaRoboticsEnv constructor, which is not visible in
# this notebook -- consult franka.py before changing any value.
env = FrankaRoboticsEnv(
    # executable_file='@editor',  # disabled; left here as a toggle

    # Scene and episode settings.
    scene_file='FrankaRobotics.json',
    max_episode_length=1000,

    # Reward, seeding and tolerance.
    # NOTE(review): seed=None presumably leaves runs unseeded -- confirm
    # whether a fixed seed is wanted for reproducibility.
    reward_type='sparse',
    seed=None,
    tolerance=0.05,

    # Task flags.
    load_object=True,
    target_in_air=True,
    block_gripper=False,

    # Sampling ranges for the target and the object.
    target_xz_range=0.15,
    target_y_range=0.6,
    object_xz_range=0.15,

    # Assets to load.
    asset_bundle_file=None,
    assets=['Rigidbody_Box'],
)

# Use the GPU when PyTorch reports one is available, otherwise the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")


In [None]:
# Size of the network's output layer, taken from the first dimension of the
# action space's shape.
# NOTE(review): if the action space is continuous this is the number of action
# dimensions, not a count of discrete actions -- confirm that a DQN head of
# this size is what the training loop expects.
n_actions = env.action_space.shape[0]

# Reset once so the observation layout can be inspected.
reset_output = env.reset()

# Gymnasium-style environments return ``(observation, info)`` from reset();
# accept that form as well as a bare observation dict.
obs_dict = reset_output[0] if isinstance(reset_output, tuple) else reset_output

# Pull the three entries of the goal-conditioned observation dict.
state = obs_dict['observation']          # the initial observation state
achieved_goal = obs_dict['achieved_goal']  # the initial achieved goal
desired_goal = obs_dict['desired_goal']    # the goal to achieve

# Number of input features for the network. This must be a plain int because
# it is used as ``in_features`` of ``nn.Linear``; the previous code stored a
# tensor here, which makes network construction fail.
# Assumes ``state`` is a flat (1-D) sequence/array -- TODO confirm.
n_observations = len(state)

# Batched float32 tensor of the initial observation on the selected device,
# kept under its own name for cells that need the state as network input.
state_tensor = torch.tensor([state], dtype=torch.float32, device=device)


In [None]:
# Draw one random sample from the environment's observation space. As the last
# expression in this cell, the notebook displays the result, which shows the
# structure of an observation.
env.observation_space.sample()

In [None]:
# Record type for one stored experience; its fields, in order, are
# state, action, next_state and reward.
Transition = namedtuple("Transition", ["state", "action", "next_state", "reward"])


class ReplayMemory:
    """Bounded first-in-first-out store of ``Transition`` records.

    Supports appending new transitions and drawing uniform random batches.
    """

    def __init__(self, capacity):
        """Create an empty buffer that retains at most ``capacity`` transitions."""
        # A deque with ``maxlen`` drops its oldest entry when a new one is
        # appended to a full buffer.
        self.memory = deque(maxlen=capacity)

    def push(self, *args):
        """Save a transition built from positional ``Transition`` field values."""
        record = Transition(*args)
        self.memory.append(record)

    def sample(self, batch_size):
        """Return a list of ``batch_size`` stored transitions chosen at random.

        Selection is without replacement; ``random.sample`` raises
        ``ValueError`` when ``batch_size`` exceeds the number of stored
        transitions.
        """
        return random.sample(self.memory, batch_size)

    def __len__(self):
        """Return the number of transitions currently held."""
        return len(self.memory)

In [None]:
class DQN(nn.Module):
    """Fully connected network with two 128-unit ReLU hidden layers.

    Maps an input with ``n_observations`` features to ``n_actions`` outputs.
    """

    def __init__(self, n_observations, n_actions):
        """Create the three linear layers.

        n_observations: input feature count of the first layer (must be an int).
        n_actions: output feature count of the last layer.
        """
        super().__init__()
        hidden_width = 128
        self.layer1 = nn.Linear(n_observations, hidden_width)
        self.layer2 = nn.Linear(hidden_width, hidden_width)
        self.layer3 = nn.Linear(hidden_width, n_actions)

    def forward(self, x):
        """Run ``x`` through the network and return the final layer's raw output.

        ReLU is applied after the first and second layers only; the last
        layer's output is returned without an activation.
        """
        first_hidden = F.relu(self.layer1(x))
        second_hidden = F.relu(self.layer2(first_hidden))
        return self.layer3(second_hidden)

In [None]:


# Training hyperparameters. Their consumers are outside this cell; the names
# follow the usual DQN conventions (mini-batch size, discount factor,
# epsilon-greedy start/end/decay, target-network update rate, optimizer
# learning rate) -- TODO confirm against the training loop.
BATCH_SIZE = 128
GAMMA = 0.99
EPS_START, EPS_END, EPS_DECAY = 0.9, 0.05, 1000
TAU = 0.005
LR = 1e-4

# Two networks of identical shape, both moved to the selected device.
# NOTE(review): DQN forwards n_observations and n_actions to nn.Linear, so
# both must be plain ints here -- verify the cell that defines them.
policy_net = DQN(n_observations, n_actions).to(device)
target_net = DQN(n_observations, n_actions).to(device)

# Start the target network as an exact copy of the policy network's weights.
target_net.load_state_dict(policy_net.state_dict())
