In [None]:
from gymnasium import Env
from gymnasium.spaces import Discrete,Box 
import numpy as np
import random
from math import sqrt
from matplotlib import pyplot as plt
import pygame
# RGB colour constants (0-255 per channel).
# NOTE(review): neither name is referenced by the cells visible here.
white = (255, 255, 255)
colors = [
    (0, 0, 255),  # Blue
    (0, 255, 0),  # Green
    (255, 0, 0),  # Red
    (255, 255, 0),  # Yellow
    (255, 0, 255),  # Magenta
    (0, 255, 255),  # Cyan
    (128, 0, 0),  # Maroon
    (0, 128, 0),  # Dark green
    (128, 128, 0),  # Olive
    (0, 0, 128),  # Navy
    (128, 0, 128),  # Purple
    (0, 128, 128),  # Teal
    (128, 128, 128),  # Gray
    (255, 165, 0)  # Orange (for special cell)
]


In [None]:


class rlEnv(Env):
    """Grid world laid over a height field, exposed through the Gymnasium ``Env`` API.

    The ``H`` x ``W`` grid samples the square ``[-3, 3) x [-3, 3)``.  Cells whose
    centre satisfies ``x**2 + y**2 < 1/4`` carry the height
    ``16 * 0.4 * (x**2 + y**2 - 1/4)**2``; every other cell has height 0.

    * State: a single integer ``row * W + col`` (``Discrete(W * H)``).
    * Actions (``Discrete(4)``): 0 = column - 1, 1 = row + 1, 2 = column + 1,
      3 = row - 1.  Moves are clipped at the grid border.
    * Start cell: row ``int(H / 2)``, column ``int(0.25 * W)``.
    * Goal cell: row ``int(H / 2)``, column ``int(0.75 * W)``.

    The agent carries a speed that is updated after each move from the height
    difference between the old and the new cell
    (``v_new**2 = v**2 + 2 * gravity * (h_prev - h_new)``, clamped at 0).
    """

    def __init__(self, W, H):
        """Create the environment.

        :param W: number of grid columns
        :param H: number of grid rows
        """
        super().__init__()
        self.W = W
        self.H = H
        # NOTE(review): the height field is sampled every 6 / W, while the
        # travel-time reward uses 2 / W as the step length - confirm intended.
        self.stepsize = (2 / self.W)
        self.gravity = 9.81
        self.truncated = False
        self.done = False
        self.action_space = Discrete(4)
        self.observation_space = Discrete(W * H)

        # Height field, indexed as grid[row, col].
        self.grid = np.zeros((H, W))
        for i in range(W):
            for j in range(H):
                x = -3 + i * (6 / self.W)
                y = -3 + j * (6 / self.H)
                self.grid[j, i] = 16 * 0.4 * (x**2 + y**2 - 1/4)**2 if x**2 + y**2 < 1/4 else 0

        self.screen_width = 800
        self.screen_height = 800
        self.screen = None  # pygame display surface, created lazily by render()
        self.state = self._start_state()
        self.velocity = 1
        self.reward = 0
        self.collective = 0  # running sum of the rewards of the current episode

    def _to_s(self, row, col):
        """Flatten a (row, col) pair into the integer state ``row * W + col``."""
        return row * self.W + col

    def _start_state(self):
        """Return the integer state of the start cell (middle row, column at 0.25 * W)."""
        return self._to_s(int((self.H) / 2), int(0.25 * (self.W)))

    def step(self, action):
        """Advance the environment by one action.

        The end-of-episode checks are evaluated on the state *before* the action
        is applied, so reaching the goal (or a speed of 0) is reported by the
        following call:

        * speed is 0 or the accumulated reward dropped below -30:
          reward -10, ``truncated`` set to True, no move;
        * agent stands on the goal cell: reward 0, ``done`` set to True, no move;
        * otherwise the move is applied.  A move blocked by the border costs -1,
          any other move costs ``stepsize / velocity``; the speed is then
          updated from the height difference.

        :param action: integer in ``{0, 1, 2, 3}``
        :return: ``(state, reward, terminated, truncated, info)``
        """
        row, col = divmod(self.state, self.W)
        prev_row, prev_col = row, col
        h_prev = self.grid[row, col]

        if self.velocity == 0 or self.collective < -30:  # Check termination conditions
            self.reward = -10
            self.truncated = True
        elif (row == int((self.H) / 2) and col == int(0.75 * self.W)):
            self.reward = 0
            self.done = True
        else:
            self.done = False
            if action == 0:  # Move left
                col = max(col - 1, 0)
            elif action == 1:  # Move down
                row = min(row + 1, self.H - 1)
            elif action == 2:  # Move right
                col = min(col + 1, self.W - 1)
            elif action == 3:  # Move up
                row = max(row - 1, 0)

            # Only one coordinate can change per action, so a zero sum means the
            # move was clipped by the border and the agent did not move.
            if ((prev_col - col + prev_row - row) == 0):
                self.reward = -1
            else:
                # velocity is non-zero here: the zero case is handled above.
                self.reward = -float(self.stepsize / self.velocity)

            h_new = self.grid[row, col]
            update_vel = self.velocity ** 2 + 2 * self.gravity * (h_prev - h_new)
            if update_vel < 0:
                self.velocity = 0
            else:
                self.velocity = sqrt(update_vel)

            self.state = self._to_s(row, col)
            self.collective += self.reward

        return self.state, self.reward, self.done, self.truncated, {}

    def render(self):
        """Draw the height field (first call only) and the agent in a pygame window.

        The background is painted once, when the window is created; later calls
        only add a marker for the current agent position, so earlier markers
        stay visible.
        """
        cell_w = self.screen_width / self.W
        cell_h = self.screen_height / self.H

        if self.screen is None:
            pygame.init()
            self.screen = pygame.display.set_mode((self.screen_width, self.screen_height))
            self.screen.fill((255, 255, 255))

            for i in range(self.W):
                for j in range(self.H):
                    # grid is stored as [row, col]: j is the row, i the column.
                    color_index = int(self.grid[j, i] * 255)
                    color = (255 - color_index, 0, color_index)
                    pygame.draw.rect(self.screen, color, (i * cell_w, j * cell_h, cell_w, cell_h))

        agent_row, agent_col = divmod(self.state, self.W)
        # Centre the marker inside its cell on both axes.
        agent_x = agent_col * cell_w + cell_w / 2
        agent_y = agent_row * cell_h + cell_h / 2
        pygame.draw.circle(self.screen, (0, 0, 0), (int(agent_x), int(agent_y)), 10)
        pygame.display.flip()

        return

    def close(self):
        """Shut pygame down if a window was opened by render()."""
        if self.screen is not None:
            pygame.quit()
            self.screen = None

    def reset(self, seed=None, options=None):
        """Put the agent back on the start cell and clear the episode bookkeeping.

        :param seed: optional seed forwarded to ``Env.reset``
        :param options: accepted for Gymnasium API compatibility; not used
        :return: ``(state, info)`` with an empty ``info`` dict
        """
        super().reset(seed=seed)
        self.state = self._start_state()
        self.velocity = 1
        self.reward = 0
        self.truncated = False
        self.done = False
        self.collective = 0
        return self.state, {}


In [None]:
# Smoke test: build a 20x20 environment and draw it once.
env=rlEnv(20,20)
env.render()
# Keep the window on screen for 1000 ms.
pygame.time.wait(1000)
# render() replaces the initial None with the pygame display surface.
print(env.screen)

In [None]:
from stable_baselines3.common.env_checker import check_env
# Validate `env` (the rlEnv instance created in the previous cell) with the Stable-Baselines3 environment checker.
check_env(env)

In [None]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import cv2
import random
import time
from collections import deque

# Number of past actions kept in every observation: it sizes the action-history
# deque and the observation vector (5 scalar features + this many actions).
SNAKE_LEN_GOAL = 30

def collision_with_apple(apple_position, score):
	"""Respawn the apple and add one point to the score.

	The incoming ``apple_position`` is not read; a fresh position is drawn with
	both coordinates taken from 10, 20, ..., 490.

	Returns the pair ``(new_apple_position, score + 1)``.
	"""
	new_x = random.randrange(1, 50) * 10
	new_y = random.randrange(1, 50) * 10
	return [new_x, new_y], score + 1

def collision_with_boundaries(snake_head):
	"""Return 1 if the head is outside the board (either coordinate < 0 or >= 500), else 0."""
	x_outside = snake_head[0] >= 500 or snake_head[0] < 0
	if x_outside:
		return 1
	y_outside = snake_head[1] >= 500 or snake_head[1] < 0
	return 1 if y_outside else 0

def collision_with_self(snake_position):
	"""Return 1 if the head (first segment) also appears among the remaining segments, else 0."""
	head = snake_position[0]
	body = snake_position[1:]
	return int(head in body)


class SnekEnv(gym.Env):
	"""Snake on a 500x500 pixel board with 10-pixel moves, following the Gymnasium API.

	Actions (``Discrete(4)``): 0 = x - 10, 1 = x + 10, 2 = y + 10, 3 = y - 10.

	Observation (float32 vector of length ``5 + SNAKE_LEN_GOAL``):
	``[head_x, head_y, apple_x - head_x, apple_y - head_y, snake_length]``
	followed by the last ``SNAKE_LEN_GOAL`` actions (``-1`` where no action has
	been taken yet).

	Every call to ``step`` also shows the board in an OpenCV window and waits
	50 ms, so stepping is paced at roughly 20 steps per second.
	"""

	def __init__(self):
		super().__init__()
		# Four movement actions, see the class docstring.
		self.action_space = spaces.Discrete(4)
		# Flat feature vector; every entry lies within [-500, 500].
		self.observation_space = spaces.Box(
			low=-500, high=500, shape=(5+SNAKE_LEN_GOAL,), dtype=np.float32
		)

	def _get_obs(self):
		"""Build the observation vector from the current game state."""
		head_x = self.snake_head[0]
		head_y = self.snake_head[1]

		snake_length = len(self.snake_position)
		apple_delta_x = self.apple_position[0] - head_x
		apple_delta_y = self.apple_position[1] - head_y

		observation = [head_x, head_y, apple_delta_x, apple_delta_y, snake_length] + list(self.prev_actions)
		# Cast explicitly so the array matches the dtype declared in observation_space.
		return np.array(observation, dtype=np.float32)

	def step(self, action):
		"""Apply one action and return ``(observation, reward, terminated, truncated, info)``.

		The reward is the change in snake length since the previous step (+1 when
		an apple was eaten, otherwise 0) and -10 on the step where the snake hits
		the border or itself.  ``truncated`` is always False.
		"""
		self.prev_actions.append(action)
		# Show the frame prepared by the previous call, then draw the current one.
		cv2.imshow('a',self.img)
		cv2.waitKey(1)
		self.img = np.zeros((500,500,3),dtype='uint8')
		# Display Apple
		cv2.rectangle(self.img,(self.apple_position[0],self.apple_position[1]),(self.apple_position[0]+10,self.apple_position[1]+10),(0,0,255),3)
		# Display Snake
		for position in self.snake_position:
			cv2.rectangle(self.img,(position[0],position[1]),(position[0]+10,position[1]+10),(0,255,0),3)

		# Takes step after fixed time: poll for a key until one arrives, then
		# spin until the 50 ms are over.
		t_end = time.time() + 0.05
		k = -1
		while time.time() < t_end:
			if k == -1:
				k = cv2.waitKey(1)
			else:
				continue

		button_direction = action
		# Change the head position based on the button direction
		if button_direction == 1:
			self.snake_head[0] += 10
		elif button_direction == 0:
			self.snake_head[0] -= 10
		elif button_direction == 2:
			self.snake_head[1] += 10
		elif button_direction == 3:
			self.snake_head[1] -= 10

		# Increase Snake length on eating apple
		if self.snake_head == self.apple_position:
			self.apple_position, self.score = collision_with_apple(self.apple_position, self.score)
			self.snake_position.insert(0,list(self.snake_head))

		else:
			self.snake_position.insert(0,list(self.snake_head))
			self.snake_position.pop()

		# On collision kill the snake and print the score
		if collision_with_boundaries(self.snake_head) == 1 or collision_with_self(self.snake_position) == 1:
			font = cv2.FONT_HERSHEY_SIMPLEX
			self.img = np.zeros((500,500,3),dtype='uint8')
			cv2.putText(self.img,'Your Score is {}'.format(self.score),(140,250), font, 1,(255,255,255),2,cv2.LINE_AA)
			cv2.imshow('a',self.img)
			self.done = True

		self.total_reward = len(self.snake_position) - 3  # default length is 3
		self.reward = self.total_reward - self.prev_reward
		self.prev_reward = self.total_reward

		if self.done:
			self.reward = -10
		info = {}

		observation = self._get_obs()

		# Gymnasium expects five values: a collision is a terminal state, and
		# this environment never truncates an episode.
		return observation, self.reward, self.done, False, info

	def reset(self, seed=None, options=None):
		"""Start a new game and return ``(observation, info)``.

		:param seed: optional seed forwarded to ``gym.Env.reset``.  The apple
			position is drawn from the module-level ``random`` generator, which
			this seed does not affect.
		:param options: accepted for Gymnasium API compatibility; not used
		"""
		super().reset(seed=seed)
		self.img = np.zeros((500,500,3),dtype='uint8')
		# Initial Snake and Apple position
		self.snake_position = [[250,250],[240,250],[230,250]]
		self.apple_position = [random.randrange(1,50)*10,random.randrange(1,50)*10]
		self.score = 0
		self.prev_button_direction = 1
		self.button_direction = 1
		self.snake_head = [250,250]

		self.prev_reward = 0

		self.done = False

		self.prev_actions = deque(maxlen = SNAKE_LEN_GOAL)  # however long we aspire the snake to be
		for i in range(SNAKE_LEN_GOAL):
			self.prev_actions.append(-1) # to create history

		return self._get_obs(), {}

In [None]:
from stable_baselines3.common.env_checker import check_env



# Rebinds `env`: from here on it is a SnekEnv, no longer the rlEnv created earlier.
env = SnekEnv()
# It will check your custom environment and output additional warnings if needed
check_env(env)

In [None]:
from stable_baselines3 import PPO
import os
# from snakeenv import SnekEnv
import time



# Take the timestamp once so the model and log directories always share the same run id.
run_id = int(time.time())
models_dir = f"models/{run_id}/"
logdir = f"logs/{run_id}/"

os.makedirs(models_dir, exist_ok=True)
os.makedirs(logdir, exist_ok=True)

env = SnekEnv()
env.reset()

model = PPO('MlpPolicy', env, verbose=1, tensorboard_log=logdir)

TIMESTEPS = 10000
iters = 0
# Trains until the kernel is interrupted; a checkpoint named after the total
# number of timesteps is written after every TIMESTEPS steps.
while True:
	iters += 1
	model.learn(total_timesteps=TIMESTEPS, reset_num_timesteps=False, tb_log_name="PPO")
	model.save(os.path.join(models_dir, str(TIMESTEPS*iters)))

In [None]:
import gymnasium as gym

from stable_baselines3 import PPO,DQN
from stable_baselines3.common.vec_env import DummyVecEnv, SubprocVecEnv
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.utils import set_random_seed

def make_env(env_id: str, rank: int, seed: int = 0):
    """
    Build a zero-argument factory that creates one worker environment.

    ``set_random_seed(seed)`` is called right away; the environment itself is
    only created when the returned factory is invoked.

    :param env_id: the environment ID handed to ``gym.make``
    :param rank: index of the subprocess, added to ``seed`` for the first reset
    :param seed: the initial seed for RNG
    :return: a callable that makes the environment, resets it with
        ``seed + rank`` and returns it
    """
    set_random_seed(seed)

    def _init():
        worker_env = gym.make(env_id)
        worker_env.reset(seed=seed + rank)
        return worker_env

    return _init

if __name__ == "__main__":
    env_id = "CartPole-v1"
    num_cpu = 1# Number of processes to use
    # Create the vectorized environment
    vec_env = SubprocVecEnv([make_env(env_id, i) for i in range(num_cpu)])

    # Stable Baselines provides you with make_vec_env() helper
    # which does exactly the previous steps for you.
    # You can choose between `DummyVecEnv` (usually faster) and `SubprocVecEnv`
    # env = make_vec_env(env_id, n_envs=num_cpu, seed=0, vec_env_cls=SubprocVecEnv)

    try:
        model = PPO("MlpPolicy", vec_env, verbose=1)
        model.learn(total_timesteps=25_000)

        # Roll the trained policy out for 1000 vectorised steps.
        obs = vec_env.reset()
        for _ in range(1000):
            action, _states = model.predict(obs)
            obs, rewards, dones, info = vec_env.step(action)
            vec_env.render()
    finally:
        # Always stop the worker processes, even if training or the rollout fails.
        vec_env.close()