In [30]:
import os
%pwd  # this tell us which path we are currently working , so based on the below output path we are working under the research file
os.chdir("c:\\datascience End to End Projects\\steel-plant-Load-Prediction-")  #  but i would like to work with main ProjectML_with_MLFlow file , so for getting i step back in path inorder to enter the main project file i used this command os.chdir("../")
%pwd

'c:\\datascience End to End Projects\\steel-plant-Load-Prediction-'

In [31]:
# This is called the entity 
from dataclasses import dataclass # here i imported the dataclass from the dataclasses
from pathlib import Path  # here i imported path from pathlib

@dataclass(frozen=True)
class DataTransformationConfig:
    root_dir: Path  # these are variables which are present inside the config.yaml file data_transformation code part and here iam mentioning inside the entity of the class
    data_path: Path
    preprocessor_obj: str

In [32]:
from PROJECTML.constants import *
from PROJECTML.utils.common import read_yaml, create_directories

In [33]:
# this is same part of the code in every step 
class ConfigurationManager:
    def __init__(
        self,
        config_filepath = CONFIG_FILE_PATH,
        params_filepath = PARAMS_FILE_PATH,
        schema_filepath = SCHEMA_FILE_PATH):

        self.config = read_yaml(config_filepath)
        self.params = read_yaml(params_filepath)
        self.schema = read_yaml(schema_filepath)

        create_directories([self.config.artifacts_root])


    # only this part get changes in every step, only defining the get_data_transformation_config get changes according to which step we are performing like 01_data_ingestion,02_data_validation
    def get_data_transformation_config(self) -> DataTransformationConfig:
        config = self.config.data_transformation

        create_directories([config.root_dir])

        data_transformation_config = DataTransformationConfig(
            root_dir=config.root_dir,  # here iam returning these 2 varaibles by using this code 
            data_path=config.data_path,
            preprocessor_obj=config.preprocessor_obj
        )

        return data_transformation_config

In [34]:
import os
from PROJECTML import logger
import pandas as pd
from sklearn.preprocessing import LabelEncoder
from sklearn.pipeline import Pipeline
#from sklearn.preprocessing import LabelEncoder
from sklearn.compose import ColumnTransformer
import joblib
from sklearn.preprocessing import OrdinalEncoder
from sklearn.pipeline import Pipeline,make_pipeline
import numpy as np
from sklearn.compose import make_column_transformer
from PROJECTML.config.configuration import DataTransformationConfig
from sklearn.model_selection import train_test_split
from imblearn.over_sampling import RandomOverSampler
from scipy.stats import f_oneway
from sklearn.feature_selection import VarianceThreshold
from sklearn.feature_selection import f_classif
from sklearn.preprocessing import OneHotEncoder



In [43]:
# here i defined the component of DataTransformationConfig below
class DataTransformation:
    def __init__(self, config: DataTransformationConfig):
        self.config = config
        

    def creating_new_renamed_columns_dataset(self):
        self.dataset=self.config.data_path
        self.new_data=pd.read_csv(self.dataset)
        logger.info("loaded the dataset successfully")
        #Rename some columns
        self.new_data= self.new_data.rename(columns={'Lagging_Current_Reactive.Power_kVarh': 'Lagging_Reactive_Power_kVarh',
                                'Leading_Current_Reactive_Power_kVarh': 'Leading_Reactive_Power_kVarh',
                                'Lagging_Current_Power_Factor': 'Lagging_Power_Factor',
                                'Leading_Current_Power_Factor': 'Leading_Power_Factor',
                                'CO2(tCO2)':'CO2'})
        logger.info("renamed the dataset columns successfully")


        # Assuming self.new_data is your DataFrame containing the target feature column 'Load_Type'

        # Define the oversampler
        oversampler = RandomOverSampler(random_state=42)

        # Separate features and target
        X = self.new_data.drop(columns=['Load_Type'])  # Features
        y = self.new_data['Load_Type']  # Target

        # Perform random oversampling
        X_resampled, y_resampled = oversampler.fit_resample(X, y)

        # Convert back to DataFrame if needed
        self.new_data = pd.DataFrame(X_resampled, columns=X.columns)
        self.new_data['Load_Type'] = y_resampled

        # Check the distribution after oversampling
        print(self.new_data['Load_Type'].value_counts())
        print(self.new_data.head())


        #self.new_data.to_csv(os.path.join(self.config.root_dir, "renamed_columns_dataset.csv"),index = False)
        
        self.new_data.head()
        self.new_data['date'] = pd.to_datetime(self.new_data['date'], format='%d/%m/%Y %H:%M')
        #self.new_data['date_year'] = self.new_data['date'].dt.year  # iam dropping it because it is a constant feature 
        #self.new_data['date_month_no'] = self.new_data['date'].dt.month # iam dropping this feature because annova test suggested me to give the least for this feature because it got very less value of annova test
        #self.new_data['date_day'] = self.new_data['date'].dt.day # same applies here
        self.new_data['hour'] = self.new_data['date'].dt.hour
        #self.new_data['min'] = self.new_data['date'].dt.minute  # same applies here 
        self.new_data.drop(columns='date', inplace=True)# we got extract the imp features from this date so thatsy iam dropping this date column]
        self.new_data.drop(columns='Day_of_week', inplace=True) # iam removing this i already performed the annova test there it suggest me to not select this feature based on statistical measure thatsy iam dropping this feature
        self.new_data.info()
        

    def pipeline_creation(self):
        self.new_data1 = self.new_data.copy()
        self.new_data1.drop(columns='Load_Type', inplace=True)

        # Numerical columns
        numerical_columns = self.new_data1.select_dtypes(include=[np.number]).columns.tolist()

        # Categorical columns
        categorical_columns = ['WeekStatus']

        # Apply One-Hot Encoding for categorical columns
        categorical_pipeline = make_column_transformer(
            (OneHotEncoder(), categorical_columns),
            remainder='passthrough'
        )

        # Fit and transform the categorical pipeline
        X_transformed = categorical_pipeline.fit_transform(self.new_data1)

        # Save the preprocessor object for categorical features
        joblib.dump(categorical_pipeline, os.path.join(self.config.root_dir, "categorical_preprocessor_obj.joblib"))

        # Label encode the target variable
        le = LabelEncoder()
        self.new_data['Load_Type'] = le.fit_transform(self.new_data['Load_Type'])

        # Print the classes that the LabelEncoder has seen
        print("Labels that were encoded:", le.classes_)

        # Print the corresponding encoded values
        print("Encoded values:", np.arange(len(le.classes_)))

        # Save the label encoder object for the target variable
        joblib.dump(le, os.path.join(self.config.root_dir, "label_encoder_obj.joblib"))

        #label_mapping = dict(zip(self.new_data['Load_Type'], self.new_data['Load_Type_encoded']))
        #logger.info("Label Encoding Mapping:")
        # Print the classes that the LabelEncoder has seen
        print("Labels that were encoded:", le.classes_)

        # Get the feature names after one-hot encoding
        transformed_columns = categorical_pipeline.named_transformers_['onehotencoder'].get_feature_names_out(input_features=categorical_columns).tolist()

        # Combine transformed features with numerical columns
        transformed_columns += numerical_columns

        # Convert X_transformed to DataFrame
        X_transformed_df = pd.DataFrame(X_transformed, columns=transformed_columns)

        # Combine transformed features with target variable
        self.new_data = pd.concat([X_transformed_df, self.new_data['Load_Type']], axis=1)

        # Print transformed dataset info
        logger.info("Transformed Dataset Info:")
        logger.info(self.new_data.info())

        # Print shape and head of the transformed dataset
        logger.info("Shape of Transformed Dataset:")
        logger.info(self.new_data.shape)
        logger.info("Head of Transformed Dataset:")
        logger.info(self.new_data.head())



    def find_constant_features(self):
        # Assuming self.new_data is your DataFrame containing all features

        # Initialize VarianceThreshold with the threshold
        vt = VarianceThreshold(threshold=0)

        # Fit the VarianceThreshold to identify constant features
        vt.fit(self.new_data)

        # Get boolean mask of features that are not constant
        mask = vt.get_support()

        # Get the list of constant features
        constant_features = self.new_data.columns[~mask].tolist()

        # Print or log the constant features
        logger.info("Constant Features:")
        logger.info(constant_features)

        return constant_features

    def find_quasi_constant_features(self):
        # Assuming self.new_data is your DataFrame containing all features

        # Remove the constant features before identifying quasi-constant features
        self.new_data = self.new_data.drop(columns=self.find_constant_features())

        # Initialize VarianceThreshold with the threshold
        vt = VarianceThreshold(threshold=0.01)

        # Fit the VarianceThreshold to identify quasi-constant features
        vt.fit(self.new_data)

        # Get boolean mask of features that are not quasi-constant
        mask = vt.get_support()

        # Get the list of quasi-constant features
        quasi_constant_features = self.new_data.columns[~mask].tolist()

        # Print or log the quasi-constant features
        logger.info("Quasi-Constant Features:")
        logger.info(quasi_constant_features)

        return quasi_constant_features
    
    def perform_anova_test(self):
        # Assuming self.new_data is your DataFrame containing the target feature column 'Load_Type'
        # and independent numerical features

        # Select numerical columns
        numerical_columns = self.new_data.select_dtypes(include=[np.number]).columns.tolist()
        print(numerical_columns)

        # Perform ANOVA test for each numerical feature
        f_values, p_values = f_classif(self.new_data[numerical_columns], self.new_data['Load_Type'])

        # Create a DataFrame to store results
        anova_results = pd.DataFrame({'Feature': numerical_columns, 'F-value': f_values, 'p-value': p_values})

        # Sort the results based on F-values
        anova_results.sort_values(by='F-value', ascending=False, inplace=True)

        # Print or log ANOVA results
        logger.info("ANOVA Test Results:")
        logger.info(anova_results)

        return anova_results
    
    #def selecting_the_best_features(self):
    #   features_to_drop = ['date_day', 'date_month_no', 'min']
    #   self.new_data.drop(columns=features_to_drop, inplace=True)
            
        
    def train_test_spliting(self):
       
        transformed_dataset=self.new_data
        
        train, test = train_test_split(transformed_dataset,test_size=0.25,random_state=42) # this line splits the data into train_test_split

        train.to_csv(os.path.join(self.config.root_dir, "train.csv"),index = False) # here it saves the train and test data in csv format inisde the artifacts-> transformation folder
        test.to_csv(os.path.join(self.config.root_dir, "test.csv"),index = False)

        logger.info("Splited data into training and test sets")
        logger.info(train.shape) # this logs the information about that how many training and testing samples i have 
        logger.info(test.shape)


In [44]:
try:
    config = ConfigurationManager() # here iam initlizing my ConfigurationManager
    data_transformation_config = config.get_data_transformation_config() # and here iam getting my get_data_transformation_config()
    data_transformation = DataTransformation(config=data_transformation_config) # here iam passing my data_transformation_config it means iam calling this data_transformation_config
    data_transformation.creating_new_renamed_columns_dataset()
    data_transformation.pipeline_creation()
    data_transformation.find_constant_features()
    data_transformation.find_quasi_constant_features()
    data_transformation.perform_anova_test()
    #data_transformation.selecting_the_best_features()
    data_transformation.train_test_spliting() # here performing the train_test_split()
except Exception as e: # this part of code will raise error if anything goes wrong
    raise e

[2024-04-16 21:15:06,168: INFO: common: yaml file: config\config.yaml loaded successfully]
[2024-04-16 21:15:06,171: INFO: common: yaml file: params.yaml loaded successfully]
[2024-04-16 21:15:06,176: INFO: common: yaml file: schema.yaml loaded successfully]
[2024-04-16 21:15:06,178: INFO: common: created directory at: artifacts]
[2024-04-16 21:15:06,181: INFO: common: created directory at: artifacts/data_transformation]
[2024-04-16 21:15:06,251: INFO: 2960569452: loaded the dataset successfully]
[2024-04-16 21:15:06,256: INFO: 2960569452: renamed the dataset columns successfully]
Load_Type
Light_Load      18072
Medium_Load     18072
Maximum_Load    18072
Name: count, dtype: int64
               date  Usage_kWh  Lagging_Reactive_Power_kVarh  \
0  01/01/2018 00:15       3.17                          2.95   
1  01/01/2018 00:30       4.00                          4.46   
2  01/01/2018 00:45       3.24                          3.28   
3  01/01/2018 01:00       3.31                        

  f = msb / msw


[2024-04-16 21:15:07,210: INFO: 2960569452: Splited data into training and test sets]
[2024-04-16 21:15:07,212: INFO: 2960569452: (40662, 11)]
[2024-04-16 21:15:07,213: INFO: 2960569452: (13554, 11)]
