In [None]:
# Run parameters (this looks like a parameters cell; values may be overridden per run).
collection = "recommendRoutines"  # source MongoDB collection, camelCase
dt = '19700101'                   # target partition date, YYYYMMDD
env = "stg"                       # "stg" or "prd"
buffer_size = 100_000             # documents per BigQuery load batch

In [None]:
import datetime
import re
from urllib.parse import quote_plus

import pandas as pd
import pyarrow
from pymongo import MongoClient

from skt.gcp import get_bigquery_client
from skt.ye import get_secrets
# from skt.gcp import get_temp_table

# Fail fast: connection settings below only exist for these two environments.
if env not in ("prd", "stg"):
    raise ValueError(f"env must be 'prd' or 'stg', got {env!r}")

mongodb_access_info = get_secrets("adot/routine_db/mongodb_access")
db_user = mongodb_access_info[f"{env}_user"]
db_password = mongodb_access_info[f"{env}_password"]

# Percent-escape the credentials: a raw '@', ':', '/' or '%' in either one would
# otherwise be parsed as URI syntax and break the connection string.
mongodb_auth = f"{quote_plus(db_user)}:{quote_plus(db_password)}"

# STG
mongodb_URI = f"mongodb://{mongodb_auth}@172.27.8.210:27017/"

# Prod
mongodb_prod_URI = (
    f"mongodb://{mongodb_auth}@"
    "172.27.99.85:27017,172.27.99.86:27017,172.27.99.87:27017,"
    "172.27.99.88:27017,172.27.99.89:27017/"
)

# camelCase collection name -> snake_case BigQuery table name,
# e.g. "recommendRoutines" -> "recommend_routines".
bq_table = re.sub(r'(?<!^)(?=[A-Z])', '_', collection).lower()
bq = get_bigquery_client()

# Mongo database "<env>_nugu" and BigQuery dataset "adot_routine_<env>" both follow env.
client = MongoClient(mongodb_prod_URI if env == "prd" else mongodb_URI)
database = client[f"{env}_nugu"]
bigquery_destination = f"{bq.project}.adot_routine_{env}.{bq_table}"

In [None]:
from google.cloud.bigquery import RangePartitioning, PartitionRange, TimePartitioning
from google.cloud.bigquery.job import QueryJobConfig, LoadJobConfig
from google.cloud.exceptions import NotFound
from google.cloud import bigquery
from skt.gcp import *

def pandas_to_bq(pd_df, destination, partition=None, clustering_fields=None, overwrite=True):
    """Load a pandas DataFrame into a BigQuery table, one partition at a time.

    Slightly modified copy of the ``skt`` ``pandas_to_bq`` helper. Differences:

    1. Parquet list inference is enabled
       (``ParquetOptions.enable_list_inference = True``) so Parquet LIST columns
       load as REPEATED fields.
    2. Each partition slice is serialized with
       ``pyarrow.Table.from_pandas(..., preserve_index=False)`` so the pandas
       index is not written as an extra column.

    Args:
        pd_df: DataFrame to load.
        destination: fully-qualified table id, ``project.dataset.table``.
        partition: name of the partition column. When ``None`` and the destination
            already exists with a partitioning field, that field is used.
        clustering_fields: optional clustering columns passed to the load job.
        overwrite: ``True`` -> WRITE_TRUNCATE each touched partition (or the whole
            table when unpartitioned); ``False`` -> WRITE_APPEND.

    Raises:
        Exception: if the destination does not exist and the partition column is
            neither integer-typed nor holds ``datetime.date`` values.
    """
    import datetime

    # ``import pyarrow`` alone does not load the ``pyarrow.parquet`` submodule;
    # import it explicitly rather than relying on another library having done so.
    import pyarrow.parquet
    from pandas.api.types import is_integer_dtype

    range_partitioning = None
    time_partitioning = None
    bq = get_bigquery_client()
    try:
        if bq_table_exists(destination):
            # Reuse the existing table's partitioning spec.
            target_table = bq.get_table(destination)
            range_partitioning = target_table.range_partitioning
            time_partitioning = target_table.time_partitioning
            existing_spec = time_partitioning or range_partitioning
            if partition is None and existing_spec is not None:
                partition = existing_spec.field
        elif partition:
            if pd_df.empty:
                # Nothing to load and no value to infer the partition type from.
                return
            if is_integer_dtype(pd_df[partition]):
                # Presumably YYYYMM month keys, given the 200001..209912 range.
                range_partitioning = RangePartitioning(
                    PartitionRange(start=200001, end=209912, interval=1), field=partition
                )
            # .iloc[0] is positional, so frames whose index lacks label 0 still work.
            elif isinstance(pd_df[partition].iloc[0], datetime.date):
                time_partitioning = TimePartitioning(field=partition)
            else:
                raise Exception("Partition type must be either date or range.")

        parquet_options = bigquery.format_options.ParquetOptions()
        parquet_options.enable_list_inference = True

        if time_partitioning or range_partitioning:
            partition_values = set(pd_df[partition])
            if time_partitioning:
                # NOTE(review): "%Y%m%d" is a DAY partition decorator; an existing
                # HOUR/MONTH/YEAR-partitioned table would need a different format.
                input_partitions = [(p.strftime("%Y%m%d"), p) for p in partition_values]
            else:
                input_partitions = [(p, p) for p in partition_values]

            for part, part_val in input_partitions:
                writer = pyarrow.BufferOutputStream()
                pyarrow.parquet.write_table(
                    pyarrow.Table.from_pandas(pd_df[pd_df[partition] == part_val], preserve_index=False),
                    writer,
                )
                reader = pyarrow.BufferReader(writer.getvalue())

                # "table$part" addresses a single partition, so WRITE_TRUNCATE only
                # replaces that partition.
                bq.load_table_from_file(
                    reader,
                    destination=f"{destination}${part}",
                    job_config=LoadJobConfig(
                        create_disposition="CREATE_IF_NEEDED",
                        write_disposition="WRITE_TRUNCATE" if overwrite else "WRITE_APPEND",
                        time_partitioning=time_partitioning,
                        range_partitioning=range_partitioning,
                        clustering_fields=clustering_fields,
                        source_format=bigquery.SourceFormat.PARQUET,
                        parquet_options=parquet_options,
                        schema_update_options=[
                            "ALLOW_FIELD_ADDITION",
                            "ALLOW_FIELD_RELAXATION",
                        ],
                    ),
                ).result()
        else:
            bq.load_table_from_dataframe(
                dataframe=pd_df,
                destination=destination,
                job_config=LoadJobConfig(
                    create_disposition="CREATE_IF_NEEDED",
                    write_disposition="WRITE_TRUNCATE" if overwrite else "WRITE_APPEND",
                ),
            ).result()
    finally:
        # Release the client even when a load job fails.
        bq.close()


In [None]:
import json

def process_document(x, collection_name=None):
    """Normalize one MongoDB document in place so it fits in a DataFrame row.

    - ``_id`` is converted to ``str``.
    - ``modifiedAt`` / ``createAt`` given as a dict (Extended-JSON style
      ``{"$date": ...}``) are replaced by their ``"$date"`` value.
    - For the ``userRoutine`` collection, ``actions`` (when present) is serialized
      to a JSON string; values ``json`` cannot encode natively (e.g. datetimes,
      ObjectIds) are written via ``str``.

    Args:
        x: the document dict; it is mutated and also returned.
        collection_name: collection the document came from. Defaults to the
            notebook-level ``collection`` parameter.

    Returns:
        The same dict ``x``.
    """
    if collection_name is None:
        collection_name = collection
    x["_id"] = str(x["_id"])
    for field in ("modifiedAt", "createAt"):
        if isinstance(x.get(field), dict):
            x[field] = x[field]["$date"]
    if collection_name == "userRoutine" and "actions" in x:
        x["actions"] = json.dumps(x["actions"], default=str)
    return x

In [None]:
# Sanity check: how many documents the export below will read.
database.get_collection(collection).count_documents({})

In [None]:
def append_df_to_bq_temp_table(df, dest_table):
    """Coerce one batch DataFrame's column types and append it to a BigQuery table.

    ``df`` is modified in place:

    * ``expiredAt`` / ``createAt`` / ``modifiedAt`` / ``snoozeAt``, when present,
      are cast to ``datetime64[ns]``;
    * for the recommend-routine collection, ``priority`` / ``enable`` (when
      present) are cast to ``bool``;
    * a ``dt`` column is added holding the notebook's ``dt`` parameter
      (``YYYYMMDD``) as a ``datetime.date``.

    The frame is then appended (``overwrite=False``) to ``dest_table`` via
    ``pandas_to_bq`` with ``dt`` as the partition column.

    Args:
        df: batch of processed documents.
        dest_table: fully-qualified BigQuery table id.

    Raises:
        ValueError: if the notebook's ``dt`` is not a valid ``YYYYMMDD`` date.
    """
    for column in ("expiredAt", "createAt", "modifiedAt", "snoozeAt"):
        if column in df.columns:
            df[column] = df[column].astype("datetime64[ns]")

    # The notebook's default collection is "recommendRoutines" (plural); accept both
    # spellings so these casts are not silently skipped.
    if collection in ("recommendRoutine", "recommendRoutines"):
        for column in ("priority", "enable"):
            # A batch in which no document has the field has no such column.
            if column in df.columns:
                # NOTE(review): astype("bool") maps NaN (field missing in some
                # documents) to True — confirm that is intended.
                df[column] = df[column].astype("bool")

    # Strict parse: a malformed dt raises instead of slicing into a wrong date.
    df["dt"] = datetime.datetime.strptime(dt, "%Y%m%d").date()
    pandas_to_bq(df, dest_table, partition="dt", overwrite=False)

In [None]:
from skt.gcp import get_temp_table

def _flush_buffer(docs, dest_table):
    """Normalize a batch of raw MongoDB documents and append it to ``dest_table``."""
    batch_df = pd.DataFrame([process_document(doc) for doc in docs])
    append_df_to_bq_temp_table(batch_df, dest_table)


temp_table = get_temp_table()
buffer = []

# Stream the whole collection into the temp table in batches of ``buffer_size``
# documents so memory stays bounded. Using the cursor as a context manager closes
# it even if a batch load raises.
# NOTE(review): the cursor stays open while each batch is loaded into BigQuery; if a
# load outlasts the server's idle-cursor timeout the next fetch may fail — confirm.
with database[collection].find() as inputs:
    for x in inputs:
        buffer.append(x)
        if len(buffer) >= buffer_size:
            _flush_buffer(buffer, temp_table)
            buffer = []

# Flush the final partial batch.
# NOTE(review): for an empty collection nothing is loaded, so the temp table is
# presumably never created and the later insert would fail — confirm.
if buffer:
    _flush_buffer(buffer, temp_table)
    buffer = []


In [None]:
# Display the id of the temp table the batches were loaded into.
temp_table

In [None]:
def insert(source, target, dt):
    """Overwrite partition ``dt`` of ``target`` with every row of ``source``.

    Runs ``select * from source`` with the partition decorator ``target$dt`` as
    the destination and WRITE_TRUNCATE, so only that partition is replaced.
    Partitioning/clustering settings are copied from ``target`` when it exists; on
    a first run (``target`` not created yet) they are copied from ``source``
    instead, so CREATE_IF_NEEDED can actually create the table.

    Args:
        source: fully-qualified id of the table to read from.
        target: fully-qualified id ``project.dataset.table`` of the destination.
        dt: partition id used in the decorator (``YYYYMMDD`` for day partitions).
    """
    from google.cloud.bigquery.table import TableReference
    from google.cloud.bigquery.dataset import DatasetReference
    from google.cloud.bigquery.job import QueryJobConfig
    from google.cloud.exceptions import NotFound
    from skt.gcp import _print_query_job_results

    bq = get_bigquery_client()
    try:
        try:
            table = bq.get_table(target)
        except NotFound:
            table = bq.get_table(source)
        project_id, dataset_id, table_id = target.split(".")
        ref = TableReference(DatasetReference(project_id, dataset_id), f"{table_id}${dt}")
        qjc = QueryJobConfig(
            destination=ref,
            write_disposition="WRITE_TRUNCATE",
            create_disposition="CREATE_IF_NEEDED",
            time_partitioning=table.time_partitioning,
            range_partitioning=table.range_partitioning,
            clustering_fields=table.clustering_fields,
            schema_update_options=[
                "ALLOW_FIELD_ADDITION",
                "ALLOW_FIELD_RELAXATION",
            ],
        )
        # Backtick-quote the table path so ids with '-' or reserved words parse.
        source_path = source.strip("`")
        query = f"select * from `{source_path}`"
        job = bq.query(query, job_config=qjc)
        job.result()
        _print_query_job_results(job)
    finally:
        bq.close()

In [None]:
# Replace partition `dt` of the destination table with the staged rows.
insert(source=temp_table, target=bigquery_destination, dt=dt)