# 1. 작업 계획 세우기

1. Linux 파일 시스템에 저장된 공정 환경변수 파일들을 하루 한 번 MySQL에 정기적으로 저장
    - 저장된 공정 환경변수 파일들을 전처리
    - 전처리된 것을 parquet 형식으로 Hadoop에 저장
    - 전처리된 것을 SQL Alchemy로 pd.to_sql() 해서 MySQL에 저장(원래 테이블에 insert)
        - MySQL query를 select ~ 해서 parquet 형식으로 만들어 모델러에게 줄 수 있게 해보자
2. 업로드가 전부 완료되면 Linux 파일 시스템에서 공정 환경변수 파일들 삭제

우선 여기서는 폴더 파일을 사용하여 해본 후,
나중에 Linux 파일 시스템이 만들어지면 그걸로 바꾸면 됨

<a href="https://airflow.apache.org/docs/apache-airflow/stable/howto/set-up-database.html">airflow 공식문서 링크</a>

In [5]:
# 기본 패키지
import pandas as pd
import numpy as np
import time
import datetime
from time import strftime

# 추가 패키지
import pyarrow
import pyspark
from pyspark.sql import SparkSession

# 디렉토리 관련 패키지
import os
import glob
import natsort

# MySQL 관련 패키지
import MySQLdb
import mysql.connector

# SQL Alchemy 관련 패키지 1
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.dialects.mysql import *
from sqlalchemy.types import *

# SQL Alchemy 관련 패키지 2
from sqlalchemy.orm import sessionmaker
from sqlalchemy import Table, MetaData
from sqlalchemy import insert, update

# 2. MySQL 스케줄러

### [1] SQL Alchemy 연결 | Scheduler ON | Timezone 설정

In [6]:
# (1) SQL Alchemy 연결 - 함수 형식으로 정의
def MySQL_connect(user, password, db, host, port=3306):
    url = 'mysql+mysqldb://{}:{}@{}:{}/{}'.format(user, password, host, port, db)
    engine = sqlalchemy.create_engine(url, encoding='utf-8', echo=True)
    return engine

In [7]:
# 연결하기
engine = MySQL_connect('sixdogma', 'Poiu0987*', 'Anay', '13.113.12.130')

In [9]:
# 유니크 컬럼을 만들어줘야 하므로 일단 원래 있던 테이블 지운다
# engine.execute("DROP table variable;")

2022-11-12 09:16:06,371 INFO sqlalchemy.engine.Engine DROP table variable;
2022-11-12 09:16:06,371 INFO sqlalchemy.engine.Engine [raw sql] ()
2022-11-12 09:16:06,477 INFO sqlalchemy.engine.Engine COMMIT


<sqlalchemy.engine.cursor.LegacyCursorResult at 0x2224e817760>

In [12]:
# 유니크 컬럼을 추가해서 테이블을 다시 만들어준다
# engine.execute("CREATE TABLE variable ( var_id INT NOT NULL AUTO_INCREMENT PRIMARY KEY, DateTime DATETIME NOT NULL UNIQUE KEY, Date DATE NOT NULL, Time TIME NOT NULL, Lot INT NOT NULL, pH FLOAT(3,2) NOT NULL, Temp FLOAT(4,2) NOT NULL, Voltage FLOAT(4,2) NOT NULL )")

2022-11-12 09:18:41,301 INFO sqlalchemy.engine.Engine CREATE TABLE variable ( var_id INT NOT NULL AUTO_INCREMENT PRIMARY KEY, DateTime DATETIME NOT NULL UNIQUE KEY, Date DATE NOT NULL, Time TIME NOT NULL, Lot INT NOT NULL, pH FLOAT(3,2) NOT NULL, Temp FLOAT(4,2) NOT NULL, Voltage FLOAT(4,2) NOT NULL )
2022-11-12 09:18:41,303 INFO sqlalchemy.engine.Engine [raw sql] ()
2022-11-12 09:18:41,385 INFO sqlalchemy.engine.Engine COMMIT


<sqlalchemy.engine.cursor.LegacyCursorResult at 0x2224e8526b0>

In [11]:
# (2) 스케줄러 작동 여부 확인 : 에러 발생 → 주석 처리
# MySQL 상에서 쿼리문으로 확인할 수 있다
# engine.execute("SHOW VARIABLES LIKE 'event%';")

In [13]:
# (3) 스케줄러 켜기 : 정상 실행
engine.execute("SET GLOBAL event_scheduler = ON;")

2022-11-12 09:18:57,525 INFO sqlalchemy.engine.Engine SET GLOBAL event_scheduler = ON;
2022-11-12 09:18:57,526 INFO sqlalchemy.engine.Engine [raw sql] ()


<sqlalchemy.engine.cursor.LegacyCursorResult at 0x2224e91b070>

In [14]:
# (4) MySQL Time Zone 확인 : 정상 실행 BUT 표시되지는 않음
# MySQL 상에서 쿼리문으로 확인할 수 있다
engine.execute("SELECT @@global.time_zone, @@session.time_zone, now();")

2022-11-12 09:19:01,282 INFO sqlalchemy.engine.Engine SELECT @@global.time_zone, @@session.time_zone, now();
2022-11-12 09:19:01,282 INFO sqlalchemy.engine.Engine [raw sql] ()


<sqlalchemy.engine.cursor.LegacyCursorResult at 0x2224e665ed0>

In [14]:
# (5) 한국 시간에 맞게 Time Zone 변경
# 코드로는 할 수 없고 직접 ubuntu로 들어가 mysql.conf.d 파일에서 수정을 해줘야 한다

### [2] 파일 불러내고 전처리하는 함수 만들기

이 과정을 scheduling할 필요가 있다

pandas dataframe 전처리 작업을 spark로 할 수 있을까? 추후 수정

In [15]:
# csv 파일 경로 세팅 => 추후 linux file system 경로로 수정

# var_dir : 공정환경변수 파일 기본 경로
var_dir = r'C:\Users\admin\Desktop\FinalProject\chromate\chromate_data\variable\\'
# varfilelist : 공정환경변수 파일 기본 경로 안의 csv 파일 목록들
varfilelist = natsort.natsorted(os.listdir(var_dir))
varfilelist = [file for file in varfilelist if file.endswith('.csv')]

# err_dir : 에러로트 파일 기본 경로
err_dir = r'C:\Users\admin\Desktop\FinalProject\chromate\chromate_data\error\\'
# errfilelist : 에러로트 파일 기본 경로 안의 csv 파일 목록들
errfilelist = natsort.natsorted(os.listdir(err_dir))
errfilelist = [file for file in errfilelist if file.endswith('.csv')]

In [16]:
# 1. csv 파일 하나 단위당 전처리하는 코드
def VarPreprocessing(root_dir, var_list):
    df = pd.read_csv(os.path.join(root_dir, var_list), engine='pyarrow')

    # csv 파일명으로부터 'Date' 컬럼 만들어주기
    df['Date'] = '-'.join(var_list.split('-')[-1].split('.')[:-1])

    # 'Time' 컬럼 전처리 : 간단하게 할 수 있는 strftime, strptime 방법 찾아볼 것
    adj_time = list()
    for time in df['Time']:
        tmp = time.split(':')
        if tmp[0].split(' ')[0] == '오후':
            tmp[0] = str(int(tmp[0].split(' ')[-1]) + 12)
        else:
            tmp[0] = tmp[0].split(' ')[-1]
        tmp = ':'.join(tmp).split('.')[0]
        adj_time.append(tmp)
    df['Time'] = adj_time

    # 스케줄링을 위해서는 유니크 컬럼이 필요 : 'Date'와 'Time' 합쳐서 'DateTime' 컬럼 만들기 (Date는 에러 로트 파일과 연관되므로 없애지 않는다)
    df['DateTime'] = df['Date'] + ' ' + df['Time']
    
    return df

In [17]:
# 2. csv 파일 전부 merge하고 정리하는 코드 
def VarMergeFrame(root_dir, var_list):
    # 우선 row 하나로 데이터프레임 만들어놓고 거기에 merge해나간다
    var_df = VarPreprocessing(root_dir, var_list[0])
    for i in range(1, len(var_list)):
        var_df = pd.merge(var_df, VarPreprocessing(root_dir, var_list[i]), how='outer')
        
    # 열 순서 조정 및 필요없는 'Index' 컬럼 삭제
    var_df = var_df[['Index', 'DateTime', 'Date', 'Time', 'Lot', 'pH', 'Temp', 'Voltage']]
    var_df.drop(columns=['Index'], inplace=True)

    return var_df

In [18]:
# 3. 함수가 제대로 만들어졌는지 확인해본다
variable_df = VarMergeFrame(var_dir, varfilelist)
variable_df

Unnamed: 0,DateTime,Date,Time,Lot,pH,Temp,Voltage
0,2021-09-06 16:29:54,2021-09-06,16:29:54,1,2.15,43.15,19.74
1,2021-09-06 16:29:59,2021-09-06,16:29:59,1,2.08,40.13,18.01
2,2021-09-06 16:30:04,2021-09-06,16:30:04,1,2.18,43.46,18.73
3,2021-09-06 16:30:09,2021-09-06,16:30:09,1,1.99,41.72,16.75
4,2021-09-06 16:30:14,2021-09-06,16:30:14,1,1.85,43.65,18.02
...,...,...,...,...,...,...,...
50089,2021-10-27 18:36:03,2021-10-27,18:36:03,22,2.05,42.84,15.38
50090,2021-10-27 18:36:08,2021-10-27,18:36:08,22,1.91,42.64,19.08
50091,2021-10-27 18:36:13,2021-10-27,18:36:13,22,2.11,44.09,18.14
50092,2021-10-27 18:36:18,2021-10-27,18:36:18,22,1.92,43.95,17.96


In [20]:
# 4. 데이터프레임에 있는 데이터 insert하기
var_type = {'DateTime':sqlalchemy.types.DATETIME(),
            'Date':sqlalchemy.types.DATE(),
            'Time':sqlalchemy.types.TIME(),
            'Lot':sqlalchemy.types.INT(),
            'pH':sqlalchemy.types.FLOAT(),
            'Temp':sqlalchemy.types.FLOAT(),
            'Voltage':sqlalchemy.types.FLOAT()
}
variable_df.to_sql(name='variable', con=engine, if_exists='append', index=False, dtype=var_type)

2022-11-12 09:20:21,161 INFO sqlalchemy.engine.Engine SELECT COUNT(*) FROM information_schema.tables WHERE table_schema = %s AND table_name = %s
2022-11-12 09:20:21,161 INFO sqlalchemy.engine.Engine [generated in 0.00158s] ('Anay', 'variable')
2022-11-12 09:20:21,576 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2022-11-12 09:20:21,852 INFO sqlalchemy.engine.Engine INSERT INTO variable (`DateTime`, `Date`, `Time`, `Lot`, `pH`, `Temp`, `Voltage`) VALUES (%s, %s, %s, %s, %s, %s, %s)
2022-11-12 09:20:21,852 INFO sqlalchemy.engine.Engine [generated in 0.23212s] (('2021-09-06 16:29:54', '2021-09-06', '16:29:54', 1, 2.15, 43.15, 19.74), ('2021-09-06 16:29:59', '2021-09-06', '16:29:59', 1, 2.08, 40.13, 18.01), ('2021-09-06 16:30:04', '2021-09-06', '16:30:04', 1, 2.18, 43.46, 18.73), ('2021-09-06 16:30:09', '2021-09-06', '16:30:09', 1, 1.99, 41.72, 16.75), ('2021-09-06 16:30:14', '2021-09-06', '16:30:14', 1, 1.85, 43.65, 18.02), ('2021-09-06 16:30:19', '2021-09-06', '16:30:19', 1, 1.94, 42.82

50094

에러 로트 csv도 함수 만들어 준다

In [40]:
# 1. csv 파일 하나 단위당 전처리하는 코드
def ErrPreprocessing(root_dir, err_list):
    df = pd.read_csv(os.path.join(root_dir, err_list), engine='pyarrow')
    df.rename(columns = {'0':'Date', '1':'FailureLot1', '2':'FailureLot2'}, inplace=True)
    
    return df

In [41]:
# 2. csv 파일 전부 merge하고 정리하는 코드
def ErrMergeFrame(root_dir, err_list):
    err_df = ErrPreprocessing(root_dir, err_list[0])
    
    # 파일이 두 개 이상일 경우에만 merge 작업을 해주면 된다
    if len(err_list) > 1:
        for i in range(1, len(err_list)):
            err_df = pd.merge(err_df, ErrPreprocessing(root_dir, err_list[i]), how='outer')
            err_df = err_df[['Date', 'FailureLot1', 'FailureLot2']]
    else:
        err_df = err_df[['Date', 'FailureLot1', 'FailureLot2']]

    return err_df

In [42]:
# 3. 함수가 제대로 만들어졌는지 확인해본다
error_df = ErrMergeFrame(err_dir, errfilelist)
error_df

Unnamed: 0,Date,FailureLot1,FailureLot2
0,2021-09-06,,
1,2021-09-07,,
2,2021-09-08,20.0,
3,2021-09-09,16.0,5.0
4,2021-09-10,,
5,2021-09-13,,
6,2021-09-14,,
7,2021-09-15,,
8,2021-09-16,4.0,
9,2021-09-17,,


### [3] MySQL에 upsert 스케줄링 (공식문서참조)

In [None]:
def InsertSchedule():
    cols = '`,`'.join([str(i) for i in variable_df.columns.tolist()])
    for i, row in variable_df.iterrows():
        sql = 'INSERT INTO `variable` (`' + cols + '`) VALUES (' + '%s,'*(len(row)-1) + '%s)'

In [21]:
# 스케줄러 : 일단 주석처리
# engine.execute("CREATE EVENT IF NOT EXIST 'variable_upsert' ON SCHEDULE EVERY 1 DAY STARTS '2022-11-13 00:00:00 ON COMPLETION NOT PRESERVE ENABLE COMMENT 'variable_upsert' DO 'INSERT INTO 'variable' ('DateTime', 'Date', 'Time', 'Lot', 'pH', 'Temp', 'Voltage') VALUES (")

### [4] spark 테스트

In [9]:
# 스파크를 사용하기 위해 가장 먼저 SparkContext라는 스파크 객체를 만들어주어야 한다.
# SparkContext를 만들어 주기 위해서 우선 SparkSession을 만들어 주자.
spark = (SparkSession
         .builder
         .appName('SparkExample')
         .getOrCreate())

# sparkContext로 객체 생성
sc = spark.sparkContext

In [10]:
# 스파크로 csv 파일 읽어보기
df = (spark.read.format('csv')
      .option("inferSchema", "True")
      .option("header", "true")
      .load(os.path.join(var_dir, varfilelist[0])))

df.createOrReplaceTempView('df_table')

df.printSchema()  # 스키마 형태 볼 수 있음
df.show()         # 가상의 테이블을 만들어 데이터프레임을 볼 수 있음

root
 |-- Index: integer (nullable = true)
 |-- Lot: integer (nullable = true)
 |-- Time: string (nullable = true)
 |-- pH: double (nullable = true)
 |-- Temp: double (nullable = true)
 |-- Voltage: double (nullable = true)

+-----+---+--------------+----+-----+-------+
|Index|Lot|          Time|  pH| Temp|Voltage|
+-----+---+--------------+----+-----+-------+
|    1|  1|오후 4:29:54.0|2.15|43.15|  19.74|
|    2|  1|오후 4:29:59.0|2.08|40.13|  18.01|
|    3|  1|오후 4:30:04.0|2.18|43.46|  18.73|
|    4|  1|오후 4:30:09.0|1.99|41.72|  16.75|
|    5|  1|오후 4:30:14.0|1.85|43.65|  18.02|
|    6|  1|오후 4:30:19.0|1.94|42.82|  19.27|
|    7|  1|오후 4:30:24.0|1.94|43.17|   17.4|
|    8|  1|오후 4:30:29.0|2.06|44.16|  18.69|
|    9|  1|오후 4:30:34.0|1.97|41.79|  15.33|
|   10|  1|오후 4:30:39.0|1.94|42.62|  17.44|
|   11|  1|오후 4:30:44.0|2.14|40.61|  18.35|
|   12|  1|오후 4:30:49.0|1.85|42.98|  19.24|
|   13|  1|오후 4:30:54.0|2.16|43.76|  16.55|
|   14|  1|오후 4:30:59.0|1.88|41.33|  18.34|
|   15|  1|오후 4:31:04