In [1]:
import os
import sys
from pathlib import Path
import logging
import time
import shapely
import pandas as pd
import geopandas as gpd
import seaborn
import dask
import numpy as np
import dask.dataframe as dd
from dask.distributed import Client

# Put the parent directory on the import path so the project-local `src` package resolves.
# NOTE(review): '..' is relative to the kernel's working directory — assumes the notebook is
# launched from a subdirectory of the repository root; TODO confirm.
sys.path.insert(0, '..')
import src.hotspot_utils as util 
import src.process_nearest_hotspots as nearest_process

In [2]:
# Send log records (DEBUG and above) to stdout so they show up in the cell output,
# each line prefixed with a timestamp, level and logger name.
logging.basicConfig(
    stream=sys.stdout,
    level=logging.DEBUG,
    format="%(asctime)s [%(levelname)s] %(name)s - %(message)s",
    datefmt="%Y-%m-%d %H:%M:%S",
)
_LOG = logging.getLogger(name=__name__)

In [3]:
# Start a local dask cluster. Keep n_workers at or below the number of CPU cores:
# up to 8 workers, fewer on smaller machines (e.g. a 2-core sandbox).
n_workers = min(8, os.cpu_count() or 1)  # os.cpu_count() can return None
client = Client(n_workers=n_workers)
client


+---------+--------+-----------+---------+
| Package | client | scheduler | workers |
+---------+--------+-----------+---------+
| tornado | 6.1    | 6.1       | 6.0.4   |
+---------+--------+-----------+---------+


0,1
Client  Scheduler: tcp://127.0.0.1:39413  Dashboard: http://127.0.0.1:8787/status,Cluster  Workers: 8  Cores: 16  Memory: 66.57 GB


In [4]:
# Output directory where the outputs and intermediate files from this notebook are stored.
# Defaults to the sandbox path; set the S3VT_OUTDIR environment variable to use another location.
outdir = Path(os.environ.get("S3VT_OUTDIR", "/home/jovyan/s3vt_dask/s3vtdata/workdir"))

# Processing parameters used to subset the spatial extent and temporal range of the area of interest
##### The FRP data from NASA, ESA, EUMETSAT and Landgate are merged and subset, and the nearest-hotspot csv files are generated based on the parameters in `processing_parameters`.
##### The `chunks` parameter controls how the FRP data are blocked to enable multi-processing. If you encounter memory issues, increase this number.
##### The `start_time` and `end_time` parameters can be used to subset to solar_day (03:00-22:00), solar_night (22:00-03:00) or solar_all (00:00-24:00) hours.

In [5]:
# Settings for merging and subsetting the FRP hotspot products; passed as keyword
# arguments to `nearest_process.process_nearest_points`.
processing_parameters = dict(
    # FRP hotspot sources (S3 URIs); a None source is excluded from the analysis.
    nasa_frp="s3://s3vtaustralia/nasa_hotspots_gdf.geojson",
    esa_frp="s3://s3vtaustralia/s3vt_hotspots.geojson",
    eumetsat_frp="s3://s3vtaustralia/s3vt_eumetsat_hotspots.geojson",
    landgate_frp="s3://s3vtaustralia/landgate_hotspots_gdf.geojson",
    dea_frp=None,
    # Bounding box of the area of interest.
    lon_west=147.0,
    lat_south=-38.0,
    lon_east=154.0,
    lat_north=-27.0,
    # Date range and daily time window (HH:MM) of the subset.
    start_date="2019-11-01",
    end_date="2020-10-08",
    start_time="03:00",
    end_time="22:00",
    # Block size used when splitting the FRP data for multi-processing.
    chunks=500,
    outdir=outdir,
    compare_field="solar_day",
    swath_config_file=Path("/home/jovyan/s3vtconfig.yaml"),
)

In [None]:
# Generate the nearest-hotspot .csv files. This step is expensive (around 5-6 hours in a
# 2-core / 16 GB RAM sandbox), so it is skipped when `nearest_points*.csv` files already
# exist in the output directory. Set FORCE_REPROCESS = True to regenerate them.
# NOTE(review): the skip check does not verify that the existing files were produced with the
# current `processing_parameters`; clear the output directory (or force) after changing them.
FORCE_REPROCESS = False
existing_nearest_csv_files = sorted(processing_parameters["outdir"].glob("nearest_points*.csv"))
if existing_nearest_csv_files and not FORCE_REPROCESS:
    _LOG.info(
        "%d nearest_points csv files found in %s: skipped processing",
        len(existing_nearest_csv_files),
        processing_parameters["outdir"],
    )
    # presumably the same files process_nearest_points would return — TODO confirm
    nearest_hotspots_product_files = existing_nearest_csv_files
else:
    nearest_hotspots_product_files = nearest_process.process_nearest_points(**processing_parameters)

2021-08-16 00:50:53,558: INFO: Processing FRP Hotspots Datasets
2021-08-16 00:50:53,572: INFO: Found credentials in environment variables.
2021-08-16 00:50:53,708: INFO: Fetching FRP datasets...
2021-08-16 00:50:53,710: INFO: s3://s3vtaustralia/nasa_hotspots_gdf.geojson exists: skipped download
2021-08-16 00:50:53,710: INFO: s3://s3vtaustralia/s3vt_hotspots.geojson exists: skipped download
2021-08-16 00:50:53,711: INFO: s3://s3vtaustralia/s3vt_eumetsat_hotspots.geojson exists: skipped download
2021-08-16 00:50:53,712: INFO: s3://s3vtaustralia/landgate_hotspots_gdf.geojson exists: skipped download
2021-08-16 00:50:53,712: INFO: dea Hotspots FRP  is None. excluding from analysis.
2021-08-16 00:50:53,713: INFO: Reading...
2021-08-16 00:50:53,713: INFO: reading and subsetting GeoDataFrame for nasa: /home/jovyan/s3vt_dask/s3vtdata/workdir/nasa_hotspots_gdf.geojson
2021-08-16 00:52:35,885: INFO: reading and subsetting GeoDataFrame for esa: /home/jovyan/s3vt_dask/s3vtdata/workdir/s3vt_hotspot

## Nearest-hotspots DataFrame merged from the nearest-hotspot csv files

In [None]:
# The csv directory is where the nearest-hotspot csv files are stored.
csv_directory = outdir
print(csv_directory)
# Collect every regular file whose name starts with `nearest_points` and has a `.csv` suffix.
# The list is sorted because `Path.iterdir` yields entries in arbitrary order; sorting makes the
# file order handed to `util.csv_to_dataframe` deterministic across runs.
# NOTE(review): files are NOT filtered by `compare_field`; keep only one parameter set's outputs
# in this directory, or tighten the name check to the actual file naming — TODO confirm.
nearest_hotspots_csv_files = sorted(
    fp for fp in csv_directory.iterdir()
    if fp.is_file()
    and fp.name.startswith("nearest_points")
    and fp.suffix == ".csv"
)

In [None]:
# Nearest-points csv files that are used for the rest of the analysis.
nearest_hotspots_csv_files

In [None]:
# Merge the nearest-points csv files into a single dask DataFrame. The helper lives in
# src.hotspot_utils; presumably it sets the index to the `compare_field` column — TODO confirm.
nearest_points_ddf = util.csv_to_dataframe(
    nearest_hotspots_csv_files,
    processing_parameters["compare_field"],
)

In [None]:
# Preview the merged data; on a dask DataFrame `head()` computes only the first partition by default.
nearest_points_ddf.head()

# Results
## Co-occurrence metrics

In [None]:
# Short region label appended to every output file name.
region_alias = "nsw"
output_directory = processing_parameters["outdir"]
# File-name prefix: <start_date>_<end_date>_<start_time>_<end_time>_<region>, with the '-' and ':'
# separators removed, e.g. 20191101_20201008_0300_2200_nsw.
comparison_prefix = "_".join(
    [
        processing_parameters["start_date"].replace("-", ""),
        processing_parameters["end_date"].replace("-", ""),
        processing_parameters["start_time"].replace(":", ""),
        processing_parameters["end_time"].replace(":", ""),
        region_alias,
    ]
)

In [None]:
# Maximum distance between a hotspot and its nearest neighbour for the pair to count as a match;
# compared against the `dist_m` column, in metres.
dist_threshold = 5_000

In [None]:
# Keep only pairs closer than `dist_threshold` and persist them on the cluster, because the
# metric cells below reuse this subset several times.
within_threshold = nearest_points_ddf["dist_m"] < dist_threshold
nearest_ddf_dist_subset = client.persist(nearest_points_ddf[within_threshold])

In [None]:
# Numerator: number of hotspot pairs closer than `dist_threshold` for each sensor-product
# combination (index: `2_satellite_sensor_product`, column: `satellite_sensor_product`).
numerator = util.dask_pivot_table(
    nearest_ddf_dist_subset,
    values="count",
    aggfunc="count",
    index="2_satellite_sensor_product",
    column="satellite_sensor_product",
).compute()

In [None]:
# Save the match counts as integers and display them.
numerator_int = numerator.astype(int)
numerator_int.to_csv(output_directory / f"{comparison_prefix}_matches_{dist_threshold}.csv")
numerator_int

In [None]:
# Denominator: total number of nearest-hotspot pairs (no distance filter) for each
# sensor-product combination, pivoted the same way as the numerator.
denominator = util.dask_pivot_table(
    nearest_points_ddf,
    values="count",
    aggfunc="count",
    index="2_satellite_sensor_product",
    column="satellite_sensor_product",
).compute()

In [None]:
# Save the total pair counts as integers and display them.
denominator_int = denominator.astype(int)
denominator_int.to_csv(output_directory / f"{comparison_prefix}_matches_count.csv")
denominator_int

In [None]:
# Difference: number of pairs whose nearest-hotspot distance is NOT below `dist_threshold`
# (total pairs minus matched pairs).
# `fill_value=0` treats a combination absent from `numerator` (no pair closer than the
# threshold) as 0 matches. Plain `-` would align to NaN there, which also makes the
# subsequent `astype(int)` fail.
difference = denominator.sub(numerator, fill_value=0)

In [None]:
# Save the count differences as integers and display them.
difference_int = difference.astype(int)
difference_int.to_csv(output_directory / f"{comparison_prefix}_count_difference.csv")
difference_int

In [None]:
# Percentage of pairs closer than `dist_threshold`, rounded to 2 decimals.
# `fill_value=0` makes a combination absent from `numerator` (no close pairs) 0% instead of NaN;
# a combination whose total count is 0 still yields NaN (0 / 0).
percentage = numerator.div(denominator, fill_value=0) * 100
percentage = np.round(percentage, 2)

In [None]:
# Save the match percentages and display them.
percentage_csv = output_directory / f"{comparison_prefix}_percentage.csv"
percentage.to_csv(percentage_csv)
percentage

In [None]:
# Maximum `timedelta` between matched points closer than `dist_threshold`.
# The aggregation is the string "max" rather than `np.max`: pandas >= 2.1 deprecates mapping
# NumPy callables in `aggfunc` to the built-in reduction, and the string selects it explicitly.
# Assumes util.pandas_pivot_table forwards `aggfunc` to pandas — TODO confirm.
# NOTE(review): index/columns here are swapped relative to the dask pivots above (rows are
# `satellite_sensor_product`), so these tables are transposed compared with `numerator`.
timemax = util.pandas_pivot_table(
    nearest_ddf_dist_subset.compute(),
    index=["satellite_sensor_product"],
    columns=["2_satellite_sensor_product"],
    values=["timedelta"],
    aggfunc={"timedelta": "max"},
)

In [None]:
# Save the maximum time differences and display them.
timemax_csv = output_directory / f"{comparison_prefix}_max_time_matched_points.csv"
timemax.to_csv(timemax_csv)
timemax

In [None]:
# Minimum `timedelta` between matched points closer than `dist_threshold`.
# The aggregation is the string "min" rather than `np.min`: pandas >= 2.1 deprecates mapping
# NumPy callables in `aggfunc` to the built-in reduction, and the string selects it explicitly.
# Assumes util.pandas_pivot_table forwards `aggfunc` to pandas — TODO confirm.
# NOTE(review): index/columns here are swapped relative to the dask pivots above (rows are
# `satellite_sensor_product`), so these tables are transposed compared with `numerator`.
timemin = util.pandas_pivot_table(
    nearest_ddf_dist_subset.compute(),
    index=["satellite_sensor_product"],
    columns=["2_satellite_sensor_product"],
    values=["timedelta"],
    aggfunc={"timedelta": "min"},
)

In [None]:
# Save the minimum time differences and display them.
timemin_csv = output_directory / f"{comparison_prefix}_min_time_matched_points.csv"
timemin.to_csv(timemin_csv)
timemin

In [None]:
# Mean `dist_m` (metres) between matched points closer than `dist_threshold`, pivoted the same
# way as the numerator (index: `2_satellite_sensor_product`, column: `satellite_sensor_product`).
averagedist = util.dask_pivot_table(
    nearest_ddf_dist_subset,
    values="dist_m",
    aggfunc="mean",
    index="2_satellite_sensor_product",
    column="satellite_sensor_product",
).compute()

In [None]:
# Round the mean distances to 2 decimals, save them and display them.
averagedist = averagedist.round(2)
averagedist.to_csv(output_directory / f"{comparison_prefix}_average_distance_{dist_threshold}m.csv")
averagedist

In [None]:
client.close()  # shut down the dask.distributed client (and the local cluster it started)