Portions of the code in this work were adapted from the Google Cloud Platform Python documentation samples, available at: https://github.com/GoogleCloudPlatform/python-docs-samples, and are licensed under the Apache License, Version 2.0.





In [None]:
import ee
import folium
ee.Authenticate()
ee.Initialize(project='geowildfire')

In [None]:
import geemap.core as geemap


start = '2020-01-01'
end = '2020-12-31'
naip = ee.ImageCollection('USDA/NAIP/DOQQ').filterDate(start, end)

naip_crs = naip.first().projection().crs()
naip_crs_transform = naip.first().projection().getInfo()['transform']
naip = naip.median().reproject(crs=naip_crs, crsTransform=naip_crs_transform)

#california geometry
gaul = ee.FeatureCollection("FAO/GAUL/2015/level1");
california = gaul.filter(ee.Filter.eq('ADM1_NAME', 'California'));
aoi = california.geometry()

#add ndvi values
ndvi = naip.normalizedDifference(['N', 'R']).rename('NDVI')
naip = naip.addBands(ndvi)
naip = naip.addBands(topo)
naip = naip.addBands(landsat_temp)
print(naip.getInfo())

In [None]:
whp = ee.Image("projects/geowildfire/assets/whp2023").select(0)
whpPalette = [
  '00a300', #very low
  '2cff28', #low
  'ffe100', #moderate
  'fe6800', #high
  'fe0017', #very high
  '7a8f93', #nonburnable
  '1017f4' #water


]
params = {
    'min': 1,
    'max': 7,
    'palette': whpPalette
}
gmap = geemap.Map(center=[37.5010, -122.1899], zoom=10)
points = whp.stratifiedSample(
      numPoints=1000,
      region=naip.geometry(),
      scale=270,
      geometries=True,
  )
print(naip.geometry())
# Add the image layer to the map and display it.
gmap.add_layer(whp, params, 'whp')
display(gmap)


In [None]:
from typing import Dict, Iterable, List, Tuple
import numpy as np
from google.api_core import exceptions, retry
import requests
import io
def sample_random_points(region: ee.Geometry, points_per_class: int) -> Iterable[List]:
  """Get a generator of random points in the region."""
  points = whp.stratifiedSample(
      points_per_class,
      region=aoi,
      scale=270,
      geometries=True,
  )
  gmap.addLayer(points)
  display(gmap)
  for point in points.toList(points.size()).getInfo():
      yield point['geometry']['coordinates']
for point in sample_random_points(aoi, points_per_class=200):
    print(point)



In [None]:
@retry.Retry()
def get_patch(image: ee.Image, region: ee.Geometry, bands: List[str], patch_size: int) -> np.ndarray:

  url = image.getDownloadURL({
      'region': region,
      'dimensions': [patch_size, patch_size],
      'format': "NPY",
      'bands': bands,
  })

  return np.load(io.BytesIO(response.content), allow_pickle=True)

In [None]:
from numpy.lib.recfunctions import structured_to_unstructured


def get_input_patch(image: ee.Image, region: ee.Geometry, bands: List[str], patch_size: int) -> np.ndarray:
    patch = get_patch(naip, region, bands, patch_size)
    return np.float32(structured_to_unstructured(patch))

def get_label_patch(image: ee.Image, region: ee.Geometry, bands: List[str], patch_size: int) -> np.ndarray:
    patch = get_patch(whp, region, bands, patch_size)
    return structured_to_unstructured(patch)


In [None]:
def get_training_example(
    coords: List[float], patch_size: int = 450
) -> tuple[np.ndarray, np.ndarray]:
    point = ee.Geometry.Point(coords)
    region = point.buffer(135, 1).bounds(1)
    return (
        get_input_patch(naip, region, ['NDVI'], patch_size),
        get_label_patch(whp, region, ['b1'], patch_size/450),
    )

#-118.76193375968745, 37.867127218336286
point = (-118.31690054334553, 36.897865747337185)  # (lon, lat)
(inputs, labels) = get_training_example(point)
print(f"inputs : {inputs.dtype} {inputs.shape}")
print(f"labels : {labels.dtype} {labels.shape}")
label_value = labels.item()
print(f"Label value: {label_value}")
print(np.mean(inputs))

In [None]:
import tensorflow as tf


def serialize(inputs: np.ndarray, labels: np.ndarray) -> bytes:
    features = {
        name: tf.train.Feature(
            bytes_list=tf.train.BytesList(value=[tf.io.serialize_tensor(data).numpy()])
        )
        for name, data in {"inputs": inputs, "labels": labels}.items()
    }
    example = tf.train.Example(features=tf.train.Features(feature=features))
    return example.SerializeToString()


serialized = serialize(inputs, labels)
print(f"serialized: {len(serialized)} bytes")

In [None]:
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions



with beam.Pipeline() as pipeline:
    (
pipeline
  | "Create region" >> beam.Create([aoi])
  | "Sample points" >> beam.FlatMap(sample_random_points, points_per_class=50)
  | "Get examples" >> beam.Map(get_training_example)
  | "Serialize" >> beam.MapTuple(serialize)
  | "Write TFRecords" >> beam.io.WriteToTFRecord("part", file_name_suffix=".tfrecord.gz")
    )

In [None]:
path = "path/to/part-00000-of-00001.tfrecord.gz"

In [None]:
import tensorflow as tf

filenames = tf.data.Dataset.list_files(path)
dataset = tf.data.TFRecordDataset(filenames, compression_type="GZIP")

for x in dataset.take(1):
    print(f"{type(x.numpy()).__name__} {len(x.numpy())}")

NUM_INPUTS = 1
NUM_CLASSES = 7


def read_example(serialized: bytes) -> tuple[tf.Tensor, tf.Tensor]:
    features_dict = {
        "inputs": tf.io.FixedLenFeature([], tf.string),
        "labels": tf.io.FixedLenFeature([], tf.string),
    }
    example = tf.io.parse_single_example(serialized, features_dict)
    inputs = tf.io.parse_tensor(example["inputs"], tf.float32)
    labels = tf.io.parse_tensor(example["labels"], tf.uint8)

    inputs.set_shape([450, 450, 1])
    labels.set_shape([1, 1, 1])
    labels -= 1
    one_hot_labels = tf.one_hot(labels[:, :, 0], NUM_CLASSES)
    return (inputs, tf.squeeze(one_hot_labels))


filenames = tf.data.Dataset.list_files(path)
dataset = tf.data.TFRecordDataset(filenames, compression_type="GZIP").map(read_example)
for inputs, labels in dataset.take(1):
    print(f"inputs : {inputs.dtype.name} {inputs.shape}")
    print(f"labels : {labels.dtype.name} {labels.shape}")

for inputs, labels in dataset:
    print(labels)


In [None]:
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Conv2D, MaxPool2D, Flatten, Dense, Dropout, BatchNormalization, Activation
from tensorflow.keras.optimizers import Adam, SGD
import keras

input_shape = (270, 270, 1)

model = Sequential()
model.add(Conv2D(filters=32, kernel_size=3, activation='relu', input_shape=[450, 450, 1]))
model.add(BatchNormalization())
model.add(MaxPool2D(pool_size=2, strides=2))

model.add(Conv2D(filters=16, kernel_size=3, activation='relu'))
model.add(BatchNormalization())
model.add(MaxPool2D(pool_size=2, strides=2))
model.add(Dropout(0.5))

model.add(Flatten())

model.add(Dense(16, activation='relu'))
model.add(BatchNormalization())

model.add(Dense(7, activation='softmax'))


model.compile(loss='categorical_crossentropy',
              optimizer=Adam(learning_rate=1e-4),
              metrics=['accuracy'])

model.summary()



callbacks = [
    keras.callbacks.ModelCheckpoint(
        filepath="path/to/models",
        save_best_only=True,
        monitor="val_loss",
        verbose=1,
    )
]

model.summary()
history = model.fit(train_dataset, validation_data = test_dataset, epochs = 1000, callbacks=callbacks)


In [None]:
import matplotlib.pyplot as plt

plt.plot(history.history['accuracy'])
plt.plot(history.history['val_accuracy'])
plt.title('Model Accuracy')
plt.ylabel('Accuracy')
plt.xlabel('Epoch')
plt.legend(['Train', 'Validation'], loc='upper left')
plt.show()

plt.plot(history.history['loss'])
plt.plot(history.history['val_loss'])
plt.title('Model Loss')
plt.ylabel('Loss')
plt.xlabel('Epoch')
plt.legend(['Train', 'Validation'], loc='upper left')
plt.show()

In [None]:
best_model = tf.keras.models.load_model("path/to/mymodel_825")
best_model.summary()

In [None]:
test_files = tf.data.Dataset.list_files(path)
test = tf.data.TFRecordDataset(test_files, compression_type="GZIP").map(read_example)

for inputs, labels in test:

    print(labels)

In [None]:
import tensorflow as tf
from sklearn.metrics import confusion_matrix
import numpy as np

y_true = []

for inputs, labels in test:
    y_true.append(np.argmax(labels, axis=-1))


y_pred = (np.argmax(best_model.predict(test.batch(1)), axis=-1))


conf_matrix = confusion_matrix(y_true, y_pred)

print(conf_matrix)


In [None]:
import seaborn as sns
plt.figure(figsize=(10, 8))
sns.heatmap(conf_matrix, annot=True, fmt="d", cmap="Reds", cbar=True,
            xticklabels=['Very Low', 'Low', 'Moderate', 'High', 'Very High', 'Non-burnable', 'Water'],
            yticklabels=['Very Low', 'Low', 'Moderate', 'High', 'Very High', 'Non-burnable', 'Water'], annot_kws={"size": 20})
plt.title("Confusion Matrix", fontsize=20)
plt.xlabel("Predicted Class", fontsize=18)
plt.ylabel("True Class", fontsize=18)
plt.show()

In [None]:
from sklearn.metrics import classification_report
import pandas as pd

report_dict = classification_report(y_true, y_pred, target_names=['Very Low', 'Low', 'Moderate', 'High', 'Very High', 'Non-burnable', 'Water'], output_dict=True)
report_df = pd.DataFrame(report_dict).transpose()

plt.figure(figsize=(8, 6))
sns.heatmap(report_df.iloc[:-3, :-1], annot=True, fmt=".2f", cmap="Reds", annot_kws={"size": 20}, vmin=0, vmax=1)
plt.title("Classification Report")
plt.title("Classification Report", fontsize=20)
plt.xticks(fontsize=18)
plt.yticks(fontsize=18)

plt.show()