# BIGQUERY TO SNOWFLAKE SCHEMA CONVERSION 

### 1. CREATE SNOWFLAKE DDL SCRIPT FROM JSON SCHEMA FILES
### 2. EXECUTE SCRIPTS TO CREATE TABLES
### 3. PUT DATA FILES INTO INTERNAL STAGE
### 4. COPY DATA FILES FROM INTERNAL STAGE TO SNOWFLAKE TABLES
### 5. VALIDATE SUCCESSFUL AND FAILED LOADS

In [None]:
# Import required libraries
from snowflake.snowpark.session import Session
from snowflake.snowpark.functions import avg, sum, col,lit
from snowflake.snowpark.functions import udf, sproc, col
from snowflake.snowpark.types import IntegerType, FloatType, LongType, DoubleType, DecimalType,StringType, BooleanType, Variant
from snowflake.snowpark.types import PandasSeries, PandasDataFrame
from snowflake.snowpark import functions as fn

import sys ,json
import io
import logging
import pandas as pd

import joblib
import pandas as pd
import numpy as np
import json

# Report the installed Snowpark library version so it is recorded with the
# notebook output.
from snowflake.snowpark import version
print (f"snowflake snowpark version is: {version.VERSION}")

### Install glob2 if not installed already

In [None]:
#!pip install glob2

### Create the DDL scripts from the provided schema JSON files

In [None]:
# Build one "create or replace table" DDL statement per BigQuery schema JSON
# file found under schema_files/.
from pathlib import Path

try:
    from glob2 import glob
except ImportError:
    # The glob patterns used in this notebook contain no recursive '**', so the
    # standard-library glob is an equivalent fallback when glob2 is missing.
    from glob import glob


def build_create_table_ddl(table, fields):
    """Return a Snowflake ``create or replace table`` statement.

    Args:
        table: Name of the table to create.
        fields: Sequence of schema entries, each a mapping that provides at
            least the keys ``'name'`` and ``'type'``.

    Returns:
        The DDL statement as a single string, columns separated by commas.

    Raises:
        ValueError: If ``fields`` is empty, because a table without columns
            is not valid DDL.
        KeyError: If an entry lacks ``'name'`` or ``'type'``.
        TypeError: If ``fields`` or one of its entries has the wrong shape.
    """
    if not fields:
        raise ValueError("schema contains no columns")
    columns = []
    for field in fields:
        dtype = field['type']
        if dtype == 'BIGNUMERIC':  # not supported in Snowflake
            dtype = 'NUMBER'
        columns.append(field['name'] + ' ' + dtype)
    return f"create or replace table {table} (" + ','.join(columns) + ')'


script = []
scriptfiles = glob('schema_files/*.json')
for j in scriptfiles:
    # Path(...).name handles both '/' and '\\' separators; the table name is
    # the file name up to the first '.', without the '_schema' suffix.
    table = Path(j).name.split('.')[0].replace('_schema', '')
    try:
        # The context manager closes the file even when parsing fails.
        with open(j, encoding='utf-8') as schema_file:
            data = json.load(schema_file)
        script.append(build_create_table_ddl(table, data))
    except (OSError, ValueError, KeyError, TypeError) as err:
        # json.JSONDecodeError is a ValueError, so malformed JSON lands here too.
        print (f"cannot create ddl script for '{j}' , please check if valid file ({err!r})")
print (f"total files read : {len(scriptfiles)}")
print (f"total scripts generated : {len(script)}")
print (f"invalid files : {len(scriptfiles) - len(script)}")

### Connect to Snowflake and create a Snowpark session

In [None]:
# Read the connection parameters from cred.json. The context manager closes
# the file as soon as it has been parsed, so no handle is left open.
with open('cred.json', encoding='utf-8') as cred_file:
    snowflake_connection_cfg = json.load(cred_file)

# Creating Snowpark Session
load_session = Session.builder.configs(snowflake_connection_cfg).create()
print('Current Database:', load_session.get_current_database())
print('Current Schema:', load_session.get_current_schema())
print('Current Warehouse:', load_session.get_current_warehouse())
print("Warehouse set up:")
# Left as the last expression so the notebook displays the query result.
load_session.sql("show warehouses like 'APP_WH'").collect()

### Create tables in snowflake

In [None]:
# Run every generated DDL statement. A failing statement is reported together
# with its text and the error, and the remaining statements are still run.
for s in script:
    try:
        load_session.sql(s).collect()
    except Exception as err:
        # Exception (not a bare except) keeps the cell interruptible.
        print ("cannot create table for ddl script , please check if script is valid!")
        print (f"    script: {s}")
        print (f"    error : {err}")

### PUT the data files from your local folder into a Snowflake internal stage

In [None]:
# Create the stage named stage_data, replacing any existing stage of that
# name; the PUT and COPY cells below refer to it.
load_session.sql("CREATE OR REPLACE STAGE stage_data").collect()

In [None]:
# Collect the local CSV files under data/. `glob` is the function imported in
# the schema-script cell above, so that cell must have been run first.
datafiles = glob('data/*.csv')

In [None]:
# Upload each local data file to the stage_data stage. A failed upload is
# reported with its reason and the loop moves on to the next file.
for c in datafiles:
    try:
        load_session.file.put(c, 'stage_data')
    except Exception as err:
        # Exception (not a bare except) so that interrupting the kernel during
        # a long upload actually stops the loop.
        print (f"cannot load file {c}: {err}")
print (f"total number of data file available : {len(datafiles)}")

In [None]:
# List the contents of @stage_data and print how many entries came back.
# NOTE(review): the label is the same text as the local-file count printed by
# the previous cell, but this number counts entries in the stage listing.
internal_stage_list = load_session.sql("list @stage_data").collect()
print (f"total number of data file available : {len(internal_stage_list)}")

### Load Data to Snowflake from internal stage

In [None]:
# Load every staged file into its table with COPY INTO, recording which
# files loaded and which raised an error.
import os

load_script = []    # COPY statements that were issued, kept for inspection
loads_failed = []   # local paths whose COPY statement raised
loads_success = []  # local paths whose COPY statement completed
stage_directory = 'stage_data'
# The file format is identical for every file, so it is built once.
form = '''FILE_FORMAT = (TYPE = 'csv' RECORD_DELIMITER = '\\n' SKIP_HEADER = 1 field_optionally_enclosed_by='"' DATE_FORMAT = 'YYYY-MM-DD')'''
for d in datafiles:
    # os.path.basename copes with the platform's path separator; the stem is
    # the file name up to the first '.'.
    file_stem = os.path.basename(d).split('.')[0]
    table_name = file_stem.replace('synthetic_data_', '')
    # The staged file is addressed as <stem>.csv.gz.
    # NOTE(review): presumably PUT gzip-compressed it on upload - confirm.
    copy = f"copy into {table_name} from @{stage_directory}/{file_stem}.csv.gz "
    load_script.append(copy + form)
    try:
        load_session.sql(copy + form).collect()
    except Exception as err:
        # Keep the path for the pandas fallback cell and show why it failed.
        loads_failed.append(d)
        print (f"copy failed for {d}: {err}")
    else:
        loads_success.append(d)

In [None]:
# Display the data files whose COPY INTO statement raised an error.
loads_failed ### take action

In [None]:
# Display the data files whose COPY INTO statement completed without raising.
loads_success ### validate

In [None]:
# Files that fail again in the pandas-based retry below are collected here.
loads_still_failed = []

In [None]:
# Retry every file whose COPY INTO failed: read it with pandas and append the
# rows to its table through Snowpark.
import os

for f in loads_failed:
    # Derive the table name the same way the COPY step does: file name up to
    # the first '.', without the 'synthetic_data_' prefix. Working from the
    # base name keeps this independent of the directory the file was read from.
    table_name = os.path.basename(f).split('.')[0].replace('synthetic_data_', '')
    try:
        # Reading happens inside the try so that one unreadable file does not
        # abort the retry of the remaining files.
        df = pd.read_csv(f)
        load_session.create_dataframe(df)\
            .write.mode("append")\
            .save_as_table(table_name)
    except Exception as err:
        loads_still_failed.append(f)
        print (f"fallback load failed for {f}: {err}")

In [None]:
# Display the files that could not be loaded by the pandas-based retry either.
loads_still_failed

In [None]:
# Close the Snowpark session now that all steps have run.
load_session.close()
print('Finished!!!')