<img src="https://wherobots.com/wp-content/uploads/2023/12/Inline-Blue_Black_onWhite@3x.png" alt="drawing" width="300"/>

# European Road Risk Analysis

This notebook demonstrates the process and speed of using Wherobots Cloud as your Spatial Analysis Engine.

Today we will see:
 
1. Reading and writing various data sources
2. Aggregating and analyzing raster and vector data
3. Calculating a basic risk score across several paramaters and at various scales
4. Visualizing analytical results
   
Let's get started!

<img src="road_risk_preview.jpg" alt="drawing" width="600"/>


In [None]:
%%time
from sedona.spark import *
from sedona.sql import st_functions as stf
from sedona.sql import st_constructors as stc
from sedona.sql import st_predicates as stp
from pyspark.sql.functions import expr, col, count, isnan, when, explode,broadcast,min as spark_min, monotonically_increasing_id, year, lit, to_timestamp
from pyspark.sql.window import Window

import os



config = SedonaContext.builder().\
     config("spark.hadoop.fs.s3a.bucket.wherobots-examples.aws.credentials.provider",
            "org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider").\
     getOrCreate()
sedona = SedonaContext.create(config)

## Data
1. Overture Maps Foundation Layers
    - Transportation 
    - Places
        - Service Stations
        - Housing
        - Emergency Services
    - Buildings
2. European Commision
    - River Flood Hazard Maps
    - Landslide Risk

#### Load Vector Assets 

In [None]:
nl_df= sedona.sql(f"""
SELECT
    geometry
FROM 
    wherobots_open_data.overture_2024_07_22.divisions_division_area 
WHERE
    country ='NL'
AND
    subtype = 'country'

 """)
nl_geom_wkt = nl_df.collect()[0].geometry


In [None]:
kepler_map = SedonaKepler.create_map()
SedonaKepler.add_df(kepler_map, nl_df, name="NL")
kepler_map

In [None]:
%%time
# read Overture Maps Foundation table.
nl_service_stations_df = sedona.table("""wherobots_open_data.overture_2024_07_22.places_place""")\
                               .where(expr(f"""ST_INTERSECTS(geometry,ST_GEOMFROMWKT("{nl_geom_wkt}"))"""))\
                               .where("""categories.primary in ('truck_repair_and_services_for_businesses','truck_repair','transmission_repair',
                                                                    'oil_change_station','brake_service_and_repair','automotive_services_and_repair',
                                                                    'automotive_repair','auto_glass_service','auto_body_shop')""")

nl_emergency_services_df  = sedona.table("""wherobots_open_data.overture_2024_07_22.places_place""")\
                               .where(expr(f"""ST_INTERSECTS(geometry,ST_GEOMFROMWKT("{nl_geom_wkt}"))"""))\
                               .where("""categories.primary in ('law_enforcement','police_department','air_ambulance',
                                                                'disaster_response_services','emergency_medicine','emergency_roadside_service',
                                                                'emergency_room','fire_department','hospital','towing_service','urgent_care_clinic')""")

nl_transportation_df = sedona.table("""wherobots_open_data.overture_2024_07_22.transportation_segment""")\
                          .where(expr(f"""ST_INTERSECTS(geometry,ST_GEOMFROMWKT("{nl_geom_wkt}"))"""))\
                          .where(expr("class = 'motorway'"))

nl_buildings_df = sedona.table("""wherobots_open_data.overture_2024_07_22.buildings_building""")\
                     .where(expr(f"""ST_INTERSECTS(geometry,ST_GEOMFROMWKT("{nl_geom_wkt}"))"""))\
                     .where("""class in ('residential','apartments') """)



# repartition, count and create temporary views
nl_emergency_services_df = nl_emergency_services_df.repartition(12)
print(f"Emergency Services: {nl_emergency_services_df.count()}")
nl_emergency_services_df.createOrReplaceTempView("nl_emergency_services")

nl_service_stations_df = nl_service_stations_df.repartition(12)
print(f"Service Stations: {nl_service_stations_df.count()}")
nl_service_stations_df.createOrReplaceTempView("nl_service_stations")

nl_transportation_df = nl_transportation_df.repartition(12)
print(f"Transportation: {nl_transportation_df.count()}")
nl_transportation_df.createOrReplaceTempView("nl_transportation")

nl_buildings_df = nl_buildings_df.repartition(12)
print(f"Buildings: {nl_buildings_df.count()}")
nl_buildings_df.createOrReplaceTempView("nl_buildings")

#### Load Raster Assets

In [None]:
%%time
#Wall time: 13.6 s
# Read directory of Flood Risk 
flood_layers_dir = f"""s3://wherobots-examples/data/examples/global_flood_hazard_layers/europe"""


# Create a DataFrame then temporary view containing raster data
flood_layers_df = sedona.read.format("binaryFile")\
    .option("pathGlobFilter", "*.tif")\
    .option("recursiveFileLookup", "true")\
    .load(flood_layers_dir).drop("content").withColumn("rast", expr("RS_FromPath(path)"))

flood_layers_df.createOrReplaceTempView("flood_raster")
flood_layers_df.show()
# tile the raster into 256x256 images and save as a temporary view 
flood_raster_tiled= sedona.sql(f"""
SELECT 
    RS_TileExplode(rast, 256, 256) AS (x, y, tile), 
    REVERSE(SPLIT(RS_BandPath(rast), '/'))[0] AS path,
    int(REPLACE(SPLIT(REVERSE(SPLIT(RS_BandPath(rast), '/'))[0],'_')[1],'RP','')) as event
FROM 
    flood_raster
WHERE 
    RS_Intersects(rast,ST_GEOMFROMWKT("{nl_geom_wkt}"))
AND
    int(REPLACE(SPLIT(REVERSE(SPLIT(RS_BandPath(rast), '/'))[0],'_')[1],'RP',''))  in (10, 50, 100)""")
flood_raster_tiled = flood_raster_tiled.repartition(12)
flood_raster_tiled.createOrReplaceTempView("flood_raster_tiled")
flood_raster_tiled.show()

In [None]:
%%time
#1.44 s
# Read the landslide risk tif
landslide_risk_tif = f"""s3://wherobots-examples/data/examples/EuropeanLandslideSusceptibilityMap/elsus_v2_4326_cog.tif"""
# 0 (-2147483647) = no data; 1 = very low; 2 = low; 3 = moderate; 4 = high; 5 = very high

# Create a DataFrame then temporary view containing raster data
landslide_risk_df = sedona.read.format("binaryFile")\
    .load(landslide_risk_tif).drop("content").withColumn("rast", expr("RS_FromPath(path)"))

landslide_risk_df.createOrReplaceTempView("land_slide_risk_raster")
landslide_risk_df.show()
# tile the raster into 256x256 images and save as a temporary view 
landslide_risk_tiled_df= sedona.sql(f"""
SELECT 
    RS_TileExplode(rast, 256, 256) AS (x, y, tile), 
    REVERSE(SPLIT(RS_BandPath(rast), '/'))[0] AS path 
FROM 
    land_slide_risk_raster
WHERE 
    RS_Intersects(rast,ST_GEOMFROMWKT("{nl_geom_wkt}"))
""")
landslide_risk_tiled_df= landslide_risk_tiled_df.repartition(12)
landslide_risk_tiled_df.createOrReplaceTempView("land_slide_risk_tiled")
landslide_risk_tiled_df.show()

#### Now we have all of our data assets loaded and ready to start our analysis. 
We are going to be creating a transportation risk index using H3 Hex bins as our mapping unit.
The process generaly looks like this. 
1. Isolate the transportation cooridors of Interest
2. Generate H3 hex bin map based on the intersection of these coordors
3. Engineer features based spatial relationships and add them to the H3 hex bin
4. Normalize feartures from 0-1 and then weight our features
5. Calcualte final risk score 

#### Our first step in the analysis process is to create our H3 hexagon grid over our road network (specifcally the motorway class) that we will report at. 

##### We are using H3 Level 6, each hex is approx. 36km<sup>2</sup> with each edge (and the disstance from the centere to any vertex) being 3.7km.
Note that we are caching (for faster access), repartationing (for better distribution) and creating and temporary view (for reuse in spatialSQL).

In [None]:
%%time
hex_scale= 6
road_hex = sedona.sql(f""" 
WITH h3_ids AS (SELECT 
   DISTINCT EXPLODE
               (
                   ST_H3CellIDs(geometry, {hex_scale}, true)
               ) AS h3 
FROM 
nl_transportation )


SELECT 
    h3,
    EXPLODE (ST_H3ToGeom(ARRAY(h3))) AS hex_geometry
FROM 
    h3_ids
""")
road_hex = road_hex.repartition(12)
road_hex.createOrReplaceTempView("hex_bins_d")

#### Our second step in the analysis is to join the max speed limit from our road network to out hexagons.

Note that we use a left join to retain _all_ mapping units and the join is predicated on the spatial relationship "Intersects". 

Again we repartition and create a temp view. 

In [None]:
road_hex.count()

In [None]:
%%time
# 13 s
hex_scale= 6
road_speed_hex = sedona.sql(f""" 
SELECT
    FIRST(ARRAY_MAX(t2.speed_limits.max_speed.value)) AS speed_limit,
    FIRST(t1.hex_geometry) AS geometry,
    t1.h3
FROM 
    hex_bins_d t1
LEFT JOIN 
    nl_transportation t2
ON
    ST_INTERSECTS(t1.hex_geometry, t2.geometry)
GROUP BY 
    t1.h3
""")
road_speed_hex = road_speed_hex.repartition(12)
road_speed_hex.createOrReplaceTempView("road_speed_hex")
road_speed_hex.count()

In [None]:
road_speed_hex.show()

In [None]:
road_speed_hex.printSchema()

In [None]:
kepler_map = SedonaKepler.create_map()
SedonaKepler.add_df(kepler_map, road_speed_hex, name="road_speed_hex")
kepler_map


#### Now we have our hex bins identified along our major road networks and as an added benefit we brought in the speed limits

We now start the process of aggregating the rest of our risk paramaters to our H3 cell
- Service Stations counts
- Landslide counts
- Emergency Services counts
- Landslide potential
- Flooding potential
  

In [None]:
sedona.sql(""" SELECT count(distinct h3) from hex_bins_d""").show()

In [None]:
%%time
#9.08 s
service_stations_df = sedona.sql("""
SELECT
    h.h3,
    count(p.id) as service_stations
FROM 
    hex_bins_d h
LEFT JOIN
    nl_service_stations p on ST_CONTAINS(h.hex_geometry,p.geometry)
GROUP BY 
    h.h3
""")
service_stations_df = service_stations_df.repartition(12)
service_stations_df.createOrReplaceTempView("hex_service_stations")
service_stations_df.count()

In [None]:
%%time
#8.76 s
emergency_services_df = sedona.sql("""
SELECT
    h.h3,
    count(p.id) as emergency_services
FROM 
    hex_bins_d h
LEFT JOIN
    nl_emergency_services p on ST_CONTAINS(h.hex_geometry,p.geometry)
GROUP BY 
    h.h3
""")
emergency_services_df = emergency_services_df.repartition(12)
emergency_services_df.createOrReplaceTempView("hex_emergency_services")
emergency_services_df.count()

In [None]:
%%time    
#Wall time: 8.69 s
res_buildings_df = sedona.sql("""


SELECT
    h.h3,
    count(b.id) as res_building
FROM 
    hex_bins_d h
LEFT JOIN
    nl_buildings b on ST_CONTAINS(h.hex_geometry,ST_CENTROID(b.geometry))
GROUP BY 
    h.h3
""")
res_buildings_df = res_buildings_df.repartition(12)
res_buildings_df.createOrReplaceTempView("hex_res_buildings")
res_buildings_df.count()

In [None]:
%%time
res_buildings_df.where("res_building>0").show()

In [None]:
%%time
#8.78 s
enriched_hex_df = sedona.sql("""
SELECT
    r.h3,
    r.geometry,
    r.speed_limit,
    s.service_stations,
    r.res_building,
    e.emergency_services
FROM
    road_speed_hex r
LEFT JOIN    
    hex_service_stations s using(h3) 
LEFT JOIN
    hex_res_buildings r using(h3) 
LEFT JOIN
    hex_emergency_services e using(h3)
""")

enriched_hex_df = enriched_hex_df.repartition(12)
enriched_hex_df.createOrReplaceTempView("enriched_hex")
enriched_hex_df.count()

In [None]:
%%time
enriched_hex_df.show()

In [None]:

kepler_map = SedonaKepler.create_map()
SedonaKepler.add_df(kepler_map, enriched_hex_df, name="roads")
kepler_map

##### Raster Data Joining with Zonal Statistics

In [None]:
%%time
# 8.54 s
landslide_df = sedona.sql("""
WITH matched_tile AS (
  SELECT 
      t2.tile,
      t1.h3,
      t1.geometry
  FROM 
      enriched_hex t1
  LEFT JOIN   
      land_slide_risk_tiled t2
  ON ST_OVERLAPS(RS_ENVELOPE(tile), t1.geometry)  
)

SELECT 
    max(RS_ZonalStats(tile, geometry, 'max')) AS max_landslide_class,
    matched_tile.h3 
FROM matched_tile
group by matched_tile.h3
""")
landslide_df = landslide_df.repartition(12)
landslide_risk_tiled_df.unpersist()
landslide_df.createOrReplaceTempView('hex_landslide')
landslide_df.count()

In [None]:
landslide_df.printSchema()

In [None]:
%%time
landslide_df.where("max_landslide_class is not null").show()

In [None]:
%%time
#Wall time: 6min 39s
flood_df = sedona.sql("""
WITH matched_tile AS (
  SELECT 
      t2.tile,
      t2.event,
      t1.h3,
      t1.geometry
  FROM 
       enriched_hex t1
  LEFT JOIN   
      flood_raster_tiled t2
  ON ST_OVERLAPS(RS_ENVELOPE(t2.tile), t1.geometry) 
)

SELECT 
    COUNT(RS_ZonalStats(matched_tile.tile,geometry, 'count')) AS flood_point_cnt,
    matched_tile.h3, 
    matched_tile.event
FROM matched_tile
GROUP BY 
h3, 
event
""")
flood_df = flood_df.repartition(12)
flood_df.createOrReplaceTempView('hex_flood')
flood_df.count()

In [None]:
flood_df.printSchema()

In [None]:
sedona.sql("SELECT * from hex_flood where h3=603929423816687615 order by event DESC").show()

In [None]:
%%time
flood_pivot_df = sedona.sql("""

SELECT * FROM hex_flood
    PIVOT (
        SUM(flood_point_cnt) AS a
        FOR event IN ('10' AS ten_year,'20' as twenty_year,'30' as thirty,'40' as fourty,'50' as fifty,'75' as seventyfive,'100' as one_hundred,'200' as two_hundred,'500' as five_hundred)
    )

""")
flood_pivot_df = flood_pivot_df.repartition(15)

flood_pivot_df.createOrReplaceTempView('hex_flood_pivot')

In [None]:
flood_pivot_df.show()

In [None]:
flood_pivot_df.count()

In [None]:
%%time
combined_df = sedona.sql("""
SELECT
    t1.geometry,
    .3 *(COALESCE((t1.res_building - 0) / (21042 -0 ),0)) AS res_building,
1 - (.5 *(COALESCE((t1.emergency_services - 0) / (113 - 0),0))) AS emergency_services,
1 - (.5 *(COALESCE((t1.service_stations - 0) / (59 - 0),0))) AS service_stations,
    .3 *(COALESCE((t1.speed_limit -30 ) / (130 - 30 ),0)) AS speed_limit,
    .6 *(COALESCE(1,0)) AS max_landslide_class,
    .4 *(COALESCE((t3.ten_year - 0) / (4 - 0),0)) AS ten_flood_point_cnt,
    .3 *(COALESCE((t3.twenty_year - 0) / (4 - 0),0)) AS twenty_flood_point_cnt,
    .2 *(COALESCE((t3.thirty - 0) / (4 - 0),0)) AS thirty_flood_point_cnt,
    .1 *(COALESCE((t3.fourty - 0) / (4 - 0),0)) AS fourty_flood_point_cnt,
    .09 *(COALESCE((t3.fifty - 0) / (4 - 0),0)) AS fifty_flood_point_cnt,
    .08 *(COALESCE((t3.seventyfive - 0) / (4 - 0),0)) AS seventyfive_flood_point_cnt,
    .07 *(COALESCE((t3.one_hundred - 0) / (4 - 0),0)) AS one_hundred_flood_point_cnt,
    .06 *(COALESCE((t3.two_hundred - 0) / (4 - 0),0)) AS two_hundred_flood_point_cnt,
    .05 *(COALESCE((t3.five_hundred - 0) / (4 - 0),0)) AS five_hundred_flood_point_cnt

FROM 
    enriched_hex t1
LEFT JOIN 
    hex_flood_pivot t3 USING (h3)
LEFT JOIN 
    hex_landslide t4 USING (h3)

""")


combined_df = combined_df.repartition(12)
combined_df.createOrReplaceTempView('combined_hex')
combined_df.count()

In [None]:
final_hex = sedona.sql("""

SELECT
    geometry,
res_building+emergency_services+service_stations+speed_limit+max_landslide_class+ten_flood_point_cnt+twenty_flood_point_cnt+thirty_flood_point_cnt+fourty_flood_point_cnt+fifty_flood_point_cnt+seventyfive_flood_point_cnt+one_hundred_flood_point_cnt+two_hundred_flood_point_cnt+five_hundred_flood_point_cnt as risk_score
FROM
    combined_hex
""")

In [None]:
final_hex.createOrReplaceTempView("final_risk")

Hex score Max: `4.250014257199886` 

Hex Score Min: `2.251052534627332`

In [None]:
final_hex_normal= sedona.sql("""
SELECT 
    geometry,
    (risk_score - 2.191 ) / (4.25-1.337) as norm_risk_score
FROM 
    final_risk 

""")
final_hex_normal=final_hex_normal.cache()
final_hex_normal.count()

In [None]:
kepler_map = SedonaKepler.create_map()
SedonaKepler.add_df(kepler_map, final_hex_normal, name="combined_df")
kepler_map