# Includes

In [None]:
# SQL
import sqlalchemy as sa
from sqlalchemy.engine import URL
from sqlalchemy import create_engine
import duckdb

# file system
import os

# data manipulation
import pandas as pd

# timing
from time import time

# utility
import yaml

# Variables

In [None]:
server = None
port = None
warehouse = None
shard0 = None
shard1 = None
items = None
username = None
password = None

target = 'dev'
profile = 'sitecore_warehouse'

start_path = os.path.expanduser("~\\.dbt\\")
with open(f"{start_path}profiles.yml", "r", encoding="utf-8") as f: 
    config = yaml.safe_load(f) 
    server = config[profile]["outputs"][target]["server"]
    port = config[profile]["outputs"][target]["port"]
    warehouse = config[profile]["outputs"][target]["database"]
    shard0 = config[profile]["outputs"][target]["database_shard0"]
    shard1 = config[profile]["outputs"][target]["database_shard1"]
    items = config[profile]["outputs"][target]["database_items"]
    username = config[profile]["outputs"][target]["user"]
    password = config[profile]["outputs"][target]["password"]

# Connection

In [None]:
connection_string = f'''Driver={{SQL Server}};
                        Server={server};
                        Port={port};
                        Database={warehouse};
                        uid={username};
                        pwd={password};'''
connection_url = URL.create("mssql+pyodbc", query={"odbc_connect": connection_string})
connection_engine = create_engine(connection_url)

# Export to Parquet

In [None]:
def get_tables_in_schema(schema):
    """ get table list from schema """
    sql = f"""select tab.name as [table]
		      from sys.tables tab
		      where schema_name(tab.schema_id)='{schema}'"""
    df = pd.read_sql(sql, connection_engine)
    return df['table'].values.tolist()

def export_table_to_parquet(schema, table):
    """ export table to parquet """
    time_step = time()
    print("Exporting: ", table)
    sql = f"SELECT * FROM {schema}.{table}"
    lines = 0
    with connection_engine.connect().execution_options(stream_results=True) as connection:
        for i, df in enumerate(pd.read_sql(sql, connection, chunksize=1000000)):
            # by chunk of 1M rows if needed
            t_step = time()
            file_name = table + ('' if i==0 else f'_{i}m')
            duckdb.sql(f"copy df to 'parquet/{table}/{file_name}.parquet' (format parquet)")
            lines += df.shape[0]
            print('  ', file_name, df.shape[0], f'lines ({round(time() - t_step, 2)}s)')
    print("  ", lines, f"lines exported {'' if i==0 else f' in {i} files'} ({round(time() - time_step, 2)}s)")

schema = 'mart_schema'
for table in get_tables_in_schema(schema): 
    directory = f"parquet/{table}"
    os.makedirs(directory, exist_ok=True)
    export_table_to_parquet(schema, table)