# BigQuery Sample Codes  
AIDP 분석환경에서 제공하는 JupyerHub에서 skt 파이썬 패키지를 사용하면 BigQuery와 연동하여 분석 및 모델링 작업을 수행할 수 있습니다.  
skt 패키지는 분석환경에 기본적으로 설치되어 있으며 pip를 사용하여 버전 업그레이드를 할 수 있습니다.
~~~bash
$ pip install --upgrade skt
~~~

## BigQuery 실행  
다음과 같이 2가지 방법으로 쿼리를 실행하고 결과를 확인할 수 있습니다.  
- IPython Magic을 사용하여 Jupyter Notebook Cell에서 SQL 실행
- BigQuery Client 사용하여 SQL 실행

### IPython Magic으로 SQL 실행  
Jupyter Notebook Cell에서 SQL을 실행하고 그 결과를 확인할 수 있습니다. 쿼리 결과는 변수에 Pandas Dataframe으로 저장할 수 있습니다. Ditonary로 정의된 쿼리 파라미터를 SQL에 주입할 수 있습니다.

먼저, IPython Magic을 로드합니다.

In [None]:
from skt.gcp import load_bigquery_ipython_magic

load_bigquery_ipython_magic()

다음과 같이 SQL을 실행합니다.

In [None]:
%%bq
    SELECT MAX(dt) as max_dt
    FROM tworld.twd_dst_product_group

SQL 결과를 Pandas DataFrame으로 변수에 저장할 수 있습니다. 다음은 max_dt라는 변수에 SQL 결과를 저장하는 예제입니다.

In [None]:
from skt.ye import get_spark
from skt.gcp import load_bigquery_ipython_magic, \
                    bq_to_pandas, \
                    get_bigquery_client

load_bigquery_ipython_magic()

In [None]:
%%bq max_dt

SELECT MAX(dt) as value
FROM tworld.twd_dst_product_group

In [None]:
max_dt

쿼리 파라미터를 SQL에 전달할 수 있습니다. 전달할 값은 Dictionary 타입으로 정의합니다.

In [None]:
query_params = {
    "max_dt": max_dt["value"][0].strftime("%Y-%m-%d")
}

query_params

다음과 같이 params 옵션에 전달할 쿼리 파라미터 변수를 정의합니다.

In [None]:
%%bq --params $query_params

SELECT *
FROM tworld.twd_dst_product_group
WHERE dt = '{max_dt}'


### BigQuery Client로 SQL 실행  
BigQuery Client는 Google에서 제공하는 Client Library에서 제공하는 객체입니다. SQL을 실행할 수 있을 뿐만 아니라 Google Client Library에서 제공하는 다양한 기능들을 활용할 수 있습니다. 더욱 자세한 내용은 <a href="https://googleapis.dev/python/bigquery/latest/generated/google.cloud.bigquery.client.Client.html#google.cloud.bigquery.client.Client" target="_blank">이곳</a>에서 확인하시기 바랍니다.

다음과 같이 BigQuery Client를 생성하여 SQL을 실행합니다. 쿼리 조회 결과가 있는 경우 Iterator 형태로 결과를 리턴해줍니다.

In [None]:
from skt.gcp import get_bigquery_client

bq_client = get_bigquery_client()

sql = """
    SELECT *
    FROM tworld.twd_dst_product_group
    WHERE dt = '{max_dt}'
    LIMIT 5
""".format(**query_params)

iterator = bq_client.query(sql).result()
for r in iterator:
    product_grp_id, product_grp_nm, dt = r
    print(product_grp_id, product_grp_nm, dt)

### INSERT OVERWRITE
BigQuery는 Hive와 달리 INSERT OVERWRITE 기능을 제공하지 않습니다. 그러나 skt 패키지의 bq_insert_overwrite 메서드를 사용하면 INSERT OVERWRITE가 가능합니다.  
만약 테이블이 존재하지 않는다면 새로 테이블을 생성합니다.  
저장하려는 대상 테이블이 파티셔닝되어 있다면 partition 파라미터를 사용하여 파티션 컬럼 이름을 설정합니다. sql 결과의 파티션 컬럼 값에 따라 해당 파티션으로 저장됩니다.  
대상 테이블이 cluster 설정이 되어 있거나 새로 생성하는 테이블에 clustrer를 설정하고 싶은 경우 clustering_fields 파라미터에 컬럼 이름을 리스트로 넣어줍니다.

In [None]:
from skt.gcp import bq_insert_overwrite, get_temp_table

sql = """
    SELECT *
    FROM tworld.twd_dst_product_group
    WHERE dt = '{max_dt}'
""".format(**query_params)

result_table_name= get_temp_table()

bq_insert_overwrite(sql=sql, destination=result_table_name, partition="dt", clustering_fields=["product_grp_id"])

### Multi Partitions
BigQuery는 단일 컬럼 파티션만 제공합니다. BigQuery에서 멀티 파티션 컬럼을 사용할 수는 없지만 BigQuery에서 제공하는 와일드카드 테이블을 응용하여 멀티 파티션 테이블처럼 정의할 수 있습니다.  
<b>"__"</b>를 구분자로 한 테이블들을 생성하여 멀티 파티션처럼 사용하는 예제입니다.

In [None]:
%%bq --params $query_params

CREATE OR REPLACE TABLE temp_1d.wildcard_table__subpart_1
AS
SELECT *
FROM tworld.twd_dst_product_group
WHERE dt = '{max_dt}'
;

CREATE OR REPLACE TABLE temp_1d.wildcard_table__subpart_2
AS
SELECT *
FROM tworld.twd_dst_product_group
WHERE dt = '{max_dt}'
;

SELECT *
FROM `temp_1d.wildcard_table__*`
WHERE _TABLE_SUFFIX = 'subpart_1'
LIMIT 5
;

이렇게 와일드카드로 생성한 테이블들에도 INSERT OVERWRITE를 사용할 수 있습니다. skt 패키지의 bq_insert_overwrite_with_suffixes 메서드를 사용하여 특정 suffix를 가지는 테이블의 특정 파티션에 INSERT OVERWRITE 할 수 있습니다. 마치 멀티 파티션 테이블에 데이터를 저장하는 것처럼 말입니다.  
아래와 같이 suffixes 파라미터에 추가 파티션으로 사용할 컬럼 이름을 지정합니다. suffix가 여러 개인 경우(예: table_name__subpart1__subpart2)도 suffixes 파라미터에 해당 컬럼 이름들을 리스트로 넣어주면 INSERT OVERWRITE 할 수 있습니다.

In [None]:
from skt.gcp import bq_insert_overwrite, get_temp_table

sql = """
    SELECT *, 'subpart_1' as subpart
    FROM tworld.twd_dst_product_group
    WHERE dt = '{max_dt}'
""".format(**query_params)

result_table_name= get_temp_table()

bq_insert_overwrite(sql=sql, destination=result_table_name, suffixes=["subpart"], partition="dt", clustering_fields=["product_grp_id"])

## BigQuery to Pandas
위에서 설명했듯이 BigQuery 쿼리 결과를 Pandas Dataframe으로 리턴받을 수 있습니다.
위의 IPython Magic 뿐만 아니라 skt 패키지에서 제공하는 메서드를 사용할 수도 있습니다.   
   
(참고 : Jupyter Notebook Cell에서 IPyhon Magic을 사용하여 SQL을 실행하는 경우 그 Cell에는 다른 Python 코드를 추가할 수 없습니다. Cell에 다른 Python 코드를 자유롭게 추가하려면 skt 패키지의 기능을 활용하시기 바랍니다.)


다음은 bq_to_pandas 메서드로 결과를 Pandas Dataframe에 저장하는 예입니다.

In [None]:
from skt.gcp import bq_to_pandas

pd_df = bq_to_pandas("""
    SELECT *
    FROM tworld.twd_dst_product_group
    WHERE dt = (SELECT MAX(dt) FROM tworld.twd_dst_product_group)
""")

pd_df

## Pandas to BigQuery

Pandas Dataframe을 특정 BigQuery 테이블에 저장할 수 있습니다.  
다음과 같이 pandas_to_bq 메서드를 사용합니다. destination 파라미터에 BigQuery 테이블을 지정해주며 테이블이 없는 경우 자동으로 생성됩니다.  
destination 테이블에 파티션이 있다면 Pandas Dataframe에서 자동으로 컬럼 이름을 감지하여 해당 파티션에 저장합니다.  
destination 테이블을 새로 생성하는 경우 partition 파라미터에 파티션으로 사용할 컬럼 이름을 지정해주어 파티셔닝된 테이블을 생성할 수 있습니다.  
또한 clustering_fields 파라미터를 사용하여 클러스터링에 사용할 컬럼을 지정할 수 있습니다.

In [None]:
import time
from skt.gcp import pandas_to_bq

dest_table = f"twd_dst_product_group_{str(int(time.time()))}"
print(f"저장할 테이블 : temp_1d.{dest_table}")

pandas_to_bq(
    pd_df=pd_df, 
    destination=f"sktaic-datahub.temp_1d.{dest_table}",
    partition="dt",
    clustering_fields=["product_grp_id"]
)

get_bigquery_client().query(f"""
    SELECT *
    FROM temp_1d.{dest_table}
""").result()

데이터가 테이블에 저장되었는지 확인합니다.

In [None]:
query_params =  {"dest_table": dest_table}

In [None]:
%%bq --params $query_params

SELECT *
FROM temp_1d.{dest_table}

## BigQuery to Spark  

BigQuery 데이터를 가져와서 Spark로 처리할 수 있습니다. BigQuery SQL 결과를 Spark Dataframe으로 변환한 후 이어서 데이터 처리가 가능합니다.

In [None]:
from skt.gcp import bq_to_df

spark_df = bq_to_df(f"""
    SELECT *
    FROM temp_1d.{dest_table}
""")

spark_df.head(5)

## Spark to BigQuery  

Spark으로 처리한 결과를 다시 BigQuery에 적재가 가능합니다.  
다음은 위에서 spark_df 변수에 저장한 Spark Dataframe을 다시 BigQuery에 적재하는 예제입니다.

1. 먼저 저장할 테이블을 BigQuery에 생성합니다. 여기서는 파티션된 테이블을 생성하고 특정 파티션에 Spark Dataframe을 저장할 것입니다.

In [None]:
from skt.gcp import df_to_bq_table
from pyspark.sql.types import DateType

dest_dataset = "temp_1d"
partitioned_dest_table = f"twd_dst_product_group_{str(int(time.time()))}"

get_bigquery_client().query(f"""
    CREATE OR REPLACE TABLE {dest_dataset}.{partitioned_dest_table}
    (
        product_grp_id STRING,
        product_grp_nm STRING,
        dt DATE
    )
    PARTITION BY dt
""").result()

print(f"생성된 테이블 : {dest_dataset}.{partitioned_dest_table}")

2. Spark Dataframe을 BigQuery 테이블에 저장합니다. 여기서는 파티션 컬럼 타입이 Date이므로 타입 변환 작업을 해주었습니다. 이렇게 경우에 따라 타입 변환 작업이 필요한 경우가 있으므로 타입에 주의해주세요.

In [None]:
changed_df = spark_df.select("product_grp_id", "product_grp_nm", spark_df.dt.cast("date"))
partition_dt = changed_df.head(1)[0].dt.strftime("%Y%m%d")
df_to_bq_table(df=changed_df,
               dataset=dest_dataset,
               table_name=partitioned_dest_table,
               partition=partition_dt,
               partition_field="dt",
               mode="overwrite")

3. BigQuery에 저장되었는지 확인합니다.

In [None]:
query_params = {"dataset":dest_dataset, "table_name":partitioned_dest_table}

In [None]:
%%bq --params $query_params
    SELECT *
    FROM {dataset}.{table_name}