<img src="https://wherobots.com/wp-content/uploads/2023/12/Inline-Blue_Black_onWhite@3x.png" alt="drawing" width="600"/>

# Portfollio Risk Assessmemnt



<table border="0">
 <tr>
         <!-- <td><b style="font-size:30px"><img style="display: block; margin: auto; width:500px" src="./assets/ca-fire.jpg"/></b></td>-->
    <td width="75%" style="font-size: 19px;">Today We Will:
        
1. Read Vector and Raster Data
1. Visualizing with SedonaKepler
1. Scaled Spatial Joins between vector and multiple raster layers
1. Hot/Cold Spot Analysis
   

Data sets:
- Mock Policy Holders (Thank you Annex Risk)
- Multi event River Inundation
- Cat 5 Storm Surge

 
Process:
1. Policy Holder Hot Spot Analysis
1. Storm Risk Assessment
1. Profit


Along the way we'll introduce some important concepts for working with WherobotsDB like the Spatial DataFrame structure and querying WherobotsDB databases and tables using Spatial SQL.

Let's get started!
</td>

 </tr>
 <tr>
 </tr>
</table>


<table border="0" align='left' width="100%">


 <tr>
         <td width="75%px" style="font-size:22px">
        When using Apache Sedona (in Wherobots or not) the first step is to create the `Context`. This context represents the connection to the cluster. The first time we set the config and create the `Context` all of the nodes in the cluster are brought on line
    </td>
         <td align='right'>
             <b style="font-size:30px"><img align='right' style="display: block; margin: auto; width:250px" src="https://25.media.tumblr.com/ba7e79ff0c1fd792cd1fe2236e25f184/tumblr_mszn71jdjl1qmgfhzo1_1280.gif"/></b></td>
 </tr>
 <tr>
 </tr>
</table>

In [1]:
%%time
from sedona.spark import *
from pyspark.sql.functions import col, count, when, expr,lit,explode,broadcast, desc
from datetime import datetime
from sedona.maps.SedonaKepler import SedonaKepler
start = datetime.now()

config = SedonaContext.builder(). \
    config("spark.hadoop.fs.s3a.bucket.wherobots-examples.aws.credentials.provider","org.apache.hadoop.fs.s3a.AnonymousAWSCredentialsProvider"). \
    getOrCreate()


sedona = SedonaContext.create(config)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
                                                                                

CPU times: user 885 ms, sys: 259 ms, total: 1.14 s
Wall time: 28.9 s


### Load the Policy Holder locations and Hexagons. 

#### Reading the parquet files is straight forward using the dataframe API

In [2]:
%%time
policy_holders = sedona.read.option("recursiveFileLookup", "true").format("geoparquet") \
                            .load("s3://wherobots-examples/data/examples/risk_summit/locations/")
_hex = sedona.read.option("recursiveFileLookup", "true").format("geoparquet") \
                            .load("s3://wherobots-examples/data/examples/risk_summit/joined_hex/")
_hex.createOrReplaceTempView("hex")
policy_holders.createOrReplaceTempView("policies")

                                                                                

CPU times: user 2.35 ms, sys: 4.57 ms, total: 6.92 ms
Wall time: 3.73 s


In [3]:
%%time
#Wall time: 20.5 s
print(f"COUNT POLICY HOLDERS: {policy_holders.count()}")
print(f"COUNT HEX: {_hex.count()}")

                                                                                

COUNT POLICY HOLDERS: 7955




COUNT HEX: 1163
CPU times: user 31.1 ms, sys: 4.32 ms, total: 35.4 ms
Wall time: 30 s


                                                                                

#### SQL in Sedona works just like it does with other SQL engines. 

In [4]:
%%time
#Wall time: 32 s
enriched_hex = sedona.sql("""
SELECT 
    t1.h3,
    t1.geometry,
    t1.total_cov,
    count(t2.points) as cnt_pnts 
FROM 
    hex t1 
LEFT JOIN
    policies t2 
WHERE
    ST_INTERSECTS(t1.geometry,t2.points) group by 1,2,3""")
enriched_hex.printSchema()
enriched_hex.count()

root
 |-- h3: long (nullable = true)
 |-- geometry: geometry (nullable = true)
 |-- total_cov: double (nullable = true)
 |-- cnt_pnts: long (nullable = false)





CPU times: user 34.1 ms, sys: 0 ns, total: 34.1 ms
Wall time: 34.9 s


                                                                                

1163

In [3]:
%%time
#Wall time: 30.9 s
kmap = SedonaKepler.create_map()

SedonaKepler.add_df(
    kmap,
    policy_holders,
    "policy_holders"
)
SedonaKepler.add_df(
    kmap,
    enriched_hex,
    "enriched_hex"
)

kmap

User Guide: https://docs.kepler.gl/docs/keplergl-jupyter




CPU times: user 468 ms, sys: 13.4 ms, total: 482 ms
Wall time: 30.9 s


                                                                                

<body>
    <p>
        The Getis-Ord <b>G<sup>*</sup></b> statistic is a spatial analysis tool used to identify statistically significant 
        <span style="color: red; font-weight: bold;">hot spots</span> and 
        <span style="color: blue; font-weight: bold;">cold spots</span> in geospatial data.
    </p>
    <p>
        It measures the degree of clustering of high or low values within a specified spatial context, such as neighborhoods or regions, by comparing local sums to the overall data distribution.
    </p>
    <p>
        Results are typically expressed as z-scores, where high positive values indicate clustering of 
        <span style="color: red; font-weight: bold;">high values</span> (<span style="color: red;">hot spots</span>) and low negative values indicate clustering of 
        <span style="color: blue; font-weight: bold;">low values</span> (<span style="color: blue;">cold spots</span>).
    </p>
</body>

In [5]:
%%time
#Wall time: 14.8 s
from sedona.stats.hotspot_detection.getis_ord import g_local
from sedona.stats.weighting import add_distance_band_column

distance_band_radius = 1.0

gi_df = g_local(
    add_distance_band_column( # adds the "weights" columns
        enriched_hex,
        distance_band_radius,
        binary=True,
        include_self=True,
        self_weight=1.0
    ),
    "total_cov",
    "weights",
    star=True
).drop("weights")




CPU times: user 29.6 ms, sys: 9.99 ms, total: 39.6 ms
Wall time: 33.3 s


                                                                                

In [52]:
gi_df.printSchema()

root
 |-- h3: long (nullable = true)
 |-- geometry: geometry (nullable = true)
 |-- total_cov: double (nullable = true)
 |-- cnt_pnts: long (nullable = true)
 |-- G: double (nullable = true)
 |-- EG: double (nullable = true)
 |-- VG: double (nullable = true)
 |-- Z: double (nullable = true)
 |-- P: double (nullable = true)



In [53]:
kmap = SedonaKepler.create_map()

SedonaKepler.add_df(
    kmap,
    gi_df,
    "cells"
)

kmap

User Guide: https://docs.kepler.gl/docs/keplergl-jupyter


                                                                                

<table border="0" align='left' width="100%">


 <tr>
         <td width="60%px" style="font-size:22px">
         <div style="background-color: #ffffff; padding: 20px; border-radius: 8px; box-shadow: 0 4px 6px rgba(0, 0, 0, 0.1); font-family: Arial, sans-serif; color: #333; line-height: 1.6;">
        <h1 style="font-size: 24px; color: #555;">Policy Holder Hot Spots and Flood Risk Analysis</h1>
        <p>
            Now that we can see our policy holder <span style="color: red; font-weight: bold;">hot spots</span>, let's map 
            <span style="color: blue; font-weight: bold;">flood risk</span> so we can overlay the two.
        </p>
        <p>
            Looking at the <span style="color: blue; font-weight: bold;">flood risk</span> alongside our investment 
            <span style="color: red; font-weight: bold;">hot spots</span> will tell us if our overinvestment in these areas is truly a high risk.
        </p>
        <p>
            Let's start by selecting a subset of our <span style="color: red; font-weight: bold;">hot spots</span> to focus on.
        </p>
    </div>
    </td>
         <td align='right'>
             <b style="font-size:30px"><img align='right' style="display: block; margin: auto; width:350px" src="https://images-wixmp-ed30a86b8c4ca887773594c2.wixmp.com/f/7fd19610-cc88-4e2e-a929-ca0a008d6bb0/dc41zjt-7c4d88e6-8f23-4761-92aa-166374658e14.gif?token=eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiJ1cm46YXBwOjdlMGQxODg5ODIyNjQzNzNhNWYwZDQxNWVhMGQyNmUwIiwiaXNzIjoidXJuOmFwcDo3ZTBkMTg4OTgyMjY0MzczYTVmMGQ0MTVlYTBkMjZlMCIsIm9iaiI6W1t7InBhdGgiOiJcL2ZcLzdmZDE5NjEwLWNjODgtNGUyZS1hOTI5LWNhMGEwMDhkNmJiMFwvZGM0MXpqdC03YzRkODhlNi04ZjIzLTQ3NjEtOTJhYS0xNjYzNzQ2NThlMTQuZ2lmIn1dXSwiYXVkIjpbInVybjpzZXJ2aWNlOmZpbGUuZG93bmxvYWQiXX0.Orj_FsxTdGTbQ2pbH3Q8YgBqs6knaCXhXmQm_VWDvA8"/></b></td>
 </tr>
 <tr>
 </tr>
</table>

In [6]:

from ipyleaflet import Map, GeomanDrawControl, GeoData, Choropleth, linear
import geopandas as gpd
my_lovely_map = Map(center = (28.584522,-82.30957), zoom = 6, min_zoom = 1, max_zoom = 20)
draw_control = GeomanDrawControl()

draw_control.rectangle = {
    "pathOptions": {
        "fillColor": "#0957c3",
        "color": "#4b93f7",
        "fillOpacity": .5
    }
}

feature_collection_all = {
    'type': 'FeatureCollection',
    'features': []
}

last_feature = {
    'type': 'FeatureCollection',
    'features': []
}

def handle_draw(self, action, geo_json): 
    feature_collection_all['features'].append(geo_json)
    last_feature['features'] = geo_json

draw_control.on_draw(handle_draw)
my_lovely_map.add(draw_control)

#sedona conversions
df = gi_df.toPandas()
gdf = gpd.GeoDataFrame(df, geometry="geometry")
gdf.index = gdf['h3']
geo_data = gdf.__geo_interface__
choro_data = dict(zip(gdf["h3"].astype(str).to_list(), gdf["Z"].to_list()))


layer = Choropleth(
    geo_data=geo_data,
    choro_data=choro_data,
    colormap=linear.Paired_06,
    border_color='grey',
    style={'fillOpacity': 0.8})

my_lovely_map.add(layer)


my_lovely_map

                                                                                

Map(center=[28.584522, -82.30957], controls=(ZoomControl(options=['position', 'zoom_in_text', 'zoom_in_title',…

Let’s add some flood data.

The European Commission’s Joint Research Centre publishes **River Flood Hazard Maps**.

These datasets are gridded and depict flood-prone areas in Europe and worldwide for river flood events of varying magnitudes (from 1-in-10-year to 1-in-500-year), representing inundation along the river network for five different flood return periods.

Here, we are loading the 10-, 20-, 50-, 75-, and 100-year inundation predictions.

**Source:** [European Commission Flood Hazard Data](https://data.jrc.ec.europa.eu/collection/id-0054)


In [72]:
# Read directory of Flood Risk 
flood_layers_dir = f"""s3://wherobots-examples/data/examples/global_flood_hazard_layers/florida/"""


# Create a DataFrame containing raster data
df_geotiff = sedona.read.format("binaryFile")\
    .option("pathGlobFilter", "*.tif")\
    .option("recursiveFileLookup", "true")\
    .load(flood_layers_dir).drop("content").withColumn("rast", expr("RS_FromPath(path)"))

_aoi = str(last_feature['features'][0]).replace("'",'"')

In [73]:
df_geotiff.createOrReplaceTempView("flood_raster")

In [74]:
flood_raster_tiled = sedona.sql(f"""
with tiled as (
SELECT 
    RS_TileExplode(rast, 256, 256) AS (x, y, tile), 
    reverse(split(RS_BandPath(rast), '/'))[0] as path,
    int(REPLACE(SPLIT(REVERSE(SPLIT(RS_BandPath(rast), '/'))[0],'_')[3],'RP','')) as event
    FROM flood_raster)
SELECT * from tiled 
    where RS_Intersects(ST_GeomFromGeoJSON('{_aoi}'), tile) """)

In [75]:
flood_raster_tiled.printSchema()

root
 |-- x: integer (nullable = false)
 |-- y: integer (nullable = false)
 |-- tile: raster (nullable = false)
 |-- path: string (nullable = true)
 |-- event: integer (nullable = true)



In [76]:
flood_raster_tiled.count()

70

In [77]:
flood_raster_tiled.createOrReplaceTempView("flood_raster_tiled")

In [78]:
flood_raster_tiled.selectExpr("x","y","path","event").show(truncate=False)

+---+---+----------------------------+-----+
|x  |y  |path                        |event|
+---+---+----------------------------+-----+
|34 |5  |ID43_N29_W90_RP100_depth.tif|100  |
|35 |5  |ID43_N29_W90_RP100_depth.tif|100  |
|36 |5  |ID43_N29_W90_RP100_depth.tif|100  |
|37 |5  |ID43_N29_W90_RP100_depth.tif|100  |
|38 |5  |ID43_N29_W90_RP100_depth.tif|100  |
|39 |5  |ID43_N29_W90_RP100_depth.tif|100  |
|40 |5  |ID43_N29_W90_RP100_depth.tif|100  |
|34 |6  |ID43_N29_W90_RP100_depth.tif|100  |
|35 |6  |ID43_N29_W90_RP100_depth.tif|100  |
|36 |6  |ID43_N29_W90_RP100_depth.tif|100  |
|37 |6  |ID43_N29_W90_RP100_depth.tif|100  |
|38 |6  |ID43_N29_W90_RP100_depth.tif|100  |
|39 |6  |ID43_N29_W90_RP100_depth.tif|100  |
|40 |6  |ID43_N29_W90_RP100_depth.tif|100  |
|34 |5  |ID43_N29_W90_RP75_depth.tif |75   |
|35 |5  |ID43_N29_W90_RP75_depth.tif |75   |
|36 |5  |ID43_N29_W90_RP75_depth.tif |75   |
|37 |5  |ID43_N29_W90_RP75_depth.tif |75   |
|38 |5  |ID43_N29_W90_RP75_depth.tif |75   |
|39 |5  |I

Now that we have a the flood depths layers in our database, we will extract that inundation percent coverage for each mapping unit.  

In [79]:
%%time
gi_df.createOrReplaceTempView("gi")
hex_river_inundation_prcnt_df = sedona.sql(f""" 

WITH reducedhex as (select * from gi where ST_INTERSECTS(geometry,ST_GeomFromGeoJSON('{_aoi}'))), 
matched_tile AS (
  SELECT 
      t2.tile,
      t2.path,
      t2.event,
      t1.h3,
      t1.geometry
  FROM 
      reducedhex t1
   JOIN 
      flood_raster_tiled t2
  ON RS_Intersects(t1.geometry, t2.tile))
  
select 
    event,
    h3,
    geometry,
    RS_ZonalStats(tile,geometry,'count') / 4460.0 as inundation_percent


from matched_tile

""")

CPU times: user 1.93 ms, sys: 385 μs, total: 2.32 ms
Wall time: 34.1 ms


In [80]:
%%time
hex_river_inundation_prcnt_df = hex_river_inundation_prcnt_df.cache()
hex_river_inundation_prcnt_df.where("inundation_percent>0.2").show()

                                                                                

+-----+------------------+--------------------+-------------------+
|event|                h3|            geometry| inundation_percent|
+-----+------------------+--------------------+-------------------+
|  100|604680457501540351|POLYGON ((-82.232...| 0.2024663677130045|
|  100|604690519670390783|POLYGON ((-81.838...| 0.4437219730941704|
|  100|604690519133519871|POLYGON ((-81.802...|0.27376681614349774|
|   75|604690519670390783|POLYGON ((-81.838...|0.43430493273542603|
|   75|604690519133519871|POLYGON ((-81.802...| 0.2697309417040359|
|   50|604690519670390783|POLYGON ((-81.838...| 0.4246636771300448|
|   50|604690519133519871|POLYGON ((-81.802...| 0.2556053811659193|
|   20|604690519670390783|POLYGON ((-81.838...| 0.3647982062780269|
|   10|604690519670390783|POLYGON ((-81.838...| 0.3087443946188341|
+-----+------------------+--------------------+-------------------+

CPU times: user 25.8 ms, sys: 4.57 ms, total: 30.4 ms
Wall time: 36 s


                                                                                

In [81]:
hex_river_inundation_prcnt_df.count()

395

In [82]:
hex_river_inundation_prcnt_pivot_df.printSchema()

root
 |-- h3: long (nullable = true)
 |-- geometry: geometry (nullable = true)
 |-- ten_year: double (nullable = true)
 |-- twenty_year: double (nullable = true)
 |-- fifty_year: double (nullable = true)
 |-- seventyfive_year: double (nullable = true)
 |-- onehundred_year: double (nullable = true)



In [83]:
hex_river_inundation_prcnt_df.createOrReplaceTempView("hex_river_inundation_prcnt")

In [84]:
%%time
hex_river_inundation_prcnt_pivot_df = sedona.sql("""

SELECT 
*
FROM hex_river_inundation_prcnt
    PIVOT (
        SUM(inundation_percent) AS a
        FOR event IN ('10' AS ten_year,'20' as twenty_year,'50' as fifty_year,'75' as seventyfive_year,'100' as onehundred_year)
    )

""")
hex_river_inundation_prcnt_pivot_df.where("onehundred_year >0").show()

+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|                h3|            geometry|            ten_year|         twenty_year|          fifty_year|    seventyfive_year|     onehundred_year|
+------------------+--------------------+--------------------+--------------------+--------------------+--------------------+--------------------+
|604680442871807999|POLYGON ((-82.450...| 0.05560538116591928| 0.06905829596412556| 0.08834080717488789| 0.09730941704035874|  0.1015695067264574|
|604680457233104895|POLYGON ((-82.232...|                 0.1| 0.10627802690582959| 0.10919282511210762|  0.1109865470852018| 0.11165919282511211|
|604680410659553279|POLYGON ((-82.436...|0.019730941704035873| 0.02466367713004484| 0.04596412556053812| 0.05022421524663677|0.051121076233183856|
|604690519804608511|POLYGON ((-81.865...| 0.04798206278026906|  0.0515695067264574|0.054260089686098655| 0.05515695067

With our 1- to 100-year flood event inundation depths captured let’s bring in storm surge data.

NOAA publishes the results of their **Sea, Lake, and Overland Surges from Hurricanes (SLOSH)** model. This data is also presented in gridded (.tif) format.

**Source:** [NOAA Storm Surge Data](https://www.nhc.noaa.gov/nationalsurge/#data)


In [85]:
storm_surge_source = "s3://wherobots-examples/data/examples/storm_surge_risk/us_Category5_MOM_Inundation_HIGH_FLORIDA.tif"
storm_surge_df = sedona.sql(f"""
SELECT 
    'us_Category5_MOM_Inundation_HIGH.tif' AS path, 
    RS_FromPath("{storm_surge_source}") AS rast""")
storm_surge_df.createOrReplaceTempView("storm_surge_raster")
storm_surge_df_tiled = sedona.sql(""" 
SELECT 
    RS_TileExplode(rast,128,128) as (x, y, tile),
    path
from storm_surge_raster"""
                                )
storm_surge_df_tiled.createOrReplaceTempView("storm_surge_raster_tiled")

In [86]:
storm_surge_df_tiled.show()

+---+---+--------------------+--------------------+
|  x|  y|                tile|                path|
+---+---+--------------------+--------------------+
|  0|  0|OutDbGridCoverage...|us_Category5_MOM_...|
|  1|  0|OutDbGridCoverage...|us_Category5_MOM_...|
|  2|  0|OutDbGridCoverage...|us_Category5_MOM_...|
|  3|  0|OutDbGridCoverage...|us_Category5_MOM_...|
|  4|  0|OutDbGridCoverage...|us_Category5_MOM_...|
|  5|  0|OutDbGridCoverage...|us_Category5_MOM_...|
|  6|  0|OutDbGridCoverage...|us_Category5_MOM_...|
|  7|  0|OutDbGridCoverage...|us_Category5_MOM_...|
|  8|  0|OutDbGridCoverage...|us_Category5_MOM_...|
|  9|  0|OutDbGridCoverage...|us_Category5_MOM_...|
| 10|  0|OutDbGridCoverage...|us_Category5_MOM_...|
| 11|  0|OutDbGridCoverage...|us_Category5_MOM_...|
| 12|  0|OutDbGridCoverage...|us_Category5_MOM_...|
| 13|  0|OutDbGridCoverage...|us_Category5_MOM_...|
| 14|  0|OutDbGridCoverage...|us_Category5_MOM_...|
| 15|  0|OutDbGridCoverage...|us_Category5_MOM_...|
| 16|  0|Out

                                                                                

In [87]:
%%time
#Wall time: 1min 36s
hex_storm_surge_inundation_max_df = sedona.sql(f""" 

WITH reducedhex AS (
    select h3, geometry from gi where ST_INTERSECTS(geometry,ST_GeomFromGeoJSON('{_aoi}'))
), 
 matched_tile AS (
  SELECT 
      t2.tile,
      t1.h3,
      t1.geometry
  FROM 
      reducedhex t1
  LEFT JOIN  
      storm_surge_raster_tiled t2
  WHERE RS_Intersects(t1.geometry, t2.tile))
  
select 
    h3,
    FIRST(geometry),
    MAX(RS_ZonalStats(tile,geometry,'max')) as max_inundation_depth
from matched_tile
group by h3
""")
#hex_storm_surge_inundation_percent_df.where("inundation_percent>0").show()

CPU times: user 1.64 ms, sys: 328 μs, total: 1.97 ms
Wall time: 19 ms


In [92]:
%%time
kmap = SedonaKepler.create_map()

SedonaKepler.add_df(
    kmap,
    hex_storm_surge_inundation_max_df,
    "Storm Surge"
)
SedonaKepler.add_df(
    kmap,
    hex_river_inundation_prcnt_pivot_df,
    "River Inundation"
)
SedonaKepler.add_df(
    kmap,
    gi_df,
    "Policy Hotspots"
)

kmap

User Guide: https://docs.kepler.gl/docs/keplergl-jupyter


Out of range float values are not JSON compliant
Supporting this message is deprecated in jupyter-client 7, please make sure your message is JSON-compliant
  content = self.pack(content)
Out of range float values are not JSON compliant
Supporting this message is deprecated in jupyter-client 7, please make sure your message is JSON-compliant
  content = self.pack(content)
[Stage 303:>                                                        (0 + 1) / 1]

CPU times: user 192 ms, sys: 24.7 ms, total: 216 ms
Wall time: 1min 54s


Out of range float values are not JSON compliant
Supporting this message is deprecated in jupyter-client 7, please make sure your message is JSON-compliant
  content = self.pack(content)


[Stage 303:>                                                        (0 + 1) / 1]