In [0]:
# Importing Files and Libraries
import pandas as pd
import numpy as np
import re
import time
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql import functions as F
import warnings
import os
import tempfile
import shutil
warnings.filterwarnings('ignore')


# Banner so this run is easy to locate in the cell output.
banner_rule = "=" * 120
print(banner_rule)
print("MATERIAL PEGGING MAP - PER-SKU PEGGING WITH COMPLETE HIERARCHY")
print(banner_rule)

# Input workbooks read by the cells below.
dp_file = "/dbfs/FileStore/tables/20251006_DP_Material_Shortage___Working_file.xlsx"
snp_file = "/dbfs/FileStore/tables/ParkourSC_SNP.xlsx"
pegging_file = "/dbfs/FileStore/tables/Material_pegging_SKU_2_BOM.xlsx"

# Destination folder for generated files; created up front if it is missing.
output_dir = "/dbfs/FileStore/tables/output"
os.makedirs(output_dir, exist_ok=True)

print(f"dp_file: {dp_file}")

# Fail fast with a readable message when the main workbook is absent.
if not os.path.exists(dp_file):
    raise FileNotFoundError(f"The file {dp_file} does not exist.")
df_dp = pd.read_excel(dp_file)

print(df_dp.head())


# Check names of the Files
print(os.listdir("/dbfs/FileStore/tables"))

MATERIAL PEGGING MAP - PER-SKU PEGGING WITH COMPLETE HIERARCHY
dp_file: /dbfs/FileStore/tables/20251006_DP_Material_Shortage___Working_file.xlsx


Exception ignored in: <function ZipFile.__del__ at 0x7f2d9b4bcd30>
Traceback (most recent call last):
  File "/usr/lib/python3.10/zipfile.py", line 1834, in __del__
    self.close()
  File "/usr/lib/python3.10/zipfile.py", line 1851, in close
    self.fp.seek(self.start_dir)
ValueError: seek of closed file


  Date: 01/06/2020     Unnamed: 1  ... Unnamed: 39 Unnamed: 40
0              NaN  Insugen 30/70  ...         NaN         NaN
1              May              0  ...         NaN         NaN
2             June              2  ...         NaN         NaN
3             July              3  ...         NaN         NaN
4           August              3  ...         NaN         NaN

[5 rows x 41 columns]
['20251006_DP_Material_Shortage___Working_file.xlsx', 'Aug_25_L2_DP_Plan_Circulation_V2.xlsx', 'Material_pegging_SKU_2_BOM.xlsx', 'ParkourSC_SNP.xlsx', 'my_dataframe.csv', 'output', 'test']


In [0]:
## Display Files stored in DBFS
# NOTE(review): `display` and `dbutils` are not imported or defined anywhere in this
# notebook; presumably they are the globals injected by the Databricks runtime - confirm
# before running this cell in any other environment.
display(dbutils.fs.ls("/FileStore/tables"))


path,name,size,modificationTime
dbfs:/FileStore/tables/20251006_DP_Material_Shortage___Working_file.xlsx,20251006_DP_Material_Shortage___Working_file.xlsx,26368580,1764843045000
dbfs:/FileStore/tables/Aug_25_L2_DP_Plan_Circulation_V2.xlsx,Aug_25_L2_DP_Plan_Circulation_V2.xlsx,176999,1764843036000
dbfs:/FileStore/tables/Material_pegging_SKU_2_BOM.xlsx,Material_pegging_SKU_2_BOM.xlsx,241208,1764843075000
dbfs:/FileStore/tables/ParkourSC_SNP.xlsx,ParkourSC_SNP.xlsx,623641,1764843047000
dbfs:/FileStore/tables/my_dataframe.csv,my_dataframe.csv,307320,1702349271000
dbfs:/FileStore/tables/output/,output/,0,1765174072053
dbfs:/FileStore/tables/test/,test/,0,1765174072053


## Normalize Text

In [0]:
# Normalize Text
def normalize_text(text):
    """Return `text` as a trimmed string with whitespace runs collapsed to one space.

    Args:
        text: Any scalar value; it is converted with `str()` before cleaning.

    Returns:
        The cleaned string, or None when `text` is None/NaN or nothing is left
        after trimming.
    """
    # `text is None` is tested first so the common case short-circuits before pandas.
    if text is None or pd.isna(text):
        return None
    text = str(text).strip()
    # The pattern must be r'\s+': the previous r'\\s+' matched a literal backslash
    # followed by the letter "s", so whitespace was never actually collapsed.
    text = re.sub(r'\s+', ' ', text)
    return text if text else None

def normalize_product_no(value):
    """Reduce a product/material number to its ASCII letters and digits.

    Args:
        value: Any scalar value, e.g. a string or a number.

    Returns:
        The value as a string with every character outside [a-zA-Z0-9] removed,
        or None when `value` is None/NaN or nothing remains.
    """
    if value is None or pd.isna(value):
        return None
    # An integral float such as 800004403.0 would otherwise be stringified as
    # "800004403.0" and, once the dot is stripped, turn into "8000044030".
    if isinstance(value, (float, np.floating)) and float(value).is_integer():
        value = int(value)
    text = str(value).strip()
    cleaned = re.sub(r'[^a-zA-Z0-9]', '', text)
    return cleaned if cleaned else None

def extract_model_components(model_text):
    """Rebuild an underscore-separated model string without empty segments.

    The value is stringified and trimmed, split on runs of underscores, each
    segment is trimmed, blank segments are dropped, and the remainder is joined
    back with single underscores. Returns None when `model_text` is missing
    according to `pd.isna`.
    """
    if pd.isna(model_text):
        return None
    kept_segments = []
    for segment in re.split(r'_+', str(model_text).strip()):
        segment = segment.strip()
        if segment:
            kept_segments.append(segment)
    return '_'.join(kept_segments)

def is_valid_qty(qty):
    """Tell whether `qty` represents a strictly positive number.

    Missing values, blank strings, the literal strings '0' and 'nan', and
    anything `float()` cannot parse all yield False.
    """
    if pd.isna(qty):
        return False
    token = str(qty).strip()
    if token in ('', '0', 'nan'):
        return False
    try:
        number = float(token)
    except (ValueError, TypeError):
        return False
    return number > 0

# Status line confirming that this cell ran to completion.
print("Functions loaded")

Functions loaded


## Loading Data into DataFrames

In [0]:
## Loading Data into DataFrames
print("Loading data...")

# Geometry of the "DP Shortage" sheet (0-based offsets), named once so the three
# reads below cannot drift apart.
DP_SHEET = "DP Shortage"
HEADER_SKIPROWS = 18      # rows skipped before the 4-row product header band
HEADER_NROWS = 4
MATERIAL_SKIPROWS = 22    # rows skipped before the material rows
MATERIAL_NROWS = 520
FIRST_PRODUCT_COL = 14    # sheet column of the first product
PRODUCT_COLS = range(FIRST_PRODUCT_COL, 135)
MATERIAL_ATTR_COLS = [0, 1, 2, 3, 4, 5, 10, 13]

# Open the workbook once and reuse the handle for all three reads, instead of
# letting every read_excel call open the file again.
with pd.ExcelFile(dp_file) as dp_workbook:
    df_headers = pd.read_excel(dp_workbook, sheet_name=DP_SHEET, header=None,
                               skiprows=HEADER_SKIPROWS, nrows=HEADER_NROWS, usecols=PRODUCT_COLS)
    df_materials = pd.read_excel(dp_workbook, sheet_name=DP_SHEET, header=None,
                                 skiprows=MATERIAL_SKIPROWS, nrows=MATERIAL_NROWS, usecols=MATERIAL_ATTR_COLS)
    df_qty = pd.read_excel(dp_workbook, sheet_name=DP_SHEET, header=None,
                           skiprows=MATERIAL_SKIPROWS, nrows=MATERIAL_NROWS, usecols=PRODUCT_COLS)

# Header band rows used here: 0 = batch size, 1 = product id, 3 = product description.
# If the same product id appears in more than one column, the last column wins.
product_headers = {}
for col_idx in range(df_headers.shape[1]):
    product_id = normalize_product_no(df_headers.iloc[1, col_idx])
    if product_id and product_id != '0':
        product_headers[product_id] = {
            'Product_ID': product_id,
            'Product_Description': normalize_text(df_headers.iloc[3, col_idx]),
            'Batch_Size': df_headers.iloc[0, col_idx],
            'Column_Index': col_idx + FIRST_PRODUCT_COL
        }

print(f"Product headers: {len(product_headers)}")

df_materials.columns = ['Material', 'Material_Description', 'Model', 'Product_Family', 'Section', 'Common_Unique', 'Total_Lead_Time', 'BUoM']
df_materials['Material_Normalized'] = df_materials['Material'].apply(normalize_product_no)
# Keep only rows whose material number survives normalisation and is not the placeholder '0'.
df_materials_filtered = df_materials[(df_materials['Material_Normalized'].notna()) & (df_materials['Material_Normalized'] != '0')].copy()
print(f"Materials: {len(df_materials_filtered)}")

# Position of each product's quantity column inside df_qty. df_qty is read with the
# same column window as df_headers, so the sheet offset is simply subtracted again.
qty_col_map = {
    product_id: info['Column_Index'] - FIRST_PRODUCT_COL
    for product_id, info in product_headers.items()
}

print("Data loaded successfully")

Product headers: 87
Materials: 520
Data loaded successfully


## Extracting materials

In [0]:
## Extracting materials
print("Extracting materials per product...")
product_materials = {}

# Row labels of the retained materials; identical for every product, so looked up once.
kept_index = df_materials_filtered.index

for product_id, qty_position in qty_col_map.items():
    qty_column = df_qty.iloc[:, qty_position]
    qty_is_valid = qty_column.apply(is_valid_qty)
    # Retained material rows that also carry a usable quantity for this product.
    selected_rows = kept_index[qty_is_valid[kept_index]].tolist()

    if not selected_rows:
        continue

    # Attach the product's quantities and id as two trailing columns.
    product_materials[product_id] = df_materials_filtered.loc[selected_rows].assign(
        QTY=qty_column[selected_rows].values,
        Product_ID=product_id,
    )

print(f"Extracted for {len(product_materials)} products")

Extracting materials per product...
Extracted for 87 products


## Loading Resources and SKUs


In [0]:
## Loading Resources and SKUs
print("Loading Resource and SKU data...")

# Resource lookup keyed by normalised product id. When a product id occurs on
# several rows, the last row wins.
resource_data = {}
try:
    df_resources = pd.read_excel(snp_file, sheet_name="DP Line Utilization", header=None, skiprows=2, nrows=240, usecols=[1, 2, 4])
    df_resources.columns = ['Resource_ID', 'Resource_Description', 'Product_ID']
    for _, row in df_resources.iterrows():
        prod_id = normalize_product_no(row['Product_ID'])
        if prod_id:
            resource_data[prod_id] = {
                'Resource_ID': normalize_text(row['Resource_ID']),
                'Resource_Description': normalize_text(row['Resource_Description'])
            }
except Exception as e:
    print(f"Note: {e}")


def _collect_sku_rows(sheet_name, nrows, usecols, target):
    """Read one SKU sheet of `snp_file` and add its unseen products to `target`.

    Args:
        sheet_name: Worksheet to read.
        nrows: Number of rows to read after the two skipped rows.
        usecols: Four column positions, in the order
            Product_ID, SKU, Country, Pack_Size.
        target: Dict keyed by normalised product id, updated in place. Ids that
            are already present are left untouched, so the first occurrence wins.

    Returns:
        The DataFrame that was read, with the four columns renamed.
    """
    df_sheet = pd.read_excel(snp_file, sheet_name=sheet_name, header=None, skiprows=2, nrows=nrows, usecols=usecols)
    df_sheet.columns = ['Product_ID', 'SKU', 'Country', 'Pack_Size']
    for _, row in df_sheet.iterrows():
        prod_id = normalize_product_no(row['Product_ID'])
        if prod_id and prod_id not in target:
            target[prod_id] = {'SKU': normalize_text(row['SKU']), 'Country': normalize_text(row['Country']), 'Pack_Size': row['Pack_Size']}
    return df_sheet


# SKU lookup: the Adv Mkt sheet is read first, so its entries take precedence over EM.
# A sheet that cannot be loaded is still skipped (best effort), but the reason is now
# reported instead of being silently swallowed.
sku_data = {}
try:
    df_adv = _collect_sku_rows("Adv Mkt-Mar'25", 363, [1, 3, 5, 8], sku_data)
except Exception as e:
    print(f"Note: skipped sheet Adv Mkt-Mar'25: {e}")

try:
    df_em = _collect_sku_rows("EM-Mar'25", 44, [1, 6, 12, 14], sku_data)
except Exception as e:
    print(f"Note: skipped sheet EM-Mar'25: {e}")

print(f"Resource data: {len(resource_data)}")
print(f"SKU data: {len(sku_data)}")

Loading Resource and SKU data...
Resource data: 64
SKU data: 76


In [0]:
# SKU hierarchy
print("Defining hierarchy...")

product_mapping = {}

# Market SKUs grouped by product family.
mCB_skus = ['800004403', '800004402', '800008019', '800008020', '800008034', '800007997', '800007345', '800007516',
            '800002513', '800007608', '800007630', '800002984', '800004986', '800007310', '800007311', '800006648',
            '800007634', '800008073', '800006523', '800002297', '800002872', '800006741', '800007380']

sMCB_skus = ['800006506', '800006505', '800006527', '800006526', '800006525', '800007546', '800007583', '800007839',
             '800006524', '800006627', '800007872']

vial_skus = ['800004400', '800004401', '800006626', '800006740', '800007996']

aspart_dlp_skus = ['800008016', '800002958', '800002948', '800006528', '800002989', '800003528', '800006592', '800006691']

aspart_vial_skus = ['800008017', '800006529']

rhi_skus = ['800001300', '800001298', '800001299']

# (SKU list, hierarchy entry shared by that list), applied in this order; a SKU that
# appeared in more than one list would keep the entry of the last list containing it.
hierarchy_groups = [
    (mCB_skus, {'assembly': '700003964', 'filling': '700001012', 'root': '700001470', 'family': 'Glargine_mCB_DLP'}),
    (sMCB_skus, {'assembly': '700004129', 'filling': '700004130', 'root': '700004130', 'family': 'Glargine_sMCB_DLP_EU'}),
    (vial_skus, {'assembly': '700001123', 'filling': '700001123', 'root': '700001123', 'family': 'Glargine_Vial'}),
    (aspart_dlp_skus, {'assembly': '700002770', 'filling': '700001301', 'root': '700001301', 'family': 'Aspart_DLP'}),
    (aspart_vial_skus, {'assembly': '700001318', 'filling': '700001318', 'root': '700001318', 'family': 'Aspart_Vial'}),
    (rhi_skus, {'assembly': '700000536', 'filling': '700000536', 'root': '700000536', 'family': 'RHI'}),
]

for family_skus, hierarchy_entry in hierarchy_groups:
    # dict(...) gives every SKU its own copy, so editing one entry never affects another.
    product_mapping.update({sku_id: dict(hierarchy_entry) for sku_id in family_skus})

print(f"Hierarchy defined with {len(product_mapping)} SKU mappings")

Defining hierarchy...
Hierarchy defined with 52 SKU mappings


## Creating per-SKU pegging sheets


In [0]:
# Creating per-SKU pegging sheets
print("Creating per-SKU pegging sheets...")

output_cols = ['BOM_Type', 'BOM_Level', 'Product_ID', 'Product_Description', 'SKU', 'Country', 'Pack_Size',
               'Material', 'Material_Description', 'QTY', 'Section', 'Product_Family', 'Common_Unique',
               'Total_Lead_Time', 'BUoM', 'Model', 'Resource_ID', 'Resource_Description', 'Batch_Size']


def _pegging_rows(product_id, bom_type, bom_level, family_name, sku_info=None):
    """Build one output row per material pegged to `product_id`.

    Args:
        product_id: Key into `product_materials`; the caller must check that it is present.
        bom_type: Value written to the 'BOM_Type' column.
        bom_level: Value written to the 'BOM_Level' column.
        family_name: Value written to the 'Product_Family' column.
        sku_info: Optional dict supplying 'SKU', 'Country' and 'Pack_Size'.
            When omitted or empty, those three columns are 'N/A'.

    Returns:
        A list of dicts whose keys are exactly the names in `output_cols`.
    """
    sku_info = sku_info or {}
    # Header and resource details depend only on the product, so look them up once.
    product_info = product_headers.get(product_id, {})
    resource_info = resource_data.get(product_id, {})

    rows = []
    for _, mat_row in product_materials[product_id].iterrows():
        rows.append({
            'BOM_Type': bom_type,
            'BOM_Level': bom_level,
            'Product_ID': product_id,
            'Product_Description': product_info.get('Product_Description', 'N/A'),
            'SKU': sku_info.get('SKU', 'N/A'),
            'Country': sku_info.get('Country', 'N/A'),
            'Pack_Size': sku_info.get('Pack_Size', 'N/A'),
            'Material': normalize_text(mat_row['Material_Normalized']),
            'Material_Description': normalize_text(mat_row['Material_Description']),
            'QTY': mat_row['QTY'],
            'Section': normalize_text(mat_row['Section']),
            'Product_Family': family_name,
            'Common_Unique': normalize_text(mat_row['Common_Unique']),
            'Total_Lead_Time': mat_row['Total_Lead_Time'],
            'BUoM': normalize_text(mat_row['BUoM']),
            'Model': extract_model_components(mat_row['Model']),
            'Resource_ID': resource_info.get('Resource_ID', 'N/A'),
            'Resource_Description': resource_info.get('Resource_Description', 'N/A'),
            'Batch_Size': product_info.get('Batch_Size', 'N/A')
        })
    return rows


def _separator_row():
    """Return the divider row placed between BOM levels: Product_ID '0', every other column None."""
    return {col: '0' if col == 'Product_ID' else None for col in output_cols}


sku_pegging_sheets = {}

for market_sku, mapping_info in sorted(product_mapping.items()):
    assembly_id = mapping_info.get('assembly')
    filling_id = mapping_info.get('filling')
    family_name = mapping_info.get('family', 'Unknown')

    pegging_data = []

    # Level 1: materials pegged directly to the market SKU.
    if market_sku in product_materials:
        pegging_data.extend(_pegging_rows(market_sku, 'Packing', 'L1_Market_SKU', family_name,
                                          sku_info=sku_data.get(market_sku, {})))

    # Both dividers are always written, even when a level contributes no rows.
    pegging_data.append(_separator_row())

    # Level 2: assembly materials, skipped when the assembly is the market SKU itself.
    if assembly_id != market_sku and assembly_id in product_materials:
        pegging_data.extend(_pegging_rows(assembly_id, 'Assembly', 'L2_Assembly', family_name))

    pegging_data.append(_separator_row())

    # Level 3: filling materials, skipped when filling and assembly share one product id.
    if filling_id != assembly_id and filling_id in product_materials:
        pegging_data.extend(_pegging_rows(filling_id, 'Filling', 'L3_Filling', family_name))

    df_pegging = pd.DataFrame(pegging_data)[output_cols]
    sku_pegging_sheets[market_sku] = df_pegging

print(f"Created pegging sheets for {len(sku_pegging_sheets)} market SKUs")

Creating per-SKU pegging sheets...
Created pegging sheets for 52 market SKUs


## Exporting to CSV

In [0]:
import os

# Save each SKU pegging sheet as CSV to DBFS
dbfs_output_dir = "/dbfs/FileStore/tables/output"
os.makedirs(dbfs_output_dir, exist_ok=True)

sheet_total = len(sku_pegging_sheets)

# Write the sheets in ascending SKU order, one CSV file per SKU.
for position, sku_id in enumerate(sorted(sku_pegging_sheets), start=1):
    target_path = os.path.join(dbfs_output_dir, f"Pegging_{sku_id}.csv")
    sku_pegging_sheets[sku_id].to_csv(target_path, index=False)

    # Report progress after every tenth file and once more after the final one.
    reached_checkpoint = position % 10 == 0
    reached_end = position == sheet_total
    if reached_checkpoint or reached_end:
        print(f"  Exported {position}/{sheet_total} SKU pegging sheets")

print(f"\nExported CSVs to: {dbfs_output_dir}")
print(f"Total sheets: {sheet_total}")

  Exported 10/52 SKU pegging sheets
  Exported 20/52 SKU pegging sheets
  Exported 30/52 SKU pegging sheets
  Exported 40/52 SKU pegging sheets
  Exported 50/52 SKU pegging sheets
  Exported 52/52 SKU pegging sheets

Exported CSVs to: /dbfs/FileStore/tables/output
Total sheets: 52
