In [34]:
import pandas as pd
import numpy as np
import json
import os

from sklearn.impute import SimpleImputer
from sklearn.preprocessing import LabelEncoder, StandardScaler

# Load the notebook's folder configuration. The context manager closes the
# file handle as soon as parsing finishes instead of leaving it to the GC.
with open("config.json") as config_file:
    configs = json.load(config_file)
configs

{'row_read_folder_path': 'data/versions/18/csv',
 'row_write_folder_path': 'data/versions/18/tables'}

In [35]:
# os.listdir returns entries in arbitrary order, so sort to get a
# deterministic listing of the exported tables.
file_names = sorted(os.listdir(configs['row_write_folder_path']))
file_names

['market_prices_20251108-130512.csv',
 'market_prices_20251108-152423.csv',
 'market_prices_20251108-152737.csv']

In [36]:
# Pick the most recent export. os.listdir gives no ordering guarantee, so the
# names are sorted first; the files are named market_prices_<YYYYMMDD-HHMMSS>.csv,
# which makes the lexicographically last entry the newest one.
file_name = sorted(os.listdir(configs['row_write_folder_path']))[-1]
full_data_path = os.path.join(configs['row_write_folder_path'], file_name)

In [37]:
# Read the selected export into memory. low_memory=False makes pandas parse the
# file in one pass rather than in chunks, avoiding mixed-type column inference.
df = pd.read_csv(full_data_path, low_memory=False)
# Preview the first rows.
df.head()

Unnamed: 0,State,District,Market,Commodity,Variety,Grade,Arrival_Date,Min_Price,Max_Price,Modal_Price,Commodity_Code
0,Andhra Pradesh,Anantapur,Anantapur,Ground Nut Seed,Ground Nut Seed,Medium,2011-01-01,3800.0,3900.0,3850.0,268
1,Andhra Pradesh,Anantapur,Dharmavaram,Paddy (Dhan)(Common),B P T,Medium,2011-01-01,970.0,1030.0,1000.0,2
2,Andhra Pradesh,Anantapur,Guntakal,Ground Nut Seed,Ground Nut Seed,Medium,2011-01-01,4300.0,4500.0,4400.0,268
3,Andhra Pradesh,Anantapur,Hindupur,Dry Chillies,2nd Sort,Medium,2011-01-01,2400.0,4000.0,3400.0,132
4,Andhra Pradesh,Anantapur,Hindupur,Tamarind Fruit,Flower A/c,Medium,2011-01-01,3600.0,4900.0,4250.0,261


In [39]:
# List the column names of the loaded table.
df.columns

Index(['State', 'District', 'Market', 'Commodity', 'Variety', 'Grade',
       'Arrival_Date', 'Min_Price', 'Max_Price', 'Modal_Price',
       'Commodity_Code'],
      dtype='object')

In [40]:
# Summarise row count, per-column dtypes and memory footprint.
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 20021522 entries, 0 to 20021521
Data columns (total 11 columns):
 #   Column          Dtype  
---  ------          -----  
 0   State           object 
 1   District        object 
 2   Market          object 
 3   Commodity       object 
 4   Variety         object 
 5   Grade           object 
 6   Arrival_Date    object 
 7   Min_Price       float64
 8   Max_Price       float64
 9   Modal_Price     float64
 10  Commodity_Code  int64  
dtypes: float64(3), int64(1), object(7)
memory usage: 1.6+ GB


In [44]:
# Number of records per arrival date, ordered by date (the ISO-formatted
# date strings sort chronologically).
arrival_date_counts = df['Arrival_Date'].value_counts()
arrival_date_counts.sort_index()

Arrival_Date
2011-01-01    2671
2011-01-02    1345
2011-01-03    2976
2011-01-04    2871
2011-01-05    3056
              ... 
2025-11-02    1734
2025-11-03    2384
2025-11-04    4114
2025-11-05     700
2025-11-06       5
Name: count, Length: 5424, dtype: int64

In [46]:
# Split the columns by dtype: object columns are treated as categorical,
# everything else as numeric.
cat_cols = df.select_dtypes(include=['object']).columns
num_cols = df.select_dtypes(exclude=['object']).columns
for label, cols in (("Cat Columns: ", cat_cols), ("Num Columns: ", num_cols)):
    print(label, cols)

Cat Columns:  Index(['State', 'District', 'Market', 'Commodity', 'Variety', 'Grade',
       'Arrival_Date'],
      dtype='object')
Num Columns:  Index(['Min_Price', 'Max_Price', 'Modal_Price', 'Commodity_Code'], dtype='object')


In [100]:
from sklearn.preprocessing import OneHotEncoder, LabelEncoder, StandardScaler, MinMaxScaler
import pandas as pd
from typing import Union
import joblib
import json

# Settings used by the preprocessing classes below (paths for the persisted
# scaler and encoders). The context manager closes the file handle promptly.
with open("src/processing_config.json") as processing_config_file:
    processing_configs = json.load(processing_config_file)


class ScaleData:
    """
    Scales a fixed set of numeric columns with a scikit-learn scaler.

    The fitted scaler is written with joblib to
    ``processing_configs['scaler_file_path']`` once, at fit time.
    """

    def __init__(self, num_cols: list, method: str):
        """
        Args:
            num_cols: list of column names that need to be scaled
            method: Choose string from ('standard', 'minmax')

        Raises:
            ValueError: if ``method`` is not one of the supported names.
        """
        self.num_cols = num_cols
        if method == "standard":
            self.scaler = StandardScaler()
        elif method == "minmax":
            self.scaler = MinMaxScaler()
        else:
            raise ValueError("Choose method from ('standard', 'minmax')")

    def fit(self, X: pd.DataFrame, y: Union[pd.Series, None] = None) -> "ScaleData":
        """
        Fits the scaler on the numeric columns and persists it.

        Args:
            X: frame containing every column listed in ``num_cols``
            y: ignored; accepted for scikit-learn pipeline compatibility

        Returns:
            self, so the call can be chained with ``transform``.
        """
        self.scaler.fit(X[self.num_cols])
        # Persist here rather than in transform(): the scaler only changes when
        # it is fitted, so transform() should not rewrite the artifact each call.
        joblib.dump(self.scaler, processing_configs['scaler_file_path'])
        return self

    def transform(self, X: pd.DataFrame, y: Union[pd.Series, None] = None) -> pd.DataFrame:
        """
        Transforms numeric columns and returns formatted DataFrame.

        Args:
            X: frame containing every column listed in ``num_cols``
            y: ignored; accepted for scikit-learn pipeline compatibility

        Returns:
            A copy of ``X`` whose numeric columns hold the scaled values; the
            input frame is left unmodified.
        """
        X_copy = X.copy()
        scaled_values = self.scaler.transform(X_copy[self.num_cols])
        # Re-wrap the ndarray with the original index so the column assignment
        # aligns row by row.
        X_copy[self.num_cols] = pd.DataFrame(
            scaled_values, columns=self.num_cols, index=X_copy.index
        )
        return X_copy



class EncodelData:
    """
    Encodes categorical columns with either one-hot or label encoding.

    Fitted encoders are written with joblib at fit time: the one-hot encoder to
    ``processing_configs['one_hot_encoder_file_path']``, or one ``<column>.pkl``
    file per column under ``processing_configs['label_encoder_folder_path']``.
    """

    def __init__(self, cat_cols: list, method: str):
        """
        Args:
            cat_cols: list of column names that need to be encoded
            method: Choose string from ('onehot', 'label')

        Raises:
            ValueError: if ``method`` is not one of the supported names.
        """
        self.cat_cols = cat_cols
        self.method = method

        if method == "onehot":
            # A single encoder handles all columns; unseen categories are
            # ignored at transform time instead of raising.
            self.encoder = OneHotEncoder(sparse_output=False, drop=None, handle_unknown="ignore")
        elif method == "label":
            # LabelEncoder works on one 1-D column, so keep one per column.
            self.encoder = {col: LabelEncoder() for col in cat_cols}
        else:
            raise ValueError("Choose method from ('onehot', 'label')")

    def _persist(self) -> None:
        """Writes the fitted encoder(s) to the paths given in ``processing_configs``."""
        if self.method == "onehot":
            joblib.dump(self.encoder, processing_configs['one_hot_encoder_file_path'])
            return

        folder = processing_configs['label_encoder_folder_path']
        os.makedirs(folder, exist_ok=True)
        for col in self.cat_cols:
            # Store only this column's encoder in its file, not the whole dict.
            joblib.dump(self.encoder[col], os.path.join(folder, col + '.pkl'))

    def fit(self, X: pd.DataFrame, y: Union[pd.Series, None] = None) -> "EncodelData":
        """
        Fits the encoder(s) on the categorical columns and persists them.

        Args:
            X: frame containing every column listed in ``cat_cols``
            y: ignored; accepted for scikit-learn pipeline compatibility

        Returns:
            self, so the call can be chained with ``transform``.
        """
        if self.method == "onehot":
            self.encoder.fit(X[self.cat_cols])
        else:  # label encoding
            for col in self.cat_cols:
                self.encoder[col].fit(X[col])

        # Persist once the state is final rather than on every transform() call.
        self._persist()
        return self

    def transform(self, X: pd.DataFrame, y: Union[pd.Series, None] = None) -> pd.DataFrame:
        """
        Transforms categorical columns and returns formatted DataFrame.

        Args:
            X: frame containing every column listed in ``cat_cols``
            y: ignored; accepted for scikit-learn pipeline compatibility

        Returns:
            A copy of ``X``. In 'onehot' mode the categorical columns are
            dropped and the indicator columns appended; in 'label' mode each
            categorical column is replaced in place by its integer codes.
        """
        X_copy = X.copy()

        if self.method == "onehot":
            encoded_array = self.encoder.transform(X_copy[self.cat_cols])
            encoded_df = pd.DataFrame(
                encoded_array,
                columns=self.encoder.get_feature_names_out(self.cat_cols),
                index=X_copy.index
            )
            X_copy = pd.concat([X_copy.drop(columns=self.cat_cols), encoded_df], axis=1)

        elif self.method == "label":
            for col in self.cat_cols:
                X_copy[col] = self.encoder[col].transform(X_copy[col])

        return X_copy



class Imputer:
    """
    Fills missing values using per-column statistics learned during ``fit``.
    """

    def __init__(self, cat_cols:list, num_cols:list, num_method:str, cat_method:str='most_frequent'):
        """
        Args:
            cat_cols: list of categorical column names that need to be imputed
            num_cols: list of numeric column names that need to be imputed
            cat_method: Choose string from ('most_frequent')
            num_method: Choose string from ('mean', 'median', 'mode')
        """
        self.cat_cols = cat_cols
        self.num_cols = num_cols
        self.cat_method = cat_method
        self.num_method = num_method

    def __findReplacements(self, X: pd.DataFrame) -> tuple:
        """
        Computes the fill value of every configured column.

        Args:
            X: frame containing every column in ``cat_cols`` and ``num_cols``

        Returns:
            ``(cat_rep, num_rep)``: two dicts mapping column name to the value
            used to replace missing entries in that column.

        Raises:
            ValueError: if ``cat_method`` or ``num_method`` is not supported.
        """
        if self.cat_method != "most_frequent":
            raise ValueError("Choose method from ('most_frequent')")
        # mode() can return several values on ties; the first one is used.
        cat_rep = {col: X[col].mode()[0] for col in self.cat_cols}

        if self.num_method == "mean":
            num_rep = {col: X[col].mean() for col in self.num_cols}
        elif self.num_method == "median":
            num_rep = {col: X[col].median() for col in self.num_cols}
        elif self.num_method == "mode":
            num_rep = {col: X[col].mode()[0] for col in self.num_cols}
        else:
            raise ValueError("Choose method from ('mean', 'median', 'mode')")

        return cat_rep, num_rep

    def fit(self, X: pd.DataFrame, y: Union[pd.Series, None] = None) -> "Imputer":
        """
        Learns the replacement value for each categorical and numeric column.

        Args:
            X: frame to compute the statistics from
            y: ignored; accepted for scikit-learn pipeline compatibility

        Returns:
            self, so the call can be chained with ``transform``.
        """
        self.cat_rep, self.num_rep = self.__findReplacements(X=X)
        return self

    def transform(self, X: pd.DataFrame, y: Union[pd.Series, None] = None) -> pd.DataFrame:
        """
        Fills missing values with imputed values for categorical and numeric columns.

        Args:
            X: frame containing every column in ``cat_cols`` and ``num_cols``
            y: ignored; accepted for scikit-learn pipeline compatibility

        Returns:
            A copy of ``X`` with missing entries filled; the input frame is
            left unmodified.
        """
        X_copy = X.copy()
        # Use the columns this instance was configured with, not same-named
        # variables from the surrounding notebook namespace.
        for col in self.cat_cols:
            X_copy[col] = X_copy[col].fillna(self.cat_rep[col])

        for col in self.num_cols:
            X_copy[col] = X_copy[col].fillna(self.num_rep[col])
        return X_copy




In [101]:
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer

# Preprocessing order: fill missing values, scale the numeric columns, then
# label-encode the categorical columns.
imputer_step = Imputer(cat_cols=cat_cols, num_cols=num_cols, num_method="mean")
scaler_step = ScaleData(num_cols=num_cols, method="minmax")
encoder_step = EncodelData(cat_cols=cat_cols, method="label")

pipeline_steps = [
    ("imputer", imputer_step),
    ("scaler", scaler_step),
    ("encoder", encoder_step),
]
processing_pipeline = Pipeline(steps=pipeline_steps)
processing_pipeline

0,1,2
,steps,"[('imputer', ...), ('scaler', ...), ...]"
,transform_input,
,memory,
,verbose,False


In [102]:
# Fit and apply the pipeline on the first 100000 rows only.
sample_df = df.iloc[:100000]
processed_df = processing_pipeline.fit_transform(sample_df)
processed_df.head()

Unnamed: 0,State,District,Market,Commodity,Variety,Grade,Arrival_Date,Min_Price,Max_Price,Modal_Price,Commodity_Code
0,0,19,85,101,214,2,0,0.02375,0.022941,0.023331,0.702632
1,0,19,470,169,46,2,0,0.006063,0.006059,0.006058,0.002632
2,0,19,597,101,214,2,0,0.026875,0.026471,0.026664,0.702632
3,0,19,636,74,11,2,0,0.015,0.023529,0.020604,0.344737
4,0,19,636,229,188,2,0,0.0225,0.028824,0.025755,0.684211


In [103]:
# Small slice of raw rows (the first ten).
test = df.head(10)
test

Unnamed: 0,State,District,Market,Commodity,Variety,Grade,Arrival_Date,Min_Price,Max_Price,Modal_Price,Commodity_Code
0,Andhra Pradesh,Anantapur,Anantapur,Ground Nut Seed,Ground Nut Seed,Medium,2011-01-01,3800.0,3900.0,3850.0,268
1,Andhra Pradesh,Anantapur,Dharmavaram,Paddy (Dhan)(Common),B P T,Medium,2011-01-01,970.0,1030.0,1000.0,2
2,Andhra Pradesh,Anantapur,Guntakal,Ground Nut Seed,Ground Nut Seed,Medium,2011-01-01,4300.0,4500.0,4400.0,268
3,Andhra Pradesh,Anantapur,Hindupur,Dry Chillies,2nd Sort,Medium,2011-01-01,2400.0,4000.0,3400.0,132
4,Andhra Pradesh,Anantapur,Hindupur,Tamarind Fruit,Flower A/c,Medium,2011-01-01,3600.0,4900.0,4250.0,261
5,Andhra Pradesh,Anantapur,Kadiri,Sunflower Seed,Sunflower Seed,Medium,2011-01-01,2500.0,2600.0,2550.0,285
6,Andhra Pradesh,Anantapur,Kalyandurg,Tamarind Fruit,Non A/c Flower,Medium,2011-01-01,2800.0,3100.0,2900.0,261
7,Andhra Pradesh,Anantapur,Penukonda,Groundnut,Local,Medium,2011-01-01,2700.0,2800.0,2750.0,10
8,Andhra Pradesh,Anantapur,Rayadurg,Paddy (Dhan)(Common),B P T,Medium,2011-01-01,1000.0,1030.0,1015.0,2
9,Andhra Pradesh,Anantapur,Tadipatri,Coriander (Leaves),Local,Medium,2011-01-01,2500.0,2700.0,2600.0,43


TypeError: '<' not supported between instances of 'str' and 'int'