
#### Description

This notebook is part of a proof of concept developed to test the capabilities in the QESD platform to process large geospatial data and replicate workflows from ArcGIS Pro. In particular, this notebook shows how to perform spatial joins and counts using two geospatial datasets.

#### Datasets and methods

The datasets were manually ingested as there is not a current integration between SIR or Open Data and the QESD platform.
The protected areas of Queensland geospatial dataset was downloaded from [SIR](https://spatial.information.qld.gov.au/arcgis/rest/services/SIR-QGov/ProtectedAreasTerrestrial/FeatureServer/5) in geojson format and then read with geopandas locally to preserve the geometry. It was then saved as parquet format and uploaded to Blob Storage. Alternatively, the json file can be read directly with geopandas if the file is stored in Databricks Workspace, for example: 
example = gpd.read_file('/Workspace/Users/maria.riveraaraya@des.qld.gov.au/spatial autocorrelation/pa_all.json'). This dataset contains 1926 polygons (~ 7 MB).

The second file is the WildNet wildlife records (until 2023-09-21) sourced from [Open data](https://qldspatial.information.qld.gov.au/catalogue/custom/detail.page?fid={40D75ED6-3959-41EB-A5C8-E563FA5B66CA}) in gpkg format. The file was read with geopandas locally to preserve the geometry. The geometry column was transformed to string to make it compatible with the expected datatypes in parquet. It was then saved as parquet format and uploaded to blob Storage in the platform. This dataset contains ~2.2 million records (1.4 GB).

#### Findings

It was possible to replicate the process of joining both datasets and producing total wildlife counts per protected area and per polygon using the Python module geopandas. Further granularity (filter, select) can be achieved based on future requirements. In summary, spatial joins, total wildlife counts in 1926 polygons and 1047 protected areas (from ~1770 to 2023-09-21) including those with multipolygons were performed. 


#### Prerequisites
- Familiarity with spatial data structures and objects
- Familiarity with Databricks
- Understanding of Azure Data Lake Storage (ADLS)
- Familiarity with Python
- Basic understanding of Spark


#### Setup 

1. Make sure you have access to a Databricks environment in the QESD platform.
2. Navigate to sbox-databricks-prd
3. Navigate to Workspace and open the folder platform_repo/geospatial. This folder contains several examples demonstrating the use of Azure Databricks for different geospatial analysis and operations in vector data, including building choropleth maps, spatial joins, counts and calculation of polygon areas in small and large datasets.
4. Start a cluster. We recommend 'singlenode' as we do not need distributed computing and our datasets are small. 



In [None]:
# Install modules

%pip install geopandas==0.14.0 # Pandas equivalent to process geospatial data
%pip install folium==0.14.0 # To produce interactive maps

In [None]:
# These libraries are pre-installed in a Databricks 12.2 LTS runtime cluster (except geopandas and folium)

import geopandas as gpd # Pandas equivalent to process geospatial data
from shapely.geometry import Polygon, Point # To create geometries 
import folium # To produce interactive maps
from json import loads  # To load json files
from shapely import wkt # To convert wkt geometries
import pandas as pd # To process data    
import os # To interact with the operating system
from folium import plugins # To produce interactive maps

In [None]:
# Import datasets from Azure Data Lake Storage (ADLS)

# Get the environment name from the system environment variables


env = os.getenv('tfenvironmentnameshort')

# Load the 'pa_all_srt.parquet' dataset from ADLS into a Spark DataFrame
# The file path is constructed using the environment name

protected_areas = (
  spark.read
    .format("parquet")
    .load(f'abfss://lake-userupload@sboxlake{env}.dfs.core.windows.net/geospatial_test/pa_all_srt.parquet')
)

# Load the 'wildnet_pp_str.parquet' dataset from ADLS into a Spark DataFrame
# The file path is constructed using the environment name
# The 'header' option is set to 'true' which means the first row of the dataset is considered as the header

wild = (
  spark.read
    .option("header", "true")
    .format("parquet")
    .load(f'abfss://lake-userupload@sboxlake{env}.dfs.core.windows.net/geospatial_test/wildnet_pp_str.parquet')
)


In [None]:
# Convert Spark DataFrames to pandas DataFrames 
# This is necessary to convert the geometries from wkt to shapely geometries

pa_pd = protected_areas.toPandas()
wild_pd = wild.toPandas()

# Convert pandas DataFrames to GeoPandas DataFrames
# This is necessary to perform spatial operations

pa_gdf = gpd.GeoDataFrame(pa_pd)
wild_gpd = gpd.GeoDataFrame(wild_pd)

In [None]:
# Define geometries

# The 'geometry' column in the original data is a string
# We need to convert this string representation of geometry into a Shapely geometry object
# The 'from_wkt' function from GeoPandas is used for this conversion

pa_gdf['geometry2'] = gpd.GeoSeries.from_wkt(pa_gdf['geometry'])

# Now, we initialize a new GeoDataFrame using the updated 'geometry2' column as the geometry
# Note: 'geometry' is the default geometry column name in GeoPandas
pa_gdf = gpd.GeoDataFrame(pa_gdf).set_geometry("geometry2")

# replace string geometry representations with shapely geometries
wild_gpd['geometry2'] = gpd.GeoSeries.from_wkt(wild_gpd['geometry'])

# We repeat the same steps for the 'wild' dataset

wild_gpd= gpd.GeoDataFrame(wild_gpd).set_geometry("geometry2")

In [None]:
# Perform spatial join on right to retain all records from observation points
# The 'contains' operation is used to find all protected areas that contain the observation points

joined_inner = gpd.sjoin(pa_gdf, wild_gpd, how='right', op='contains')

In [None]:
# Count by protected area and polygon
# The 'groupby' function is used to group the data by protected area and polygon

point_count = joined_inner.groupby(['geometry_left', 'estatename']).size().reset_index(name='total number of sightings')


In [None]:
# Rejoin with protected areas
# The 'merge' function is used to join the 'point_count' and 'pa_gdf' GeoDataFrames
merged_counts_pa = point_count.merge(pa_gdf, left_on = ['geometry_left'], right_on= ['geometry'], how = 'outer')

# Convert to dataframe
# The 'set_geometry' function is used to set the 'geometry2' column as the geometry column
merged_gdf = gpd.GeoDataFrame(merged_counts_pa).set_geometry('geometry2')

In [None]:
# Calculate total number of sightings per protected area
# The 'groupby' function is used to group the data by protected area
# The 'aggregate' function is used to calculate the sum of the 'total number of sightings' column
# The 'rename' function is used to rename the 'total number of sightings' column to 'aggregated sightings'

aggregation_functions = {'total number of sightings': 'sum'}
merged_agg = merged_gdf.groupby(merged_gdf['estatename_y']).aggregate(aggregation_functions)
merged_agg = merged_agg.rename(columns={"total number of sightings": "aggregated sightings"})

In [None]:
# Set correct indexes to join aggregations with previous dataframe
# The 'set_index' function is used to set the 'estatename_y' column as the index
# The 'join' function is used to join the 'merged_gdf_index' and 'merged_agg' GeoDataFrames
# The 'reset_index' function is used to reset the index of the GeoDataFrame

merged_gdf_index = merged_gdf.set_index('estatename_y')
merged = merged_gdf_index.join(merged_agg).reset_index()


In [None]:
# Merge resulting dataframe with raw protected areas to include polygons with zero sightings
# The 'merge' function is used to join the 'merged' and 'pa_gdf' GeoDataFrames
# The 'left_on' and 'right_on' parameters are used to specify the join columns

final_merged = merged.merge(pa_gdf, left_on= 'geometry2', right_on= 'geometry2', how = 'outer')

# Include polygons with zero sighthings. Set zero in the polygon count as they show as NAs and replace NA's in aggregated sightings in those polygons with the value already calculated.
# The 'fillna' function is used to replace all NaN values with 0
# The 'groupby' function is used to group the data by protected area
# The 'apply' function is used to apply a lambda function to the 'aggregated sightings' column
# The lambda function is used to replace all NaN values with the maximum value in the 'aggregated sightings' column

final_merged['total number of sightings'] = final_merged['total number of sightings'].fillna(0)
final_merged['aggregated sightings'] = final_merged.groupby('estatename')['aggregated sightings'].apply(lambda x: x.fillna(x.max()))
final_merged['aggregated sightings'] = final_merged['aggregated sightings'].fillna(0)
final_merged = final_merged.drop(['estatename_x', 'geometry_x', 'geometry_y', 'geometry_left', 'estatename_y'], axis=1)

In [None]:
# Test with three protected areas to compare with ArcGIS Pro
# The 'loc' function is used to select the rows where the 'estatename' column is equal to 'Great Sandy National Park', 'Minerva Hills National Park', or 'Tinana Creek Conservation Park'

filtered_df = final_merged.loc[(final_merged['estatename'] == 'Great Sandy National Park') | (final_merged['estatename'] == 'Minerva Hills National Park') | (final_merged['estatename'] == 'Tinana Creek Conservation Park')]

filtered_df

In [None]:
# Use folium to create a map. Only three protected areas are included for demonstration
# The 'Map' function is used to create a map
# The 'location' parameter is used to specify the center of the map
# The 'zoom_start' parameter is used to specify the zoom level of the map

m = folium.Map(location = [-24.0807, 148.0641], zoom_start = 6)


# Use subset of main dataframe to build map

final_merged_gdf = gpd.GeoDataFrame(filtered_df.set_geometry('geometry2')) 
final_merged_gdf = final_merged_gdf.set_crs('EPSG:4283')


# Convert to geojson so plugin from folium can be used
# The 'to_json' function is used to convert the GeoDataFrame to a GeoJSON object

points_gjson= folium.features.GeoJson(final_merged_gdf, name="wildlife counts")
points_gjson.add_to(m)


In [None]:
# Use folium plugin to create tooltip
# The 'plugins.MarkerCluster' function is used to create a marker cluster
# The 'add_to' function is used to add the marker cluster to the map
folium.features.GeoJson(final_merged_gdf,  
                        name='Labels',
                        style_function=lambda x: {'color':'transparent','fillColor':'transparent','weight':0},
                        tooltip=folium.features.GeoJsonTooltip(fields=['aggregated sightings', 'estatename', 'total number of sightings'], 
                                                                aliases = ['Total number of sightings in protected area', 'Protected area', 'Number of sightings in polygon'],
                                                                labels=True,
                                                                sticky=False
                                                                            )
                       ).add_to(m)

m

In [None]:
# Save map to workspace - replace with your own path

#m.save(outfile='/Workspace/Users/maria.riveraaraya@des.qld.gov.au/spatial autocorrelation/sandy_minerva_tinana.html')

In [None]:
# Check number of polygons is correct
# The 'count' function is used to count the number of rows in the GeoDataFrame

# To store in parquet - geometry column needs to be changed

merged_pq = final_merged.copy()

merged_pq['geometry2'] = merged_pq['geometry2'].astype(str)

merged_pq = merged_pq.rename(columns = {'total number of sightings': 'number of sightings in polygon', 'aggregated sightings':'total number of sightings in protected area', 'geometry2': 'geometry'})

merged_pq.count()


In [None]:
# Browse total number of sightings per protected area
# The 'drop_duplicates' function is used to drop duplicate rows based on the 'estatename' and 'total number of sightings in protected area' columns
# The 'createDataFrame' function is used to create a Spark DataFrame from a pandas DataFrame


merged_pq_sum = merged_pq[['estatename', 'total number of sightings in protected area']].drop_duplicates()
display(spark.createDataFrame(merged_pq_sum))

In [None]:
# To store csv
  
 #spark.createDataFrame(test).repartition(1).write.mode("overwrite").option("header", "true").csv(f'abfss://lake-userupload@sboxlake{env}.dfs.core.windows.net/geospatial/sandy_minerva_test.csv')

In [None]:
# Write to blob 
# The 'write' function is used to write the Spark DataFrame to the specified file path

spark.createDataFrame(merged_pq).write.mode("overwrite").parquet(f'abfss://lake-userupload@sboxlake{env}.dfs.core.windows.net/geospatial_test/pr_areas_counts.parquet')