In [None]:
import requests
import pandas as pd
import geopandas as gpd
import osmnx as ox
import h3pandas
import h3
from shapely import wkt
from shapely.geometry import Polygon, mapping, shape, Point
from shapely.wkt import dumps as shapely_to_wkt, loads as load_wkt
from azure.storage.blob import BlobServiceClient
import folium as fo
import numpy as np
from delta import *
from pyspark.sql import functions as F
from pyspark.sql.functions import udf, col, count, substring, to_json, lit, md5, pandas_udf, explode
from pyspark.sql.types import StructType, StructField, DoubleType, StringType, BooleanType
from pyspark.sql.window import Window
from pyproj import Transformer
from bs4 import BeautifulSoup

StatementMeta(, , -1, SessionError, , SessionError)

InvalidHttpRequestToLivy: [TooManyRequestsForCapacity] This spark job can't be run because you have hit a spark compute or API rate limit. To run this spark job, cancel an active Spark job through the Monitoring hub, choose a larger capacity SKU, or try again later. HTTP status code: 430 {Learn more} HTTP status code: 430.

In [None]:
# Functions
h3_level = 9
transformer = Transformer.from_crs("EPSG:27700", "EPSG:4326", always_xy=True)

def lat_lng_to_h3(lat, lng, resolution=7):
    # Convert latitude/longitude to H3 index at the given resolution
    return h3.latlng_to_cell(lat, lng, resolution)

def eas_nor_to_h3(eas, nor, resolution=7):
    # Convert latitude/longitude to H3 index at the given resolution
    lat_lng = convert_coordinates(eas,nor)
    lat = lat_lng[0]
    lng = lat_lng[1]
    return h3.latlng_to_cell(lat, lng, resolution)


def h3_to_geo(row):
    co = h3.cell_to_boundary (row.h3)
    return Polygon(co)

def cell_to_shapely(h3_index):
    coords = h3.cell_to_boundary(h3_index)
    flipped = tuple(coord[::-1] for coord in coords)
    return Polygon(flipped)

def coords_to_shape(row):
    cell = h3.latlng_to_cell(row.geometry.y, row.geometry.x, h3_level)
    coords = h3.cell_to_boundary(cell)
    flipped = tuple(coord[::-1] for coord in coords)
    return Polygon(flipped)

# Combined function:
# This will get the lat/lon fro the table, convert to a geometry, match to a H3 cell, flip the cell coords and return a polygon
def lat_lng_to_h3_geom(row):
    latlon = h3.latlng_to_cell(row.geometry.y, row.geometry.x, h3_level)
    coords = h3.cell_to_boundary(latlon)
    flipped = tuple(coord[::-1] for coord in coords)
    return Polygon(flipped)

# Define the UDF to convert Easting/Northing to Lat/Lng and return as a StructType
def convert_coordinates(easting, northing):
    # Define transformer from EPSG:27700 to EPSG:4326
    lng, lat = transformer.transform(easting, northing)
    return (float(lat), float(lng))

def h3_center(h3_index):
    lat, long = h3.cell_to_latlng(h3_index)
    return (float(lat), float(long))

def h3_to_geojson(h3_index):

    polygon = h3.cell_to_boundary(h3_index)
    flipped = tuple(coord[::-1] for coord in polygon)

    if flipped[0] != flipped[-1]:
        flipped = list(flipped)
        flipped.append(flipped[0])

    feature = {
        "type": "Feature",
        "geometry": {
            "type": "Polygon",
            "coordinates": [flipped]
        },
        "properties": {
            "h3_id": h3_index
        }
    }
  
    return json.dumps(feature)

def h3_to_wkt_polygon(h3_index):

    polygon = h3.cell_to_boundary(h3_index)
    flipped = tuple(coord[::-1] for coord in polygon)

    if flipped[0] != flipped[-1]:
        flipped = list(flipped)
        flipped.append(flipped[0])

    wkt = "POLYGON((" + ",".join([f"{x} {y}" for x, y in flipped]) + "))"
    return wkt

def get_parent_h3_index(h3_index, res):
    return h3.cell_to_parent(h3_index, res)

def lat_lng_to_h3_vectorized(lats,lons,resolution=7):
    h3_indices = np.vectorize(lambda lat, lon: h3.latlng_to_cell(lat,lon,resolution))(lats,lons)
    return h3_indices

def make_polygon_udf(broadcasted_polygons):
    def is_outside(lat, lon):
        point = Point(lon, lat)  # Shapely uses (x, y) -> (lon, lat)
        for poly in broadcasted_polygons.value:
            if poly.contains(point):
                return False
        return True

    return udf(is_outside, BooleanType())

def get_polygon_wkt(lat, lon):
    pt = Point(lon, lat)
    for poly_wkt in broadcast_polygons.value:
        polygon = load_wkt(poly_wkt)
        if polygon.contains(pt):
            return poly_wkt
    return None

def lat_long_to_wkt(lat, lon):
    if lat is None or lon is None:
        return None
    return f"POINT({lon} {lat})"  # Note: WKT POINT is "POINT(lon lat)"


In [3]:
schema = StructType([
    StructField("latitude", DoubleType()),
    StructField("longitude", DoubleType())
])

StatementMeta(, 12117b70-29f1-46c3-b1b4-d221604391c3, 7, Finished, Available, Finished)

In [6]:
@pandas_udf(schema)
def convert_bng_to_wgs84(easting: pd.Series, northing: pd.Series) -> pd.DataFrame:
    lons, lats = transformer.transform(easting.values, northing.values)
    return pd.DataFrame({
        "longitude": lons,
        "latitude": lats
    })

@pandas_udf(StringType())
def lat_long_to_h3_index(lat: pd.Series, long: pd.Series, resolution: pd.Series) -> pd.Series:
    return pd.Series([
            h3.latlng_to_cell(lat_val, long_val, int(res)) 
            for lat_val, long_val, res in zip(lat, long, resolution)
    ])

def make_point_in_polygon_udf(broadcasted_polygons):
    """
    Returns a Pandas UDF that checks if a (lat, lon) point is inside any polygon
    from the broadcasted list of polygons.
    """
    @pandas_udf(BooleanType())
    def is_inside(lat: pd.Series, lon: pd.Series) -> pd.Series:
        polygons = broadcasted_polygons.value
        points = [Point(lon_, lat_) for lon_, lat_ in zip(lon, lat)]
        return pd.Series([
            any(poly.contains(point) for poly in polygons)
            for point in points
        ])
   
    return is_inside

def add_matched_columns(df, udf_result, col_names):
    for col_name in col_names:
        df = df.withColumn(col_name, udf_result[col_name])
    return df

def extract_polygons_from_gdf(gdf: gpd.GeoDataFrame):
    gdf = gdf.to_crs("EPSG:4326")
    return_columns = [col for col in gdf.columns if col != "geometry"]
    return [
        {**{col: str(row[col]) for col in return_columns}, "geometry": row.geometry}
        for _, row in gdf.iterrows()
    ], return_columns

def make_polygon_match_udf(broadcasted_polygons, columns_to_return: list):
    # Always convert geometry to WKT if requested
    schema = StructType([
        StructField(col, StringType()) for col in columns_to_return
    ])

    @pandas_udf(schema)
    def match_polygon(lat: pd.Series, lon: pd.Series) -> pd.DataFrame:
        polygons = broadcasted_polygons.value
        points = [Point(lon_, lat_) for lon_, lat_ in zip(lon, lat)]

        results = []
        for pt in points:
            match = next((rec for rec in polygons if rec["geometry"].contains(pt)), None)
            if match:
                row = {}
                for col in columns_to_return:
                    if col == "geometry_wkt":
                        row[col] = match["geometry"].wkt
                    else:
                        row[col] = match.get(col, "")
                results.append(row)
            else:
                results.append({col: "" for col in columns_to_return})

        return pd.DataFrame(results)

    return match_polygon

StatementMeta(, 12117b70-29f1-46c3-b1b4-d221604391c3, 10, Finished, Available, Finished)