<a href="https://colab.research.google.com/github/nageshnnr/digital-metallurgy-lab/blob/main/Welcome_To_Colab.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
# =======================================================================================
# data_preprocessing.py - Data cleaning and preprocessing utilities
# =======================================================================================

import pandas as pd
import numpy as np
from typing import Dict, List, Tuple

class ScrapDataPreprocessor:
    """
    Preprocessing utilities for galvanized steel scrap survey data.

    Provides cleaning of raw survey responses (row filtering, facility-name
    normalisation, composition validation, unit conversion, derived metrics)
    and a simple data quality report.
    """

    # Multipliers converting a reported volume into tonnes, keyed by the
    # normalised (stripped, lower-cased) unit label.
    _TONNES_PER_UNIT = {
        'kg': 0.001,
        'tons': 1.0,
        'tonnes': 1.0,
        'tonne': 1.0,
        'mt': 1.0,
        'pounds': 0.000453592,
        'lbs': 0.000453592,
        'lb': 0.000453592,
    }

    def __init__(self):
        self.metal_columns = ['Aluminum', 'Copper', 'Zinc', 'Lead', 'Brass', 'Bronze']
        self.product_types = [
            'Galvanized Coils',
            'Galvanized Plain Sheets',
            'Galvanized Corrugated Sheets',
            'Galvanized Pipes & Tubes',
            'Galvanized Wire'
        ]

    def clean_survey_responses(self, raw_data: pd.DataFrame) -> pd.DataFrame:
        """Clean and validate survey response data.

        Steps: drop rows missing 'Facility' or 'Product', normalise facility
        names (strip + title case), keep only rows whose metal percentages
        sum to 95-105, convert volumes to tonnes when a 'Volume_Unit' column
        exists, then add the derived metrics.

        Args:
            raw_data: Raw survey responses. It is not modified.

        Returns:
            A new DataFrame holding the cleaned rows and derived columns.

        Raises:
            KeyError: If 'Facility'/'Product' or any metal composition column
                is missing from ``raw_data``.
        """
        # Explicit copies keep the caller's frame untouched and avoid
        # chained-assignment (SettingWithCopy) problems on the filtered view.
        cleaned_data = raw_data.dropna(subset=['Facility', 'Product']).copy()

        # Standardize facility names
        cleaned_data['Facility'] = cleaned_data['Facility'].str.strip().str.title()

        # Validate metal percentages sum to ~100%
        composition_columns = self._resolve_composition_columns(cleaned_data)
        metal_sum = cleaned_data[composition_columns].sum(axis=1)
        valid_composition = metal_sum.between(95, 105)
        cleaned_data = cleaned_data.loc[valid_composition].copy()

        # Convert volume units to tonnes
        if 'Volume_Unit' in cleaned_data.columns:
            cleaned_data = self._standardize_volume_units(cleaned_data)

        # Add calculated fields
        cleaned_data = self._add_calculated_fields(cleaned_data)

        return cleaned_data

    def _resolve_composition_columns(self, data: pd.DataFrame) -> List[str]:
        """Return the column holding each metal's percentage.

        Each metal in ``self.metal_columns`` is looked up first under its bare
        name (e.g. 'Zinc') and then under the '<Metal>_Percentage' name that
        the derived-metric code in this class reads.

        Raises:
            KeyError: If a metal is present under neither name.
        """
        resolved = []
        missing = []
        for metal in self.metal_columns:
            suffixed = f'{metal}_Percentage'
            if metal in data.columns:
                resolved.append(metal)
            elif suffixed in data.columns:
                resolved.append(suffixed)
            else:
                missing.append(metal)

        if missing:
            raise KeyError(
                f"Missing metal composition columns for: {missing} "
                "(expected '<Metal>' or '<Metal>_Percentage')"
            )
        return resolved

    def _standardize_volume_units(self, data: pd.DataFrame) -> pd.DataFrame:
        """Convert the 'Volume' column to tonnes in 'Volume_Tonnes'.

        Unit labels are matched case-insensitively and ignoring surrounding
        whitespace. Rows with an unrecognised unit keep their existing
        'Volume_Tonnes' value if that column already exists, otherwise NaN.

        Args:
            data: Frame with 'Volume' and 'Volume_Unit' columns. Modified in
                place.

        Returns:
            The same ``data`` object, for chaining.
        """
        normalised_units = data['Volume_Unit'].astype(str).str.strip().str.lower()
        # Unknown units map to NaN, which also marks them as unrecognised.
        factors = normalised_units.map(self._TONNES_PER_UNIT).astype(float)
        recognised = factors.notna()

        converted = data['Volume'] * factors
        if 'Volume_Tonnes' in data.columns:
            # Do not wipe pre-existing values for rows we cannot convert.
            converted = converted.where(recognised, data['Volume_Tonnes'])
        data['Volume_Tonnes'] = converted

        return data

    def _add_calculated_fields(self, data: pd.DataFrame) -> pd.DataFrame:
        """Add derived metrics for whichever input columns are available.

        * 'Scrap_Rate_Percent' when both 'Production_Volume' and
          'Monthly_Scrap_Tonnes' exist (NaN where production is zero).
        * '<Metal>_Recovery_Tonnes' for Zinc/Aluminum/Copper when
          'Annual_Scrap_Tonnes' and '<Metal>_Percentage' exist.
        * 'Efficiency_Category' binned from 'Scrap_Rate_Percent'.

        Args:
            data: Frame to extend. Modified in place.

        Returns:
            The same ``data`` object, for chaining.
        """
        # Calculate scrap rate if production data available
        if {'Production_Volume', 'Monthly_Scrap_Tonnes'}.issubset(data.columns):
            # A zero production volume makes the rate undefined; use NaN
            # rather than inf so it is not binned as 'Needs_Improvement'.
            production = data['Production_Volume'].replace(0, np.nan)
            data['Scrap_Rate_Percent'] = (data['Monthly_Scrap_Tonnes'] /
                                          production) * 100

        # Calculate metal recovery potential
        if 'Annual_Scrap_Tonnes' in data.columns:
            for metal in ['Zinc', 'Aluminum', 'Copper']:
                if f'{metal}_Percentage' in data.columns:
                    data[f'{metal}_Recovery_Tonnes'] = (
                        data['Annual_Scrap_Tonnes'] * data[f'{metal}_Percentage'] / 100
                    )

        # Add efficiency category
        if 'Scrap_Rate_Percent' in data.columns:
            data['Efficiency_Category'] = pd.cut(
                data['Scrap_Rate_Percent'],
                bins=[0, 2, 5, 10, float('inf')],
                labels=['Excellent', 'Good', 'Average', 'Needs_Improvement'],
                # Without this a scrap rate of exactly 0% falls outside the
                # first (0, 2] bin and is left uncategorised.
                include_lowest=True
            )

        return data

    def validate_data_quality(self, data: pd.DataFrame) -> Dict:
        """Generate data quality report.

        Args:
            data: Frame to inspect. It is not modified.

        Returns:
            Dict with 'total_records', per-column 'missing_values', the
            'duplicate_records' count, per-column 'data_types', and
            'value_ranges' (min/max/mean/outlier count) for numeric columns.
        """
        quality_report = {
            'total_records': len(data),
            'missing_values': data.isnull().sum().to_dict(),
            # Plain int so the report does not carry numpy scalar types.
            'duplicate_records': int(data.duplicated().sum()),
            'data_types': data.dtypes.to_dict(),
            'value_ranges': {}
        }

        # Check value ranges for key metrics
        numeric_columns = data.select_dtypes(include=[np.number]).columns
        for col in numeric_columns:
            column = data[col]
            quality_report['value_ranges'][col] = {
                'min': float(column.min()),
                'max': float(column.max()),
                'mean': float(column.mean()),
                'outliers': self._detect_outliers(column)
            }

        return quality_report

    def _detect_outliers(self, series: pd.Series) -> int:
        """Count values outside 1.5 * IQR of the quartiles (IQR method)."""
        Q1 = series.quantile(0.25)
        Q3 = series.quantile(0.75)
        IQR = Q3 - Q1
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR

        outliers = ((series < lower_bound) | (series > upper_bound)).sum()
        return int(outliers)

# =======================================================================================
# visualization_utils.py - Custom visualization functions
# =======================================================================================

import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots

class ScrapVisualizationUtils:
    """
    Custom visualization utilities for scrap analytics.

    Each ``create_*`` method builds and returns a Plotly figure from a
    DataFrame; none of them modify the DataFrame they are given.
    """

    def __init__(self):
        self.color_palette = px.colors.qualitative.Set3
        self.steel_colors = {
            'Zinc': '#71797E',
            'Aluminum': '#A8A8A8',
            'Copper': '#B87333',
            'Lead': '#2F4F4F',
            'Brass': '#DAA520',
            'Bronze': '#CD7F32'
        }

    def create_scrap_flow_diagram(self, data: pd.DataFrame) -> go.Figure:
        """Create Sankey diagram showing material flow from products to regions.

        Args:
            data: Frame with 'Product', 'Region' and 'Annual_Scrap_Tonnes'
                columns.

        Returns:
            A Sankey figure with one node per product, one per region, and a
            link for every product/region pair whose summed annual scrap is
            positive.
        """
        # Node order follows first appearance in the data.
        products = data['Product'].unique()
        regions = data['Region'].unique()

        source_nodes = list(products)
        target_nodes = [f"{region}_Region" for region in regions]
        all_nodes = source_nodes + target_nodes

        # Aggregate once instead of re-filtering the frame for every
        # product/region pair.
        totals = (
            data.groupby(['Product', 'Region'])['Annual_Scrap_Tonnes']
            .sum()
            .to_dict()
        )

        # Region nodes are listed after all product nodes, hence the offset.
        region_offset = len(products)
        links = []
        for i, product in enumerate(products):
            for j, region in enumerate(regions):
                volume = totals.get((product, region), 0)
                if volume > 0:
                    links.append({
                        'source': i,
                        'target': region_offset + j,
                        'value': volume
                    })

        fig = go.Figure(data=[go.Sankey(
            node=dict(
                pad=15,
                thickness=20,
                line=dict(color="black", width=0.5),
                label=all_nodes,
                color="blue"
            ),
            link=dict(
                source=[link['source'] for link in links],
                target=[link['target'] for link in links],
                value=[link['value'] for link in links]
            )
        )])

        fig.update_layout(
            title_text="Galvanized Steel Scrap Flow: Products to Regions",
            font_size=12,
            height=600
        )

        return fig

    def create_composition_heatmap(self, data: pd.DataFrame) -> go.Figure:
        """Create heatmap of mean metal composition by product.

        Args:
            data: Frame with a 'Product' column and the six
                '<Metal>_Percentage' columns.

        Returns:
            A heatmap figure with products on the y axis, metals on the x
            axis, and the per-product mean percentage as cell value and label.
        """
        metal_cols = ['Zinc_Percentage', 'Aluminum_Percentage', 'Copper_Percentage',
                     'Lead_Percentage', 'Brass_Percentage', 'Bronze_Percentage']

        composition_matrix = data.groupby('Product')[metal_cols].mean()

        fig = go.Figure(data=go.Heatmap(
            z=composition_matrix.values,
            x=[col.replace('_Percentage', '') for col in metal_cols],
            y=composition_matrix.index,
            colorscale='RdYlBu_r',
            text=composition_matrix.values.round(1),
            texttemplate="%{text}%",
            textfont={"size": 10}
        ))

        fig.update_layout(
            title='Metal Composition Heatmap by Product Type',
            xaxis_title='Metal Type',
            yaxis_title='Product Type',
            height=400
        )

        return fig

    def create_optimization_scatter(self, data: pd.DataFrame) -> go.Figure:
        """Create scatter plot for optimization opportunities.

        Plots scrap rate against production capacity (sized by annual scrap,
        coloured by region) and overlays a least-squares linear trend line.
        The trend line is omitted when fewer than two distinct finite
        capacity values are available, since a line cannot be fitted then.

        Args:
            data: Frame with 'Production_Capacity', 'Scrap_Rate_Percent',
                'Annual_Scrap_Tonnes', 'Region', 'Facility' and 'Product'
                columns.

        Returns:
            The scatter figure.
        """
        fig = px.scatter(
            data,
            x='Production_Capacity',
            y='Scrap_Rate_Percent',
            size='Annual_Scrap_Tonnes',
            color='Region',
            hover_data=['Facility', 'Product'],
            title='Scrap Rate vs Production Capacity - Optimization Opportunities'
        )

        # Fit only on rows where both coordinates are finite: np.polyfit
        # fails or yields NaN coefficients when fed NaN/inf values.
        capacity = data['Production_Capacity'].to_numpy(dtype=float)
        scrap_rate = data['Scrap_Rate_Percent'].to_numpy(dtype=float)
        usable = np.isfinite(capacity) & np.isfinite(scrap_rate)
        capacity = capacity[usable]
        scrap_rate = scrap_rate[usable]

        if np.unique(capacity).size >= 2:
            slope, intercept = np.polyfit(capacity, scrap_rate, 1)
            # A straight line is fully described by its two end points.
            trend_x = np.array([capacity.min(), capacity.max()])
            fig.add_trace(go.Scatter(
                x=trend_x,
                y=slope * trend_x + intercept,
                mode='lines',
                name='Trend Line',
                line=dict(color='red', dash='dash')
            ))

        fig.update_layout(
            xaxis_title='Production Capacity (tonnes/month)',
            yaxis_title='Scrap Rate (%)',
            height=500
        )

        return fig

# =======================================================================================
# model_evaluation.py - ML model validation utilities
# =======================================================================================

from sklearn.model_selection import cross_val_score, train_test_split
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
import matplotlib.pyplot as plt

class ModelEvaluator:
    """
    Model evaluation and validation utilities.

    The most recent metrics computed by ``evaluate_regression_model`` are
    kept on ``self.evaluation_metrics``.
    """

    def __init__(self):
        self.evaluation_metrics = {}

    def evaluate_regression_model(self, model, X, y, cv_folds=5):
        """Comprehensive evaluation of regression model.

        Fits ``model`` on an 80% training split (fixed ``random_state=42``),
        scores it on both splits, and adds R^2 cross-validation statistics
        computed over the full ``X``/``y``.

        Args:
            model: Estimator with ``fit``/``predict``. It is fitted in place
                on the training split.
            X: Feature matrix.
            y: Target values.
            cv_folds: Number of cross-validation folds.

        Returns:
            Dict of plain floats with keys 'train_r2', 'test_r2', 'train_mae',
            'test_mae', 'train_rmse', 'test_rmse', 'cv_mean' and 'cv_std'.
        """
        # Split data
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42
        )

        # Train model
        model.fit(X_train, y_train)

        # Predictions
        y_pred_train = model.predict(X_train)
        y_pred_test = model.predict(X_test)

        # Calculate metrics; float() keeps numpy scalar types out of the result
        metrics = {
            'train_r2': float(r2_score(y_train, y_pred_train)),
            'test_r2': float(r2_score(y_test, y_pred_test)),
            'train_mae': float(mean_absolute_error(y_train, y_pred_train)),
            'test_mae': float(mean_absolute_error(y_test, y_pred_test)),
            'train_rmse': float(np.sqrt(mean_squared_error(y_train, y_pred_train))),
            'test_rmse': float(np.sqrt(mean_squared_error(y_test, y_pred_test)))
        }

        # Cross-validation
        cv_scores = cross_val_score(model, X, y, cv=cv_folds, scoring='r2')
        metrics['cv_mean'] = float(cv_scores.mean())
        metrics['cv_std'] = float(cv_scores.std())

        self.evaluation_metrics = metrics
        return metrics

    def plot_model_performance(self, model, X_test, y_test):
        """Create model performance visualization.

        Draws an actual-vs-predicted scatter (with the y = x reference line)
        next to a residual plot.

        Args:
            model: Fitted estimator with ``predict``.
            X_test: Features to predict on.
            y_test: True target values; any array-like (list, Series,
                single-column frame, ndarray).

        Returns:
            The matplotlib Figure holding both axes.
        """
        # Flatten both sides to 1-D arrays so that lists work, an (n, 1)
        # target does not broadcast against an (n,) prediction, and pandas
        # index alignment cannot reorder the residuals.
        actual = np.asarray(y_test).ravel()
        predicted = np.asarray(model.predict(X_test)).ravel()

        fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(12, 5))

        # Actual vs Predicted
        lowest, highest = actual.min(), actual.max()
        ax1.scatter(actual, predicted, alpha=0.6)
        ax1.plot([lowest, highest], [lowest, highest], 'r--', lw=2)
        ax1.set_xlabel('Actual Values')
        ax1.set_ylabel('Predicted Values')
        ax1.set_title('Actual vs Predicted Values')

        # Residuals
        residuals = actual - predicted
        ax2.scatter(predicted, residuals, alpha=0.6)
        ax2.axhline(y=0, color='r', linestyle='--')
        ax2.set_xlabel('Predicted Values')
        ax2.set_ylabel('Residuals')
        ax2.set_title('Residual Plot')

        plt.tight_layout()
        return fig

In [3]:
# requirements.txt
!pip install \
pandas>=1.5.0 \
numpy>=1.24.0 \
matplotlib>=3.6.0 \
seaborn>=0.12.0 \
plotly>=5.15.0 \
scikit-learn>=1.3.0 \
openpyxl>=3.1.0 \
jupyter>=1.0.0

In [4]:
# Load the raw survey data from CSV. Start from an empty DataFrame so that a
# failed read leaves a usable (empty) placeholder for the steps below.
raw_data = pd.DataFrame()
try:
    raw_data = pd.read_csv('/path/to/your/data.csv')
except FileNotFoundError:
    # The placeholder path has not been replaced with a real file yet.
    print("Error: Data file not found. Please replace '/path/to/your/data.csv' with the correct path.")
except Exception as read_error:
    # Any other read problem: report it and carry on with the empty frame.
    print(f"An error occurred while reading the data file: {read_error}")


# Instantiate the preprocessor
preprocessor = ScrapDataPreprocessor()

# Clean the data only when something was actually loaded
if raw_data.empty:
    print("Data loading failed. Cannot proceed with cleaning.")
else:
    # Hand over a copy so the loaded frame itself stays unmodified
    cleaned_data = preprocessor.clean_survey_responses(raw_data.copy())

    # Preview the first few cleaned rows
    display(cleaned_data.head())

Error: Data file not found. Please replace '/path/to/your/data.csv' with the correct path.
Data loading failed. Cannot proceed with cleaning.
