# Data cleaning & Integration

## Libraries

In [13]:
pip install fastexcel

Note: you may need to restart the kernel to use updated packages.


In [14]:
pip install xlsxwriter

Note: you may need to restart the kernel to use updated packages.


Side note: It will be useful to put all the required libraries in a requirements.txt file

In [15]:
import pandas as pd
import polars as pl
import re

from datetime import datetime
from dateutil.relativedelta import relativedelta

## Global variables

In [16]:
# Input paths
PATH_DENSITY_REPORT       = 'DensityReports.xlsx'
PATH_HISTORICAL_INCIDENTS = 'HistoricalIncidents.xlsx'
PATH_PRODUCT_ATTRIBUTES   = 'ProductAttributes.xlsx'
PATH_SUPPLIER_SCORECARD   = 'SupplierScorecard.xlsx'

# Export paths
EXPORT_DENSITY_REPORT       = 'density_report.xlsx'
EXPORT_HISTORICAL_INCIDENTS = 'historical_incidents.xlsx'
EXPORT_PRODUCT_ATTRIBUTES   = 'product_attributes.xlsx'
EXPORT_SUPPLIER_SCORECARD   = 'supplier_scorecard.xlsx'
EXPORT_FULL_JOIN            = 'full_join.xlsx'

## Global functions

### 1. Read excel files with Polars

In [17]:
def polars_read_excel(file_name, sheet_name='Sheet1'):
  return pl.read_excel(source=file_name, sheet_name=sheet_name)

### 2. Removing invalid strings in Product Reference

In [19]:
def clean_product_reference_column(df: pl.DataFrame, column_name: str, valid_pattern: str = r"^PRD\d{5}$") -> pl.DataFrame:
    # Ensure the column is cast to string
    series_string = df[column_name].cast(pl.Utf8, strict=False)

    # Remove trailing 'X' characters
    cleaned_series = series_string.str.strip_chars_end('X')

    # Check which values match the valid pattern
    is_valid = cleaned_series.str.contains(valid_pattern)

    # Replace invalid entries with None
    corrected_series = pl.when(is_valid).then(cleaned_series).otherwise(None)

    # Return the dataframe with the updated column
    return df.with_columns([corrected_series.alias(column_name)])

### 3. Modify column based on a dictionary

In [20]:
def clean_column_with_mapping(df: pl.DataFrame, col_name: str, mapping_dict: dict) -> pl.DataFrame:
    cleaned_col = (
        df[col_name]
        .cast(pl.Utf8, strict=False)
        .str.strip_chars()
        .str.replace_all(" ", "")
        .map_elements(lambda val: mapping_dict.get(val, val), return_dtype=pl.Utf8)
    )

    return df.with_columns([cleaned_col.alias(col_name)])

### 4. Most frequent combination of columns

In [21]:
def most_common_combination(df: pl.DataFrame, group_cols: list[str], additional_cols: list[str] = []) -> pl.DataFrame:
  return (
      df
      .group_by(group_cols + additional_cols)
      .agg(pl.len().alias("count"))
      .sort(group_cols + ["count"], descending=[False] * len(group_cols) + [True])
      .unique(subset=group_cols, keep='first')
    )

### 5. Replace invalid values

In [22]:
def replace_invalid_values(df: pl.DataFrame, target_col: str, invalid_values: list[str], group_cols: list[str]) -> pl.DataFrame:

    # Get most common valid values per group
    df_most_common = most_common_combination(
        df.filter(~pl.col(target_col).is_in(invalid_values)),
        group_cols,
        additional_cols=[target_col]
    )

    # Build replacements for each invalid value
    fixed_parts = []
    for val in invalid_values:
        df_invalid = df.filter(pl.col(target_col) == val)
        df_replaced = (
            df_invalid.drop(target_col)
            .join(
                df_most_common.select(group_cols + [target_col]),
                on=group_cols,
                how="left"
            )
        )
        fixed_parts.append(df_replaced)

    # Retain all valid entries
    df_cleaned = df.filter(~pl.col(target_col).is_in(invalid_values))

    # Ensure consistent column order
    column_order = df_cleaned.columns
    fixed_parts = [part.select(column_order) for part in fixed_parts]

    return pl.concat([df_cleaned] + fixed_parts, how="vertical")

### 6. Supplier Scorecard aggregation function

In [23]:
def aggregate_supplier_scorecard(df_scorecard: pl.DataFrame) -> pl.DataFrame:
    # Aggregate core metrics
    df_agg = df_scorecard.group_by(['SupplierName', 'Month']).agg([
        pl.sum('PackagesHandled').alias('PackagesHandled'),
        pl.sum('TotalIncidents').alias('TotalIncidents'),
        pl.sum('AnomaliesDetected').alias('AnomaliesDetected'),
        (pl.col('PackagesHandled') * pl.col('BadPackagingRate (%)') / 100).sum().alias('TotalBadPackages'),
        (pl.col('PackagesHandled') * pl.col('OnTimeDeliveryRate (%)') / 100).sum().alias('TotalOnTimePackages'),
        (pl.col('AverageCostPerIncident (€)') * pl.col('TotalIncidents')).sum().alias('TotalIncidentCost')
    ])

    # Calculate final KPIs and round
    ph = pl.col('PackagesHandled')
    ti = pl.col('TotalIncidents')

    df_final = df_agg.with_columns([
        (pl.when(ph > 0).then(pl.col('TotalBadPackages') * 100 / ph).otherwise(0.0))
        .round(2).alias('BadPackagingRate (%)'),

        (pl.when(ph > 0).then(pl.col('TotalOnTimePackages') * 100 / ph).otherwise(None))
        .round(2).alias('OnTimeDeliveryRate (%)'),

        (pl.when(ti > 0).then(pl.col('TotalIncidentCost') / ti).otherwise(0.0))
        .round(2).alias('AverageCostPerIncident (€)')
    ]).drop([
        'TotalBadPackages',
        'TotalOnTimePackages',
        'TotalIncidentCost'
    ])

    return df_final


## File reading

In [24]:
df_density_report       = polars_read_excel(PATH_DENSITY_REPORT)
df_historical_incidents = polars_read_excel(PATH_HISTORICAL_INCIDENTS)
df_product_attributes   = polars_read_excel(PATH_PRODUCT_ATTRIBUTES)
df_supplier_scorecard   = polars_read_excel(PATH_SUPPLIER_SCORECARD)

## Execution

### 1. Density Report

#### 1.1 Product Reference correction

In [25]:
df_density_report = clean_product_reference_column(df_density_report, "ProductReference")

#### 1.2 Naming consistency generation

In [26]:
# Define mappings
supplier_mapping = {
    'SuplA':     'SupplierA',
    'supplierA': 'SupplierA',
    'SuppB':     'SupplierB',
    'SupllierC': 'SupplierC',
    'SPLF':      'SupplierF',
    'supplierh': 'SupplierH',
}

method_mapping = {
    'Methd1': 'Method1',
    'Method_2': 'Method2',
}

layout_mapping = {
    'layouta': 'LayoutA',
    'LayC': 'LayoutC',
}

quality_mapping = {
    'GOOD': 'Good',
    'bad': 'Bad',
}

# Apply supplier mapping
df_density_report = clean_column_with_mapping(df_density_report, 'SupplierName', supplier_mapping)

# Apply method, layout, and quality mappings
df_density_report = clean_column_with_mapping(df_density_report, 'ProposedFoldingMethod', method_mapping)
df_density_report = clean_column_with_mapping(df_density_report, 'ProposedLayout', layout_mapping)
df_density_report = clean_column_with_mapping(df_density_report, 'PackagingQuality', quality_mapping)

#### 1.3 Incorrect input labels modification

In [27]:
# Define grouping columns
group_cols_for_folding = [
    "SupplierName",
    "GarmentType",
    "Material",
    "Weight",
    "ProposedUnitsPerCarton",
    "ProposedLayout",
    "PackagingQuality"
]

group_cols_for_layout = [
    "SupplierName",
    "GarmentType",
    "Material",
    "Weight",
    "ProposedUnitsPerCarton",
    "ProposedFoldingMethod",
    "PackagingQuality"
]

# Replace invalid values in ProposedFoldingMethod
df_density_report = replace_invalid_values(
    df_density_report,
    target_col="ProposedFoldingMethod",
    invalid_values=["FoldX", "None"],
    group_cols=group_cols_for_folding
)

# Replace invalid values in ProposedLayout
df_density_report = replace_invalid_values(
    df_density_report,
    target_col="ProposedLayout",
    invalid_values=["Box9", "LayoutX"],
    group_cols=group_cols_for_layout
)


#### 1.4 Drop incorrect values of number of units

In [28]:
# Clean ProposedUnitsPerCarton: set invalid values to null
df_density_report = df_density_report.with_columns(
    pl.when(
        (pl.col("ProposedUnitsPerCarton") <= 0) |
        (pl.col("ProposedUnitsPerCarton") % 1 != 0) |
        (pl.col("ProposedUnitsPerCarton") > 100)
    )
    .then(None)
    .otherwise(pl.col("ProposedUnitsPerCarton"))
    .alias("ProposedUnitsPerCarton")
)

### 2. Historical Incidents

#### 2.1 Product Reference correction

In [29]:
df_historical_incidents = clean_product_reference_column(df_historical_incidents, "ProductReference")

#### 2.2 Naming consistency generation

In [30]:
# Define mappings
supplier_mapping = {
    'SuplA':     'SupplierA',
    'supplierA': 'SupplierA',
    'SuppB':     'SupplierB',
    'SupllierC': 'SupplierC',
    'SPLF':      'SupplierF',
    'supplierh': 'SupplierH',
}

# Apply supplier mapping
df_historical_incidents = clean_column_with_mapping(df_historical_incidents, 'SupplierName', supplier_mapping)

#### 3.3 

### 3. Product Attributes

No further cleaning of this dataset is necessary.

### 4. Supplier Scorecard

#### 4.1 Naming consistency generation

In [31]:
# Define mappings
supplier_mapping = {
    'SuplA':     'SupplierA',
    'supplierA': 'SupplierA',
    'SuppB':     'SupplierB',
    'SupllierC': 'SupplierC',
    'SPLF':      'SupplierF',
    'supplierh': 'SupplierH',
}

# Apply supplier mapping
df_supplier_scorecard = clean_column_with_mapping(df_supplier_scorecard, 'SupplierName', supplier_mapping)

#### 4.2 Grouping correction

In [32]:
df_supplier_scorecard_agg = aggregate_supplier_scorecard(df_supplier_scorecard)

### 5. Export separate files

In [33]:
df_density_report.write_excel(EXPORT_DENSITY_REPORT)
df_historical_incidents.write_excel(EXPORT_HISTORICAL_INCIDENTS)
df_product_attributes.write_excel(EXPORT_PRODUCT_ATTRIBUTES)
df_supplier_scorecard.write_excel(EXPORT_SUPPLIER_SCORECARD)

<xlsxwriter.workbook.Workbook at 0x32a19c690>

## Dataframe integration

### 1. Nulls filtering

In [34]:
df_density_report_join = df_density_report.filter(
    pl.col("ProductReference").is_not_null() &
    pl.col("ProposedUnitsPerCarton").is_not_null() &
    pl.col("ProposedFoldingMethod").is_not_null() &
    pl.col("ProposedLayout").is_not_null()
)

df_historical_incidents_join = df_historical_incidents.filter(
    pl.col("ProductReference").is_not_null()
)

df_product_attributes_join = df_product_attributes.filter(
    pl.col("ProductReference").is_not_null()
)

df_supplier_scorecard_join = df_supplier_scorecard_agg.filter(
    pl.col("SupplierName").is_not_null()
)

### 2. Dataframes joining

In [35]:
df_full_join = df_density_report_join.join(
    df_product_attributes_join.select(["ProductReference", "Size", "Collection"]),
    on="ProductReference",
    how="left"
)

# We create a new column to join with the scorecard dataframe
df_full_join = df_full_join.with_columns(
    pl.col("DateOfReport").map_elements(
        lambda d: (d - relativedelta(months=1)).strftime("%Y-%m"),
        return_dtype=pl.Utf8
    ).alias("PreviousMonth")
)

df_full_join = df_full_join.join(
    df_supplier_scorecard_join,
    left_on=["SupplierName", "PreviousMonth"],
    right_on=["SupplierName", "Month"],
    how="left"
)

In [36]:
#  Configuration 
INCIDENT_ID_COL = "IncidentID_AutoGen"
JOIN_KEYS_DENSITY_INCIDENTS = ["ProductReference", "SupplierName"]
TIME_WINDOW_DAYS = 14

# 1. Ensure unique ID for original incidents
df_incidents_with_id = df_historical_incidents.clone()
if INCIDENT_ID_COL not in df_incidents_with_id.columns:
    df_incidents_with_id = df_incidents_with_id.with_row_count(name=INCIDENT_ID_COL)
elif not df_incidents_with_id.get_column(INCIDENT_ID_COL).is_unique().all():
    df_incidents_with_id = df_incidents_with_id.drop(INCIDENT_ID_COL).with_row_count(name=INCIDENT_ID_COL)

# 2. Create candidate matches
df_candidates = df_density_report_join.join(
    df_incidents_with_id,
    on=JOIN_KEYS_DENSITY_INCIDENTS,
    how="inner"
).filter(
    (pl.col("DateOfIncident") >= pl.col("DateOfReport")) &
    (pl.col("DateOfIncident") <= pl.col("DateOfReport") + pl.duration(days=TIME_WINDOW_DAYS))
)

# 3. MODIFIED De-duplication: Each incident links to at most ONE ReportID (its "best" match)
df_report_to_incident_link = pl.DataFrame() # Initialize empty
if df_candidates.height > 0:
    df_candidates_with_lag = df_candidates.with_columns(
        (pl.col("DateOfIncident") - pl.col("DateOfReport")).dt.total_days().alias("days_lag")
    )
    df_sorted_candidates_for_incident = df_candidates_with_lag.sort(
        INCIDENT_ID_COL, "days_lag", "DateOfReport", "ReportID" # Sort order defines "best"
    )
    df_report_to_incident_link = df_sorted_candidates_for_incident.group_by(
        INCIDENT_ID_COL, maintain_order=True
    ).first()
else: # Handle case where df_candidates is empty
    empty_schema_df_for_link = df_density_report_join.head(0).join(
            df_incidents_with_id.head(0), on=JOIN_KEYS_DENSITY_INCIDENTS, how="inner"
        ).with_columns(pl.lit(None, dtype=pl.Int64).alias("days_lag"))
    df_report_to_incident_link = empty_schema_df_for_link.clear()


# 4. Integration
incident_details_to_add = [
    INCIDENT_ID_COL, "DateOfIncident", "CostImpact (€)", "IssueDescription", "ResolutionStatus"
]
cols_from_link_to_select = ["ReportID"] + [
    col for col in incident_details_to_add if col in df_report_to_incident_link.columns
]

df_full_join = df_full_join.join(
    df_report_to_incident_link.select(cols_from_link_to_select),
    on="ReportID",
    how="left"
)




  df_incidents_with_id = df_incidents_with_id.with_row_count(name=INCIDENT_ID_COL)


### 3. Dataframes joining

In [37]:
df_full_join.write_excel(EXPORT_FULL_JOIN)

<xlsxwriter.workbook.Workbook at 0x32a1dd450>