# SF Parking Finder

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, hour, dayofweek, lit, count, avg, rand, when
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout
from tensorflow.keras.callbacks import EarlyStopping
import pandas as pd
import numpy as np
import requests
from datetime import datetime, timedelta, date
import folium
from folium.plugins import HeatMap
from IPython.display import display, IFrame, FileLink
from google.colab import files
import ipywidgets as widgets
from sklearn.model_selection import train_test_split
from sklearn.metrics import classification_report
from geopy.distance import geodesic
import gc

# Create (or reuse) the Spark session used by every function below.
# Driver/executor memory and the result-size cap are raised so the complete
# meter dataset can be processed and collected back to the driver.
spark = (
    SparkSession.builder
    .appName("SmartParkingFinder")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .config("spark.driver.maxResultSize", "2g")
    .config("spark.sql.adaptive.enabled", "true")
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)

# Turn on Arrow-based conversion between Spark and pandas DataFrames
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

def load_parking_meter_data():
    """Download every SF parking meter record and return it as a Spark DataFrame.

    The Socrata endpoint is paged through in fixed-size batches until an empty
    or short page signals the end of the dataset. Only the six columns used by
    the rest of the pipeline are kept, latitude/longitude are cast to float,
    and rows without coordinates or a neighborhood are dropped.

    Returns:
        A Spark DataFrame with the columns post_id, street_name, street_num,
        latitude, longitude and analysis_neighborhood, or None when the
        download or the conversion fails (the error is printed).
    """
    from pyspark.sql.types import StringType, StructField, StructType

    endpoint = "https://data.sfgov.org/resource/8vzz-qzz9.json"
    columns = ['post_id', 'street_name', 'street_num', 'latitude', 'longitude', 'analysis_neighborhood']
    batch_size = 50000  # Records requested per page

    try:
        all_data = []
        offset = 0

        with requests.Session() as session:
            while True:
                # A stable sort order is required for offset paging; without it
                # consecutive pages may repeat or skip records.
                params = {"$limit": batch_size, "$offset": offset, "$order": ":id"}
                response = session.get(endpoint, params=params, timeout=30)
                response.raise_for_status()
                batch_data = response.json()

                if not batch_data:  # No more data
                    break

                all_data.extend(batch_data)
                offset += batch_size
                print(f"📥 Loaded batch: {len(batch_data)} records (Total: {len(all_data)})")

                # A short page means the end of the dataset was reached
                if len(batch_data) < batch_size:
                    break

        if not all_data:
            print("❌ Error loading data: the API returned no records")
            return None

        # The API omits fields whose value is null, so build the rows by hand
        # (missing field -> None) and give Spark an explicit all-string schema
        # instead of letting it infer types from mixed str/NaN columns.
        rows = [
            tuple(None if record.get(name) is None else str(record[name]) for name in columns)
            for record in all_data
        ]
        schema = StructType([StructField(name, StringType(), True) for name in columns])
        df = spark.createDataFrame(rows, schema=schema)
        df = df.withColumn("latitude", col("latitude").cast("float")).withColumn("longitude", col("longitude").cast("float"))

        # Meters without coordinates or neighborhood cannot be located or grouped
        df = df.filter(col("latitude").isNotNull() & col("longitude").isNotNull() & col("analysis_neighborhood").isNotNull())

        print(f"✅ Loaded {df.count()} parking meters")
        return df
    except requests.RequestException as e:
        print(f"❌ Error loading data: {e}")
        return None
    except Exception as e:
        print(f"❌ Unexpected error in load_parking_meter_data: {e}")
        return None

def validate_data(df):
    """Check that *df* exists and exposes every column the pipeline needs.

    Prints the reason and returns False when *df* is None or lacks one of the
    required columns; returns True otherwise.
    """
    if df is None:
        print("❌ DataFrame is None")
        return False

    available = df.columns
    missing_cols = [
        name
        for name in ('post_id', 'street_name', 'street_num', 'latitude', 'longitude', 'analysis_neighborhood')
        if name not in available
    ]
    if not missing_cols:
        return True

    print(f"❌ Missing columns: {missing_cols}")
    return False

def load_event_data():
    """Return the hard-coded event calendar as a Spark DataFrame.

    Each event has a ``date`` (ISO string), an ``hour``, a ``neighborhood`` and
    an ``impact`` that is added to the occupancy probability (negative values
    reduce demand). Neighborhood names are spelled the way
    simulate_occupancy_batch spells ``analysis_neighborhood`` values, because
    the two are joined on exact string equality.
    """
    events = [
        # Moderate impact events - more realistic than large spikes
        {"date": "2025-05-27", "hour": 19, "neighborhood": "Mission", "impact": 0.08},              # Small concert
        # NOTE(review): "Downtown" is kept as-is; confirm it is a value that
        # actually occurs in analysis_neighborhood.
        {"date": "2025-05-28", "hour": 20, "neighborhood": "Downtown", "impact": 0.12},             # Business event
        {"date": "2025-05-27", "hour": 18, "neighborhood": "South of Market", "impact": 0.10},      # Tech meetup
        {"date": "2025-05-28", "hour": 19, "neighborhood": "Castro/Upper Market", "impact": 0.06},  # Community event
        {"date": "2025-05-27", "hour": 20, "neighborhood": "Marina", "impact": 0.08},               # Restaurant event
        {"date": "2025-05-28", "hour": 18, "neighborhood": "Nob Hill", "impact": 0.10},             # Hotel conference

        # Weekend events with slightly higher impact
        {"date": "2025-05-31", "hour": 14, "neighborhood": "Chinatown", "impact": 0.15},            # Saturday festival
        {"date": "2025-05-31", "hour": 20, "neighborhood": "North Beach", "impact": 0.12},          # Saturday nightlife
        {"date": "2025-06-01", "hour": 13, "neighborhood": "Marina", "impact": 0.18},               # Sunday farmers market
        {"date": "2025-06-01", "hour": 15, "neighborhood": "Golden Gate Park", "impact": 0.20},     # Sunday park event

        # Negative impact events (reduced demand)
        {"date": "2025-05-29", "hour": 8, "neighborhood": "Financial District/South Beach", "impact": -0.15},  # Holiday - less business
        {"date": "2025-05-30", "hour": 16, "neighborhood": "South of Market", "impact": -0.10},     # Early Friday closure
    ]
    return spark.createDataFrame(pd.DataFrame(events))

# Baseline share of meters occupied at each hour of the day (0-23).
_HOURLY_OCCUPANCY = {
    # Late night / early morning - low occupancy
    0: 0.15, 1: 0.12, 2: 0.10, 3: 0.08, 4: 0.10, 5: 0.15,
    # Morning - gradual increase
    6: 0.25, 7: 0.40, 8: 0.55, 9: 0.65,
    # Business hours - high, peaking at noon
    10: 0.70, 11: 0.75, 12: 0.78, 13: 0.75, 14: 0.72, 15: 0.68,
    # Afternoon / evening - gradual decrease
    16: 0.60, 17: 0.50, 18: 0.45, 19: 0.40, 20: 0.35, 21: 0.30, 22: 0.25, 23: 0.20,
}

# Demand multiplier per analysis_neighborhood value; unlisted ones use the default.
_NEIGHBORHOOD_MULTIPLIERS = {
    # High-demand areas
    'Financial District/South Beach': 1.3,
    'South of Market': 1.25,
    'Downtown': 1.25,
    'Mission Bay': 1.2,
    'Tenderloin': 1.15,
    # Medium-high demand
    'Nob Hill': 1.1,
    'North Beach': 1.1,
    'Mission': 1.08,
    'Castro/Upper Market': 1.05,
    'Hayes Valley': 1.05,
    # Medium demand
    'Chinatown': 1.0,
    'Marina': 1.0,
    'Russian Hill': 1.0,
    'Pacific Heights': 1.0,
    'Potrero Hill': 0.95,
    # Lower demand areas
    'Inner Richmond': 0.85,
    'Inner Sunset': 0.80,
    'Outer Richmond': 0.75,
    'Sunset/Parkside': 0.70,
    'Bernal Heights': 0.75,
    'Excelsior': 0.65,
    'Visitacion Valley': 0.60,
    'Bayview Hunters Point': 0.65,
    'Outer Mission': 0.70,
}
_DEFAULT_NEIGHBORHOOD_MULTIPLIER = 0.9

# Hour-specific scaling applied on Saturdays and Sundays; other hours use 1.0.
_WEEKEND_ADJUSTMENTS = {
    6: 0.8, 7: 0.85, 8: 0.9, 9: 0.95,          # Quieter early mornings
    10: 1.05, 11: 1.1, 12: 1.15, 13: 1.1,      # Activity picks up later, moderate peak
    14: 1.05, 15: 1.0, 16: 0.95, 17: 0.9,
    18: 1.05, 19: 1.1, 20: 1.05, 21: 1.0,      # Weekend dinner
}


def _clamp_probability(column_name, lower=0.05, upper=0.85):
    """Return a Column expression limiting *column_name* to [lower, upper]."""
    value = col(column_name)
    return when(value > upper, upper).when(value < lower, lower).otherwise(value)


def simulate_occupancy_batch(df, target_date, target_hour):
    """Simulate an occupied/available flag for every meter at one date and hour.

    The occupancy probability of a meter is the hourly baseline times its
    neighborhood multiplier, plus the impact of any event matching the date
    and neighborhood, optionally scaled by a weekend factor, and always kept
    within 5%-85% so some spots stay available. Each meter is then marked
    occupied when a uniform random draw falls below that probability.

    Args:
        df: Spark DataFrame of meters (needs ``analysis_neighborhood``).
        target_date: ``datetime.date`` of the simulated moment.
        target_hour: Hour of day; hours outside the baseline table use 0.4.

    Returns:
        The input rows plus ``timestamp``, ``occupied`` (0/1), ``hour`` and
        ``dayofweek`` columns, or None when *df* is None or the simulation
        fails (the error and traceback are printed).
    """
    if df is None:
        print("❌ Input DataFrame is None")
        return None

    try:
        target_timestamp = datetime.combine(target_date, datetime.min.time()) + timedelta(hours=target_hour)
        is_weekend = target_timestamp.weekday() >= 5

        # Stamp every meter with the simulated moment (no sampling)
        full_df = df.withColumn("timestamp", lit(target_timestamp))

        # Attach event impacts by date and neighborhood.
        # NOTE(review): the event's own `hour` field is not part of the join, so
        # an event influences every hour of its date - confirm this is intended.
        event_df = load_event_data()
        full_df = full_df.join(event_df,
                              (col("timestamp").cast("date") == col("date")) &
                              (col("analysis_neighborhood") == col("neighborhood")),
                              "left_outer")

        base_occupancy = _HOURLY_OCCUPANCY.get(target_hour, 0.4)

        # One flat CASE WHEN over all known neighborhoods, with a default
        multiplier_expr = None
        for name, multiplier in _NEIGHBORHOOD_MULTIPLIERS.items():
            matches = col("analysis_neighborhood") == name
            if multiplier_expr is None:
                multiplier_expr = when(matches, lit(multiplier))
            else:
                multiplier_expr = multiplier_expr.when(matches, lit(multiplier))
        multiplier_expr = multiplier_expr.otherwise(lit(_DEFAULT_NEIGHBORHOOD_MULTIPLIER))
        full_df = full_df.withColumn("neighborhood_multiplier", multiplier_expr)

        # Baseline x neighborhood factor, shifted by the event impact (if any)
        full_df = full_df.withColumn("occupancy_prob",
                                   col("neighborhood_multiplier") * lit(base_occupancy) +
                                   when(col("impact").isNotNull(), col("impact")).otherwise(0.0))

        # Never more than 85% occupied (15% always available), never below 5%
        full_df = full_df.withColumn("occupancy_prob", _clamp_probability("occupancy_prob"))

        if is_weekend:
            weekend_mult = _WEEKEND_ADJUSTMENTS.get(target_hour, 1.0)
            full_df = full_df.withColumn("occupancy_prob", col("occupancy_prob") * lit(weekend_mult))
            # Scaling can push the value past either bound, so clamp both again
            full_df = full_df.withColumn("occupancy_prob", _clamp_probability("occupancy_prob"))

        # Unseeded draw per meter so runs differ from each other
        full_df = full_df.withColumn("random_val", rand())
        full_df = full_df.withColumn("occupied",
                                   when(col("random_val") < col("occupancy_prob"), 1).otherwise(0))

        # Time features; "hour" replaces the column of the same name from the event join
        full_df = full_df.withColumn("hour", lit(target_hour))
        full_df = full_df.withColumn("dayofweek", dayofweek(col("timestamp")))

        # Drop helper columns that callers do not need
        full_df = full_df.drop("date", "neighborhood", "impact",
                             "neighborhood_multiplier", "occupancy_prob", "random_val")

        # Keep the simulated result in memory: it is read several times (here
        # and by the caller), and rand() is documented as non-deterministic in
        # general, so recomputing the plan is both wasteful and risky.
        full_df = full_df.cache()

        # A single aggregation yields both the per-neighborhood and overall figures
        neighborhood_stats = full_df.groupBy("analysis_neighborhood").agg(
            count("*").alias("total"),
            count(when(col("occupied") == 0, 1)).alias("available")
        ).collect()

        total_count = sum(row.total for row in neighborhood_stats)
        available_count = sum(row.available for row in neighborhood_stats)
        occupied_count = total_count - available_count
        occupancy_rate = occupied_count / total_count if total_count > 0 else 0

        print(f"✅ Realistic occupancy simulation complete:")
        print(f"   📊 Total meters: {total_count:,}")
        print(f"   🚗 Occupied: {occupied_count:,} ({occupancy_rate:.1%})")
        print(f"   🅿️ Available: {available_count:,} ({1-occupancy_rate:.1%})")
        print(f"   ⏰ Time: {target_hour}:00 ({'Weekend' if is_weekend else 'Weekday'})")

        neighborhoods_with_spots = sum(1 for row in neighborhood_stats if row.available > 0)
        print(f"   🏘️ Neighborhoods with available spots: {neighborhoods_with_spots}")

        # Warning if too few neighborhoods have availability
        if neighborhoods_with_spots < 5:
            print("   ⚠️ Warning: Very few neighborhoods have availability - this may be unrealistic")

        return full_df

    except Exception as e:
        print(f"❌ Error in simulate_occupancy_batch: {e}")
        import traceback
        traceback.print_exc()
        return None

def train_improved_predictor(df):
    """Train a small neural network that predicts whether a meter is occupied.

    Features are ``hour``, ``dayofweek``, ``latitude`` and ``longitude``; the
    label is ``occupied`` (0/1). The data is split into train, validation
    (used only for early stopping) and test (used only for the reported
    metrics). Feature scaling is built into the model as a normalization
    layer, so callers pass raw feature values to ``model.predict``.

    Args:
        df: Spark DataFrame containing the four feature columns and ``occupied``.

    Returns:
        The fitted Keras model, or None when training fails (the error and
        traceback are printed).
    """
    try:
        from sklearn.utils.class_weight import compute_class_weight

        features = ['hour', 'dayofweek', 'latitude', 'longitude']

        # Use ALL available data for training
        pandas_df = df.select(*features, "occupied").toPandas()
        X = pandas_df[features].to_numpy(dtype="float32")
        y = pandas_df['occupied'].to_numpy(dtype="int64")

        print(f"🧠 Training improved model: {len(X)} samples")

        # Check class distribution
        occupied_rate = y.mean()
        print(f"📊 Training data: {occupied_rate:.1%} occupied, {1-occupied_rate:.1%} available")

        # Stratified splits keep the class balance in every subset. The test
        # set is held back entirely; early stopping watches a separate
        # validation slice so the reported metrics are not tuned on test data.
        X_train, X_test, y_train, y_test = train_test_split(
            X, y, test_size=0.2, random_state=42, stratify=y
        )
        X_fit, X_val, y_fit, y_val = train_test_split(
            X_train, y_train, test_size=0.2, random_state=42, stratify=y_train
        )

        # Raw coordinates (about 37.7 and -122.4) and hours live on very
        # different scales; standardise them inside the model.
        normalizer = tf.keras.layers.Normalization(axis=-1)
        normalizer.adapt(X_fit)

        model = Sequential([
            tf.keras.Input(shape=(len(features),)),
            normalizer,
            Dense(64, activation='relu'),
            Dropout(0.3),  # Prevent overfitting
            Dense(32, activation='relu'),
            Dropout(0.2),
            Dense(16, activation='relu'),
            Dense(1, activation='sigmoid')  # Output probability of being occupied
        ])

        # Balanced class weights compensate for an uneven occupied/available mix
        classes = np.unique(y_fit)
        class_weights = compute_class_weight('balanced', classes=classes, y=y_fit)
        class_weight_dict = {int(label): float(weight) for label, weight in zip(classes, class_weights)}

        # Metric objects rather than string aliases: the aliases
        # 'precision'/'recall' are not resolved by every Keras version.
        model.compile(
            optimizer='adam',
            loss='binary_crossentropy',
            metrics=[
                'accuracy',
                tf.keras.metrics.Precision(name='precision'),
                tf.keras.metrics.Recall(name='recall'),
            ]
        )

        # Train with early stopping and class weights
        model.fit(
            X_fit, y_fit,
            validation_data=(X_val, y_val),
            epochs=30,
            batch_size=256,
            verbose=1,
            class_weight=class_weight_dict,
            callbacks=[EarlyStopping(patience=5, monitor='val_loss', restore_best_weights=True)]
        )

        # Evaluate on the untouched test set
        _, accuracy, precision, recall = model.evaluate(X_test, y_test, verbose=0)
        print(f"🎯 Model Performance:")
        print(f"   Accuracy: {accuracy:.3f}")
        print(f"   Precision: {precision:.3f}")
        print(f"   Recall: {recall:.3f}")

        # Share of test meters the model calls occupied at a 0.5 cut-off
        test_predictions = model.predict(X_test, verbose=0)
        predicted_rate = (test_predictions > 0.5).mean()
        print(f"   Predicted occupancy rate: {predicted_rate:.1%}")

        return model
    except Exception as e:
        print(f"❌ Error in train_improved_predictor: {e}")
        import traceback
        traceback.print_exc()
        return None

def predict_availability_fixed(model, df, user_time, neighborhood):
    """Predict which meters of one neighborhood are free at *user_time*.

    Meters matching the timestamp and neighborhood are scored by *model*; a
    meter counts as occupied when its probability reaches the median
    probability of the batch (kept within 0.3-0.7). If that leaves no free
    meter although the simulation has some, up to five meters with the lowest
    occupation probability are returned instead.

    Returns:
        A pandas DataFrame of predicted-free meters (with
        ``predicted_available``, ``occupation_probability`` and ``confidence``
        columns), or None when there are no meters, no free spots, or an
        error occurred.
    """
    try:
        wanted = ["hour", "dayofweek", "latitude", "longitude", "street_name", "street_num", "post_id", "occupied"]
        at_requested_time = col("timestamp") == user_time
        in_neighborhood = col("analysis_neighborhood") == neighborhood
        pandas_input = df.filter(at_requested_time & in_neighborhood).select(*wanted).toPandas()

        if pandas_input.empty:
            print(f"❌ No meters found for {neighborhood}")
            return None

        # Simulated ground truth, shown next to the model's view
        sim_free = (pandas_input['occupied'] == 0).sum()
        sim_taken = (pandas_input['occupied'] == 1).sum()
        sim_rate = sim_taken / len(pandas_input)
        print(f"📊 {neighborhood} - Actual: {sim_taken} occupied, {sim_free} available ({sim_rate:.1%} occupied)")

        # Score every meter with the model
        feature_frame = pandas_input[['hour', 'dayofweek', 'latitude', 'longitude']].astype(float)
        probs = model.predict(feature_frame, verbose=0).flatten()

        # Cut-off = median probability of this batch, limited to [0.3, 0.7]
        cutoff = np.percentile(probs, 50)
        cutoff = max(0.3, min(0.7, cutoff))

        print(f"🎯 Using dynamic threshold: {cutoff:.3f}")
        print(f"📈 Probability range: {probs.min():.3f} to {probs.max():.3f}")

        # 1 = occupied at or above the cut-off; availability is its complement
        flagged_occupied = (probs >= cutoff).astype(int)
        result_df = pandas_input.assign(
            predicted_available=1 - flagged_occupied,
            occupation_probability=probs,
            confidence=np.abs(probs - cutoff),  # Distance from the cut-off
        )

        # Free meters, most confident first
        is_free = result_df['predicted_available'] == 1
        available_spots = result_df[is_free].copy().sort_values('confidence', ascending=False)

        free_total = len(available_spots)
        taken_total = len(result_df) - free_total
        taken_share = taken_total / len(result_df)
        print(f"🤖 ML Prediction: {taken_total} occupied, {free_total} available ({taken_share:.1%} occupied)")

        # Fallback when the model frees nothing but the simulation has free meters
        if free_total == 0 and sim_free > 0:
            print("🔧 Adjusting: ML predicted 0 available but simulation shows availability")
            n_to_flip = min(5, sim_free)  # At most five, never more than simulated free
            chosen = result_df.nsmallest(n_to_flip, 'occupation_probability').index

            available_spots = result_df.loc[chosen].copy()
            available_spots['predicted_available'] = 1
            available_spots['confidence'] = 1 - available_spots['occupation_probability']

            print(f"🔧 Adjusted to show {len(available_spots)} available spots")

        if len(available_spots) > 0:
            return available_spots
        return None

    except Exception as e:
        print(f"❌ Error in predict_availability_fixed: {e}")
        import traceback
        traceback.print_exc()
        return None

def recommend_spots(available_df, destination_lat, destination_lon, top_n=10):
    """Return the *top_n* available spots closest to the destination.

    Args:
        available_df: pandas DataFrame of free meters with ``latitude``,
            ``longitude``, ``street_num``, ``street_name`` and ``confidence``.
            It is not modified.
        destination_lat: Destination latitude in decimal degrees.
        destination_lon: Destination longitude in decimal degrees.
        top_n: Maximum number of spots to return.

    Returns:
        A new DataFrame with an added ``distance`` column (geodesic meters),
        ordered nearest first, or None when there is nothing to rank or the
        distance computation fails (the error is printed).
    """
    if available_df is None or len(available_df) == 0:
        print("❌ No available spots found")
        return None

    try:
        destination = (destination_lat, destination_lon)

        # Work on a copy so the caller's DataFrame does not gain a column
        ranked = available_df.copy()
        ranked['distance'] = [
            geodesic((lat, lon), destination).meters
            for lat, lon in zip(ranked['latitude'], ranked['longitude'])
        ]

        top_spots = ranked.nsmallest(top_n, 'distance')

        print(f"✅ Top {len(top_spots)} recommended spots from {len(ranked)} available:")
        for i, (_, row) in enumerate(top_spots.iterrows(), 1):
            print(f"  {i}. {row['street_num']} {row['street_name']} ({row['distance']:.0f}m away, confidence: {row['confidence']:.2f})")

        return top_spots
    except Exception as e:
        print(f"❌ Error in recommend_spots: {e}")
        return None

def create_comprehensive_map(spots_df, destination_lat, destination_lon, map_filename='parking_map_fixed_ml.html'):
    """Render the recommended spots and the destination on a folium map.

    Spots are coloured by distance (green up to 100 m, orange up to 300 m,
    blue beyond), a 200 m circle is drawn around the destination, and the map
    is saved to *map_filename*. A browser download is attempted and a file
    link is displayed.

    Args:
        spots_df: pandas DataFrame with ``latitude``, ``longitude``,
            ``street_num``, ``street_name``, ``distance`` and ``confidence``;
            ``occupation_probability`` is optional.
        destination_lat: Destination latitude in decimal degrees.
        destination_lon: Destination longitude in decimal degrees.
        map_filename: Path of the HTML file to write.

    Returns:
        The folium map, or None when there are no spots or rendering fails
        (the error is printed).
    """
    if spots_df is None or len(spots_df) == 0:
        print("❌ No spots to display on map")
        return None

    try:
        from html import escape

        # Center on destination
        m = folium.Map(location=[destination_lat, destination_lon], zoom_start=14)

        folium.Marker(
            location=[destination_lat, destination_lon],
            popup="🎯 Your Destination",
            icon=folium.Icon(color='red', icon='star', prefix='fa')
        ).add_to(m)

        for rank, (_, row) in enumerate(spots_df.iterrows(), 1):
            distance = row['distance']
            if distance <= 100:
                color = 'green'
            elif distance <= 300:
                color = 'orange'
            else:
                color = 'blue'

            # The probability column is optional; only format it when present
            probability = row.get('occupation_probability')
            probability_text = "N/A" if probability is None else f"{probability:.2f}"

            # Street text comes from the downloaded dataset and is placed into
            # HTML, so escape it.
            street = f"{escape(str(row['street_num']))} {escape(str(row['street_name']))}"

            folium.Marker(
                location=[row['latitude'], row['longitude']],
                popup=f"#{rank}: {street}<br>Distance: {distance:.0f}m<br>Confidence: {row['confidence']:.2f}<br>Prob: {probability_text}",
                icon=folium.Icon(color=color, icon='car', prefix='fa')
            ).add_to(m)

        # Circle showing the walking distance around the destination
        folium.Circle(
            location=[destination_lat, destination_lon],
            radius=200,  # 200m walking radius
            popup="200m walking radius",
            color='red',
            fill=True,
            fillOpacity=0.1
        ).add_to(m)

        m.save(map_filename)
        print(f"✅ Map saved to '{map_filename}' with {len(spots_df)} parking spots")

        # The download is a convenience; failing to trigger it must not
        # discard the map that was already written.
        try:
            files.download(map_filename)
        except Exception as download_error:
            print(f"⚠️ Could not start download of '{map_filename}': {download_error}")
        display(FileLink(map_filename))

        return m
    except Exception as e:
        print(f"❌ Error creating map: {e}")
        return None

def comprehensive_parking_search_ml_fixed(neighborhood, search_date, search_hour, dest_lat, dest_lon):
    """Run the whole pipeline for one query and return the recommended spots.

    Steps: load the meters, simulate occupancy for the requested date and
    hour, train the predictor, predict free meters in *neighborhood*, rank
    them by distance to the destination and draw the map. When the target
    neighborhood has no predicted free meter, the ten neighborhoods whose
    average meter position is closest to the destination are searched instead.

    Args:
        neighborhood: ``analysis_neighborhood`` value to search.
        search_date: ``datetime.date`` of the query.
        search_hour: Hour of day of the query.
        dest_lat: Destination latitude in decimal degrees.
        dest_lon: Destination longitude in decimal degrees.

    Returns:
        A pandas DataFrame of recommended spots, or None when any stage fails
        or no free spot is found.
    """
    import re

    print(f"🔍 ML-FIXED Search: {neighborhood} on {search_date} at {search_hour}:00")
    print("📊 Loading parking meter dataset...")

    static_data = load_parking_meter_data()
    if not validate_data(static_data):
        return None

    print("🎯 Generating realistic occupancy patterns...")
    occupancy_df = simulate_occupancy_batch(static_data, search_date, search_hour)
    if occupancy_df is None:
        return None

    print("🧠 Training improved ML model...")
    model = train_improved_predictor(occupancy_df)
    if model is None:
        return None

    user_time = datetime.combine(search_date, datetime.min.time()) + timedelta(hours=search_hour)
    available_spots = predict_availability_fixed(model, occupancy_df, user_time, neighborhood)

    # If no spots in target neighborhood, search nearby neighborhoods
    if available_spots is None or len(available_spots) == 0:
        print(f"❌ No available spots in {neighborhood}")
        print("🔍 Searching nearby neighborhoods...")

        # Rank the other neighborhoods by how close their average meter
        # position is to the destination. A planar approximation is enough
        # for ordering within one city.
        centroids = occupancy_df.groupBy("analysis_neighborhood").agg(
            avg("latitude").alias("lat"),
            avg("longitude").alias("lon"),
        ).collect()
        candidates = sorted(
            (
                row for row in centroids
                if row["analysis_neighborhood"] != neighborhood  # already searched
                and row["lat"] is not None and row["lon"] is not None
            ),
            key=lambda row: (row["lat"] - dest_lat) ** 2 + (row["lon"] - dest_lon) ** 2,
        )

        all_available = []
        for row in candidates[:10]:  # Check the 10 closest neighborhoods
            hood = row["analysis_neighborhood"]
            hood_spots = predict_availability_fixed(model, occupancy_df, user_time, hood)
            if hood_spots is not None and len(hood_spots) > 0:
                hood_spots['neighborhood'] = hood
                all_available.append(hood_spots)

        if not all_available:
            print("❌ No available spots found in searched neighborhoods")
            return None

        available_spots = pd.concat(all_available, ignore_index=True)
        print(f"✅ Found {len(available_spots)} total available spots across multiple neighborhoods")

    recommendations = recommend_spots(available_spots, dest_lat, dest_lon, top_n=15)

    if recommendations is not None:
        # Neighborhood names can contain "/" or spaces (e.g. "Castro/Upper
        # Market"), which would turn the file name into an invalid path.
        safe_neighborhood = re.sub(r'[^A-Za-z0-9._-]+', '_', str(neighborhood))
        map_filename = f'ml_fixed_parking_map_{safe_neighborhood}_{search_date}_{search_hour}.html'
        create_comprehensive_map(recommendations, dest_lat, dest_lon, map_filename)

    return recommendations

def interactive_app_ml_fixed():
    """Show the widget form that drives comprehensive_parking_search_ml_fixed.

    The meter dataset is loaded once to fill the neighborhood dropdown; if
    that fails nothing is displayed. Pressing the button runs a search with
    the current widget values and prints the outcome in the output area.
    """
    print("📊 Loading dataset to get all neighborhoods...")

    full_data = load_parking_meter_data()
    if full_data is None:
        return

    # Distinct neighborhood names, alphabetically, for the dropdown
    distinct_rows = full_data.select("analysis_neighborhood").distinct().collect()
    all_neighborhoods = sorted(row["analysis_neighborhood"] for row in distinct_rows)
    print(f"✅ Found {len(all_neighborhoods)} neighborhoods in dataset")

    # Input widgets
    neighborhood_dropdown = widgets.Dropdown(options=all_neighborhoods, description='Neighborhood:')
    date_picker = widgets.DatePicker(description='Date:', value=date.today())
    hour_slider = widgets.IntSlider(description='Hour:', value=10, min=0, max=23)
    lat_input = widgets.FloatText(description='Dest. Latitude:', value=37.7749)
    lon_input = widgets.FloatText(description='Dest. Longitude:', value=-122.4194)
    search_button = widgets.Button(description='🔍 Find Parking', button_style='success')
    output = widgets.Output()

    def on_search_click(b):
        # Everything printed by the search goes into the output widget
        with output:
            output.clear_output()
            try:
                results = comprehensive_parking_search_ml_fixed(
                    neighborhood_dropdown.value,
                    date_picker.value,
                    hour_slider.value,
                    lat_input.value,
                    lon_input.value
                )
                found_something = results is not None
                print(
                    "🎉 Search completed successfully!"
                    if found_something
                    else "❌ Search completed but no results found"
                )
            except Exception as e:
                print(f"❌ Error during search: {e}")
                import traceback
                traceback.print_exc()

    search_button.on_click(on_search_click)

    # Lay the form out top to bottom and show it
    form_rows = [
        widgets.HTML("<h2>🅿️ Smart Parking Finder - ML Enhanced</h2>"),
        neighborhood_dropdown,
        date_picker,
        hour_slider,
        lat_input,
        lon_input,
        search_button,
        output,
    ]
    display(widgets.VBox(form_rows))

def main():
    """Start the interactive app; on failure fall back to one sample search."""
    print("🅿️ Smart Parking Finder with Fixed ML Implementation")
    print("=" * 60)

    try:
        # Preferred path: the widget-based app
        print("Starting interactive app...")
        interactive_app_ml_fixed()

    except Exception as e:
        print(f"❌ Error in main: {e}")

        # Fallback: a fixed query for the Mission at 14:00 today
        print("\n🔧 Running sample search instead...")
        sample_query = {
            "neighborhood": "Mission",
            "search_date": date.today(),
            "search_hour": 14,
            "dest_lat": 37.7599,
            "dest_lon": -122.4148,
        }
        sample_results = comprehensive_parking_search_ml_fixed(**sample_query)

        succeeded = sample_results is not None
        print("✅ Sample search completed successfully!" if succeeded else "❌ Sample search failed")

def cleanup_resources():
    """Stop the Spark session and run a garbage-collection pass.

    This is best-effort shutdown code: any ordinary error is reported and
    swallowed so it never raises, while KeyboardInterrupt and SystemExit are
    allowed to propagate.
    """
    try:
        spark.stop()
        gc.collect()
        print("✅ Resources cleaned up")
    except Exception as e:
        print(f"⚠️ Resource cleanup failed: {e}")

# Register cleanup_resources to run when the interpreter exits, so the Spark
# session is stopped even if main() is never reached or fails.
import atexit
atexit.register(cleanup_resources)

# Start the application only when this file is executed directly
# (not when it is imported as a module).
if __name__ == "__main__":
    main()

Destination latitude and longitude examples (enter west longitudes as negative values, e.g. -122.4661):

* California Academy of Sciences: 37.7699° N, 122.4661° W
* UC Law San Francisco: 37.7811° N, 122.4158° W