In [58]:
import numpy as np
import gym
import gym_pen
import os

def read_data(data_file):
    """Split a UNIPEN-style pen recording into strokes.

    A new stroke starts at every ``.PEN_DOWN`` directive. Every non-blank
    line that does not start with ``.`` is treated as a coordinate record and
    is appended, whitespace-split, to the stroke currently being collected
    (records that follow a ``.PEN_UP`` are therefore kept in the same stroke).
    Records that appear before the first ``.PEN_DOWN`` are discarded.

    Args:
        data_file: Iterable of text lines, e.g. an open text file.

    Returns:
        A list of strokes in file order. Each stroke is a list of records and
        each record is the list of string tokens found on one line.
    """
    all_index_array = []
    index_array = []
    stroke_open = False  # True once the first .PEN_DOWN has been seen

    for line in data_file:
        if line.startswith('.'):
            # rstrip() so a directive on the final line (no trailing newline)
            # is still recognised.
            if line.rstrip() == '.PEN_DOWN':
                if stroke_open:
                    all_index_array.append(index_array)
                index_array = []
                stroke_open = True
            # Every other directive, .PEN_UP included, neither opens nor
            # closes a stroke.
        elif line.strip():
            # Whitespace-only lines are skipped so that no empty record ends
            # up inside a stroke.
            index_array.append(line.split())

    # Flush the stroke that was still being collected when the input ended;
    # without this the last stroke of every file would be lost.
    if stroke_open:
        all_index_array.append(index_array)

    return all_index_array

def scale_data(xy_array, shape):
    """Min-max rescale the x and y columns of a point array.

    Column 0 is mapped onto ``[0, shape[0] - 1]`` and column 1 onto
    ``[0, shape[1] - 1]``. A column whose values are all identical has no
    extent to stretch, so it is mapped to 0 instead of dividing by zero.

    Args:
        xy_array: 2-D array-like with x in column 0 and y in column 1.
        shape: Pair ``(x_size, y_size)`` giving the target extent per axis.

    Returns:
        A new array with the same shape and dtype as the input. For integer
        input the scaled values are therefore truncated on assignment.

    Raises:
        ValueError: If ``xy_array`` contains no points (raised by ``np.min``).
    """
    xy_array = np.asarray(xy_array)
    xy_array_scaled = np.copy(xy_array)

    for axis in (0, 1):
        column = xy_array[:, axis]
        low = np.min(column)
        span = np.max(column) - low
        if span == 0:
            # Degenerate axis (e.g. a perfectly vertical stroke).
            xy_array_scaled[:, axis] = 0
        else:
            xy_array_scaled[:, axis] = (shape[axis] - 1) * (column - low) / span

    return xy_array_scaled

def bezier_curve(control_points, num_points=100):
    """Sample the Bezier curve defined by ``control_points``.

    The curve is evaluated with de Casteljau's algorithm (repeated linear
    interpolation between neighbouring control points), vectorised over all
    parameter values. Unlike the explicit Bernstein/binomial form this needs
    no factorials, so it neither depends on the removed ``np.math`` alias nor
    overflows when a stroke has many control points.

    Args:
        control_points: Sequence of points; only the first two components
            (x, y) of each point are used.
        num_points: Number of samples, taken at evenly spaced parameter
            values ``t`` in ``[0, 1]``.

    Returns:
        Float array of shape ``(num_points, 2)``. The first sample equals the
        first control point and the last sample equals the last one. With no
        control points the result is all zeros.
    """
    points = np.asarray(control_points, dtype=float)
    if points.size == 0:
        return np.zeros((num_points, 2))

    t = np.linspace(0, 1, num_points)[:, np.newaxis, np.newaxis]

    # levels has shape (samples, remaining points, 2); each pass blends
    # neighbouring points and removes one, until a single point per t is left.
    levels = points[np.newaxis, :, :2]
    for _ in range(len(points) - 1):
        levels = (1 - t) * levels[:, :-1] + t * levels[:, 1:]

    # broadcast_to covers the single-control-point case, where no pass ran and
    # levels still has a leading dimension of 1.
    return np.broadcast_to(levels[:, 0, :], (num_points, 2)).copy()

def add_third_value(data):
    """Append a flag to every (x, y) point of a stroke.

    The first and the last point get flag 0; every point in between gets
    flag 1.

    Args:
        data: Indexable sequence of points; components 0 and 1 of each point
            are copied into the result.

    Returns:
        A list of ``[x, y, flag]`` lists, one per input point.
    """
    last_position = len(data) - 1
    return [
        [point[0], point[1], 0 if position in (0, last_position) else 1]
        for position, point in enumerate(data)
    ]

In [91]:
import pandas as pd


# Convert every labelled character of the UNIPEN recordings into its own CSV
# of (x, y, pen_mode) rows, written to data/<label>/<file>_<row>.csv.
file_list = os.listdir('unipen/data')
for file_name in file_list:
  # Strip the extension, whatever its length, to get the base name shared by
  # the .csv label file and the .dat recording.
  file_name = os.path.splitext(file_name)[0]
  try:
    with open(f'unipen/include/{file_name}.dat', 'r') as file:
      data_array = read_data(file)

    df = pd.read_csv(f'unipen/data/{file_name}.csv')

    # 'index' holds either one stroke number ("7") or a "start-end" pair
    # ("7-8"). astype(str) keeps .str usable when pandas parsed the column as
    # integers, and reindex guarantees a second column even when no row
    # contains a '-'.
    index_parts = (
        df['index'].astype(str)
        .str.split('-', n=1, expand=True)
        .reindex(columns=[0, 1])
    )
    df['start_index'] = index_parts[0].astype(int)
    # A row without an explicit end refers to a single stroke.
    df['end_index'] = index_parts[1].fillna(index_parts[0]).astype(int)

    df = df.drop('index', axis=1)

    for i, (start_index, end_index, label) in enumerate(
        zip(df['start_index'], df['end_index'], df['label'])
    ):
      if end_index == start_index:
        xy_array = np.array(data_array[start_index], dtype=np.intc)
        shape = (
            np.max(xy_array[:, 0]) - np.min(xy_array[:, 0]),
            np.max(xy_array[:, 1]) - np.min(xy_array[:, 1]),
        )
        xy_array = scale_data(xy_array, shape)
        xy_array = bezier_curve(xy_array).astype(np.intc)
        xy_array = add_third_value(xy_array)
      else:
        # NOTE(review): only the first and the last stroke of the range are
        # used; strokes strictly between them are ignored — confirm that
        # ranges never span more than two strokes.
        xy_array_1 = np.array(data_array[start_index], dtype=np.intc)
        xy_array_2 = np.array(data_array[end_index], dtype=np.intc)

        # Scale both strokes together against their joint bounding box so
        # their relative position and size are preserved. Scaling each stroke
        # on its own would stretch each of them to fill the whole box.
        split_at = len(xy_array_1)
        both_strokes = np.concatenate((xy_array_1, xy_array_2), axis=0)
        shape = (
            np.max(both_strokes[:, 0]) - np.min(both_strokes[:, 0]),
            np.max(both_strokes[:, 1]) - np.min(both_strokes[:, 1]),
        )
        both_scaled = scale_data(both_strokes, shape)
        xy_array_1 = both_scaled[:split_at]
        xy_array_2 = both_scaled[split_at:]

        xy_array_1 = bezier_curve(xy_array_1, 50).astype(np.intc)
        xy_array_2 = bezier_curve(xy_array_2, 50).astype(np.intc)
        xy_array = np.concatenate((add_third_value(xy_array_1), add_third_value(xy_array_2)), axis=0)

      directory = f'data/{label}/'
      os.makedirs(directory, exist_ok=True)
      pd.DataFrame(xy_array, columns=['x', 'y', 'pen_mode']).to_csv(f'{directory}{file_name}_{i}.csv', index=False)
  except Exception as exc:
    # Best effort: keep converting the remaining files, but report why this
    # one failed instead of only printing its name.
    print(f'{file_name}: {type(exc).__name__}: {exc}')

rcw-dig15
zor-dig22
sjb-dig20


In [14]:
import gym
import gym_pen
import numpy as np

# Canvas size passed to custom_model(shape=shape) further down in this cell.
shape = (500, 500)

# Create the pen environment; 'pen-v0' is presumably registered with gym as a
# side effect of `import gym_pen` — confirm.
env = gym.make('pen-v0')

# Only the first element returned by reset() is kept; the second is discarded.
observation, _ = env.reset()

import tensorflow as tf

def custom_model(shape=(100, 100)):
    """Build the Keras model that maps a canvas plus pen state to an (x, y) pair.

    The single-channel canvas goes through Conv2D -> MaxPooling2D -> Conv2D ->
    Flatten; the result is concatenated with the scalar index input and the
    2-vector xy input, passed through two 64-unit ReLU layers and finally a
    2-unit ReLU output layer.

    Args:
        shape: ``(height, width)`` of the canvas input.

    Returns:
        A ``tf.keras.Model`` whose inputs are ``[canvas, index, xy]`` (in that
        order) and whose only output has 2 units.
    """
    layers = tf.keras.layers

    # Input placeholders.
    canvas_in = layers.Input(shape=(shape[0], shape[1], 1))
    # NOTE(review): this input is created but never connected to the graph nor
    # listed among the model inputs.
    char_input = layers.Input(shape=(1,))
    step_in = layers.Input(shape=(1,))
    position_in = layers.Input(shape=(2,))

    # Convolutional feature extractor for the canvas.
    features = layers.Conv2D(15, 3, activation='relu', kernel_initializer='he_normal')(canvas_in)
    features = layers.MaxPooling2D(5)(features)
    features = layers.Conv2D(15, 3, activation='relu', kernel_initializer='he_normal')(features)
    features = layers.Flatten()(features)

    # Fully connected head over canvas features and pen state.
    hidden = layers.Concatenate()([features, step_in, position_in])
    for _ in range(2):
        hidden = layers.Dense(64, activation='relu', kernel_initializer='he_normal')(hidden)

    predicted_xy = layers.Dense(2, activation='relu')(hidden)

    return tf.keras.Model(
        inputs=[canvas_in, step_in, position_in],
        outputs=[predicted_xy],
    )

# Instantiate the network for the canvas size defined above and print its
# layer-by-layer summary.
model = custom_model(shape=shape)
model.summary()

Model: "model_9"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_37 (InputLayer)       [(None, 500, 500, 1)]        0         []                            
                                                                                                  
 conv2d_18 (Conv2D)          (None, 498, 498, 15)         150       ['input_37[0][0]']            
                                                                                                  
 max_pooling2d_9 (MaxPoolin  (None, 99, 99, 15)           0         ['conv2d_18[0][0]']           
 g2D)                                                                                             
                                                                                                  
 conv2d_19 (Conv2D)          (None, 97, 97, 15)           2040      ['max_pooling2d_9[0][0]'

In [17]:
# NOTE(review): 0.01 is the only argument passed; its exact meaning (noise
# scale, probability, ...) is defined by the gym_pen environment — confirm there.
env.initialize_with_noise(0.01)

In [18]:

# Create the model with its default canvas size.
# NOTE(review): this rebinds `model`, replacing the network built with
# shape=(500, 500) in an earlier cell, and assumes env.model_canvas matches the
# default 100x100 input — confirm.
model = custom_model()

# No model.build(...) call is needed: the model is assembled from Input layers,
# so its input shapes are already fixed. The previous call advertised
# (None, 500, 500, 1), which contradicted the model's actual canvas input.

# Assemble one observation, adding a leading batch dimension to every input.
array_input_data = np.expand_dims(env.model_canvas, axis=0)  # presumably (1, 100, 100, 1)
index_input_data = np.array([[env.step_idx]])  # (1, 1)
xy_input_data = np.array([env.model_pen_position])  # presumably (1, 2)


# Feed the inputs to the model and print the predicted pair for the single sample.
output_xy = model.predict([array_input_data, index_input_data, xy_input_data])

print(output_xy[0,:])

[0.00062219 0.0086446 ]
