# Projeto 1: Rock Paper Scissors

- Descrição e arquivos originais disponíveis no site da [FreeCodeCamp](https://www.freecodecamp.org/learn/machine-learning-with-python/machine-learning-with-python-projects/rock-paper-scissors).

- Objetivo: vencer quatro robôs em mais de 60% das partidas de pedra papel e tesoura.
    - Regras do jogo: para cada jogada, ganha o complemento.
        - Se `'R'`, então `'P'`;
        - Se `'P'`, então `'S'`;
        - Se `'S'`, então `'R'`.

- Método: treinar uma rede neural recorrente capaz de identificar o padrão de cada robô e adotar a estratégia correta para vencê-lo;
    - Inicialmente, realizar jogadas aleatórias enquanto alimenta a RNN.
    - Quando o número de jogadas for satisfatório, fazer uma previsão com a RNN.
    - Com o robô identificado, então selecionar a estratégia adequada e derrotá-lo pelo restante de jogadas que faltam.

## Padrões de Jogo

- `quincy` sempre joga a sequência `["R", "R", "P", "P", "S"]`.

- `mrugesh` lê e atualiza o histórico de jogadas do oponente. Das 10 últimas jogadas, ele escolhe o movimento contra a mais frequente. Caso seja o primeio jogo, ele escolhe `'S'`.

- `kris` lê a última jogada do oponente e joga contra ela. Caso seja o primeiro jogo então joga `'R'`.

- `abey` lê e atualiza o histórico de jogadas do oponente como o `mrugesh` faz, porém ele escolhe o movimento primeiro determinando qual combinação de duas jogadas foi a mais frequente e em seguida joga contra a última dessa dupla.

- A estratégia para derrotá-los é basicamente a mesma: aplicar o mesmo algoritimo de cada um lendo o histórico do próprio jogador e escolher a jogada que derrote o que for instruído ao algoritimo do robô.

## Pré-processamento

- No problema, se jogará 1000 rodadas por robô, totalizando 4000 jogadas.
    - Em treino, se usará sequências de $N = 400$ jogadas.

- Para o treino e validação, se gerará um banco de dados ordens de grandeza maior, com $20 \times 10^6$ jogadas.

- Há duas variáveis: jogada e robô. Aplicando o one-hot encoding se obtém $1+4=5$ variáveis numéricas por observação.

- O número de sequências ao fim do pré-processamento é $4 \times 5 \times 10^6 / N = 50 \times 10^3$, ou $5 \times 10^6 / N = 12,5 \times 10^3$ por robô.

- Como a identidade do robô é a variável de interesse, então deve-se sortear $N$ jogadas do mesmo robô de forma que não haja correlação entre um lote e outro.

- Os exemplos serão gerados com jogadas aleatórias contra os robôs, garantindo que o padrão observado venha das estratégias de jogo e não do conjunto de valores da variável independente.

- A geração dos bancos de dados se deu usando os arquivos `generate_ETL_plays.py` e `generate_ETL_plays_validation.py`. Aqui a discussão se retém a observar o padrão de cada robô.

In [None]:
from RPS_game import mrugesh, abbey, quincy, kris, random_player

def Play_List(player1, player2, num_games):
    p1_prev_play = ""
    p2_prev_play = ""
    bot_history = []
    
    for _ in range(num_games):
        p1_play = player1(p2_prev_play)
        p2_play = player2(p1_prev_play)

        bot_history.append(p2_play)

        p1_prev_play = p1_play
        p2_prev_play = p2_play
    
    return bot_history

num_games = int(5e6)

rand_mrugesh = Play_List(random_player, mrugesh, num_games)
rand_abbey = Play_List(random_player, abbey, num_games)
rand_quincy = Play_List(random_player, quincy, num_games)
rand_kris = Play_List(random_player, kris, num_games)

In [None]:
import pandas as pd

out = [
    pd.DataFrame(rand_mrugesh, columns = ['play']),
    pd.DataFrame(rand_abbey, columns = ['play']),
    pd.DataFrame(rand_quincy, columns = ['play']),
    pd.DataFrame(rand_kris, columns = ['play'])
]
bots = ['mrugesh', 'abbey', 'quincy', 'kris']
play_encoding = lambda e: 0 if e == 'R' else (1 if e == 'P' else 2)

for i in range(len(bots)):
    out[i] = out[i].applymap(play_encoding)
    out[i]['bot'] = bots[i]

db_plays = pd.concat(out, axis = 0)
db_plays = pd.concat([db_plays, pd.get_dummies(db_plays['bot'])], axis = 1)\
    .drop('bot', axis = 1)\
    .reset_index(drop = True)

In [None]:
db_plays

Unnamed: 0,play,abbey,kris,mrugesh,quincy
0,0,0,0,1,0
1,0,0,0,1,0
2,0,0,0,1,0
3,1,0,0,1,0
4,0,0,0,1,0
...,...,...,...,...,...
399995,1,0,1,0,0
399996,0,0,1,0,0
399997,1,0,1,0,0
399998,0,0,1,0,0


- Como boa prática, primeiro observamos como se distribuem as jogadas de cada robô.

- Como há diferenças nas estatísticas de cada robô, então deve ser possível aprender o padrão de jogada. Porém, como o desvio padrão é alto, se espera que seja necessária uma grande quantidade de entradas para o modelo corretamente catalogar o robô.

In [None]:
bots = ['mrugesh', 'abbey', 'quincy', 'kris']
100 * db_plays.loc[db_plays['abbey'] == 1, 'play'].value_counts() / (db_plays.shape[0] / len(bots))

0    46.745
2    41.269
1    11.986
Name: play, dtype: float64

In [None]:
100 * db_plays.loc[db_plays['kris'] == 1, 'play'].value_counts() / (db_plays.shape[0] / len(bots))

1    33.543
2    33.315
0    33.142
Name: play, dtype: float64

In [None]:
100 * db_plays.loc[db_plays['quincy'] == 1, 'play'].value_counts() / (db_plays.shape[0] / len(bots))

0    40.0
1    40.0
2    20.0
Name: play, dtype: float64

In [None]:
100 * db_plays.loc[db_plays['mrugesh'] == 1, 'play'].value_counts() / (db_plays.shape[0] / len(bots))

2    34.220
1    33.517
0    32.263
Name: play, dtype: float64

- Divide-se o banco de dados em examples e label para treinamento e teste de forma a garantir a correlação entre a sequência de jogadas e o robô jogador.

- Enfim, salva-se os bancos de dados em formato CSV pela conveniência.

In [None]:
import numpy as np
import os
from google.colab import drive
drive.mount('/content/drive')

colab = True
if colab:
    path = '/content/drive/MyDrive/Colab Notebooks'
else:
    path = os.getcwd()
bots = ['mrugesh', 'abbey', 'quincy', 'kris']
sequence_length = int(1e4)

examples = pd.DataFrame(db_plays['play'].values.reshape((db_plays.shape[0] // sequence_length, sequence_length)))
labels = db_plays.loc[::sequence_length, bots]

np.random.seed(666)
examples = examples.reindex(np.random.permutation(examples.index))
labels = labels.reindex(np.random.permutation(labels.index))

partitions = [
    int(0.9 * examples.shape[0]),
    examples.shape[0]
]
db_filenames = ['db_examples_train.csv', 'db_examples_test.csv', 'db_labels_train.csv', 'db_labels_test.csv']
dbs = [
    examples.iloc[:partitions[0]].reset_index(drop = True),
    examples.iloc[partitions[0]:partitions[1]].reset_index(drop = True),
    labels.iloc[:partitions[0]].reset_index(drop = True),
    labels.iloc[partitions[0]:partitions[1]].reset_index(drop = True),
]

for i in range(len(db_filenames)):
    dbs[i].to_csv(os.path.join(path, db_filenames[i]))

- O modelo será validado contra um $N$ diferente do treinamento, avaliando se o modelo é capaz de fazer previsões no contexto do problema.
    - Como se requer pelo menos 60% de acertos em 1000 partidas, se fará $N = 399$.

In [None]:
from RPS_game import mrugesh, abbey, quincy, kris, random_player
import pandas as pd
import numpy as np
import os
from google.colab import drive
drive.mount('/content/drive')

colab = True
if colab:
    path = '/content/drive/MyDrive/Colab Notebooks'
else:
    path = os.getcwd()

def Play_List(player1, player2, num_games):
    p1_prev_play = ""
    p2_prev_play = ""
    bot_history = []
    
    for _ in range(num_games):
        p1_play = player1(p2_prev_play)
        p2_play = player2(p1_prev_play)

        bot_history.append(p2_play)

        p1_prev_play = p1_play
        p2_prev_play = p2_play
    
    return bot_history

num_games = int(399)
rand_mrugesh = Play_List(random_player, mrugesh, num_games)
rand_abbey = Play_List(random_player, abbey, num_games)
rand_quincy = Play_List(random_player, quincy, num_games)
rand_kris = Play_List(random_player, kris, num_games)

out = [
    pd.DataFrame(rand_mrugesh, columns = ['play']),
    pd.DataFrame(rand_abbey, columns = ['play']),
    pd.DataFrame(rand_quincy, columns = ['play']),
    pd.DataFrame(rand_kris, columns = ['play'])
]
bots = ['mrugesh', 'abbey', 'quincy', 'kris']
play_encoding = lambda e: 0 if e == 'R' else (1 if e == 'P' else 2)

for i in range(len(bots)):
    out[i] = out[i].applymap(play_encoding)
    out[i]['bot'] = bots[i]

db_plays = pd.concat(out, axis = 0)
db_plays = pd.concat([db_plays, pd.get_dummies(db_plays['bot'])], axis = 1)\
    .drop('bot', axis = 1)\
    .reset_index(drop = True)
    
examples = pd.DataFrame(db_plays['play'].values.reshape((4, num_games)))
labels = db_plays.loc[::num_games, bots]

np.random.seed(666)
examples = examples.reindex(np.random.permutation(examples.index))
labels = labels.reindex(np.random.permutation(labels.index))

examples.to_csv(os.path.join(path, 'db_examples_validation.csv'))
labels.to_csv(os.path.join(path, 'db_labels_validation.csv'))

- Enfim, se transforma o banco de dados num tensor dividido em lotes.

In [None]:
import tensorflow as tf
import pandas as pd
import os
from google.colab import drive
drive.mount('/content/drive')

colab = True
if colab:
    path = '/content/drive/MyDrive/Colab Notebooks'
else:
    path = os.getcwd()

batch_size = 100

TFd_plays_train = tf.data.Dataset.from_tensor_slices((
    pd.read_csv(os.path.join(path, 'db_examples_train.csv')),
    pd.read_csv(os.path.join(path, 'db_labels_train.csv'))
))\
    .batch(batch_size, drop_remainder = True)

In [None]:
TFd_plays_train.element_spec

(TensorSpec(shape=(1000, 100), dtype=tf.int64, name=None),
 TensorSpec(shape=(1000, 4), dtype=tf.uint8, name=None))

## Construção, Treinamento, Teste e Validação da RNN

- É um problema parecido com o de previsão de caractere abordado anteriormente, porém a última camada deve ser de classificação (qual é o robô?).
    - Variável independente (feature): do tipo categórica, diz qual a jogada do robô.
    - Variável dependente (label): do tipo categórica, diz qual robô fez a jogada. Aqui se trata os dados com one-hot encoding.
    - Assim como o problema anterior, o treino do modelo será com uma quantidade de jogadas $N$ (timesteps) e a previsão com um $N$ menor.

- Entrada do modelo: uma lista de dimensão $(N_{lote}, N, 1)$ com as $N$ últimas jogadas do robô em lotes com $N_{lote}$ sequências.
    - A entrada é gerada a partir do histórico de um robô contra um oponente que toma decições aleatórias.
    - Essa lista tem tamanho arbitrário, porém $N > 20$ de forma a capturar os padrões tanto do `mrugesh` quanto do `abey`.
    - Se escolhe treinar com $N = 400$ e validar o modelo com $N = 399$, de forma a cumprir o mínimo de 60% de vitórias. 

- Saída do modelo: uma lista de números com dimensão $(N_{lote}, 4)$ dando a distribuição de probabilidades para cada robô.

- Topologia:
    - Uma camada LSTM bidirecional de 128 unidades cada, com entrada $(N_{lote}, N, 1)$ e saída de $(N_{lote}, 2 \times 128)$ dimensionais.
        - Como cada entrada do lote é independente, o estado da camada deve ser reinicializado (`stateful = False`).
        - O padrão a ser detectado é sensível ao contexto, por isso essa camada será bidirecional.
    - Uma camada densa com saída de 4 dimensões (número de tipos de robô).

- Como a variável dependente é categórica codificada com one-hot, se escolheu a função de entropia cruzada (categorical crossentropy) para ser minimizada e como métrica o score F, que otimiza precisão e revocação.

- Saídas negativas e maiores que um não fazem sentido na camada densa, por essa razão se escolheu a função de ativação softmax.

In [None]:
def Build_Model (seq_length, rnn_units):
    return tf.keras.Sequential([
        tf.keras.layers.Bidirectional(
            tf.keras.layers.LSTM(
                rnn_units,
                stateful = False
            ),
            batch_input_shape = (None, seq_length, 1)
        ),
        tf.keras.layers.Dense(4, activation = 'softmax')# 'mrugesh', 'abbey', 'quincy' ou 'kris'
    ])

In [None]:
import tensorflow as tf
import tensorflow_addons as tfa
import os
import pandas as pd
from google.colab import drive
drive.mount('/content/drive')

batch_size = 100
sequence_length = 400
hidden_lstm_units = 2 * 64

colab = True
if colab:
    path = '/content/drive/MyDrive/Colab Notebooks/'
else:
    path = os.getcwd()

db_train = pd.read_csv(os.path.join(path, 'db_train.csv')).drop(columns = 'Unnamed: 0')
labels_train = pd.get_dummies(db_train['bot'])
features_train = db_train.drop(columns = 'bot')

TFd_plays_train = tf.data.Dataset.from_tensor_slices((
    features_train,
    labels_train
))\
    .batch(batch_size, drop_remainder = True)

model = Build_Model(sequence_length, hidden_lstm_units)
model.reset_states()
model.compile(
    loss = 'categorical_crossentropy',
    metrics = tfa.metrics.F1Score(num_classes = 4, average = 'macro')
)
model.summary()

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
Model: "sequential_6"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 bidirectional_6 (Bidirectio  (None, 256)              133120    
 nal)                                                            
                                                                 
 dense_6 (Dense)             (None, 4)                 1028      
                                                                 
Total params: 134,148
Trainable params: 134,148
Non-trainable params: 0
_________________________________________________________________


In [None]:
number_epochs = 20
ckpt_directory = 'training_ckpts_3'
colab = True
if colab:
    path = '/content/drive/MyDrive/Colab Notebooks/'
else:
    path = os.getcwd()

continuation_job = False
if continuation_job:
    model.load_weights(tf.train.latest_checkpoint(os.path.join(path + ckpt_directory)))

ckpt_path = os.path.join(path, ckpt_directory, 'ckpt_{epoch}')
ckpt_callback = tf.keras.callbacks.ModelCheckpoint(filepath = ckpt_path, save_weights_only = True)

history = model.fit(
    TFd_plays_train,
    epochs = number_epochs,
    callbacks = [ckpt_callback]
)
model.save_weights(os.path.join(path, 'trained_model'))

Epoch 1/20
Epoch 2/20
Epoch 3/20
Epoch 4/20
Epoch 5/20
Epoch 6/20
Epoch 7/20
Epoch 8/20
Epoch 9/20
Epoch 10/20
 16/450 [>.............................] - ETA: 1:16 - loss: 1.0815 - f1_score: 0.7281

KeyboardInterrupt: ignored

- O treinamento foi interrompido quando se atingiu o `f1_score` desejado.

In [None]:
model.save_weights(os.path.join(path, 'trained_model'))

- Avalia-se o desempenho desse novo modelo.

In [None]:
import tensorflow as tf
import tensorflow_addons as tfa
import os
import numpy as np
import pandas as pd
from google.colab import drive
drive.mount('/content/drive')

colab = True
if colab:
    path = '/content/drive/MyDrive/Colab Notebooks/'
else:
    path = os.getcwd()

db_test = pd.read_csv(os.path.join(path, 'db_test.csv')).drop(columns = 'Unnamed: 0')
labels_test = pd.get_dummies(db_test['bot'])
features_test = db_test.drop(columns = 'bot')

TFd_plays_test = tf.data.Dataset.from_tensor_slices((
    features_test,
    labels_test
))\
    .batch(batch_size, drop_remainder = True)

sequence_length = 400
batch_size = 100
hidden_lstm_units = 2 * 64

model = Build_Model(sequence_length, batch_size, hidden_lstm_units)
model.reset_states()
model.load_weights(os.path.join(path, 'trained_model'))
model.compile(
    loss = 'categorical_crossentropy',
    metrics = tfa.metrics.F1Score(num_classes = 4, average = 'macro')
)
model.evaluate(TFd_plays_test)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


[0.26428523659706116, 0.8689265251159668]

- O modelo então é validado para $N = 399$. Apesar do score F elevado, ainda parece ter certa dificuldade de distinguir as jogadas de abbey e kris. No entando, isso não parece tão preocupante no contexto do problema.

In [None]:
import tensorflow as tf
import tensorflow_addons as tfa
import os
import numpy as np
import pandas as pd
from google.colab import drive
drive.mount('/content/drive')

colab = True
if colab:
    path = '/content/drive/MyDrive/Colab Notebooks/'
else:
    path = os.getcwd()

bots = ['mrugesh', 'abbey', 'quincy', 'kris']
db_validation = pd.read_csv(os.path.join(path, 'db_validation.csv')).drop(columns = 'Unnamed: 0')
labels_validation = pd.get_dummies(db_validation['bot'])
features_validation = db_validation.drop(columns = 'bot')

TFd_plays_validation = tf.data.Dataset.from_tensor_slices((
    features_validation,
    labels_validation
))\
    .batch(1, drop_remainder = True)

sequence_length = 399
batch_size = 1
hidden_lstm_units = 2 * 64

model = Build_Model(sequence_length, batch_size, hidden_lstm_units)
model.reset_states()
model.load_weights(os.path.join(path, 'trained_model'))
model.compile(
    loss = 'categorical_crossentropy',
    metrics = tfa.metrics.F1Score(num_classes = 4, average = 'macro')
)
prediction = model.predict(TFd_plays_validation)

for i in range(len(bots)):
    print(f'{bots[prediction[i].argmax()]: <7} ({bots[labels_validation.to_numpy()[i].argmax()]})')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
kris    (kris)
mrugesh (mrugesh)
kris    (abbey)
quincy  (quincy)


In [None]:
prediction[2]

array([1.0359727e-01, 3.1019419e-01, 2.0586385e-11, 5.8620852e-01],
      dtype=float32)

# Estratégias de Jogo

- Uma vez validado, então se implementa na solução do problema.

- Como a estratégia de jogo dos robôs é conhecida, implementa-se uma contra-estratégia assumindo que se sabe quem é o oponente.

## Abbey

- O robô abbey escolhe a sua jogada observando o histórico do jogador: há o registro da frequência de duplas e, baseado nas combinações possíveis com a última jogada, escolhe aquela que apareceu mais vezes no histórico do jogador.

- Contra esse robô, se implementa a mesma estratégia de análise de frequência histórica porém, ao invés de se observar o histórico de abbey, se observa o próprio histórico. Dessa forma, sempre se saberá qual a próxima jogada do abbey, bastando modificar os valores no dicionário `ideal_response`.

In [None]:
def Counter_Abbey(opponent_history=[],
            own_history=[],
            num_plays=0,
            play_order=[{
                "RR": 0,
                "RP": 0,
                "RS": 0,
                "PR": 0,
                "PP": 0,
                "PS": 0,
                "SR": 0,
                "SP": 0,
                "SS": 0,
            }]):

    if len(own_history) == num_plays:
        # Build statistics at first call to correctly apply abbey rules
        for i in range(1, len(own_history) - 1):
            play_pairs = ''.join(own_history[i - 1:i + 1])
            play_order[0][play_pairs] += 1

    # Proceed to play on abbey rules observing own plays
    ideal_response = {'P': 'R', 'R': 'S', 'S': 'P'}
    last_played = own_history[-1]
    last_two = ''.join(own_history[-2:])
    play_order[0][last_two] += 1

    potential_plays = [
        last_played + "R",
        last_played + "P",
        last_played + "S",
    ]

    sub_order = {k: play_order[0][k] for k in potential_plays if k in play_order[0]}

    prediction = max(sub_order, key=sub_order.get)[-1:]

    return ideal_response[prediction]

## Kris

- A estratégia do kris é simples: ele observa a última jogada do oponente e escolhe contra ela.

- Contra esse robô então basta modificar os valores no dicionário `ideal_response`.

In [None]:
def Counter_Kris(prev_opponent_play):
    # Pick the play which counter kris's pick on its ideal_response
    ideal_response = {'P': 'R', 'R': 'S', 'S': 'P'}

    # Observe own's last play
    return ideal_response[prev_opponent_play]

## Quincy

- Quincy sempre joga a mesma sequência de cinco escolhas;

- Contra esse robô basta acompanhar em qual índice da lista ele está com a operação $\mod(\text{tamanho do histórico},\, \text{tamanho de \textit{choices}})$.

In [None]:
def Counter_Quincy(counter=[0]):
    # Count the same way quincy counts
    # and pick from a table which wins
    # every play at the quincy's table
    choices = ['P', 'P', 'S', 'S', 'R']

    counter[0] += 1
    
    return choices[counter[0] % len(choices)]

## Mrugesh

- Mrugesh observa as frequências das últimas dez jogadas do oponente e escolhe a opção contra a jogada que apareceu mais vezes.

- Assim como feito contra o abbey, basta observar o próprio histórico e modificar a escolha do dicionário `ideal_response`.

In [None]:
def Counter_Mrugesh(opponent_history=[]):
    # Pick the play which counter mrugesh's pick on its ideal_response
    ideal_response = {'P': 'R', 'R': 'S', 'S': 'P'}
    
    # Measure own history's statistics
    last_ten = opponent_history[-10:]
    most_frequent = max(set(last_ten), key=last_ten.count)

    return ideal_response[most_frequent]