# Raster acquisition, processing and analysis with Databricks

Continuing from the previous notebook, this notebook will demonstrate how to:
- Read the freshly downloaded imagery into a Spark DataFrame with Mosaic, reproject each raster and collate the individual bands into multiband files;
- Join this imagery dataset to the areas of interest (AoIs) and use the AoI geometries to clip the rasters, retaining only the pixels that fall inside each AoI.

## Install the libraries and prepare the environment

For this demo we need a few spatial libraries that can be installed with `%pip install`. We will use GDAL, rasterio, pystac and databricks-mosaic for data download and manipulation. Microsoft's Planetary Computer is the source of the raster data for the analysis.

In [0]:
import os

notebook_path = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()
project_path = os.path.dirname(notebook_path)
os.environ["PROJECTCWD"] = project_path

%pip install /Workspace$PROJECTCWD/databricks_mosaic-0.4.3-py3-none-any.whl
%pip install --quiet rasterio==1.3.5 gdal==3.4.1 pystac pystac_client planetary_computer tenacity rich osdatahub

In [0]:
%reload_ext autoreload
%autoreload 2

In [0]:
import library
import mosaic as mos
import os
from tqdm import tqdm

from datetime import datetime
from pyspark.sql import functions as F
from pyspark.sql import Window

data_product = "OpenGreenspace"

current_user = spark.sql("select current_user() as user").first()["user"]
data_root = f"/tmp/{current_user}/{data_product}/data"
output_path = data_root.replace("/data", "/outputs")

dbutils.fs.mkdirs(data_root)
dbutils.fs.mkdirs(output_path)

os.environ["DATADIR"] = f"/dbfs{data_root}"
os.environ["OUTDIR"] = f"/dbfs{output_path}"

CATALOG = "your_catalog"
SCHEMA = "your_schema"

spark.sql(f"CREATE CATALOG IF NOT EXISTS {CATALOG}")
spark.sql(f"CREATE SCHEMA IF NOT EXISTS {CATALOG}.{SCHEMA}")

In [0]:
spark.conf.set("spark.sql.adaptive.coalescePartitions.enabled", "false")
mos.enable_mosaic(spark, dbutils)
mos.enable_gdal(spark, with_checkpoint_path=f"/dbfs{output_path}/checkpoint/{datetime.now().isoformat()}")

## Catalogue the imagery with Mosaic

We already have the direct paths to the downloaded files. That means we can use Mosaic's `rst_fromfile()` function to create pointers to the source files within a Spark DataFrame.

If we didn't have these paths (for example, if we just had one big folder full of imagery), we could instead use Mosaic's `GDAL` Spark data source[↗︎](https://databrickslabs.github.io/mosaic/api/raster-format-readers.html#spark-read-format-gdal) to achieve the same outcome.
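
For reference, the sketch below shows how a folder might be read with the GDAL data source. It is based on the Mosaic raster reader documentation linked above. The helper name `read_imagery_folder` is hypothetical, and the `driverName` value `"GTiff"` assumes the imagery is GeoTIFF.

```python
def read_imagery_folder(spark, folder_uri, driver_name="GTiff"):
    """Hypothetical helper: load every raster under `folder_uri` with
    Mosaic's GDAL Spark data source. Mosaic and GDAL must already be
    enabled, as in the setup cells above."""
    return (
        spark.read.format("gdal")
        # Assumption: the downloaded imagery is GeoTIFF.
        .option("driverName", driver_name)
        .load(folder_uri)
    )

# Usage inside the notebook:
# df_gdal = read_imagery_folder(spark, f"dbfs:{imagery_root}")
```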

In [0]:
imagery_root = f"/tmp/{current_user}/{data_product}/imagery"
dbutils.fs.mkdirs(imagery_root)

imagery_table_ref = f"{CATALOG}.{SCHEMA}.imagery"

In [0]:
spark.table(imagery_table_ref).display()

Here we check whether any files are corrupt or can't be opened by GDAL, and remove them for simplicity.

In [0]:
import os
from osgeo import gdal

# The same imagery folder, seen through the local /dbfs FUSE mount (for GDAL)
# and as a DBFS URI (for dbutils.fs)
directory = f"/dbfs{imagery_root}"
imagery_uri = f"dbfs:{imagery_root}"
files = os.listdir(directory)

for filename in tqdm(files):
    filepath = os.path.join(directory, filename)
    try:
        # Returns None on failure, or raises if GDAL exceptions are enabled
        dataset = gdal.Open(filepath)
    except Exception as e:
        print(f"Error opening file {filename}: {e}")
        dataset = None
    if dataset is None:
        print(f"File: {filename}, could not be opened by GDAL; removing it.")
        dbutils.fs.rm(f"{imagery_uri}/{filename}")
    dataset = None  # close the GDAL handle before moving on

In [0]:
df_files = (spark.createDataFrame(dbutils.fs.ls(f"dbfs:/tmp/{current_user}/{data_product}/imagery"))
            .withColumn("downloaded_path", F.expr("replace(path, 'dbfs:', '/dbfs')"))
            .withColumn("present", F.lit(1))
            .select("downloaded_path", "present")
)
df_t = (spark.table(imagery_table_ref).alias("r").join(df_files.alias("listing"), on="downloaded_path", how="left")
        .filter("present IS NOT NULL")
)
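
The two cells above delete unreadable files and then keep only the catalogue rows whose file is still listed. Stripped of Spark, that reconciliation step amounts to the sketch below. The helper names and sample rows are hypothetical.

```python
def to_fuse_path(uri):
    """Mirror the SQL `replace(path, 'dbfs:', '/dbfs')` used above."""
    return uri.replace("dbfs:", "/dbfs")


def keep_present(catalogue_rows, listed_uris):
    """Keep catalogue rows whose downloaded file still appears in the listing,
    which is the effect of the left join followed by `present IS NOT NULL`."""
    present = {to_fuse_path(uri) for uri in listed_uris}
    return [row for row in catalogue_rows if row["downloaded_path"] in present]


# Made-up rows: file "b" was removed by the GDAL check, so it is not listed
catalogue = [
    {"item_id": "a", "downloaded_path": "/dbfs/tmp/u/imagery/a_B02.tif"},
    {"item_id": "b", "downloaded_path": "/dbfs/tmp/u/imagery/b_B02.tif"},
]
listing = ["dbfs:/tmp/u/imagery/a_B02.tif"]
print([row["item_id"] for row in keep_present(catalogue, listing)])  # ['a']
```

Because each listed path appears only once, a left semi join (`how="left_semi"`) would express the same filter without the helper `present` column.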

In [0]:
rasters_raw = (
  df_t
  .repartition(sc.defaultParallelism * 10)
  .withColumn("tile", mos.rst_fromfile(F.col("downloaded_path")))
  )


rasters_raw.display()

## Reproject the rasters into WGS84
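In preparation for joining this dataset with the Areas of Interest data, we need both datasets in a common CRS. The AoI geometries are stored in WGS84 (EPSG:4326; see their `geometry_4326` column), so we reproject every tile to EPSG:4326 with `rst_transform()`. Along the way we record each tile's original SRID.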

In [0]:
transformed_raster_table_ref = f"{CATALOG}.{SCHEMA}.transformed"

(
  rasters_raw
  .withColumn("original_projection", mos.rst_srid("tile"))
  .withColumn("tile", mos.rst_transform("tile", F.lit(4326)))
  .write
  .mode("overwrite")
  .saveAsTable(transformed_raster_table_ref)
)

raster_4326 = spark.table(transformed_raster_table_ref)
raster_4326.display()
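
The `last_error` check further down catches failed operations, but it does not confirm that the tiles actually ended up in the target CRS. The helper below is a minimal extra check. It assumes that `enable_mosaic` registers `rst_srid` as a SQL function, in the same way that `rst_frombands` is later called through `F.expr`. The helper name is hypothetical. Rows whose SRID is null are not counted.

```python
def count_tiles_not_in_srid(df, expected_srid=4326):
    """Hypothetical helper: count tiles whose SRID differs from `expected_srid`,
    using Mosaic's SQL function rst_srid on the `tile` column."""
    return df.where(f"rst_srid(tile) <> {expected_srid}").count()

# Usage inside the notebook (0 means every tile was reprojected):
# count_tiles_not_in_srid(raster_4326)
```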

In [0]:
raster_4326.count()

We can use the `last_error` metadata field to check if any of the processing failed.

In [0]:
raster_4326.where("tile.metadata['last_error'] <> ''").count()
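
If that count is non-zero, it helps to see which files failed and why. The following is a minimal sketch that reuses the same `last_error` expression. The helper name is hypothetical, and `downloaded_path` is the column carried over from the catalogue join.

```python
def failed_tiles(df, path_col="downloaded_path"):
    """Hypothetical helper: return the source path and GDAL error message
    for every tile whose `last_error` metadata entry is non-empty."""
    return (
        df.where("tile.metadata['last_error'] <> ''")
        .selectExpr(path_col, "tile.metadata['last_error'] AS last_error")
    )

# Usage inside the notebook:
# failed_tiles(raster_4326).display()
```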

Let's also check that we have at least one image for every part of the country.

In [0]:
coverage = (
  raster_4326
  .select(mos.rst_boundingbox("tile").alias("bbox"))
  .distinct()
  .groupBy()
  .agg(mos.st_union_agg("bbox").alias("wkb"))
)

Here's an example of the map the following command should produce.
<img src='./assets/coverage-map.png'/>

In [0]:
%%mosaic_kepler
coverage wkb geometry

## Assemble the single-band rasters into multiband rasters

We'll use Mosaic's `rst_frombands()` function to combine our 12 individual raster bands into a single raster. The default behaviour is to upsample the bands to the resolution of the highest-resolution band.
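
The small worked sketch below makes "upsample to the resolution of the highest" concrete. It assumes the 12 bands are Sentinel-2 Level-2A bands at their nominal resolutions of 10, 20 and 60 m. The notebook does not state the sensor, and after reprojection the pixel sizes are in degrees, so treat the factors as approximate.

```python
# Assumption: nominal Sentinel-2 L2A band resolutions in metres.
BAND_RESOLUTION_M = {
    "B01": 60, "B02": 10, "B03": 10, "B04": 10, "B05": 20, "B06": 20,
    "B07": 20, "B08": 10, "B8A": 20, "B09": 60, "B11": 20, "B12": 20,
}


def upsampling_factors(resolutions):
    """Return, per band, how many output pixels each input pixel becomes
    when every band is resampled to the finest (smallest) pixel size."""
    finest = min(resolutions.values())
    return {band: (res // finest) ** 2 for band, res in resolutions.items()}


factors = upsampling_factors(BAND_RESOLUTION_M)
print(factors["B01"], factors["B05"], factors["B02"])  # 36 4 1
```

In other words, each 60 m pixel of B01 is spread over a 6 × 6 block of 10 m pixels in the multiband output.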

To do this, we need to reshape our DataFrame so that the bands of each acquisition (`item_id`, `datetime`) appear as columns of a single DataFrame row.
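
The reshape in the next cells is a `groupBy` + `pivot` + `first`. The plain-Python sketch below mimics it on made-up rows, so the resulting shape is easy to see. The output has one record per `(item_id, datetime)`, with the tiles in the fixed band order passed to `rst_frombands` and `None` wherever a band is missing. `pivot_bands` and the `SCL` row are hypothetical.

```python
BANDS = ["B01", "B02", "B03", "B04", "B05", "B06",
         "B07", "B08", "B8A", "B09", "B11", "B12"]


def pivot_bands(rows, bands=BANDS):
    """Group (item_id, datetime, name, tile) rows into one record per scene,
    with tiles ordered as in `bands` and None for any missing band."""
    scenes = {}
    for row in rows:
        if row["name"] not in bands:
            # Names outside the pivot list are ignored, so a scene with no
            # listed band never appears (what filter_expr achieves in Spark).
            continue
        key = (row["item_id"], row["datetime"])
        # setdefault keeps the first tile seen, similar to F.first("tile")
        scenes.setdefault(key, {}).setdefault(row["name"], row["tile"])
    return {
        key: [by_band.get(band) for band in bands]
        for key, by_band in scenes.items()
    }


rows = [
    {"item_id": "s1", "datetime": "2023-06-01", "name": "B02", "tile": "t_b02"},
    {"item_id": "s1", "datetime": "2023-06-01", "name": "B03", "tile": "t_b03"},
    {"item_id": "s1", "datetime": "2023-06-01", "name": "SCL", "tile": "t_scl"},
]
stack = pivot_bands(rows)[("s1", "2023-06-01")]
print(stack[:3])  # [None, 't_b02', 't_b03']
```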

In [0]:
raster_4326.display()

In [0]:
bands = [
    "B01",
    "B02",
    "B03",
    "B04",
    "B05",
    "B06",
    "B07",
    "B08",
    "B8A",
    "B09",
    "B11",
    "B12",
]

raster_multiband_table_ref = f"{CATALOG}.{SCHEMA}.multiband"

filter_expr = " OR ".join([f"{b} IS NOT NULL" for b in bands])

(
  raster_4326
    .select("item_id", "datetime", "name", "tile")
    .groupBy("item_id", "datetime")
    .pivot("name", bands)
    .agg(F.first("tile"))
    .filter(filter_expr)
    .withColumn("tile", F.expr("try_sql(rst_frombands(ARRAY(B01, B02, B03, B04, B05, B06, B07, B08, B8A, B09, B11, B12)))"))
    .drop(*bands)
    .write
    .mode("overwrite")
    .saveAsTable(raster_multiband_table_ref)
)

raster_multiband = spark.table(raster_multiband_table_ref)

In [0]:
raster_multiband.display()

## Join our vector and raster datasets and clip the rasters

In order to compute statistics for each area of interest, we need to create subsets of the raster pixels that correspond to each AoI's area.

We can achieve this by
  - joining the multiband raster data with the ID sets computed earlier;
  - exploding these sets and looking up the corresponding geometries (so we have an image and geometry per ID / granule combination); then
  - clipping the image by the geometry using the `rst_clip()` method.

While we're doing this, we'll use standard Spark SQL functions to filter our imagery to only include the latest image available for each AoI.
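
Stripped of Spark, the explode-then-rank logic in the next cell reduces to the sketch below. Each scene is fanned out to every AoI id it covers. For each id, only the newest scene is kept, which corresponds to rank 1 under `ORDER BY datetime DESC`. The field names mirror the notebook's columns, and the sample scenes are made up. Ties are resolved by keeping the first scene seen, whereas Spark's `row_number()` orders tied rows arbitrarily.

```python
from datetime import datetime


def latest_item_per_aoi(scenes):
    """scenes: iterable of dicts with item_id, datetime and ids (the AoI ids
    the scene intersects). Returns {aoi_id: item_id} for the newest scene."""
    best = {}
    for scene in scenes:
        for aoi_id in scene["ids"]:  # the explode step
            current = best.get(aoi_id)
            if current is None or scene["datetime"] > current["datetime"]:
                best[aoi_id] = scene  # newest so far, i.e. rank 1
    return {aoi_id: scene["item_id"] for aoi_id, scene in best.items()}


scenes = [
    {"item_id": "s1", "datetime": datetime(2023, 5, 1), "ids": [1, 2]},
    {"item_id": "s2", "datetime": datetime(2023, 6, 1), "ids": [2, 3]},
]
print(latest_item_per_aoi(scenes))  # {1: 's1', 2: 's2', 3: 's2'}
```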

In [0]:
aoi_type = "Golf Course"
aoi_table_ref = f"{CATALOG}.{SCHEMA}.aois"

aois = spark.table(aoi_table_ref)
aois.count()

In [0]:
latest_raster_by_aoi_table_ref = f"{CATALOG}.{SCHEMA}.latest_aoi_raster"

windowSpec = Window.partitionBy("id").orderBy(F.desc("datetime"))

(
  raster_multiband
  .join(
    other=(
      raster_4326
      .select("item_id", "ids")
      .distinct()
    ),
    how="inner", 
    on="item_id"
  )
  .withColumn("id", F.explode("ids"))
  .withColumn("rank", F.row_number().over(windowSpec))
  .filter("rank = 1")
  .drop("rank")
  .select("item_id", "datetime", "id", "tile")
  .write
  .mode("overwrite")
  .saveAsTable(latest_raster_by_aoi_table_ref)
)

raster_latest = spark.table(latest_raster_by_aoi_table_ref)
raster_latest.display()

In [0]:
raster_latest.count()

In [0]:
clipped_image_table_ref = f"{CATALOG}.{SCHEMA}.clipped_rasters"

(
  raster_latest
  .join(
    other=aois,
    how="inner",
    on="id"
  )
  # Convert the AoI geometry to WKT before clipping
  .withColumn("geometry_4326", mos.st_aswkt("geometry_4326"))
  # `tile` was produced by try_sql(...) in the multiband step, so the raster is in its `result` field
  .withColumn("tile", mos.rst_clip("tile.result", "geometry_4326"))
  .write
  .mode("overwrite")
  .saveAsTable(clipped_image_table_ref)
)

clipped_images = spark.table(clipped_image_table_ref)
clipped_images.display()
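
`rst_clip()` hides the mechanics. The sketch below is a toy illustration of what "retaining only the pixels inside each AoI" means. It masks a small grid using GDAL's geotransform convention and an even-odd point-in-polygon test on pixel centres. This is not Mosaic's implementation: it ignores CRS handling and multipolygons. Real GDAL-based clipping may also keep pixels that the polygon merely touches, depending on its options. All names here are hypothetical.

```python
def pixel_centre(geotransform, col, row):
    """Map a pixel's centre to coordinates using GDAL's geotransform
    (x_origin, pixel_width, row_rotation, y_origin, col_rotation, pixel_height)."""
    x0, dx, rx, y0, ry, dy = geotransform
    return (x0 + (col + 0.5) * dx + (row + 0.5) * rx,
            y0 + (col + 0.5) * ry + (row + 0.5) * dy)


def point_in_polygon(x, y, ring):
    """Even-odd ray casting against a polygon ring [(x, y), ...]."""
    inside = False
    n = len(ring)
    for i in range(n):
        x1, y1 = ring[i]
        x2, y2 = ring[(i + 1) % n]
        if (y1 > y) != (y2 > y):  # edge straddles the horizontal ray
            x_cross = x1 + (y - y1) * (x2 - x1) / (y2 - y1)
            if x < x_cross:
                inside = not inside
    return inside


def clip_mask(values, geotransform, ring, nodata=None):
    """Keep values whose pixel centre falls inside `ring`; others become nodata."""
    return [
        [v if point_in_polygon(*pixel_centre(geotransform, c, r), ring) else nodata
         for c, v in enumerate(row_values)]
        for r, row_values in enumerate(values)
    ]


# A 3x3 grid of 1-unit pixels with its top-left corner at (0, 3),
# clipped by the square (0, 0)-(2, 2)
grid = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]
square = [(0, 0), (2, 0), (2, 2), (0, 2)]
print(clip_mask(grid, (0, 1, 0, 3, 0, -1), square))
# [[None, None, None], [4, 5, None], [7, 8, None]]
```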

In [0]:
clipped_images.count()