## **Install/Update Packages**

In [None]:
%pip install -r ../requirements.txt --upgrade

## **Download Delta Tables from Lakehouse and Write to DuckDB**

In [3]:
# %run func-OneLakeStorageOperations.py
%run func-WriteFabricToDuckDb.py
import os
import polars as pl
import duckdb

# Create an instance of AzureStorageOperations
azure_ops = AzureStorageOperations()

# Get Authentication Token
token = azure_ops.get_authentication_token()
file_system_client = azure_ops.get_file_system_client(token)

# Connect to DuckDB
# conn = duckdb.connect('md:') # MotherDuck
# conn = duckdb.connect('Tables/data.db') # Local DuckDB Database
conn = duckdb.connect('../Tables/LH_Gold.db') # New DuckDB Database

# Select Database (If MotherDuck)
# conn.sql(f'CREATE OR REPLACE DATABASE LH_DeltaLake;')
# conn.sql(f'CREATE DATABASE IF NOT EXISTS LH_Bronze;')
# conn.sql(f'USE LH_Bronze;')

# Create Schema
# dt_today = datetime.today()
# dt_string = 'dbo_' + dt_today.strftime("%Y_%m_%d")
# conn.sql(f'CREATE OR REPLACE SCHEMA {dt_string}')

# Get the filtered subdirectory names for "Tables"
filtered_tables = azure_ops.list_items(file_system_client=file_system_client, target_directory_path="Tables")
# partial_match = ['fact', 'dim'] # Partial match strings
# filtered_tables = [table for table in filtered_tables if any(keyword in table for keyword in partial_match)] # Filter for Conatains
# filtered_tables = [table for table in filtered_tables if any(table.startswith(keyword) for keyword in partial_match)] # Filter for Starts With

# Prepare a list of SQL statements for batch execution
sql_statements = []

for table in filtered_tables:
    try:
        # Read the delta file from the lakehouse
        local_file_name = azure_ops.download_from_lakehouse(
            file_system_client=file_system_client,
            target_file_path=f'Tables/{table}',
            is_delta=True
        )
        
        print(f"Local file path for table {table}: {local_file_name}")
        
        if not os.path.exists(local_file_name):
            print(f"[E] File path does not exist: {local_file_name}")
            continue
        
        # Prepare SQL statement for batch execution
        sql_statements.append(f"""
        DROP TABLE IF EXISTS {table};
        CREATE TABLE {table} AS SELECT * FROM parquet_scan('{local_file_name}');
        """)
        
        print(f"[I] Table {table} prepared for creation/replacement in DuckDB.")
        
    except Exception as e:
        print(f"[E] Error processing table {table}: {e}")
    finally:
        # Clean up downloaded files immediately
        if local_file_name and os.path.exists(local_file_name):
            azure_ops.delete_local_path(local_file_name)

# Execute all SQL statements in a single transaction
with conn.cursor() as cursor:
    cursor.execute("BEGIN TRANSACTION;")
    for statement in sql_statements:
        cursor.execute(statement)
    cursor.execute("COMMIT;")

print("[I] All tables created/replaced in DuckDB.")

# Close the connection
conn.close()

## **Write DuckDB to SQL Server**

### **Polars: write_database()**
**Documentation: [polars.DataFrame.write_database()](https://docs.pola.rs/api/python/stable/reference/api/polars.DataFrame.write_database.html)**

In [None]:
%run ../main.py
import duckdb
import os
import polars as pl
from sqlalchemy import create_engine, text
from sqlalchemy.exc import OperationalError
from concurrent.futures import ThreadPoolExecutor, as_completed
from tqdm import tqdm

# Define connection parameters
server = os.getenv("server")
user = os.getenv("user")
password = os.getenv("password")
database = "LH_GOLD"

# Create the SQL Server URI connection string
sql_server_uri = f"mssql+pyodbc://{user}:{password}@{server}/{database}?driver=ODBC+Driver+17+for+SQL+Server"

def execute_non_transactional_query(query):
    try:
        engine = create_engine(f"mssql+pyodbc://{user}:{password}@{server}/master?driver=ODBC+Driver+17+for+SQL+Server")
        with engine.connect() as conn:
            conn.execute(text(query).execution_options(autocommit=True))  # Ensure the query is executed outside of a transaction
        print(f"Query executed successfully: {query}")
    except OperationalError as e:
        print(f"OperationalError: {e}")
    except Exception as e:
        print(f"Error executing query: {e}")

# Drop the LH_GOLD database if it exists
drop_db_query = "IF EXISTS (SELECT name FROM sys.databases WHERE name = 'LH_GOLD') DROP DATABASE LH_GOLD;"
execute_non_transactional_query(drop_db_query)

# Create the LH_GOLD database
create_db_query = "CREATE DATABASE LH_GOLD;"
execute_non_transactional_query(create_db_query)

# Define a function to process and transfer each table
def process_table(table, duckdb_connection, sql_server_uri, progress_bar):
    try:
        # Establish connection to DuckDB
        con_duckdb = duckdb.connect(duckdb_connection)
        
        # Fetch data from DuckDB table
        df_arrow = con_duckdb.execute(f'SELECT * FROM {table}').arrow()
        df = pl.from_arrow(df_arrow)
        
        # Optionally transform the DataFrame
        df = df.with_columns(pl.col("*").cast(pl.Utf8))

        # Use Polars' write_database method to write the DataFrame to SQL Server
        df.write_database(table, connection=sql_server_uri, if_table_exists='replace')

        print(f"Data successfully written to table {table} in SQL Server.")
    
    except Exception as e:
        print(f"Error while processing table {table}: {e}")
    finally:
        con_duckdb.close()
        # Update the progress bar
        progress_bar.update(1)

# Establish connection to DuckDB
duckdb_connection = '../Tables/LH_Gold.db'
con_duckdb = duckdb.connect(duckdb_connection)

# Fetch tables from DuckDB
tables = con_duckdb.execute("SHOW TABLES").fetchdf()
table_list = tables['name'].tolist()

# Create a tqdm progress bar
with tqdm(total=len(table_list), desc="Processing tables", unit="table") as progress_bar:
    # Use ThreadPoolExecutor to process tables in parallel
    with ThreadPoolExecutor(max_workers=8) as executor:
        futures = [executor.submit(process_table, table, duckdb_connection, sql_server_uri, progress_bar) for table in table_list]

        # Process each future as it completes
        for future in as_completed(futures):
            try:
                future.result()
            except Exception as e:
                print(f"Error occurred: {e}")

print("All operations completed successfully.")


### **Turbodbc (Faster) - Conda Kernel**
**Documentation: [Turbodbc Website](https://turbodbc.readthedocs.io/en/latest/pages/introduction.html)**

#### **<u>SQL Server</u>**

In [3]:
import os
import duckdb
import polars as pl
import turbodbc
import pyodbc
from concurrent.futures import ThreadPoolExecutor, as_completed
import logging
import time  # For measuring insert time
from typing import Union

# Initialize a list to collect error messages
error_messages = []

def get_database_connection(destination_database_system: str, connection_details: Union[str, dict]):
    try:
        if isinstance(connection_details, str):
            return turbodbc.connect(dsn=connection_details)
        
        if not isinstance(connection_details, dict):
            raise ValueError(f"Connection details must be a dictionary or a string (DSN). Got: {type(connection_details)}")

        if destination_database_system == 'sqlserver':
            return turbodbc.connect(
                DRIVER=connection_details['driver'],
                SERVER=connection_details['server'],
                UID=connection_details['user'],
                PWD=connection_details['password'],
                DATABASE=connection_details['database']
            )
        elif destination_database_system == 'mysql':
            return turbodbc.connect(
                host=connection_details['server'],
                user=connection_details['user'],
                password=connection_details['password'],
                database=connection_details['database']
            )
        elif destination_database_system == 'postgres':
            return turbodbc.connect(
                host=connection_details['server'],
                user=connection_details['user'],
                password=connection_details['password'],
                dbname=connection_details['database']
            )
        elif destination_database_system == 'duckdb':
            return duckdb.connect(connection_details['database'])
        else:
            raise ValueError(f"Unsupported destination database system: {destination_database_system}")
    except Exception as e:
        logging.error(f"Failed to connect to {destination_database_system}: {e}")
        raise

def execute_non_transactional_query(query, connection_string):
    try:
        with pyodbc.connect(connection_string) as conn:
            cursor = conn.cursor()
            cursor.execute(query)
            print(f"QUERY: {query}")
    except Exception as e:
        error_messages.append(f"Error executing query '{query}': {e}")

def ensure_table_exists(df_arrow, table_name, connection_string, destination_database_system):
    column_definitions = ", ".join([f"[{col.name}] NVARCHAR(MAX)" for col in df_arrow.schema])
    check_table_query = (
        f"IF NOT EXISTS (SELECT * FROM sys.tables WHERE name = '{table_name}') "
        f"CREATE TABLE {table_name} ({column_definitions})"
    )
    execute_non_transactional_query(check_table_query, connection_string)
    logging.info(f"TABLE: '{table_name}' checked/created successfully in {destination_database_system}.")

def convert_chunked_arrays_to_numpy(df_arrow):
    """Convert Arrow Table columns to Numpy arrays."""
    return [column.to_numpy() for column in df_arrow.columns]

def process_chunk(chunk, table_name, connection_string, destination_database_system, use_convert_to_text):
    num_columns = len(chunk.schema)
    placeholders = ", ".join(["?"] * num_columns)
    insert_query = f"INSERT INTO {table_name} VALUES ({placeholders})"

    with turbodbc.connect(connection_string=connection_string) as conn:
        with conn.cursor() as cursor:
            if not use_convert_to_text:
                cursor.executemanycolumns(insert_query, chunk)
            else:
                numpy_columns = convert_chunked_arrays_to_numpy(chunk)
                cursor.executemanycolumns(insert_query, numpy_columns)
            conn.commit()
            logging.info(f"INSERT: Chunk of data written to table '{table_name}' in {destination_database_system}.")

def process_table(table, duckdb_connection, connection_string, chunk_size, destination_database_system):
    con_duckdb = duckdb.connect(duckdb_connection)
    try:
        df_arrow = con_duckdb.execute(f'SELECT * FROM {table}').fetch_arrow_table()

        if df_arrow.num_rows == 0:
            logging.info(f"Skipping empty table {table}.")
            return

        ensure_table_exists(df_arrow, table, connection_string, destination_database_system)

        num_rows = df_arrow.num_rows
        if num_rows > chunk_size:
            num_chunks = (num_rows // chunk_size) + (1 if num_rows % chunk_size > 0 else 0)
            logging.info(f"Processing large table '{table}' in {num_chunks} chunks.")
            
            for i in range(num_chunks):
                chunk_start = i * chunk_size
                chunk_end = min(chunk_start + chunk_size, num_rows)
                chunk = df_arrow.slice(chunk_start, chunk_end - chunk_start)

                start_time = time.time()
                process_chunk(chunk, table, connection_string, destination_database_system, use_convert_to_text=False)
                end_time = time.time()
                process_time = end_time - start_time
                logging.info(f"INSERT {process_time:.2f}s: '{chunk.num_rows} rows' successfully written to table '{table}' in {destination_database_system}.")

        else:
            # For smaller tables, use chunked arrays for faster bulk insert
            columns = ", ".join([f"[{col.name}]" for col in df_arrow.schema])
            placeholders = ", ".join(["?"] * len(df_arrow.schema))
            insert_query = f"INSERT INTO {table} ({columns}) VALUES ({placeholders})"

            with turbodbc.connect(connection_string=connection_string) as conn:
                cursor = conn.cursor()
                start_time = time.time()
                cursor.executemanycolumns(insert_query, df_arrow)
                conn.commit()
                end_time = time.time()
                process_time = end_time - start_time
                logging.info(f"INSERT {process_time:.2f}s: '{df_arrow.num_rows} rows' successfully written to table '{table}' in {destination_database_system}.")

    except Exception as e:
        error_messages.append(f"Error while processing table '{table}': {e}")
    finally:
        con_duckdb.close()

def write_to_local_database(
    source_database_connection: str,
    destination_connection_string: Union[str, dict],
    source_database_system: str = 'duckdb',
    destination_database_system: str = 'sqlserver',
    destination_database_name: str = None,
    mode: str = 'overwrite',
    convert_to_text: bool = False,
    chunk_size: int = 100000  # Default chunk size for large tables
):
    logging.basicConfig(level=logging.INFO, format='%(message)s')

    supported_destination_systems = ['sqlserver', 'mysql', 'postgres', 'interbase', 'duckdb']

    if source_database_system not in ['duckdb']:
        logging.error(f"Unsupported source database system: {source_database_system}")
        raise ValueError("Supported source database systems: 'duckdb'")
    if destination_database_system not in supported_destination_systems:
        logging.error(f"Unsupported destination database system: {destination_database_system}")
        raise ValueError(f"Supported destination database systems: {supported_destination_systems}")

    if isinstance(destination_connection_string, dict):
        required_keys = ['server', 'user', 'password', 'database']
        if not all(key in destination_connection_string for key in required_keys):
            logging.error("Missing required connection parameters.")
            raise ValueError("Ensure server, user, password, and database are set in the connection string.")
        conn_string = (
            f"DRIVER={destination_connection_string['driver']};"
            f"SERVER={destination_connection_string['server']};"
            f"UID={destination_connection_string['user']};"
            f"PWD={destination_connection_string['password']};"
            f"DATABASE={destination_connection_string['database']}"
        )
    elif isinstance(destination_connection_string, str):
        conn_string = destination_connection_string

    def check_database_exists():
        if destination_database_system == 'sqlserver':
            database_name = destination_database_name or destination_connection_string['database']

            check_db_query = f"SELECT name FROM sys.databases WHERE name = '{database_name}'"
            try:
                with get_database_connection(destination_database_system, destination_connection_string) as conn:
                    cursor = conn.cursor()
                    cursor.execute(check_db_query)
                    db_exists = cursor.fetchone()
                    if not db_exists:
                        logging.error(f"DATABASE: '{database_name}' does not exist in SQL Server. Create it to proceed.")
                        return False
                    logging.info(f"DATABASE: '{database_name}' exists.")
                    return True
            except Exception as e:
                logging.error(f"Error checking if database exists: {e}")
                return False
        else:
            logging.info(f"Skipping database existence check for {destination_database_system}")
            return True

    if not check_database_exists():
        raise SystemExit(f"Cannot proceed: Database '{destination_database_name or destination_connection_string['database']}' does not exist in {destination_database_system}.")

    con_duckdb = duckdb.connect(source_database_connection)
    try:
        tables = con_duckdb.execute("SHOW TABLES").fetchdf()
        table_list = tables['name'].tolist()
    finally:
        con_duckdb.close()

    with ThreadPoolExecutor(max_workers=8) as executor:
        futures = [
            executor.submit(
                process_table,
                table,
                source_database_connection,
                conn_string,
                chunk_size,
                destination_database_system
            )
            for table in table_list
        ]
        
        for future in as_completed(futures):
            try:
                future.result()
            except Exception as e:
                error_messages.append(f"Error occurred during parallel execution: {e}")

    logging.info("All operations completed successfully.")

    if error_messages:
        print("\nErrors encountered during execution:")
        for message in error_messages:
            print(message)

# Example usage of the write_to_local_database function
source_database_connection = '../Tables/LH_Gold.db'

# DSN Connection example
dsn_connection = 'sqlserver-odbc'  # DSN Connection
destination_database_name = 'LH_GOLD'  # Must specify the database name when using DSN

# Direct Connection String example
destination_connection_string = {
    "driver": "{ODBC Driver 17 for SQL Server}",
    "server": os.getenv("server"),
    "user": os.getenv("user"),
    "password": os.getenv("password"),
    "database": "LH_GOLD"
}

# # Use DSN Connection
# write_to_local_database(
#     source_database_system='duckdb',
#     source_database_connection=source_database_connection,
#     destination_database_system='sqlserver',
#     destination_connection_string=dsn_connection,
#     destination_database_name=destination_database_name,  # Required when using DSN
#     mode='append',         # (Optional): Control whether to overwrite or append data in database. Options: 'overwrite' or 'append' (default is overwrite).
#     convert_to_text=False  # (Optional): Control whether to convert all columns to text. Options: True or False (default is True). If False, retry with True if an error occurs.
# )

# Use Direct Connection Parameters
write_to_local_database(
    source_database_system='duckdb',
    source_database_connection=source_database_connection,
    destination_database_system='sqlserver',
    destination_connection_string=destination_connection_string,  # Direct connection parameters
    mode='append',         # (Optional): Control whether to overwrite or append data in database. Options: 'overwrite' or 'append' (default is overwrite).
    convert_to_text=False  # (Optional): Control whether to convert all columns to text. Options: True or False (default is True). If False, retry with True if an error occurs.
)


DATABASE: 'LH_GOLD' exists.
TABLE: 'dim_budget_2024' checked/created successfully in sqlserver.
TABLE: 'dim_productcategory_gold' checked/created successfully in sqlserver.
TABLE: 'dim_plant_gold' checked/created successfully in sqlserver.
INSERT 0.00s: '6 rows' successfully written to table 'dim_productcategory_gold' in sqlserver.
TABLE: 'dim_coa_gold' checked/created successfully in sqlserver.
TABLE: 'dim_salesperson_gold' checked/created successfully in sqlserver.
TABLE: 'dim_date_gold' checked/created successfully in sqlserver.


Query executed successfully: IF NOT EXISTS (SELECT * FROM sys.tables WHERE name = 'dim_budget_2024') CREATE TABLE dim_budget_2024 ([GL_ACCOUNT] NVARCHAR(MAX), [DESCRIPTION] NVARCHAR(MAX), [AMOUNT] NVARCHAR(MAX), [YEAR] NVARCHAR(MAX), [MONTH] NVARCHAR(MAX), [SEGMENT_1] NVARCHAR(MAX), [SEGMENT_2] NVARCHAR(MAX))
Query executed successfully: IF NOT EXISTS (SELECT * FROM sys.tables WHERE name = 'dim_productcategory_gold') CREATE TABLE dim_productcategory_gold ([PRODUCT_CATEGORY] NVARCHAR(MAX), [INDEX] NVARCHAR(MAX))
Query executed successfully: IF NOT EXISTS (SELECT * FROM sys.tables WHERE name = 'dim_plant_gold') CREATE TABLE dim_plant_gold ([PLANT_NO] NVARCHAR(MAX), [PLANT_NAME] NVARCHAR(MAX), [DESCRIPTION] NVARCHAR(MAX), [ADDRESS_1] NVARCHAR(MAX), [ADDRESS_2] NVARCHAR(MAX), [PHONE_NO] NVARCHAR(MAX), [PLANT_SCREEN_ID] NVARCHAR(MAX), [LAST_CHANGE_DATETIME] NVARCHAR(MAX), [LAST_CHANGE_USER] NVARCHAR(MAX), [ACTIVE_FLAG] NVARCHAR(MAX))
Query executed successfully: IF NOT EXISTS (SELECT * FROM

INSERT 0.14s: '4735 rows' successfully written to table 'dim_budget_2024' in sqlserver.
TABLE: 'dim_customers_gold' checked/created successfully in sqlserver.
INSERT 0.01s: '13 rows' successfully written to table 'dim_plant_gold' in sqlserver.


Query executed successfully: IF NOT EXISTS (SELECT * FROM sys.tables WHERE name = 'dim_unitofmeasure_gold') CREATE TABLE dim_unitofmeasure_gold ([INDEX] NVARCHAR(MAX), [UNIT_OF_MEASURE] NVARCHAR(MAX), [UNIT_NAME] NVARCHAR(MAX), [DESCRIPTION] NVARCHAR(MAX), [UMS_RATIO] NVARCHAR(MAX), [PRICE_ROUND_FACTOR] NVARCHAR(MAX), [LAST_CHANGE_DATETIME] NVARCHAR(MAX), [LAST_CHANGE_USER] NVARCHAR(MAX), [ACTIVE_FLAG] NVARCHAR(MAX))


TABLE: 'dim_products_gold' checked/created successfully in sqlserver.
INSERT 0.06s: '1509 rows' successfully written to table 'dim_coa_gold' in sqlserver.
INSERT 0.00s: '12 rows' successfully written to table 'dim_salesperson_gold' in sqlserver.
TABLE: 'dim_unitofmeasure_gold' checked/created successfully in sqlserver.
INSERT 0.30s: '3007 rows' successfully written to table 'dim_customers_gold' in sqlserver.
INSERT 1.35s: '5844 rows' successfully written to table 'dim_date_gold' in sqlserver.
INSERT 0.01s: '33 rows' successfully written to table 'dim_unitofmeasure_gold' in sqlserver.
INSERT 1.07s: '28750 rows' successfully written to table 'dim_products_gold' in sqlserver.
TABLE: 'fact_sales_gold' checked/created successfully in sqlserver.
Processing large table 'fact_sales_gold' in 14 chunks.


Query executed successfully: IF NOT EXISTS (SELECT * FROM sys.tables WHERE name = 'fact_sales_gold') CREATE TABLE fact_sales_gold ([SESSION_NO] NVARCHAR(MAX), [CUST_NO] NVARCHAR(MAX), [INVOICE_NO] NVARCHAR(MAX), [INVOICE_DATE] NVARCHAR(MAX), [TICKET_NO] NVARCHAR(MAX), [TICKET_DATE] NVARCHAR(MAX), [SALESPERSON] NVARCHAR(MAX), [VOID_FLAG] NVARCHAR(MAX), [PRODUCT_CODE] NVARCHAR(MAX), [UNIT_OF_MEASURE] NVARCHAR(MAX), [PLANT_NO] NVARCHAR(MAX), [UNIT_PRICE] NVARCHAR(MAX), [LIST_PRICE] NVARCHAR(MAX), [QUOTE_PRICE] NVARCHAR(MAX), [UNIT_DISC] NVARCHAR(MAX), [QUOTE_DISC] NVARCHAR(MAX), [QTY_SOLD] NVARCHAR(MAX), [SALES_GL] NVARCHAR(MAX), [TOT_REVENUE] NVARCHAR(MAX), [TOT_COST_OF_SALES] NVARCHAR(MAX), [TOT_DISCOUNT] NVARCHAR(MAX), [TOT_GROSS_PROFIT] NVARCHAR(MAX))


FloatProgress(value=0.0, layout=Layout(width='auto'), style=ProgressStyle(bar_color='black'))

INSERT: Chunk of data written to table 'fact_sales_gold' in sqlserver.
INSERT 3.75s: '100000 rows' successfully written to table 'fact_sales_gold' in sqlserver.
INSERT: Chunk of data written to table 'fact_sales_gold' in sqlserver.
INSERT 3.46s: '100000 rows' successfully written to table 'fact_sales_gold' in sqlserver.
TABLE: 'fact_gl_gold' checked/created successfully in sqlserver.
Processing large table 'fact_gl_gold' in 34 chunks.


Query executed successfully: IF NOT EXISTS (SELECT * FROM sys.tables WHERE name = 'fact_gl_gold') CREATE TABLE fact_gl_gold ([REFRESHED_AT] NVARCHAR(MAX), [SESSION_NO] NVARCHAR(MAX), [TRANS_NO] NVARCHAR(MAX), [TRX_DATE] NVARCHAR(MAX), [SOURCE_CODE] NVARCHAR(MAX), [GL_ACCOUNT] NVARCHAR(MAX), [PLANT_NO] NVARCHAR(MAX), [GL_DESCRIPTION] NVARCHAR(MAX), [DEBIT_AMOUNT] NVARCHAR(MAX), [CREDIT_AMOUNT] NVARCHAR(MAX), [TRX_AMOUNT] NVARCHAR(MAX), [TRX_AMOUNT_INDEX] NVARCHAR(MAX), [TRX_AMOUNT_DR_CR] NVARCHAR(MAX), [DESCRIPTION_1] NVARCHAR(MAX), [DESCRIPTION_2] NVARCHAR(MAX), [YEAR] NVARCHAR(MAX), [MONTH] NVARCHAR(MAX), [LAST_CHANGE_DATETIME] NVARCHAR(MAX), [LAST_CHANGE_USER] NVARCHAR(MAX))


INSERT: Chunk of data written to table 'fact_sales_gold' in sqlserver.
INSERT 6.67s: '100000 rows' successfully written to table 'fact_sales_gold' in sqlserver.
INSERT: Chunk of data written to table 'fact_gl_gold' in sqlserver.
INSERT 6.57s: '100000 rows' successfully written to table 'fact_gl_gold' in sqlserver.
INSERT: Chunk of data written to table 'fact_sales_gold' in sqlserver.
INSERT 6.24s: '100000 rows' successfully written to table 'fact_sales_gold' in sqlserver.
INSERT: Chunk of data written to table 'fact_gl_gold' in sqlserver.
INSERT 6.12s: '100000 rows' successfully written to table 'fact_gl_gold' in sqlserver.
INSERT: Chunk of data written to table 'fact_sales_gold' in sqlserver.
INSERT 6.12s: '100000 rows' successfully written to table 'fact_sales_gold' in sqlserver.
INSERT: Chunk of data written to table 'fact_gl_gold' in sqlserver.
INSERT 6.28s: '100000 rows' successfully written to table 'fact_gl_gold' in sqlserver.
INSERT: Chunk of data written to table 'fact_sales_g

#### **<u>MySQL</u>**