# 麻雀テンパイ予測 with Vision Transformer

**Author:** 俺様<br>
**Date created:** 2021/08/26<br>
**Last modified:** 2022/01/09<br>
**Description:** Binary tenpai prediction for mahjong using a Vision Transformer over 24×36×1 inputs.

## Setup

In [None]:
# Mount Google Drive at /content/drive (Colab only); the dataset and
# checkpoint paths defined later in this notebook live under
# /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
pip install -U tensorflow-addons

In [None]:
pip install optuna

In [None]:
import numpy as np
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import tensorflow_addons as tfa
import codecs, copy
import time
import matplotlib.pyplot as plt
from sklearn.metrics import classification_report, roc_curve, auc
import copy
from keras.layers import Lambda
import optuna



# Dataset and checkpoint locations on the mounted Google Drive.

# Base name of the 2013 training data (not referenced by the visible code).
input_path = '/content/drive/MyDrive/experiment/learning_data/tfrecord/data_train_2013_MjT_tnsp'
# Prefix of the training shards; the training loop appends "<n>.tfrecords".
output_path = '/content/drive/MyDrive/experiment/learning_data/tfrecord/data_train_2013_MjT_tnsp_24_36_1_'
# Prefix of the 2015 test shards; the evaluation code appends "<n>.tfrecords".
output_path2 = '/content/drive/MyDrive/experiment/learning_data/tfrecord/data_test_2015_MjT_tnsp_24_36_1_'
# Checkpoint *prefix* for Model.load_weights. A TensorFlow-format checkpoint
# is addressed by the prefix passed to save_weights ("hitori_T.ckpt"), not by
# the ".data-00000-of-00001" shard file; the matching ".index" file must sit
# next to the data shard in the same directory.
weights_path = '/content/drive/MyDrive/experiment/backup/hitori_T.ckpt'

## Prepare the data

In [None]:
# The classifier ends in a single sigmoid unit (binary target).
num_classes = 1

# Per-sample input grid: 24 rows x 36 columns x 1 channel.
# Original author's note on the layout: "36: lichi, 37: target designation".
# An alternative 25-row layout is kept here, disabled, for reference:
# input_shape = (25, 36, 1)
input_shape = (24, 36, 1)





## Configure the hyperparameters

In [None]:
# --- Optimiser settings ------------------------------------------------------
learning_rate = 0.001
weight_decay = 0.0001
batch_size = 256

# Leftovers from the image-based ViT example, kept disabled:
# image_size = 72  # We'll resize input images to this size
# patch_size = 6  # Size of the patches to be extract from the input images

# --- Transformer geometry ----------------------------------------------------
num_patches = 24          # one token per input row
projection_dim = 104      # token embedding width
num_heads = 5             # attention heads per Transformer block
transformer_layers = 8    # number of stacked Transformer blocks

# Widths of the dense layers inside each Transformer block.
transformer_units = [projection_dim * 2, projection_dim]

# Widths of the dense layers of the final classifier.
mlp_head_units = [projection_dim, projection_dim // 2]


## Implement multilayer perceptron (MLP)

In [None]:

def mlp(x, hidden_units, dropout_rate):
    """Apply a stack of GELU dense layers, each followed by dropout.

    Args:
        x: Input tensor.
        hidden_units: Iterable of layer widths, applied in the given order.
        dropout_rate: Dropout rate used after every dense layer.

    Returns:
        The output of the last dropout layer, or ``x`` itself when
        ``hidden_units`` is empty.
    """
    out = x
    for width in hidden_units:
        dense = layers.Dense(width, activation=tf.nn.gelu)
        dropout = layers.Dropout(dropout_rate)
        out = dropout(dense(out))
    return out


## Implement patch creation as a layer

In [None]:

class Patches(layers.Layer):
    """Cut the input into 1 x 36 windows and flatten each into a vector.

    For the 24 x 36 x 1 inputs used in this notebook every row yields exactly
    one window, so the result has one 36-value patch per row.
    """

    def __init__(self):
        super().__init__()

    def call(self, images):
        """Return a (batch, patches, patch_values) tensor for ``images``."""
        # Windows are 1 row high and 36 columns wide, moved one step at a
        # time with no padding.
        row_patches = tf.image.extract_patches(
            images=images,
            sizes=[1, 1, 36, 1],
            strides=[1, 1, 1, 1],
            rates=[1, 1, 1, 1],
            padding="VALID",
        )
        # The batch size is read dynamically; the per-patch width is static.
        n_samples = tf.shape(images)[0]
        depth = row_patches.shape[-1]
        return tf.reshape(row_patches, [n_samples, -1, depth])


## Implement the patch encoding layer

The `PatchEncoder` layer will linearly transform a patch by projecting it into a
vector of size `projection_dim`. In addition, it adds a learnable position
embedding to the projected vector.

In [None]:

class PatchEncoder(layers.Layer):
    """Project patches to ``projection_dim`` and add position embeddings.

    Args:
        num_patches: Number of patch positions that get an embedding.
        projection_dim: Width of both the projection and the embeddings.
    """

    def __init__(self, num_patches, projection_dim):
        super().__init__()
        self.num_patches = num_patches
        # Linear map from raw patch values to the token width.
        self.projection = layers.Dense(units=projection_dim)
        # One learnable vector per patch position.
        self.position_embedding = layers.Embedding(
            input_dim=num_patches, output_dim=projection_dim
        )

    def call(self, patch):
        """Return the projected patches plus their position embeddings."""
        position_ids = tf.range(self.num_patches)
        projected = self.projection(patch)
        return projected + self.position_embedding(position_ids)


## Build the ViT model



In [None]:

def create_vit_classifier():
    """Build the Transformer-based binary classifier as a Keras model.

    The architecture is driven by the module-level settings ``input_shape``,
    ``num_patches``, ``projection_dim``, ``num_heads``, ``transformer_layers``
    and ``num_classes`` as they stand when this function is called.

    Returns:
        An uncompiled ``keras.Model`` whose output is a sigmoid activation of
        width ``num_classes``.
    """
    inputs = layers.Input(shape=input_shape)

    # Turn the input into a sequence of position-aware tokens.
    tokens = Patches()(inputs)
    tokens = PatchEncoder(num_patches, projection_dim)(tokens)

    block_mlp_units = [projection_dim * 2, projection_dim]
    for _ in range(transformer_layers):
        # Self-attention on the normalised tokens, with a residual connection.
        normed = layers.LayerNormalization(epsilon=1e-6)(tokens)
        attended = layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=projection_dim, dropout=0.1
        )(normed, normed)
        residual = layers.Add()([attended, tokens])

        # Feed-forward MLP on the normalised result, with a second residual.
        hidden = layers.LayerNormalization(epsilon=1e-6)(residual)
        hidden = mlp(hidden, hidden_units=block_mlp_units, dropout_rate=0.1)
        tokens = layers.Add()([hidden, residual])

    # All tokens are flattened into one vector (no class token is used).
    representation = layers.LayerNormalization(epsilon=1e-6)(tokens)
    representation = layers.Flatten()(representation)
    representation = layers.Dropout(0.15)(representation)

    # Classifier head.
    head_units = [projection_dim * 8, projection_dim * 4]
    features = mlp(representation, hidden_units=head_units, dropout_rate=0.15)

    # The final layer applies a sigmoid, so the outputs are probabilities.
    probabilities = layers.Dense(num_classes, activation='sigmoid')(features)
    return keras.Model(inputs=inputs, outputs=probabilities)


In [None]:
!nvidia-smi

Fri Dec 10 04:14:01 2021       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 495.44       Driver Version: 460.32.03    CUDA Version: 11.2     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla P100-PCIE...  Off  | 00000000:00:04.0 Off |                    0 |
| N/A   39C    P0    26W / 250W |      0MiB / 16280MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Proces

In [None]:
def deserialize_example(serialized_string):
    """Decode one serialized TFRecord example into an (image, label) pair.

    Args:
        serialized_string: A scalar string tensor holding one ``tf.Example``
            with byte-string features ``'x'`` and ``'y1'``.

    Returns:
        A tuple ``(image, label)``: ``image`` is the int32 content of ``'x'``
        reshaped to ``(24, 36, 1)``; ``label`` is the int32 content of
        ``'y1'`` as a 1-D tensor.
    """
    feature_spec = {
        key: tf.io.FixedLenFeature([], tf.string) for key in ('x', 'y1')
    }
    parsed = tf.io.parse_single_example(serialized_string, feature_spec)

    # Both features store raw int32 bytes.
    pixels = tf.io.decode_raw(parsed["x"], tf.int32)
    image = tf.reshape(pixels, (24, 36, 1))
    label = tf.io.decode_raw(parsed["y1"], tf.int32)
    return image, label

## Compile, train, and evaluate the model

In [None]:

def _load_test_set(num_files=2):
    """Read the test TFRecord shards into two in-memory NumPy arrays."""
    features, labels = [], []
    for shard in range(1, num_files + 1):
        shard_path = output_path2 + str(shard) + '.tfrecords'
        batches = (
            tf.data.TFRecordDataset(shard_path)
            .map(deserialize_example)
            .batch(100000)
        )
        for batch_x, batch_y in batches:
            features.append(batch_x.numpy())
            labels.append(batch_y.numpy())
    return np.concatenate(features, axis=0), np.concatenate(labels, axis=0)


def _save_curve(train_values, test_values, title, filename):
    """Plot per-epoch train/test curves on a fresh figure and save it."""
    fig = plt.figure()
    plt.plot(train_values)
    plt.plot(test_values)
    plt.title(title)
    plt.xlabel('Epoch')
    plt.legend(['Train', 'Test'], loc='upper left')
    fig.savefig(filename)
    # Close the figure so repeated calls (e.g. one per Optuna trial) do not
    # accumulate open figures.
    plt.close(fig)


def run_experiment(model, epochs=50):
    """Compile, train and evaluate ``model``; keep the best checkpoint.

    Each epoch trains on the ten training shards (in chunks of up to 100000
    examples), then evaluates on the test set. Whenever the F1 score of
    class 1 improves, the weights are written to ``./checkpoint/hitori_T.ckpt``.
    After the last epoch the accuracy and loss curves are saved to
    ``./acc.png`` and ``./loss.png``.

    Args:
        model: An uncompiled Keras model accepting the (24, 36, 1) inputs
            produced by ``deserialize_example``.
        epochs: Number of passes over the training shards.

    Returns:
        The negated best class-1 F1 score over all epochs, so that a
        minimising optimiser maximises F1.
    """
    optimizer = tf.optimizers.Adam(learning_rate=learning_rate)
    model.compile(
        optimizer=optimizer,
        loss='binary_crossentropy',
        metrics=[keras.metrics.binary_accuracy],
    )
    model.summary()

    # The test set does not change between epochs, so read it once up front.
    test_x, test_y = _load_test_set()
    test_labels = test_y.ravel()

    best_f1 = 0
    train_loss, train_acc = [], []
    test_loss, test_acc = [], []

    for epoch in range(1, epochs + 1):
        chunk_sizes, chunk_losses, chunk_accs = [], [], []
        for shard in range(1, 11):
            dataset = (
                tf.data.TFRecordDataset(output_path + str(shard) + '.tfrecords')
                .map(deserialize_example)
                .batch(100000)
            )
            for chunk_x, chunk_y in dataset:
                # NOTE(review): the module-level batch_size is not passed
                # here, so Keras uses its default mini-batch size. Confirm
                # whether batch_size=batch_size was intended.
                history = model.fit(
                    x=chunk_x,
                    y=chunk_y,
                    epochs=1,
                    verbose=2,
                    class_weight={0: 1, 1: 1.2},
                )
                chunk_sizes.append(int(chunk_x.shape[0]))
                chunk_losses.append(history.history['loss'][0])
                chunk_accs.append(history.history['binary_accuracy'][0])

        # Collapse the per-chunk metrics into one sample-weighted value per
        # epoch so the train curve lines up with the per-epoch test curve.
        seen = sum(chunk_sizes)
        train_loss.append(
            sum(v * n for v, n in zip(chunk_losses, chunk_sizes)) / seen
        )
        train_acc.append(
            sum(v * n for v, n in zip(chunk_accs, chunk_sizes)) / seen
        )

        score = model.evaluate(x=test_x, y=test_y, verbose=0)
        print(score)
        test_loss.append(score[0])
        test_acc.append(score[1])

        # Threshold the sigmoid outputs at 0.5 to obtain class predictions.
        probabilities = model.predict(test_x)
        predicted = (probabilities >= 0.5).astype(int).ravel()
        report = classification_report(
            test_labels,
            predicted,
            labels=[0, 1],  # guarantees the '1' entry exists in the report
            digits=4,
            output_dict=True,
            zero_division=0,
        )
        print(report)
        positive_f1 = report['1']['f1-score']
        print('epoch:' + str(epoch) + ' ' + str(positive_f1))
        if positive_f1 > best_f1:
            best_f1 = positive_f1
            model.save_weights("./checkpoint/hitori_T.ckpt")

    _save_curve(train_acc, test_acc, 'acc', "./acc.png")
    _save_curve(train_loss, test_loss, 'loss', "./loss.png")

    return -best_f1

def test(model):
    """Evaluate ``model`` on the test shards using the saved weights.

    Compiles the model, restores the weights found at ``weights_path``, reads
    both test shards into memory, then prints the evaluation score and the
    classification report.

    Args:
        model: A Keras model with the same architecture as the one whose
            weights are stored at ``weights_path``.

    Returns:
        The F1 score of class 1 on the test set.
    """
    optimizer = tf.optimizers.Adam(learning_rate=learning_rate)
    model.compile(
        optimizer=optimizer,
        loss='binary_crossentropy',
        metrics=[keras.metrics.binary_accuracy],
    )
    model.summary()
    model.load_weights(weights_path)

    # Gather every batch first and concatenate once at the end.
    feature_batches, label_batches = [], []
    for shard in range(1, 3):
        testdataset = (
            tf.data.TFRecordDataset(output_path2 + str(shard) + '.tfrecords')
            .map(deserialize_example)
            .batch(100000)
        )
        for batch_x, batch_y in testdataset:
            feature_batches.append(batch_x.numpy())
            label_batches.append(batch_y.numpy())
    test_x = np.concatenate(feature_batches, axis=0)
    test_y = np.concatenate(label_batches, axis=0)

    score = model.evaluate(x=test_x, y=test_y, verbose=0)
    print(score)

    # Threshold the sigmoid outputs at 0.5 to obtain class predictions.
    probabilities = model.predict(test_x)
    predicted = (probabilities >= 0.5).astype(int).ravel()
    report = classification_report(
        test_y.ravel(),
        predicted,
        labels=[0, 1],  # guarantees the '1' entry exists in the report
        digits=4,
        output_dict=True,
        zero_division=0,
    )
    print(report)
    return report['1']['f1-score']
    
    


def objective(trial):
    """Optuna objective: sample an architecture, train it, return its score.

    The sampled values are written to the module-level ``num_heads``,
    ``projection_dim`` and ``transformer_layers`` because
    ``create_vit_classifier`` reads its configuration from those globals.

    Args:
        trial: The Optuna ``Trial`` used to sample the hyperparameters.

    Returns:
        The value returned by ``run_experiment`` for the sampled model.
    """
    global num_heads, projection_dim, transformer_layers

    num_heads = trial.suggest_int("num_head", 1, 12)
    # ``step`` is passed by keyword: recent Optuna releases deprecate passing
    # it positionally.
    projection_dim = trial.suggest_int("dim", 16, 128, step=8)
    transformer_layers = trial.suggest_int("num_layers", 1, 12)

    vit_classifier = create_vit_classifier()
    return run_experiment(vit_classifier)
    
def start():
    """Build a classifier from the current module-level settings and train it.

    Returns:
        Whatever ``run_experiment`` returns for the freshly built model.
    """
    return run_experiment(create_vit_classifier())
# Optuna hyperparameter search (disabled). Uncomment to run 50 trials that
# minimise the value returned by objective().
#study = optuna.create_study(direction="minimize")
#study.optimize(objective, # objective function
#               n_trials=50 # number of trials
#              )

# Run a single training/evaluation pass with the module-level hyperparameters.
stttttt = start()
