# Load SpaceParts data
- SpaceParts is a fictional company that sells... spaceship parts.
- The SQL database is a free, public training database for learning data tools like Fabric or Power BI.
- This notebook provides you with a simple and convenient way to load data from the SpaceParts database to practice either in the notebook or to write it to a Fabric data item for further use.

## Note: SpaceParts database is IP from Tabular Editor
- You can only use this database for non-commercial use.
- Respect the terms and conditions of the license agreement.

### Import libraries

In [1]:
import pyodbc
import pandas as pd
import re
from pyspark.sql import SparkSession

### Function to load data from a single table
- Used in a later function to load all tables

In [2]:
def load_data_spark(table_name, server, database, username, password, schema=None):
    """Load one SQL Server table into a Spark DataFrame over JDBC.

    Args:
        table_name: Name of the table (or view) to read; may contain spaces.
        server: Host name of the SQL Server instance.
        database: Name of the database to read from.
        username: SQL login name.
        password: SQL login password.
        schema: Optional schema that owns the table. When omitted (or
            empty), the table name is used unqualified.

    Returns:
        The DataFrame produced by Spark's JDBC reader for the table.
    """
    def quote_identifier(identifier):
        # Bracket-delimit so names with spaces are valid. A literal "]" inside
        # the name must be doubled, otherwise it would end the delimiter early.
        return "[" + identifier.replace("]", "]]") + "]"

    spark = SparkSession.builder.getOrCreate()

    # Build the (optionally schema-qualified) table reference
    full_table_name = quote_identifier(table_name)
    if schema:
        full_table_name = f"{quote_identifier(schema)}.{full_table_name}"

    # Connection string with encryption settings.
    # NOTE(review): trustServerCertificate=true presumably skips validation of
    # the server certificate; confirm whether "false" works in this environment.
    jdbc_url = f"jdbc:sqlserver://{server};databaseName={database};encrypt=true;trustServerCertificate=true;"

    df = (
        spark.read
        .format("jdbc")
        .option("url", jdbc_url)
        .option("dbtable", full_table_name)
        .option("user", username)
        .option("password", password)
        .load()
    )

    return df

### Functions to standardize column / table names
- Removes capitals and spaces in names after loading the tables.

In [3]:
def standardize_column_names(df):
    """Return *df* with every column renamed to a lowercase, underscore-separated name.

    For each column the name is lowercased, spaces become underscores, and
    any character other than ASCII letters, digits and underscore is removed.
    Columns are renamed one at a time with ``withColumnRenamed``.
    """
    disallowed_chars = re.compile(r'[^a-zA-Z0-9_]')
    renamed = df
    # Iterate the names of the frame as it was passed in
    for original_name in df.columns:
        underscored = original_name.lower().replace(' ', '_')
        clean_name = disallowed_chars.sub('', underscored)
        renamed = renamed.withColumnRenamed(original_name, clean_name)
    return renamed

def convert_table_names_for_lakehouse(table_name):
    """Return *table_name* lowercased, with every space turned into an underscore."""
    lowered = table_name.lower()
    # Splitting on a literal space keeps empty pieces, so runs of spaces
    # become runs of underscores.
    return "_".join(lowered.split(" "))

### Function to load all tables from the SpaceParts database
- Alternatively, you can load one table at a time.

In [4]:
def load_spaceparts(server, database, username, password):
    """Load the fixed list of SpaceParts views through ``load_data_spark``.

    Args:
        server: Host name of the SQL Server instance.
        database: Name of the database to read from.
        username: SQL login name.
        password: SQL login password.

    Returns:
        dict mapping a short key (e.g. ``'budget_rate'``) to whatever
        ``load_data_spark`` returned for that view. A view whose load raises
        is reported with ``print`` and left out of the dict.
    """
    # (result key, source object name) pairs, grouped by owning schema
    dim_views = [
        ('brands', 'Brands'),
        ('budget_rate', 'Budget Rate'),
        ('customers', 'Customers'),
        ('employees', 'Employees'),
        ('exchange_rate', 'Exchange Rate'),
        ('invoice_doc_type', 'Invoice Document Type'),
        ('order_doc_type', 'Order Document Type'),
        ('order_status', 'Order Status'),
        ('products', 'Products'),
        ('regions', 'Regions'),
    ]
    fact_views = [
        ('budget', 'Budget'),
        ('forecast', 'Forecast'),
        ('invoices', 'Invoices'),
        ('orders', 'Orders'),
    ]

    sources = [(key, 'DimView', name) for key, name in dim_views]
    sources += [(key, 'FactView', name) for key, name in fact_views]

    loaded = {}
    for key, schema, table in sources:
        try:
            loaded[key] = load_data_spark(table, server, database, username, password, schema)
            print(f"Successfully loaded {schema}.{table}")
        except Exception as e:
            # Best effort: report the failure and carry on with the next view
            print(f"Error loading {schema}.{table}: {str(e)}")

    return loaded

### Function to load data to lakehouse
- Assumes you have already created and connected to the lakehouse in the notebook explorer 


In [None]:
def write_tables_to_lakehouse(all_tables, target_schema="SpaceParts"):
    """Save each DataFrame in *all_tables* as the table ``<target_schema>.<key>``.

    Every table is written in ``overwrite`` mode with the ``overwriteSchema``
    option set to ``"true"``. A failure on one table is printed and the loop
    moves on to the next one.

    Args:
        all_tables: Mapping of table key to DataFrame; each value must expose
            a ``write`` attribute.
        target_schema: Prefix placed before each key in the target name.
    """
    for table_key, frame in all_tables.items():
        try:
            # Target name is the schema prefix plus the dictionary key
            destination = f"{target_schema}.{table_key}"

            writer = frame.write.mode("overwrite").option("overwriteSchema", "true")
            writer.saveAsTable(destination)

            print(f"Successfully wrote table to {destination}")
        except Exception as e:
            print(f"Error writing table {table_key} to Lakehouse: {str(e)}")

## Loading tables

In [None]:
def load_spaceparts(server, database, username, password):
    """Load every table listed in INFORMATION_SCHEMA.TABLES into pandas DataFrames.

    NOTE(review): this reuses the name of the Spark-based ``load_spaceparts``
    defined earlier in this notebook, so once this cell runs it replaces that
    one. The two differ in what they return: this version yields pandas
    DataFrames keyed by ``"<schema>.<table>"``.

    Args:
        server: Host name of the SQL Server instance.
        database: Name of the database to read from.
        username: SQL login name.
        password: SQL login password.

    Returns:
        dict mapping ``"<schema>.<table>"`` to the DataFrame read with
        ``SELECT *``. A table whose read raises is reported with ``print``
        and left out of the dict.

    Raises:
        Whatever ``pyodbc`` raises while connecting or while listing the
        tables; only per-table read errors are caught.
    """
    import pyodbc
    import pandas as pd

    conn_str = (
        "DRIVER={ODBC Driver 18 for SQL Server};"
        f"SERVER={server};DATABASE={database};UID={username};PWD={password};"
        "Encrypt=yes;TrustServerCertificate=no;"
    )

    conn = pyodbc.connect(conn_str)
    result = {}
    # Close the connection even when listing or reading fails part-way, so a
    # failed run does not leave a session open on the server.
    try:
        cursor = conn.cursor()

        # Fetch every available table
        cursor.execute("SELECT TABLE_SCHEMA, TABLE_NAME FROM INFORMATION_SCHEMA.TABLES")
        tables = cursor.fetchall()

        for schema, table in tables:
            query = f"SELECT * FROM [{schema}].[{table}]"
            try:
                df = pd.read_sql(query, conn)
                result[f"{schema}.{table}"] = df
                print(f"✅ Loaded {schema}.{table} ({len(df)} rows)")
            except Exception as e:
                # Best effort: report the failure and continue with the next table
                print(f"⚠️ Error cargando {schema}.{table}: {e}")
    finally:
        conn.close()

    return result


## 1. Prueba de revisión de la tabla Employees - revisión de información

In [None]:
import pyodbc
import pandas as pd

# Direct connection to the SpaceParts database
conn = pyodbc.connect(
    "DRIVER={ODBC Driver 18 for SQL Server};"
    "SERVER=te3-training-eu.database.windows.net;"
    "DATABASE=SpacePartsCoDW;"
    "UID=dwreader;"
    "PWD=TE3#reader!"
)

# Fetch one specific table
query = "SELECT * FROM dim.Employees"
Employees_df = pd.read_sql(query, conn)

# The rows come from dim.Employees, so the label says employees; the message
# used to read "Clientes" (customers), which did not match the table queried.
print(f"Empleados cargados: {len(Employees_df)} filas")
Employees_df.head()


  Employees_df = pd.read_sql(query, conn)


Clientes cargados: 893 filas


Unnamed: 0,Role,Employee Name,Employee Email,Data Security Rule,DWCreatedDate
0,Business Line Leader,Aaneta Gibson,aaneta.gibson@spaceparts.co,"Sales for all accounts, cost & margin only for...",2023-02-10 14:52:07.963
1,Business Line Leader,Aaron Rogacz,aaron.rogacz@spaceparts.co,"Sales for all accounts, cost & margin only for...",2023-02-10 14:52:07.963
2,Station Sales Managers,Abilio Decker,abilio.decker@spaceparts.co,All accounts in Stations for which they are re...,2023-02-10 14:52:07.963
3,Account Manager,Adam Alexander,adam.alexander@spaceparts.co,Sales for all stations in which they have a re...,2023-02-10 14:52:07.963
4,Account Manager,Adam Medina,adam.medina@spaceparts.co,Sales for all stations in which they have a re...,2023-02-10 14:52:07.963


2. Revisar la cantidad de tablas en la conexión con la base de datos SpaceParts

In [None]:
import pyodbc
import pandas as pd

# Direct connection; the ODBC connection string is assembled from its
# individual "KEY=value;" pieces.
connection_parts = [
    "DRIVER={ODBC Driver 18 for SQL Server};",
    "SERVER=te3-training-eu.database.windows.net;",
    "DATABASE=SpacePartsCoDW;",
    "UID=dwreader;",
    "PWD=TE3#reader!;",
    "Encrypt=yes;",
    "TrustServerCertificate=yes;",
]
conn = pyodbc.connect("".join(connection_parts))

# Catalog query that returns only schema and table names of base tables
query = """
SELECT TABLE_SCHEMA, TABLE_NAME
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_TYPE = 'BASE TABLE'
ORDER BY TABLE_SCHEMA, TABLE_NAME;
"""

tables_df = pd.read_sql(sql=query, con=conn)

# Headline with the number of tables, then the listing itself
print(f"Se encontraron {len(tables_df)} tablas:", tables_df, sep="\n")


Se encontraron 14 tablas:
   TABLE_SCHEMA       TABLE_NAME
0           dim           Brands
1           dim      Budget-Rate
2           dim        Customers
3           dim        Employees
4           dim    Exchange-Rate
5           dim  Invoice-DocType
6           dim    Order-DocType
7           dim     Order-Status
8           dim         Products
9           dim          Regions
10         fact           Budget
11         fact         Forecast
12         fact         Invoices
13         fact           Orders


  tables_df = pd.read_sql(query, conn)


3. Revisar cantidad de filas que tienen las tablas con gran información. 

In [13]:
import pandas as pd

# Count the rows of fact.Invoices that pass the date filter, using the `conn`
# connection opened in an earlier cell.
# NOTE(review): the query divides [Billing Date] by 1000000000 before passing
# it to DATEADD(SECOND, ...), so it presumably treats the column as epoch
# nanoseconds; confirm against the table definition.
query_invoices = """
    SELECT COUNT(*) AS filas
    FROM fact.Invoices
    WHERE DATEADD(SECOND, [Billing Date]/1000000000, '1970-01-01') 
          BETWEEN '2020-01-01' AND '2022-12-31'
"""
count_invoices = pd.read_sql(query_invoices, conn)

# iloc[0,0] picks the first row / first column of the result, i.e. the count
print("📊 Filas en fact.Invoices (2020-2022):", count_invoices.iloc[0,0])


  count_invoices = pd.read_sql(query_invoices, conn)


📊 Filas en fact.Invoices (2020-2022): 10795685


4. Revisión de las columnas de la tabla Orders: revisar la forma de bajar la información por medio de filtros, sin tanto detalle.

In [14]:
import pandas as pd

# Only the column layout is printed below, so a ten-row sample is requested
query = """
    SELECT TOP 10 *
    FROM fact.Orders
"""

df = pd.read_sql(sql=query, con=conn)
print(df.columns)

Index(['Sales Order Document Number', 'Order Date', 'Customer Key',
       'Sales Order Document Line Item Number', 'Product Key', 'Billing Date',
       'Ship Date', 'Request Goods Receipt Date', 'Confirm Goods Receipt Date',
       'Sales Order Document Type Code',
       'Sales Order Document Line Item Status', 'Local Currency',
       'Net Order Value', 'Net Order Quantity', 'DWCreatedDate'],
      dtype='object')


  df = pd.read_sql(query, conn)


5. Revisión de las columnas de la tabla Invoices: revisar la forma de bajar la información por medio de filtros, sin tanto detalle.

In [15]:
import pandas as pd

# Only the column layout is printed below, so a ten-row sample is requested
query = """
    SELECT TOP 10 *
    FROM fact.Invoices
"""

df = pd.read_sql(sql=query, con=conn)
print(df.columns)

  df = pd.read_sql(query, conn)


Index(['Billing Document Number', 'Billing Date', 'Customer Key',
       'Billing Document Line Item Number', 'Product Key', 'Ship Date',
       'OTD Indicator', 'Billing Document Type Code', 'Local Currency',
       'Delivery Cost', 'Net Invoice COGS', 'Late Delivery Penalties',
       'Overdue Payment Penalties', 'Taxes & Commercial Fees', 'Freight',
       'Net Invoice Cost', 'Net Invoice Value', 'Net Invoice Quantity',
       'DWCreatedDate'],
      dtype='object')


Exportación de la tabla Invoices con todos los filtros y agrupaciones necesarios para conectarla con la tabla Budget de presupuestos y realizar los respectivos análisis. Se exporta agregada porque esta tabla tenía más de 10 millones de filas con información muy detallada.

In [None]:
import pandas as pd

# Aggregate fact.Invoices per customer, product, year and month: sums of net
# invoice value, net invoice cost and net invoice quantity, restricted by the
# date filter in the WHERE clause. Runs on the `conn` opened in an earlier cell.
# NOTE(review): [Billing Date] is divided by 1000000000 before DATEADD(SECOND,
# ...), so it is presumably stored as epoch nanoseconds; confirm.
query = """
    SELECT 
        [Customer Key],
        [Product Key],
        YEAR(DATEADD(SECOND, [Billing Date]/1000000000, '1970-01-01')) AS Año,
        MONTH(DATEADD(SECOND, [Billing Date]/1000000000, '1970-01-01')) AS Mes,
        SUM([Net Invoice Value]) AS VentasTotales,
        SUM([Net Invoice Cost]) AS CostosTotales,
        SUM([Net Invoice Quantity]) AS CantidadVendida
    FROM fact.Invoices
    WHERE DATEADD(SECOND, [Billing Date]/1000000000, '1970-01-01') 
          BETWEEN '2020-01-01' AND '2022-12-31'
    GROUP BY 
        [Customer Key],
        [Product Key],
        YEAR(DATEADD(SECOND, [Billing Date]/1000000000, '1970-01-01')),
        MONTH(DATEADD(SECOND, [Billing Date]/1000000000, '1970-01-01'))
    ORDER BY 
        [Customer Key], [Product Key], Año, Mes;
"""

# Written relative to the current working directory
output_file = "fact_Invoices_2020_2022_byCustomer_Product.csv"

df = pd.read_sql(query, conn)

# Export with ";" as the field separator and "," as the decimal mark
df.to_csv(output_file, index=False, encoding="utf-8", sep=";", decimal=",")

print(f"🎉 Exportación terminada. Archivo guardado en {output_file} con {len(df)} filas")


  df = pd.read_sql(query, conn)


In [None]:
import pandas as pd

# Aggregate fact.Invoices per customer, product, year and month, then write
# the result to CSV. Runs on the `conn` opened in an earlier cell.
# NOTE(review): the output file is named "...byCustomer" but the query still
# selects and groups by [Product Key], and ORDER BY leaves [Product Key] out;
# confirm whether a customer-only grain was intended.
query = """
    SELECT 
        [Customer Key],
        [Product Key],
        YEAR(DATEADD(SECOND, [Billing Date]/1000000000, '1970-01-01')) AS Año,
        MONTH(DATEADD(SECOND, [Billing Date]/1000000000, '1970-01-01')) AS Mes,
        SUM([Net Invoice Value]) AS VentasTotales,
        SUM([Net Invoice Cost]) AS CostosTotales,
        SUM([Net Invoice Quantity]) AS CantidadVendida
    FROM fact.Invoices
    WHERE DATEADD(SECOND, [Billing Date]/1000000000, '1970-01-01') 
          BETWEEN '2020-01-01' AND '2022-12-31'
    GROUP BY 
        [Customer Key],[Product Key],
        YEAR(DATEADD(SECOND, [Billing Date]/1000000000, '1970-01-01')),
        MONTH(DATEADD(SECOND, [Billing Date]/1000000000, '1970-01-01'))
    ORDER BY 
        [Customer Key], Año, Mes;
"""

# Written relative to the current working directory
output_file = "fact_Invoices_2020_2022_byCustomer.csv"

df = pd.read_sql(query, conn)
# No sep/decimal arguments here, so this file uses to_csv's defaults, unlike
# the other exports in this notebook that pass sep=";" and decimal=",".
df.to_csv(output_file, index=False, encoding="utf-8")

print(f"🎉 Exportación terminada. Archivo guardado en {output_file} con {len(df)} filas")


  df = pd.read_sql(query, conn)


🎉 Exportación terminada. Archivo guardado en fact_Invoices_2020_2022_byCustomer.csv con 7031016 filas


Revisión de tablas con mucha información 

In [None]:
import os
import pandas as pd

# Make sure the export folder exists
os.makedirs("exported_tables", exist_ok=True)

# Export plan: output file stem -> the query to run, plus a flag telling
# whether that query returns the whole table ("full") or only a sample
tables = {
    "dim_Employees": {"query": "SELECT * FROM dim.Employees", "full": True},
    "fact_Invoices": {"query": "SELECT TOP 20 * FROM fact.Invoices", "full": False},
    "fact_Orders":   {"query": "SELECT TOP 20 * FROM fact.Orders", "full": False}
}

# Run each query and write its result to <name>.csv
for name in tables:
    info = tables[name]
    filepath = os.path.join("exported_tables", f"{name}.csv")
    df = pd.read_sql(info["query"], conn)
    df.to_csv(filepath, index=False, encoding="utf-8")
    # The wording of the message follows the "full" flag, not the row count
    if info["full"]:
        filas = "todas"
    else:
        filas = "20 primeras"
    print(f"✅ Exportado {name}.csv con {filas} filas")


  df = pd.read_sql(info["query"], conn)


✅ Exportado dim_Employees.csv con todas filas
✅ Exportado fact_Invoices.csv con 20 primeras filas
✅ Exportado fact_Orders.csv con 20 primeras filas


In [None]:
# Usage example: connection parameters for the SpaceParts database
server, database = "te3-training-eu.database.windows.net", "SpacePartsCoDW"
username, password = "dwreader", "TE3#reader!"

# Load all tables with load_spaceparts (defined in an earlier cell)
all_tables = load_spaceparts(
    server=server,
    database=database,
    username=username,
    password=password,
)

  df = pd.read_sql(query, conn)


✅ Loaded dim.Employees (893 rows)
✅ Loaded dim.Brands (20 rows)
✅ Loaded dim.Budget-Rate (15 rows)
✅ Loaded dim.Customers (3911 rows)
✅ Loaded dim.Exchange-Rate (57900 rows)
✅ Loaded dim.Invoice-DocType (5 rows)
✅ Loaded dim.Order-DocType (4 rows)
✅ Loaded dim.Order-Status (6 rows)
✅ Loaded dim.Products (256293 rows)
✅ Loaded dim.Regions (181 rows)
✅ Loaded fact.Budget (2947811 rows)


Exportación de archivos no pesados

In [None]:
import os
import pandas as pd

# Export folder, placed under the current working directory
export_path = os.path.join(os.getcwd(), "exported_tables")
os.makedirs(export_path, exist_ok=True)

# Every table except Invoices and Orders: output file stem -> query
tables = {
    "dim_Brands": "SELECT * FROM dim.Brands",
    "dim_BudgetRate": "SELECT * FROM dim.[Budget-Rate]",
    "dim_Customers": "SELECT * FROM dim.Customers",
    "dim_Employees": "SELECT * FROM dim.Employees",
    "dim_ExchangeRate": "SELECT * FROM dim.[Exchange-Rate]",
    "dim_InvoiceDocType": "SELECT * FROM dim.[Invoice-DocType]",
    "dim_OrderDocType": "SELECT * FROM dim.[Order-DocType]",
    "dim_OrderStatus": "SELECT * FROM dim.[Order-Status]",
    "dim_Products": "SELECT * FROM dim.Products",
    "dim_Regions": "SELECT * FROM dim.Regions",
    "fact_Budget": "SELECT * FROM fact.Budget",
    "fact_Forecast": "SELECT * FROM fact.Forecast"
}

# CSV layout shared by all files: ";" field separator and "," decimal mark
csv_options = {"index": False, "encoding": "utf-8", "sep": ";", "decimal": ","}

# Read each table in full and write it to <name>.csv
for name, query in tables.items():
    filepath = os.path.join(export_path, f"{name}.csv")
    df = pd.read_sql(query, conn)
    df.to_csv(filepath, **csv_options)
    print(f"✅ Exportado {name}.csv con {len(df)} filas")


Exportación y query del archivo Orders

In [None]:
import os
import pandas as pd

# Export folder, relative to the current working directory.
# NOTE(review): the original comment stated that exported_tables sits at the
# same level as the notebooks folder; confirm the working directory.
export_path = "exported_tables"
os.makedirs(export_path, exist_ok=True)

# Aggregate fact.Orders per customer, product, year and month of the order
# date: order value and quantity totals plus average lead times in days.
# DATEDIFF returns an integer and SQL Server's AVG over integers returns a
# truncated integer, so the day differences are cast to FLOAT to keep the
# fractional part of the averages.
query = """
SELECT
    [Customer Key],
    [Product Key],

    -- Año y Mes para análisis agregado
    YEAR(DATEADD(SECOND, [Order Date]/1000000000, '1970-01-01')) AS Año,
    MONTH(DATEADD(SECOND, [Order Date]/1000000000, '1970-01-01')) AS Mes,

    -- Métricas de negocio
    SUM([Net Order Value]) AS ValorPedidos,
    SUM([Net Order Quantity]) AS CantidadPedidos,

    -- Diferencias en días (promedio por agrupación)
    AVG(CAST(DATEDIFF(DAY, DATEADD(SECOND, [Order Date]/1000000000, '1970-01-01'),
                           DATEADD(SECOND, [Ship Date]/1000000000, '1970-01-01')) AS FLOAT)) AS TiempoEntregaPromedio,

    AVG(CAST(DATEDIFF(DAY, DATEADD(SECOND, [Order Date]/1000000000, '1970-01-01'),
                           DATEADD(SECOND, [Billing Date]/1000000000, '1970-01-01')) AS FLOAT)) AS TiempoFacturacionPromedio
FROM fact.Orders
WHERE DATEADD(SECOND, [Order Date]/1000000000, '1970-01-01')
      BETWEEN '2020-01-01' AND '2022-12-31'
GROUP BY
    [Customer Key],
    [Product Key],
    YEAR(DATEADD(SECOND, [Order Date]/1000000000, '1970-01-01')),
    MONTH(DATEADD(SECOND, [Order Date]/1000000000, '1970-01-01'))
ORDER BY
    [Customer Key], [Product Key], Año, Mes
"""

# Run the query on the `conn` opened in an earlier cell and write the summary
# into the export folder, with ";" as separator and "," as decimal mark
df = pd.read_sql(query, conn)
filepath = os.path.join(export_path, "fact_Orders_2020_2022.csv")
df.to_csv(filepath, index=False, encoding="utf-8", sep=";", decimal=",")
print(f"✅ Exportado {filepath} con {len(df)} filas resumidas")


## Viewing a table

In [1]:
# Access an individual table. The key depends on which load_spaceparts built
# `all_tables`: the Spark-based loader uses short keys such as 'customers',
# while the pyodbc-based loader uses "<schema>.<table>" keys such as
# 'dim.Customers'. Prefer the short key and fall back to the qualified one.
customers_key = 'customers' if 'customers' in all_tables else 'dim.Customers'
customers_df = all_tables[customers_key]
display(customers_df)

NameError: name 'all_tables' is not defined

## Writing SpaceParts tables to a lakehouse
- You must first attach a lakehouse to the notebook from the "Explorer" pane.

In [None]:
# Write every loaded table under the "SpaceParts" schema.
# NOTE(review): write_tables_to_lakehouse uses each value's `.write`
# attribute, which presumably means Spark DataFrames are expected; confirm
# that `all_tables` was not built by the pandas-based loader.
write_tables_to_lakehouse(all_tables, "SpaceParts")