In [None]:
# 需要先安裝 gym[atari]
# headless 執行: xvfb-run -a jupyter notebook
import gym

In [None]:
# Shared Gym environment; every rollout function below resets and steps this
# single `env` instance.
# NOTE(review): the "-ram" variants are expected to return the Atari RAM bytes
# as the observation — confirm against the installed gym version.
env = gym.make('Pong-ram-v0')

In [None]:
import numpy as np
import ipywidgets as W
from PIL import Image
from io import BytesIO
def to_png(a):
    """Encode the image array ``a`` as PNG and return the encoded bytes.

    ``a`` is passed unchanged to ``Image.fromarray``; the picture is written to
    an in-memory buffer that is closed before returning.
    """
    buffer = BytesIO()
    try:
        picture = Image.fromarray(a)
        picture.save(buffer, 'png')
        return buffer.getvalue()
    finally:
        buffer.close()

Q learning 的骨架

* 用 `compute_s(observation)` 來計算 state
* 用 `Qupdate(s, a, v)` 來 update `Q(s,a)`
* 用 `Qfunc(s)` 來算 `Q(s, ...)`


In [None]:
from time import sleep
from random import randint, random, shuffle, choice
# Environment action codes the agent may choose from. Output i of the Q
# network scores actions[i].
actions = [0,2,3]

def Qlearn(test=False, screen=None, T=40):
    """Run one rollout of at most ``T`` agent steps and return the summed reward.

    Relies on notebook globals: ``env``, ``compute_s``, ``Qfunc``, ``Qupdate``,
    the exploration probability ``ϵ`` and the discount factor ``γ``.

    Parameters
    ----------
    test : bool
        When true, no ϵ-exploration is performed and Q is never updated.
    screen : ipywidgets Image or None
        When given, every step is rendered and pushed to the widget as PNG.
    T : int
        Maximum number of agent-controlled steps.

    Returns
    -------
    Sum of the raw environment rewards collected in the agent-controlled steps.

    The rollout stops early once the environment reports ``done``; stepping a
    finished episode is not allowed by the Gym API.
    """
    observation = env.reset()
    # Warm-up: 50 random steps before the agent takes control.
    for _ in range(50):
        observation, reward, done, info = env.step(choice(actions))
        if done:
            # Never keep stepping a finished episode; start a new one instead.
            observation = env.reset()
    s2 = compute_s(observation)
    total_r = 0
    for _ in range(T):
        s = s2
        # Random action when exploring, or when no state is available.
        if (not test and random() < ϵ) or s is None:
            a = choice(actions)
        else:
            a = actions[np.argmax(Qfunc(s))]
        observation, reward, done, info = env.step(a)
        s2 = compute_s(observation)
        total_r += reward
        if not test and s is not None:
            r = reward
            if s2 is None:
                # Shaping reward: the state stopped being defined after this step.
                # NOTE(review): this also overwrites a non-zero environment
                # reward received on the same step — confirm that is intended.
                r = 1.
            # Non-zero reward is used as the target itself; otherwise bootstrap
            # from the best Q-value of the next state.
            v = r if r else γ*Qfunc(s2).max()
            # Train the network output that corresponds to the action taken.
            Qupdate(s, actions.index(a), v)
        if screen is not None:
            img = env.render(mode='rgb_array')
            screen.value = to_png(img)
            sleep(1/60)
        if done:
            break
    return total_r

最簡單的是把整個 observation 當成 state，但是很慢

In [None]:
import numpy as np
from keras.models import Sequential
from keras.layers import Dense, LeakyReLU
from keras.optimizers import Adam, SGD

# Fully connected Q network: one hidden layer of 2048 LeakyReLU units.
Q = Sequential([
    # Input width 256 must match the state vector returned by compute_s.
    Dense(2048, input_shape=(256,)),
    LeakyReLU(0.2),
    # Three outputs: one Q-value per entry of `actions`.
    Dense(3),
])
# Regression on the Q targets with mean squared error.
Q.compile(loss='mse', optimizer=Adam(1e-4), metrics=['accuracy'])

def Qfunc(s):
    """Return the Q-values predicted for state batch ``s`` (first row of the prediction)."""
    predictions = Q.predict(s)
    return predictions[0]


def Qupdate(s, a, v):
    """Move ``Q(s, a)`` towards ``v`` with one training step.

    The current prediction for ``s`` is used as the target, with only entry
    ``a`` replaced by ``v``, so the other outputs produce no error. Returns
    whatever ``Q.train_on_batch`` returns.
    """
    target = Q.predict(s)
    target[0][a] = v
    result = Q.train_on_batch(s, target)
    return result

def compute_s(observation):
    """Build the network input from a raw observation, or return ``None``.

    Entry 58 of ``observation`` is read as a signed byte ``dx`` (presumably
    the ball's horizontal velocity — TODO confirm). When ``dx >= 0`` no state
    is defined and ``None`` is returned.

    Otherwise the result is a ``float32`` array of shape
    ``(1, 2 * len(observation))``: the raw byte values followed by the same
    bytes reinterpreted as signed (-128..127), all divided by 255.
    """
    # Convert to Python ints before doing arithmetic. With NumPy 2 promotion
    # rules, `uint8 + 128` stays uint8 and wraps around, which would make the
    # signed reinterpretation wrong (dx would never be negative).
    dx = (int(observation[58]) + 128) % 256 - 128
    if dx >= 0:
        return None
    raw = [int(v) for v in observation]
    signed = [(v + 128) % 256 - 128 for v in raw]
    return np.array([raw + signed], dtype='float32') / 255

In [None]:
%matplotlib inline
from matplotlib import pyplot as plt

完整訓練要跑很久，可以先小跑一下看看結果。

In [None]:
# Widgets: `screen` shows the latest learning-curve image, `txt` a one-line
# progress summary.
screen = W.Image()
display(screen)
txt =W.Text()
display(txt)
r = 0
# Globals read by Qlearn: γ is the discount factor, ϵ the probability of
# taking a random action while training.
γ = 1
ϵ = 0.1
# rr: exponential moving average of the per-episode training reward;
# rate: its history, plotted as the learning curve.
rr= -1
rate = []
for j in range(101):
    # At j == 99, evaluate without exploration or learning: sum the reward of
    # 20 test rollouts and stop early when the total is at least -2.
    if j%100==99:
        r=sum(Qlearn(test=True, T=40) for i in range(20))
        print(j, r/20)
        if r>=-2:
            break
    # One training rollout; fold its reward into the moving average.
    rr = rr*0.95 + 0.05*Qlearn(T=40)
    rate.append(rr)
    txt.value="j={} r={}".format(j,rr)
    # Redraw the curve into an in-memory image and push it to the widget.
    plt.clf()
    plt.plot(rate)
    with BytesIO() as bio:
        plt.savefig(bio)
        screen.value = bio.getvalue()


之前 tabular 的 state, 用二次函數近似

In [None]:
import numpy as np
from keras.models import Sequential
from keras.layers import Dense, LeakyReLU, LocallyConnected1D, Lambda
from keras.initializers import Constant, RandomNormal
from keras.activations import selu
from keras.optimizers import Adam, SGD, RMSprop
# Linear Q model on two features. Qfunc/Qupdate feed it (s, s**2), so each of
# the three outputs (one per entry of `actions`) is a quadratic function of s.
Q = Sequential([
    Dense(
        3,
        input_shape=(2,),
        # Start from near-zero weights.
        kernel_initializer=RandomNormal(stddev=0.001),
    ),
])
# Plain SGD on the mean squared error of the Q targets.
Q.compile(loss='mse', optimizer=SGD(1e-3))

def Qfunc(s):
    """Return the Q-values for state ``s``.

    ``s`` holds a single scalar at ``s[0][0]``; the model is evaluated on the
    feature row ``(s, s**2)`` and the first prediction row is returned.
    """
    value = s[0][0]
    features = np.array([[value, value**2]])
    prediction = Q.predict(features)
    return prediction[0]


def Qupdate(s, a, v):
    """Move ``Q(s, a)`` towards ``v`` with one training step on features ``(s, s**2)``.

    The current prediction is reused as the target with only entry ``a``
    replaced by ``v``. Returns the result of ``Q.train_on_batch``.
    """
    value = s[0][0]
    features = np.array([[value, value**2]])
    target = Q.predict(features)
    target[0][a] = v
    return Q.train_on_batch(features, target)


def compute_s(observation):
    """Reduce an observation to a single scalar state, or return ``None``.

    Entries 58 and 56 are read as signed bytes ``dx`` and ``dy``; entries 49,
    54 and 60 as ``x``, ``y0`` and ``y2``.
    NOTE(review): presumably ball velocity, ball position and the agent's
    paddle position — confirm against the Pong RAM layout.

    When ``dx >= 0`` no state is defined and ``None`` is returned. Otherwise
    the line through ``(x, y0)`` with slope ``dy/dx`` is extrapolated to
    ``x = 186`` and the result is folded back into range by a modulo-326
    reflection. The state is half the difference between that predicted
    ``y`` and ``y2``, divided by 50, as a ``(1, 1)`` float32 array.
    """
    # Use Python ints throughout. With NumPy 2 promotion rules, arithmetic
    # between a uint8 scalar and a Python int stays uint8 and wraps around,
    # which would break `+127`, `186-x` and `y-y2`.
    dx = (int(observation[58]) + 127) % 256 - 127
    if dx >= 0:
        return None
    dy = (int(observation[56]) + 127) % 256 - 127
    x, y0 = int(observation[49]), int(observation[54])
    y2 = int(observation[60])
    # dx < 0 here, so the division is always defined.
    y = (int(y0 - (186 - x) * dy / dx) - 44) % 326
    if y > 163:
        # Reflect the second half of the period back into the first.
        y = 326 - y
    y += 38
    s = (y - y2) / 2
    return np.float32([[s / 50]])


In [None]:
# Widgets: `screen` shows the current Q curves, `txt` a one-line progress summary.
screen = W.Image()
display(screen)
txt =W.Text()
display(txt)
# γ: discount factor, ϵ: exploration probability, rr: moving-average reward.
# ϵ starts above 1, so `random() < ϵ` in Qlearn is always true (pure random
# play) until the decay at the bottom of the loop brings it below 1.
γ, ϵ, rr = 1, 3., -8
for j in range(1001):
    # Every 100th episode: report the mean reward of 20 test rollouts.
    if j%100==99:
        r=sum(Qlearn(test=True, T=400) for i in range(20))
        print(j, r/20)
    # One training rollout; fold its reward into the moving average.
    rr = rr*0.95 + 0.05*Qlearn(T=400)
    txt.value="j={} r={} ϵ={}".format(j,rr, ϵ)
    plt.clf()
    # Evaluate Q over states -1 .. 0.98 and draw one curve per output
    # (red, green, blue = outputs 0, 1, 2).
    Qvalue = np.array([Qfunc(np.float32([[i/50]])) for i in range(-50,50)])
    plt.plot(Qvalue[:,0], 'r')
    plt.plot(Qvalue[:,1], 'g')
    plt.plot(Qvalue[:,2], 'b')
    with BytesIO() as bio:
        plt.savefig(bio)
        screen.value = bio.getvalue()
    # Decay exploration by 1% per episode, never below 0.1.
    ϵ = max(0.1, ϵ*0.99)

In [None]:
# Test: play one rollout with test=True (no ϵ-exploration, no Q updates) and
# stream the rendered frames to the image widget.
screen = W.Image()
display(screen)
Qlearn(test=True, screen=screen, T=400)

用常態分佈曲線來逼近

In [None]:
import numpy as np
from keras.models import Sequential
from keras.layers import Dense, LeakyReLU, LocallyConnected1D, Lambda,Reshape
from keras.initializers import Constant, RandomNormal
from keras.activations import selu
from keras.optimizers import Adam, SGD, RMSprop
import keras.backend as K
# Bell-shaped Q model. With the features (s, s**2) supplied by Qfunc/Qupdate,
# output a is  k_a * exp(w1_a*s + w2_a*s**2) + b_a.
Q = Sequential([
    # Exponent: a bias-free linear map of (s, s**2), one column per output.
    Dense(3, input_shape=(2,), use_bias=False),
    Lambda(lambda x: K.exp(x)),
    # Give each output its own position so the next layer does not share weights.
    Reshape((3, 1)),
    # Per-output scale k_a and offset b_a.
    LocallyConnected1D(filters=1, kernel_size=1, kernel_initializer="zeros"),
    Reshape((3,)),
])
Q.compile(loss='mse', optimizer=SGD(1e-3))
# Initial exponents: row 0 multiplies s, row 1 multiplies s**2.
Q.layers[0].set_weights([
    np.array([[0, -1., 1.],
              [-3., -3., -3.]]),
])
# Initial scales (output 0 slightly larger) and zero offsets.
Q.layers[3].set_weights([
    np.array([[[.11]], [[.1]], [[.1]]]),
    np.array([[-0.]] * 3),
])
def Qfunc(s):
    """Evaluate the model on the features ``(s, s**2)`` of the scalar state ``s[0][0]``.

    Returns the first row of the prediction, i.e. one Q-value per model output.
    """
    scalar = s[0][0]
    row = [scalar, scalar**2]
    return Q.predict(np.array([row]))[0]


def Qupdate(s, a, v):
    """Run one training step that pulls ``Q(s, a)`` towards ``v``.

    Features are ``(s, s**2)`` of the scalar ``s[0][0]``. The target equals the
    current prediction except for entry ``a``. Returns the result of
    ``Q.train_on_batch``.
    """
    scalar = s[0][0]
    X = np.array([[scalar, scalar**2]])
    desired = Q.predict(X)
    desired[0][a] = v
    outcome = Q.train_on_batch(X, desired)
    return outcome


def compute_s(observation):
    """Reduce an observation to a single scalar state, or return ``None``.

    Entries 58 and 56 are read as signed bytes ``dx`` and ``dy``; entries 49,
    54 and 60 as ``x``, ``y0`` and ``y2``.
    NOTE(review): presumably ball velocity, ball position and the agent's
    paddle position — confirm against the Pong RAM layout.

    Returns ``None`` when ``dx >= 0``. Otherwise extrapolates the line through
    ``(x, y0)`` with slope ``dy/dx`` to ``x = 186``, folds the result with a
    modulo-326 reflection, and returns ``(y - y2) / 2 / 50`` as a ``(1, 1)``
    float32 array.
    """
    # Python ints avoid uint8 wrap-around: under NumPy 2 promotion rules a
    # uint8 scalar combined with a Python int stays uint8.
    dx = (int(observation[58]) + 127) % 256 - 127
    if dx >= 0:
        return None
    dy = (int(observation[56]) + 127) % 256 - 127
    x, y0 = int(observation[49]), int(observation[54])
    y2 = int(observation[60])
    # dx < 0 here, so the division is always defined.
    y = (int(y0 - (186 - x) * dy / dx) - 44) % 326
    if y > 163:
        # Reflect the second half of the period back into the first.
        y = 326 - y
    y += 38
    s = (y - y2) / 2
    return np.float32([[s / 50]])


In [None]:
%matplotlib notebook
%matplotlib notebook
import matplotlib.pyplot as plt
# Turn on interactive mode and create the figure/axes that the training loop
# in the next cell clears and redraws on every episode.
plt.ion()
fig = plt.figure()
ax = fig.gca()
# Text widget for the one-line progress summary.
txt =W.Text()
display(txt)

In [None]:
# γ: discount factor, ϵ: exploration probability (fixed here), rr: moving
# average of the training reward. γ and ϵ are read as globals by Qlearn.
γ, ϵ, rr = 1, .1, -8
for j in range(0, 1001):
    # Every 100th episode: evaluate 20 test rollouts and stop as soon as
    # their total reward is positive.
    if j%100==99:
        r=sum(Qlearn(test=True, T=1400) for i in range(20))
        print("{} {}\n".format(j, r/20))
        if r>0:
            break
    # One training rollout; fold its reward into the moving average.
    rr = rr*0.95 + 0.05*Qlearn(T=1400)    
    txt.value="j={} r={} ϵ={}".format(j,rr, ϵ)
    ax.clear()
    # Evaluate Q over states -1 .. 0.98; one curve per output (r, g, b).
    Qvalue = np.array([Qfunc(np.float32([[i/50]])) for i in range(-50,50)])
    for i, c in enumerate("rgb"):
        ax.plot(Qvalue[:,i], c)
    fig.canvas.draw()


## 增加兩個新的 action
新增的 action 4、5 分別對應 action 2、3 的移動方向；但如果上一步已經往同一個方向移動了，這一步就改成不移動（連續選擇時等於用一半的速度移動）。


In [None]:
from time import sleep
from random import randint, random, shuffle, choice
# Agent actions. 0, 2, 3 are passed to the environment unchanged; 4 and 5 are
# "braked" versions of 2 and 3 (see _resolve_action). Output i of the Q
# network scores actions2[i].
actions2 = [0,2,3,4,5]

def _resolve_action(a, last_a):
    """Map agent action ``a`` to the environment action actually executed."""
    if a >= 4:
        move = a - 2
        # Braked move: do nothing if the previous step already moved this way.
        return 0 if last_a == move else move
    return a

def Qlearn2(test=False, screen=None, T=40):
    """Run one rollout with the five-action set and return the summed reward.

    Same procedure as ``Qlearn`` but over ``actions2``. Relies on notebook
    globals: ``env``, ``compute_s``, ``Qfunc``, ``Qupdate``, ``ϵ`` and ``γ``.

    Parameters
    ----------
    test : bool
        When true, no ϵ-exploration is performed and Q is never updated.
    screen : ipywidgets Image or None
        When given, every step is rendered and pushed to the widget as PNG.
    T : int
        Maximum number of agent-controlled steps.

    Returns
    -------
    Sum of the raw environment rewards collected in the agent-controlled steps.

    The rollout stops early once the environment reports ``done``.
    """
    observation = env.reset()
    a = 0
    # Warm-up: 50 random steps; codes are sent to the environment as-is.
    for _ in range(50):
        a = choice(actions2)
        observation, reward, done, info = env.step(a)
        if done:
            # Never keep stepping a finished episode; start a new one instead.
            observation = env.reset()
            a = 0
    s2 = compute_s(observation)
    # Track the last movement in terms of the base codes 0/2/3, because
    # _resolve_action compares against `a - 2`. A raw 4 or 5 left over from
    # the warm-up would never match.
    # NOTE(review): assumes environment codes 4/5 move like 2/3 — confirm.
    last_a = a - 2 if a >= 4 else a
    total_r = 0
    for _ in range(T):
        s = s2
        # Random action when exploring, or when no state is available.
        if (not test and random() < ϵ) or s is None:
            a = choice(actions2)
        else:
            a = actions2[np.argmax(Qfunc(s))]
        last_a = _resolve_action(a, last_a)
        observation, reward, done, info = env.step(last_a)
        s2 = compute_s(observation)
        total_r += reward
        if not test and s is not None:
            r = reward
            if s2 is None:
                # Shaping reward: the state stopped being defined after this step.
                # NOTE(review): this also overwrites a non-zero environment
                # reward received on the same step — confirm that is intended.
                r = 1.
            # Non-zero reward is used as the target itself; otherwise bootstrap
            # from the best Q-value of the next state.
            v = r if r else γ*Qfunc(s2).max()
            # Credit the action the agent chose, not the resolved one.
            Qupdate(s, actions2.index(a), v)
        if screen is not None:
            img = env.render(mode='rgb_array')
            screen.value = to_png(img)
            sleep(1/60)
        if done:
            break
    return total_r

模型也一樣，只是多了兩個輸出。

In [None]:
import numpy as np
from keras.models import Sequential
from keras.layers import Dense, LeakyReLU, LocallyConnected1D, Lambda,Reshape
from keras.initializers import Constant, RandomNormal
from keras.activations import selu
from keras.optimizers import Adam, SGD, RMSprop
import keras.backend as K
# Bell-shaped Q model with five outputs, one per entry of `actions2`. With the
# features (s, s**2) supplied by Qfunc/Qupdate, output a is
#   k_a * exp(w1_a*s + w2_a*s**2) + b_a.
Q = Sequential([
    # Exponent: a bias-free linear map of (s, s**2), one column per output.
    Dense(5, input_shape=(2,), use_bias=False),
    Lambda(lambda x: K.exp(x)),
    # Give each output its own position so the next layer does not share weights.
    Reshape((5, 1)),
    # Per-output scale k_a and offset b_a.
    LocallyConnected1D(filters=1, kernel_size=1, kernel_initializer="zeros"),
    Reshape((5,)),
])
Q.compile(loss='mse', optimizer=SGD(1e-3))
# Initial exponents: row 0 multiplies s, row 1 multiplies s**2.
Q.layers[0].set_weights([
    np.array([[0, -1., 1., -1, 1],
              [-12., -3., -3., -6., -6.]]),
])
# Initial scales and zero offsets.
Q.layers[3].set_weights([
    np.array([[[.16]], [[.1]], [[.1]], [[.15]], [[.15]]]),
    np.array([[-0.]] * 5),
])
def Qfunc(s):
    """Return one Q-value per model output for the scalar state ``s[0][0]``.

    The model input is the feature row ``(s, s**2)``.
    """
    state = s[0][0]
    inputs = np.array([[state, state**2]])
    outputs = Q.predict(inputs)
    return outputs[0]


def Qupdate(s, a, v):
    """Run one training step that pulls output ``a`` for state ``s`` towards ``v``.

    The model input is ``(s, s**2)`` of the scalar ``s[0][0]``; all other
    outputs keep their current prediction as target. Returns the result of
    ``Q.train_on_batch``.
    """
    state = s[0][0]
    inputs = np.array([[state, state**2]])
    goal = Q.predict(inputs)
    goal[0][a] = v
    return Q.train_on_batch(inputs, goal)


def compute_s(observation):
    """Reduce an observation to a single scalar state, or return ``None``.

    Entries 58 and 56 are read as signed bytes ``dx`` and ``dy``; entries 49,
    54 and 60 as ``x``, ``y0`` and ``y2``.
    NOTE(review): presumably ball velocity, ball position and the agent's
    paddle position — confirm against the Pong RAM layout.

    Returns ``None`` when ``dx >= 0``. Otherwise extrapolates the line through
    ``(x, y0)`` with slope ``dy/dx`` to ``x = 186``, folds the result with a
    modulo-326 reflection, and returns ``(y - y2) / 2 / 50`` as a ``(1, 1)``
    float32 array.
    """
    # Python ints avoid uint8 wrap-around: under NumPy 2 promotion rules a
    # uint8 scalar combined with a Python int stays uint8.
    dx = (int(observation[58]) + 127) % 256 - 127
    if dx >= 0:
        return None
    dy = (int(observation[56]) + 127) % 256 - 127
    x, y0 = int(observation[49]), int(observation[54])
    y2 = int(observation[60])
    # dx < 0 here, so the division is always defined.
    y = (int(y0 - (186 - x) * dy / dx) - 44) % 326
    if y > 163:
        # Reflect the second half of the period back into the first.
        y = 326 - y
    y += 38
    s = (y - y2) / 2
    return np.float32([[s / 50]])


In [None]:
%matplotlib notebook
%matplotlib notebook
import matplotlib.pyplot as plt
# Turn on interactive mode and create the figure/axes that the training loop
# in the next cell clears and redraws on every episode.
plt.ion()
fig = plt.figure()
ax = plt.gca()
# Text widget for the one-line progress summary.
txt =W.Text()
display(txt)

In [None]:
# γ: discount factor, ϵ: exploration probability (fixed here), rr: moving
# average of the training reward. γ and ϵ are read as globals by Qlearn2.
γ, ϵ, rr = 1, .1, -8
for j in range(0, 1001):
    # Every 100th episode: evaluate 20 test rollouts and stop as soon as
    # their total reward is positive.
    if j%100==99:
        r=sum(Qlearn2(test=True, T=1400) for i in range(20))
        print("{} {}\n".format(j, r/20))
        if r>0:
            break
    # One training rollout; fold its reward into the moving average.
    rr = rr*0.95 + 0.05*Qlearn2(T=1400)    
    txt.value="j={} r={} ϵ={}".format(j,rr, ϵ)
    ax.clear()
    # Evaluate Q over states -1 .. 0.98; one curve per output, coloured
    # r, g, b, y, k for outputs 0..4.
    Qvalue = np.array([Qfunc(np.float32([[i/50]])) for i in range(-50,50)])
    for i, c in enumerate("rgbyk"):
        ax.plot(Qvalue[:,i], c)
    fig.canvas.draw()

In [None]:
import os
from keras.models import load_model

# Restore a Q network previously written with Q.save('Qlearn2_function.h5').
# The load is guarded so that a fresh checkout without the file keeps the
# network currently in memory instead of aborting a "Run All".
if os.path.exists('Qlearn2_function.h5'):
    Q = load_model('Qlearn2_function.h5')
else:
    print("Qlearn2_function.h5 not found; keeping the Q network currently in memory")

In [None]:
# Test: play one rollout with test=True (no ϵ-exploration, no Q updates) and
# stream the rendered frames to the image widget.
screen = W.Image()
display(screen)
Qlearn2(test=True, screen=screen, T=1400)

In [None]:
# Uncomment to save the trained Q network; the load_model cell above reads this file.
#Q.save('Qlearn2_function.h5')