# Metadata-Only Classification Pipeline

End-to-end workflow that ingests the lifecycle catalog, derives metadata-based features (no file content reads), and trains LightGBM/XGBoost models.

## Notebook Outline
- Configure paths and dependencies
- Ingest the catalog (Excel) and derive metadata features
- Build the modeling dataset from path-derived and file-system metadata features only (path keywords, path depth, extension, file size)
- Train/test split and preprocessing pipeline
- Benchmark LightGBM and XGBoost classifiers
- Persist artifacts under `artifacts_metadata_only/`

In [None]:
# Optional: install dependencies before first run.
# !pip install pandas openpyxl scikit-learn lightgbm xgboost joblib

In [None]:
from __future__ import annotations

import re
from pathlib import Path
from typing import Iterable

import numpy as np
import pandas as pd

from sklearn.base import clone
from sklearn.compose import ColumnTransformer
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics import accuracy_score, classification_report
from sklearn.model_selection import StratifiedKFold, train_test_split, cross_validate
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import OneHotEncoder
import joblib

from lightgbm import LGBMClassifier
from xgboost import XGBClassifier

# Display preferences for rendered frames: no cap on the number of columns
# shown, and a 140-character display width.
for _option_name, _option_value in (
    ("display.max_columns", None),
    ("display.width", 140),
):
    pd.set_option(_option_name, _option_value)

In [None]:
from sklearn import set_config
# Globally ask scikit-learn transformers to emit pandas output from transform().
# NOTE(review): the TF-IDF and one-hot steps configured later in this notebook
# use their default (presumably sparse) output; confirm that pandas output is
# compatible with them in the installed scikit-learn version.
set_config(transform_output='pandas')

In [None]:
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# Reproducibility and evaluation settings.
RANDOM_STATE = 42
TEST_SIZE = 0.2
CV_FOLDS = 3  # Reduced to handle rare classes

# Inputs: the Excel catalog lives under the parent directory's assets folder,
# and that same parent directory is the base for relative catalog paths.
PROJECT_ROOT = Path('..').resolve()
RAW_BASE_DIR = PROJECT_ROOT
EXCEL_PATH = PROJECT_ROOT.joinpath('assets', 'training_data.xlsx')

# Outputs: every artifact is written below the current working directory.
OUTPUT_DIR = Path.cwd().joinpath('artifacts_metadata_only')
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
MODEL_DIR = OUTPUT_DIR.joinpath('models')
MODEL_DIR.mkdir(exist_ok=True)

CURATED_DATA_PATH = OUTPUT_DIR.joinpath('metadata_features_metadata_only.csv')
TRAIN_DATA_PATH = OUTPUT_DIR.joinpath('train_dataset_metadata_only.csv')
TEST_DATA_PATH = OUTPUT_DIR.joinpath('test_dataset_metadata_only.csv')
MODEL_COMPARISON_PATH = OUTPUT_DIR.joinpath('model_comparison.csv')

# Catalog schema: spreadsheet headers that must / may be present.
REQUIRED_COLUMNS = ['Original File Path', 'File Path', 'Business Capability']
OPTIONAL_COLUMNS = ['Record Type', 'Retention Code', 'Notes']

# Modeling schema: the label plus the text, categorical and numeric inputs.
TARGET_COLUMN = 'business_capability'
TEXT_FEATURE = 'original_path_keywords'
CATEGORICAL_FEATURES = ['extension', 'extension_family']
NUMERIC_FEATURES = ['original_path_depth', 'file_size_bytes']
FEATURE_COLUMNS = [TEXT_FEATURE, *CATEGORICAL_FEATURES, *NUMERIC_FEATURES]

print(f"Excel catalog path: {EXCEL_PATH}")
print(f"Artifacts directory: {OUTPUT_DIR}")

In [None]:
# Maps the catalog's spreadsheet headers to the snake_case column names used
# by the rest of the notebook.
RENAME_MAP = {
    'Original File Path': 'original_file_path',
    'File Path': 'file_path',
    'Business Capability': 'business_capability',
    'Record Type': 'record_type',
    'Retention Code': 'retention_code',
    'Notes': 'notes_text',
}


def load_catalog(path: Path, required_columns: Iterable[str], optional_columns: Iterable[str]) -> pd.DataFrame:
    """Load the Excel catalog and return it with normalized column names.

    Only the required columns and whichever optional columns are present are
    kept; they are renamed through ``RENAME_MAP``. The two path columns and
    the business-capability column are returned as stripped strings, with
    missing cells represented as the empty string.

    Args:
        path: Location of the Excel workbook.
        required_columns: Spreadsheet headers that must exist.
        optional_columns: Spreadsheet headers kept only when present.

    Returns:
        A new DataFrame with the selected, renamed columns.

    Raises:
        FileNotFoundError: If ``path`` does not exist.
        ValueError: If any required column is missing from the workbook.
    """
    if not path.exists():
        raise FileNotFoundError(f'Catalog not found: {path}')

    # Materialize once: both arguments are iterated more than once below, so
    # a one-shot iterator would otherwise be exhausted after the first pass.
    required = list(required_columns)
    optional = list(optional_columns)

    df = pd.read_excel(path)
    missing = [col for col in required if col not in df.columns]
    if missing:
        raise ValueError(f'Missing required columns in catalog: {missing}')
    available_optional = [col for col in optional if col in df.columns]
    df = df[required + available_optional].copy()
    df = df.rename(columns=RENAME_MAP)
    for column in ['original_file_path', 'file_path', 'business_capability']:
        values = df[column]
        # Casting straight to str would turn missing cells into the literal
        # text 'nan', which downstream "empty label" checks cannot detect.
        # Blank those cells out explicitly after the cast.
        df[column] = values.astype(str).str.strip().where(values.notna(), '')
    return df

In [None]:
# Matches runs of characters other than ASCII letters and digits; used by
# to_keywords_from_path to replace such runs in path components with a space.
KEYWORD_CLEANER = re.compile(r'[^A-Za-z0-9]+')


def safe_path(value: str | float | int | None) -> str:
    """Return *value* as stripped text, mapping ``None`` and float NaN to ''."""
    is_missing = value is None or (isinstance(value, float) and np.isnan(value))
    return '' if is_missing else str(value).strip()


def resolve_path(raw_value: str, base_dir: Path) -> Path:
    """Convert a raw catalog path value into a ``Path`` anchored at *base_dir*.

    Blank or missing values yield ``base_dir`` itself. Relative paths are
    joined onto ``base_dir``; absolute paths are kept as given. The result is
    passed through a non-strict ``Path.resolve`` (the target need not exist);
    if that call raises, the un-resolved path is returned instead.
    """
    text = safe_path(raw_value)
    if not text:
        return base_dir

    try:
        anchored = Path(text)
    except Exception:
        # Best-effort fallback kept from the original behavior.
        anchored = base_dir / text
    if not anchored.is_absolute():
        anchored = base_dir / anchored

    try:
        resolved = anchored.resolve(strict=False)
    except Exception:
        return anchored
    return resolved


def to_keywords_from_path(path: Path) -> str:
    """Build a lowercase, space-separated keyword string from *path*.

    Each path component has its non-alphanumeric runs replaced by a space,
    is stripped and lowercased; components that end up empty are skipped and
    the remainder are joined with single spaces.
    """
    normalized_parts = (
        KEYWORD_CLEANER.sub(' ', component).strip().lower()
        for component in path.parts
    )
    return ' '.join(piece for piece in normalized_parts if piece)


def path_depth(path: Path) -> int:
    """Return how many components ``path.parts`` contains."""
    components = path.parts
    return len(components)


# File-name suffixes grouped by the coarse family they are reported as.
_EXTENSIONS_BY_FAMILY = (
    ('excel', ('.xls', '.xlsx', '.xlsm', '.xlsb')),
    ('tabular', ('.csv', '.tsv')),
    ('text', ('.txt', '.log')),
    ('json', ('.json',)),
    ('markup', ('.xml', '.yaml', '.yml')),
    ('config', ('.ini', '.cfg', '.conf')),
    ('word', ('.doc', '.docx')),
    ('pdf', ('.pdf',)),
    ('presentation', ('.ppt', '.pptx')),
    ('outlook', ('.msg',)),
    ('html', ('.html', '.htm')),
    ('code', ('.css', '.js', '.sql', '.py')),
    ('sas', ('.sas', '.sas7bdat')),
    ('notebook', ('.ipynb',)),
    ('image', ('.jpg', '.jpeg', '.png', '.gif', '.bmp')),
    ('visio', ('.vsd', '.vsdx')),
    ('tableau', ('.twb', '.twbx')),
)

# Flat suffix -> family lookup derived from the grouped table above.
EXTENSION_FAMILY_MAP = {
    suffix: family
    for family, suffixes in _EXTENSIONS_BY_FAMILY
    for suffix in suffixes
}


def extension_family(suffix: str) -> str:
    """Return the family for *suffix* (case-insensitive), or 'other' if unknown."""
    normalized = suffix.lower()
    if normalized in EXTENSION_FAMILY_MAP:
        return EXTENSION_FAMILY_MAP[normalized]
    return 'other'


def gather_file_stats(path: Path) -> tuple[int, float]:
    """Return ``(size_in_bytes, modification_time)`` for *path*.

    This is a best-effort lookup: when the file cannot be stat'ed the function
    returns ``(0, nan)`` instead of raising, so one bad catalog entry does not
    abort feature extraction for the whole catalog.

    Args:
        path: File to inspect.

    Returns:
        A tuple of the file size and the modification timestamp as a float,
        or ``(0, float('nan'))`` when ``path.stat()`` fails.
    """
    try:
        stat_result = path.stat()
    except (OSError, ValueError):
        # OSError covers missing/inaccessible files. ValueError is what the
        # underlying stat call raises for paths with an embedded NUL byte,
        # which can occur in raw catalog values.
        return 0, float('nan')
    return stat_result.st_size, float(stat_result.st_mtime)

In [None]:
def remove_illegal_characters(df: pd.DataFrame) -> pd.DataFrame:
    """
    Remove non-printable characters from all object-dtype columns in a DataFrame.

    Every non-null cell in those columns is converted with ``str()`` and then
    stripped of each character for which ``str.isprintable()`` is false
    (control characters such as NUL, tab and newline). Null cells are left
    untouched, as are columns of any other dtype.

    Args:
        df: Frame to clean. It is modified in place.

    Returns:
        The same ``df`` object, for call chaining.
    """

    def clean_value(x):
        if pd.isnull(x):
            return x
        # isprintable() is already False for every ASCII control character,
        # so no separate blocklist of control codes is needed.
        return ''.join(c for c in str(x) if c.isprintable())

    str_cols = df.select_dtypes(include=['object']).columns
    if len(str_cols) > 0:
        # Element-wise Series.map per column replaces DataFrame.applymap,
        # which is deprecated in recent pandas releases.
        df[str_cols] = df[str_cols].apply(lambda column: column.map(clean_value))
    return df


In [None]:
# Load the catalog, then derive every metadata feature from its two path columns.
catalog_df = load_catalog(EXCEL_PATH, REQUIRED_COLUMNS, OPTIONAL_COLUMNS)


def _resolve_from_base(raw_value):
    """Resolve one raw catalog path value against RAW_BASE_DIR."""
    return resolve_path(raw_value, RAW_BASE_DIR)


catalog_df['resolved_path'] = catalog_df['file_path'].apply(_resolve_from_base)
catalog_df['original_resolved_path'] = catalog_df['original_file_path'].apply(_resolve_from_base)

# Extension features are taken from the current file path ...
catalog_df['extension'] = catalog_df['resolved_path'].apply(lambda resolved: resolved.suffix.lower())
catalog_df['extension_family'] = catalog_df['extension'].apply(extension_family)

# ... while depth and keywords are taken from the original file path.
catalog_df['original_path_depth'] = catalog_df['original_resolved_path'].apply(path_depth)
catalog_df['original_path_keywords'] = catalog_df['original_resolved_path'].apply(to_keywords_from_path)

# gather_file_stats yields (size, mtime) pairs; only the size is kept.
size_mtime = catalog_df['resolved_path'].apply(gather_file_stats)
catalog_df['file_size_bytes'] = [size for size, _mtime in size_mtime]

catalog_df.head()

# Strip non-printable characters, then try to export the cleaned catalog.
# The export is best-effort: a failure is reported and the notebook continues.
catalog_df = remove_illegal_characters(catalog_df)
cleaned_excel_path = OUTPUT_DIR.joinpath('input_data_cleaned.xlsx')
try:
    catalog_df.to_excel(cleaned_excel_path, index=False, engine='openpyxl')
except Exception as export_error:
    print('Excel export skipped:', export_error)
else:
    print(f'Saved cleaned catalog to {cleaned_excel_path}')


In [None]:
# Columns carried into the curated table: the two raw paths, the label, and
# the derived metadata features.
_CURATED_COLUMNS = [
    'original_file_path',
    'file_path',
    'business_capability',
    'original_path_keywords',
    'extension',
    'extension_family',
    'original_path_depth',
    'file_size_bytes',
]
feature_df = catalog_df[_CURATED_COLUMNS].copy()

# Text-like columns: missing values become '' and everything is cast to str.
for _text_column in (
    'extension',
    'extension_family',
    'original_path_keywords',
    'business_capability',
):
    feature_df[_text_column] = feature_df[_text_column].fillna('').astype(str)

# Numeric columns: unparseable or missing values become 0.
_depth_values = pd.to_numeric(feature_df['original_path_depth'], errors='coerce')
feature_df['original_path_depth'] = _depth_values.fillna(0).astype(int)
_size_values = pd.to_numeric(feature_df['file_size_bytes'], errors='coerce')
feature_df['file_size_bytes'] = _size_values.fillna(0).astype(float)

feature_df.head()

In [None]:
# Persist the curated feature table (raw paths, label and derived features)
# as CSV, without writing the DataFrame index.
feature_df.to_csv(CURATED_DATA_PATH, index=False)
print(f'Saved curated dataset to {CURATED_DATA_PATH}')

In [None]:
# Keep only the model inputs and the target column.
modeling_df = feature_df[FEATURE_COLUMNS + [TARGET_COLUMN]].copy()

# Drop rows without a label. The explicit .copy() makes the filtered frame
# independent of its parent, so the column assignments below operate on a
# real frame and do not trigger pandas' SettingWithCopyWarning.
has_label = modeling_df[TARGET_COLUMN].str.strip() != ''
modeling_df = modeling_df.loc[has_label].copy()

# Defensive re-normalization of dtypes in case the frame was reloaded or
# altered since the curated table was built.
for column in [TEXT_FEATURE] + CATEGORICAL_FEATURES:
    modeling_df[column] = modeling_df[column].fillna('').astype(str)

for column in NUMERIC_FEATURES:
    modeling_df[column] = pd.to_numeric(modeling_df[column], errors='coerce').fillna(0.0).astype(float)

print('Class distribution:')
print(modeling_df[TARGET_COLUMN].value_counts())
modeling_df.head()

In [None]:
# Hold-out split, stratified on the target column.
_target_labels = modeling_df[TARGET_COLUMN]
train_df, test_df = train_test_split(
    modeling_df,
    stratify=_target_labels,
    random_state=RANDOM_STATE,
    test_size=TEST_SIZE,
)

# Persist both partitions as CSV without the DataFrame index.
for _partition, _destination in (
    (train_df, TRAIN_DATA_PATH),
    (test_df, TEST_DATA_PATH),
):
    _partition.to_csv(_destination, index=False)

print(f'Train rows: {len(train_df)} | Test rows: {len(test_df)}')

In [None]:
# TF-IDF over the path keyword text, using 1- to 3-gram terms.
text_vectorizer = TfidfVectorizer(
    strip_accents='unicode',
    min_df=1,
    ngram_range=(1, 3),
    max_features=20000,
)

# One step per feature group; columns not listed here are dropped.
_column_steps = [
    ('path_tfidf', text_vectorizer, TEXT_FEATURE),
    ('categorical', OneHotEncoder(handle_unknown='ignore'), CATEGORICAL_FEATURES),
    ('numeric', 'passthrough', NUMERIC_FEATURES),
]
preprocessor = ColumnTransformer(
    transformers=_column_steps,
    remainder='drop',
    sparse_threshold=0.3,
)

# The transformer is saved before any fitting, as a reusable template.
preprocessor_path = OUTPUT_DIR.joinpath('preprocessor_metadata_only.joblib')
joblib.dump(preprocessor, preprocessor_path)
print(f'Saved preprocessing template to {preprocessor_path}')

In [None]:
# Split features and labels for the training and held-out test partitions.
X_train = train_df[FEATURE_COLUMNS]
y_train = train_df[TARGET_COLUMN]
X_test = test_df[FEATURE_COLUMNS]
y_test = test_df[TARGET_COLUMN]

# Candidate gradient-boosting classifiers to benchmark against each other.
# NOTE(review): the target column holds string labels; XGBClassifier may
# require integer-encoded classes, and `use_label_encoder` may be ignored or
# rejected, depending on the installed xgboost version — confirm against the
# pinned version before relying on the xgboost results.
candidate_models = {
    'lightgbm': LGBMClassifier(
        n_estimators=400,
        learning_rate=0.05,
        num_leaves=64,
        objective='multiclass',
        random_state=RANDOM_STATE,
        n_jobs=-1,
    ),
    'xgboost': XGBClassifier(
        n_estimators=500,
        learning_rate=0.05,
        max_depth=9,
        subsample=0.9,
        colsample_bytree=0.9,
        objective='multi:softprob',
        eval_metric='mlogloss',
        random_state=RANDOM_STATE,
        tree_method='hist',
        n_jobs=-1,
        use_label_encoder=False,
    ),
}

# Use at most as many folds as the rarest training class has samples, but
# never fewer than two.
min_class_count = int(y_train.value_counts().min())
effective_folds = max(2, min(CV_FOLDS, min_class_count))
if effective_folds < CV_FOLDS:
    print(f'Adjusting CV folds from {CV_FOLDS} to {effective_folds} due to limited samples per class.')

cv = StratifiedKFold(n_splits=effective_folds, shuffle=True, random_state=RANDOM_STATE)

evaluation_rows = []  # one summary dict per model, later turned into a table
model_reports = {}    # model name -> classification_report dict on the test set

for model_name, estimator in candidate_models.items():
    print(f'\nTraining model: {model_name}')
    # Each model gets its own copy of the preprocessing template.
    pipeline = Pipeline([
        ('preprocessor', clone(preprocessor)),
        ('classifier', estimator),
    ])

    # Cross-validated scores on the training partition only.
    cv_scores = cross_validate(
        pipeline,
        X_train,
        y_train,
        cv=cv,
        scoring=['accuracy', 'f1_macro', 'f1_weighted'],
        n_jobs=-1,
        return_train_score=False,
    )

    # Final fit on the whole training partition, scored on the held-out test set.
    pipeline.fit(X_train, y_train)
    y_pred = pipeline.predict(X_test)

    report = classification_report(y_test, y_pred, output_dict=True, zero_division=0)
    model_reports[model_name] = report

    evaluation_rows.append({
        'model': model_name,
        'cv_accuracy_mean': cv_scores['test_accuracy'].mean(),
        'cv_accuracy_std': cv_scores['test_accuracy'].std(),
        'cv_macro_f1_mean': cv_scores['test_f1_macro'].mean(),
        'cv_weighted_f1_mean': cv_scores['test_f1_weighted'].mean(),
        'test_accuracy': accuracy_score(y_test, y_pred),
        'test_macro_f1': report['macro avg']['f1-score'],
        'test_weighted_f1': report['weighted avg']['f1-score'],
    })

    # Persist the fitted pipeline (preprocessing + classifier) for this model.
    joblib.dump(pipeline, MODEL_DIR / f'{model_name}_pipeline.joblib')

In [None]:
# Rank the models by weighted F1 on the test set (best first), renumber the
# rows, and save the comparison table.
results_df = pd.DataFrame(evaluation_rows)
results_df = results_df.sort_values(by='test_weighted_f1', ascending=False)
results_df = results_df.reset_index(drop=True)
results_df.to_csv(MODEL_COMPARISON_PATH, index=False)
results_df

In [None]:
# The comparison table is sorted best-first, so the winner is its first row.
best_model_name = results_df['model'].iloc[0]
print(f'Best model based on weighted F1: {best_model_name}')

# Show the winner's per-class report with one row per class/average.
best_report = pd.DataFrame(model_reports[best_model_name]).transpose()
display(best_report)

# Points at the pipeline file written for this model during training.
BEST_MODEL_PATH = MODEL_DIR.joinpath(f'{best_model_name}_pipeline.joblib')
print(f'Saved best model pipeline to {BEST_MODEL_PATH}')