### Modules importeren

In [1]:
import pandas as pd
import pyodbc
import sqlite3
import numpy as np

import os
from loguru import logger # pip install loguru
import warnings
warnings.filterwarnings("ignore", category=UserWarning)

servername = 'DESKTOP-8LNATDJ' # PAS DIT AAN NAAR JE EIGEN SERVER!!!

# Bron-databases inlezen

### Connecties maken

In [2]:
# Functie om een connectie te maken met een SQL Server database
def establish_connection(servername, database):
    logger.info(f'Establishing connection with {database} on {servername}...')
    try:
        conn = pyodbc.connect('DRIVER={SQL Server};SERVER=' + servername + 
                        ';DATABASE=' + database + ';Trusted_Connection=yes')
        logger.success(f'Connection established with {database} on {servername}.')
        return conn
    except Exception as e:
        logger.error(f'Failed to establish connection with {database} on {servername}. Error: {e}')
        return None

# Functie om een connectie te maken met een Microsoft Access database
def establish_access_connection(database):
    db_name = os.path.basename(database)
    logger.info(f'Establishing connection with {db_name}...')
    try:
        conn = pyodbc.connect('DRIVER={Microsoft Access Driver (*.mdb, *.accdb)};DBQ=' + database)
        logger.success(f'Connection established with {db_name}.')
        return conn
    except Exception as e:
        logger.error(f'Failed to establish connection with {db_name}. Error: {e}')
        return None

# Connecties maken met de bronnen
northwind_ssms_conn = establish_connection(servername, 'Northwind')
northwind_ssms_cursor = northwind_ssms_conn.cursor()

adventureworks_ssms_conn = establish_connection(servername, 'AdventureWorks2019')
adventureworks_ssms_cursor = adventureworks_ssms_conn.cursor()

aenc_access_conn = establish_access_connection('../data/raw/aenc.accdb')
aenc_access_cursor = aenc_access_conn.cursor()

# SQLite, maak nieuwe databases aan als deze nog niet bestaat
try:
    logger.info('Establishing connection with SQLite databases...')
    northwind_sqlite_conn = sqlite3.connect('../data/processed/northwind.sqlite')
    adventureworks_sqlite_conn = sqlite3.connect('../data/processed/adventureworks.sqlite')
    aenc_sqlite_conn = sqlite3.connect('../data/processed/aenc.sqlite')
    logger.success('Connection established with SQLite databases.')
except Exception as e:
    logger.error(f'Failed to establish connection with SQLite databases. Error: {e}')

[32m2024-05-21 18:03:12.506[0m | [1mINFO    [0m | [36m__main__[0m:[36mestablish_connection[0m:[36m3[0m - [1mEstablishing connection with Northwind on DESKTOP-8LNATDJ...[0m
[32m2024-05-21 18:03:20.681[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mestablish_connection[0m:[36m7[0m - [32m[1mConnection established with Northwind on DESKTOP-8LNATDJ.[0m
[32m2024-05-21 18:03:20.682[0m | [1mINFO    [0m | [36m__main__[0m:[36mestablish_connection[0m:[36m3[0m - [1mEstablishing connection with AdventureWorks2019 on DESKTOP-8LNATDJ...[0m
[32m2024-05-21 18:03:28.802[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mestablish_connection[0m:[36m7[0m - [32m[1mConnection established with AdventureWorks2019 on DESKTOP-8LNATDJ.[0m
[32m2024-05-21 18:03:28.802[0m | [1mINFO    [0m | [36m__main__[0m:[36mestablish_access_connection[0m:[36m16[0m - [1mEstablishing connection with aenc.accdb...[0m
[32m2024-05-21 18:03:29.409[0m | [32m[1mSUCCESS [0m |

### Bron-databases overzetten naar SQLite bestanden

In [34]:
# Functie om tabellen van een bron-database over te zetten naar een SQLite database
def transfer_tables_to_sqlite(tables, conn, sqlite_conn):
    db_name = sqlite_conn.execute("PRAGMA database_list;").fetchall()[0][2] # Padlocatie van de database
    db_name = os.path.basename(db_name) # Alleen de naam van de database
    logger.info(f'Transferring tables to SQLite database {db_name}...')

    failed_tables = []

    for table in tables:
        try:
            sql_query = f'SELECT * FROM {table};'
            data = pd.read_sql(sql_query, conn)
            sqlite_table_name = table.replace('[', '').replace(']', '')  # Verwijder vierkante haken voor tabellen met een spatie
            data.to_sql(sqlite_table_name, sqlite_conn, index=False, if_exists='replace')
        except Exception as e:
            logger.error(f'Error transferring table {table} to SQLite database {db_name}. Error: {e}')
            failed_tables.append(table)
            break
    
    if len(failed_tables) > 0:
        logger.warning(f'Tables that failed to transfer: {failed_tables}. Rest of the tables were transferred successfully to SQLite database {db_name}.')
    else:
        logger.success(f'Transferred tables to SQLite database {db_name}!')


# Tabellen die worden overgezet naar SQLite
northwind_tables = [
    'Categories', 
    'CustomerCustomerDemo', 
    'CustomerDemographics', 
    'Customers', 
    'Employees', 
    'EmployeeTerritories', 
    '[Order Details]', # Vanwege de spatie in de tabelnaam moet deze tussen vierkante haken (I <3 PYTHON!!!)
    'Orders', 
    'Products', 
    'Region', 
    'Shippers', 
    'Suppliers', 
    'Territories'
    ]

adventureworks_tables = [
    'dbo.AWBuildVersion',
    'dbo.DatabaseLog',
    'dbo.ErrorLog',
    'HumanResources.Department',
    'HumanResources.Employee',
    'HumanResources.EmployeeDepartmentHistory',
    'HumanResources.EmployeePayHistory',
    'HumanResources.JobCandidate',
    'HumanResources.Shift',
    'Person.Address',
    'Person.AddressType',
    'Person.BusinessEntity',
    'Person.BusinessEntityAddress',
    'Person.BusinessEntityContact',
    'Person.ContactType',
    'Person.CountryRegion',
    'Person.EmailAddress',
    'Person.Password',
    'Person.Person',
    'Person.PersonPhone',
    'Person.PhoneNumberType',
    'Person.StateProvince',
    'Production.BillOfMaterials',
    'Production.Culture',
    'Production.Document',
    'Production.Illustration',
    'Production.Location',
    'Production.Product',
    'Production.ProductCategory',
    'Production.ProductCostHistory',
    'Production.ProductDescription',
    'Production.ProductDocument',
    'Production.ProductInventory',
    'Production.ProductListPriceHistory',
    'Production.ProductModel',
    'Production.ProductModelIllustration',
    'Production.ProductModelProductDescriptionCulture',
    'Production.ProductPhoto',
    'Production.ProductProductPhoto',
    'Production.ProductReview',
    'Production.ProductSubcategory',
    'Production.ScrapReason',
    'Production.TransactionHistory',
    'Production.TransactionHistoryArchive',
    'Production.UnitMeasure',
    'Production.WorkOrder',
    'Production.WorkOrderRouting',
    'Purchasing.ProductVendor',
    'Purchasing.PurchaseOrderDetail',
    'Purchasing.PurchaseOrderHeader',
    'Purchasing.ShipMethod',
    'Purchasing.Vendor',
    'Sales.CountryRegionCurrency',
    'Sales.CreditCard',
    'Sales.Currency',
    'Sales.CurrencyRate',
    'Sales.Customer',
    'Sales.PersonCreditCard',
    'Sales.SalesOrderDetail',
    'Sales.SalesOrderHeader',
    'Sales.SalesOrderHeaderSalesReason',
    'Sales.SalesPerson',
    'Sales.SalesPersonQuotaHistory',
    'Sales.SalesReason',
    'Sales.SalesTaxRate',
    'Sales.SalesTerritory',
    'Sales.SalesTerritoryHistory',
    'Sales.ShoppingCartItem',
    'Sales.SpecialOffer',
    'Sales.SpecialOfferProduct',
    'Sales.Store'
]

aenc_tables = [
    'bonus',
    'customer',
    'department',
    'employee',
    'product',
    'region',
    'sales_order',
    'sales_order_item',
    'state'
]

transfer_tables_to_sqlite(northwind_tables, northwind_ssms_conn, northwind_sqlite_conn)
transfer_tables_to_sqlite(adventureworks_tables, adventureworks_ssms_conn, adventureworks_sqlite_conn)
transfer_tables_to_sqlite(aenc_tables, aenc_access_conn, aenc_sqlite_conn)

[32m2024-05-20 20:25:17.176[0m | [1mINFO    [0m | [36m__main__[0m:[36mtransfer_tables_to_sqlite[0m:[36m5[0m - [1mTransferring tables to SQLite database northwind.sqlite...[0m
[32m2024-05-20 20:25:21.233[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mtransfer_tables_to_sqlite[0m:[36m23[0m - [32m[1mTransferred tables to SQLite database northwind.sqlite![0m
[32m2024-05-20 20:25:21.234[0m | [1mINFO    [0m | [36m__main__[0m:[36mtransfer_tables_to_sqlite[0m:[36m5[0m - [1mTransferring tables to SQLite database adventureworks.sqlite...[0m
[32m2024-05-20 20:26:08.719[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mtransfer_tables_to_sqlite[0m:[36m23[0m - [32m[1mTransferred tables to SQLite database adventureworks.sqlite![0m
[32m2024-05-20 20:26:08.720[0m | [1mINFO    [0m | [36m__main__[0m:[36mtransfer_tables_to_sqlite[0m:[36m5[0m - [1mTransferring tables to SQLite database aenc.sqlite...[0m
[32m2024-05-20 20:26:11.729[0m | [32m[1m

# Data overzetten naar de Data-Warehouse

### Order_Details

In [35]:
def transfer_data_to_orderdetails(conn):

    try:
        logger.info('Wiping old data in Order_Details...')
        conn.execute('DELETE FROM Order_Details;')
        conn.commit()
        logger.success('Wiped old data in Order_Details.')
    except Exception as e:
        logger.error(f'Error wiping old data in Order_Details. Error: {e}')
        return
    
    try:
        logger.info('Connecting to the order data sources...')
        aw_sales_order_header = pd.read_sql('SELECT * FROM "Sales.SalesOrderHeader"', adventureworks_sqlite_conn)
        aenc_sales_order = pd.read_sql('SELECT * FROM sales_order', aenc_sqlite_conn)
        northwind_sales_order = pd.read_sql('SELECT * FROM Orders', northwind_sqlite_conn)
        logger.success('Connected to the order data sources.')
    except Exception as e:
        logger.error(f'Error connecting to the order data sources. Error: {e}')
        return

    surrogate_key = 1

    try:

        logger.info('Transferring the order data from Northwind to Order_Details...')

        for index, row in northwind_sales_order.iterrows():
            
            order_details_ids = pd.read_sql(f'SELECT * FROM "Order Details" WHERE OrderID = {row["OrderID"]}', northwind_sqlite_conn)

            customer_id = row['CustomerID']
            employee_id = row['EmployeeID']
            order_date = row['OrderDate']
            required_date = row['RequiredDate']
            shipped_date = row['ShippedDate']
            ship_via = row['ShipVia']
            freight = row['Freight']
            ship_name = row['ShipName']
            ship_address = row['ShipAddress']
            ship_city = row['ShipCity']
            ship_region = row['ShipRegion']
            ship_postal_code = row['ShipPostalCode']
            ship_country = row['ShipCountry']

            for index, row in order_details_ids.iterrows():

                query = """
                    INSERT INTO Order_Details (
                        ID,
                        CustomerID,
                        EmployeeID,
                        OrderDate,
                        RequiredDate,
                        ShippedDate,
                        ShipVia,
                        Freight,
                        ShipName,
                        ShipAddress,
                        ShipCity,
                        ShipRegion,
                        ShipPostalCode,
                        ShipCounty,
                        ProductID,
                        UnitPrice,
                        Quantity,
                        Discount,
                        SK
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
                """

                params = (
                    row['OrderID'],
                    customer_id,
                    employee_id,
                    order_date,
                    required_date,
                    shipped_date,
                    ship_via,
                    freight,
                    ship_name,
                    ship_address,
                    ship_city,
                    ship_region,
                    ship_postal_code,
                    ship_country,
                    row['ProductID'],
                    row['UnitPrice'],
                    row['Quantity'],
                    row['Discount'],
                    surrogate_key
                )

                conn.execute(query, params)
                conn.commit()

                surrogate_key += 1
        
        logger.success('Transferred the order data from Northwind to Order_Details.')
    except Exception as e:
        logger.error(f'Error transferring the order data from Northwind to Order_Details. Error: {e}')
        return


    try:
        logger.info('Transferring the order data from AENC to Order_Details...')

        for index, row in aenc_sales_order.iterrows():

            sales_order_item_ids = pd.read_sql(f'SELECT * FROM sales_order_item WHERE id = {row["id"]}', aenc_sqlite_conn)

            customer_id = row['cust_id']
            order_date = row['order_date']
            region = row['region']
            sales_person_id = row['sales_rep']

            for index, row in sales_order_item_ids.iterrows():
                query = """
                    INSERT INTO Order_Details (
                        ID,
                        CustomerID,
                        OrderDate,
                        region,
                        SalesPersonID,
                        line_id,
                        ProductID,
                        Quantity,
                        ShipDate,
                        SK
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
                """

                params = (
                    row['id'],
                    customer_id,
                    order_date,
                    region,
                    sales_person_id,
                    row['line_id'],
                    row['prod_id'],
                    row['quantity'],
                    row['ship_date'],
                    surrogate_key
                )
        
                conn.execute(query, params)
                conn.commit()

                surrogate_key += 1

        logger.success('Transferred the order data from AENC to Order_Details.')
    except Exception as e:
        logger.error(f'Error transferring the order data from AENC to Order_Details. Error: {e}')
        return

    try:
        logger.info('Transferring the order data from AdventureWorks to Order_Details...')

        for index, row in aw_sales_order_header.iterrows():

            sales_reason_id = pd.read_sql(f'SELECT * FROM "Sales.SalesOrderHeaderSalesReason" WHERE SalesOrderID = {row["SalesOrderID"]} LIMIT 1', adventureworks_sqlite_conn)
            sales_order_detail_ids = pd.read_sql(f'SELECT * FROM "Sales.SalesOrderDetail" WHERE SalesOrderID = {row["SalesOrderID"]}', adventureworks_sqlite_conn)
            freight = row['Freight']
            ship_method_id = row['ShipMethodID']
            ship_method_name = pd.read_sql(f'SELECT Name from "Purchasing.ShipMethod" WHERE ShipMethodID = {ship_method_id}', adventureworks_sqlite_conn)['Name'].values[0]
            ship_method_base = pd.read_sql(f'SELECT ShipBase from "Purchasing.ShipMethod" WHERE ShipMethodID = {ship_method_id}', adventureworks_sqlite_conn)['ShipBase'].values[0]
            ship_method_rate = pd.read_sql(f'SELECT ShipRate from "Purchasing.ShipMethod" WHERE ShipMethodID = {ship_method_id}', adventureworks_sqlite_conn)['ShipRate'].values[0]
            due_date = row['DueDate']
            online_order_flag = row['OnlineOrderFlag']
            order_date = row['OrderDate']
            ship_date = row['ShipDate']
            status = row['Status']
            sub_total = row['SubTotal']
            tax_amt = row['TaxAmt']
            total_due = row['TotalDue']
            customer_id = row['CustomerID']
            bill_to_address_id = row['BillToAddressID']
            credit_card_id = row['CreditCardID']
            currency_rate_id = row['CurrencyRateID']
            sales_person_id = row['SalesPersonID']
            ship_to_address_id = row['ShipToAddressID']
            territory_id = row['TerritoryID']

            for index, row in sales_order_detail_ids.iterrows():
                query = """
                    INSERT INTO Order_Details (
                        ID, 
                        SalesReasonID, 
                        SalesOrderDetailID,
                        UnitPrice,
                        Quantity,
                        Discount,
                        Freight,
                        ShipName,
                        ShipBase,
                        ShipRate,
                        DueDate,
                        OnlineOrderFlag,
                        OrderDate,
                        ShipDate,
                        Status,
                        SubTotal,
                        TaxAmt,
                        TotalDue,
                        CustomerID,
                        ProductID,
                        BillToAdressID,
                        CreditCardID,
                        CurrencyRateID,
                        SalesPersonID,
                        ShipMethodID,
                        ShipToAdressID,
                        TerritoryID,
                        SpecialOfferID,
                        SK
                    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
                """

                params = (
                    row['SalesOrderID'],
                    int(sales_reason_id['SalesReasonID'].values[0]) if not sales_reason_id.empty else np.nan,
                    row['SalesOrderDetailID'],
                    row['UnitPrice'],
                    row['OrderQty'],
                    row['UnitPriceDiscount'],
                    freight,
                    ship_method_name,
                    ship_method_base,
                    ship_method_rate,
                    due_date,
                    online_order_flag,
                    order_date,
                    ship_date,
                    status,
                    sub_total,
                    tax_amt,
                    total_due,
                    customer_id,
                    row['ProductID'],
                    bill_to_address_id,
                    credit_card_id,
                    currency_rate_id,
                    sales_person_id,
                    ship_method_id,
                    ship_to_address_id,
                    territory_id,
                    row['SpecialOfferID'],
                    surrogate_key
                )

                conn.execute(query, params)
                conn.commit()

                surrogate_key += 1
    except Exception as e:
        logger.error(f'Error transferring the order data from AdventureWorks to Order_Details. Error: {e}')
        return
    
def create_empty_order_details_dataframe():
    dtypes = {
        'ID': np.int32,
        'SalesReasonID': np.int32,
        'SalesOrderDetailID': np.int32,
        'UnitPrice': np.float64,
        'Quantity': np.int32,
        'Discount': np.float64,
        'ShipVia': np.int32,
        'Freight': np.float64,
        'ShipName': 'object',
        'ShipBase': np.float64,
        'ShipRate': np.float64,
        'ShipAddress': 'object',
        'ShipCity': 'object',
        'ShipRegion': 'object',
        'ShipPostalCode': 'object',
        'ShipCounty': 'object',
        'DueDate': 'datetime64[ns]',
        'ModifiedDate': 'datetime64[ns]',
        'OnlineOrderFlag': np.bool_,
        'OrderDate': 'datetime64[ns]',
        'ShipDate': 'datetime64[ns]',
        'Status': 'object',
        'SubTotal': np.float64,
        'TaxAmt': np.float64,
        'TotalDue': np.float64,
        'RequiredDate': 'datetime64[ns]',
        'ShippedDate': 'datetime64[ns]',
        'region': 'object',
        'line_id': np.int32,
        'CustomerID': np.int32,
        'EmployeeID': np.int32,
        'ProductID': np.int32,
        'BillToAdressID': np.int32,
        'CreditCardID': np.int32,
        'CurrencyRateID': np.int32,
        'SalesPersonID': np.int32,
        'ShipMethodID': np.int32,
        'ShipToAdressID': np.int32,
        'TerritoryID': np.int32,
        'SpecialOfferID': np.int32,
        'SK': np.int32
    }

    df = pd.DataFrame(columns=dtypes.keys()).astype(dtypes)
    return df

test_database = sqlite3.connect('../data/processed/test.sqlite')
create_empty_order_details_dataframe().to_sql('Order_Details', test_database, index=False, if_exists='replace')

transfer_data_to_orderdetails(test_database)

[32m2024-05-20 20:32:58.024[0m | [1mINFO    [0m | [36m__main__[0m:[36mtransfer_data_to_orderdetails[0m:[36m4[0m - [1mWiping old data in Order_Details...[0m
[32m2024-05-20 20:32:58.131[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mtransfer_data_to_orderdetails[0m:[36m7[0m - [32m[1mWiped old data in Order_Details.[0m
[32m2024-05-20 20:32:58.132[0m | [1mINFO    [0m | [36m__main__[0m:[36mtransfer_data_to_orderdetails[0m:[36m13[0m - [1mConnecting to the order data sources...[0m
[32m2024-05-20 20:32:58.485[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mtransfer_data_to_orderdetails[0m:[36m17[0m - [32m[1mConnected to the order data sources.[0m
[32m2024-05-20 20:32:58.485[0m | [1mINFO    [0m | [36m__main__[0m:[36mtransfer_data_to_orderdetails[0m:[36m26[0m - [1mTransferring the order data from Northwind to Order_Details...[0m
[32m2024-05-20 20:37:14.896[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mtransfer_data_to_orderdeta

### SalesPersonQuotaHistory

In [4]:
def transfer_data_to_salespersonquotahistory(conn):

    try:
        logger.info('Wiping old data in SalesPersonQuotaHistory...')
        conn.execute('DELETE FROM SalesPersonQuotaHistory;')
        conn.commit()
        logger.success('Wiped old data in SalesPersonQuotaHistory.')
    except Exception as e:
        logger.error(f'Error wiping old data in SalesPersonQuotaHistory. Error: {e}')
        return
    
    try:
        logger.info('Connecting to the SalesPersonQuotaHistory data sources...')
        aw_sales_person_quota_history = pd.read_sql('SELECT * FROM "Sales.SalesPersonQuotaHistory"', adventureworks_sqlite_conn)
        logger.success('Connected to the SalesPersonQuotaHistory data sources.')
    except Exception as e:
        logger.error(f'Error connecting to the SalesPersonQuotaHistory data sources. Error: {e}')
        return

    surrogate_key = 1

    try:
        logger.info('Transferring the SalesPersonQuotaHistory data from AdventureWorks to SalesPersonQuotaHistory...')

        for index, row in aw_sales_person_quota_history.iterrows():
            
            query = """
                INSERT INTO SalesPersonQuotaHistory (
                    BusinessEntityID,
                    QuotaDate,
                    SalesQuota,
                    ModifiedDate,
                    SK
                ) VALUES (?, ?, ?, ?, ?);
            """

            params = (
                row['BusinessEntityID'],
                row['QuotaDate'],
                row['SalesQuota'],
                row['ModifiedDate'],
                surrogate_key
            )

            conn.execute(query, params)
            conn.commit()

            surrogate_key += 1

        logger.success('Transferred the SalesPersonQuotaHistory data from AdventureWorks to SalesPersonQuotaHistory.')

    except Exception as e:
        logger.error(f'Error transferring the SalesPersonQuotaHistory data from AdventureWorks to SalesPersonQuotaHistory. Error: {e}')
        return

def create_empty_salespersonquotahistory_dataframe():
    dtypes = {
        'BusinessEntityID': np.int32,
        'QuotaDate': 'datetime64[ns]',
        'SalesQuota': np.int32,
        'ModifiedDate': 'datetime64[ns]',
        'SK': np.int32
    }

    df = pd.DataFrame(columns=dtypes.keys()).astype(dtypes)
    return df

test_database = sqlite3.connect('../data/processed/test.sqlite')
create_empty_salespersonquotahistory_dataframe().to_sql('SalesPersonQuotaHistory', test_database, index=False, if_exists='replace')

transfer_data_to_salespersonquotahistory(test_database)

[32m2024-05-21 18:04:29.798[0m | [1mINFO    [0m | [36m__main__[0m:[36mtransfer_data_to_salespersonquotahistory[0m:[36m4[0m - [1mWiping old data in SalesPersonQuotaHistory...[0m
[32m2024-05-21 18:04:29.888[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mtransfer_data_to_salespersonquotahistory[0m:[36m7[0m - [32m[1mWiped old data in SalesPersonQuotaHistory.[0m
[32m2024-05-21 18:04:29.888[0m | [1mINFO    [0m | [36m__main__[0m:[36mtransfer_data_to_salespersonquotahistory[0m:[36m13[0m - [1mConnecting to the SalesPersonQuotaHistory data sources...[0m
[32m2024-05-21 18:04:29.890[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mtransfer_data_to_salespersonquotahistory[0m:[36m15[0m - [32m[1mConnected to the SalesPersonQuotaHistory data sources.[0m
[32m2024-05-21 18:04:29.891[0m | [1mINFO    [0m | [36m__main__[0m:[36mtransfer_data_to_salespersonquotahistory[0m:[36m23[0m - [1mTransferring the SalesPersonQuotaHistory data from AdventureWork

### SalesTerritoryHistory

In [5]:
def transfer_data_to_salesterritoryhistory(conn):
    
        try:
            logger.info('Wiping old data in SalesTerritoryHistory...')
            conn.execute('DELETE FROM SalesTerritoryHistory;')
            conn.commit()
            logger.success('Wiped old data in SalesTerritoryHistory.')
        except Exception as e:
            logger.error(f'Error wiping old data in SalesTerritoryHistory. Error: {e}')
            return
        
        try:
            logger.info('Connecting to the SalesTerritoryHistory data sources...')
            aw_sales_territory_history = pd.read_sql('SELECT * FROM "Sales.SalesTerritoryHistory"', adventureworks_sqlite_conn)
            logger.success('Connected to the SalesTerritoryHistory data sources.')
        except Exception as e:
            logger.error(f'Error connecting to the SalesTerritoryHistory data sources. Error: {e}')
            return
    
        surrogate_key = 1
    
        try:
            logger.info('Transferring the SalesTerritoryHistory data from AdventureWorks to SalesTerritoryHistory...')
    
            for index, row in aw_sales_territory_history.iterrows():
                
                query = """
                    INSERT INTO SalesTerritoryHistory (
                        TerritoryID,
                        BusinessEntityID,
                        StartDate,
                        EndDate,
                        ModifiedDate,
                        SK
                    ) VALUES (?, ?, ?, ?, ?, ?);
                """
    
                params = (
                    row['TerritoryID'],
                    row['BusinessEntityID'],
                    row['StartDate'],
                    row['EndDate'],
                    row['ModifiedDate'],
                    surrogate_key
                )
    
                conn.execute(query, params)
                conn.commit()
    
                surrogate_key += 1
    
            logger.success('Transferred the SalesTerritoryHistory data from AdventureWorks to SalesTerritoryHistory.')
    
        except Exception as e:
            logger.error(f'Error transferring the SalesTerritoryHistory data from AdventureWorks to SalesTerritoryHistory. Error: {e}')
            return

def create_empty_salesterritoryhistory_dataframe():
    dtypes = {
        'TerritoryID': np.int32,
        'BusinessEntityID': np.int32,
        'StartDate': 'datetime64[ns]',
        'EndDate': 'datetime64[ns]',
        'ModifiedDate': 'datetime64[ns]',
        'SK': np.int32
    }

    df = pd.DataFrame(columns=dtypes.keys()).astype(dtypes)
    return df

test_database = sqlite3.connect('../data/processed/test.sqlite')
create_empty_salesterritoryhistory_dataframe().to_sql('SalesTerritoryHistory', test_database, index=False, if_exists='replace')

transfer_data_to_salesterritoryhistory(test_database)

[32m2024-05-21 18:44:41.813[0m | [1mINFO    [0m | [36m__main__[0m:[36mtransfer_data_to_salesterritoryhistory[0m:[36m4[0m - [1mWiping old data in SalesTerritoryHistory...[0m
[32m2024-05-21 18:44:41.912[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mtransfer_data_to_salesterritoryhistory[0m:[36m7[0m - [32m[1mWiped old data in SalesTerritoryHistory.[0m
[32m2024-05-21 18:44:41.913[0m | [1mINFO    [0m | [36m__main__[0m:[36mtransfer_data_to_salesterritoryhistory[0m:[36m13[0m - [1mConnecting to the SalesTerritoryHistory data sources...[0m
[32m2024-05-21 18:44:41.915[0m | [32m[1mSUCCESS [0m | [36m__main__[0m:[36mtransfer_data_to_salesterritoryhistory[0m:[36m15[0m - [32m[1mConnected to the SalesTerritoryHistory data sources.[0m
[32m2024-05-21 18:44:41.916[0m | [1mINFO    [0m | [36m__main__[0m:[36mtransfer_data_to_salesterritoryhistory[0m:[36m23[0m - [1mTransferring the SalesTerritoryHistory data from AdventureWorks to SalesTerritoryH

### Connecties sluiten

In [19]:
northwind_sqlite_conn.close()
adventureworks_sqlite_conn.close()
northwind_ssms_conn.close()
adventureworks_ssms_conn.close()
aenc_access_conn.close()
aenc_sqlite_conn.close()