Snowflake Python Client 설정하기 

로그인을 위한 credential을 Colab Secrets에 설정하고 읽어들이기
snowflake-connector-python 파이썬 모듈 설치


In [2]:
from dotenv import load_dotenv
import snowflake.connector
import os

def return_snowflake_conn():

    load_dotenv(override=True)

    conn = snowflake.connector.connect(
        user=os.getenv("SNOWFLAKE_USER"),
        password=os.getenv("SNOWFLAKE_PASSWORD"),
        account=os.getenv("SNOWFLAKE_ACCOUNT")
    )
    return conn.cursor()

print("Connection successful!")
print(os.getenv("SNOWFLAKE_ACCOUNT"))




Connection successful!
mpqflwu-wp69976


# 첫 번째 버전 : ETL 함수 정의

In [3]:
import requests

def extract(url):
    f=requests.get(url)
    return f.text

In [4]:
def transform(text):
    lines = text.strip().split("\n")
    records = []
    for l in lines:
        (country, capital) = l.split(",")
        records.append([country, capital])
    return records

In [5]:
def load(cur, records):
    target_table = "dev.raw_data.country_capital"
    cur.execute(f"""
        CREATE TABLE IF NOT EXISTS {target_table} (
            country varchar primary key,
            capital varchar
        )
    """)
    for r in records:
        country = r[0].replace("'", "''")
        capital = r[1].replace("'", "''")
        sql = f"insert into {target_table} (country, capital) values ('{country}', '{capital}')"
        print(country, "-", capital, "-", sql)
        cur.execute(sql)

In [6]:
link = "https://s3-geospatial.s3.us-west-2.amazonaws.com/country_capital.csv"

data = extract(link)

In [7]:
data

"country,capital\nAbkhazia,Sukhumi\nAfghanistan,Kabul\nAkrotiri and Dhekelia,Episkopi Cantonment\nAlbania,Tirana\nAlgeria,Algiers\nAmerican Samoa,Pago Pago\nAndorra,Andorra la Vella\nAngola,Luanda\nAnguilla,The Valley\nAntigua and Barbuda,St. John's\nArgentina,Buenos Aires\nArmenia,Yerevan\nAruba,Oranjestad\nAscension Island,Georgetown\nAustralia,Canberra\nAustria,Vienna\nAzerbaijan,Baku\nBahamas,Nassau\nBahrain,Manama\nBangladesh,Dhaka\nBarbados,Bridgetown\nBelarus,Minsk\nBelgium,Brussels\nBelize,Belmopan\nBenin,Porto-Novo\nBermuda,Hamilton\nBhutan,Thimphu\nBolivia,La Paz\nBosnia and Herzegovina,Sarajevo\nBotswana,Gaborone\nBrazil,Brasâ\x88\x9aâ\x89\xa0lia\nBritish Virgin Islands,Road Town\nBrunei,Bandar Seri Begawan\nBulgaria,Sofia\nBurkina Faso,Ouagadougou\nBurundi,Bujumbura\nCambodia,Phnom Penh\nCameroon,Yaoundâ\x88\x9aÂ©\nCanada,Ottawa\nCape Verde,Praia\nCayman Islands,George Town\nCentral African Republic,Bangui\nChad,N'Djamena\nChile,Santiago\nChina,Beijing\nChristmas Island,Fly

In [8]:
len(data)

5069

In [9]:
lines = transform(data)

In [10]:
len(lines)

248

In [11]:
lines[0:10]

[['country', 'capital'],
 ['Abkhazia', 'Sukhumi'],
 ['Afghanistan', 'Kabul'],
 ['Akrotiri and Dhekelia', 'Episkopi Cantonment'],
 ['Albania', 'Tirana'],
 ['Algeria', 'Algiers'],
 ['American Samoa', 'Pago Pago'],
 ['Andorra', 'Andorra la Vella'],
 ['Angola', 'Luanda'],
 ['Anguilla', 'The Valley']]

In [12]:
cur = return_snowflake_conn()
load(cur, lines)

country - capital - insert into dev.raw_data.country_capital (country, capital) values ('country', 'capital')
Abkhazia - Sukhumi - insert into dev.raw_data.country_capital (country, capital) values ('Abkhazia', 'Sukhumi')
Afghanistan - Kabul - insert into dev.raw_data.country_capital (country, capital) values ('Afghanistan', 'Kabul')
Akrotiri and Dhekelia - Episkopi Cantonment - insert into dev.raw_data.country_capital (country, capital) values ('Akrotiri and Dhekelia', 'Episkopi Cantonment')
Albania - Tirana - insert into dev.raw_data.country_capital (country, capital) values ('Albania', 'Tirana')
Algeria - Algiers - insert into dev.raw_data.country_capital (country, capital) values ('Algeria', 'Algiers')
American Samoa - Pago Pago - insert into dev.raw_data.country_capital (country, capital) values ('American Samoa', 'Pago Pago')
Andorra - Andorra la Vella - insert into dev.raw_data.country_capital (country, capital) values ('Andorra', 'Andorra la Vella')
Angola - Luanda - insert int

In [13]:
# table를 읽어서 판다스 데이터프레임으로 변환하고 처음 5개의 레코드와 레코드 수를 표시
def check_table_stats(cur, table, key):
    result = cur.execute(f"SELECT * FROM {table} ORDER BY {key}")
    df = cur.fetch_pandas_all()
    print(len(df))
    print(df.head())
    return df

In [14]:
check_table_stats(cur, "dev.raw_data.country_capital", "country")

495
                 COUNTRY              CAPITAL
0               Abkhazia              Sukhumi
1               Abkhazia              Sukhumi
2            Afghanistan                Kabul
3            Afghanistan                Kabul
4  Akrotiri and Dhekelia  Episkopi Cantonment


Unnamed: 0,COUNTRY,CAPITAL
0,Abkhazia,Sukhumi
1,Abkhazia,Sukhumi
2,Afghanistan,Kabul
3,Afghanistan,Kabul
4,Akrotiri and Dhekelia,Episkopi Cantonment
...,...,...
490,Zambia,Lusaka
491,Zambia,Lusaka
492,Zimbabwe,Harare
493,Zimbabwe,Harare


In [15]:
data = extract(link)
lines = transform(data)
cur = return_snowflake_conn()
load(cur, lines)

country - capital - insert into dev.raw_data.country_capital (country, capital) values ('country', 'capital')
Abkhazia - Sukhumi - insert into dev.raw_data.country_capital (country, capital) values ('Abkhazia', 'Sukhumi')
Afghanistan - Kabul - insert into dev.raw_data.country_capital (country, capital) values ('Afghanistan', 'Kabul')
Akrotiri and Dhekelia - Episkopi Cantonment - insert into dev.raw_data.country_capital (country, capital) values ('Akrotiri and Dhekelia', 'Episkopi Cantonment')
Albania - Tirana - insert into dev.raw_data.country_capital (country, capital) values ('Albania', 'Tirana')
Algeria - Algiers - insert into dev.raw_data.country_capital (country, capital) values ('Algeria', 'Algiers')
American Samoa - Pago Pago - insert into dev.raw_data.country_capital (country, capital) values ('American Samoa', 'Pago Pago')
Andorra - Andorra la Vella - insert into dev.raw_data.country_capital (country, capital) values ('Andorra', 'Andorra la Vella')
Angola - Luanda - insert int

In [16]:
# 결과를 보면 Primary Key 유일성이 없음
# 데이터가 중복되어서 적재된 것으로 확인 
check_table_stats(cur, "dev.raw_data.country_capital", "country")

743
       COUNTRY  CAPITAL
0     Abkhazia  Sukhumi
1     Abkhazia  Sukhumi
2     Abkhazia  Sukhumi
3  Afghanistan    Kabul
4  Afghanistan    Kabul


Unnamed: 0,COUNTRY,CAPITAL
0,Abkhazia,Sukhumi
1,Abkhazia,Sukhumi
2,Abkhazia,Sukhumi
3,Afghanistan,Kabul
4,Afghanistan,Kabul
...,...,...
738,Zimbabwe,Harare
739,Zimbabwe,Harare
740,Zimbabwe,Harare
741,country,capital


## 현재 문제점
1. 데이터 헤더도 같이 가져옴
2. 데이터가 중복되어 적재됨 -> 멱등성 위반 
 - 데이터를 삭제하고 전체 다시 적재 (전체 업데이트) 
 - 데이터를 날짜/시간 단위로 테이블을 업데이트 신규 혹은 변경 레코드만 복사 (증분업데이트)

## 해결
1. 값을 리턴할 때 1째줄 제외하고 가져오기 
2. (데이터가 적을 때 쉽고 안전하게 바꿀 수 있는) 전체 업데이트 채택 그리고 트랜잭션 사용으로 (begin/commit/rollback) 멱등성 보장


In [17]:
# transform 함수를 개선해서 헤더를 빼고 리턴하게 만들자
def transform_v2(text):
    lines = text.strip().split("\n")
    records = []
    for l in lines:  # remove the first row
        (country, capital) = l.split(",")
        records.append([country, capital])
    return records[1:] # 여기서 첫번째 줄을 삭제 하고 리턴하도록 변경함 

In [18]:
# load 함수를 개선해서 제대로 된 "Full refresh"를 구현하자
#  - 매번 테이블을 새로 만들고 레코드를 새로 적재하는 방식으로 구현
#  - SQL transaction을 사용하는 것이 일반적이지만 Snowflake은 SQL transaction을 지원하지
#    않기 때문에 다른 방식으로 구현
def load_v2(con, records):
    target_table = "dev.raw_data.country_capital"
    try:
        # 여기서 CREATE OR REPLACE TABLE이 사용되면 안됨. DDL이라 Snowflake에서는 transaction을 지키지 않기 때문
        con.execute(f"CREATE TABLE IF NOT EXISTS {target_table} (country varchar primary key, capital varchar);")
        con.execute("BEGIN;") # 작업의 한단위로 묶을거야. 하나라도 실패하면 전부 취소해 (begin / commit / rollback)
        con.execute(f"DELETE FROM {target_table};")
        for r in records:
            country = r[0].replace("'", "''")
            capital = r[1].replace("'", "''")
            print(country, "-", capital)

            sql = f"INSERT INTO {target_table} (country, capital) VALUES ('{country}', '{capital}')"
            con.execute(sql)
        con.execute("COMMIT;")
    except Exception as e:
        con.execute("ROLLBACK;")
        print(e)
        raise e

In [19]:
def country_capital_data_pipeline_v2(link):
    cur = return_snowflake_conn()# 커넥션을 얻어서 연결해줌
    data = extract(link) # 데이터 추출 
    lines = transform_v2(data) # 데이터 반환
    load_v2(cur, lines) # 데이터 적재 
    check_table_stats(cur, "dev.raw_data.country_capital", "country")
    cur.close()

In [20]:
link = "https://s3-geospatial.s3.us-west-2.amazonaws.com/country_capital.csv"
country_capital_data_pipeline_v2(link)


Abkhazia - Sukhumi
Afghanistan - Kabul
Akrotiri and Dhekelia - Episkopi Cantonment
Albania - Tirana
Algeria - Algiers
American Samoa - Pago Pago
Andorra - Andorra la Vella
Angola - Luanda
Anguilla - The Valley
Antigua and Barbuda - St. John''s
Argentina - Buenos Aires
Armenia - Yerevan
Aruba - Oranjestad
Ascension Island - Georgetown
Australia - Canberra
Austria - Vienna
Azerbaijan - Baku
Bahamas - Nassau
Bahrain - Manama
Bangladesh - Dhaka
Barbados - Bridgetown
Belarus - Minsk
Belgium - Brussels
Belize - Belmopan
Benin - Porto-Novo
Bermuda - Hamilton
Bhutan - Thimphu
Bolivia - La Paz
Bosnia and Herzegovina - Sarajevo
Botswana - Gaborone
Brazil - Brasââ lia
British Virgin Islands - Road Town
Brunei - Bandar Seri Begawan
Bulgaria - Sofia
Burkina Faso - Ouagadougou
Burundi - Bujumbura
Cambodia - Phnom Penh
Cameroon - YaoundâÂ©
Canada - Ottawa
Cape Verde - Praia
Cayman Islands - George Town
Central African Republic - Bangui
Chad - N''Djamena
Chile - Santiago
China - Beijing
Christmas

In [21]:
# 한번 더 실행해보기
country_capital_data_pipeline_v2(link)

Abkhazia - Sukhumi
Afghanistan - Kabul
Akrotiri and Dhekelia - Episkopi Cantonment
Albania - Tirana
Algeria - Algiers
American Samoa - Pago Pago
Andorra - Andorra la Vella
Angola - Luanda
Anguilla - The Valley
Antigua and Barbuda - St. John''s
Argentina - Buenos Aires
Armenia - Yerevan
Aruba - Oranjestad
Ascension Island - Georgetown
Australia - Canberra
Austria - Vienna
Azerbaijan - Baku
Bahamas - Nassau
Bahrain - Manama
Bangladesh - Dhaka
Barbados - Bridgetown
Belarus - Minsk
Belgium - Brussels
Belize - Belmopan
Benin - Porto-Novo
Bermuda - Hamilton
Bhutan - Thimphu
Bolivia - La Paz
Bosnia and Herzegovina - Sarajevo
Botswana - Gaborone
Brazil - Brasââ lia
British Virgin Islands - Road Town
Brunei - Bandar Seri Begawan
Bulgaria - Sofia
Burkina Faso - Ouagadougou
Burundi - Bujumbura
Cambodia - Phnom Penh
Cameroon - YaoundâÂ©
Canada - Ottawa
Cape Verde - Praia
Cayman Islands - George Town
Central African Republic - Bangui
Chad - N''Djamena
Chile - Santiago
China - Beijing
Christmas

## 혹시라도 문제가 생겼을 때 문제를 확인할 수 있도록 에러를 처리하더라도 알 수 있도록 처리가 필요함 

In [22]:
# 이번에는 Transaction이 제대로 동작하는지 load_v3를 만들고 고의로 SQL 문법 에러를 내보기
def load_v3(con, records):
    target_table = "dev.raw_data.country_capital"
    try:
        con.execute("BEGIN;")
        # 여기서 CREATE OR REPLACE TABLE이 사용되면 안됨. DDL이라 Snowflake에서는 transaction을 지키지 않기 때문
        con.execute(f"CREATE TABLE IF NOT EXISTS {target_table} (country varchar primary key, capital varchar);")
        con.execute(f"DELETE FROM {target_table};")
        for r in records:
            country = r[0].replace("'", "''")
            capital = r[1].replace("'", "''")
            print(country, "-", capital)

            # 아래 고의로 에러를 냈음
            sql = f"INSERT INTTO {target_table} (country, capital) VALUES ('{country}', '{capital}')"
            con.execute(sql)
        con.execute("COMMIT;")
    except Exception as e:
        con.execute("ROLLBACK;")
        print(e)
        raise e

In [23]:
load_v3(cur, lines)

country - capital
001003 (42000): SQL compilation error:
syntax error line 1 at position 7 unexpected 'INTTO'.


ProgrammingError: 001003 (42000): SQL compilation error:
syntax error line 1 at position 7 unexpected 'INTTO'.

In [24]:
# transaction이 동작하지 않았다면 앞서 DELETE가 실행되어서 테이블의 내용이 없어야함.
# 248개의 레코드가 있음을 확인해보자
cur.execute("SELECT COUNT(1) FROM dev.raw_data.country_capital")
results = cur.fetchone()
print(results)

(247,)
