Machine learning with simple linear regression using a single feature: living area (`total_area_sqm`)

In [66]:
import pandas as pd
import numpy as np
from matplotlib import pyplot as plt
from sklearn.preprocessing import OrdinalEncoder
from sklearn.model_selection import train_test_split
from sklearn.linear_model import LinearRegression
from sklearn.preprocessing import StandardScaler
from sklearn.preprocessing import OneHotEncoder




In [67]:
# Folder that holds the raw CSV and receives the preprocessed CSVs.
# This is the only place to edit when the notebook runs on another machine.
PROJECT_DIR = "C:/Users/becod/AI/my-projects/immo-eliza-ml-Majid/immo-eliza-ml-Majid"

# Input: raw property listings
dataset_path = f"{PROJECT_DIR}/becode_properties.csv"

# Output of the basic preprocessing step
basic_preprocessed_dataset = f"{PROJECT_DIR}/basic_preprocessed_dataset.csv"

# Output of the advanced preprocessing step
advance_preprocessed_dataset = f"{PROJECT_DIR}/advance_preprocessed_dataset.csv"

# Load the raw dataset and show its columns as the cell output
df = pd.read_csv(dataset_path)
df.columns


Index(['id', 'price', 'property_type', 'subproperty_type', 'region',
       'province', 'locality', 'zip_code', 'latitude', 'longitude',
       'construction_year', 'total_area_sqm', 'surface_land_sqm',
       'nbr_frontages', 'nbr_bedrooms', 'equipped_kitchen', 'fl_furnished',
       'fl_open_fire', 'fl_terrace', 'terrace_sqm', 'fl_garden', 'garden_sqm',
       'fl_swimming_pool', 'fl_floodzone', 'state_building',
       'primary_energy_consumption_sqm', 'epc', 'heating_type',
       'fl_double_glazing', 'cadastral_income'],
      dtype='object')

Basic Preprocessing Module

In [68]:
class BasicPreprocessing:
    """Basic cleaning pipeline for the raw property-listings DataFrame.

    The instance works on its own copy of the input frame (``self.dataset``).
    Every step replaces or updates that copy and prints the resulting shape.
    Call :meth:`process_basic` to run all steps in the intended order.

    Parameters
    ----------
    df : pandas.DataFrame
        Raw listings. The frame is copied; the caller's object is not modified.
    random_state : int or None, optional
        Seed for the random imputation done by
        :meth:`fill_na_with_random_distribution`. ``None`` (the default) keeps
        the previous behaviour of drawing from NumPy's global random state.
    """

    def __init__(self, df, random_state=None):
        # Work on a copy so the caller's DataFrame stays untouched
        self.dataset = df.copy()
        # np.random (module-level global state) when unseeded, otherwise a
        # private RandomState so results are reproducible run to run.
        self._rng = np.random if random_state is None else np.random.RandomState(random_state)

    #------------------------------------------------------------------------
    def drop_duplicates(self):
        """Remove fully duplicated rows and print the shape before and after."""
        print("Original dataset size:", self.dataset.shape)
        self.dataset = self.dataset.drop_duplicates()
        print("After removing duplicates:", self.dataset.shape)

    #------------------------------------------------------------------------
    def drop_columns(self):
        """Drop the columns that are not used further (raises KeyError if one is absent)."""
        columns_to_drop = ['id', 'subproperty_type', 'region', 'province', 'locality',
                           'primary_energy_consumption_sqm', 'heating_type', 'cadastral_income']
        self.dataset = self.dataset.drop(columns=columns_to_drop)
        print("After dropping unnecessary columns:", self.dataset.shape)

    #------------------------------------------------------------------------
    def drop_na(self):
        """Drop rows that have NaN in 'price', 'total_area_sqm' or 'nbr_bedrooms'."""
        columns_to_check_na = ['price', 'total_area_sqm', 'nbr_bedrooms']
        self.dataset = self.dataset.dropna(subset=columns_to_check_na)
        print(f"After dropping rows with NaN in {columns_to_check_na}: {self.dataset.shape}")

    #------------------------------------------------------------------------
    def filter_non_zero(self):
        """Keep only rows where 'total_area_sqm' and 'nbr_bedrooms' are both > 0."""
        columns_to_check_zero = ['total_area_sqm', 'nbr_bedrooms']
        condition = (self.dataset[columns_to_check_zero] > 0).all(axis=1)
        # .copy() so later column assignments act on an independent frame
        # instead of a slice of the previous one (avoids SettingWithCopyWarning).
        self.dataset = self.dataset[condition].copy()
        print(f"After filtering rows with zero in {columns_to_check_zero}: {self.dataset.shape}")

    #------------------------------------------------------------------------
    def replace_special_missing(self):
        """Turn the placeholders "", " ", "MISSING" and 0 into NaN in every column.

        NOTE: 0 is replaced everywhere, so genuine zeros (for example in
        'terrace_sqm' or 'garden_sqm') are treated as missing and are later
        imputed by :meth:`fill_na_with_random_distribution`; the 'fl_*' flags
        get their zeros back in :meth:`fill_na_with_zero`.
        """
        self.dataset = self.dataset.replace(["", " ", "MISSING", 0], np.nan)

    #------------------------------------------------------------------------
    def fill_na_with_zero(self):
        """Fill NaN with 0 in the boolean-like 'fl_*' flag columns."""
        columns_to_fill_zero = [
            'fl_furnished', 'fl_open_fire', 'fl_terrace', 'fl_garden',
            'fl_swimming_pool', 'fl_floodzone', 'fl_double_glazing'
        ]
        self.dataset[columns_to_fill_zero] = self.dataset[columns_to_fill_zero].fillna(0)
        print(f"NaNs replaced with 0 for columns: {columns_to_fill_zero}")

    #------------------------------------------------------------------------
    def fill_na_with_random_distribution(self):
        """Impute NaN by sampling from each column's observed value frequencies.

        For every listed column the missing cells are filled with values drawn
        at random, weighted by how often each value occurs among the non-missing
        cells. All missing cells of a column are drawn in a single vectorised
        call. A column without any observed value is left unchanged because
        there is nothing to sample from.
        """
        columns_to_fill_random_distribution = [
            'construction_year', 'total_area_sqm', 'surface_land_sqm',
            'nbr_frontages', 'nbr_bedrooms', 'equipped_kitchen',
            'terrace_sqm', 'garden_sqm', 'state_building', 'epc','latitude', 'longitude'
        ]
        # Fall back to the global state for instances created without __init__
        rng = getattr(self, "_rng", np.random)
        for column in columns_to_fill_random_distribution:
            missing = self.dataset[column].isna()
            n_missing = int(missing.sum())
            if n_missing == 0:
                continue
            value_counts = self.dataset[column].value_counts(normalize=True)
            if value_counts.empty:
                # Column is entirely NaN: no empirical distribution to draw from
                continue
            draws = rng.choice(
                value_counts.index.to_numpy(),
                size=n_missing,
                p=value_counts.to_numpy(),
            )
            filled = self.dataset[column].copy()
            filled[missing] = draws
            self.dataset[column] = filled
        print(f"NaNs filled with random distribution for columns: {columns_to_fill_random_distribution}")

    #------------------------------------------------------------------------
    def remove_outliers(self):
        """Drop rows whose 'total_area_sqm' lies outside [Q1 - 1.5*IQR, Q3 + 1.5*IQR]."""
        area = self.dataset['total_area_sqm']
        Q1 = area.quantile(0.25)
        Q3 = area.quantile(0.75)
        IQR = Q3 - Q1
        lower_bound = Q1 - 1.5 * IQR
        upper_bound = Q3 + 1.5 * IQR
        # between() is inclusive on both ends, matching ">= lower and <= upper"
        self.dataset = self.dataset[area.between(lower_bound, upper_bound)].copy()
        print("After removing outliers in 'total_area_sqm':", self.dataset.shape)

    #------------------------------------------------------------------------
    def process_basic(self):
        """Run every cleaning step in order and return the cleaned DataFrame.

        The order matters: placeholders (including 0) are converted to NaN
        before the NaN-based row drop, so rows with a zero price, area or
        bedroom count are removed by :meth:`drop_na`.
        """
        self.drop_duplicates()  # Remove duplicate rows
        self.drop_columns()  # Drop unnecessary columns
        self.replace_special_missing()  # Replace special missing values with NaN
        self.drop_na()  # Drop rows with NaN in essential columns
        self.filter_non_zero()  # Keep rows with strictly positive area / bedrooms
        self.fill_na_with_zero()  # Fill NaN values with zero in the flag columns
        self.fill_na_with_random_distribution()  # Fill NaN values using a random distribution
        self.remove_outliers()  # Remove outliers

        return self.dataset  # Return the processed DataFrame

    
# Run the basic cleaning pipeline on the raw frame
basic_preprocessor = BasicPreprocessing(df)
basic_preprocessed_data = basic_preprocessor.process_basic()

# Persist the cleaned frame (without the index column) for later steps
basic_preprocessed_data.to_csv(path_or_buf=basic_preprocessed_dataset, index=False)


Original dataset size: (75511, 30)
After removing duplicates: (75511, 30)
After dropping unnecessary columns: (75511, 22)
After dropping rows with NaN in ['price', 'total_area_sqm', 'nbr_bedrooms']: (66349, 22)
After filtering rows with zero in ['total_area_sqm', 'nbr_bedrooms']: (66349, 22)
NaNs replaced with 0 for columns: ['fl_furnished', 'fl_open_fire', 'fl_terrace', 'fl_garden', 'fl_swimming_pool', 'fl_floodzone', 'fl_double_glazing']
NaNs filled with random distribution for columns: ['construction_year', 'total_area_sqm', 'surface_land_sqm', 'nbr_frontages', 'nbr_bedrooms', 'equipped_kitchen', 'terrace_sqm', 'garden_sqm', 'state_building', 'epc', 'latitude', 'longitude']
After removing outliers in 'total_area_sqm': (62553, 22)


In [69]:
# Preview the first rows of the cleaned frame
basic_preprocessed_data.head()

Unnamed: 0,price,property_type,zip_code,latitude,longitude,construction_year,total_area_sqm,surface_land_sqm,nbr_frontages,nbr_bedrooms,...,fl_open_fire,fl_terrace,terrace_sqm,fl_garden,garden_sqm,fl_swimming_pool,fl_floodzone,state_building,epc,fl_double_glazing
0,225000.0,APARTMENT,2050,51.217172,4.379982,1963.0,100.0,294.0,2.0,2.0,...,0.0,1.0,5.0,0.0,79.0,0.0,0.0,TO_RENOVATE,C,1.0
2,335000.0,APARTMENT,1070,50.842043,4.334543,1900.0,142.0,200.0,2.0,3.0,...,0.0,1.0,28.0,0.0,352.0,0.0,1.0,AS_NEW,G,0.0
3,501000.0,HOUSE,2275,51.238312,4.817192,2024.0,187.0,505.0,2.0,3.0,...,0.0,0.0,6.0,0.0,45.0,0.0,1.0,GOOD,A,0.0
4,982700.0,APARTMENT,1410,50.754677,3.092787,2022.0,169.0,305.0,2.0,2.0,...,0.0,1.0,20.0,1.0,142.0,0.0,0.0,AS_NEW,A+,0.0
5,548514.0,HOUSE,1700,50.456027,4.686474,1991.0,187.0,710.0,4.0,3.0,...,0.0,0.0,13.0,0.0,72.0,0.0,1.0,AS_NEW,A,0.0


In [70]:
# Columns that remain after the basic preprocessing step
basic_preprocessed_data.columns

Index(['price', 'property_type', 'zip_code', 'latitude', 'longitude',
       'construction_year', 'total_area_sqm', 'surface_land_sqm',
       'nbr_frontages', 'nbr_bedrooms', 'equipped_kitchen', 'fl_furnished',
       'fl_open_fire', 'fl_terrace', 'terrace_sqm', 'fl_garden', 'garden_sqm',
       'fl_swimming_pool', 'fl_floodzone', 'state_building', 'epc',
       'fl_double_glazing'],
      dtype='object')

Advanced Preprocessing Module

In [71]:
class AdvancedPreprocessing:
    """Feature engineering applied on top of the basic-cleaned listings.

    The instance works on its own copy of the input frame (``self.dataset``).
    :meth:`process_advanced` adds ratio features with standardized variants,
    one-hot encoded construction decades, ordinal-encoded and standardized
    category columns, and one-hot encoded zip-code prefixes.

    NOTE(review): every scaler and encoder is fitted on the whole frame. If the
    data is split into train and test sets afterwards, the fitted statistics
    include the test rows; confirm this is acceptable.

    Parameters
    ----------
    basic_preprocessed_data : pandas.DataFrame
        Output of the basic preprocessing step. It is copied, not modified.
    """

    #------------------------------------------------------------------------
    def __init__(self, basic_preprocessed_data):
        self.dataset = basic_preprocessed_data.copy()
        # Not read by any method of this class; kept so existing code that
        # accesses these attributes continues to work.
        self.existing_columns_to_standardize = ['total_area_sqm', 'surface_land_sqm', 'terrace_sqm', 'garden_sqm']
        self.new_columns_to_standardize = ['bedrooms_per_sqm']

    #------------------------------------------------------------------------
    def _add_scaled_ratio(self, name, numerator, denominator):
        """Store numerator / denominator as column `name` plus its z-score as `<name>_scaled`."""
        # Division by zero yields +/-inf; treat those as missing, then use the mean
        ratio = (numerator / denominator).replace([np.inf, -np.inf], np.nan)
        self.dataset[name] = ratio.fillna(ratio.mean())
        self.dataset[f'{name}_scaled'] = StandardScaler().fit_transform(self.dataset[[name]])

    #------------------------------------------------------------------------
    def _encode_ordinal_scaled(self, column, order, target, default):
        """Ordinal-encode `column` following `order` and store the z-score in `target`.

        Values that are not listed in `order` (NaN included) are first replaced
        by `default` inside `column` itself, so the encoder never meets an
        unknown category.
        """
        values = self.dataset[column]
        # Plain assignment instead of `self.dataset[column].replace(..., inplace=True)`:
        # the chained in-place form does not update the frame under pandas
        # Copy-on-Write.
        self.dataset[column] = values.where(values.isin(order), default)

        encoded = OrdinalEncoder(categories=[order]).fit_transform(self.dataset[[column]])
        # Keep the standardized values as floats. Casting them to int would
        # truncate toward zero and collapse most categories onto 0.
        self.dataset[target] = StandardScaler().fit_transform(encoded)

    #------------------------------------------------------------------------
    def _append_one_hot(self, column):
        """One-hot encode `column` and append the indicator columns to the dataset."""
        encoder = OneHotEncoder(sparse_output=False)
        encoded = encoder.fit_transform(self.dataset[[column]])
        encoded_df = pd.DataFrame(encoded, columns=encoder.get_feature_names_out([column]))
        # The index is reset so both frames line up row by row on a 0..n-1 index
        self.dataset = pd.concat([self.dataset.reset_index(drop=True), encoded_df], axis=1)

    #------------------------------------------------------------------------
    def create_new_features(self):
        """Add three ratio features and a standardized copy of each.

        Columns added, in this order: 'bedrooms_per_sqm',
        'bedrooms_per_sqm_scaled', 'total_area_per_bedroom',
        'total_area_per_bedroom_scaled', 'price_per_total_area',
        'price_per_total_area_scaled'.

        A zero 'total_area_sqm' is replaced by 1 in the stored column to avoid
        division by zero. 'nbr_bedrooms' is left as is: a zero bedroom count
        only makes 'total_area_per_bedroom' fall back to the column mean.

        NOTE(review): 'price_per_total_area' is computed from 'price'. If
        'price' is the prediction target, using this column as a predictor
        leaks the target; confirm how it is used downstream.
        """
        # Replace zero area with a minimum value of 1 to avoid division by zero
        self.dataset['total_area_sqm'] = self.dataset['total_area_sqm'].replace(0, 1)
        area = self.dataset['total_area_sqm']
        bedrooms = self.dataset['nbr_bedrooms']

        # Density of bedrooms per square meter
        self._add_scaled_ratio('bedrooms_per_sqm', bedrooms, area)

        # Average area per bedroom (zero bedrooms -> inf -> NaN -> column mean)
        self._add_scaled_ratio('total_area_per_bedroom', area, bedrooms)

        # Price per square meter
        self._add_scaled_ratio('price_per_total_area', self.dataset['price'], area)

    #------------------------------------------------------------------------
    def encode_standardize_categories_features(self):
        """Ordinal-encode and standardize 'equipped_kitchen', 'state_building' and 'epc'.

        Adds the float columns 'kitchen_type_encoded', 'Bulding_sta_encoded'
        and 'epc_encoded'. Unlisted values in the source columns are replaced
        by a default category before encoding.
        """
        #--------------------- 'equipped_kitchen'
        kitchen_order = ['unknown', 'NOT_INSTALLED', "USA_UNINSTALLED", "SEMI_EQUIPPED", "USA_SEMI_EQUIPPED",
                         "INSTALLED", "USA_INSTALLED", "HYPER_EQUIPPED", "USA_HYPER_EQUIPPED"]
        self._encode_ordinal_scaled('equipped_kitchen', kitchen_order, 'kitchen_type_encoded', 'unknown')

        #--------------------- 'state_building'
        building_order = ['unknown', "TO_RESTORE", "TO_RENOVATE", "TO_BE_DONE_UP", "GOOD", "JUST_RENOVATED", "AS_NEW"]
        # Column name spelling ('Bulding') is kept because it is part of the output schema
        self._encode_ordinal_scaled('state_building', building_order, 'Bulding_sta_encoded', 'unknown')

        #--------------------- 'epc'
        epc_order = ['G', 'F', 'E', 'D', 'C', 'B', 'A', 'A+', 'A++']
        # NOTE(review): the default 'F' is the second-lowest grade in epc_order
        # ('G' is the lowest); confirm that 'F' is the intended fallback.
        self._encode_ordinal_scaled('epc', epc_order, 'epc_encoded', 'F')

        print("Data after encoding and standardization.")

    #------------------------------------------------------------------------
    def categorize_construction_year(self):
        """Bucket 'construction_year' by decade and one-hot encode the buckets.

        Missing years are filled with the column median. The bucket is
        ``construction_year // 10``; every value below 190 (years before 1900)
        is grouped into the 190 bucket. Adds 'construction_category' and one
        'construction_category_<bucket>' indicator column per bucket.
        """
        year = self.dataset['construction_year']
        # Replace missing values with the median of the column
        self.dataset['construction_year'] = year.fillna(year.median())

        # Decade bucket: construction_year // 10, with pre-1900 years grouped at 190
        decade = (self.dataset['construction_year'] // 10).astype(int)
        self.dataset['construction_category'] = decade.clip(lower=190)

        self._append_one_hot('construction_category')

    # ---------------------------------------------------------------------
    def encode_zip_code(self):
        """
        Creates 'zip_code_cut' (integer division of 'zip_code' by 100), one-hot
        encodes it, and appends the encoded columns to the dataset.

        Returns the updated dataset. Raises ValueError when the dataset has no
        'zip_code' column. The original 'zip_code' column is kept.
        """
        if 'zip_code' not in self.dataset.columns:
            raise ValueError("The dataset does not have a 'zip_code' column.")

        # Create a new column 'zip_code_cut' with zip_code // 100
        self.dataset['zip_code_cut'] = self.dataset['zip_code'] // 100

        self._append_one_hot('zip_code_cut')

        return self.dataset

    # ---------------------------------------------------------------------
    def process_advanced(self):
        """Run all feature-engineering steps in order and return the dataset."""
        self.create_new_features()
        self.categorize_construction_year()
        self.encode_standardize_categories_features()
        self.encode_zip_code()

        return self.dataset

    #------------------------------------------------------------------------
    
# Run the feature-engineering pipeline on the basic-cleaned frame
advanced_preprocessor = AdvancedPreprocessing(basic_preprocessed_data)
advance_preprocessed_data = advanced_preprocessor.process_advanced()

# Persist the engineered frame (without the index column)
advance_preprocessed_data.to_csv(path_or_buf=advance_preprocessed_dataset, index=False)




Data after encoding and standardization.


In [72]:
# Columns after the advanced preprocessing step
# (use advance_preprocessed_data.head() instead to preview rows)
advance_preprocessed_data.columns


Index(['price', 'property_type', 'zip_code', 'latitude', 'longitude',
       'construction_year', 'total_area_sqm', 'surface_land_sqm',
       'nbr_frontages', 'nbr_bedrooms',
       ...
       'zip_code_cut_90', 'zip_code_cut_91', 'zip_code_cut_92',
       'zip_code_cut_93', 'zip_code_cut_94', 'zip_code_cut_95',
       'zip_code_cut_96', 'zip_code_cut_97', 'zip_code_cut_98',
       'zip_code_cut_99'],
      dtype='object', length=126)

In [73]:
# Keep only rows with price less than or equal to 500,000 euros
MAX_PRICE_EUR = 500_000
# .copy() makes the filtered frame independent of advance_preprocessed_data,
# so later column assignments on it do not trigger SettingWithCopyWarning.
dataset_for_model = advance_preprocessed_data.loc[
    advance_preprocessed_data['price'] <= MAX_PRICE_EUR
].copy()
