In [1]:
import pandas as pd
import random
import shapely
from shapely.geometry import Point, MultiPoint
import duckdb
import numpy as np
from typing import Any, Dict, Optional
from shapely.geometry import shape
import json

In [None]:
df = pd.read_csv("hf://datasets/joefee/service-data/np_obs_jit_jf_tf_tw.csv")

In [None]:
# Bin this into signal level categories
# UK Ofcom Reference URL: 
# https://www.ofcom.org.uk/siteassets/resources/documents/phones-telecoms-and-internet/comparing-service-quality/2025/map-your-mobile-2025-threshold-methodology.pdf
df["signal_level_category"] = pd.cut(df["signal_level"], bins=[-np.inf, -105, -95, -82, -74, np.inf], labels=["1. Very Weak", "2. Weak", "3. Moderate", "4. Strong", "5. Very Strong"])

In [None]:
# Identify cells with sufficient data points
# Cells must also have signal_level_category value
min_points_required = 30

sufficient_data_cells = duckdb.query(f"""
    SELECT unique_cell 
    FROM df
    WHERE signal_level_category IS NOT NULL
    GROUP BY unique_cell HAVING COUNT(*) >= {min_points_required}
""").to_df()['unique_cell'].tolist()

In [None]:
# Identify all unique cells
# remove outlier cells
outlier_cells = ["4dc7c9ec434ed06502767136789763ec11d2c4b7"]
sufficient_data_cells = [cell for cell in sufficient_data_cells if cell not in outlier_cells]

print(f"Number of cells with sufficient data: {len(sufficient_data_cells)}")
print(f"Total number of cells: {df['unique_cell'].nunique()}")

In [None]:
def generate_svm_boundary_geom(df, **args) -> Optional[Dict[str, Any]]:
    """
    Generate a valid MultiPolygon with true cut-out holes from One-Class SVM boundary.
    Converts input longitude/latitude columns to float type if they are not already.

    Args:
        df (pd.DataFrame): DataFrame with 'longitude' and 'latitude' columns.
                        These columns can contain numbers or Decimal objects.
        **args: Keyword arguments passed directly to svm.OneClassSVM.

    Returns:
        dict: GeoJSON mapping of the resulting MultiPolygon, or None if unsuccessful.
    """
    import numpy as np
    import matplotlib.pyplot as plt
    from sklearn import svm
    from shapely.geometry import LineString, Polygon, MultiPolygon, mapping, Point
    from shapely.ops import polygonize, unary_union
    import pandas as pd
    # Import Decimal for explicit type checking/conversion if needed, though astype(float) is usually sufficient
    from decimal import Decimal

    if not isinstance(df, pd.DataFrame) or not all(col in df.columns for col in ['longitude', 'latitude']):
        print("Error: Input df must be a pandas DataFrame with 'longitude' and 'latitude' columns.")
        return None

    if len(df) < 2:
        print("Warning: Need at least 2 data points for SVM.")
        return None

    # --- Convert coordinate columns to float type ---
    # This resolves the Decimal vs float TypeError
    try:
        df_copy = df.copy() # Work on a copy to avoid modifying the original DataFrame
        df_copy['longitude'] = df_copy['longitude'].astype(float)
        df_copy['latitude'] = df_copy['latitude'].astype(float)
        coords = df_copy[['longitude', 'latitude']].values
    except (TypeError, ValueError) as e:
        print(f"Error converting coordinate columns to float: {e}")
        return None

    # --- 1. Train the SVM ---
    try:
        clf = svm.OneClassSVM(**args)
        clf.fit(coords)
    except Exception as e:
        print(f"Error during SVM training: {e}")
        return None

    # --- 2. Create mesh grid for contouring ---
    # Now calculations will use standard floats
    x_min, x_max = coords[:, 0].min(), coords[:, 0].max()
    y_min, y_max = coords[:, 1].min(), coords[:, 1].max()

    x_range = x_max - x_min
    y_range = y_max - y_min
    x_margin = x_range * 1 if x_range > 1e-9 else 0.1
    y_margin = y_range * 1 if y_range > 1e-9 else 0.1

    x_min -= x_margin
    x_max += x_margin
    y_min -= y_margin
    y_max += y_margin

    resolution = 500
    xx, yy = np.meshgrid(np.linspace(x_min, x_max, resolution),
                        np.linspace(y_min, y_max, resolution))
    grid_points = np.c_[xx.ravel(), yy.ravel()]

    try:
        Z = clf.decision_function(grid_points).reshape(xx.shape)
    except Exception as e:
        print(f"Error during SVM decision function evaluation: {e}")
        return None

    # --- 3. Extract contour lines at level 0 (the boundary) ---
    fig, ax = plt.subplots()
    try:
        cs = ax.contour(xx, yy, Z, levels=[0])
    except Exception as e:
        print(f"Error during contour generation: {e}")
        plt.close(fig)
        return None
    plt.close(fig)

    if not cs.allsegs or not cs.allsegs[0]:
        print("Warning: No contour lines found at level 0.")
        return None

    segments = cs.allsegs[0]
    lines = [LineString(seg) for seg in segments if len(seg) >= 2]

    if not lines:
        print("Warning: No valid LineStrings created from contour segments.")
        return None

    # --- 4. Polygonize the lines ---
    try:
        all_polygons = list(polygonize(lines))
    except Exception as e:
        print(f"Error during polygonization: {e}")
        return None

    if not all_polygons:
        print("Warning: Polygonization did not yield any polygons.")
        return None

    # --- 5. Classify polygons and perform unary union ---
    positive_polygons = []
    for p in all_polygons:
        if p.is_valid and p.area > 1e-9:
            rep_point = p.representative_point()
            try:
                decision_val = clf.decision_function([[rep_point.x, rep_point.y]])[0]
                if decision_val >= 0:
                    # Ensure the polygon added is valid - unary_union can struggle with invalid inputs
                    if p.is_valid:
                        positive_polygons.append(p)
                    else:
                    # Attempt to buffer by 0 to fix potential self-intersections
                        buffered_p = p.buffer(0)
                        if buffered_p.is_valid and isinstance(buffered_p, Polygon):
                                positive_polygons.append(buffered_p)
                        else:
                            print(f"Warning: Skipping invalid polygon generated during classification step even after buffer(0). Area: {p.area}")

            except Exception as e:
                print(f"Warning: Error checking decision function for a polygon point: {e}")


    if not positive_polygons:
        print("Warning: No valid polygons were classified as inside the SVM boundary.")
        return None

    # --- 6. Unary Union to merge positive polygons and create holes ---
    try:
        # Filter again for validity just before union, as buffer(0) might create MultiPolygons
        valid_positive_polygons = [poly for poly in positive_polygons if poly.is_valid and isinstance(poly, Polygon)]
        if not valid_positive_polygons:
            print("Warning: No valid polygons remaining before unary union.")
            return None
        result_geom = unary_union(valid_positive_polygons)

    except Exception as e:
        # Catch potential errors during unary_union (often related to complex topology)
        print(f"Error during unary union: {e}")
        # As a fallback, try creating a MultiPolygon directly from the valid positive polygons
        # This might result in overlaps instead of proper union, but is better than nothing.
        print("Attempting fallback: creating MultiPolygon from individual positive polygons.")
        try:
            result_geom = MultiPolygon(valid_positive_polygons)
            if not result_geom.is_valid:
                print("Warning: Fallback MultiPolygon is invalid.")
                # Try buffer(0) on the multipolygon as a last resort
                buffered_result = result_geom.buffer(0)
                if buffered_result.is_valid:
                    print("Fallback MultiPolygon fixed with buffer(0).")
                    result_geom = buffered_result
                else:
                    print("Error: Fallback MultiPolygon remains invalid even after buffer(0). Cannot proceed.")
                    return None
        except Exception as fallback_e:
            print(f"Error during fallback MultiPolygon creation: {fallback_e}")
            return None


    # --- 7. Format output as MultiPolygon GeoJSON mapping ---
    final_multi_poly = None
    if result_geom is None: # Should not happen with current logic, but check anyway
        print("Error: Resulting geometry is None after union/fallback.")
        return None

    # Simplify handling by ensuring result_geom is always iterable (list of polygons)
    geoms_to_wrap = []
    if isinstance(result_geom, Polygon):
        if result_geom.is_valid:
            geoms_to_wrap = [result_geom]
    elif isinstance(result_geom, MultiPolygon):
        # Filter out invalid geoms within the MultiPolygon if any
        geoms_to_wrap = [g for g in result_geom.geoms if g.is_valid and isinstance(g, Polygon)]
    elif hasattr(result_geom, 'geoms'): # Handle GeometryCollection
        print(f"Warning: unary_union resulted in a GeometryCollection. Filtering for valid Polygons.")
        geoms_to_wrap = [g for g in result_geom.geoms if g.is_valid and isinstance(g, Polygon)]

    if not geoms_to_wrap:
        print("Warning: No valid polygons found in the final geometry after union/cleanup.")
        return None

    # Create the final MultiPolygon
    final_multi_poly = MultiPolygon(geoms_to_wrap)

    # Final validity check
    if final_multi_poly.is_valid:
        return mapping(final_multi_poly)
    else:
        # Try one last buffer(0) fix
        print("Warning: Final MultiPolygon is invalid. Attempting buffer(0) fix.")
        buffered_final = final_multi_poly.buffer(0)
        if buffered_final.is_valid and isinstance(buffered_final, (Polygon, MultiPolygon)):
            # Re-wrap if buffer resulted in a single Polygon
            if isinstance(buffered_final, Polygon):
                final_multi_poly = MultiPolygon([buffered_final])
            else:
                final_multi_poly = buffered_final
            print("Final MultiPolygon fixed with buffer(0).")
            return mapping(final_multi_poly)
        else:
            print("Error: Final MultiPolygon remains invalid even after buffer(0).")
            return None

In [None]:
# Main processing loop for each cell
buffer = 0.0


for cell_id in sufficient_data_cells:

    # Initialize empty geojson for this cell
    my_geojson = {
        "type": "FeatureCollection",
        "features": []
    }
    # Subset the dataframe to only this cell
    df_subset = df[df['unique_cell'] == cell_id]

    # Skip if not enough points
    if len(df_subset) < min_points_required:
        print(f"Skipping cell {cell_id} due to insufficient points ({len(df_subset)} < {min_points_required})")
        continue

    # Get indoors data excluding "Full Service Loss (>120s)"
    df_indoors = df_subset[ (df_subset["in_outdoor_state"].isin(["Probably Indoor", "Surely Indoor"])) & (df_subset['measurement_type_name'] != "Full Service Loss (>120s)") ]
    df_outdoors = df_subset[ (df_subset["in_outdoor_state"].isin(["Probably Outdoor", "Surely Outdoor"])) & (df_subset['measurement_type_name'] != "Full Service Loss (>120s)") ]

    print(f"---")
    print(f"Cell ID: {cell_id}, Total Records: {len(df_subset)}")
    print(f"Indoor Records (excluding Full Service Loss): {len(df_indoors)}")
    print(f"Outdoor Records (excluding Full Service Loss): {len(df_outdoors)}")

    # Construct a convex hull with shapely using df_subset
    points = [Point(xy) for xy in zip(df_subset['longitude'], df_subset['latitude'])]

    # Find center of mass among these points
    multipoint = MultiPoint(points)
    center_of_mass = multipoint.centroid

    # For each point, calculate distance to center of mass
    distances = [point.distance(center_of_mass) for point in points]
    
    # Find the 95% percentile distance
    threshold_distance = pd.Series(distances).quantile(0.95)

    # Filter points to only those within the threshold distance
    filtered_points = [point for point, distance in zip(points, distances) if distance <= threshold_distance]

    # Create new multipoint from filtered points
    multipoint_filtered = MultiPoint(filtered_points)
    
    # Calculate the convex hull
    convex_hull = multipoint_filtered.convex_hull

    # Append convex hull to list
    # my_geojson['features'].append({
    #     "type": "Feature",
    #     "properties": {
    #         "cell_id": cell_id,
    #         "total_records": len(df_subset),
    #         "fill": "#3300FF",
    #     },
    #     "geometry": shapely.geometry.mapping(convex_hull)
    # })

    # Now generate SVM boundary using filtered points
    svm_args = {
        'kernel': 'rbf',
        'nu': float(0.08),  # Lower value allow more flexible boundary
        'gamma': float(len(df_subset) * 50)  # Higher value allow more complex decision boundary
    }

    # Run SVM for different signal level categories
    # Starting from verry weak to very strong
    # Cumulative union of all these geoms to ensure coverage of all signal levels
    levels = [
        ["5. Very Strong"],
        ["4. Strong", "5. Very Strong"],
        ["3. Moderate", "4. Strong", "5. Very Strong"],
        ["2. Weak", "3. Moderate", "4. Strong", "5. Very Strong"],
        ["1. Very Weak", "2. Weak", "3. Moderate", "4. Strong", "5. Very Strong"],
    ]

    # Generate SVM boundaries for each level
    for level in levels:
        df_subset_level = df_subset[df_subset['signal_level_category'].isin(level)]
        if len(df_subset_level) < min_points_required:
            print(f"Skipping SVM for cell {cell_id} at levels {level} due to insufficient points ({len(df_subset_level)} < {min_points_required})")
            continue

        geom_level = generate_svm_boundary_geom(df_subset_level, **svm_args)
        
        if geom_level is not None:
            # Add buffer radius to have better coverage

            if buffer > 0:
                geom_level = shape(geom_level).buffer(buffer)

            # Ensure the geometry is serialized to GeoJSON mapping (dict) before appending
            my_geojson['features'].append({
                "type": "Feature",
                "properties": {
                    "cell_id": cell_id,
                    "total_records": len(df_subset_level),
                    "svm_boundary_levels": level,
                    "fill": "#00FF00",
                },
                "geometry": shapely.geometry.mapping(geom_level),
            })
        else:
            print(f"Warning: SVM boundary generation failed for cell {cell_id} at levels {level}")
        


    # Add points to the export
    for i, point in enumerate(points):
        my_geojson['features'].append({
            "type": "Feature",
            "properties": {
                "cell_id": cell_id,
                "point_index": i,
                "marker-color": "#FF0000",
                "marker-symbol": "circle",
            },
            "geometry": shapely.geometry.mapping(point)
        })
    
    # Export geojson 
    with open(f"cells/cell_{cell_id}.geojson", "w") as f:
        json.dump(my_geojson, f, indent=2)