# In [2]:
import pandas as pd
import numpy as np
from sklearn.impute import KNNImputer
from sklearn.preprocessing import StandardScaler
from sklearn.linear_model import LinearRegression  # Import LinearRegression
import joblib

# Read the raw test split from disk into a DataFrame
test_df = pd.read_csv(filepath_or_buffer='dataset/test.csv')

# Preprocessing of the test data: private imputation helpers, then the entry point
def _fill_maintenance_count(df):
    """Return ``maintenance_count`` with gaps filled from progressively coarser medians.

    Each missing value takes the first *available* (non-NaN) median of, in
    order: its (string_id, error_code) group, its error_code group, its
    string_id group, and finally the whole column.
    """
    counts = df['maintenance_count']
    overall_median = counts.median()
    filled = counts
    for keys in (['string_id', 'error_code'], ['error_code'], ['string_id']):
        # A group made up solely of missing values has a NaN median; fillna
        # leaves those rows untouched so the next, coarser level can fill them.
        filled = filled.fillna(df.groupby(keys)['maintenance_count'].transform('median'))
    return filled.fillna(overall_median)


def _fill_soiling_ratio(df):
    """Return ``soiling_ratio`` with gaps filled by the median of rows in the same maintenance bin.

    ``maintenance_count`` is cut into 5 equal-width bins; rows whose bin has no
    observed soiling ratio stay missing.
    """
    soiling = df['soiling_ratio']
    maintenance = df['maintenance_count']
    # pd.cut cannot build bins without at least one observed value.
    if not soiling.isna().any() or not maintenance.notna().any():
        return soiling
    bins = pd.cut(maintenance, bins=5, include_lowest=True)
    # observed=False keeps the historical grouping behaviour and silences the
    # pandas FutureWarning about the changing default for categorical groupers.
    bin_median = df.groupby(bins, observed=False)['soiling_ratio'].transform('median')
    return soiling.fillna(bin_median)


def _knn_impute_column(frame, column):
    """Run a 5-neighbour KNNImputer over ``frame`` and return the imputed ``column``.

    Returns ``None`` when ``column`` has no observed value at all: KNNImputer
    drops such columns from its output, so there would be nothing to return.
    """
    has_observed = frame.notna().any(axis=0)
    if not has_observed[column]:
        return None
    imputed = KNNImputer(n_neighbors=5).fit_transform(frame)
    # Columns without any observed value are dropped by KNNImputer, so the
    # target's position must be looked up among the surviving columns.
    kept_columns = list(frame.columns[has_observed.to_numpy()])
    return imputed[:, kept_columns.index(column)]


def _impute_module_temperature(df):
    """Return a numeric ``module_temperature`` with missing values estimated.

    First a linear regression on ``temperature`` (when that column exists and
    more than 10 complete pairs are available) fills rows that have an ambient
    temperature; then KNN over all other numeric columns fills the rest.
    """
    module_temp = pd.to_numeric(df['module_temperature'], errors='coerce').copy()

    if 'temperature' in df.columns:
        ambient = pd.to_numeric(df['temperature'], errors='coerce')
        known = module_temp.notna() & ambient.notna()
        unknown = module_temp.isna() & ambient.notna()
        # Only predict when there is something to predict: scikit-learn
        # rejects an input with zero samples.
        if known.sum() > 10 and unknown.any():
            model = LinearRegression()
            model.fit(ambient[known].to_numpy().reshape(-1, 1), module_temp[known])
            module_temp.loc[unknown] = model.predict(ambient[unknown].to_numpy().reshape(-1, 1))

    numeric_columns = df.select_dtypes(include=['number']).columns.tolist()
    other_features = [name for name in numeric_columns if name != 'module_temperature']
    if not other_features:
        return module_temp

    frame = df[other_features].apply(pd.to_numeric, errors='coerce')
    frame.insert(0, 'module_temperature', module_temp)
    estimates = _knn_impute_column(frame, 'module_temperature')
    if estimates is None:
        return module_temp
    return pd.Series(estimates, index=df.index, name='module_temperature')


def _knn_fill(df, values, to_impute, candidate_features):
    """Overwrite the ``to_impute`` rows of ``values`` with KNN estimates.

    Only the ``candidate_features`` that exist in ``df`` are used as
    predictors; ``values`` is returned unchanged when none exist or when
    ``values`` has no observed entry to learn from.
    """
    features = [name for name in candidate_features if name in df.columns]
    if not features:
        return values
    frame = df[features].apply(pd.to_numeric, errors='coerce')
    frame[values.name] = values
    estimates = _knn_impute_column(frame, values.name)
    if estimates is None:
        return values
    # NOTE(review): rows flagged only because they are zero reach KNNImputer as
    # observed values, so their "estimate" is the same zero; only NaN rows are
    # really re-estimated. Behaviour kept as-is - confirm whether zeros were
    # meant to be re-estimated (and whether the training pipeline did so).
    return values.mask(to_impute, pd.Series(estimates, index=values.index))


def _clip_and_fill(values, upper):
    """Clip ``values`` to ``[0, upper]`` and fill any remaining gaps with the median."""
    values = values.clip(lower=0, upper=upper)
    if values.isna().any():
        values = values.fillna(values.median())
    return values


def _impute_irradiance(df):
    """Return a numeric ``irradiance`` with zero/missing readings imputed and clipped to [0, 1500]."""
    irradiance = pd.to_numeric(df['irradiance'], errors='coerce')
    to_impute = (irradiance == 0) | irradiance.isna()

    if 'power' in df.columns and 'efficiency' in df.columns:
        # Back out irradiance from power / efficiency; the small constant
        # avoids division by zero.
        estimated = pd.to_numeric(df['power'], errors='coerce') / (
            pd.to_numeric(df['efficiency'], errors='coerce') + 0.0001
        )
        plausible = (estimated > 0) & (estimated < 1500)
        irradiance = irradiance.mask(to_impute & plausible, estimated)

    irradiance = _knn_fill(
        df, irradiance, to_impute,
        ['temperature', 'module_temperature', 'humidity', 'cloud_coverage'],
    )
    return _clip_and_fill(irradiance, 1500)


def _impute_humidity(df):
    """Return a numeric ``humidity`` with zero/missing readings imputed and clipped to [0, 100]."""
    humidity = pd.to_numeric(df['humidity'], errors='coerce')
    to_impute = (humidity == 0) | humidity.isna()
    humidity = _knn_fill(
        df, humidity, to_impute,
        ['temperature', 'module_temperature', 'cloud_coverage', 'wind_speed', 'pressure'],
    )
    return _clip_and_fill(humidity, 100)


def preprocess_test_data(test_df: pd.DataFrame, scaler=None) -> pd.DataFrame:
    """Impute, feature-engineer, encode and optionally scale the test data.

    Steps, in order: coerce ``humidity``/``wind_speed``/``pressure`` to
    numbers; fill ``panel_age``, ``maintenance_count`` and ``soiling_ratio``
    from group medians; estimate ``module_temperature``; add
    ``power = current * voltage``; impute ``irradiance`` and ``humidity``;
    one-hot encode ``error_code``; scale the numerical features.

    Args:
        test_df: Frame containing at least ``humidity``, ``wind_speed``,
            ``pressure``, ``string_id``, ``panel_age``, ``error_code``,
            ``maintenance_count``, ``soiling_ratio``, ``module_temperature``,
            ``current``, ``voltage`` and ``irradiance``. ``temperature``,
            ``cloud_coverage`` and ``efficiency`` are used when present.
            The frame is not modified.
        scaler: Optional fitted transformer exposing ``transform``; applied to
            the seven numerical feature columns when given.

    Returns:
        A new, preprocessed DataFrame.

    Raises:
        KeyError: If a required column is missing.
    """
    # Work on a copy so the caller's frame is not left half-processed.
    test_df = test_df.copy()

    # Convert columns to appropriate data types
    for column in ('humidity', 'wind_speed', 'pressure'):
        test_df[column] = pd.to_numeric(test_df[column], errors='coerce')

    # Fill missing panel_age with the median age of the same string_id.
    # fillna keeps known ages on rows whose string_id is itself missing.
    test_df['panel_age'] = test_df['panel_age'].fillna(
        test_df.groupby('string_id')['panel_age'].transform('median')
    )

    test_df['maintenance_count'] = _fill_maintenance_count(test_df)
    test_df['soiling_ratio'] = _fill_soiling_ratio(test_df)
    test_df['module_temperature'] = _impute_module_temperature(test_df)

    # Create the 'power' feature
    test_df['power'] = test_df['current'] * test_df['voltage']

    test_df['irradiance'] = _impute_irradiance(test_df)
    test_df['humidity'] = _impute_humidity(test_df)

    # Handle categorical variables: one-hot encode 'error_code'
    test_df = pd.get_dummies(test_df, columns=['error_code'])

    # Ensure all one-hot encoded columns from training are present
    # NOTE(review): 'error_code_Unknown' suggests missing codes were labelled
    # 'Unknown' during training; no such fill happens here - confirm.
    expected_columns = ['error_code_E00', 'error_code_E01', 'error_code_E02', 'error_code_Unknown']
    for column in expected_columns:
        if column not in test_df.columns:
            test_df[column] = 0

    # Scale numerical features using the same scaler as the training data.
    # Compare against None: estimator truthiness may depend on __len__.
    if scaler is not None:
        numerical_features = ['module_temperature', 'irradiance', 'power', 'panel_age', 'maintenance_count', 'soiling_ratio', 'humidity']
        test_df[numerical_features] = scaler.transform(test_df[numerical_features])

    return test_df

# Restore the scaler that was saved for the training data; change 'scaler.pkl'
# to the real location of the saved scaler if it lives elsewhere.
# joblib.load unpickles its input, so only point it at files you trust.
scaler = joblib.load('scaler.pkl')

# Run the preprocessing on the test frame, then write the result to CSV
# without the index column.
test_df_processed = preprocess_test_data(test_df, scaler=scaler)
test_df_processed.to_csv(path_or_buf='test_data_processed.csv', index=False)


# NOTE(review): stray notebook output, presumably the source line echoed by a pandas warning:
#   maintenance_soiling_median = test_df.groupby(maintenance_bins)['soiling_ratio'].median()
