In [None]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
import re

In [None]:
class LinearRegression:
    """
    Linear regression trained with batch gradient descent, L2 regularization
    on the weights (the bias is not regularized) and early stopping on the
    validation loss.
    """
    def __init__(self, learning_rate=0.01, epochs=2000, lambda_l2=0.1, tolerance=1e-7, patience=100):
        """
        Initializes the model with hyperparameters.

        Args:
            learning_rate (float): The step size for gradient descent.
            epochs (int): The maximum number of iterations over the dataset.
            lambda_l2 (float): The regularization parameter for L2 regularization.
            tolerance (float): The minimum improvement in validation loss to be considered progress.
            patience (int): The number of epochs to wait for improvement before early stopping.
        """
        self.lr = learning_rate
        self.epochs = epochs
        self.lambda_l2 = lambda_l2
        self.tolerance = tolerance
        self.patience = patience
        self.weights = None
        self.bias = None
        self.history = {'train_loss': [], 'val_loss': []}

    def fit(self, X_train, y_train, X_val, y_val):
        """
        Fits the model to the training data with full-batch gradient descent.

        Training stops early once the validation loss has failed to improve by
        more than ``tolerance`` for ``patience`` consecutive epochs. The weights
        from the last executed epoch are kept (the best epoch is not restored).

        Args:
            X_train (array-like): Training features, shape (n_samples, n_features).
            y_train (array-like): Training targets; flattened to shape (n_samples,).
            X_val (array-like): Validation features with the same number of columns.
            y_val (array-like): Validation targets; flattened to 1-D.
        """
        # Coerce to float arrays and flatten the targets: an (n, 1) target would
        # otherwise broadcast against the (n,) predictions into an (n, n) matrix.
        X_train = np.asarray(X_train, dtype=float)
        X_val = np.asarray(X_val, dtype=float)
        y_train = np.asarray(y_train, dtype=float).ravel()
        y_val = np.asarray(y_val, dtype=float).ravel()

        n_samples, n_features = X_train.shape
        self.weights = np.zeros(n_features)
        self.bias = 0.0
        # Start a fresh history so that re-fitting does not append to the
        # loss curves recorded by a previous call.
        self.history = {'train_loss': [], 'val_loss': []}
        best_val_loss = float('inf')
        no_improve_epochs = 0

        for epoch in range(self.epochs):
            # Predictions with the parameters from before this epoch's update
            y_pred_train = np.dot(X_train, self.weights) + self.bias
            residuals = y_pred_train - y_train

            # Gradients for weights (dw) and bias (db); only the weights carry the L2 term
            dw = (1/n_samples) * np.dot(X_train.T, residuals) + self.lambda_l2 * self.weights
            db = (1/n_samples) * np.sum(residuals)

            # Gradient descent step
            self.weights -= self.lr * dw
            self.bias -= self.lr * db

            # Note: the training loss uses the pre-update predictions, while the
            # validation loss is computed with the freshly updated parameters.
            train_loss = self.mean_squared_error(y_train, y_pred_train)
            y_pred_val = self.predict(X_val)
            val_loss = self.mean_squared_error(y_val, y_pred_val)
            self.history['train_loss'].append(train_loss)
            self.history['val_loss'].append(val_loss)

            if (epoch + 1) % 1000 == 0:
                print(f"Epoch {epoch+1}/{self.epochs} | Train Loss: {train_loss:.4f} | Validation Loss: {val_loss:.4f}")

            # Early stopping check
            if val_loss < best_val_loss - self.tolerance:
                best_val_loss = val_loss
                no_improve_epochs = 0
            else:
                no_improve_epochs += 1

            if no_improve_epochs >= self.patience:
                print(f"Early stopping at epoch {epoch+1}.")
                break

    def predict(self, X):
        """
        Makes predictions using the trained linear model.

        Args:
            X (array-like): Feature matrix, shape (n_samples, n_features).

        Returns:
            The linear predictions ``X @ weights + bias``.
        """
        return np.dot(X, self.weights) + self.bias

    @staticmethod
    def mean_squared_error(y_true, y_pred):
        """Calculates the Mean Squared Error metric."""
        return np.mean((y_true - y_pred) ** 2)

    @staticmethod
    def mean_absolute_error(y_true, y_pred):
        """Calculates the Mean Absolute Error metric."""
        return np.mean(np.abs(y_true - y_pred))

    @staticmethod
    def r2_score(y_true, y_pred):
        """
        Calculates the R-squared (coefficient of determination) metric.

        When ``y_true`` is constant the usual ratio is undefined; in that case
        1.0 is returned only for perfect predictions and 0.0 otherwise, so that
        wrong predictions are never reported as a perfect fit.
        """
        ss_res = np.sum((y_true - y_pred) ** 2)
        ss_tot = np.sum((y_true - np.mean(y_true)) ** 2)
        if ss_tot == 0:
            return 1.0 if ss_res == 0 else 0.0
        return 1 - (ss_res / ss_tot)

In [None]:
class CarPricePredictor:
    """
    A class that encapsulates the entire pipeline from preprocessing to car price prediction.
    This class handles data cleaning, feature engineering, training, and prediction.

    Everything learned from the training data (imputation medians, owner map,
    top makes, category vocabularies, final feature list and scaler statistics)
    is stored in ``self.state`` and reused when transforming new data.
    """
    def __init__(self, top_n_makes=12, current_year=2024):
        """
        Initializes the predictor pipeline.

        Args:
            top_n_makes (int): The number of most frequent car makes to keep as separate features.
            current_year (int): The current year, used to calculate car age.
        """
        self.current_year = current_year
        self.top_n_makes = top_n_makes
        self.model = None
        self.state = {} # Dictionary to store all learned parameters (medians, scalers, etc.)

    def _preprocess(self, df, fit=False):
        """
        Internal method to handle all data preprocessing steps.
        If fit=True, it learns and stores preprocessing parameters in self.state.
        Otherwise, it uses the stored parameters to transform the data.

        Args:
            df (pd.DataFrame): Raw input data; columns that are absent are skipped.
            fit (bool): Whether to (re)learn the preprocessing parameters from ``df``.

        Returns:
            pd.DataFrame: Feature matrix whose columns are ``self.state['final_features']``
            (features missing from ``df`` are filled with 0).
        """
        # Select a subset of columns to use
        cols_to_use = ['Year', 'Kilometer', 'Fuel Type', 'Transmission', 'Owner', 'Make',
                       'Engine', 'Max Power', 'Max Torque', 'Seating Capacity']
        if 'Price' in df.columns:
            cols_to_use.append('Price')
        data = df[[col for col in cols_to_use if col in df.columns]].copy()

        # Feature Engineering: Create 'Age' from 'Year'
        if 'Year' in data.columns:
            data['Age'] = self.current_year - data['Year']

        # Helper function to extract the first number found in a string cell
        def extract_numeric(text):
            if pd.isna(text): return np.nan
            match = re.search(r'(\d+\.?\d*)', str(text).lower())
            return float(match.group(1)) if match else np.nan

        # Apply numeric extraction to relevant columns
        for col in ['Engine', 'Max Power', 'Max Torque']:
            if col in data.columns:
                data[col] = data[col].apply(extract_numeric)

        # Impute missing numeric values with the median learned during fitting.
        # The medians live under self.state['medians']; the median of the
        # current frame is only a fallback when no learned value exists.
        numeric_cols = ['Kilometer', 'Engine', 'Max Power', 'Max Torque', 'Seating Capacity', 'Age']
        if fit:
            self.state['medians'] = {}
        medians = self.state.get('medians', {})
        for col in numeric_cols:
            if col in data.columns:
                if fit:
                    medians[col] = data[col].median()
                fill_value = medians[col] if col in medians else data[col].median()
                data[col] = data[col].fillna(fill_value)

        # Map 'Owner' column to numerical values (unknown labels become 0)
        if 'Owner' in data.columns:
            if fit: self.state['owner_map'] = {'First': 1, 'Second': 2, 'Third': 3, 'Fourth & Above': 4, 'Test Drive Car': 0}
            data['Owner'] = data['Owner'].map(self.state.get('owner_map', {})).fillna(0)

        # Handle 'Make' by keeping top N and grouping others into 'Other'
        if 'Make' in data.columns:
            if fit: self.state['top_makes'] = data['Make'].value_counts().nlargest(self.top_n_makes).index.tolist()
            top_makes = self.state.get('top_makes', [])
            data['Make'] = data['Make'].apply(lambda x: x if x in top_makes else 'Other')

        # Perform one-hot encoding for categorical features.
        # The category vocabulary is learned at fit time and re-applied to new
        # data, so drop_first always drops the same baseline category. Without
        # this, a batch containing a single category would lose its only dummy
        # column and be encoded as the baseline. Values never seen during
        # fitting get all-zero dummies.
        categorical_cols = [c for c in ['Make', 'Fuel Type', 'Transmission'] if c in data.columns]
        if fit:
            self.state['categories'] = {
                col: pd.Categorical(data[col]).categories.tolist() for col in categorical_cols
            }
        learned_categories = self.state.get('categories', {})
        for col in categorical_cols:
            if col in learned_categories:
                data[col] = pd.Categorical(data[col], categories=learned_categories[col])
        if categorical_cols:
            # dtype=float keeps the dummy columns numeric for the later scaling step
            data = pd.get_dummies(data, columns=categorical_cols, drop_first=True, dtype=float)

        # Feature Engineering: Create polynomial features
        if 'Age' in data.columns: data['Age_Squared'] = data['Age']**2
        if 'Kilometer' in data.columns: data['Kilometer_Squared'] = data['Kilometer']**2

        # Store the final list of feature columns during fitting
        if fit:
            self.state['final_features'] = [c for c in data.columns if c not in ['Year', 'Price']]

        # Reindex columns to ensure consistency between train and test sets
        data = data.reindex(columns=self.state.get('final_features', []), fill_value=0)
        return data

    def fit(self, df_train, learning_rate=0.001, epochs=15000, lambda_l2=0.01, patience=800):
        """
        Executes the full training pipeline: outlier removal, preprocessing, splitting, scaling, and model training.

        Args:
            df_train (pd.DataFrame): Training data; must contain a 'Price' column.
            learning_rate (float): Step size passed to the regression model.
            epochs (int): Maximum number of gradient descent iterations.
            lambda_l2 (float): L2 regularization strength.
            patience (int): Early-stopping patience in epochs.
        """
        print("--- Starting Training Pipeline ---")
        # 1. Remove outliers from the target variable 'Price' (keep the 1st-99th percentile range)
        q_low, q_hi = df_train["Price"].quantile(0.01), df_train["Price"].quantile(0.99)
        data_cleaned = df_train[(df_train["Price"] >= q_low) & (df_train["Price"] <= q_hi)].copy()
        print(f"Removed outliers: {len(data_cleaned)}/{len(df_train)} samples remaining.")

        # 2. Preprocess data and learn transformation parameters (fit=True)
        X = self._preprocess(data_cleaned, fit=True)
        # Apply log transformation to the target variable to handle skewed distribution
        y = np.log1p(data_cleaned['Price'])

        # 3. Split data into training (80%) and validation (20%) sets with a fixed seed
        np.random.seed(42)
        indices = np.random.permutation(X.shape[0])
        val_size = int(X.shape[0] * 0.2)
        val_indices, train_indices = indices[:val_size], indices[val_size:]
        X_train, X_val = X.iloc[train_indices], X.iloc[val_indices]
        y_train, y_val = y.iloc[train_indices], y.iloc[val_indices]

        # 4. Standardize data: learn scaler on training data and transform both sets
        scaler_std = X_train.std(axis=0)
        self.state['scaler_mean'] = X_train.mean(axis=0)
        # Constant columns have zero spread; scale them by 1 to avoid division by zero
        self.state['scaler_std'] = scaler_std.where(scaler_std != 0, 1.0)
        X_train_scaled = (X_train - self.state['scaler_mean']) / self.state['scaler_std']
        X_val_scaled = (X_val - self.state['scaler_mean']) / self.state['scaler_std']

        # 5. Train the Linear Regression model
        print("\nTraining the Linear Regression model...")
        self.model = LinearRegression(learning_rate=learning_rate, epochs=epochs, lambda_l2=lambda_l2, patience=patience)
        self.model.fit(X_train_scaled.values, y_train.values, X_val_scaled.values, y_val.values)
        print("--- Training complete ---")

        # Evaluate on the full (cleaned) training set after training is done
        print("\n--- Evaluating on the full training set ---")
        self.evaluate(data_cleaned, dataset_name="Train")

    def predict(self, df_new):
        """
        Makes price predictions on new, unseen data.

        Args:
            df_new (pd.DataFrame): Raw data with the same columns used for training.

        Returns:
            Predicted prices on the original (non-log) scale.

        Raises:
            RuntimeError: If the model has not been trained yet.
        """
        if self.model is None: raise RuntimeError("Model has not been trained. Please call .fit() first.")
        # Preprocess the new data using the saved state from training (fit=False)
        X_new = self._preprocess(df_new, fit=False)
        # Scale the new data using the saved scaler
        X_new_scaled = (X_new - self.state['scaler_mean']) / self.state['scaler_std']
        # Predict on the log-transformed scale
        y_pred_log = self.model.predict(X_new_scaled.values)
        # Revert the log1p transformation to get the actual price prediction
        y_pred_real = np.expm1(y_pred_log)
        return y_pred_real

    def evaluate(self, df, dataset_name="Test"):
        """
        Evaluates the model on a given dataframe and prints performance metrics.
        The `dataset_name` parameter is used for clear print statements.

        Returns:
            tuple: ``(results, r2)`` where ``results`` holds the identifying columns
            plus actual and predicted prices, or ``(None, None)`` when ``df`` has
            no 'Price' column.
        """
        if 'Price' not in df.columns:
            print(f"Error: 'Price' column not found in the {dataset_name} dataset for evaluation.")
            return None, None

        y_true = df['Price']
        y_pred = self.predict(df)

        # Calculate metrics on the real price scale
        mse = self.model.mean_squared_error(y_true, y_pred)
        mae = self.model.mean_absolute_error(y_true, y_pred)
        r2 = self.model.r2_score(y_true, y_pred)

        print(f"Metrics for {dataset_name} set:")
        print(f"  R^2 Score (Real Scale): {r2:.4f}")
        print(f"  Mean Squared Error (MSE): {mse:,.0f}")
        print(f"  Mean Absolute Error (MAE): {mae:,.0f}")

        # Prepare results DataFrame for visualization or inspection.
        # Only the identifying columns that exist are copied, so a missing
        # 'Model' (or similar) column does not fail after the metrics are computed.
        id_cols = [col for col in ['Make', 'Model', 'Year'] if col in df.columns]
        results = df[id_cols].copy()
        results['Actual_Price'] = y_true
        results['Predicted_Price'] = np.round(y_pred, 0)
        return results, r2

In [None]:
# Build the pipeline, keeping the 12 most frequent makes as their own features
predictor = CarPricePredictor(top_n_makes=12)

# Read the raw training data from disk
df_train_full = pd.read_csv(filepath_or_buffer='train.csv')

# A single .fit() call runs the whole training pipeline and then prints
# the evaluation metrics for the training set
predictor.fit(df_train=df_train_full)

In [None]:
# Banner that separates the test-set report from the training output
print("\n" + "=" * 60, "EVALUATING ON THE TEST DATASET", "=" * 60, sep="\n")

# Load the held-out data and score the trained predictor on it
df_test = pd.read_csv(filepath_or_buffer='test.csv')
test_results, test_r2 = predictor.evaluate(df=df_test, dataset_name="Test")

# evaluate() yields None for the results when the 'Price' column is absent
if test_results is not None:
    print("\nPreview of the first 10 predictions on the test set:")
    print(test_results.iloc[:10])

In [None]:
if test_results is not None:
    plt.style.use('seaborn-v0_8-whitegrid')
    plt.figure(figsize=(14, 6))
    