# 1. 메인 데이터베이스(Oracle)의 파라미터 테이블에서 메타데이터 불러오기

In [1]:
# 파라미터 테이블 조작은 DB에서!

In [2]:
# 엔진생성 커넥션 정보!
# ORACLE(1521), 'oracle+cx_oracle://아이디:패스워드@아이피주소(or 엔드포인트):포트번호/데이터베이스명') 
# MARIA+MYSQL(3306), 'mysql+pymysql://kopo:kopo@www.hadoopkorea.com:3306/kopo'
# POSTGRE(5432), 'postgresql+psycopg2://postgres:postgres@127.0.0.1:5432/postgres'

## 1-1. ' URL_스키마 ' 반환 함수 생성

In [3]:
def URL_scheme(db_type) :
    
    (str)(db_type);
    db_type = db_type.upper();
    
    if (db_type=='ORACLE'):
        return 'oracle+cx_oracle://'
    elif ( (db_type=='MARIADB') | (db_type=='MARIA') | (db_type=='MYSQL') ):
        return 'mysql+pymysql://'
    elif (db_type=='POSTGRE'):
        return 'postgresql+psycopg2://'
    else :
        print('오류입니다. 파라미터 테이블을 확인하세요')

## 1-2. 메인 DB의 파라미터 테이블에서 메타데이터 불러오기

In [4]:
import pandas as pd
from sqlalchemy import create_engine
 
# 메인 DB 연결시 필요한 엔진생성
main_db_type = 'oracle'.upper()
main_connection_info = 'system:oracle@127.0.0.1:1521/xe'.upper()

engine_oracle = create_engine( URL_scheme(main_db_type) + main_connection_info )

# DB 테이블을 데이터프레임으로 저장하기
parameter_table = pd.read_sql_query('SELECT * FROM PARAMETER_TABLE', engine_oracle) 

# 데이터프레임 확인
parameter_table

# 컬럼해더 재정의, parameter_table.columns = ['KEY','VALUE']

Unnamed: 0,parameter_key,parameter_value
0,SRC_TYPE,
1,SRC_DB_TYPE,ORACLE
2,DEST_CONNECTION_INFO,SYSTEM:ORACLE@127.0.0.1:1521/XE
3,SRC_CONNECTION_INFO,SYSTEM:ORACLE@127.0.0.1:1521/XE
4,SRC_TABLE_NAME,STUDENT
5,SRC_FILE_PATH,../DATASET
6,DEST_TABLE_NAME,RESULT
7,DEST_DB_TYPE,ORACLE


# 2. ETL

In [5]:
# 원본 데이터를 추출하는 장소위치 { excel,csv 혹은 DB }
src_type = str( parameter_table.loc[parameter_table.parameter_key == 'SRC_TYPE', 'parameter_value'].values[0] ).upper()

## 2-1. 엑셀 혹은 CSV파일에서 소스 데이터를 추출하는 경우

In [6]:
# 경로에 있는 모든 excel 및 csv파일을 DB에 저장하기

In [7]:
if ( (src_type == 'EXCEL') | (src_type == 'CSV') ):
    
    import pandas as pd
    import pymysql
    import cx_Oracle    
    import os
    from sqlalchemy import create_engine, types

    # 추출할 파일의 경로
    file_path = parameter_table.loc[parameter_table.parameter_key == 'SRC_FILE_PATH', 'parameter_value'].values[0].upper()

    # 모든 파일의 이름을 file_name 리스트에 담기
    file_name = os.listdir(file_path) # ['calsdata2_final.xlsx']

    # 엑셀파일과 CSV파일의 이름을 담은 리스트 생성
    excel_file_list = [file for file in file_name if (file.endswith(".xlsx")) or (file.endswith(".xls")) ] 
    csv_file_list = [file for file in file_name if (file.endswith(".csv"))]

    
# excel 파일

    # <1.Extract >
    for file in excel_file_list:     
        src_table = pd.read_excel('{}/{}'.format(file_path, file) )     
        
    # <2.Transformation >    
        # 컬럼해더 재정의, src_table.columns = ['KEY','VALUE']
               
    # <3.Load, 가공된 소스 데이터를 메인 DB에 테이블로 저장 >
        # 메인 DB 연결시 필요한 엔진생성
        dest_db_type = parameter_table.loc[parameter_table.parameter_key == 'DEST_DB_TYPE', 'parameter_value'].values[0].upper()
        dest_connection_info = parameter_table.loc[parameter_table.parameter_key == 'DEST_CONNECTION_INFO', 'parameter_value'].values[0].upper()        
        dest_table_name = os.path.splitext(file)[0].lower() # 테이블 이름 설정 # print(dest_table_name) -> calsdata2 

        engine = create_engine( URL_scheme(dest_db_type) + dest_connection_info )

        # 속도 향상을 위해 문자열 컬럼을 varchar(100)로 변환하여 DB에 저장
        object_column = list(src_table.columns[src_table.dtypes == 'object']) 
        type_dict={}
        maxLen = 100
        for i in range(0, len(object_column)):
            type_dict[ object_column[i] ] = types.VARCHAR(100)

        try :
            src_table.to_sql(name = dest_table_name, con = engine, if_exists = 'replace', dtype = type_dict, index = False)
        except Exception as e:
            print(e)  
            
# csv 파일
    
    # <1.Extract >
    for file in csv_file_list:
        src_table = pd.read_csv('{}/{}'.format(file_path, file) )     

    # < 2.Transformation >    

    # < 3.Load, 가공된 소스 데이터를 메인 DB에 테이블로 저장 >
        # 메인 DB 연결시 필요한 엔진생성
        dest_db_type = parameter_table.loc[parameter_table.parameter_key == 'DEST_DB_TYPE', 'parameter_value'].values[0].upper()        
        dest_connection_info = parameter_table.loc[parameter_table.parameter_key == 'DEST_CONNECTION_INFO', 'parameter_value'].values[0].upper()
        dest_table_name = os.path.splitext(file)[0].lower() # 테이블 이름 설정
        
        engine = create_engine( URL_scheme(dest_db_type) + dest_connection_info )

        # 속도 향상을 위해 문자열 컬럼을 varchar(100)로 변환하여 DB에 저장
        object_column = list(src_table.columns[src_table.dtypes == 'object']) 
        type_dict={}
        maxLen = 100
        for i in range(0, len(object_column)):
            type_dict[ object_column[i] ] = types.VARCHAR(100)

        try :
            src_table.to_sql(name = dest_table_name, con = engine, if_exists = 'replace', dtype = type_dict, index = False)
        except Exception as e:
            print(e)

## 2-2. 데이터베이스에서 소스 데이터를 추출하는 경우

In [8]:
if ( (src_type == 'DB') | (src_type == 'DATABASE') ):
    
    import pandas as pd
    import pymysql
    import cx_Oracle
    from sqlalchemy import create_engine, types

    # < 1.Extract, 메인 DB 파라미터 테이블의 메타데이터를 활용해서 DB에 있는 소스 데이터 불러오기 >
    # DB 연결시 필요한 엔진생성
    src_db_type = parameter_table.loc[parameter_table.parameter_key == 'SRC_DB_TYPE', 'parameter_value'].values[0].upper()        
    src_connection_info = parameter_table.loc[parameter_table.parameter_key == 'SRC_CONNECTION_INFO', 'parameter_value'].values[0].upper()        
    src_table_name = parameter_table.loc[parameter_table.parameter_key == 'SRC_TABLE_NAME', 'parameter_value'].values[0].upper() # 추출할 테이블의 이름

    engine = create_engine( URL_scheme(src_db_type) + src_connection_info )
    
    # DB의 소스데이터를 데이터프레임으로 저장하기
    src_table = pd.read_sql_query('SELECT * FROM {}'.format(src_table_name), engine)   
    
    # < 2.Transformation >        

    # < 3.Load, 가공된 소스데이터를 메인 DB에 테이블로 저장 >
    
    # 메인 DB 연결시 필요한 엔진생성
    dest_db_type = parameter_table.loc[parameter_table.parameter_key == 'DEST_DB_TYPE', 'parameter_value'].values[0].upper()        
    dest_connection_info = parameter_table.loc[parameter_table.parameter_key == 'DEST_CONNECTION_INFO', 'parameter_value'].values[0].upper()        
    dest_table_name = parameter_table.loc[parameter_table.parameter_key == 'DEST_TABLE_NAME', 'parameter_value'].values[0].lower() # 메인 DB에 생성될 테이블 이름

    engine = create_engine( URL_scheme(dest_db_type) + dest_connection_info )
    
    # 속도 향상을 위해 문자컬럼을 varchar(100)로 변환하여 DB에 저장
    object_column = list(src_table.columns[src_table.dtypes == 'object']) #print( src_table.dtypes ) -> name:object, age:int64
    type_dict={}
    maxLen = 100
    for i in range(0, len(object_column)):
        type_dict[ object_column[i] ] = types.VARCHAR(100)        
    try :
        src_table.to_sql(name = dest_table_name, con = engine, if_exists = 'replace', dtype = type_dict, index = False)
    except Exception as e:
        print(e)    

In [9]:
if ( src_type == 'NONE'):
    print( "파라미터 테이블의 'SRC_TYPE' 값이 비어있습니다. 값을 입력하세요" )

파라미터 테이블의 'SRC_TYPE' 값이 비어있습니다. 값을 입력하세요
