In [None]:
''' 
데이터를 MYSQL에서 불러와서(E)

데이터 가공 (T)

데이터를 postgres에 저장 (L)
'''

In [3]:
import dotenv
import os

# Locate a .env file and load its variables into os.environ.
env_path = dotenv.find_dotenv()
dotenv.load_dotenv(env_path)

# Connection settings for the source (MySQL) and target (PostgreSQL) databases.
# Every value is read from the environment and falls back to '' when unset;
# each inner dict matches DOConnector's constructor keywords.
DB_SETTINGS = {
    'mysql_params': {
        'engine_name': os.getenv('MYSQL_ENGINE_NAME', ''),
        'user': os.getenv('MYSQL_USER', ''),
        'password': os.getenv('MYSQL_PASSWORD', ''),
        'host': os.getenv('MYSQL_HOST', ''),
        'port': os.getenv('MYSQL_PORT', ''),
        'database': os.getenv('MYSQL_DATABASE', ''),
    },
    'postgres_params': {
        'engine_name': os.getenv('PG_ENGINE_NAME', ''),
        'user': os.getenv('PG_USER', ''),
        'password': os.getenv('PG_PASSWORD', ''),
        'host': os.getenv('PG_HOST', ''),
        'port': os.getenv('PG_PORT', ''),
        'database': os.getenv('PG_DATABASE', ''),
    },
}
# NOTE(review): as the cell's last expression this displays every value,
# passwords included, in the saved notebook output.
DB_SETTINGS

{'mysql_params': {'engine_name': 'mysql+pymysql',
  'user': 'root',
  'password': '123456',
  'host': 'localhost',
  'port': '3300',
  'database': 'docker_mysql'},
 'postgres_params': {'engine_name': 'postgresql',
  'user': 'codeit',
  'password': 'sprint',
  'host': 'localhost',
  'port': '5430',
  'database': 'docker_postgres'}}

In [4]:
import pymysql, psycopg2
from sqlalchemy import create_engine

In [5]:
class DOConnector:
    """Turn one set of database settings into PyMySQL, psycopg2 or SQLAlchemy handles.

    Args:
        engine_name: SQLAlchemy URL scheme (dialect[+driver]), e.g.
            ``'mysql+pymysql'`` or ``'postgresql'``. Only used by
            :meth:`sqlalchemy_connection`.
        user: Database user name.
        password: Database password.
        host: Server host name.
        port: Server port; may be a string (e.g. read from ``os.getenv``).
        database: Database name.
    """

    def __init__(self, engine_name, user, password, host, port, database):
        self.engine_name = engine_name
        self.user = user
        self.password = password
        self.host = host
        self.port = port
        self.database = database

    def pymysql_connection(self):
        """Open and return a new PyMySQL connection; the caller must close it."""
        mysql_conn = pymysql.connect(
            user=self.user,
            password=self.password,
            host=self.host,
            port=int(self.port),  # settings may hold the port as a string
            database=self.database,
            charset='utf8',
        )
        return mysql_conn

    def psycopg2_connection(self):
        """Open and return a new psycopg2 connection; the caller must close it."""
        psycopg2_conn = psycopg2.connect(
            user=self.user,
            password=self.password,
            host=self.host,
            port=self.port,
            database=self.database,
        )
        return psycopg2_conn

    def _sqlalchemy_url(self):
        """Return the SQLAlchemy URL string for these settings.

        User name and password are percent-encoded so characters such as
        ``@``, ``:``, ``/`` or ``%`` inside them cannot be mistaken for URL
        delimiters (SQLAlchemy decodes them again when parsing the URL).
        """
        from urllib.parse import quote

        user = quote(str(self.user), safe='')
        password = quote(str(self.password), safe='')
        return f"{self.engine_name}://{user}:{password}@{self.host}:{self.port}/{self.database}"

    def sqlalchemy_connection(self):
        """Create and return a SQLAlchemy Engine for these settings.

        Despite the name, the result is an Engine returned by ``create_engine``,
        not an open connection; call ``dispose()`` on it when finished.
        """
        sqlalchemy_conn = create_engine(self._sqlalchemy_url())
        return sqlalchemy_conn

#### 1) 데이터 추출 (extract)

- MySQL 데이터베이스에 저장되어 있는 `pokemon` 테이블을 가져온다.

In [10]:
# mysql 커넥션 불러오기
import pandas as pd

query = 'SELECT * FROM pokemon'  # extract the whole source table
# SQLAlchemy engine for the source MySQL database.
mysql_conn = DOConnector(**DB_SETTINGS['mysql_params']).sqlalchemy_connection()

def extractor(connection_obj, query):
    """Run ``query`` through ``connection_obj`` and return the rows as a DataFrame.

    ``connection_obj`` is anything ``pandas.read_sql`` accepts; in this
    notebook it is the SQLAlchemy engine built by ``DOConnector``.
    """
    result_frame = pd.read_sql(query, connection_obj)
    return result_frame

# Last expression of the cell: display the extracted table.
extractor(connection_obj=mysql_conn, query=query)

Unnamed: 0,index,id,kor_name,eng_name,type1,type2,total,hp,attack,defense,special_attack,special_defense,speed,generation,is_legendary
0,0,1,이상해씨,Bulbasaur,Grass,Poison,318,45,49,49,65,65,45,1,0
1,1,2,이상해풀,Ivysaur,Grass,Poison,405,60,62,63,80,80,60,1,0
2,2,3,이상해꽃,Venusaur,Grass,Poison,525,80,82,83,100,100,80,1,0
3,3,4,파이리,Charmander,Fire,,309,39,52,43,60,50,65,1,0
4,4,5,리자드,Charmeleon,Fire,,405,58,64,58,80,65,80,1,0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
246,246,247,데기라스,Pupitar,Rock,Ground,410,70,84,70,65,70,51,2,0
247,247,248,마기라스,Tyranitar,Rock,Dark,600,100,134,110,95,100,61,2,0
248,248,249,루기아,Lugia,Psychic,Flying,680,106,90,130,90,154,110,2,1
249,249,250,칠색조,Ho-oh,Fire,Flying,680,106,130,90,110,154,90,2,1


#### 2) 데이터 가공 (Transform)

- DataFrame 형태로 가져온 MySQL 테이블을 가공한다.
- `type1` 컬럼으로 Group by 하여 개수를 센 뒤, 개수 기준 내림차순으로 정렬한다.

In [None]:
# Extract the source table into a DataFrame for the transform step.
df = extractor(query=query, connection_obj=mysql_conn)

def transformer(pandas_df, column='type1'):
    """Count rows per value of ``column``, largest count first.

    Args:
        pandas_df: Source DataFrame (here, the extracted ``pokemon`` table).
        column: Column to group by; defaults to ``'type1'``.

    Returns:
        A new DataFrame with columns ``[column, 'count']`` sorted by ``count``
        in descending order. Rows whose ``column`` value is missing are not
        counted (``value_counts`` drops NA by default).
    """
    # Name the count column explicitly: older pandas (< 2.0) labels it 0
    # instead of 'count', which would change the schema loader() writes.
    t_df = pandas_df.value_counts(column).reset_index(name='count')
    return t_df
    
# Last expression of the cell: display the per-type counts.
transformer(pandas_df=df)

Unnamed: 0,type1,count
0,Water,46
1,Normal,37
2,Bug,22
3,Grass,21
4,Fire,20
5,Electric,15
6,Poison,15
7,Psychic,15
8,Rock,13
9,Ground,11


#### 3) 데이터 저장 (Load)

- 가공된 DataFrame을 'pokemon_type'이라는 이름의 테이블로 PostgreSQL에 저장한다.

In [30]:
table_name = 'pokemon_type'  # destination table in PostgreSQL
# SQLAlchemy engine for the target PostgreSQL database.
pg_conn = DOConnector(**DB_SETTINGS['postgres_params']).sqlalchemy_connection()
t_df = transformer(df)  # per-type counts to be loaded

def loader(pandas_df, table_name, connection_obj):
    """Write ``pandas_df`` to the table ``table_name``, replacing it if it exists.

    Best-effort: a failure is reported on stdout (with its cause) instead of
    being raised, so the caller continues either way.

    Args:
        pandas_df: DataFrame to store.
        table_name: Destination table name.
        connection_obj: Anything ``DataFrame.to_sql`` accepts as ``con``
            (in this notebook, a SQLAlchemy engine).
    """
    try:
        pandas_df.to_sql(
            name=table_name,
            con=connection_obj,
            if_exists='replace',  # drop the existing table before inserting
            index=False,  # don't store the DataFrame index as a column
        )
    except Exception as err:
        # Catch Exception rather than a bare ``except:`` so KeyboardInterrupt /
        # SystemExit still propagate, and report why the write failed.
        print(f'저장이 제대로 되지 않았습니다: {type(err).__name__}: {err}')
    else:
        print('POSTGRESQL에 테이블 저장 완료!')
    
# Store the transformed counts in PostgreSQL.
loader(pandas_df=t_df, table_name=table_name, connection_obj=pg_conn)

POSTGRESQL에 테이블 저장 완료!


#### 4) controller.py 구현

- extractor(), transformer(), loader() 함수를 순서대로 실행해주는 `main()` 함수

In [None]:
def main():
    """Run the ETL once: MySQL ``pokemon`` -> per-``type1`` counts -> PostgreSQL ``pokemon_type``."""
    mysql_conn = DOConnector(**DB_SETTINGS['mysql_params']).sqlalchemy_connection()
    query = 'SELECT * FROM pokemon'
    pg_conn = DOConnector(**DB_SETTINGS['postgres_params']).sqlalchemy_connection()
    table_name = 'pokemon_type'

    try:
        # Run the pipeline: extract -> transform -> load.
        df = extractor(mysql_conn, query)
        t_df = transformer(df)
        loader(t_df, table_name, pg_conn)
    finally:
        # sqlalchemy_connection() returns Engines that hold connection pools;
        # dispose them so repeated runs don't leave idle DB connections open.
        mysql_conn.dispose()
        pg_conn.dispose()
    
# Run the pipeline only when executed directly (notebook or script), not when
# this code is imported, e.g. after it is moved into controller.py.
if __name__ == '__main__':
    main()

POSTGRESQL에 테이블 저장 완료!


### 2. 파이썬 모듈화

In [37]:
from settings import DB_SETTINGS
from db.connector import DOConnector
from pipeline.extract import extractor
from pipeline.transform import transformer
from pipeline.load import loader

In [38]:
def main():
    """Wire the imported pipeline stages together: extractor -> transformer -> loader.

    Reads with ``SELECT * FROM pokemon`` through the connection built from
    ``DB_SETTINGS['mysql_params']`` and hands the result to ``loader`` for the
    ``pokemon_type`` table on the ``DB_SETTINGS['postgres_params']`` connection.
    """
    query = 'SELECT * FROM pokemon'
    table_name = 'pokemon_type'
    source_conn = DOConnector(**DB_SETTINGS['mysql_params']).sqlalchemy_connection()
    target_conn = DOConnector(**DB_SETTINGS['postgres_params']).sqlalchemy_connection()

    # NOTE(review): if db.connector's sqlalchemy_connection() returns a
    # SQLAlchemy Engine (as the notebook version does), consider disposing
    # source_conn / target_conn after the run — confirm in db/connector.py.
    loader(transformer(extractor(source_conn, query)), table_name, target_conn)
    
# Run the pipeline only when executed directly (notebook or controller script),
# not when this module is imported.
if __name__ == '__main__':
    main()

extractor 시작
transformer 시작
loader 시작작
POSTGRESQL에 테이블 저장 완료!
