# 2.3: ETL
Deze notebook kan 2 dingen doen:
1. Alle data in de data warehouse schoonmaken
2. Alle data vanuit de 3 sqlite DB's en de 2 CSV-bestanden vullen in de DW

In [37]:
import pyodbc
import sqlite3
import pandas as pd
import sqlalchemy as sa

connection_url = sa.engine.URL.create(
    'mssql+pyodbc',
    username='SA',
    password='MyPass@word',
    host='127.0.0.1',
    database='deds',
    query={"driver": "ODBC Driver 17 for SQL Server", "Encrypt": "no"}
)

try:
    engine = sa.create_engine(connection_url)
    with engine.connect() as conn:
        print("Connected to MSSQL successfully!")
except Exception as e:
    print(f"Failed to connect to MSSQL: {e}")

try:
    crm_conn = sqlite3.connect("../data/go_crm_volledig.sqlite")
    sales_conn = sqlite3.connect("../data/go_sales_volledig.sqlite")
    staff_conn = sqlite3.connect("../data/go_staff_volledig.sqlite")
    print("Connected to SQLite databases successfully!")
except Exception as e:
    print(f"Failed to connect to SQLite: {e}")

Connected to MSSQL successfully!
Connected to SQLite databases successfully!


## Clear Data

In [3]:
from sqlalchemy import text
from sqlalchemy.orm import sessionmaker
import sqlalchemy as sa

Session = sessionmaker(bind=engine)
session = Session()

# Step 2: Disable foreign key constraints (SQL Server-specific)
session.execute(text("EXEC sp_MSforeachtable 'ALTER TABLE ? NOCHECK CONSTRAINT ALL'"))
session.commit()

# Step 3: Delete all data from all tables
meta = sa.MetaData()
meta.reflect(bind=engine)  # Load table information

with session.begin():  # Ensures transaction safety
    for table in reversed(meta.sorted_tables):  # Drop data respecting dependencies
        session.execute(table.delete())  # `DELETE FROM table`

# Step 4: Re-enable foreign key constraints
session.execute(text("EXEC sp_MSforeachtable 'ALTER TABLE ? CHECK CONSTRAINT ALL'"))
session.commit()

# Close session
session.close()

print("Cleared Data")

Cleared Data


## Fill Data

### Age Group
From:
- CRM

In [4]:
age_group = pd.read_sql_query('SELECT * FROM age_group', crm_conn)
age_group.to_sql('age_group', engine, if_exists='append', index=False)

6

### Sales Territory
From:
- CRM

In [5]:
sales_territory = pd.read_sql_query('SELECT * FROM sales_territory', crm_conn)
sales_territory.to_sql('sales_territory', engine, if_exists='append', index=False)

5

### Country
From:
- CRM
- Sales

Depends on:
- Sales Territory

In [21]:
crm_country = pd.read_sql_query('SELECT * FROM country', crm_conn)

sales_country = pd.read_sql_query('SELECT * FROM country', sales_conn)

country = pd.merge(crm_country, sales_country, on=['COUNTRY_CODE'], how='inner')
country = country.drop(columns=['COUNTRY'])

country.to_sql('country', engine, if_exists='append', index=False)

Unnamed: 0,COUNTRY_CODE,COUNTRY_EN,FLAG_IMAGE,SALES_TERRITORY_CODE,LANGUAGE,CURRENCY_NAME
0,1,France,F01,6,EN,francs
1,2,Germany,F02,6,EN,marks
2,3,United States,F03,1,EN,dollars
3,4,Canada,F04,1,EN,dollars
4,5,Austria,F05,7,EN,schillings


### Retailer Segment
From:
- CRM

In [7]:
retailer_segment = pd.read_sql_query('SELECT * FROM retailer_segment', crm_conn)
retailer_segment.to_sql('retailer_segment', engine, if_exists='append', index=False)

12

### Retailer Headquarters
From:
- CRM

Depends on:
- Retailer Segment

In [8]:
retailer_headquarters = pd.read_sql_query('SELECT * FROM retailer_headquarters', crm_conn)
retailer_headquarters.to_sql('retailer_headquarters', engine, if_exists='append', index=False)

205

### Retailer Type
From:
- CRM

In [9]:
retailer_type = pd.read_sql_query('SELECT * FROM retailer_type', crm_conn)
retailer_type.to_sql('retailer_type', engine, if_exists='append', index=False)

8

### Retailer
From:
- CRM

Depends on:
- Retailer Headquarters
- Retailer Type

In [10]:
retailer = pd.read_sql_query('SELECT * FROM retailer', crm_conn)
retailer.to_sql('retailer', engine, if_exists='append', index=False)

109

### Retailer Site
From:
- CRM
- Sales

Depends on:
- Retailer
- Country

In [17]:
crm_retailer_site.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 391 entries, 0 to 390
Data columns (total 8 columns):
 #   Column              Non-Null Count  Dtype 
---  ------              --------------  ----- 
 0   RETAILER_SITE_CODE  391 non-null    int64 
 1   RETAILER_CODE       391 non-null    int64 
 2   ADDRESS1            391 non-null    object
 3   ADDRESS2            105 non-null    object
 4   CITY                391 non-null    object
 5   REGION              251 non-null    object
 6   COUNTRY_CODE        391 non-null    int64 
 7   ACTIVE_INDICATOR    391 non-null    int64 
dtypes: int64(4), object(4)
memory usage: 24.6+ KB


In [18]:
sales_retailer_site.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 391 entries, 0 to 390
Data columns (total 8 columns):
 #   Column              Non-Null Count  Dtype 
---  ------              --------------  ----- 
 0   RETAILER_SITE_CODE  391 non-null    int64 
 1   RETAILER_CODE       391 non-null    int64 
 2   ADDRESS1            391 non-null    object
 3   ADDRESS2            105 non-null    object
 4   CITY                391 non-null    object
 5   REGION              251 non-null    object
 6   COUNTRY_CODE        391 non-null    int64 
 7   ACTIVE_INDICATOR    391 non-null    int64 
dtypes: int64(4), object(4)
memory usage: 24.6+ KB


In [22]:
crm_retailer_site = pd.read_sql_query('SELECT * FROM retailer_site', crm_conn)

sales_retailer_site = pd.read_sql_query('SELECT * FROM retailer_site', sales_conn)

retailer_site = pd.merge(
    crm_retailer_site, 
    sales_retailer_site, 
    on=['RETAILER_SITE_CODE'], 
    how='inner', 
    suffixes=('_crm', '_sales')
)

retailer_site = retailer_site.drop(columns=['RETAILER_CODE_sales', 'ADDRESS1_sales', 'ADDRESS2_sales', 
                   'CITY_sales', 'REGION_sales', 'COUNTRY_CODE_sales', 'ACTIVE_INDICATOR_sales'])

retailer_site = retailer_site.rename(columns=lambda x: x.replace('_crm', ''))

retailer_site.to_sql('retailer_site', engine, if_exists='append', index=False)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 391 entries, 0 to 390
Data columns (total 8 columns):
 #   Column              Non-Null Count  Dtype 
---  ------              --------------  ----- 
 0   RETAILER_SITE_CODE  391 non-null    int64 
 1   RETAILER_CODE       391 non-null    int64 
 2   ADDRESS1            391 non-null    object
 3   ADDRESS2            105 non-null    object
 4   CITY                391 non-null    object
 5   REGION              251 non-null    object
 6   COUNTRY_CODE        391 non-null    int64 
 7   ACTIVE_INDICATOR    391 non-null    int64 
dtypes: int64(4), object(4)
memory usage: 24.6+ KB
None


129

### Retailer Contact
From:
- CRM

Depends on:
- Retailer Site

In [23]:
retailer_contact = pd.read_sql_query('SELECT * FROM retailer_contact', crm_conn)
retailer_contact.to_sql('retailer_contact', engine, if_exists='append', index=False)

158

### Sales Demographic
From:
- CRM

Depends on:
- Retailer Headquarters
- Age Group

In [24]:
sales_demographic = pd.read_sql_query('SELECT * FROM sales_demographic', crm_conn)
sales_demographic.to_sql('sales_demographic', engine, if_exists='append', index=False)

388

### Sales Branch
From:
- Sales
- Staff

Depends on:
- Country

In [34]:
sales_sales_branch = pd.read_sql_query('SELECT * FROM sales_branch', sales_conn)

staff_sales_branch = pd.read_sql_query('SELECT * FROM sales_branch', staff_conn)

sales_branch = pd.merge(
    sales_sales_branch, 
    staff_sales_branch, 
    on=['SALES_BRANCH_CODE'], 
    how='inner', 
    suffixes=('_sales', '_staff')
)

sales_branch = sales_branch.drop(columns=['ADDRESS1_sales', 'ADDRESS2_sales', 'CITY_sales', 
                   'REGION_sales', 'COUNTRY_CODE_sales'])

sales_branch = sales_branch.rename(columns=lambda x: x.replace('_staff', ''))

sales_branch.to_sql('sales_branch', engine, if_exists='append', index=False)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 28 entries, 0 to 27
Data columns (total 6 columns):
 #   Column             Non-Null Count  Dtype 
---  ------             --------------  ----- 
 0   SALES_BRANCH_CODE  28 non-null     int64 
 1   ADDRESS1           28 non-null     object
 2   ADDRESS2           9 non-null      object
 3   CITY               28 non-null     object
 4   REGION             15 non-null     object
 5   COUNTRY_CODE       28 non-null     int64 
dtypes: int64(2), object(4)
memory usage: 1.4+ KB


28

### Course
From:
- Staff

In [35]:
course = pd.read_sql_query('SELECT * FROM course', staff_conn)
course.to_sql('course', engine, if_exists='append', index=False)

9

### Sales Staff
From:
- Sales
- Staff

Depends on:
- Sales Branch

In [41]:
sales_sales_staff = pd.read_sql_query('SELECT * FROM sales_staff', sales_conn)

staff_sales_staff = pd.read_sql_query('SELECT * FROM sales_staff', staff_conn)

sales_staff = pd.merge(
    sales_sales_staff, 
    staff_sales_staff, 
    on=['SALES_STAFF_CODE'], 
    how='inner', 
    suffixes=('_sales', '_staff')
)

sales_staff = sales_staff.drop(columns=['FIRST_NAME_sales', 'LAST_NAME_sales', 'POSITION_EN_sales', 'WORK_PHONE_sales', 'EXTENSION_sales', 
                                        'FAX_sales', 'EMAIL_sales', 'DATE_HIRED_sales', 'SALES_BRANCH_CODE_sales'])

sales_staff = sales_staff.rename(columns=lambda x: x.replace('_staff', ''))

sales_staff.to_sql('sales_staff', engine, if_exists='append', index=False)

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 102 entries, 0 to 101
Data columns (total 20 columns):
 #   Column                   Non-Null Count  Dtype  
---  ------                   --------------  -----  
 0   SALES_STAFF_CODE         102 non-null    int64  
 1   FIRST_NAME_sales         102 non-null    object 
 2   LAST_NAME_sales          102 non-null    object 
 3   POSITION_EN_sales        102 non-null    object 
 4   WORK_PHONE_sales         102 non-null    object 
 5   EXTENSION_sales          100 non-null    float64
 6   FAX_sales                102 non-null    object 
 7   EMAIL_sales              102 non-null    object 
 8   DATE_HIRED_sales         102 non-null    object 
 9   SALES_BRANCH_CODE_sales  102 non-null    int64  
 10  FIRST_NAME_staff         102 non-null    object 
 11  LAST_NAME_staff          102 non-null    object 
 12  POSITION_EN_staff        102 non-null    object 
 13  WORK_PHONE_staff         102 non-null    object 
 14  EXTENSION_staff          1

102

### Order Method
From:
- Sales

In [42]:
order_method = pd.read_sql_query('SELECT * FROM order_method', sales_conn)
order_method.to_sql('order_method', engine, if_exists='append', index=False)

7

### Order Header
From:
- Sales

Depends on:
- Retailer Site
- Sales Staff
- Sales Branch
- Order Method

In [43]:
order_header = pd.read_sql_query('SELECT * FROM order_header', sales_conn)
order_header.to_sql('order_header', engine, if_exists='append', index=False)

120

### Product Line
From:
- Sales

In [44]:
product_line = pd.read_sql_query('SELECT * FROM product_line', sales_conn)
product_line.to_sql('product_line', engine, if_exists='append', index=False)

5

### Product Type
From:
- Sales

Depends on:
- Product Line

In [45]:
product_type = pd.read_sql_query('SELECT * FROM product_type', sales_conn)
product_type.to_sql('product_type', engine, if_exists='append', index=False)

21

### Product
From:
- Sales

Depends on:
- Product Type

In [46]:
product = pd.read_sql_query('SELECT * FROM product', sales_conn)
product.to_sql('product', engine, if_exists='append', index=False)

115

### Order Details
From:
- Sales

Depends on:
- Order Header
- Product

In [47]:
order_details = pd.read_sql_query('SELECT * FROM order_details', sales_conn)
order_details.to_sql('order_details', engine, if_exists='append', index=False)

7

### Return Reason
From:
- Sales

In [48]:
return_reason = pd.read_sql_query('SELECT * FROM return_reason', sales_conn)
return_reason.to_sql('return_reason', engine, if_exists='append', index=False)

5

### Returned Item
From:
- Sales

Depends on:
- Order Details
- Return Reason

In [49]:
returned_item = pd.read_sql_query('SELECT * FROM returned_item', sales_conn)
returned_item.to_sql('returned_item', engine, if_exists='append', index=False)

287

### Satisfaction Type
From:
- Staff

In [51]:
satisfaction_type = pd.read_sql_query('SELECT * FROM satisfaction_type', staff_conn)
satisfaction_type.to_sql('satisfaction_type', engine, if_exists='append', index=False)

5

### Satisfaction
From:
- Staff

Depends on:
- Sales Staff
- Satisfaction Type

In [52]:
satisfaction = pd.read_sql_query('SELECT * FROM satisfaction', staff_conn)
satisfaction.to_sql('satisfaction', engine, if_exists='append', index=False)

301

### Training
From:
- Staff

Depends on:
- Sales Staff
- Course

In [54]:
training = pd.read_sql_query('SELECT * FROM training', staff_conn)
training.to_sql('training', engine, if_exists='append', index=False)

402

### Product Forecast

In [56]:
product_forecast = pd.read_csv('../data/product_forecast_volledig.csv')

product_forecast.to_sql('product_forecast', engine, if_exists='append', index=False)

204

### Inventory Levels

In [57]:
inventory_levels = pd.read_csv('../data/inventory_levels_volledig.csv')

inventory_levels.to_sql('product_forecast', engine, if_exists='append', index=False)

ProgrammingError: (pyodbc.ProgrammingError) ('42S22', "[42S22] [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Invalid column name 'INVENTORY_YEAR;INVENTORY_MONTH;PRODUCT_NUMBER;INVENTORY_COUNT'. (207) (SQLExecDirectW)")
[SQL: INSERT INTO product_forecast ([INVENTORY_YEAR;INVENTORY_MONTH;PRODUCT_NUMBER;INVENTORY_COUNT]) VALUES (?), (?), (?), (?), (?), (?), (?), (?), (?), (?), (?), (?), (?), (?), (?), (?), (?), (?), (?), (?), (?), (?), (?), (?), (?), (?), (?), (?), (?), (?) ... 4750 characters truncated ... , (?), (?), (?), (?), (?), (?), (?), (?), (?), (?), (?), (?), (?), (?), (?), (?), (?), (?), (?), (?)]
[parameters: ('2023;4;48;1932', '2023;4;49;1400', '2023;4;50;21705', '2023;4;51;9710', '2023;4;52;5616', '2023;4;53;7060', '2023;4;54;6943', '2023;4;55;4432', '2023;4;56;14188', '2023;4;57;1599', '2023;4;58;1645', '2023;4;59;749', '2023;4;60;8419', '2023;4;61;7175', '2023;4;62;2428', '2023;4;63;2091', '2023;4;64;4175', '2023;4;65;1754', '2023;4;66;576', '2023;4;67;1354', '2023;4;68;1576', '2023;4;69;609', '2023;4;70;803', '2023;4;71;1009', '2023;4;72;1009', '2023;4;73;1508', '2023;4;74;2690', '2023;4;75;2416', '2023;4;76;1938', '2023;4;77;2354', '2023;4;78;1073', '2023;4;79;733', '2023;4;80;889', '2023;4;81;573', '2023;4;82;2075', '2023;4;83;2034', '2023;4;84;-183', '2023;4;85;1390', '2023;4;86;17366', '2023;4;87;19789', '2023;4;88;18456', '2023;4;89;21414', '2023;4;90;29722', '2023;4;91;14100', '2023;4;92;6273', '2023;4;93;17880', '2023;4;94;26290', '2023;4;95;20552', '2023;4;96;3404', '2023;4;97;1375' ... 900 parameters truncated ... '2023;12;78;889', '2023;12;79;547', '2023;12;80;1327', '2023;12;81;939', '2023;12;82;3069', '2023;12;83;2362', '2023;12;84;1195', '2023;12;85;1624', '2023;12;86;11284', '2023;12;87;19731', '2023;12;88;15362', '2023;12;89;15364', '2023;12;90;21124', '2023;12;91;10098', '2023;12;92;4967', '2023;12;93;14606', '2023;12;94;19716', '2023;12;95;17402', '2023;12;96;2312', '2023;12;97;1421', '2023;12;98;7605', '2023;12;99;5043', '2023;12;100;5078', '2023;12;101;1221', '2023;12;102;1010', '2023;12;103;350', '2023;12;104;497', '2023;12;105;957', '2023;12;106;887', '2023;12;107;409', '2023;12;108;500', '2023;12;109;1161', '2023;12;110;1343', '2023;12;111;736', '2023;12;112;6764', '2023;12;113;3839', '2023;12;114;246', '2023;12;115;2462', '2024;1;1;6985', '2024;1;2;7862', '2024;1;3;4628', '2024;1;4;9386', '2024;1;5;2232', '2024;1;6;904', '2024;1;7;2627', '2024;1;8;1383', '2024;1;9;1885', '2024;1;10;6984', '2024;1;11;820', '2024;1;12;2108')]
(Background on this error at: https://sqlalche.me/e/20/f405)