In [2]:
%matplotlib inline

import gym
import itertools
import matplotlib
import matplotlib.pyplot as plt
import numpy as np
import sys
import sklearn.pipeline
import sklearn.preprocessing
from collections import namedtuple

if "../" not in sys.path:
    sys.path.append("../") 

from lib import plotting
from sklearn.linear_model import SGDRegressor
from sklearn.kernel_approximation import RBFSampler
from sklearn.preprocessing import PolynomialFeatures

matplotlib.style.use('ggplot')

In [3]:
env = gym.envs.make("MountainCar-v0")

In [4]:
# Feature(state) 전처리, 평균을 0, 표준편차를 1로 만들어줍니다
# 환경에서 임의의 샘플 10000개를 뽑아와서 작업을 시작합니다.
observation_examples = np.array([env.observation_space.sample() for x in range(10000)])
scaler = sklearn.preprocessing.StandardScaler()
scaler.fit(observation_examples)

# Polynomial features
featurizer = PolynomialFeatures(degree=10)
featurizer.fit(scaler.transform(observation_examples))

#  Feature construction
# RBF kernel을 사용하여 새로운 Feature 들을 만들어냅니다.
# 다양한 gamma 값을 사용하여 다양한 Feature를 만들어냅니다.
# featurizer = sklearn.pipeline.FeatureUnion([
#         ("rbf1", RBFSampler(gamma=5.0, n_components=100)),
#         ("rbf2", RBFSampler(gamma=2.0, n_components=100)),
#         ("rbf3", RBFSampler(gamma=1.0, n_components=100)),
#         ("rbf4", RBFSampler(gamma=0.5, n_components=100))
#         ])
# featurizer.fit(scaler.transform(observation_examples))

PolynomialFeatures(degree=5, include_bias=True, interaction_only=False)

In [6]:
class Estimator():
    """
    Value Function approximator. 
    """
    
    def __init__(self):
        # action space가 discreate 하기 때문에  action 마다
        # 모델을 따로 만들어 줄것입니다.
        self.models = []
        for _ in range(env.action_space.n):
            model = SGDRegressor(learning_rate="constant")
            # We need to call partial_fit once to initialize the model
            # or we get a NotFittedError when trying to make a prediction
            # This is quite hacky.
            model.partial_fit([self.featurize_state(env.reset())], [0])
            self.models.append(model)
    
    def featurize_state(self, state):
        """
        state(position, velocity)를 input으로 받고 Feature construction을 한
        결과를 ouput으로 내는 함수입니다.
        """
        scaled = scaler.transform([state])
        featurized = featurizer.transform(scaled)
        return featurized[0]
    
    def predict(self, s):
        """
        value에 대한 예측을 합니다.
        
        Args:
            s: value를 estimate하고 싶은 state
            
        Returns
            state s에서 각각의 action이 갖는 value를 numpy.array로 반환
            
        """
        # TODO: Implement this!
        pass
    
    def update(self, s, a, y):
        """
        주어진 state s, action a와 target y를 사용하여 estimator를 update 합니다.
        """
        # TODO: Implement this!
        pass

In [7]:
def make_epsilon_greedy_policy(estimator, epsilon, nA):
    """
    epsilon greedy policy 정의
    
    """
    def policy_fn(observation):
        A = np.ones(nA, dtype=float) * epsilon / nA
        q_values = estimator.predict(observation)
        best_action = np.argmax(q_values)
        A[best_action] += (1.0 - epsilon)
        return A
    return policy_fn

In [8]:
def q_learning(env, estimator, num_episodes, discount_factor=1.0, epsilon=0.1, epsilon_decay=1.0):
    """
    Q-learning with Funtion Approximation with epsilon greedy policy
    Args:
        env: OpenAI environment.
        estimator: Action-Value function estimator
        num_episodes: Number of episodes to run for.
        discount_factor: Gamma discount factor.
        epsilon: Chance the sample a random action. Float betwen 0 and 1.
        epsilon_decay: Each episode, epsilon is decayed by this factor
    
    Returns:
        An EpisodeStats object with two numpy arrays for episode_lengths and episode_rewards.
    """

    # 통계(episode별 길이와 reward)를 저장
    EpisodeStats = namedtuple("Stats",["episode_lengths", "episode_rewards"])
    stats = EpisodeStats(episode_lengths=np.zeros(num_episodes),
                         episode_rewards=np.zeros(num_episodes))
    position_list = []
    
    for i_episode in range(num_episodes):
        
        # policy 정의
        policy = make_epsilon_greedy_policy(
            estimator, epsilon * epsilon_decay**i_episode, env.action_space.n)
        
        # Print out which episode we're on, useful for debugging.
        # Also print reward for last episode
        last_reward = stats.episode_rewards[i_episode - 1]
        print("\rEpisode {}/{} ({})".format(i_episode + 1, num_episodes, last_reward), end="")
        sys.stdout.flush()
        
        # TODO: Implement this!
    
    return stats

In [9]:
estimator = Estimator()

In [10]:
stats = q_learning(env, estimator, 100, epsilon=0.0)

Episode 100/100 (0.0)

In [None]:
plotting.get_render(position_list[-1000:])

In [None]:
plotting.plot_cost_to_go_mountain_car(env, estimator)
plotting.plot_episode_stats(stats, smoothing_window=25)