In [None]:
# Import python packages
import streamlit as st
import pandas as pd
# NOTE(review): the two star imports below supply the unqualified Snowpark names
# used by later cells (col, call_function, lit, to_geography, object_construct,
# to_array, StringType, FloatType).
from snowflake.snowpark.functions import *
from snowflake.snowpark.types import *
# We can also use Snowpark for our analyses!
from snowflake.snowpark.context import get_active_session
# Snowpark session used by the later Python cells (session.sql / session.table).
session = get_active_session()


# Calling an API to load Location Data
Snowflake allows users to securely create integrations with any REST API.  This example calls the **UK Environment Agency APIs** to retrieve the following:

-   location based flood areas along with their polygons 
-   Latest Flood Alerts
-   Water levels

All of which can be loaded into Snowflake using custom made functions. We will next explore how this works.

## Step 1 - Create a Network Rule
You will first need to create a network rule to allow communication outside of Snowflake - in this case, we are allowing **Egress** to the Environment Agency API host so that its data can be loaded.

In [None]:
-- Egress rule for the single host:port pair listed in VALUE_LIST.
CREATE OR REPLACE NETWORK RULE ENVIRONMENTAL_AGENCY
    MODE       = EGRESS
    TYPE       = HOST_PORT
    VALUE_LIST = ('environment.data.gov.uk:443');

## Step 2 - Create an External Integration
Next, let's create an external access integration.  We will use this integration in every function that is created to load the data.  Each integration can contain multiple network rules, which is useful if a function collects data from multiple sources.

In [None]:
-- External access integration that bundles the ENVIRONMENTAL_AGENCY network rule;
-- the functions created below reference it via EXTERNAL_ACCESS_INTEGRATIONS.
CREATE OR REPLACE EXTERNAL ACCESS INTEGRATION ENV_INTEGRATION
    ALLOWED_NETWORK_RULES = (ENVIRONMENTAL_AGENCY)
    ENABLED               = TRUE;

## Step 3 - Create Functions
Now we will create a function which will be used to load data.

### Function 1 - Current Flood Warnings
The current flood warnings function gives information on any recent alerts.

**NB** This will only return rows if flood warnings currently exist.  In the summer months, this function might retrieve zero results.

In [None]:
-- FLOODS(): returns the Environment Agency "floods" (current warnings) payload
-- as a single VARIANT.
-- NOTE(review): RUNTIME_VERSION 3.8 is kept as-is; confirm this Python runtime
-- is still supported in the target account.
CREATE OR REPLACE FUNCTION DEFAULT_SCHEMA.FLOODS()
RETURNS variant
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
HANDLER = 'get_data'
EXTERNAL_ACCESS_INTEGRATIONS = (ENV_INTEGRATION)
PACKAGES = ('requests', 'pandas')
AS
$$
import requests

# Module-level session: created once when the handler module is loaded.
session = requests.Session()

FLOODS_URL = 'https://environment.data.gov.uk/flood-monitoring/id/floods'
# Without a timeout, requests waits indefinitely on a stalled connection.
REQUEST_TIMEOUT_SECONDS = 30


def get_data():
    """Fetch the current flood warnings and return the decoded JSON document.

    Raises:
        requests.HTTPError: if the API answers with a 4xx/5xx status, so an
            error page is not silently returned as if it were data.
        requests.Timeout: if the API does not respond within the timeout.
    """
    page = session.get(FLOODS_URL, timeout=REQUEST_TIMEOUT_SECONDS)
    page.raise_for_status()
    return page.json()
$$;

### Run the Flood warnings function
The function below loads all data into one field.  The example then uses **lateral flatten** to extract the data into multiple rows.  You will see that each flood warning is linked to a flood area **notation** code.  Flood areas and their polygons are loaded from two separate APIs.

In [None]:
-- Flatten the single VARIANT returned by FLOODS() into one row per element of
-- its "items" array, then project the individual attributes.
SELECT
    VALUE,
    VALUE:"@id"::TEXT                     AS ID,
    VALUE:description::TEXT               AS DESCRIPTION,
    VALUE:eaAreaName::TEXT                AS AREA_NAME,
    VALUE:eaRegionName::TEXT              AS REGION_NAME,
    VALUE:floodArea:"@id"::TEXT           AS "Flood Area: ID",
    VALUE:floodArea:"county"::TEXT        AS "Flood Area: county",
    VALUE:floodArea:"notation"::TEXT      AS "Flood Area: notation",
    VALUE:floodArea:"polygon"::TEXT       AS "Flood Area: polygon",
    VALUE:floodArea:"riverOrSea"::TEXT    AS "Flood Area: River or Sea",
    VALUE:floodAreaID::TEXT               AS "Flood Area ID",
    VALUE:isTidal                         AS "is Tidal",
    VALUE:message::TEXT                   AS "Message",
    VALUE:severity::TEXT                  AS "severity",
    VALUE:severityLevel                   AS "Severity Level",
    VALUE:timeMessageChanged::DATETIME    AS "Time Message Changed",
    VALUE:timeRaised::DATETIME            AS "Time Raised",
    VALUE:timeSeverityChanged::DATETIME   AS "Time Severity Changed"
FROM
    (SELECT DEFAULT_SCHEMA.floods() AS V),
    LATERAL FLATTEN (V:items);

### Function 2 - Flood Areas
This function loads the flood areas.  There are api limits in each call, which is why this function limits the areas to 'X' rows.  There are also the capabilities to filter using other methods.

In [None]:
-- FLOOD_AREAS(limit): returns the Environment Agency "floodAreas" payload as a
-- single VARIANT, asking the API for at most `limit` items.
-- NOTE(review): RUNTIME_VERSION 3.8 is kept as-is; confirm this Python runtime
-- is still supported in the target account.
CREATE OR REPLACE FUNCTION DEFAULT_SCHEMA.FLOOD_AREAS(limit int)
RETURNS variant
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
HANDLER = 'get_data'
EXTERNAL_ACCESS_INTEGRATIONS = (ENV_INTEGRATION)
PACKAGES = ('requests', 'pandas')
AS
$$
import requests

# Module-level session: created once when the handler module is loaded.
session = requests.Session()

FLOOD_AREAS_URL = 'https://environment.data.gov.uk/flood-monitoring/id/floodAreas'
# Without a timeout, requests waits indefinitely on a stalled connection.
REQUEST_TIMEOUT_SECONDS = 60


def get_data(limit):
    """Fetch up to ``limit`` flood areas and return the decoded JSON document.

    Args:
        limit: value sent to the API as the ``_limit`` query parameter.

    Raises:
        requests.HTTPError: if the API answers with a 4xx/5xx status.
        requests.Timeout: if the API does not respond within the timeout.
    """
    # Let requests build the query string instead of formatting it by hand.
    page = session.get(
        FLOOD_AREAS_URL,
        params={'_limit': limit},
        timeout=REQUEST_TIMEOUT_SECONDS,
    )
    page.raise_for_status()
    return page.json()
$$;

### Function 3 - Flood Polygon
This function loads polygons for all the flood areas - you will note that this function has a parameter - it needs to know the area number before it can load the data.

The payload will return a **features** array.  Each feature contains a **properties** object and a **geometry** object.

In [None]:
-- FLOOD_POLYGON(notation): returns the polygon payload of one flood area as a
-- single VARIANT.
-- NOTE(review): RUNTIME_VERSION 3.8 is kept as-is; confirm this Python runtime
-- is still supported in the target account.
CREATE OR REPLACE FUNCTION DEFAULT_SCHEMA.FLOOD_POLYGON(notation text)
RETURNS variant
LANGUAGE PYTHON
RUNTIME_VERSION = 3.8
HANDLER = 'get_data'
EXTERNAL_ACCESS_INTEGRATIONS = (ENV_INTEGRATION)
PACKAGES = ('requests', 'pandas')
AS
$$
from urllib.parse import quote

import requests

# Module-level session: created once when the handler module is loaded.
session = requests.Session()

FLOOD_AREAS_URL = 'https://environment.data.gov.uk/flood-monitoring/id/floodAreas'
# Without a timeout, requests waits indefinitely on a stalled connection.
REQUEST_TIMEOUT_SECONDS = 30


def get_data(notation):
    """Fetch the polygon document for one flood area.

    Args:
        notation: flood area code, e.g. '112WAFTUBA'. A NULL argument yields
            NULL without calling the API.

    Returns:
        The decoded JSON body of the response. The HTTP status is deliberately
        not checked here, so whatever JSON the API sends back is returned.
    """
    if notation is None:
        return None
    # Escape the code so characters such as '/', '?' or '#' cannot change the
    # request path or add a query string.
    area_code = quote(notation, safe='')
    page = session.get(
        f'{FLOOD_AREAS_URL}/{area_code}/polygon',
        timeout=REQUEST_TIMEOUT_SECONDS,
    )
    return page.json()
$$;

-- Smoke test: fetch the polygon payload for a single flood area.
SELECT DEFAULT_SCHEMA.FLOOD_POLYGON('112WAFTUBA');

### The Geometry Object - Handling the **MultiPolygon** data type

In the first section of this lab **Analyse Location Data in Snowflake**, you explored how to render polygons in the **pydeck** python tool.  You will also do the same here.

The Snowflake custom function you have just created returns a multi polygon within the **geometry** object.  To use pydeck for this, we need to put all the polygons on one level (no nested polygons) and produce one row for each shape.

Below is a python function which uses **Snowpark Dataframes** to convert all multi polygons into single polygons which will be split into multiple rows.

In [None]:
def polygon(data):
    """Return ``data`` with one row per single polygon.

    Rows whose GEOGRAPHY is a ``Polygon`` are kept as they are. Rows whose
    GEOGRAPHY is a ``MultiPolygon`` are flattened into one row per member
    polygon, each rebuilt as a ``Polygon`` geography. Rows of any other
    geometry type are not returned.

    Parameters
    ----------
    data : Snowpark DataFrame with a ``GEOGRAPHY`` column. The flatten output
        columns SEQ, KEY, PATH, INDEX, VALUE and THIS are dropped from the
        result, so the input should not use those names for its own columns.

    Returns
    -------
    Snowpark DataFrame with the remaining input columns and the ``GEOGRAPHY``
    column renamed to ``POLYGON``.
    """
    geojson = call_function('ST_ASGEOJSON', col('GEOGRAPHY'))
    geo_type = geojson['type'].astype(StringType())

    # Rows that already hold a single polygon.
    dataP = data.filter(geo_type == 'Polygon')
    # Rows that hold a MultiPolygon and need to be split.
    dataM = data.filter(geo_type == 'MultiPolygon')

    # Flatten the MultiPolygon coordinates: one output row per member polygon,
    # with that polygon's coordinates in the VALUE column.
    dataM = dataM.join_table_function('flatten', geojson['coordinates']).drop(
        'SEQ', 'KEY', 'PATH', 'INDEX', 'THIS')

    # Rebuild a geography object of type 'Polygon' from each flattened VALUE.
    single_polygon = object_construct(
        lit('coordinates'), to_array('VALUE'),
        lit('type'), lit('Polygon'))
    dataM = dataM.with_column('GEOGRAPHY', to_geography(single_polygon)).drop('VALUE')

    # Combine by column NAME rather than position, so the result does not
    # depend on where GEOGRAPHY sits in the caller's column order.
    return dataM.union_by_name(dataP).with_column_renamed('GEOGRAPHY', 'POLYGON')



### The Properties Object - Extracting Properties of the Flood Area

Just for information, below is an example of the properties that are returned with the polygon.  You will note that attributes such as area, description and river_sea are returned in the object.
     
     "properties": {
        "AREA": "Wessex",
        "DESCRIP": "Upper River Avon and tributaries including Malmesbury, Dauntsey, Chippenham and Calne",
        "FWS_TACODE": "112WAFTUBA",
        "LA_NAME": "Gloucestershire, South Gloucestershire, Wiltshire",
        "QDIAL": "210013",
        "RIVER_SEA": "Bristol River Avon",
        "TA_NAME": "Upper Bristol Avon area"
      },
      "type": "Feature"

Here is an example tooltip to pickup key attributes from the properties payload.  This tooltip will be used in **Pydeck**

In [None]:
# Tooltip template for the flood-area map: each {NAME} placeholder is filled
# from the column of the same name in the dataframe handed to pydeck.
tooltip = {
   "html": """<b>Name:</b> {TA_NAME} <br> <b>Area:</b> {AREA} <br> <b>Description:</b> {DESCRIP} 
   <br> <b>QDIAL:</b> {QDIAL}
   <br> <b>River Sea:</b> {RIVER_SEA}
   <br> <b>Local Authority Names:</b> {LA_NAME}""",
   "style": {
       "width":"50%",
        "backgroundColor": "steelblue",
        "color": "white",
       "text-wrap": "balance"
   }
}

### Rendering Flood Areas with Polygons in Pydeck

Here is a view of one set of polygons returned for a specific flood area.  This calls two APIs then visualises the results in pydeck.  You will notice that the tooltips are the same for each polygon.  This is because the **properties** are at **MultiPolygon** level.

In [None]:
import pydeck as pdk
import json

# Feature properties copied into their own columns; the names match the
# placeholders used by the tooltip.
PROPERTY_NAMES = ['TA_NAME', 'AREA', 'DESCRIP', 'RIVER_SEA', 'QDIAL', 'LA_NAME']

data = session.sql(''' SELECT DEFAULT_SCHEMA.FLOOD_POLYGON('112WAFTUBA') GEO ''')

# Everything is read from the first feature of the payload.
first_feature = col('GEO')['features'][0]
for property_name in PROPERTY_NAMES:
    data = data.with_column(
        property_name,
        first_feature['properties'][property_name].astype(StringType()))
data = data.with_column('GEOGRAPHY', to_geography(first_feature['geometry'])).drop('GEO')

# One row per single polygon, geography column renamed to POLYGON.
data = polygon(data)

# Centre the map on the centroid of all polygons combined.
center = data.select(call_function('ST_UNION_AGG',col('POLYGON')).alias('CENTROID'))
center = center.select(call_function('ST_CENTROID',col('CENTROID')).alias('CENTROID'))
center = center.with_column('LAT',call_function('ST_Y',col('CENTROID')).astype(FloatType()))
center = center.with_column('LON',call_function('ST_X',col('CENTROID')).astype(FloatType()))

# Fetch both coordinates in ONE query: every to_pandas() call runs the query
# again, and this query includes the external API call.
center_pd = center.select('LAT', 'LON').to_pandas()
LAT = center_pd.LAT.iloc[0]
LON = center_pd.LON.iloc[0]

datapd = data.to_pandas()
# POLYGON arrives as a GeoJSON string; pydeck only needs the coordinate rings.
datapd["POLYGON"] = datapd["POLYGON"].apply(lambda row: json.loads(row)["coordinates"])

# Create data layer for each polygon
data_layer = pdk.Layer(
    "PolygonLayer",
    datapd,
    opacity=0.3,
    get_polygon="POLYGON", 
    filled=True,
    get_fill_color=[41, 181, 232],
    get_line_color=[0, 0, 0],
    auto_highlight=True,
    pickable=True,
)

# Set the view on the map
view_state = pdk.ViewState(
    longitude=LON,
    latitude=LAT,
    zoom=13,  # Adjust zoom if needed
    pitch=0,
)

# Render the map with layer and tooltip
r = pdk.Deck(
    layers=[data_layer],
    initial_view_state=view_state,
    map_style=None,
    tooltip=tooltip)
    
st.pydeck_chart(r, use_container_width=True)

### Load all polygons in a persisted table 

The next statement calls the **FLOOD_POLYGON** function once for each flood area.  The flood area API is called once to get all the notation codes, and each code is then used to fetch the polygon for that area.  This might take some time to run as it's calling the API many times - it may take 10 minutes to load.  Changing the warehouse size will not improve the speed of this step, as it's the API calls which take the majority of the time.

In [None]:
-- Persist every flood area together with its polygon.
-- FLOOD_AREAS is called once (limit 5000); FLOOD_POLYGON is evaluated per row.
CREATE OR REPLACE TABLE DEFAULT_SCHEMA.FLOOD_AREAS_WITH_POLYGONS AS
WITH areas AS (
    SELECT
        VALUE,
        VALUE:description::TEXT   AS "Description",
        VALUE:eaAreaName::TEXT    AS "eaAreaName",
        VALUE:fwdCode::TEXT       AS "fwdCode",
        VALUE:label::TEXT         AS "label",
        VALUE:lat::FLOAT          AS LAT,
        VALUE:long::FLOAT         AS LON,
        VALUE:notation::TEXT      AS "notation",
        VALUE:polygon::TEXT       AS POLYGON,
        VALUE:riverOrSea::TEXT    AS "River or Sea"
    FROM
        (SELECT DEFAULT_SCHEMA.flood_areas(5000) AS V),
        LATERAL FLATTEN (V:items)
)
SELECT
    * EXCLUDE POLYGON,
    -- geometry of the first feature, converted with TO_GEOGRAPHY(<expr>, 1)
    TO_GEOGRAPHY(DEFAULT_SCHEMA.FLOOD_POLYGON("notation"):features[0]:geometry, 1) AS GEOM
FROM areas;


### Add Search Optimisation on GEO

We want the capability to use fast spatial joins on this dataset.  This is why **Search Optimisation** is switched on for the **GEOM** column, using the **GEO** search method.

In [None]:
-- Add GEO search optimization on the GEOM column of the flood areas table.
ALTER TABLE DEFAULT_SCHEMA.FLOOD_AREAS_WITH_POLYGONS
    ADD SEARCH OPTIMIZATION ON GEO(GEOM);

### Visualising All flood Areas with a filter
Here, you can now visualise the polygons for a selected flood area.  **NB**. The code uses **ST_ISVALID** to filter out any areas that do not have valid polygons.

In [None]:
import pydeck as pdk
import json

data = session.table('DEFAULT_SCHEMA.FLOOD_AREAS_WITH_POLYGONS').drop('VALUE')
# Keep only the areas whose stored geography is valid.
data = data.filter(call_function('ST_ISVALID',col('GEOM'))==1)

# Let the user pick one flood area by its label.
LABELS = data.select(col('"label"')).distinct().to_pandas()
selected_label = st.selectbox('Select Area:',LABELS)
data = data.filter(col('"label"')==selected_label)

# Copy the quoted source columns to plain upper-case names.
data = data.with_column('AREA',col('"eaAreaName"'))
data = data.with_column('DESCRIP',col('"Description"'))
data = data.with_column('RIVER_SEA',col('"River or Sea"'))
data = data.with_column('fwdCode',col('"fwdCode"'))
data = data.with_column('NOTATION',col('"notation"'))
# polygon() reads the geography from a column called GEOGRAPHY.
data = data.with_column('GEOGRAPHY',col('GEOM'))

# One row per single polygon, geography column renamed to POLYGON.
data = polygon(data)

# Single round trip: LAT and LON are already columns of this result, so the map
# centre is read from the same row instead of two extra, unordered queries.
datapd = data.to_pandas()

if datapd.empty:
    # polygon() only returns Polygon / MultiPolygon rows, so nothing may be left.
    st.warning('No polygons are available for the selected flood area.')
else:
    LAT = datapd.LAT.iloc[0]
    LON = datapd.LON.iloc[0]

    st.write(LAT)

    st.write(LON)

    # POLYGON arrives as a GeoJSON string; pydeck only needs the coordinate rings.
    datapd["POLYGON"] = datapd["POLYGON"].apply(lambda row: json.loads(row)["coordinates"])
    st.write(datapd.head())

    # Create data layer for each polygon
    data_layer = pdk.Layer(
        "PolygonLayer",
        datapd,
        opacity=0.3,
        get_polygon="POLYGON", 
        filled=True,
        get_fill_color=[41, 181, 232],
        get_line_color=[0, 0, 0],
        auto_highlight=True,
        pickable=False,
    )

    # Set the view on the map
    view_state = pdk.ViewState(
        longitude=LON,
        latitude=LAT,
        zoom=13,  # Adjust zoom if needed
        pitch=0,
    )

    # Render the map with the polygon layer (no tooltip on this map)
    r = pdk.Deck(
        layers=[data_layer],
        initial_view_state=view_state,
        map_style=None
        )

    st.pydeck_chart(r, use_container_width=True)

Let's now join the flood areas with the previously created **Buildings** dataset.  Effectively, we will filter the buildings so that they only appear if they intersect a flood area.

In [None]:
-- Keep only the buildings whose geography intersects a flood-area polygon and
-- carry the flood area's label, river/sea, description and coordinates along.
CREATE OR REPLACE TABLE DEFAULT_SCHEMA.BUILDINGS_IN_FLOOD_AREAS AS
SELECT
    A.*,
    B."label",
    B."River or Sea",
    B."Description",
    B.LAT,
    B.LON
FROM DATAOPS_EVENT_PROD.DEFAULT_SCHEMA.BUILDINGS_WITH_ROOF_SPECS A
INNER JOIN DEFAULT_SCHEMA.FLOOD_AREAS_WITH_POLYGONS B
    ON ST_INTERSECTS(A.GEOGRAPHY, B.GEOM);

 Let's now visualise the buildings in the **Flood Area**

In [None]:

# Imported here as well so the cell does not rely on an earlier cell having
# brought these names into scope.
import pydeck as pdk
import json

# Tooltip template: each {NAME} placeholder is filled from the column of the
# same name in the dataframe handed to pydeck.
tooltip = {
   "html": """ 
   <br> <b>Theme:</b> {THEME} 
   <br> <b>Description:</b> {DESCRIPTION}
   <br> <b>Roof Material:</b> {ROOFMATERIAL_PRIMARYMATERIAL}
   <br> <b>Solar Panel Presence:</b> {ROOFMATERIAL_SOLARPANELPRESENCE}
   <br> <b>Roof Shape:</b> {ROOFSHAPEASPECT_SHAPE}
   <br> <b>Geometry Area M2:</b> {GEOMETRY_AREA_M2}
   
   """,
   "style": {
       "width":"50%",
        "backgroundColor": "steelblue",
        "color": "white",
       "text-wrap": "balance"
   }
}


# Populate dataframe from query

data = session.table('DEFAULT_SCHEMA.BUILDINGS_IN_FLOOD_AREAS')

LABELS = data.select(col('"label"')).distinct().to_pandas()
selected_label = st.selectbox('Select Area:',LABELS)

data = data.filter(col('"label"')==selected_label)

datapd = data.select('GEOGRAPHY',
                     'THEME',
                     'DESCRIPTION',
                     'ROOFMATERIAL_PRIMARYMATERIAL',
                     'ROOFMATERIAL_SOLARPANELPRESENCE',
                     'ROOFSHAPEASPECT_SHAPE',
                     'GEOMETRY_AREA_M2').to_pandas()

if datapd.empty:
    st.warning('No buildings were found for the selected flood area.')
else:
    # Read LAT and LON from the SAME single row, instead of pulling every row
    # in two separate, unordered queries and taking the first of each.
    center_pd = data.select('LAT', 'LON').limit(1).to_pandas()
    LAT = center_pd.LAT.iloc[0]
    LON = center_pd.LON.iloc[0]

    st.write(datapd.head(2))
    # GEOGRAPHY arrives as a GeoJSON string; pydeck only needs the coordinates.
    datapd["GEOGRAPHY"] = datapd["GEOGRAPHY"].apply(lambda row: json.loads(row)["coordinates"])

    st.write('Buildings in a town')

    # Create data layer - building outlines are read from the GEOGRAPHY column
    data_layer = pdk.Layer(
        "PolygonLayer",
        datapd,
        opacity=0.8,
        get_polygon="GEOGRAPHY", 
        filled=True,
        get_fill_color=[41, 181, 232],
        get_line_color=[0, 0, 0],
        get_line_width=0.1,
        auto_highlight=True,
        pickable=True,
    )

    # Set the view on the map
    view_state = pdk.ViewState(
        longitude=LON,
        latitude=LAT,
        zoom=15,  # Adjust zoom if needed
        pitch=0,
    )

    # Render the map with layer and tooltip
    r = pdk.Deck(
        layers=[data_layer],
        initial_view_state=view_state,
        map_style=None,
        tooltip=tooltip)

    st.pydeck_chart(r, use_container_width=True)

### Only retrieve Flood Areas with Flood Warnings

Here we will call the flood warnings api again, but the results of this will filter the recently loaded flood area with polygons table.  This table also omits polygons which are **not valid**.

In [None]:
-- Join the current flood warnings (FLOODS API) to the persisted flood areas on
-- the area "notation" code, flagging each polygon's validity in VALID.
-- The flood-area table is read from DEFAULT_SCHEMA in the current database,
-- i.e. the same place the earlier CREATE TABLE statement wrote it, instead of
-- a hard-coded database name.
CREATE OR REPLACE TABLE DEFAULT_SCHEMA.LATEST_FLOOD_WARNINGS AS 

SELECT 
TO_GEOGRAPHY(A.GEOM,1) GEOGRAPHY,
ST_ISVALID(GEOGRAPHY) VALID,
A."River or Sea",
A.LAT,
A.LON,
B.* FROM DEFAULT_SCHEMA.FLOOD_AREAS_WITH_POLYGONS A 

INNER JOIN 

(
-- One row per element of the "items" array returned by FLOODS().
select 
VALUE:"@id"::TEXT ID,
VALUE:description::TEXT DESCRIPTION,
VALUE:eaAreaName::TEXT AREA_NAME,
VALUE:eaRegionName::TEXT REGION_NAME,
VALUE:floodArea:"@id"::TEXT "Flood Area: ID",
VALUE:floodArea:"county"::TEXT "Flood Area: county",
VALUE:floodArea:"notation"::TEXT "notation",
VALUE:floodArea:"polygon"::TEXT "Flood Area: polygon",
VALUE:floodArea:"riverOrSea"::TEXT "Flood Area: River or Sea",
VALUE:floodAreaID ::TEXT "Flood Area ID",
VALUE:isTidal "is Tidal",
VALUE:message::TEXT "Message",
VALUE:severity::TEXT "severity",
VALUE:severityLevel "Severity Level",
VALUE:timeMessageChanged::DATETIME "Time Message Changed",
VALUE:timeRaised::DATETIME "Time Raised",
VALUE:timeSeverityChanged::DATETIME "Time Severity Changed"
from 

(select DEFAULT_SCHEMA.floods() V), LATERAL FLATTEN (V:items)) B

ON

A."notation" = B."notation"
;

select * from default_schema.LATEST_FLOOD_WARNINGS;

### Viewing Active Flood Alerts
Below is an example of Pydeck visualising all current flood alerts

In [None]:

import pydeck as pdk
import json

# Tooltip template: each {NAME} placeholder is filled from the column of the
# same name in the dataframe handed to pydeck.
tooltip = {
   "html": """ 
   <br> <b>River or Sea: </b> {River or Sea} 
   <br> <b>Description: </b> {DESCRIPTION}
   <br> <b>Area Name: </b> {AREA_NAME}
   <br> <b>Is Tidal: </b> {is Tidal}
   <br> <b>Message: </b> {Message}
   <br> <b>Severity Level:</b> {Severity Level}
   <br> <b>Time Raised:</b> {Time Raised}
   
   """,
   "style": {
       "width":"50%",
        "backgroundColor": "steelblue",
        "color": "white",
       "text-wrap": "balance"
   }
}


data = session.table('DEFAULT_SCHEMA.LATEST_FLOOD_WARNINGS')
# Only warnings whose flood-area polygon was flagged as valid.
data = data.filter(col('VALID')==1)
# NOTE(review): presumably this re-add is here to make GEOGRAPHY the last
# column before polygon() combines its two halves - confirm before removing.
data = data.with_column('GEOGRAPHY',to_geography('GEOGRAPHY'))

# One row per single polygon, geography column renamed to POLYGON.
data = polygon(data)

# Single round trip: LAT and LON are already columns of this result, so the map
# centre is read from the same row instead of two extra, unordered queries.
datapd = data.to_pandas()

if datapd.empty:
    # The warnings API can return no items, which leaves nothing to draw.
    st.info('There are no active flood warnings to display.')
else:
    LAT = datapd.LAT.iloc[0]
    LON = datapd.LON.iloc[0]

    # POLYGON arrives as a GeoJSON string; pydeck only needs the coordinate rings.
    datapd["POLYGON"] = datapd["POLYGON"].apply(lambda row: json.loads(row)["coordinates"])

    # Create data layer for each polygon
    data_layer = pdk.Layer(
        "PolygonLayer",
        datapd,
        opacity=0.3,
        get_polygon="POLYGON", 
        filled=True,
        get_fill_color=[255,159,54],
        get_line_color=[0, 0, 0],
        auto_highlight=True,
        pickable=True,
    )

    # Set the view on the map
    view_state = pdk.ViewState(
        longitude=LON,
        latitude=LAT,
        zoom=13,  # Adjust zoom if needed
        pitch=0,
    )

    # Render the map with layer and tooltip
    r = pdk.Deck(
        layers=[data_layer],
        initial_view_state=view_state,
        map_style=None,
        tooltip=tooltip)

    st.pydeck_chart(r, use_container_width=True)