# **Welcome to my Farm**

In [1]:
import numpy as np
import gymnasium as gym
from tqdm import tqdm
from typing import Optional
from collections import defaultdict
from matplotlib import pyplot as plt

class FarmGridWorldEnv(gym.Env):
    def __init__(self, size: int = 5, harvest_goal: int = 10):
        """
        Observations (Discrete but stored in a Dict):
          - agent_loc, grid_rep, crop_timer, soil_moisture, dry_counter, harvest_count, fertility_rep
        Actions:
          - 0..8 => Movement (including diagonals, stop)
          - 9 => Plough
          - 10 => Plant
          - 11 => Water
          - 12 => Harvest
        """

        # Grid Size (n x n)
        self.size = size
        self.goal = harvest_goal

        # Agent location
        self._agent_location = np.array([-1, -1], dtype=np.int32)

        # Grids
        self._grid = np.zeros((size, size), dtype=np.int32)
        self._crop_timer_grid = np.zeros((size, size), dtype=np.int32)
        self._soil_moisture_grid = np.zeros((size, size), dtype=np.int32)
        self._dry_counter_grid = np.zeros((size, size), dtype=np.int32)
        self._fertility_grid = np.zeros((size, size), dtype=np.int32)

        # For tracking how many crops have been harvested
        self._harvested = 0

        # Track visits (for heatmap & "touching unused parts" reward)
        self._usage_grid = np.zeros((size, size), dtype=np.int32)

        # Configurations
        self._max_crop_timer = 30
        self._max_soil_moisture = 15
        self._max_dry_counter = 10
        self._max_fertility = 30

        # Spaces
        self.observation_space = gym.spaces.Dict({
            "agent_loc":     gym.spaces.Box(low=0, high=size-1, shape=(2,), dtype=np.int32),
            "grid_rep":      gym.spaces.Box(low=0, high=3, shape=(size,size), dtype=np.int32),
            "crop_timer_rep":gym.spaces.Box(low=0, high=self._max_crop_timer, shape=(size,size), dtype=np.int32),
            "soil_moisture_rep": gym.spaces.Box(low=0, high=self._max_soil_moisture, shape=(size,size), dtype=np.int32),
            "dry_counter_rep":   gym.spaces.Box(low=0, high=self._max_dry_counter, shape=(size,size), dtype=np.int32),
            "harvest_count":     gym.spaces.Box(low=0, high=harvest_goal, shape=(), dtype=np.int32),
            "fertility_rep":     gym.spaces.Box(low=0, high=self._max_fertility, shape=(size, size), dtype=np.int32),
        })

        self.action_space = gym.spaces.Discrete(13)
        self._action_to_direction = {
            0: np.array([-1,  0]), # up
            1: np.array([ 1,  0]), # down
            2: np.array([ 0, -1]), # left
            3: np.array([ 0,  1]), # right
            4: np.array([ 0,  0]), # stop/no movement
            5: np.array([-1,  1]), # top-right
            6: np.array([-1, -1]), # top-left
            7: np.array([ 1,  1]), # bottom-right
            8: np.array([ 1, -1]), # bottom-left
        }

    def _get_obs(self):
        return {
            "agent_loc":       self._agent_location.astype(np.int32),
            "grid_rep":        self._grid.astype(np.int32),
            "crop_timer_rep":  self._crop_timer_grid.astype(np.int32),
            "soil_moisture_rep": self._soil_moisture_grid.astype(np.int32),
            "dry_counter_rep": self._dry_counter_grid.astype(np.int32),
            "harvest_count":   self._harvested,
            "fertility_rep":   self._fertility_grid.astype(np.int32),
        }

    def _get_info(self):
        return {
            "harvest_goal": self.goal,
            "usage_grid":   self._usage_grid.copy(),
        }

    def reset(self, seed: Optional[int] = None, options: Optional[dict] = None):
        super().reset(seed=seed)

        # Random agent location
        self._agent_location = self.np_random.integers(0, self.size, size=2, dtype=np.int32)

        self._harvested = 0

        # Reset grids
        self._grid.fill(0)
        self._crop_timer_grid.fill(0)
        self._dry_counter_grid.fill(0)

        # Random initial moisture
        self._soil_moisture_grid = np.random.randint(
            0, self._max_soil_moisture + 1, size=(self.size, self.size), dtype=np.int32
        )
        # Random fertility from 10..30
        self._fertility_grid = np.random.randint(
            10, self._max_fertility + 1, size=(self.size, self.size), dtype=np.int32
        )

        return self._get_obs(), self._get_info()

    def _update_crop_growth(self):
        growing_crops = (self._grid == 2)
        moist_soils = (self._soil_moisture_grid > 0)

        can_grow = growing_crops & moist_soils
        self._crop_timer_grid[can_grow] -= 1

        fully_grown = (self._crop_timer_grid <= 0) & can_grow
        self._grid[fully_grown] = 3

    def _decay_soil_moisture(self):
        has_moisture = (self._soil_moisture_grid > 0)
        self._soil_moisture_grid[has_moisture] -= 1

    def _handle_crop_death(self, penalty: int = -5):
        """Kill crops if dryness is too long, apply penalty per dead crop."""
        planted = (self._grid == 2)
        dry = (self._soil_moisture_grid == 0)
        dry_crops = planted & dry

        self._dry_counter_grid[dry_crops] += 1
        rehydrated = planted & (self._soil_moisture_grid > 0)
        self._dry_counter_grid[rehydrated] = 0

        dead_crops = (self._dry_counter_grid >= 10) & planted
        num_dead = np.count_nonzero(dead_crops)
        if num_dead > 0:
            self._grid[dead_crops] = 0
            self._crop_timer_grid[dead_crops] = 0
            self._soil_moisture_grid[dead_crops] = 0
            self._dry_counter_grid[dead_crops] = 0

        return penalty * num_dead

    def _recover_fertility(self):
        # Soils that are empty(0) or ploughed(1) can recover up to max
        empty_or_ploughed = (self._grid == 0) | (self._grid == 1)
        needs_recovery = empty_or_ploughed & (self._fertility_grid < self._max_fertility)

        self._fertility_grid[needs_recovery] += 1
        above_max = self._fertility_grid > self._max_fertility
        self._fertility_grid[above_max] = self._max_fertility

    def _fertility_harvest_bonus(self, x, y, default_reward):
        """Scale harvest reward by fertility, degrade a bit."""
        fertility_factor = self._fertility_grid[x, y] / float(self._max_fertility)
        actual_reward = default_reward * fertility_factor
        self._fertility_grid[x, y] = max(0, self._fertility_grid[x, y] - 3)
        return actual_reward

    def step(self, action):
        reward = -0.001   # baseline cost per step
        truncated = False
        terminated = False

        x, y = self._agent_location

        # (1) REWARD FOR TOUCHING UNUSED PARTS:
        #    If usage_grid[x,y]==0 BEFORE we move here, then +1
        if self._usage_grid[x, y] == 0:
            reward += 1.0  # exploring a new tile
        self._usage_grid[x, y] += 1

        # Movement
        if action in self._action_to_direction:
            direction = self._action_to_direction[action]
            old_x, old_y = x, y
            # Clip so we don't go out of bounds
            self._agent_location = np.clip(self._agent_location + direction, 0, self.size-1)
            x, y = self._agent_location

        # Define base rewards
        action_rewards = {
            "plough":      5,
            "plant":       10,
            "water":       8,
            "re_water":    5,
            "harvest":     200,
        }
        penalties = {
            "plough_inv":   -2,
            "plant_inv":    -2,
            "water_inv":    -2,
            "harvest_inv":  -10,
            "dead_crop":    -5,
        }

        # Farming actions
        if action == 9:   # Plough
            if self._grid[x, y] == 0:
                self._grid[x, y] = 1
                reward += action_rewards["plough"]
            else:
                reward += penalties["plough_inv"]

        elif action == 10: # Plant
            if self._grid[x, y] == 1:
                self._grid[x, y] = 2
                reward += action_rewards["plant"]
            else:
                reward += penalties["plant_inv"]

        elif action == 11: # Water
            if self._grid[x, y] == 2:
                if self._soil_moisture_grid[x, y] > 0:
                    reward += action_rewards["re_water"]
                else:
                    reward += action_rewards["water"]
                    self._crop_timer_grid[x, y] = self._max_crop_timer
                self._soil_moisture_grid[x, y] = self._max_soil_moisture
            else:
                reward += penalties["water_inv"]

        elif action == 12: # Harvest
            if self._grid[x, y] == 3:
                harvest_reward = self._fertility_harvest_bonus(x, y, action_rewards["harvest"])
                reward += harvest_reward

                # reset tile
                self._grid[x, y] = 0
                self._crop_timer_grid[x, y] = 0
                self._soil_moisture_grid[x, y] = 0

                self._harvested += 1
                # (3) REWARD for achieving harvest goal quickly:
                if self._harvested >= self.goal:
                    # bonus that scales inversely with steps taken
                    # e.g., bigger if the agent finishes early
                    # or just a flat big bonus
                    quick_bonus = 500.0  # or (1000 - step_count), your call
                    reward += quick_bonus
                    terminated = True
            else:
                reward += penalties["harvest_inv"]

        # (2) REWARD FOR NURTURING A CROP:
        # e.g., each step, if you have any planted tile that is moist, small +0.05
        # so we do that after the environment updates:
        # We'll do it with how many planted crops are still alive & moist
        # We'll compute this after the environment updates.

        # environment updates
        self._update_crop_growth()
        self._decay_soil_moisture()
        dead_crop_penalty = self._handle_crop_death(penalty=penalties["dead_crop"])
        reward += dead_crop_penalty
        self._recover_fertility()

        # Now do the "nurture reward"
        # Count how many crops are still in state=2 or 3 and have moisture>0
        planted_or_grown = (self._grid == 2) | (self._grid == 3)
        moist_tiles = (self._soil_moisture_grid > 0)
        # Crop that is alive+moist
        nurturing_mask = planted_or_grown & moist_tiles
        num_nurtured = np.count_nonzero(nurturing_mask)
        # small reward for each tile that is successfully nurtured
        reward += 0.05 * num_nurtured

        return self._get_obs(), float(reward), terminated, truncated, self._get_info()


In [2]:
# Register environment

gym.register(
    id="gymnasium_env/FarmGridWorld-v0",
    entry_point=FarmGridWorldEnv,
)

In [3]:
class Dash:
    def __init__(
        self,
        env: gym.Env,
        learning_rate: float,
        initial_epsilon: float,
        epsilon_decay: float,
        final_epsilon: float,
        discount_factor: float = 0.95,
    ):
        
        self.env = env
        self.q_values = defaultdict(lambda: np.zeros(env.action_space.n))

        self.lr = learning_rate
        self.discount_factor = discount_factor

        self.epsilon = initial_epsilon
        self.epsilon_decay = epsilon_decay
        self.final_epsilon = final_epsilon

        self.training_error = []
        

    def get_action(self, obs: dict) -> int:
        
        state_key = self._obs_to_key(obs)
        if np.random.random() < self.epsilon:
            return self.env.action_space.sample()
        else:
            return int(np.argmax(self.q_values[state_key]))
            

    def _obs_to_key(self, obs: dict) -> tuple:
        
        x, y = obs["agent_loc"]
        return (
            int(x),
            int(y),
            int(obs["grid_rep"][x, y]),
            int(obs["crop_timer_rep"][x, y]),
            int(obs["soil_moisture_rep"][x, y]),
            int(obs["dry_counter_rep"][x, y]),
            int(obs["harvest_count"]),
            int(obs["fertility_rep"][x, y]),
        )
        
    
    def update(
            self,
            obs: dict,
            action: int,
            reward: float,
            terminated: bool,
            next_obs: dict,
    ):
        
        state_key = self._obs_to_key(obs)
        next_state_key = self._obs_to_key(next_obs)
        
        future_q = (not terminated) * np.max(self.q_values[next_state_key])
    
        td = reward + self.discount_factor * future_q - self.q_values[state_key][action]

        self.q_values[state_key][action] += self.lr * td
        self.training_error.append(td)
        

    def decay_epsilon(self):
        self.epsilon = max(self.final_epsilon, self.epsilon * self.epsilon_decay)
        
    
    def train(self, n_episodes=10_000):
        
        episode_rewards = []
        episode_lengths = []
        total_harvest = 0
        early_termination_count = 0
        time_limit_count = 0

        for episode in tqdm(range(n_episodes)):
            obs, info = env.reset()
            done = False
            total_reward = 0
            length = 0
            episode_harvest = 0
        
            while not done:
                action = self.get_action(obs)
                next_obs, reward, terminated, truncated, info = env.step(action)
        
                self.update(obs, action, reward, terminated, next_obs)
        
                done = terminated or truncated
                obs = next_obs
        
                total_reward += reward
                length += 1
                episode_harvest = obs["harvest_count"]
                
            episode_rewards.append(total_reward)
            episode_lengths.append(length)
            total_harvest += episode_harvest
            agent.decay_epsilon()
        
            if terminated:
                early_termination_count += 1
            elif truncated:
                time_limit_count += 1
        
        print("Episodes ended by harvest goal (TRAIN):", early_termination_count)
        print("Episodes ended by time limit (TRAIN):", time_limit_count)

        return {
            "episode_rewards": episode_rewards,
            "episode_lenghts": episode_lengths,
            "total_harvest": total_harvest,
            "early_termination_count": early_termination_count,
            "time_limit_count": time_limit_count,
        }

    def evaluate(self, test_n_episodes=10_000):

        old_epsilon = self.epsilon
        old_final_epsilon = self.final_epsilon
        old_epsilon_decay = self.epsilon_decay
        
        self.epsilon = 0.0
        self.final_epsilon = 0.0
        self.epsilon_decay = 1.0
        
        test_episode_rewards = []
        test_episode_lengths = []
        test_total_harvest = 0
        test_early_termination_count = 0
        test_time_limit_count = 0
        
        for episode in tqdm(range(test_n_episodes)):
            obs, info = env.reset()
            done = False
            test_total_reward = 0
            test_length = 0
            test_episode_harvest = 0
        
            while not done:
                action = self.get_action(obs)
                next_obs, reward, terminated, truncated, info = env.step(action)
        
                done = terminated or truncated
                obs = next_obs
        
                test_total_reward += reward
                test_length += 1
                test_episode_harvest = obs["harvest_count"]
                
            test_episode_rewards.append(test_total_reward)
            test_episode_lengths.append(test_length)
            test_total_harvest += test_episode_harvest

            if episode < 10:
                print(f"Reward for Episode {episode}: {test_total_reward}")
        
            if terminated:
                test_early_termination_count += 1
            elif truncated:
                test_time_limit_count += 1

        self.epsilon = old_epsilon
        self.final_epsilon = old_final_epsilon
        self.epsilon_decay = old_epsilon_decay
        
        print("Episodes ended by harvest goal (TEST):", test_early_termination_count)
        print("Episodes ended by time limit (TEST):", test_time_limit_count)

        return {
            "test_episode_rewards": test_episode_rewards,
            "test_episode_lengths": test_episode_lengths,
            "test_total_harvest": test_total_harvest,
            "test_early_termination_count": test_early_termination_count,
            "test_time_limit_count": test_time_limit_count,
        }
        
        

In [4]:
# Environment Setup
max_episode_steps = 1000
size = 5
harvest_goal = 10

env = gym.wrappers.TimeLimit(
    gym.make("gymnasium_env/FarmGridWorld-v0", size=size, harvest_goal=harvest_goal),
    max_episode_steps=max_episode_steps
)

In [5]:
# Hyperparameters
n_episodes = 500_000
start_epsilon = 1.0
final_epsilon = 0.1
decay_episodes = int(n_episodes * 0.90)
epsilon_decay = (final_epsilon / start_epsilon) ** (1.0 / decay_episodes)

learning_rate = 0.01    
discount_factor = 0.99

# Agent Initialization
state, info = env.reset()

agent = Dash(
    env=env,
    learning_rate=learning_rate,
    initial_epsilon=start_epsilon,
    epsilon_decay=epsilon_decay,
    final_epsilon=final_epsilon,
    discount_factor=discount_factor,
)

  gym.logger.warn("Casting input x to numpy array.")


In [6]:
info = agent.train(n_episodes)

episode_rewards = info["episode_rewards"]
episode_lengths = info["episode_lenghts"]
total_harvest = info["total_harvest"]
early_termination_count = info["early_termination_count"]
time_limit_count = info["time_limit_count"]

  9%|███                               | 44153/500000 [29:46<5:07:28, 24.71it/s]


KeyboardInterrupt: 

In [None]:
print(f"Total Harvest (All Episodes): {total_harvest}")

def get_moving_avgs(arr, window, convolution_mode="valid"):
    return np.convolve(
        np.array(arr).flatten(),
        np.ones(window),
        mode=convolution_mode
    ) / window

# Plotting
rolling_length = 500
fig, axs = plt.subplots(ncols=3, figsize=(15, 5))

# Episode rewards
axs[0].set_title("Episode Rewards (Moving Avg)")
rewards_avg = get_moving_avgs(episode_rewards, rolling_length)
axs[0].plot(range(len(rewards_avg)), rewards_avg)
axs[0].set_xlabel("Episode")
axs[0].set_ylabel("Reward")

# Episode lengths
axs[1].set_title("Episode Lengths (Moving Avg)")
lengths_avg = get_moving_avgs(episode_lengths, rolling_length)
axs[1].plot(range(len(lengths_avg)), lengths_avg)
axs[1].set_xlabel("Episode")
axs[1].set_ylabel("Steps")

# Training error (TD Error)
axs[2].set_title("Training Error (Moving Avg)")
error_avg = get_moving_avgs(agent.training_error, rolling_length, "same")
axs[2].plot(range(len(error_avg)), error_avg)
axs[2].set_xlabel("Step")
axs[2].set_ylabel("TD Error")

plt.tight_layout()
plt.show()

# **Testing**

In [None]:
test_n_episodes = 5000

eval_info = agent.evaluate(test_n_episodes)

test_episode_rewards = eval_info["test_episode_rewards"]
test_episode_lengths = eval_info["test_episode_lengths"]
test_total_harvest = eval_info["test_total_harvest"]
test_early_termination_count = eval_info["test_early_termination_count"]
test_time_limit_count = eval_info["test_time_limit_count"]

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt

final_obs, final_info = env.reset()
usage_grid = final_info["usage_grid"]

plt.figure(figsize=(6, 5))
sns.heatmap(usage_grid, annot=True, fmt="d", cmap="YlGnBu", cbar_kws={'label': 'Visit Count'})
plt.title("Grid Usage Heatmap (Agent Visit Frequency)")
plt.xlabel("Column")
plt.ylabel("Row")
plt.tight_layout()
plt.show()

In [None]:
print(f"Total Harvest (All Episodes): {test_total_harvest}")

def get_moving_avgs(arr, window, convolution_mode="valid"):
    return np.convolve(
        np.array(arr).flatten(),
        np.ones(window),
        mode=convolution_mode
    ) / window

# Plotting
rolling_length = 500
fig, axs = plt.subplots(ncols=3, figsize=(15, 5))

# Compute average reward per step for each episode
avg_reward_per_step = [
    reward / steps if steps > 0 else 0
    for reward, steps in zip(test_episode_rewards, test_episode_lengths)
]

# Episode rewards
axs[0].set_title("Episode Rewards (Moving Avg)")
rewards_avg = get_moving_avgs(test_episode_rewards, rolling_length)
axs[0].plot(range(len(rewards_avg)), rewards_avg)
axs[0].set_xlabel("Episode")
axs[0].set_ylabel("Reward")

# Episode lengths
axs[1].set_title("Episode Lengths (Moving Avg)")
lengths_avg = get_moving_avgs(test_episode_lengths, rolling_length)
axs[1].plot(range(len(lengths_avg)), lengths_avg)
axs[1].set_xlabel("Episode")
axs[1].set_ylabel("Steps")

# Average reward per step
axs[2].set_title("Avg Reward per Step (Moving Avg)")
rps_avg = get_moving_avgs(avg_reward_per_step, rolling_length)
axs[2].plot(range(len(rps_avg)), rps_avg)
axs[2].set_xlabel("Episode")
axs[2].set_ylabel("Reward / Step")

plt.tight_layout()
plt.show()