# Spatial join between regions and hospitals in Italy

Import necessary libraries and modules

In [None]:
from libadalina_core.readers import geopackage_to_dataframe
import pathlib
import os

Read the geopackages containing the locations of hospitals and regions in Italy and load them into geopandas DataFrames.

In [None]:
# Root folder of the sample datasets, taken from the SAMPLES_DIR environment
# variable; an unset variable yields an empty path, i.e. paths relative to
# the current working directory.
base_path = pathlib.Path(os.environ.get("SAMPLES_DIR", ""))

# Hospitals: keep only the columns used later in the notebook.
# NOTE(review): the second argument is presumably the layer name inside the
# geopackage -- confirm against geopackage_to_dataframe.
hospitals_file = base_path / "healthcare" / "EU_healthcare.gpkg"
hospital_columns = ["hospital_name", "geometry", "city", "cap_beds"]
hospitals = geopackage_to_dataframe(str(hospitals_file), "EU")[hospital_columns]

# Regions: keep the level code, name and country code needed for filtering,
# plus the geometry used by the spatial join.
regions_file = base_path / "regions" / "NUTS_RG_20M_2024_4326.gpkg"
region_columns = ["LEVL_CODE", "NUTS_NAME", "CNTR_CODE", "geometry"]
regions = geopackage_to_dataframe(
    str(regions_file),
    "NUTS_RG_20M_2024_4326.gpkg",
)[region_columns]

Import libadalina-core spatial operators for performing spatial joins and aggregations.

In [None]:
from libadalina_core.spatial_operators import spatial_join, JoinType, spatial_aggregation, AggregationType, \
    AggregationFunction

For the sake of this example, filter the regions to select only those that correspond to the provinces of Milan and Cremona.
`regions` and `filtered_regions` are geopandas DataFrames at this step.

In [None]:
# Select the provinces of Milan and Cremona: rows at NUTS level 3, with
# country code "IT", whose name contains "Milano" or "Cremona"
# (case-insensitive match).
is_level_3 = regions['LEVL_CODE'] == 3
is_italian = regions['CNTR_CODE'] == "IT"
is_milan_or_cremona = regions['NUTS_NAME'].str.contains('Milano|Cremona', case=False)

filtered_regions = regions[is_level_3 & is_italian & is_milan_or_cremona]

Join provinces and hospitals so that, for each province, we get the hospitals located within its boundaries.
The `result` is a PySpark DataFrame having an entry for each pair of province and hospital in that province.

In [None]:
# Left join: provinces on the left side, hospitals on the right side.
joined = spatial_join(filtered_regions, hospitals, join_type=JoinType.LEFT)

# The join operator renames the geometries, adding the suffixes _left and
# _right to avoid conflicts; expose the left (province) geometry again under
# the plain name `geometry`.
result = joined.withColumnRenamed('geometry_left', 'geometry')
result.show(truncate=False)

Aggregate the results to obtain the number of hospitals, the total number of beds and the average number of beds in each province.
Aggregation is performed based on the geometry.

In [None]:
# Per-province statistics: number of hospitals, total beds and average beds.
hospital_statistics = [
    AggregationFunction("hospital_name", AggregationType.COUNT, 'hospitals'),
    AggregationFunction("cap_beds", AggregationType.SUM, 'total_beds'),
    AggregationFunction("cap_beds", AggregationType.AVG, 'average_beds'),
]

# Replace the joined pairs with their aggregated form and print it in full.
result = spatial_aggregation(result, aggregate_functions=hospital_statistics)
result.show(truncate=False)