In [1]:
import pandas as pd
from pathlib import Path

In [2]:
BASE_DIR = Path("s:/dev/Lakehouse Projekt/lakehouse-demo")
SILVER = BASE_DIR / "data" / "silver"


# Dim Customer & Dim Geography

In [3]:

customers = pd.read_parquet(SILVER / "sales" / "customers.parquet")
cities = pd.read_parquet(SILVER / "dimensions" / "cities.parquet")
provinces = pd.read_parquet(SILVER / "dimensions" / "provinces.parquet")
countries = pd.read_parquet(SILVER / "dimensions" / "countries.parquet")
delivery_methods = pd.read_parquet(SILVER / "dimensions" / "delivery_methods.parquet")
people = pd.read_parquet(SILVER / "dimensions" / "people.parquet")


In [4]:
print(f"customers:{customers.shape}")
print(f"cities:{cities.shape}")
print(f"provinces:{provinces.shape}")
print(f"countries:{countries.shape}")
print(f"delivery_methods:{delivery_methods.shape}")
print(f"people:{people.shape}")

customers:(625, 29)
cities:(37940, 8)
provinces:(53, 10)
countries:(190, 14)
delivery_methods:(10, 5)
people:(1111, 20)


In [5]:
# provinces -> countries
responsible_market = provinces.merge(
    countries[["country_id", "country_name", "iso_alpha3_code","continent","region","subregion"]],
    on="country_id",
    how="left"
)

responsible_market = responsible_market.merge(
    cities[["state_province_id","city_id", "city_name"]],
    on ="state_province_id",
    how="left"

)

print(f"Shape after Join: {responsible_market.shape}")


Shape after Join: (37940, 17)


In [6]:
df = customers.merge(
    delivery_methods[["delivery_method_id", "delivery_method_name"]],
    on="delivery_method_id",
    how = "left"
)
print(f"Shape nach Join: {df.shape}")


Shape nach Join: (625, 30)


In [7]:
df = df.merge(
    responsible_market[["city_id", "city_name", "country_name","state_province_code","state_province_name","iso_alpha3_code", "continent", "region", "subregion","sales_territory"]],
    left_on="delivery_city_id",
    right_on="city_id",
    how="left"
)
print(f"Shape nach Join: {df.shape}")

Shape nach Join: (625, 40)


In [8]:
df = df.merge(
    people[["person_id","full_name","email_address"]],
    left_on="primary_contact_person_id",
    right_on="person_id",
    how="left"
)

In [9]:
dim_customers = df[[
    "customer_id",
    "customer_name",
    "full_name",
    "email_address",
    "phone_number",
    "fax_number",
    "website_url",
    "bill_to_customer_id",
    "credit_limit",
    "delivery_method_name",
    "delivery_address_line1",
    "delivery_address_line2",
    "delivery_postal_code",
    "postal_address_line1",
    "postal_address_line2",
    "postal_postal_code",
    "city_name",
    "state_province_code",
    "state_province_name",
    "sales_territory",
    "country_name",
    "iso_alpha3_code",
    "continent",
    "region",
    "subregion"
]]

print(f"Shape dim_customers: {dim_customers.shape}")
dim_customers.head()

Shape dim_customers: (625, 25)


Unnamed: 0,customer_id,customer_name,full_name,email_address,phone_number,fax_number,website_url,bill_to_customer_id,credit_limit,delivery_method_name,...,postal_postal_code,city_name,state_province_code,state_province_name,sales_territory,country_name,iso_alpha3_code,continent,region,subregion
0,1,Tailspin Toys (Head Office),Waldemar Fisar,waldemar@tailspintoys.com,(308) 555-0100,(308) 555-0101,http://www.tailspintoys.com,1,,Delivery Van,...,90410,Lisco,NE,Nebraska,Plains,United States,USA,North America,Americas,Northern America
1,2,"Tailspin Toys (Sylvanite, MT)",Lorena Cindric,lorena@tailspintoys.com,(406) 555-0100,(406) 555-0101,http://www.tailspintoys.com/Sylvanite,1,,Delivery Van,...,90216,Sylvanite,MT,Montana,Rocky Mountain,United States,USA,North America,Americas,Northern America
2,3,"Tailspin Toys (Peeples Valley, AZ)",Bhaargav Rambhatla,bhaargav@tailspintoys.com,(480) 555-0100,(480) 555-0101,http://www.tailspintoys.com/PeeplesValley,1,,Delivery Van,...,90205,Peeples Valley,AZ,Arizona,Southwest,United States,USA,North America,Americas,Northern America
3,4,"Tailspin Toys (Medicine Lodge, KS)",Daniel Roman,daniel@tailspintoys.com,(316) 555-0100,(316) 555-0101,http://www.tailspintoys.com/MedicineLodge,1,,Delivery Van,...,90152,Medicine Lodge,KS,Kansas,Plains,United States,USA,North America,Americas,Northern America
4,5,"Tailspin Toys (Gasport, NY)",Johanna Huiting,johanna@tailspintoys.com,(212) 555-0100,(212) 555-0101,http://www.tailspintoys.com/Gasport,1,,Delivery Van,...,90261,Gasport,NY,New York,Mideast,United States,USA,North America,Americas,Northern America


In [10]:
# ===== IMPUTATION: FEHLENDE WERTE FÜLLEN =====

# 1. credit_limit: Mit Hauptkundenkredit füllen
print("=== IMPUTATION CREDIT_LIMIT ===")
print(f"Vorher: {dim_customers['credit_limit'].isna().sum()} missing")

# Merge mit Hauptkunde um deren Kreditlimit zu bekommen
main_customer_credits = dim_customers[['customer_id', 'credit_limit']].rename(
    columns={'customer_id': 'bill_to_customer_id', 'credit_limit': 'parent_credit_limit'}
)
dim_customers = dim_customers.merge(main_customer_credits, on='bill_to_customer_id', how='left')

# Füllen: Erst mit Hauptkundenkredit, dann mit Durchschnitt
avg_credit_limit = dim_customers['credit_limit'].mean()
dim_customers['credit_limit'] = (
    dim_customers['credit_limit']
    .fillna(dim_customers['parent_credit_limit'])
    .fillna(avg_credit_limit)
)

print(f"Nachher: {dim_customers['credit_limit'].isna().sum()} missing")
print(f"Durchschnittlicher credit_limit: {avg_credit_limit:.2f}\n")

# 2. full_name: Mit 'Head Office' füllen
print("=== IMPUTATION FULL_NAME & EMAIL_ADDRESS ===")
print(f"Vorher - full_name: {dim_customers['full_name'].isna().sum()} missing")
print(f"Vorher - email_address: {dim_customers['email_address'].isna().sum()} missing")

dim_customers['full_name'] = dim_customers['full_name'].fillna('Head Office')

# 3. email_address: Mit generischer Adresse basierend auf Kundennamen füllen
# Format: info@customername.com (in lowercase, spaces entfernt)
dim_customers['email_address'] = dim_customers.apply(
    lambda row: row['email_address'] if pd.notna(row['email_address'])
    else 'info@' + row['customer_name'].replace(' (Head Office)', '').replace(' ', '').lower() + '.com',
    axis=1
)

print(f"Nachher - full_name: {dim_customers['full_name'].isna().sum()} missing")
print(f"Nachher - email_address: {dim_customers['email_address'].isna().sum()} missing")

# Beispiele anzeigen
print("\nBeispiele der gefüllten Daten:")
filled_mask = dim_customers['customer_name'].str.contains('Head Office', na=False)
print(dim_customers[filled_mask][['customer_name', 'full_name', 'email_address', 'credit_limit']].head(5))

=== IMPUTATION CREDIT_LIMIT ===
Vorher: 402 missing
Nachher: 0 missing
Durchschnittlicher credit_limit: 2607.68

=== IMPUTATION FULL_NAME & EMAIL_ADDRESS ===
Vorher - full_name: 0 missing
Vorher - email_address: 0 missing
Nachher - full_name: 0 missing
Nachher - email_address: 0 missing

Beispiele der gefüllten Daten:
                   customer_name         full_name              email_address  \
0    Tailspin Toys (Head Office)    Waldemar Fisar  waldemar@tailspintoys.com   
201   Wingtip Toys (Head Office)  Olga Alexandrova       olga@wingtiptoys.com   

     credit_limit  
0     2607.680493  
201   2607.680493  


In [11]:

cols_to_drop = [col for col in dim_customers.columns if 'parent_credit_limit' in col]
if cols_to_drop:
    dim_customers = dim_customers.drop(columns=cols_to_drop)
    print(f"Entfernt: {cols_to_drop} ✓")
else:
    print("Keine parent_credit_limit Spalten gefunden")

Entfernt: ['parent_credit_limit'] ✓


# Dim Suppliers

In [12]:
suppliers = pd.read_parquet(SILVER / "purchasing" / "suppliers.parquet")

In [13]:
#delivery methods
Dim_Suppliers = suppliers.merge(
    delivery_methods[["delivery_method_id", "delivery_method_name"]],
    on="delivery_method_id",
    how="left"
)


In [14]:
# Contact Person
Dim_Suppliers = Dim_Suppliers.merge(
    people[["person_id","full_name","email_address","phone_number"]],
    left_on="primary_contact_person_id",
    right_on="person_id",
    how="left"
)

In [15]:
Dim_Suppliers = Dim_Suppliers.merge(
    cities[["city_id", "city_name"]],
    left_on="delivery_city_id",
    right_on="city_id",
    how="left"
).rename(columns={"city_name": "delivery_city"})

Dim_Suppliers = Dim_Suppliers.merge(
    cities[["city_id", "city_name"]],
    left_on="postal_city_id",
    right_on="city_id",
    how="left"
).rename(columns={"city_name": "postal_city"})

In [16]:
# Impute 
Dim_Suppliers["internal_comments"] = Dim_Suppliers["internal_comments"].fillna("Not available")
Dim_Suppliers["delivery_address_line1"] = Dim_Suppliers["delivery_address_line1"].fillna("No address line")
Dim_Suppliers["delivery_method_name"] = Dim_Suppliers["delivery_method_name"].fillna("Upon request")
Dim_Suppliers["delivery_address_line1"] = Dim_Suppliers["delivery_address_line1"].fillna("No adress line")


In [17]:
# Reorder columns
Dim_Suppliers = Dim_Suppliers[[
    "supplier_id",
    "supplier_name",
    "full_name",
    "email_address",
    "phone_number_y",
    "delivery_method_name",
    "delivery_address_line2",
    "delivery_address_line1",
    "delivery_postal_code",
    "delivery_city",
    "postal_address_line2",
    "postal_address_line1",
    "postal_postal_code",
    "postal_city",
    "internal_comments",
    "supplier_reference",
    "bank_account_name",
    "bank_account_branch",
    "bank_account_code"
]]

In [18]:
column_mapping_suppliers = {
    "supplier_id": "supplier_id",
    "supplier_name": "supplier_name",
    'full_name': 'contact_full_name',
    'email_address': 'contact_email',
    'phone_number_y': 'contact_phone',
    "delivery_method_name": "delivery_method",
    "delivery_address_line2": "delivery_street_address",
    "delivery_address_line1": "delivery_address_additional",
    "delivery_postal_code": "delivery_postal_code",
    "delivery_city": "delivery_city",
    "postal_address_line2": "postal_street_address",
    "postal_address_line1": "postal_address_additional",
    "postal_postal_code": "postal_postal_code",
    "postal_city": "postal_city",
    "internal_comments": "internal_comments",
    "supplier_reference": "supplier_reference",
    "bank_account_name": "bank_account_name",
    "bank_account_branch": "bank_account_branch",
    "bank_account_code": "bank_account_code"
}

Dim_Suppliers = Dim_Suppliers.rename(columns=column_mapping_suppliers)

In [19]:
print(f"Shape dim_suppliers: {Dim_Suppliers.shape}")

Shape dim_suppliers: (13, 19)


# Dim Stock Items

In [20]:
stockitems = pd.read_parquet(SILVER / "dimensions" / "stock_items.parquet")
colors = pd.read_parquet(SILVER / "dimensions" / "colors.parquet")
package_types = pd.read_parquet(SILVER / "dimensions" / "package_types.parquet")

In [21]:
dim_stockitems = stockitems.merge(
    colors[["color_id", "color_name"]],
    on="color_id",
    how="left"
).rename(columns={"color_name": "color"})


In [22]:
dim_stockitems =dim_stockitems.merge(
    package_types[["package_type_id", "package_type_name"]],
    right_on="package_type_id",
    left_on="unit_package_id",
    how="left"
).rename(columns={"package_type_name": "unit_package"})

dim_stockitems =dim_stockitems.merge(
    package_types[["package_type_id", "package_type_name"]],
    right_on="package_type_id",
    left_on="outer_package_id",
    how="left"
).rename(columns={"package_type_name": "outer_package"})



In [23]:
# Select and reorder columns
dim_stockitems = dim_stockitems[[
    "stock_item_id",
    "stock_item_name",
    "color",
    "unit_package",
    "outer_package",
    "supplier_id",
    "brand",
    "size",
    "typical_weight_per_unit",
    "lead_time_days",
    "quantity_per_outer",
    "is_chiller_stock",
    "tax_rate",
    "unit_price",
    "recommended_retail_price",
    "search_details"
    
]]

#rename columns
column_mapping_stockitems = {
    "stock_item_id":              "stock_item_id",
    "stock_item_name":            "product_name",
    "color":                      "color",
    "unit_package":               "unit_package_type",
    "outer_package":              "outer_package_type",
    "supplier_id":                "supplier_id",
    "brand":                      "brand",
    "size":                       "size",
    "typical_weight_per_unit":    "weight_per_unit_kg",
    "lead_time_days":             "lead_time_days",
    "quantity_per_outer":         "quantity_per_outer",
    "is_chiller_stock":           "is_chiller_stock",
    "tax_rate":                   "tax_rate",
    "unit_price":                 "unit_price",
    "recommended_retail_price":   "recommended_retail_price",
    "search_details":             "search_details"
}

dim_stockitems = dim_stockitems.rename(columns=column_mapping_stockitems)
print(f"Shape dim_stockitems: {dim_stockitems.shape}")

Shape dim_stockitems: (227, 16)


# Dim_People

In [24]:
Dim_People = people[[
    "person_id",
    "full_name",
    "email_address",
    "phone_number",
    "fax_number",
    
]]

In [25]:
Dim_People = Dim_People[Dim_People["person_id"] != 1]


In [26]:
GOLD = BASE_DIR / "data" / "gold"


In [27]:
df_test = pd.read_parquet(GOLD / "dim_supplier.parquet")

# Fact Orders

In [33]:
orders = pd.read_parquet(SILVER / "sales" / "orders.parquet")
order_lines = pd.read_parquet(SILVER / "sales" / "order_lines.parquet")

print(orders.shape)

(40286, 13)


In [None]:
# Joins 

df = orders.merge(
    customers[["customer_id","customer_name"]],
    on="customer_id",
    how="left"
)

print(df.shape)

df = df.merge(
    people[["person_id","full_name"]],
    left_on = "sales_person_id",
    right_on = "person_id",
    how="left"
).rename(columns={"full_name": "sales_person"})

df = df.merge(
    people[["person_id","full_name"]],
    left_on = "contact_person_id",
    right_on = "person_id",
    how="left"
).rename(columns={"full_name": "contact_person"})


(40286, 14)


In [35]:
df = df.merge(
    people[["person_id","full_name"]],
    left_on = "salesperson_id",
    right_on = "person_id",
    how="left"
).rename(columns={"full_name": "sales_person"})

df = df.merge(
    people[["person_id","full_name"]],
    left_on = "contact_person_id",
    right_on = "person_id",
    how="left"
).rename(columns={"full_name": "contact_person"})


In [36]:
print(df.shape)

(40286, 18)
