---
jupyter: python3
toc: true
toc-depth: 3
toc-expand: true
number-sections: true
title: Redshift에 데이터를 적재하는 과정에서 얻은 교훈 & psycopg2
date: 2020-02-11 00:00
categories: [Redshift]
author: limyj0708
comments:
  giscus:
    repo: limyj0708/blog
format:
    html:
        page-layout: full
---

# Redshift

## 대량의 데이터를 Insert로 넣을 생각은 하지 말 것

* 차라리 로컬 DB에서 연산한 후, S3에 올리고 COPY로 집어넣는 것이 훨씬 빠르다.
* Redshift에서 직접 SELECT, INSERT 처리를 하면 : 300만행 추가에 예상 완료시간 4일
* 로컬 DB에서 연산 후 COPY로 업로드하면 : 300만 행 연산시간 3.5시간, COPY 업로드 시간 5분
* [AWS DW 설명 문서](https://aws.amazon.com/ko/data-warehouse/)의 말을 들었어야 했는데.

```Python
# 같은 코드를 로컬 DB에서 돌리면 3.5시간, Redshift에 연결해서 돌리면 4일

with psycopg2.connect(**connect_param_local) as con:
    cur_2 = con.cursor()
    # fetchall을 사용해서 커서를 재활용하지 않고 커서를 두 개 두는 이유는, redshift의 single-node cluster에서는 fetchall이 지원되지 않기 떄문이다.
        # InternalError_: Fetch ALL is not supported on single-node clusters.
        # Please specify the fetch size (maximum 1000 for single-node clusters) 
        # or upgrade to a multi node installation.
    # 로컬 머신에서 연산하면 fetchall을 사용할 수 있으므로 이렇게 안 해도 되지만,
    # 코드 수정이 더 번거로웠으므로 그냥 사용하였다.
    
    get_companylist_sql = '''select * from target_company_list;'''
    target_company_list = sqlio.read_sql_query(get_companylist_sql, con)
    # 1mb도 안되는 작은 테이블이라서 데이터프레임으로 한 번에 받아옴

    for each_code in target_company_list['stock_code']:
        cur_1 = con.cursor('ss_cursor') # server side cursor
        # cur_1.itersize = 1000 # redshift single-node cluster에서의 server side cursor의 최대 제한값
        
        print(each_code,'_start')
        predict_start = target_company_list[target_company_list['stock_code'] == each_code]['pre_6m'].values[0] - datetime.timedelta(days=1)
        predict_end = predict_start - datetime.timedelta(days=1096)

        cur_1.execute(
            """
            select * from stock_data_2000_2020_raw
            where (date between %(predict_end)s and %(predict_start)s) and (stock_code = %(stock_code)s);
            """,
            {'predict_end':predict_end.strftime("%Y-%m-%d"),'predict_start':predict_start.strftime("%Y-%m-%d"),'stock_code': each_code}
        )

        for each_row in cur_1: 
            # next(cur_1)로 한줄씩 불러와서 insert
            cur_2.execute("""insert into stock_data_3years_raw_6m values %s""", [each_row])
            # each_row는 tuple이다.
        con.commit()
        print(each_code,'_commit complete')
    cur_2.close()
```

# psycopg2

## Query parameter 전달 시의 유의점

* [psycopg2 document](https://www.psycopg.org/docs/usage.html#query-parameters)에는, 빨간색으로 엄청 잘 보이게 써 있는 경고문이 있다.

> Warning: Never, never, NEVER use Python string concatenation (+) or string parameters interpolation (%) to pass variables to a SQL query string. Not even at gunpoint.

* SQL Injection의 위험이 있기 때문인데, 어떻게 위험한지는 [여기를](https://stackoverflow.com/questions/37329370/how-to-avoid-sql-injection-with-select-from-table-name) 참고하자.
* 그럼 어떻게 하라는 걸까?
    * 최종적으로 사용한 형식은 아래와 같다.
    
    ```Python
    import psycopg2
    from psycopg2 import sql

    credentials = 'aws_access_key_id=**************;aws_secret_access_key=**************'
    s3_bucket_param = 's3://BUCKET-NAME/FILE-NAME'

    copy_query = sql.SQL("""
            copy {table_name}
            from %(s3_bucket_param)s
            credentials %(credentials)s
            IGNOREHEADER 1
            CSV;
        """).format(table_name = sql.Identifier('TABLE-NAME'))
    
    cur.execute(copy_query, {'s3_bucket_param':s3_bucket_param, 'credentials':credentials})
    ```
    
    * **{}** : 테이블 이름 등의 identifier를 받는다. %s 형식으로 identifier를 받으려고 하면, 제대로 인식이 안 되기 때문에 번거롭지만 `.foramt(table_name = sql.Identifier('TABLE-NAME'))`형식으로 인자를 넘겨야 한다. keyword parameter로 안 해도 되지만, 어떤 자리에 무엇이 들어가는 지 명확하게 정의하는 것을 좋아하므로 몽땅 keyword parameter로 진행하였다.
        * [sql 모듈 설명](https://www.psycopg.org/docs/sql.html)
        * [같은 문제로 고통받던 사람의 이슈제기](https://github.com/psycopg/psycopg2/issues/423)
    * **%(keyword)s** : value를 받는다. execute 함수 내부에서 인자로 전달하면 된다. keyword parameter로 정의했을 경우 dictionary로 전달하자.

## Server Side Cursor

* Client Side Cursor를 사용하면, 일단 데이터를 클라이언트의 메모리에 저장한 후 거기서 결과값을 계산하게 된다.
* 엄청 큰 테이블의 일부를 select 하려고 하면 반드시 메모리 부족으로 문제가 생기게 된다.
* Server Side Cursor를 사용하면, 서버에서 연산 처리 후 결과값만 반환해주기 때문에, 클라이언트 메모리 문제에서 좀 자유로워진다.
* 서버 리소스는 더 쓰게 되고, 네트워크 부하는 줄어들게 된다.
```Python
# 편리하게도 커서에 이름만 지정해주면 된다!
cur_1 = con.cursor('ss_cursor')
```

# 로컬 DB : postgreSQL

## COPY 시의 권한 문제

* 작업 폴더 내에 있는 CSV 파일을 그대로 COPY하려고 하면, 무조건 permission error가 발생한다.
* DB 서버 사용자가 해당 파일에 접근할 권한이 없기 때문에 발생하는 문제로, 파일이나 폴더의 권한설정을 만져주면 해결된다. 그런데 권한 설정하는 것 보다는 DB 서버 사용자가 접근할 수 있는 폴더에 CSV 파일을 옮기는 것이 더 빠르지 않을까?
    * postgreSQL을 Mac에서 Homebrew로 설치했다면, /usr/local/var/postgres
    * 예시

```Python
with psycopg2.connect(**connect_param_local) as con:
    with con.cursor() as cur:
        cur.execute(
            """
            COPY stock_data_2000_2020_raw
            from '/usr/local/var/postgres/stock_data_raw_2.csv'
            DELIMITER ','
            CSV HEADER;
            """)
con.commit()
```