In [1]:
import pandas as pd
import numpy as np
from typing import Tuple, Union, List
from sklearn.linear_model import LogisticRegression
from datetime import datetime
from pathlib import Path
import joblib

class DelayModel:
    """Flight-delay classifier: preprocessing, training, persistence, prediction.

    The model is a ``LogisticRegression`` trained on a fixed list of one-hot
    columns (``features_cols``) derived from the ``OPERA``, ``TIPOVUELO`` and
    ``MES`` columns of the raw data. The fitted model and the categories seen
    during training are persisted with joblib under ``models/``.
    """

    # A flight counts as delayed when 'Fecha-O' is more than this many minutes
    # after 'Fecha-I'.
    _DELAY_THRESHOLD_MINUTES = 15

    def __init__(
        self
    ):
        self._model = None
        self._model_path = "models/delay_model.joblib"
        self._columns_path = "models/columns.joblib"
        # One-hot columns fed to the model, in this exact order.
        self.features_cols = [
            "OPERA_Latin American Wings",
            "MES_7",
            "MES_10",
            "OPERA_Grupo LATAM",
            "MES_12",
            "TIPOVUELO_I",
            "MES_4",
            "MES_11",
            "OPERA_Sky Airline",
            "OPERA_Copa Air"
        ]
        # Distinct raw values seen at training time; filled by preprocess()
        # in training mode or loaded from disk in serving mode.
        self._opera_categories = None
        self._tipo_vuelo_categories = None
        self._mes_categories = None

    def _get_min_diff(self, data: pd.DataFrame) -> pd.Series:
        """Return 'Fecha-O' minus 'Fecha-I' in minutes, one value per row."""
        fecha_o = pd.to_datetime(data['Fecha-O'])
        fecha_i = pd.to_datetime(data['Fecha-I'])
        return (fecha_o - fecha_i).dt.total_seconds() / 60

    def _save_categories(self) -> None:
        """Persist the training categories to ``self._columns_path``."""
        categories = {
            'opera': self._opera_categories,
            'tipo_vuelo': self._tipo_vuelo_categories,
            'mes': self._mes_categories
        }
        Path(self._columns_path).parent.mkdir(parents=True, exist_ok=True)
        joblib.dump(categories, self._columns_path)

    def _load_categories(self) -> None:
        """Load the training categories from ``self._columns_path``.

        NOTE: joblib.load unpickles the file, so it must come from a trusted
        source.
        """
        categories = joblib.load(self._columns_path)
        self._opera_categories = categories['opera']
        self._tipo_vuelo_categories = categories['tipo_vuelo']
        self._mes_categories = categories['mes']

    def _adjust_dummy_columns(self, dummies: pd.DataFrame, prefix: str, expected_categories: np.ndarray) -> pd.DataFrame:
        """Return ``dummies`` restricted to the columns expected from training.

        Args:
            dummies (pd.DataFrame): one-hot columns for a single raw column.
            prefix (str): column prefix including the separator, e.g. "MES_".
            expected_categories (np.ndarray): raw values seen during training.
        Returns:
            pd.DataFrame: a new frame with exactly one column per expected
            category; categories absent from ``dummies`` are filled with 0 and
            unexpected columns are dropped. The input frame is not modified.
        """
        expected_columns = [f"{prefix}{cat}" for cat in expected_categories]
        return dummies.reindex(columns=expected_columns, fill_value=0)

    def preprocess(
        self,
        data: pd.DataFrame,
        target_column: Union[str, None] = None
    ) -> Union[Tuple[pd.DataFrame, pd.DataFrame], pd.DataFrame]:
        """
        Prepare raw data for training or predict.

        The input frame is never modified. The date columns ('Fecha-O',
        'Fecha-I') are only read in training mode, so serving payloads need
        just 'OPERA', 'TIPOVUELO' and 'MES'.
        Args:
            data (pd.DataFrame): raw data.
            target_column (str, optional): if set, training mode is used and
                the target is returned in a column with this name.
        Returns:
            Tuple[pd.DataFrame, pd.DataFrame]: features and target.
            or
            pd.DataFrame: features.
        Raises:
            ValueError: in serving mode, when a categorical column contains a
                value that was not seen during training.
        """
        is_training = target_column is not None

        target = None
        if is_training:
            # Build the target first so missing date columns fail before any
            # category file is written.
            min_diff = self._get_min_diff(data)
            delay = np.where(min_diff > self._DELAY_THRESHOLD_MINUTES, 1, 0)
            target = pd.DataFrame({target_column: delay}, index=data.index)

            self._opera_categories = data['OPERA'].unique()
            self._tipo_vuelo_categories = data['TIPOVUELO'].unique()
            self._mes_categories = data['MES'].unique()
            self._save_categories()
        else:
            # Read the category file only when nothing is cached yet, the
            # same way predict() loads the model only once.
            cached = (
                self._opera_categories,
                self._tipo_vuelo_categories,
                self._mes_categories,
            )
            if any(categories is None for categories in cached):
                self._load_categories()

            for col, expected_categories in [
                ('OPERA', self._opera_categories),
                ('TIPOVUELO', self._tipo_vuelo_categories),
                ('MES', self._mes_categories)
            ]:
                unexpected = set(data[col].unique()) - set(expected_categories)
                if unexpected:
                    raise ValueError(
                        f"Unexpected values in the column {col}: {unexpected}"
                    )

        # One-hot encode each categorical column and align it with training.
        dummy_frames = [
            self._adjust_dummy_columns(
                pd.get_dummies(data[col], prefix=col), f"{col}_", categories
            )
            for col, categories in [
                ('OPERA', self._opera_categories),
                ('TIPOVUELO', self._tipo_vuelo_categories),
                ('MES', self._mes_categories)
            ]
        ]
        features = pd.concat(dummy_frames, axis=1)

        # Keep only the selected features, in a fixed order. A selected
        # category that never occurred becomes an all-zero column instead of
        # raising KeyError, and every column ends up with one integer dtype.
        features = features.reindex(columns=self.features_cols, fill_value=0).astype(int)

        if is_training:
            return features, target
        return features

    def fit(
        self,
        features: pd.DataFrame,
        target: pd.DataFrame
    ) -> None:
        """
        Fit model with preprocessed data and save it to ``self._model_path``.
        Args:
            features (pd.DataFrame): preprocessed data.
            target (pd.DataFrame): target.
        Raises:
            ValueError: if the target does not contain both classes 0 and 1.
        """
        # Convert target DataFrame to a flat array for LogisticRegression
        target_values = target.values.ravel()

        n_y0 = np.sum(target_values == 0)
        n_y1 = np.sum(target_values == 1)
        if n_y0 == 0 or n_y1 == 0:
            # The weight below divides by n_y1, and a classifier cannot be
            # trained on a single class anyway.
            raise ValueError(
                "Training target must contain both classes 0 and 1"
            )

        # Weight the positive class by the negative/positive ratio
        class_weight = {0: 1, 1: n_y0 / n_y1}

        self._model = LogisticRegression(
            random_state=1,
            class_weight=class_weight,
            max_iter=1000
        )

        self._model.fit(features, target_values)
        self.save_model(self._model_path)

    def save_model(
        self,
        filepath: str
    ) -> None:
        """
        Save the trained model to disk.
        Args:
            filepath (str): Path where the model will be saved
        """
        # Create directory if it doesn't exist
        Path(filepath).parent.mkdir(parents=True, exist_ok=True)

        joblib.dump(self._model, filepath)

    def load_model(
        self,
        filepath: str
    ) -> None:
        """
        Load a trained model from disk.

        NOTE: joblib.load unpickles the file, so it must come from a trusted
        source.
        Args:
            filepath (str): Path to the saved model
        """
        self._model = joblib.load(filepath)

    def predict(
        self,
        features: pd.DataFrame
    ) -> List[int]:
        """
        Predict delays for new flights.

        If no model is in memory, the one stored at ``self._model_path`` is
        loaded first.
        Args:
            features (pd.DataFrame): preprocessed data.
        Returns:
            List[int]: predicted targets.
        """
        if self._model is None:
            self.load_model(self._model_path)

        predictions = self._model.predict(features)
        return predictions.tolist()

In [28]:
import unittest
import pandas as pd

from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split

class TestModel(unittest.TestCase):
    """Checks for DelayModel preprocessing, training and prediction.

    Every test starts from a fresh DelayModel and a fresh copy of
    ``../data/data.csv``. The serving and predict tests use DelayModel without
    training it first, so they rely on whatever DelayModel reads back from
    disk (categories and model written by an earlier training run).
    """

    # Columns the preprocessed feature frame must contain.
    FEATURES_COLS = [
        "OPERA_Latin American Wings",
        "MES_7",
        "MES_10",
        "OPERA_Grupo LATAM",
        "MES_12",
        "TIPOVUELO_I",
        "MES_4",
        "MES_11",
        "OPERA_Sky Airline",
        "OPERA_Copa Air"
    ]

    # Columns the preprocessed target frame must contain.
    TARGET_COL = [
        "delay"
    ]

    def setUp(self) -> None:
        """Create an untrained model and load the raw dataset."""
        super().setUp()
        self.model = DelayModel()
        self.data = pd.read_csv(filepath_or_buffer="../data/data.csv", low_memory=False)

    def test_model_preprocess_for_training(
        self
    ):
        """Training-mode preprocessing returns the expected feature and target frames."""
        features, target = self.model.preprocess(
            data=self.data,
            target_column="delay"
        )

        self.assertIsInstance(features, pd.DataFrame)
        self.assertEqual(features.shape[1], len(self.FEATURES_COLS))
        self.assertEqual(set(features.columns), set(self.FEATURES_COLS))

        self.assertIsInstance(target, pd.DataFrame)
        self.assertEqual(target.shape[1], len(self.TARGET_COL))
        self.assertEqual(set(target.columns), set(self.TARGET_COL))

    def test_model_preprocess_for_serving(
        self
    ):
        """Serving-mode preprocessing returns only the expected feature frame."""
        features = self.model.preprocess(
            data=self.data
        )

        self.assertIsInstance(features, pd.DataFrame)
        self.assertEqual(features.shape[1], len(self.FEATURES_COLS))
        self.assertEqual(set(features.columns), set(self.FEATURES_COLS))

    def test_model_fit(
        self
    ):
        """The fitted model meets the recall / f1 bounds on the validation split."""
        features, target = self.model.preprocess(
            data=self.data,
            target_column="delay"
        )

        _, features_validation, _, target_validation = train_test_split(
            features, target, test_size=0.33, random_state=42
        )

        # NOTE: the model is fitted on the full data, so the validation rows
        # were also seen during training.
        self.model.fit(
            features=features,
            target=target
        )

        predicted_target = self.model._model.predict(
            features_validation
        )

        report = classification_report(target_validation, predicted_target, output_dict=True)

        self.assertLess(report["0"]["recall"], 0.60)
        self.assertLess(report["0"]["f1-score"], 0.70)
        self.assertGreater(report["1"]["recall"], 0.60)
        self.assertGreater(report["1"]["f1-score"], 0.30)

    def test_model_predict(
        self
    ):
        """predict() returns one plain int per preprocessed row."""
        features = self.model.preprocess(
            data=self.data
        )

        predicted_targets = self.model.predict(
            features=features
        )

        self.assertIsInstance(predicted_targets, list)
        self.assertEqual(len(predicted_targets), features.shape[0])
        self.assertTrue(
            all(isinstance(predicted_target, int) for predicted_target in predicted_targets)
        )

In [29]:
# Instantiate the test case directly so its methods can be called by hand.
a = TestModel()

In [30]:
# Build the model and load the CSV; must run before any a.test_* call.
a.setUp()

In [31]:
# Manual run of the training-mode preprocessing test.
a.test_model_preprocess_for_training()

In [32]:
# Manual run of the fit test.
a.test_model_fit()

In [36]:
# Manual run of the serving-mode preprocessing test.
a.test_model_preprocess_for_serving()

In [37]:
# NOTE: rebinds `a`, so the TestModel instance created earlier is no longer reachable by this name.
a = True

In [40]:
# Scratch check of truthiness: prints 3 when `a` is truthy, otherwise 2.
print(3 if a else 2)

3


In [48]:
    def load_training_categories(categories_path):
        """Return the object stored with joblib at ``categories_path``.

        NOTE: joblib.load unpickles the file, so only open files from a
        trusted source.
        """
        return joblib.load(categories_path)

In [51]:
# NOTE(review): absolute, machine-specific Windows path; this cell only works where that file exists.
path = r"D:\GitHub\Challenge_MLE\models\categories.joblib"
# Notebook echo: displays the loaded categories object.
load_training_categories(path)

{'valid_airlines': {'Aerolineas Argentinas',
  'Aeromexico',
  'Air Canada',
  'Air France',
  'Alitalia',
  'American Airlines',
  'Austral',
  'Avianca',
  'British Airways',
  'Copa Air',
  'Delta Air',
  'Gol Trans',
  'Grupo LATAM',
  'Iberia',
  'JetSmart SPA',
  'K.L.M.',
  'Lacsa',
  'Latin American Wings',
  'Oceanair Linhas Aereas',
  'Plus Ultra Lineas Aereas',
  'Qantas Airways',
  'Sky Airline',
  'United Airlines'}}

In [54]:
# Scratch two-element tuple.
x = 1, 2

In [55]:
# Notebook echo: displays the number of elements in x.
len(x)

2