# Bishkek Real Estate v3 + Road Network Features

**Scientific approach to road features in real estate valuation**

## Hypotheses

1. **Accessibility Hypothesis**: Proximity to main roads improves transport accessibility → higher prices
2. **Noise Hypothesis**: Too close to main roads → noise pollution → lower prices  
3. **Optimal Distance Theory**: Non-linear relationship; the optimal distance is assumed to be 100-400m from main roads
4. **Road Density**: Higher road density = better infrastructure → higher prices
5. **Street Hierarchy**: Access to higher-class roads → premium
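
As a minimal sketch, hypotheses 2 and 3 can be expressed as distance bands. The cut-offs below (50 m, 100-400 m, 1 km) are the ones used by the zone features in Section 2; the function name `road_zone` and the string labels are illustrative assumptions, not part of the notebook's pipeline.

```python
def road_zone(dist_km):
    """Map distance to the nearest main road (km) to a coarse zone label."""
    if dist_km is None or dist_km != dist_km:  # None or NaN (NaN != NaN)
        return "unknown"
    if dist_km <= 0.05:
        return "noise"      # hypothesis 2: too close, noise penalty expected
    if 0.1 <= dist_km <= 0.4:
        return "optimal"    # hypothesis 3: accessible but quiet
    if dist_km > 1.0:
        return "far"        # hypothesis 1: poor accessibility expected
    return "other"          # transition bands between the zones above


for d in (0.03, 0.08, 0.25, 0.7, 1.5):
    print(f"{d:.2f} km -> {road_zone(d)}")
```

Distances between the bands (50-100 m and 400 m-1 km) fall into no named zone, which mirrors how the binary zone flags in Section 2 behave.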

## References
- Cervero & Duncan (2004): Land value impacts of rail transit
- Debrezion et al. (2007): Impact of railway stations on residential property values
- Li et al. (2019): Road network and housing prices in Chinese cities

In [None]:
import numpy as np
import pandas as pd
import json
from scipy.spatial import cKDTree
from math import radians, sin, cos, sqrt, atan2
import warnings
warnings.filterwarnings('ignore')

# ML
from sklearn.model_selection import train_test_split, cross_val_score
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.metrics import mean_absolute_error, mean_squared_error, r2_score
from sklearn.ensemble import StackingRegressor
from sklearn.linear_model import Ridge

from xgboost import XGBRegressor
from lightgbm import LGBMRegressor
from catboost import CatBoostRegressor

print("Libraries loaded")

## 1. Load Data

In [None]:
# Load apartments
df = pd.read_csv('/kaggle/input/bishkek-real-estate-2025/bishkek_apartments.csv')
print(f"Loaded {len(df)} apartments")

# Load roads GeoJSON
with open('/kaggle/input/bishkek-roads/bishkek_roads.geojson', 'r') as f:
    roads_data = json.load(f)
print(f"Loaded {len(roads_data['features'])} road segments")

## 2. Road Feature Engineering

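Road geometries are sampled at every third vertex and indexed with one KD-tree per road class, which keeps nearest-neighbor and radius queries fast. Distances are therefore measured to the nearest sampled road vertex, not to the road line itself.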

**Road classification (OpenStreetMap):**
- `trunk`: Major highways, arterial roads
- `primary`: Primary urban roads
- `secondary`: Secondary roads
- `tertiary`: Local collector roads
- `residential`: Residential streets
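
The extractor in the next cell queries the KD-tree in raw degrees and converts the result with a single factor of 85 km per degree. The following sketch, which is not part of the original pipeline, shows how the north-south and east-west scales differ at Bishkek's latitude (about 111 km versus roughly 82 km per degree) and how an equirectangular approximation could account for that. The helper names, the 111.32 km constant, and the 42.87° reference latitude are assumptions chosen for illustration.

```python
from math import cos, radians, sqrt

KM_PER_DEG_LAT = 111.32  # approximate length of one degree of latitude (assumed constant)


def km_per_degree(lat_deg):
    """Return (km per degree of latitude, km per degree of longitude) at lat_deg."""
    return KM_PER_DEG_LAT, KM_PER_DEG_LAT * cos(radians(lat_deg))


def local_distance_km(lat1, lon1, lat2, lon2):
    """Equirectangular approximation; reasonable over a city-sized area."""
    k_lat, k_lon = km_per_degree((lat1 + lat2) / 2)
    return sqrt(((lat2 - lat1) * k_lat) ** 2 + ((lon2 - lon1) * k_lon) ** 2)


BISHKEK_LAT = 42.87  # assumed reference latitude
k_lat, k_lon = km_per_degree(BISHKEK_LAT)
print(f"1 deg latitude  ~ {k_lat:.1f} km")
print(f"1 deg longitude ~ {k_lon:.1f} km")

# The same 0.01 degree offset, once to the north and once to the east
north = local_distance_km(BISHKEK_LAT, 74.57, BISHKEK_LAT + 0.01, 74.57)
east = local_distance_km(BISHKEK_LAT, 74.57, BISHKEK_LAT, 74.58)
print(f"0.01 deg north: {north:.3f} km, 0.01 deg east: {east:.3f} km")
```

One way to apply this would be to scale road and apartment coordinates to kilometres before building the `cKDTree`, so that query results come back in km directly and the 50 m and 100-400 m thresholds mean the same thing in every direction.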

In [None]:
class RoadFeatureExtractor:
    """Extract road network features using KDTree spatial indexing."""
    
    def __init__(self, roads_geojson):
        self.roads = roads_geojson
        self.trees = {}
        self.road_points = {}
        self._build_spatial_index()
    
    def _build_spatial_index(self):
        """Build KDTree spatial index for fast nearest neighbor queries"""
        road_classes = ['trunk', 'primary', 'secondary', 'tertiary', 'residential']
        
        for fclass in road_classes:
            self.road_points[fclass] = []
        
        for feature in self.roads['features']:
            fclass = feature['properties'].get('fclass', '')
            if fclass not in road_classes:
                continue
                
            geom = feature['geometry']
            coords = []
            
            if geom['type'] == 'MultiLineString':
                for line in geom['coordinates']:
                    coords.extend(line[::3])
            elif geom['type'] == 'LineString':
                coords = geom['coordinates'][::3]
            
            # GeoJSON positions are [lon, lat], optionally followed by elevation
            for lon, lat, *_ in coords:
                self.road_points[fclass].append((lat, lon))
        
        for fclass, points in self.road_points.items():
            if points:
                self.trees[fclass] = cKDTree(np.array(points))
                print(f"  {fclass}: {len(points)} points")
    
    def _query_distance(self, lat, lon, road_class):
        """Get distance to nearest road of given class (in km)"""
        if road_class not in self.trees:
            return np.nan
        dist_deg, _ = self.trees[road_class].query([lat, lon])
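        # Single scale factor for both axes: 1° of latitude is ~111 km and 1° of
        # longitude is ~82 km at Bishkek's latitude, so 85 km/degree is a rough average
        return dist_deg * 85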
    
    def _count_roads_in_radius(self, lat, lon, radius_km, road_classes=None):
        """Count road points within radius"""
        if road_classes is None:
            road_classes = self.trees.keys()
        radius_deg = radius_km / 85
        count = 0
        for fclass in road_classes:
            if fclass in self.trees:
                indices = self.trees[fclass].query_ball_point([lat, lon], radius_deg)
                count += len(indices)
        return count
    
    def extract_features(self, df):
        """Extract all road-related features"""
        df = df.copy()
        print("Extracting road features...")
        
        # Distance features
        for road_class in ['trunk', 'primary', 'secondary']:
            df[f'dist_to_{road_class}'] = df.apply(
                lambda r: self._query_distance(r['latitude'], r['longitude'], road_class)
                if pd.notna(r['latitude']) else np.nan, axis=1
            )
        
        df['dist_to_main_road'] = df[['dist_to_trunk', 'dist_to_primary']].min(axis=1)
        print("  Distance features done")
        
        # Density features
        df['road_density_500m'] = df.apply(
            lambda r: self._count_roads_in_radius(r['latitude'], r['longitude'], 0.5)
            if pd.notna(r['latitude']) else 0, axis=1
        )
        
        df['main_road_density_1km'] = df.apply(
            lambda r: self._count_roads_in_radius(
                r['latitude'], r['longitude'], 1.0, 
                ['trunk', 'primary', 'secondary']
            ) if pd.notna(r['latitude']) else 0, axis=1
        )
        print("  Density features done")
        
        # Zone features (hedonic pricing theory)
        df['noise_zone'] = (df['dist_to_main_road'] <= 0.05).astype(int)  # within 50 m
        df['optimal_zone'] = ((df['dist_to_main_road'] >= 0.1) & 
                              (df['dist_to_main_road'] <= 0.4)).astype(int)  # 100-400m
        df['far_from_roads'] = (df['dist_to_main_road'] > 1.0).astype(int)  # > 1km
        
        # Road hierarchy score
        df['road_hierarchy_score'] = (
            5 / (df['dist_to_trunk'].fillna(10) + 0.1) +
            4 / (df['dist_to_primary'].fillna(10) + 0.1) +
            3 / (df['dist_to_secondary'].fillna(10) + 0.1)
        )
        print("  Zone features done")
        
        return df

print("Building spatial index...")
road_extractor = RoadFeatureExtractor(roads_data)

## 3. POI Features

In [None]:
BISHKEK_POI = {
    'bazaars': [
        ('osh_bazaar', 42.874823, 74.569599),
        ('dordoi_bazaar', 42.939732, 74.620613),
        ('ortosay_bazaar', 42.836209, 74.615931),
    ],
    'parks': [
        ('dubovy_park', 42.877681, 74.606759),
        ('ataturk_park', 42.839587, 74.595725),
    ],
    'malls': [
        ('bishkek_park', 42.875029, 74.590403),
        ('dordoi_plaza', 42.874685, 74.618469),
    ],
    'universities': [
        ('auca', 42.81132, 74.627743),
        ('krsu', 42.874862, 74.627114),
    ],
}

BISHKEK_CENTER = (42.8746, 74.5698)

def haversine_distance(lat1, lon1, lat2, lon2):
    R = 6371
    lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])
    dlat, dlon = lat2 - lat1, lon2 - lon1
    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    return R * 2 * atan2(sqrt(a), sqrt(1-a))

def add_poi_features(df):
    df = df.copy()
    df['dist_to_center'] = df.apply(
        lambda r: haversine_distance(r['latitude'], r['longitude'], 
                                     BISHKEK_CENTER[0], BISHKEK_CENTER[1])
        if pd.notna(r['latitude']) else np.nan, axis=1
    )
    
    for category, pois in BISHKEK_POI.items():
        df[f'dist_to_{category}'] = df.apply(
            lambda r: min([
                haversine_distance(r['latitude'], r['longitude'], lat, lon)
                for _, lat, lon in pois
            ]) if pd.notna(r['latitude']) else np.nan, axis=1
        )
    return df

print("POI features defined")

## 4. Feature Engineering

In [None]:
class FeatureEngineer:
    def __init__(self):
        self.target_encodings = {}
        self.label_encoders = {}
        self.year_median = None
    
    def fit(self, df, y):
        df = df.copy()
        self.year_median = df['year_built'].median()
        # Keep the training-set target mean so transform() can use it for unseen categories
        self.global_mean = y.mean()
        
        df['target'] = y
        for col in ['jk_name', 'district', 'house_type']:
            if col in df.columns:
                means = df.groupby(col)['target'].mean()
                counts = df.groupby(col)['target'].count()
                # Smoothed target encoding: shrink each category mean toward the
                # global mean with a prior weight of 10 observations
                smoothed = (means * counts + self.global_mean * 10) / (counts + 10)
                self.target_encodings[col] = smoothed.to_dict()
        
        for col in ['condition', 'heating', 'bathroom']:
            if col in df.columns:
                le = LabelEncoder()
                le.fit(list(df[col].dropna().unique()) + ['unknown'])
                self.label_encoders[col] = le
        return self
    
    def transform(self, df):
        df = df.copy()
        df['year_built'] = df['year_built'].fillna(self.year_median)
        
        df['floor_ratio'] = df['floor'] / df['total_floors'].replace(0, 1)
        df['is_first_floor'] = (df['floor'] == 1).astype(int)
        df['is_last_floor'] = (df['floor'] == df['total_floors']).astype(int)
        df['building_age'] = 2025 - df['year_built']
        df['is_new_building'] = (df['year_built'] >= 2020).astype(int)
        df['area_per_room'] = df['area'] / df['rooms'].replace(0, 1)
        
        for col, encoding in self.target_encodings.items():
            if col in df.columns:
                # Categories not seen during fit fall back to the training target mean
                df[f'{col}_encoded'] = df[col].map(encoding).fillna(self.global_mean)
        
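        for col, le in self.label_encoders.items():
            if col in df.columns:
                # Look up codes in one pass; unseen categories share the code of 'unknown'
                codes = {cls: i for i, cls in enumerate(le.classes_)}
                df[f'{col}_encoded'] = (
                    df[col].fillna('unknown').map(codes).fillna(codes['unknown']).astype(int)
                )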
        
        # House-type values are in Russian in the source data
        df['is_monolith'] = (df['house_type'] == 'монолит').astype(int)  # monolithic concrete
        df['is_brick'] = (df['house_type'] == 'кирпич').astype(int)  # brick
        df['is_panel'] = (df['house_type'] == 'панель').astype(int)  # panel
        
        return df
    
    def fit_transform(self, df, y):
        self.fit(df, y)
        return self.transform(df)

print("Feature engineer defined")
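
The target encoding in `FeatureEngineer.fit` shrinks each category mean toward the global mean with a prior weight of 10 observations. A minimal pure-Python sketch of the same formula follows; the function name and the toy district data are hypothetical and only meant to show how a rare category is pulled toward the global mean.

```python
from collections import defaultdict


def smoothed_target_encoding(categories, targets, prior_weight=10):
    """Return ({category: smoothed mean}, global_mean).

    smoothed = (sum of targets in category + global_mean * prior_weight)
               / (count in category + prior_weight)
    """
    global_mean = sum(targets) / len(targets)
    sums, counts = defaultdict(float), defaultdict(int)
    for cat, t in zip(categories, targets):
        sums[cat] += t
        counts[cat] += 1
    encoding = {
        cat: (sums[cat] + global_mean * prior_weight) / (counts[cat] + prior_weight)
        for cat in sums
    }
    return encoding, global_mean


# Toy data (made up): district A has 20 listings at 1500, district B a single one at 3000
districts = ["A"] * 20 + ["B"]
prices = [1500.0] * 20 + [3000.0]

enc, global_mean = smoothed_target_encoding(districts, prices)
print(f"global mean: {global_mean:.1f}")
for cat in sorted(enc):
    print(f"{cat}: {enc[cat]:.1f}")
```

With these toy numbers, district A stays close to its raw mean of 1500, while the single listing in district B is encoded far below its raw value of 3000 because the prior dominates.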

## 5. Data Preparation

In [None]:
TARGET = 'price_per_m2'

# Clean data
df_clean = df[
    (df[TARGET] >= 500) &
    (df[TARGET] <= 5000) &
    (df['area'] >= 15) &
    (df['area'] <= 300)
].copy()
print(f"After cleaning: {len(df_clean)} apartments")

# Add road features
df_clean = road_extractor.extract_features(df_clean)

# Add POI features
df_clean = add_poi_features(df_clean)

print(f"\nDataset ready with {len(df_clean)} samples")

## 6. Road Feature Analysis

In [None]:
road_features = ['dist_to_main_road', 'dist_to_primary', 'dist_to_secondary',
                 'dist_to_trunk', 'road_density_500m', 'main_road_density_1km', 
                 'optimal_zone', 'noise_zone', 'road_hierarchy_score']

print("Road feature correlations with price:")
print("="*50)
for feat in road_features:
    if feat in df_clean.columns:
        corr = df_clean[feat].corr(df_clean[TARGET])
        print(f"{feat:25s}: {corr:+.4f}")

print("\nRoad feature statistics:")
print("="*50)
df_clean[road_features].describe().round(3)

In [None]:
# Visualize price by distance to main road
import matplotlib.pyplot as plt

fig, axes = plt.subplots(1, 3, figsize=(15, 4))

# Distance to main road vs price
ax = axes[0]
df_sample = df_clean.sample(min(2000, len(df_clean)), random_state=42)  # fixed seed for a reproducible plot
ax.scatter(df_sample['dist_to_main_road'], df_sample[TARGET], alpha=0.3, s=10)
ax.set_xlabel('Distance to main road (km)')
ax.set_ylabel('Price per m² ($)')
ax.set_title('Price vs Distance to Main Road')
ax.set_xlim(0, 2)

# Optimal zone comparison
ax = axes[1]
zone_prices = df_clean.groupby('optimal_zone')[TARGET].mean()
ax.bar(['Outside optimal\n(0-100m or >400m)', 'Optimal zone\n(100-400m)'], 
       zone_prices.values, color=['#ff6b6b', '#4ecdc4'])
ax.set_ylabel('Mean Price per m² ($)')
ax.set_title('Price by Optimal Zone')

# Noise zone comparison  
ax = axes[2]
noise_prices = df_clean.groupby('noise_zone')[TARGET].mean()
ax.bar(['Outside noise zone\n(>50m)', 'Noise zone\n(<50m)'], 
       noise_prices.values, color=['#4ecdc4', '#ff6b6b'])
ax.set_ylabel('Mean Price per m² ($)')
ax.set_title('Price by Noise Zone')

plt.tight_layout()
plt.show()

# Print statistics
print(f"\nOptimal zone (100-400m from main road):")
print(f"  Mean price: ${zone_prices.get(1, 0):.0f}/m²")
print(f"  Outside: ${zone_prices.get(0, 0):.0f}/m²")
print(f"  Difference: ${zone_prices.get(1, 0) - zone_prices.get(0, 0):+.0f}/m²")

print(f"\nNoise zone (<50m from main road):")
print(f"  Noise zone: ${noise_prices.get(1, 0):.0f}/m²")
print(f"  Outside: ${noise_prices.get(0, 0):.0f}/m²")
print(f"  Difference: ${noise_prices.get(1, 0) - noise_prices.get(0, 0):+.0f}/m²")

## 7. Model Training

In [None]:
# Split data
X = df_clean.drop(columns=[TARGET, 'price_usd', 'price_local'], errors='ignore')
y = df_clean[TARGET]

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)
print(f"Train: {len(X_train)}, Test: {len(X_test)}")

# Feature engineering
fe = FeatureEngineer()
X_train_fe = fe.fit_transform(X_train, y_train)
X_test_fe = fe.transform(X_test)

# Define feature columns
base_features = [
    'area', 'rooms', 'floor', 'total_floors', 'year_built', 'ceiling_height',
    'latitude', 'longitude',
    'floor_ratio', 'is_first_floor', 'is_last_floor',
    'building_age', 'is_new_building', 'area_per_room',
    'is_monolith', 'is_brick', 'is_panel',
]

poi_features = [
    'dist_to_center', 'dist_to_bazaars', 'dist_to_parks', 
    'dist_to_malls', 'dist_to_universities',
]

road_features = [
    'dist_to_main_road', 'dist_to_primary', 'dist_to_secondary',
    'road_density_500m', 'main_road_density_1km',
    'optimal_zone', 'noise_zone', 'road_hierarchy_score',
]

encoded_features = [
    'jk_name_encoded', 'district_encoded', 'house_type_encoded',
    'condition_encoded', 'heating_encoded', 'bathroom_encoded',
]

all_features = base_features + poi_features + road_features + encoded_features
feature_cols = [c for c in all_features if c in X_train_fe.columns]

print(f"\nTotal features: {len(feature_cols)}")
print(f"  Base: {len([f for f in base_features if f in feature_cols])}")
print(f"  POI: {len([f for f in poi_features if f in feature_cols])}")
print(f"  Road: {len([f for f in road_features if f in feature_cols])}")
print(f"  Encoded: {len([f for f in encoded_features if f in feature_cols])}")

# Prepare final matrices
X_train_final = X_train_fe[feature_cols].fillna(0)
X_test_final = X_test_fe[feature_cols].fillna(0)

scaler = StandardScaler()
X_train_scaled = scaler.fit_transform(X_train_final)
X_test_scaled = scaler.transform(X_test_final)

In [None]:
def calculate_metrics(y_true, y_pred):
    return {
        'MAE': mean_absolute_error(y_true, y_pred),
        'RMSE': np.sqrt(mean_squared_error(y_true, y_pred)),
        'MAPE': np.mean(np.abs((y_true - y_pred) / y_true)) * 100,
        'MedAPE': np.median(np.abs((y_true - y_pred) / y_true)) * 100,
        'R2': r2_score(y_true, y_pred)
    }

results = {}

# XGBoost
print("Training XGBoost...")
xgb_model = XGBRegressor(
    n_estimators=500, max_depth=8, learning_rate=0.05,
    subsample=0.8, colsample_bytree=0.8, random_state=42
)
xgb_model.fit(X_train_scaled, y_train)
xgb_pred = xgb_model.predict(X_test_scaled)
results['XGBoost'] = calculate_metrics(y_test, xgb_pred)
print(f"  MAE: ${results['XGBoost']['MAE']:.2f}/m², R²: {results['XGBoost']['R2']:.4f}")

# LightGBM
print("\nTraining LightGBM...")
lgb_model = LGBMRegressor(
    n_estimators=500, max_depth=8, learning_rate=0.05,
    num_leaves=31, random_state=42, verbose=-1
)
lgb_model.fit(X_train_scaled, y_train)
lgb_pred = lgb_model.predict(X_test_scaled)
results['LightGBM'] = calculate_metrics(y_test, lgb_pred)
print(f"  MAE: ${results['LightGBM']['MAE']:.2f}/m², R²: {results['LightGBM']['R2']:.4f}")

# CatBoost
print("\nTraining CatBoost...")
cat_model = CatBoostRegressor(
    iterations=500, depth=8, learning_rate=0.05,
    random_state=42, verbose=0
)
cat_model.fit(X_train_scaled, y_train)
cat_pred = cat_model.predict(X_test_scaled)
results['CatBoost'] = calculate_metrics(y_test, cat_pred)
print(f"  MAE: ${results['CatBoost']['MAE']:.2f}/m², R²: {results['CatBoost']['R2']:.4f}")

In [None]:
# Stacking Ensemble
print("Training Stacking Ensemble...")
estimators = [
    ('xgb', XGBRegressor(n_estimators=500, max_depth=8, learning_rate=0.05, random_state=42)),
    ('lgb', LGBMRegressor(n_estimators=500, max_depth=8, learning_rate=0.05, random_state=42, verbose=-1)),
    ('cat', CatBoostRegressor(iterations=500, depth=8, learning_rate=0.05, random_state=42, verbose=0))
]

stacking = StackingRegressor(
    estimators=estimators,
    final_estimator=Ridge(alpha=1.0),
    cv=5
)
stacking.fit(X_train_scaled, y_train)
ensemble_pred = stacking.predict(X_test_scaled)
results['Ensemble'] = calculate_metrics(y_test, ensemble_pred)

print("\n" + "="*60)
print("FINAL RESULTS")
print("="*60)
print(f"MAE: ${results['Ensemble']['MAE']:.2f}/m²")
print(f"RMSE: ${results['Ensemble']['RMSE']:.2f}/m²")
print(f"MAPE: {results['Ensemble']['MAPE']:.2f}%")
print(f"MedAPE: {results['Ensemble']['MedAPE']:.2f}%")
print(f"R²: {results['Ensemble']['R2']:.4f}")
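
`calculate_metrics` reports both MAPE and MedAPE. The toy example below, using made-up prices, illustrates why the two can diverge: a single badly mispriced listing inflates the mean but barely moves the median. The helper name `ape` is illustrative.

```python
from statistics import median


def ape(y_true, y_pred):
    """Absolute percentage error per listing, in percent."""
    return [abs(t - p) / t * 100 for t, p in zip(y_true, y_pred)]


# Made-up prices per m2; the last prediction is off by a factor of two
y_true = [1000, 1200, 1500, 2000, 800]
y_pred = [1050, 1140, 1500, 1900, 1600]

errors = ape(y_true, y_pred)
mape = sum(errors) / len(errors)
medape = median(errors)
print(f"MAPE: {mape:.1f}%  MedAPE: {medape:.1f}%")
```

Here four of the five predictions are within 5%, yet the mean error is 23% while the median stays at 5%. Reading the two metrics together gives a rough sense of how much of the error comes from a few outliers.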

## 8. Feature Importance Analysis

In [None]:
importance = pd.DataFrame({
    'feature': feature_cols,
    'importance': xgb_model.feature_importances_
}).sort_values('importance', ascending=False)

print("Top 20 features:")
print("="*50)
for i, row in importance.head(20).iterrows():
    is_road = "[ROAD]" if row['feature'] in road_features else ""
    print(f"{row['feature']:25s}: {row['importance']:.4f} {is_road}")

# Road features contribution
road_importance = importance[importance['feature'].isin(road_features)]['importance'].sum()
total_importance = importance['importance'].sum()
print(f"\nRoad features contribution: {road_importance/total_importance*100:.1f}%")

In [None]:
# Visualize feature importance
import matplotlib.pyplot as plt

fig, ax = plt.subplots(figsize=(10, 8))

top20 = importance.head(20)
colors = ['#ff6b6b' if f in road_features else '#4ecdc4' for f in top20['feature']]

ax.barh(range(len(top20)), top20['importance'].values, color=colors)
ax.set_yticks(range(len(top20)))
ax.set_yticklabels(top20['feature'].values)
ax.invert_yaxis()
ax.set_xlabel('Importance')
ax.set_title('Feature Importance (Red = Road Features)')
plt.tight_layout()
plt.show()

## 9. Ablation Study: With vs Without Road Features

In [None]:
# Train model WITHOUT road features
feature_cols_no_roads = [c for c in feature_cols if c not in road_features]

X_train_no_roads = X_train_fe[feature_cols_no_roads].fillna(0)
X_test_no_roads = X_test_fe[feature_cols_no_roads].fillna(0)

scaler_no_roads = StandardScaler()
X_train_no_roads_scaled = scaler_no_roads.fit_transform(X_train_no_roads)
X_test_no_roads_scaled = scaler_no_roads.transform(X_test_no_roads)

print(f"Features without roads: {len(feature_cols_no_roads)}")
print(f"Features with roads: {len(feature_cols)}")

# Train XGBoost without road features
print("\nTraining XGBoost WITHOUT road features...")
xgb_no_roads = XGBRegressor(
    n_estimators=500, max_depth=8, learning_rate=0.05,
    subsample=0.8, colsample_bytree=0.8, random_state=42
)
xgb_no_roads.fit(X_train_no_roads_scaled, y_train)
pred_no_roads = xgb_no_roads.predict(X_test_no_roads_scaled)
metrics_no_roads = calculate_metrics(y_test, pred_no_roads)

print("\n" + "="*60)
print("ABLATION STUDY: ROAD FEATURES IMPACT")
print("="*60)
print(f"\nWithout road features:")
print(f"  MAE: ${metrics_no_roads['MAE']:.2f}/m²")
print(f"  R²: {metrics_no_roads['R2']:.4f}")

print(f"\nWith road features:")
print(f"  MAE: ${results['XGBoost']['MAE']:.2f}/m²")
print(f"  R²: {results['XGBoost']['R2']:.4f}")

mae_improvement = metrics_no_roads['MAE'] - results['XGBoost']['MAE']
r2_improvement = results['XGBoost']['R2'] - metrics_no_roads['R2']

print(f"\nImprovement from road features:")
print(f"  MAE: ${mae_improvement:+.2f}/m²")
print(f"  R²: {r2_improvement:+.4f}")
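
A single train/test split yields one MAE difference with no indication of its uncertainty. One possible check, sketched here under the assumption that both models are scored on the same test rows, is a paired bootstrap over per-row absolute-error differences. The function name, the 2,000 resamples, and the synthetic data are assumptions for illustration; this is not part of the original analysis.

```python
import random


def paired_bootstrap_mae_diff(y_true, pred_a, pred_b, n_boot=2000, seed=42):
    """Return (point estimate, lower, upper) for MAE(pred_a) - MAE(pred_b).

    Positive values mean pred_b has the lower MAE. The interval is a
    95% percentile bootstrap interval over resampled test rows.
    """
    rng = random.Random(seed)
    n = len(y_true)
    # Per-row difference in absolute error keeps the two models paired
    diffs = [abs(t - a) - abs(t - b) for t, a, b in zip(y_true, pred_a, pred_b)]
    means = []
    for _ in range(n_boot):
        sample = [diffs[rng.randrange(n)] for _ in range(n)]
        means.append(sum(sample) / n)
    means.sort()
    lower = means[int(0.025 * n_boot)]
    upper = means[int(0.975 * n_boot) - 1]
    return sum(diffs) / n, lower, upper


# Synthetic stand-in data (not real results from this notebook)
data_rng = random.Random(0)
y_true = [data_rng.uniform(800, 3000) for _ in range(300)]
pred_without = [y + data_rng.gauss(0, 200) for y in y_true]
pred_with = [y + data_rng.gauss(0, 180) for y in y_true]

point, lo, hi = paired_bootstrap_mae_diff(y_true, pred_without, pred_with)
print(f"MAE difference: {point:+.1f} (95% CI {lo:+.1f} to {hi:+.1f})")
```

In this notebook the call would presumably be `paired_bootstrap_mae_diff(y_test.values, pred_no_roads, xgb_pred)`. An interval that includes zero would suggest the improvement from road features is within sampling noise for this split.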

## 10. Conclusions

### Summary

This notebook tested the hypothesis that road network features can improve real estate price prediction in Bishkek.

### Road Features Tested:
- **Distance features**: dist_to_main_road, dist_to_primary, dist_to_secondary
- **Density features**: road_density_500m, main_road_density_1km  
- **Zone features**: optimal_zone (100-400m), noise_zone (<50m)
- **Hierarchy score**: weighted score based on road importance

### Key Findings:
1. Correlations between individual road features and price per m² are weak (Section 6)
2. The optimal zone hypothesis (100-400m from main roads) shows only a minimal price premium (Section 6)
3. Road features contribute modestly to overall model performance (Sections 8 and 9)
4. Traditional features (area, location, building characteristics) remain the dominant predictors