In [None]:
import os
from pathlib import Path
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point
from joblib import parallel_backend
import seaborn as sns
import matplotlib.pyplot as plt
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import OneHotEncoder, StandardScaler
from sklearn.pipeline import Pipeline
from sklearn.compose import ColumnTransformer
from sklearn.ensemble import RandomForestRegressor
from sklearn.ensemble import GradientBoostingRegressor
from sklearn.model_selection import RandomizedSearchCV
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import cross_validate
from sklearn.metrics import make_scorer
from sklearn.base import BaseEstimator, TransformerMixin

In [None]:
PROJECT_ROOT = Path().resolve().parent
DATA_PATH = PROJECT_ROOT / 'data' / 'data.csv'

def load_data():
    df = pd.read_csv(DATA_PATH, delimiter=';')
    data = gpd.GeoDataFrame(df, crs="epsg:4326", geometry=[Point(xy) for xy in zip(df.Longitude, df.Latitude)])
    data = data.drop(['Latitude', 'Longitude'], axis=1)
    data.drop_duplicates(inplace=True)
    data.insert(2, 'PricePerSqM', (data['Price'] / data['Area']))
    return data

data = load_data()

In [None]:
pd.options.display.float_format = '{:.0f}'.format
print(data.describe(include=np.number, percentiles=[0.25, 0.75]))

In [None]:
plt.figure(figsize=(18, 12))

plt.subplot(3, 2, 1)
sns.histplot(data['Price'], bins=30, kde=True)
plt.title('Distribution of Price')

plt.subplot(3, 2, 2)
sns.boxplot(data=data, x='Price')
plt.title('Boxplot of Price')

plt.subplot(3, 2, 3)
sns.histplot(data['Area'], bins=30, kde=True)
plt.title('Distribution of Area')

plt.subplot(3, 2, 4)
sns.boxplot(data=data,  x='Area')
plt.title('Boxplot of Area')

plt.subplot(3, 2, 5)
sns.histplot(data['PricePerSqM'], bins=30, kde=True)
plt.title('Distribution of Price Per Sqauare Meter')

plt.subplot(3, 2, 6)
sns.boxplot(data=data,  x='PricePerSqM')
plt.title('Boxplot of Price Per Sqauare Meter')

plt.tight_layout()
plt.show()

We've identified there are outliers in our data. Thankfully, 70 million PLN for a flat is not a valid amount in Wrocław yet. To ensure our sample remains unbiased, let's remove the outliers using interquartile range method.

Besides that we can also observe right skewness of our data. In order to normalize our dataset I'll be using logarithmic transformation.

In [None]:
data = load_data()

def iqr_method(series, log=True, verbose=False):
    if log:
        series = np.log(series + 0.01)
    Q1 = np.percentile(series, 10)
    Q3 = np.percentile(series, 90)
    # interquartile range
    IQR = Q3 - Q1
    lower = Q1 - 1.5 * IQR
    upper = Q3 + 1.5 * IQR
    # print the range if verbose
    if log and verbose:
        print(np.exp(lower))
        print(np.exp(upper))
    elif verbose:
        print(lower)
        print(upper)
    return lower, upper

lower, upper = iqr_method(data['Price'])

plt.figure(figsize=(10, 15))

plt.subplot(3, 1, 1)
sns.histplot(np.log(data['Price']), bins=100)
plt.title('Log Price')

ax = plt.subplot(3, 1, 2)
sns.histplot(np.log(data['Price']), bins=100)
plt.title('Log Price Restricted')
ax.axvspan(min(np.log(data['Price'])), lower, facecolor='red', alpha=0.25)
ax.axvspan(upper, max(np.log(data['Price'])), facecolor='red', alpha=0.25)

plt.subplot(3, 1, 3)
sns.histplot(data['Price'], bins=100)
plt.title('Price')

# floor_no is restricted by the website
outlier_cols = [('Price', True), ('Area', True), ('PricePerSqM', True), ('Rooms_num', True), ('Build_year', False), ('Floors_num', True), ('Rent', True)]
verbose = False
for col, log in outlier_cols:
    lower, upper = iqr_method(data[col][data[col].notna()], log, verbose)
    series = np.log(data[col]) if log else data[col]
    data = data[data[col].isna() | series.between(lower, upper)]

print(data.describe(include=np.number))

In the example above, we applied a log transformation to the price data. The red areas represent the rejected outliers.

In [None]:
plt.figure(figsize=(18, 12))

plt.subplot(3, 2, 1)
sns.histplot(data['Price'], bins=30, kde=True)
plt.title('Distribution of Price')

plt.subplot(3, 2, 2)
sns.boxplot(data=data, x='Price')
plt.title('Boxplot of Price')

plt.subplot(3, 2, 3)
sns.histplot(data['Area'], bins=30, kde=True)
plt.title('Distribution of Area')

plt.subplot(3, 2, 4)
sns.boxplot(data=data,  x='Area')
plt.title('Boxplot of Area')

plt.subplot(3, 2, 5)
sns.histplot(data['PricePerSqM'], bins=30, kde=True)
plt.title('Distribution of Price Per Sqauare Meter')

plt.subplot(3, 2, 6)
sns.boxplot(data=data,  x='PricePerSqM')
plt.title('Boxplot of Price Per Sqauare Meter')

plt.tight_layout()
plt.show()

That's more like it.

In [None]:
f, ax = plt.subplots(figsize=(10, 8))
corr = data.corr(numeric_only=True)
sns.heatmap(corr, mask=np.zeros_like(corr, dtype=bool),
            cmap=sns.diverging_palette(220, 10, as_cmap=True),
            square=True, ax=ax)

In [None]:
def MAPE(y_true, y_pred):
    return np.mean(np.abs((y_true - y_pred) / y_true)) * 100
mape_scorer = make_scorer(MAPE)

In [None]:
class MissingBooleanImputer(BaseEstimator, TransformerMixin):
    def __init__(self):
        pass

    def fit(self, X, y=None):
        return self

    def transform(self, X):
        X_copy = X.copy()
        for feature in X_copy.columns:
            ser = X_copy[feature].astype("boolean")
            X_copy[feature] = ser.fillna(False)
        return X_copy

In [None]:
ALL_FEATURES=set(data.columns.values.tolist())
NUMERICAL_FEATURES={'Price', 'Area', 'PricePerSqM', 'Rooms_num', 'Rent', 'Build_year', 'Floors_num', 'Floor_no'} #[col for col in df.columns if data[col].dtype in ['int', 'float']]
CATEGORICAL_FEATURES={'District', 'Market', 'Construction_status'} #[col for col in data.columns if data[col].dtype == 'string']
BOOLEAN_FEATURES={'Garage', 'Lift', 'Basement', 'Balcony', 'Garden', 'Terrace'}

def get_pipeline(features, estimator):
    features_set = set(features)
     # preprocessing for numerical columns
    numerical_features = list(features_set.intersection(NUMERICAL_FEATURES))
    numerical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='mean')),  # replace NaN  values with the mean of the column
        #('scaler', StandardScaler())  # standardize
    ])
    # preprocessing for categorical columns
    categorical_features = list(features_set.intersection(CATEGORICAL_FEATURES))
    categorical_transformer = Pipeline(steps=[
        ('imputer', SimpleImputer(strategy='most_frequent')),  # replace NaN values with the most frequent value of the column
        ('onehot', OneHotEncoder(handle_unknown='ignore'))     # one-hot encode categorical features
    ])

    boolean_features = list(BOOLEAN_FEATURES)

    # combine preprocessing steps
    preprocessor = ColumnTransformer(
        transformers=[
            ('num', numerical_transformer, numerical_features),
            ('cat', categorical_transformer, categorical_features),
            ('bool', MissingBooleanImputer(), boolean_features)
        ])

    # define the model

    model = Pipeline(steps=[
        ('preprocessor', preprocessor),
        ('regressor', estimator)
    ])

    return model

In [None]:
def predict_price_random_forest(data, features, verbose, log):
    X = data[features] # features
    y = data['Price']  # target

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=1)

    estimator = RandomForestRegressor(random_state=1, max_depth=15, n_estimators=200)
    model = get_pipeline(features, estimator)

    # train the model
    model.fit(X_train, y_train)

    train_score = model.score(X_train, y_train)
    test_score = model.score(X_test, y_test)

    func = np.exp if log else lambda x: x

    X_train_predict = func(model.predict(X_train))
    X_test_predict  = func(model.predict(X_test))
    y_train_scaled  = func(y_train)
    y_test_scaled   = func(y_test)
    if verbose:
        print(f"Train MAE: {np.mean(np.abs(X_train_predict - y_train_scaled)):.2f}")
        print(f"Test MAE: {np.mean(np.abs(X_test_predict - y_test_scaled)):.2f}")
        print(f"Train MAPE: {np.mean(np.abs((X_train_predict - y_train_scaled) / y_train_scaled)):.2f}")
        print(f"Test MAPE: {np.mean(np.abs((X_test_predict - y_test_scaled) / y_test_scaled)):.2f}")
        print(f"Train R^2 score: {train_score:.2f}")
        print(f"Test R^2 score: {test_score:.2f}")

    y_pred = func(model.predict(X_test))
    results = X_test.copy()
    results['Actual'] = y_test_scaled.astype(int)
    results['Predicted'] = y_pred.astype(int)
    pd.options.display.max_colwidth = 100
    print(results[['Actual', 'Predicted']].join(data['Link']).head(10))

I'm not using the hyper-parameters found during Grid Search because it barely makes a difference and takes 10 times the time.

In [None]:
pd.options.display.float_format = '{:.2f}'.format
features = ['Area', 'District', 'Market', 'Rooms_num', 'Build_year', 'Garage', 'Lift', 'Basement', 'Balcony', 'Garden', 'Terrace', 'Floors_num', 'Floor_no', 'Construction_status']#, 'Rent']
predict_price_random_forest(data, features, True, False)

Again, our data is skewed to the right, thus logarithmic transformation will result with data closer to the normal distribution and hopefully a smaller error.

In [None]:
logData = data.copy()
logData[list(NUMERICAL_FEATURES)] = logData[list(NUMERICAL_FEATURES)].apply(lambda x: np.log(x + 0.01))
predict_price_random_forest(logData, features, True, True)

Tuning the hyper-parameters of the RandomForestRegressor through 3-fold cross validation to avoid overfitting.

In [None]:
def grid_search(features, data, estimator, param_grid):

    model = get_pipeline(features, estimator)

    X = data[features]
    y = data['Price']

    with parallel_backend('threading'):
        estim_search = GridSearchCV(estimator=model, param_grid=param_grid, cv=3, verbose=5, n_jobs=-1)
        estim_search.fit(X, y)
    return estim_search

In [None]:
rf = RandomForestRegressor(random_state=1)
n_estimators = np.concatenate(([50, 100], np.arange(start=200, stop=2100, step=400)))
max_depth = [None, 5, 6, 7] + [int(x) for x in np.arange(start=10, stop=110, step=30)]

param_grid = {
    'regressor__n_estimators':  n_estimators,
    'regressor__max_depth': max_depth,
}
search = grid_search(features, logData, rf, param_grid)
results = pd.DataFrame(search.cv_results_)
display(results[['param_regressor__n_estimators',
                 'param_regressor__max_depth',
                 'mean_test_score',
                 'std_test_score']])

In [None]:
gb = GradientBoostingRegressor(random_state=1)
learning_rate = np.linspace(0.1, 1.0, num=10)
n_estimators = np.concatenate(([50, 100], np.arange(start=200, stop=2100, step=200)))

param_grid = {
    'regressor__learning_rate': learning_rate,
    'regressor__n_estimators':  n_estimators,
}
search = grid_search(features + ['Rent'], logData, gb, param_grid)
results = pd.DataFrame(search.cv_results_)
display(results[['param_regressor__n_estimators',
                 'param_regressor__learning_rate',
                 'mean_test_score',
                 'std_test_score']])

In [None]:
def predict_price_gradient_boosting(data, features, verbose, log):
    X = data[features]
    y = data['Price']

    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=1)

    gb = GradientBoostingRegressor(learning_rate=0.2, n_estimators=1000)
    model = get_pipeline(features, gb)

    model.fit(X_train, y_train)

    train_score = model.score(X_train, y_train)
    test_score = model.score(X_test, y_test)

    func = np.exp if log else lambda x: x

    X_train_predict = func(model.predict(X_train))
    X_test_predict  = func(model.predict(X_test))
    y_train_scaled  = func(y_train)
    y_test_scaled   = func(y_test)
    if verbose:
        print(f"Train MAE: {np.mean(np.abs(X_train_predict - y_train_scaled)):.2f}")
        print(f"Test MAE: {np.mean(np.abs(X_test_predict - y_test_scaled)):.2f}")
        print(f"Train MAPE: {np.mean(np.abs((X_train_predict - y_train_scaled) / y_train_scaled)):.2f}")
        print(f"Test MAPE: {np.mean(np.abs((X_test_predict - y_test_scaled) / y_test_scaled)):.2f}")
        print(f"Train R^2 score: {train_score:.2f}")
        print(f"Test R^2 score: {test_score:.2f}")

    y_pred = func(model.predict(X_test))
    results = X_test.copy()
    results['Actual'] = y_test_scaled.astype(int)
    results['Predicted'] = y_pred.astype(int)
    pd.options.display.max_colwidth = 100
    print(results[['Actual', 'Predicted']].join(data['Link']).head(10))

predict_price_gradient_boosting(logData, features + ['Rent'], True, True)