In [59]:
from dotenv import load_dotenv
import os
import snowflake.connector
# .env 파일의 환경변수를 로드

load_dotenv(dotenv_path='dev.env')

True

In [60]:
from dotenv import load_dotenv
import os
import snowflake.connector

# .env 파일의 환경변수를 로드
load_dotenv(dotenv_path='dev.env')

def return_snowflake_conn():

    conn = snowflake.connector.connect(
        user=os.environ.get('SNOWFLAKE_USER'),
        password=os.environ.get('SNOWFLAKE_PASSWORD'),
        account=os.environ.get('SNOWFLAKE_ACCOUNT'),
        warehouse = os.environ.get('SNOWFLAKE_WAREHOUSE'),
        database = os.environ.get('SNOWFLAKE_DATABASE')
    )

    return conn.cursor()

In [61]:
import requests

def extract(url):
    f = requests.get(url)
    return (f.text)

In [62]:
# table를 읽어서 판다스 데이터프레임으로 변환하고 처음 5개의 레코드와 레코드 수를 표시
def check_table_stats(cur, table, key):
    result = cur.execute(f"SELECT * FROM {table} ORDER BY {key}")
    df = cur.fetch_pandas_all()
    print(len(df))
    print(df.head())
    return df

In [63]:
# transform 함수를 개선해서 헤더를 빼고 리턴하게 만들자
def transform_v2(text):
    lines = text.strip().split("\n")
    records = []
    for l in lines:  # remove the first row
        (country, capital) = l.split(",")
        records.append([country, capital])
    return records[1:]

In [64]:
# load 함수를 개선해서 제대로 된 "Full refresh"를 구현하자
#  - 매번 테이블을 새로 만들고 레코드를 새로 적재하는 방식으로 구현
#  - SQL transaction을 사용하는 것이 일반적이지만 Snowflake은 SQL transaction을 지원하지
#    않기 때문에 다른 방식으로 구현
def load_v2(con, records):
    target_table = "dev.raw_data.country_capital"
    try:
        # 여기서 CREATE OR REPLACE TABLE이 사용되면 안됨. DDL이라 Snowflake에서는 transaction을 지키지 않기 때문
        con.execute(f"CREATE TABLE IF NOT EXISTS {target_table} (country varchar primary key, capital varchar);")
        con.execute("BEGIN;")
        con.execute(f"DELETE FROM {target_table};")
        for r in records:
            country = r[0].replace("'", "''")
            capital = r[1].replace("'", "''")
            print(country, "-", capital)

            sql = f"INSERT INTO {target_table} (country, capital) VALUES ('{country}', '{capital}')"
            con.execute(sql)
        con.execute("COMMIT;")
    except Exception as e:
        con.execute("ROLLBACK;")
        print(e)
        raise e

In [65]:
def country_capital_data_pipeline_v2(link):
    cur = return_snowflake_conn()
    data = extract(link)
    lines = transform_v2(data)
    load_v2(cur, lines)
    check_table_stats(cur, "dev.raw_data.country_capital", "country")
    cur.close()

In [66]:
link = "https://s3-geospatial.s3.us-west-2.amazonaws.com/country_capital.csv"
country_capital_data_pipeline_v2(link)

Abkhazia - Sukhumi
Afghanistan - Kabul
Akrotiri and Dhekelia - Episkopi Cantonment
Albania - Tirana
Algeria - Algiers
American Samoa - Pago Pago
Andorra - Andorra la Vella
Angola - Luanda
Anguilla - The Valley
Antigua and Barbuda - St. John''s
Argentina - Buenos Aires
Armenia - Yerevan
Aruba - Oranjestad
Ascension Island - Georgetown
Australia - Canberra
Austria - Vienna
Azerbaijan - Baku
Bahamas - Nassau
Bahrain - Manama
Bangladesh - Dhaka
Barbados - Bridgetown
Belarus - Minsk
Belgium - Brussels
Belize - Belmopan
Benin - Porto-Novo
Bermuda - Hamilton
Bhutan - Thimphu
Bolivia - La Paz
Bosnia and Herzegovina - Sarajevo
Botswana - Gaborone
Brazil - Brasââ lia
British Virgin Islands - Road Town
Brunei - Bandar Seri Begawan
Bulgaria - Sofia
Burkina Faso - Ouagadougou
Burundi - Bujumbura
Cambodia - Phnom Penh
Cameroon - YaoundâÂ©
Canada - Ottawa
Cape Verde - Praia
Cayman Islands - George Town
Central African Republic - Bangui
Chad - N''Djamena
Chile - Santiago
China - Beijing
Christmas

In [67]:
# 한번 더 실행해보기
country_capital_data_pipeline_v2(link)

Abkhazia - Sukhumi
Afghanistan - Kabul
Akrotiri and Dhekelia - Episkopi Cantonment
Albania - Tirana
Algeria - Algiers
American Samoa - Pago Pago
Andorra - Andorra la Vella
Angola - Luanda
Anguilla - The Valley
Antigua and Barbuda - St. John''s
Argentina - Buenos Aires
Armenia - Yerevan
Aruba - Oranjestad
Ascension Island - Georgetown
Australia - Canberra
Austria - Vienna
Azerbaijan - Baku
Bahamas - Nassau
Bahrain - Manama
Bangladesh - Dhaka
Barbados - Bridgetown
Belarus - Minsk
Belgium - Brussels
Belize - Belmopan
Benin - Porto-Novo
Bermuda - Hamilton
Bhutan - Thimphu
Bolivia - La Paz
Bosnia and Herzegovina - Sarajevo
Botswana - Gaborone
Brazil - Brasââ lia
British Virgin Islands - Road Town
Brunei - Bandar Seri Begawan
Bulgaria - Sofia
Burkina Faso - Ouagadougou
Burundi - Bujumbura
Cambodia - Phnom Penh
Cameroon - YaoundâÂ©
Canada - Ottawa
Cape Verde - Praia
Cayman Islands - George Town
Central African Republic - Bangui
Chad - N''Djamena
Chile - Santiago
China - Beijing
Christmas

In [68]:
# 이번에는 Transaction이 제대로 동작하는지 load_v3를 만들고 고의로 SQL 문법 에러를 내보기
def load_v3(con, records):
    target_table = "dev.raw_data.country_capital"
    try:
        con.execute("BEGIN;")
        # 여기서 CREATE OR REPLACE TABLE이 사용되면 안됨. DDL이라 Snowflake에서는 transaction을 지키지 않기 때문
        con.execute(f"CREATE TABLE IF NOT EXISTS {target_table} (country varchar primary key, capital varchar);")
        con.execute(f"DELETE FROM {target_table};")
        for r in records:
            country = r[0].replace("'", "''")
            capital = r[1].replace("'", "''")
            print(country, "-", capital)

            # 아래 고의로 에러를 냈음
            sql = f"INSERT INTTO {target_table} (country, capital) VALUES ('{country}', '{capital}')"
            con.execute(sql)
        con.execute("COMMIT;")
    except Exception as e:
        con.execute("ROLLBACK;")
        print(e)
        raise e

In [72]:
cur = return_snowflake_conn()
lines = transform_v2(data)


In [73]:
load_v3(cur, lines)

Abkhazia - Sukhumi
001003 (42000): SQL compilation error:
syntax error line 1 at position 7 unexpected 'INTTO'.


ProgrammingError: 001003 (42000): SQL compilation error:
syntax error line 1 at position 7 unexpected 'INTTO'.

In [74]:
# transaction이 동작하지 않았다면 앞서 DELETE가 실행되어서 테이블의 내용이 없어야함.
# 248개의 레코드가 있음을 확인해보자
cur.execute("SELECT COUNT(1) FROM dev.raw_data.country_capital")
results = cur.fetchone()
print(results)

(247,)
