In [1]:
# Environment setup: upgrade pip itself, then install the two packages this
# notebook imports further down (python-dotenv and PyMySQL).
# NOTE(review): no versions are pinned, so a later re-run may install
# different releases than the ones the recorded outputs were produced with.
!pip install --upgrade pip
!pip install -U python-dotenv
!pip install PyMySQL

Requirement already up-to-date: pip in /opt/conda/lib/python3.6/site-packages (20.1.1)
Requirement already up-to-date: python-dotenv in /opt/conda/lib/python3.6/site-packages (0.14.0)


In [2]:
from dotenv import load_dotenv
# Populate the process environment before the MYSQL_* settings are read with
# os.getenv in a later cell.
# NOTE(review): presumably this reads a .env file located near the notebook —
# confirm where that file is expected to live.
load_dotenv()

True

In [3]:
import os
import time

import pymysql

import multiprocessing

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from sklearn.preprocessing import MinMaxScaler

In [4]:
# Connection settings are taken from the environment; a variable that is not
# set yields None.
DB_USER = os.environ.get('MYSQL_USER')
DB_PASSWD = os.environ.get('MYSQL_PASSWORD')
DB_HOST = os.environ.get('MYSQL_HOST')
DB_DB = os.environ.get('MYSQL_DATABASE')

# Open the MySQL connection used by the rest of the notebook.
db = pymysql.connect(user=DB_USER, passwd=DB_PASSWD, host=DB_HOST, db=DB_DB, charset='utf8')

# Dict cursor: every fetched row is a dict keyed by column name.
cursor = db.cursor(cursor=pymysql.cursors.DictCursor)

In [5]:
# Every table in the configured schema is treated as one station; the table
# name is the station id. Only the first 30 returned names are kept.
sql = "SELECT TABLE_NAME FROM INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = %s;"
cursor.execute(sql, DB_DB)
stationIds = [row['TABLE_NAME'] for row in cursor.fetchall()][:30]

In [6]:
# Load the parkingBikeTotCnt series of every selected station and min-max
# scale each station independently. Result: data[stationId] -> 2-D array with
# one column.
start = time.time()

count = 0
data = {}
for stationId in stationIds:
    # A table name cannot be sent as a bound query parameter, so it is placed
    # inside backticks with any embedded backtick doubled.
    sql = "SELECT parkingBikeTotCnt FROM `{}`".format(stationId.replace('`', '``'))
    count += cursor.execute(sql)
    res = cursor.fetchall()

    # An empty table produces a DataFrame without columns, so the column
    # lookup below would raise and abort the whole load. Skip such stations.
    if not res:
        print("데이터가 없어 건너뜀 :", stationId)
        continue

    # Double brackets keep the single column as a 2-D frame, which is the
    # layout the scaler is fitted on.
    y = pd.DataFrame(res)[['parkingBikeTotCnt']]

    # A fresh scaler per station, so each series is scaled by its own range.
    scaler = MinMaxScaler()
    data[stationId] = scaler.fit_transform(y)

print("로딩시간 :", time.time() - start)
print("로드된 데이터 수 :", count)

로딩시간 : 1.0224604606628418
로드된 데이터 수 : 81360


In [7]:
def create_dataset(dataset, look_back=10, nPredicted = 6):
    """Cut a series into sliding (input window, target window) pairs.

    Window ``i`` takes ``dataset[i : i + look_back, 0]`` as input and the
    ``nPredicted`` values that immediately follow it as target. Only column 0
    of ``dataset`` is read.

    Args:
        dataset: 2-D array-like of shape (n_rows, n_cols).
        look_back: number of consecutive rows in each input window.
        nPredicted: number of consecutive rows in each target window.

    Returns:
        Tuple ``(dataX, dataY)`` of arrays shaped ``(n_windows, look_back, 1)``
        and ``(n_windows, nPredicted, 1)``, where
        ``n_windows = max(n_rows - look_back - nPredicted + 1, 0)``. A series
        too short for even one window yields empty arrays of those shapes
        rather than raising ``IndexError``.
    """
    series = np.asarray(dataset)[:, 0]

    # Clamp at zero so a too-short series gives zero windows, not a negative count.
    nWindows = max(len(series) - look_back - nPredicted + 1, 0)

    dataX = np.array(
        [series[i:i + look_back] for i in range(nWindows)],
        dtype=series.dtype,
    )
    dataY = np.array(
        [series[i + look_back:i + look_back + nPredicted] for i in range(nWindows)],
        dtype=series.dtype,
    )

    # Reshape with explicit dimensions: with zero windows the stacked array is
    # 1-D, so reading its second axis length (as before) would fail.
    dataX = dataX.reshape(nWindows, look_back, 1)
    dataY = dataY.reshape(nWindows, nPredicted, 1)

    return dataX, dataY

In [22]:
def do_all_task(args):
    """Build, train and save one LSTM encoder-decoder model for a station.

    Written to be mapped over a ``multiprocessing.Pool``; one call handles one
    station.

    Args:
        args: dict with
            'key' - identifier used for the saved file name,
            'x'   - training inputs passed to ``model.fit`` (the first layer
                    declares ``input_shape=(10, 1)``),
            'y'   - training targets passed to ``model.fit`` (the model emits
                    6 steps of 1 value each).

    Returns:
        None. The trained model is written to ``models/<key>.h5``.
    """
    # Keras is imported inside the function, so the backend is loaded by
    # whichever process executes the task.
    # NOTE(review): presumably done to keep TensorFlow out of the parent
    # process before the workers are created — confirm.
    from keras import backend as K
    from keras.models import Sequential
    from keras.layers import Dense, LSTM, TimeDistributed, RepeatVector

    # Make sure the output directory exists *before* training, so a missing
    # folder cannot throw away several minutes of work at save time.
    file_name = 'models/{}.h5'.format(args['key'])
    os.makedirs(os.path.dirname(file_name), exist_ok=True)

    # A pool worker runs this function for several stations in a row; discard
    # the graph left by the previous model so state does not pile up.
    K.clear_session()
    K.set_session(K.tf.Session(config=K.tf.ConfigProto(intra_op_parallelism_threads=32,
                                                      inter_op_parallelism_threads=32,
                                                      device_count={"CPU": 8})))

    model = Sequential()

    # Encoder: reads the 10-step input and emits a single 27-value vector.
    model.add(LSTM(27, activation='linear', input_shape=(10, 1)))
    # Repeat that vector once per output step (6 steps).
    model.add(RepeatVector(6))
    # Decoder: one 27-value output per step, then one Dense(1) per step.
    model.add(LSTM(27, activation='linear', return_sequences=True))
    model.add(TimeDistributed(Dense(1)))

    model.compile(optimizer='adam', loss='mse')

    print("학습 시작, pid :", os.getpid())
    start = time.time()
    history = model.fit(args['x'], args['y'], epochs=65, batch_size=64, verbose=0)
    print("학습시간 :", time.time() - start, "loss :", history.history['loss'][-1:])

    model.save(file_name)

In [23]:
# Build the (inputs, targets) windows of every station in parallel.
# The pool is created before the try block so `pool` is always bound when the
# handlers below run.
nCores = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=nCores)
try:
    # Freeze the key order once; pool.map returns results in input order, so
    # keys and results can be paired positionally.
    keys = list(data.keys())
    result = pool.map(create_dataset, [data[key] for key in keys])

    datasets = dict(zip(keys, result))
except BaseException:
    # Stop the workers immediately (this also covers a kernel interrupt), then
    # re-raise so the failure is visible and later cells do not run against a
    # missing or stale `datasets`.
    pool.terminate()
    print('pool is terminated')
    raise
finally:
    print('joining pool processes')
    pool.close()
    pool.join()
    print('join complete')

joining pool processes
join complete


In [26]:
# Train one model per station, one pool task per station.
# The pool is created before the try block so `pool` is always bound when the
# handlers below run.
print("start training")
nCores = multiprocessing.cpu_count()
pool = multiprocessing.Pool(processes=nCores)
try:
    # Each task carries the station key plus its input and target arrays.
    tasks = [
        {'key': key, 'x': dataset[0], 'y': dataset[1]}
        for key, dataset in datasets.items()
    ]
    pool.map(do_all_task, tasks)

    print("end training")
except BaseException:
    # Stop the workers immediately (this also covers a kernel interrupt), then
    # re-raise so the traceback of the failed training task is shown instead
    # of being reduced to a one-line message.
    print('에러가 발생 했습니다')
    pool.terminate()
    print('pool is terminated')
    raise
finally:
    print('joining pool processes')
    pool.close()
    pool.join()
    print('join complete')

start training


Using TensorFlow backend.
Using TensorFlow backend.
Using TensorFlow backend.
Using TensorFlow backend.
Using TensorFlow backend.
Using TensorFlow backend.
Using TensorFlow backend.
Using TensorFlow backend.


Instructions for updating:
Instructions for updating:
Colocations handled automatically by placer.

Instructions for updating:
Instructions for updating:
Instructions for updating:
Colocations handled automatically by placer.


Instructions for updating:
Colocations handled automatically by placer.
Instructions for updating:
Colocations handled automatically by placer.
Instructions for updating:
Colocations handled automatically by placer.
학습 시작, pid : 18491
학습 시작, pid : 18488
학습 시작, pid : 18489
학습 시작, pid : 18487
학습 시작, pid : 18493
학습 시작, pid : 18494
학습 시작, pid : 18492
학습 시작, pid : 18490
Instructions for updating:
Instructions for updating:
Use tf.cast instead.
Instructions for updating:
Instructions for updating:
Instructions for updating:
Use tf.cast instead.

Instructions for updating:
Use tf.cast instead.


Instructions for updating:
Instructions for updating:
Use tf.cast instead.

학습시간 : 341.1845133304596 loss : [0.004823275670063194]
학습시간 : 343.19205927848816 loss : [0.003740279