In [None]:
import torch
import numpy as np
import pandas as pd
import time
from datetime import datetime
from sklearn.preprocessing import OneHotEncoder
import re
from sklearn.preprocessing import MinMaxScaler, StandardScaler
from sklearn.model_selection import train_test_split
from torch.utils.data import TensorDataset, DataLoader
import matplotlib.pyplot as plt
import seaborn as sns
from itertools import product

In [None]:
# Path of the raw CSV, relative to the notebook's working directory.
DATA_PATH = "./Bengaluru_House_Data.csv"

# Read the raw table; the bare name on the last line makes Jupyter render it.
data = pd.read_csv(DATA_PATH)
data

In [None]:
# Summary statistics for every column of the raw frame. include='all' makes
# describe() report on the non-numeric columns as well as the numeric ones.
data.describe(include='all')  # For both numerical and categorical columns


In [None]:
# Number of missing entries in each column of the raw frame.
data.isna().sum()

In [None]:
# Remove the 'society' column. errors='ignore' keeps this cell re-runnable:
# once the column is gone, running the cell again is a no-op instead of a KeyError.
data = data.drop(columns=['society'], errors='ignore')

In [None]:
# Impute missing values on a copy so `data` itself is left unmodified.
df_cleaned = data.copy()

for col in df_cleaned.columns:
    column = df_cleaned[col]
    if not column.isnull().any():
        continue  # nothing to fill in this column

    # is_numeric_dtype covers every numeric dtype (float32, int32, nullable
    # ints, ...), not only float64/int64.
    if pd.api.types.is_numeric_dtype(column):
        median_val = column.median()
        df_cleaned[col] = column.fillna(median_val)
        print(f"Filled missing numeric values in '{col}' with median: {median_val}")
    else:
        modes = column.mode()
        if modes.empty:
            # A column with no observed value has no mode to impute with.
            print(f"Column '{col}' has no non-missing values; left unfilled")
            continue
        mode_val = modes.iloc[0]
        df_cleaned[col] = column.fillna(mode_val)
        print(f"Filled missing categorical values in '{col}' with mode: {mode_val}")

# Remaining missing values per column (0 everywhere unless a column was
# entirely empty and therefore skipped above).
print(df_cleaned.isnull().sum())


In [None]:
# Print the structural summary of the imputed frame (columns, dtypes, non-null counts).
df_cleaned.info()

In [None]:
# Categorical columns of df_cleaned whose level frequencies are plotted.
categorical_columns = ['area_type', 'availability', 'location', 'size']

# Upper limit on the number of levels drawn per column. Without a cap, a column
# with very many distinct values (presumably 'location' - TODO confirm) yields
# an unreadable, slow-to-render panel.
MAX_LEVELS_PER_PLOT = 20

# Four panels fit a 2x2 grid exactly (no empty row).
fig, axes = plt.subplots(2, 2, figsize=(15, 10))

for ax, col in zip(axes.flat, categorical_columns):
    level_counts = df_cleaned[col].value_counts()
    top_levels = level_counts.index[:MAX_LEVELS_PER_PLOT]
    # Keep only the rows that belong to the plotted levels so hue and order agree.
    shown = df_cleaned.loc[df_cleaned[col].isin(top_levels), col]

    sns.countplot(y=shown, order=top_levels, hue=shown, legend=False, palette="viridis", ax=ax)

    title = f"Distribution of {col}"
    if len(level_counts) > MAX_LEVELS_PER_PLOT:
        # Make the truncation visible so the panel is not mistaken for the full column.
        title += f" (top {MAX_LEVELS_PER_PLOT} of {len(level_counts)} levels)"
    ax.set_title(title)
    ax.set_xlabel("Count")
    ax.set_ylabel(col)
    ax.tick_params(axis="x", rotation=45)

fig.tight_layout()
plt.show()


In [None]:
# Numeric columns whose distributions are inspected.
columns = ["bath", "balcony", "price"]

# One 30-bin histogram per column, side by side on a single row.
fig, axes = plt.subplots(1, 3, figsize=(12, 5))

for ax, col in zip(axes, columns):
    df_cleaned[col].hist(bins=30, edgecolor="black", alpha=0.7, ax=ax)
    ax.set_xlabel(col)
    ax.set_ylabel("Count")
    ax.set_title(f"Distribution of {col}")

fig.tight_layout()
plt.show()


In [None]:
# Start the feature-engineering frame from the imputed data. This binds a second
# name to the same DataFrame object; no copy is made here.
df_final = df_cleaned

In [None]:
# Remove the 'location' column from the modelling frame. errors='ignore' keeps
# the cell re-runnable: a second run is a no-op instead of raising KeyError.
df_final = df_final.drop(columns=["location"], errors='ignore')

In [None]:

# One-hot encode the remaining categorical columns of df_final.
#
# The encoder is fed from df_final (the imputed frame), not from the raw `data`
# frame: encoding the raw column would bypass the imputation, and a missing
# entry would presumably end up as its own category column (depends on the
# installed scikit-learn version - TODO confirm).
# 'location' is not encoded; it was dropped from df_final in an earlier cell.
onehot_columns = ['area_type', 'availability', 'size']

encoder = OneHotEncoder(sparse_output=False)  # dense array output

for col in onehot_columns:
    if col not in df_final.columns:
        continue  # already encoded by a previous run of this cell

    encoded_array = encoder.fit_transform(df_final[[col]])
    # Carry df_final's index so the concat below aligns row for row even if the
    # frame no longer has a default RangeIndex.
    encoded_df = pd.DataFrame(
        encoded_array,
        columns=encoder.get_feature_names_out([col]),
        index=df_final.index,
    )
    # Replace the source column with its indicator columns (appended at the end).
    df_final = pd.concat([df_final.drop(columns=[col]), encoded_df], axis=1)

df_final.info()



In [None]:
# Multipliers that turn one unit of the named area measure into square feet.
conversion_factors = {
    "Sq. Meter": 10.764,
    "Sq. Yards": 9,
    "Acres": 43560,
    "Cents": 435.6,
    "Guntha": 1089,
    "Grounds": 2400,
    "Perch": 272.25
}

def convert_to_sqft(value):
    """Convert one raw `total_sqft` entry to a float number of square feet.

    The entry is stringified and stripped, then tried against three formats
    in this order:
      1. a plain non-negative number ('1056', '1034.45') -> that number;
      2. anything containing '-' is treated as a 'low - high' range -> the mean
         of the two ends;
      3. a number followed by a unit name ('4125Perch', '1000Sq. Meter') -> the
         number times the factor of the first `conversion_factors` key found
         inside the unit text.

    Returns:
        The area as a float, or None when no format applies (the entry is
        printed) or when parsing raises (an error line is printed).
    """
    try:
        value = str(value).strip()

        # Format 1: the whole entry is a bare number.
        is_plain_number = re.match(r'^\d+(\.\d+)?$', value) is not None
        if is_plain_number:
            return float(value)

        # Format 2: a range. Unpacking into exactly two names raises for
        # anything other than one dash; the handler below turns that into None.
        if value.find('-') != -1:
            bounds = value.split('-')
            low, high = bounds
            return (float(low.strip()) + float(high.strip())) / 2

        # Format 3: magnitude followed by a unit label.
        with_unit = re.match(r'([\d.]+)\s*([A-Za-z. ]+)', value)
        if with_unit:
            magnitude = float(with_unit.group(1))
            unit_text = with_unit.group(2).strip()
            factor = next(
                (mult for name, mult in conversion_factors.items() if name in unit_text),
                None,
            )
            if factor is not None:
                return magnitude * factor

        # Nothing applied: show the entry and give up on it.
        print(value)
        return None

    except Exception as e:
        print(f"Error converting value: {value}, {e}")
        return None
# Normalise every total_sqft entry to square feet. Entries the converter could
# not parse come back as None and turn into NaN under the float cast.
df_final = df_final.assign(
    total_sqft=df_final['total_sqft'].apply(convert_to_sqft).astype(float)
)

df_final.info()

In [None]:
# Columns that are screened for outliers.
numeric_cols = ['total_sqft', 'bath', 'balcony', 'price']

def remove_outliers_iqr(df, column):
    """Return the rows of `df` whose `column` value lies inside the 1.5*IQR fences.

    The fences are Q1 - 1.5*IQR and Q3 + 1.5*IQR, computed from `column`, and
    both ends are inclusive. Rows where `column` is missing fail the comparison
    and are therefore dropped as well. The original index labels are kept.
    """
    q1 = df[column].quantile(0.25)
    q3 = df[column].quantile(0.75)
    spread = q3 - q1

    within_fences = df[column].between(q1 - 1.5 * spread, q3 + 1.5 * spread)
    return df[within_fences]

# Screen the columns one after another. Each pass works on the rows that
# survived the previous passes, so the quartiles of later columns are computed
# on already-filtered data.
for col in numeric_cols:
    df_final = remove_outliers_iqr(df_final, col)

# Boolean-mask filtering leaves df_final as a slice of its parent frame. Take an
# explicit copy so column assignments in later cells act on an independent
# frame and do not raise SettingWithCopyWarning.
df_final = df_final.copy()

# Report the shape after outlier removal.
print(f"New dataset shape: {df_final.shape}")

In [None]:
# Rescale the numeric features to [0, 1].
#
# The scaler is fit once on all three columns. MinMaxScaler scales each column
# independently, so the values match per-column fitting, but the fitted object
# now keeps the min/max of every scaled column (refitting one scaler per column
# kept only the last column's statistics).
#
# NOTE(review): the scaler is fit on the full frame, before the train/val/test
# split, so validation and test rows influence the scaling. Fitting on the
# training rows only would need this step to move after the split.
scaled_columns = ['bath', 'balcony', 'total_sqft']

scaler = MinMaxScaler()
df_final[scaled_columns] = scaler.fit_transform(df_final[scaled_columns])

In [None]:


# Features: every column of df_final except the target.
X = df_final.drop(columns=['price'])

# Target: the 'price' column on its own.
y = df_final['price']

# 70 % of the rows go to training; the remaining 30 % is then halved into
# validation and test (15 % each). Both splits use the same fixed seed.
X_train, X_temp, y_train, y_temp = train_test_split(X, y, test_size=0.3, random_state=42)
X_val, X_test, y_val, y_test = train_test_split(X_temp, y_temp, test_size=0.5, random_state=42)

# Feature frames -> float32 tensors.
X_train, X_val, X_test = (
    torch.tensor(frame.values, dtype=torch.float32)
    for frame in (X_train, X_val, X_test)
)

# Target series -> float32 column vectors of shape (n, 1).
y_train, y_val, y_test = (
    torch.tensor(series.values, dtype=torch.float32).unsqueeze(1)
    for series in (y_train, y_val, y_test)
)

print("Training Tensor Shape:", X_train.shape, y_train.shape)
print("Validation Tensor Shape:", X_val.shape, y_val.shape)
print("Test Tensor Shape:", X_test.shape, y_test.shape)

In [None]:

class MLP:
    """Fully connected regression network with hand-written forward and backward passes.

    Parameters are plain tensors updated in place by gradient descent; autograd
    is not used. For layer i, ``weights[i]`` has shape (fan_out, fan_in) and
    ``bias[i]`` has shape (fan_out, 1). Hidden layers apply the configured
    activation; the output layer is linear. The loss is always mean squared
    error.

    Each ``*_fit`` method re-initialises the parameters before training, so a
    fit call always starts from a fresh random model.
    """

    def __init__(self, input_size, output_size, hidden_sizes, learning_rate, activation_function, loss_function):
        """Store the hyperparameters; no parameters are allocated here.

        Args:
            input_size: Number of input features. Stored for reference only; the
                fit methods size the network from the data they receive.
            output_size: Number of outputs. Stored for reference only (see above).
            hidden_sizes: Sequence with the width of each hidden layer, in order.
            learning_rate: Step size of the gradient-descent update.
            activation_function: 'relu', 'sigmoid', 'tanh' or 'linear'
                (case-insensitive).
            loss_function: Name of the loss. Stored lower-cased; `_loss` always
                computes MSE regardless of this value.
        """
        self.input_size = input_size
        self.output_size = output_size
        self.learning_rate = learning_rate
        # Normalise the case so 'ReLU' and 'relu' select the same activation.
        self.activation = str(activation_function).lower()
        self.hidden_sizes = hidden_sizes
        self.loss_function = loss_function.lower()

    def _Initialize(self, input_size, output_size, device=None):
        """Create fresh weights and biases drawn from N(0, 1) scaled by 0.01.

        Args:
            input_size: Number of input features.
            output_size: Number of network outputs.
            device: Device to allocate the parameters on. None uses torch's
                default device. The fit methods pass the device of the training
                data so that parameters and data always live together.
        """
        self.weights = []
        self.bias = []
        layer_sizes = [input_size, *self.hidden_sizes, output_size]
        for fan_in, fan_out in zip(layer_sizes[:-1], layer_sizes[1:]):
            self.weights.append(torch.randn(fan_out, fan_in, dtype=torch.float32, device=device) * 0.01)
            self.bias.append(torch.randn(fan_out, 1, dtype=torch.float32, device=device) * 0.01)

    def _Activation(self, x):
        """Apply the configured hidden-layer activation to `x`.

        Raises:
            ValueError: If the configured activation name is not supported.
        """
        if self.activation == 'relu':
            return torch.relu(x)
        elif self.activation == 'sigmoid':
            return torch.sigmoid(x)
        elif self.activation == 'tanh':
            return torch.tanh(x)
        elif self.activation == 'linear':
            return x
        raise ValueError(f"Unsupported activation function: {self.activation!r}")

    def _ActivationGrad(self, z, a):
        """Derivative of the configured activation at pre-activation `z`.

        Args:
            z: Pre-activation values of a hidden layer.
            a: The matching activation output, i.e. ``_Activation(z)``; sigmoid
                and tanh derivatives are expressed through it.

        Raises:
            ValueError: If the configured activation name is not supported.
        """
        if self.activation == 'relu':
            return (z > 0).to(z.dtype)
        elif self.activation == 'sigmoid':
            return a * (1 - a)
        elif self.activation == 'tanh':
            return 1 - a ** 2
        elif self.activation == 'linear':
            return torch.ones_like(z)
        raise ValueError(f"Unsupported activation function: {self.activation!r}")

    def _MSE(self, y, y_hat):
        """Mean squared error, averaged over every element."""
        return torch.mean((y - y_hat) ** 2)

    def _loss(self, y, y_hat):
        """Training loss; always MSE."""
        return self._MSE(y, y_hat)

    def _Forward(self, X):
        """Run `X` (rows are samples) through the network and return the output.

        Side effect: overwrites ``self.z`` (hidden pre-activations) and
        ``self.layer_inputs`` (hidden activations followed by the output), which
        `_Backward` reads. A backward pass must therefore directly follow the
        forward pass on the same batch.
        """
        activations = X
        self.layer_inputs = []
        self.z = []

        for i in range(len(self.weights) - 1):
            z = torch.mm(activations, self.weights[i].T) + self.bias[i].T
            activations = self._Activation(z)
            self.layer_inputs.append(activations)
            self.z.append(z)

        # Output layer: affine only, no activation.
        z = torch.mm(activations, self.weights[-1].T) + self.bias[-1].T
        self.layer_inputs.append(z)
        return z

    def _Backward(self, X, y, y_hat):
        """Backpropagate the MSE loss for one batch and update the parameters in place.

        Must be called right after ``_Forward(X)``, whose cached activations it
        reads. All gradients are computed from the pre-update weights before
        any parameter is changed.
        """
        # d(MSE)/d(y_hat); numel() matches the element count torch.mean averages over.
        dz = 2 * (y_hat - y) / y_hat.numel()
        n_layers = len(self.weights)
        grads_w = [None] * n_layers
        grads_b = [None] * n_layers

        for i in range(n_layers - 1, -1, -1):
            layer_input = self.layer_inputs[i - 1] if i > 0 else X
            grads_w[i] = torch.mm(dz.T, layer_input)
            grads_b[i] = torch.sum(dz, dim=0, keepdim=True).T
            if i > 0:
                # Push the error through layer i, then through the activation of
                # the hidden layer below, using that activation's own derivative.
                dz = torch.mm(dz, self.weights[i]) * self._ActivationGrad(
                    self.z[i - 1], self.layer_inputs[i - 1]
                )

        for i in range(n_layers):
            self.weights[i] -= self.learning_rate * grads_w[i]
            self.bias[i] -= self.learning_rate * grads_b[i]

    def _EvalLoss(self, X, y):
        """Return the loss of the current model on (X, y) as a Python float."""
        with torch.no_grad():
            return self._loss(y, self.predict(X)).item()

    def batch_fit(self, X, y, X_val, y_val, epochs, showloss=False):
        """Train with full-batch gradient descent (one update per epoch).

        The recorded training loss of an epoch is the loss before that epoch's
        update; the validation loss is measured after it.

        Returns:
            (train_losses, val_losses): two lists of floats, one entry per epoch.
        """
        self._Initialize(X.shape[1], y.shape[1], device=X.device)
        train_losses = []
        val_losses = []
        for epoch in range(epochs):
            y_hat = self._Forward(X)
            train_loss = self._loss(y, y_hat).item()
            train_losses.append(train_loss)
            self._Backward(X, y, y_hat)

            val_loss = self._EvalLoss(X_val, y_val)
            val_losses.append(val_loss)

            if epoch % 10 == 0 and showloss:
                print(f'Epoch {epoch}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')
        return train_losses, val_losses

    def Mini_batch_fit(self, X, y, X_val, y_val, epochs, batch_size, showloss=False):
        """Train with shuffled mini-batches of `batch_size` samples.

        Training and validation losses are both measured on the full sets at
        the end of each epoch.

        Returns:
            (train_losses, val_losses): two lists of floats, one entry per epoch.
        """
        dataset = TensorDataset(X, y)
        dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)
        self._Initialize(X.shape[1], y.shape[1], device=X.device)
        train_losses = []
        val_losses = []
        for epoch in range(epochs):
            for X_batch, y_batch in dataloader:
                y_hat = self._Forward(X_batch)
                self._Backward(X_batch, y_batch, y_hat)

            train_loss = self._EvalLoss(X, y)
            train_losses.append(train_loss)
            val_loss = self._EvalLoss(X_val, y_val)
            val_losses.append(val_loss)

            if epoch % 10 == 0 and showloss:
                print(f'Epoch {epoch}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')
        return train_losses, val_losses

    def SGD_fit(self, X, y, X_val, y_val, epochs, showloss=False):
        """Train with one update per sample, visiting samples in their stored order.

        Training and validation losses are both measured on the full sets at
        the end of each epoch.

        Returns:
            (train_losses, val_losses): two lists of floats, one entry per epoch.
        """
        self._Initialize(X.shape[1], y.shape[1], device=X.device)
        train_losses = []
        val_losses = []
        for epoch in range(epochs):
            for i in range(X.shape[0]):
                X_batch = X[i].unsqueeze(0)
                y_batch = y[i].unsqueeze(0)
                y_hat = self._Forward(X_batch)
                self._Backward(X_batch, y_batch, y_hat)

            train_loss = self._EvalLoss(X, y)
            train_losses.append(train_loss)
            val_loss = self._EvalLoss(X_val, y_val)
            val_losses.append(val_loss)

            if showloss:
                print(f'Epoch {epoch}, Train Loss: {train_loss:.4f}, Val Loss: {val_loss:.4f}')
        return train_losses, val_losses

    def predict(self, X):
        """Return the network output for `X`.

        This is a plain forward pass, so it also overwrites the activations
        cached for `_Backward`.
        """
        return self._Forward(X)

    def _RMSE(self, y_true, y_pred):
        """Root mean squared error."""
        return torch.sqrt(torch.mean((y_true - y_pred) ** 2))

    def _R2_score(self, y_true, y_pred):
        """Coefficient of determination, 1 - SS_res / SS_tot.

        There is no guard for a constant `y_true` (SS_tot == 0).
        """
        ss_total = torch.sum((y_true - torch.mean(y_true)) ** 2)
        ss_residual = torch.sum((y_true - y_pred) ** 2)
        return 1 - (ss_residual / ss_total)

In [None]:
def _train_by_optimizer(mlp, opt, X_train, y_train, X_val, y_val, epochs, batch_size=32):
    """Run the MLP training routine named by `opt`; return (train_losses, val_losses)."""
    if opt == "Batch":
        return mlp.batch_fit(X_train, y_train, X_val, y_val, epochs)
    if opt == "Mini_Batch":
        return mlp.Mini_batch_fit(X_train, y_train, X_val, y_val, epochs, batch_size=batch_size)
    # Any other value selects per-sample SGD.
    return mlp.SGD_fit(X_train, y_train, X_val, y_val, epochs)


def _evaluate_regression(mlp, X, y):
    """Return the 'mse', 'rmse' and 'r2' of `mlp` on (X, y) as Python floats."""
    with torch.no_grad():
        y_pred = mlp.predict(X)
        return {
            'mse': mlp._MSE(y, y_pred).item(),
            'rmse': mlp._RMSE(y, y_pred).item(),
            'r2': mlp._R2_score(y, y_pred).item(),
        }


def _plot_loss_curves(train_losses, val_losses, title):
    """Show the per-epoch train/validation loss curves, then release the figure."""
    fig = plt.figure(figsize=(10, 6))
    plt.plot(train_losses, label='Train Loss')
    plt.plot(val_losses, label='Validation Loss')
    plt.xlabel('Epoch')
    plt.ylabel('Loss')
    plt.title(title)
    plt.legend()
    plt.grid(True)
    plt.show()
    plt.close(fig)  # a sweep opens one figure per configuration


def _mse_rank(mse):
    """Ranking key for an MSE: NaN (a diverged run) ranks after every real value."""
    return float('inf') if mse != mse else mse


def experiment(X_train, y_train, X_val, y_val, X_test, y_test, learning_rates, epochs_list,
               architectures, activations, optimizers,
               device='cuda' if torch.cuda.is_available() else 'cpu'):
    """Grid-search MLP hyperparameters and report the best configuration.

    Every combination of learning rate, epoch count, architecture, activation
    and optimizer is trained once and scored by validation MSE (lower is
    better). For each run the metrics are printed and the loss curves plotted.
    After the sweep, the top three configurations, the best score per
    (activation, optimizer) pair and the test metrics of the best model are
    printed.

    The test metrics of the best configuration come from the very model that
    won the sweep. It is not retrained, because a retrain would start from a
    different random initialisation and describe a different model.

    Args:
        X_train, y_train, X_val, y_val, X_test, y_test: Tensors; all are moved
            to `device`.
        learning_rates, epochs_list, architectures, activations, optimizers:
            Iterables of candidate values. An optimizer of "Batch" or
            "Mini_Batch" (batch size 32) selects that routine; any other value
            selects SGD.
        device: Device the data is moved to. The default is chosen when the
            function is defined: CUDA if available, else CPU.

    Returns:
        (results, best_config): `results` is a list of tuples
        (lr, epochs, arch, act, opt, val_metrics, train_losses, val_losses)
        sorted by validation MSE; `best_config` is its first entry.
    """
    X_train, y_train = X_train.to(device), y_train.to(device)
    X_val, y_val = X_val.to(device), y_val.to(device)
    X_test, y_test = X_test.to(device), y_test.to(device)

    results = []
    act_opt_scores = {}  # (activation, optimizer) -> (rank, val_metrics, test_metrics) of its best run
    best_mlp, best_rank = None, None

    for lr, epoch, arch, act, opt in product(learning_rates, epochs_list, architectures, activations, optimizers):
        print(f"\nTraining with lr={lr}, epochs={epoch}, arch={arch}, act={act}, opt={opt}")
        mlp = MLP(X_train.shape[1], y_train.shape[1], arch, lr, act, 'mse')
        train_losses, val_losses = _train_by_optimizer(mlp, opt, X_train, y_train, X_val, y_val, epoch)

        val_metrics = _evaluate_regression(mlp, X_val, y_val)
        # Reported for information only; model selection uses validation MSE.
        test_metrics = _evaluate_regression(mlp, X_test, y_test)

        results.append((lr, epoch, arch, act, opt, val_metrics, train_losses, val_losses))

        print(f"Validation Metrics: {val_metrics}")
        print(f"Test Metrics: {test_metrics}")

        rank = _mse_rank(val_metrics['mse'])

        # Keep the model of the best run seen so far. Strict '<' keeps the
        # earliest run on ties, matching the stable sort applied to `results`.
        if best_mlp is None or rank < best_rank:
            best_mlp, best_rank = mlp, rank

        key = (act, opt)
        if key not in act_opt_scores or rank < act_opt_scores[key][0]:
            act_opt_scores[key] = (rank, val_metrics, test_metrics)

        _plot_loss_curves(train_losses, val_losses,
                          f'Config: lr={lr}, arch={arch}, act={act}, opt={opt}')

    # Best first; diverged (NaN) runs go last.
    results.sort(key=lambda res: _mse_rank(res[5]['mse']))

    print("\nTop 3 Configurations (based on validation set):")
    for res in results[:3]:
        print(f"LR: {res[0]}, Epochs: {res[1]}, Arch: {res[2]}, Act: {res[3]}, Opt: {res[4]}, "
              f"Val Metrics: {res[5]}")

    print("\nOrdered Scores by Activation and Optimizer (based on validation set):")
    sorted_act_opt = sorted(act_opt_scores.items(), key=lambda item: item[1][0])
    for (act, opt), (_, val_metrics, test_metrics) in sorted_act_opt:
        print(f"Act: {act}, Opt: {opt}, Score (mse): {val_metrics['mse']:.4f}, "
              f"Val Metrics: {val_metrics}, Test Metrics: {test_metrics}")

    best_config = results[0]
    print(f"\nBest Configuration: LR={best_config[0]}, Epochs={best_config[1]}, Arch={best_config[2]}, "
          f"Act={best_config[3]}, Opt={best_config[4]}")

    # Final test evaluation of the model that achieved the best validation MSE.
    best_test_metrics = _evaluate_regression(best_mlp, X_test, y_test)
    print(f"Final Test Metrics for Best Config: {best_test_metrics}")

    _plot_loss_curves(best_config[6], best_config[7],
                      f'Best Config: {best_config[3]}-{best_config[4]}-LR{best_config[0]}')

    return results, best_config

In [None]:
# Seed for torch's global RNG (weight initialisation and mini-batch shuffling),
# so that re-running this cell reproduces the same sweep.
SEED = 42

# Search space shared by both sweeps.
learning_rates = [0.001, 0.01]
architectures = [[64, 32], [128, 64, 32]]
activations = ['relu', 'sigmoid', 'tanh']

# Sweep 1: full-batch and mini-batch gradient descent, 100 epochs each.
epochs_list = [100]
optimizers = ['Batch', 'Mini_Batch']

torch.manual_seed(SEED)
batch_results, batch_best_config = experiment(X_train, y_train, X_val, y_val, X_test, y_test, learning_rates, epochs_list, architectures, activations, optimizers)

# Sweep 2: per-sample SGD. It performs one update per training row per epoch,
# so it is run for 10 epochs only.
epochs_list = [10]
optimizers = ['SGD']

torch.manual_seed(SEED)
sgd_results, sgd_best_config = experiment(X_train, y_train, X_val, y_val, X_test, y_test, learning_rates, epochs_list, architectures, activations, optimizers)

# `results` / `best_config` refer to the most recent (SGD) sweep; the first
# sweep stays available under batch_results / batch_best_config.
results, best_config = sgd_results, sgd_best_config