In [None]:
! pip install h3
! pip install matplotlib
! pip install contextily
! pip install pandas
! pip install pyarrow
! pip install geopandas

In [None]:
import h3.api.numpy_int as h3
import pandas as pd
from pyarrow import parquet
import shapely
from shapely.ops import transform
import geopandas as gpd
import contextily
import matplotlib.pyplot as plt

In [None]:
path = "data"
h3_partition=4
h3_resolution=6

percentiles = [50]
scenarios = ["ssp126"]
return_periods = [100]
years = [2050]

# H3
https://h3geo.org/

https://h3geo.org/docs/3.x/

H3 is a discrete global grid system for indexing geographies into a hexagonal grid, developed at Uber.

In this exercise we will load data from a dataset that has been indexed using H3 at different levels. The dataset has been partition at the H3 resolution 4, this means that each partition file covers an area of 1.770 Km^2 Inside each file, there is a column "lvl?_idx" that has the value of each individual cell, each of this index represents an area of 36 Km^2

The files that are part of the "temperature_max" dataset follow this format "lvl4_idx=595482595521724415/595482595521724415.parquet"


In [None]:
area_of_interest = shapely.Polygon((
    (41.29856899498296, 2.1317027216807674),
    (41.28759060834369, 2.150611938905854),
    (41.338592159365646, 2.1815542943640764),
    (41.37730082982284, 2.199603973338526),
    (41.406316080391235, 2.2288260129995194),
    (41.43724738511128, 2.238276899124429),
    (41.44033624237886, 2.1523800221088436),
    (41.36568972197628, 2.048369841630233),
    (41.29856899498296, 2.1317027216807674)
))

In [None]:
def polygon_to_h3_cells(polygon: shapely.Polygon, resolution: str) -> list[int]:
    """Convert a polygon to a list of H3 cell indices at the specified resolution.

    Args:
        polygon: A shapely Polygon object representing the area to cover.
        resolution: The H3 resolution level.

    Returns:
        A list of H3 cell indices covering the polygon at the given resolution.
    """
    raise NotImplementedError("Not implemented yet")

In [None]:
def h3_partitions_from_tiles(tiles: list[int], target_resolution: int) -> list[int]:
    """Convert a list of H3 tiles to partitions at the target resolution.

    Args:
        tiles: A list of H3 cell indices.
        target_resolution: The desired H3 resolution level for the partitions.

    Returns:
        A list of H3 cell indices at the target resolution.
    """
    raise NotImplementedError("Not implemented yet")

In [None]:
def h3_tile_to_geojson_area(tile: int) -> shapely.Polygon:
    """Convert an H3 tile index to a GeoJSON Polygon representing its area.

    Note:
        H3 uses (latitude, longitude) order for coordinates, while shapely Polygon uses (longitude, latitude) order.

    Args:
        tile: An H3 cell index.

    Returns:
        A shapely Polygon representing the area of the H3 cell.
    """
    raise NotImplementedError("Not implemented yet")

In [None]:
def load_data(
    h3_indexes, resolution, partition_resolution, percentiles, return_periods, scenarios, years
):
    all_df = []

    partitions = h3_partitions_from_tiles(tiles=h3_indexes, target_resolution=partition_resolution)
    print("loading data from:", len(partitions), "partitions")
    for partition in partitions:
        source = f"{path}/lvl{partition_resolution}_idx={partition}/{partition}.parquet"

        table = parquet.read_table(
            source,
            columns=["value", f"lvl{resolution}_idx"],
            filters=[
                ("percentile", "in", percentiles),
                ("year", "in", years),
                ("return_period", "in", return_periods),
                ("scenario", "in", scenarios),
            ],
        )

        all_df.append(table.to_pandas())

    signal_df = pd.concat(all_df).reset_index(drop=True)
    signal_df = signal_df.rename(columns={f"lvl{resolution}_idx": "h3"})
    return signal_df


In [None]:
def add_h3_polygons_to_dataframe(df, res):
    polygons = []
    for tile in df.h3:
        polygon = h3_tile_to_geojson_area(tile)
        polygons.append(polygon)

    df["geometry"] = polygons
    return gpd.GeoDataFrame(data=df, crs="epsg:4326", geometry="geometry")

In [None]:
h3_indexes = polygon_to_h3_cells(area_of_interest, h3_resolution)

In [None]:
df = load_data(
    h3_indexes, h3_resolution, h3_partition, percentiles, return_periods, scenarios, years
)

In [None]:
df = add_h3_polygons_to_dataframe(df, 6)

In [None]:
# Plot
fig, ax = plt.subplots(figsize=(10, 10))

df_web_mercator = df.to_crs(epsg=3857)

df_web_mercator.plot(
    column="value",
    ax=ax,
    legend=True,
    cmap='coolwarm',
    edgecolor=None,
    alpha=0.4,
    legend_kwds={'label': f"temperature (degC)", 'shrink': 0.7},
)
contextily.add_basemap(ax, source=contextily.providers.CartoDB.Positron)

ax.set_axis_off()
plt.tight_layout()
plt.show()


# Tests

In [None]:
aoi_1 = shapely.Polygon((
    (41.29856899498296, 2.1317027216807674),
    (41.28759060834369, 2.150611938905854),
    (41.338592159365646, 2.1815542943640764),
    (41.37730082982284, 2.199603973338526),
    (41.406316080391235, 2.2288260129995194),
    (41.43724738511128, 2.238276899124429),
    (41.44033624237886, 2.1523800221088436),
    (41.36568972197628, 2.048369841630233),
    (41.29856899498296, 2.1317027216807674)
))
aoi_1_tiles_lvl6 = [604489803903270911, 604489803769053183, 604489803634835455,
       604489803500617727]
aoi_1_tiles_lvl4 = [595482612701593599]

In [None]:
def check_polygon_to_h3(polygon, res, expected_output):
    result = polygon_to_h3_cells(polygon, res)
    assert len(result) > 0, "0 cells found"
    assert h3.get_resolution(result[0]) == res, f"incorrect cell resolution found {h3.get_resolution(result[0])} expected: {res}"
    assert expected_output == result.tolist(), f"cells do not match the expected output"

In [None]:
print("Checking polygon to H3 cells")
check_polygon_to_h3(aoi_1, 4, aoi_1_tiles_lvl4)
check_polygon_to_h3(aoi_1, 6, aoi_1_tiles_lvl6)
check_polygon_to_h3(aoi_2, 4, aoi_2_tiles_lvl4)
check_polygon_to_h3(aoi_2, 6, aoi_2_tiles_lvl6)
print("All tests OK")

In [None]:
def check_partitions_from_tiles(tiles: list[int], resolution: int, expected_output):
    partitions = h3_partitions_from_tiles(tiles, resolution)
    assert partitions == expected_output

In [None]:
print("Checking tiles to parent")
check_partitions_from_tiles([604489803903270911], 4, [595482612701593599])
check_partitions_from_tiles([604489803903270911, 627004010341785599], 4, [595482612701593599, 595478815950503935])
check_partitions_from_tiles([604489803903270911], 6, [604489803903270911])
print("All tests OK")

In [None]:
def check_tile_to_polygon(tile, expected_output_coords):
    expected_polygon = shapely.Polygon(expected_output_coords)
    result = h3_tile_to_geojson_area(tile)
    if expected_polygon != result:
        reversed_polygon = shapely.Polygon([(lng, lat) for lat, lng in expected_output_coords])
        if result == reversed_polygon:
            print("The generated polygon has (lat, lng) tuples, (lng, lat) expected")
        else:
            print("Incorrect polygon")
        raise ValueError("The generated polygon is invalid")

In [None]:
polygon_coords_lat_lng = (
    (2.1269044754339017, 41.37027964798073),
    (2.118627606630568, 41.33691535788342),
    (2.15130946864786, 41.316497568278464), 
    (2.1922884467862196, 41.32943910908087), 
    (2.2005990008882703, 41.362810486371764), 
    (2.1678968928543587, 41.38323324092303),
    (2.1269044754339017, 41.37027964798073),
)
tile = 604489803903270911
check_tile_to_polygon(tile, polygon_coords_lat_lng)
print("All tests OK")