<a href="https://colab.research.google.com/github/nachatjatu/pm25_northern_thailand/blob/main/GEE_data_processing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
%pip install geetools

Collecting geetools
  Downloading geetools-1.13.0-py3-none-any.whl.metadata (6.5 kB)
Collecting anyascii (from geetools)
  Downloading anyascii-0.3.2-py3-none-any.whl.metadata (1.5 kB)
Collecting ee-extra (from geetools)
  Downloading ee_extra-0.0.15.tar.gz (224 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m224.7/224.7 kB[0m [31m4.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting xee>=0.0.11 (from geetools)
  Downloading xee-0.0.20-py3-none-any.whl.metadata (6.2 kB)
Collecting yamlable (from geetools)
  Downloading yamlable-1.1.1-py2.py3-none-any.whl.metadata (3.1 kB)
Collecting affine (from xee>=0.0.11->geetools)
  Downloading affine-2.4.0-py3-none-any.whl.metadata (4.0 kB)
Collecting lz4>=4.3.2 (from dask[complete]; extra == "parallel"->xarray[parallel]->xee>=0.0.11->geetools)
  Downloading lz4-4.4.3-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.8 kB)
Downloading geetools-1.13.0-py3-n

In [None]:
"""
--------------------------------------------------------------------------------
Set up GEE API
--------------------------------------------------------------------------------
"""
import ee
import geemap
import geetools
import time
from pprint import pprint

In [None]:
"""
--------------------------------------------------------------------------------
Authenticate and initialize project
--------------------------------------------------------------------------------
"""
ee.Authenticate()
ee.Initialize(project='ee-thailand-pm')

In [None]:
NULL_IMG = ee.Image.constant(0).updateMask(0)


def create_date_list(start_date, end_date):
    """
    creates a List of sequential Dates and returns the result

    Args:
    start_date   (ee.Date):  start date to filter by
    end_date     (ee.Date):  end date to filter by

    Returns:
    ee.List[ee.Date]: the list of sequential Dates
    """
    days_seq = ee.List.sequence(0, end_date.difference(start_date, 'day'))
    return days_seq.map(lambda day: start_date.advance(day, 'day'))


def truncate_log_precip(image):
    precip = image.select('total_precipitation_sum')
    trunc_precip = precip.max(0).add(1e-4).log()
    return image.addBands(
        trunc_precip.rename('log_precip'), overwrite=True
    )


def get_weather_img(weather_ic, date):
    """
    preprocesses ERA5-Land 9km weather data and returns image

    Args:
    weather_ic  (ee.ImageCollection): ERA5 climate ImageCollection
    date        (ee.Date):            data retrieval date

    Returns:
    ee.Image: multi-band image where each band is weather variable
    """
    input_bands = [
    'u_component_of_wind_10m', 'v_component_of_wind_10m',
    'dewpoint_temperature_2m', 'temperature_2m',
    'surface_pressure', 'total_precipitation_sum'
    ]

    filtered = (
        ee.ImageCollection(weather_ic)
        .filterBounds(REGION)
        .filterDate(date, date.advance(1, 'day'))
        .first()
    )

    valid_image = ee.Algorithms.If(
        filtered,
        truncate_log_precip(filtered.select(input_bands)),
        NULL_IMG.rename('null_weather')
    )

    return valid_image


def get_fire_img(fire_ic, date):
    filtered = (
        ee.ImageCollection(fire_ic)
        .filterBounds(REGION)
        .filterDate(date, date.advance(1, 'day'))
        .first()
    )

    return ee.Algorithms.If(
        filtered,
        (filtered.updateMask(
            filtered.select('FireMask').eq(9)
            ).select('MaxFRP')
            .unmask(0)
            .add(1)
            .log()
            .rename('log_max_frp')
        ),
        NULL_IMG.rename('null_fire')
    )


def create_delta_pm25(image):
    pm25_tdy = image.select('pm25_tdy')
    pm25_tmr = image.select('pm25_tmr')
    delta_pm25 = pm25_tmr.subtract(pm25_tdy)

    return image.addBands(delta_pm25.rename('delta_pm25'))

def create_log_pm25(image):
    pm25_tdy = image.select('pm25_tdy')
    log_pm25_tdy = pm25_tdy.add(1).log().toFloat()
    return image.addBands(log_pm25_tdy.rename('log_pm25_tdy'))

In [None]:
def process_data_date(pm25_ic, weather_ic, fire_ic, terrain_img, date):
    pm25_tdy_filtered = (
        pm25_ic
        .filterBounds(REGION)
        .filterDate(date, date.advance(1, 'day'))
        .first()
    )

    pm25_tdy = ee.Algorithms.If(
        pm25_tdy_filtered,
        pm25_tdy_filtered.rename('pm25_tdy'),
        NULL_IMG.rename('null_pm25_tdy')
    )

    pm25_tmr_filtered = (
        pm25_ic
        .filterBounds(REGION)
        .filterDate(date.advance(1, 'day'), date.advance(2, 'day'))
        .first()
    )

    pm25_tmr = ee.Algorithms.If(
        pm25_tmr_filtered,
        pm25_tmr_filtered.rename('pm25_tmr'),
        NULL_IMG.rename('null_pm25_tmr')
    )

    weather = ee.Image(get_weather_img(weather_ic, date)).resample('bicubic')

    fire = get_fire_img(fire_ic, date)

    return (
        ee.Image.cat(pm25_tdy, weather, fire, terrain_img, pm25_tmr)
                .set('system:time_start', date.millis())
                .set('date', date.format('YYYY-MM-dd'))
                .toFloat()
    )

In [None]:
# temporal variables
START_DATE, END_DATE = ee.Date('2022-12-16'), ee.Date('2023-01-01') # inclusive
CHUNK_SIZE = 1  # week

# spatial variables
CRS = 'EPSG:32647'
RES = 1000                        # resolution (meters)

WIDTH = 497                     # width of image (pixels)
WIDTH *= RES                      # convert to meters (RES / pixel)
HEIGHT = 497                      # height of image (pixels)
HEIGHT *= RES                     # convert to meters (RES / pixel)

CM_X = 580000
CM_Y = 2100000

REGION = ee.Geometry.Rectangle(
    coords = [
        CM_X - WIDTH / 2,
        CM_Y - HEIGHT / 2,
        CM_X + WIDTH / 2,
        CM_Y + HEIGHT / 2
    ],
    proj = CRS,
    evenOdd = False
)

# initialize map
Map = geemap.Map()
Map.centerObject(REGION, zoom = 6)

# load datasets
pm25_ic = ee.ImageCollection(
    'projects/sat-io/open-datasets/GHAP/GHAP_D1K_PM25'
)

weather_ic = ee.ImageCollection(
    'ECMWF/ERA5_LAND/DAILY_AGGR'
)

fire_ic = ee.ImageCollection(
    'NASA/VIIRS/002/VNP14A1'
)

terrain_img = ee.Image('CGIAR/SRTM90_V4')


# prepare image

# image = process_data_date(
#     pm25_ic=pm25_ic,
#     weather_ic=weather_ic,
#     fire_ic=fire_ic,
#     terrain_img=terrain_img,
#     date=START_DATE
# )


# pm25_tdy = image.select('pm25_tdy')

# pm25_tmr = image.select('pm25_tmr')

# image = create_delta_pm25(image)
# image = create_log_pm25(image)


# expected_bands = ['pm25_tdy', 'u_component_of_wind_10m',
#                     'v_component_of_wind_10m', 'dewpoint_temperature_2m',
#                     'temperature_2m', 'surface_pressure',
#                     'total_precipitation_sum', 'log_precip', 'log_max_frp',
#                     'elevation', 'pm25_tmr', 'log_pm25_tdy']

# wanted_bands = ['pm25_tdy', 'log_pm25_tdy', 'u_component_of_wind_10m',
#                 'v_component_of_wind_10m', 'dewpoint_temperature_2m',
#                 'temperature_2m', 'surface_pressure',
#                 'log_precip', 'log_max_frp', 'elevation', 'delta_pm25']

# rename_bands = ['pm25_tdy', 'log_pm25_tdy', 'u_wind_10m', 'v_wind_10m', 'dew_temp_2m',
#                 'temp_2m', 'surface_pressure', 'log_precip',
#                 'log_max_frp', 'elevation', 'delta_pm25']

# image = image.select(wanted_bands)

# pm_vis = {
#       'min': 0,
#       'max': 200,
#       'palette': ['#313695', '#4575b4', '#74add1', '#abd9e9',
#                   '#e0f3f8', '#ffffbf', '#fee090', '#fdae61',
#                   '#f46d43','#d73027','#a50026']
#   }

# log_vis = {
#       'min': 0,
#       'max': 2,
#       'palette': ['#313695', '#4575b4', '#74add1', '#abd9e9',
#                   '#e0f3f8', '#ffffbf', '#fee090', '#fdae61',
#                   '#f46d43','#d73027','#a50026']
#   }

# Map.addLayer(image.select('pm25_tdy').clip(REGION), pm_vis, 'pm25')

# Map.addLayer(image.select('log_pm25_tdy').clip(REGION), log_vis, 'log_pm25')
num_images = 0
current_start = START_DATE
while True:
    current_end = current_start.advance(CHUNK_SIZE, 'week')
    if current_end.millis().getInfo() > END_DATE.millis().getInfo():
        current_end = END_DATE

    if current_start.millis().getInfo() > END_DATE.millis().getInfo():
        break

    print(
        current_start.format('YYYY-MM-dd').getInfo(),
        current_end.format('YYYY-MM-dd').getInfo(),
        num_images
    )


    # create images for each date in date range
    dates = create_date_list(current_start, current_end)
    images = ee.ImageCollection(
        dates.map(
            lambda date: process_data_date(
                pm25_ic = pm25_ic,
                weather_ic = weather_ic,
                fire_ic = fire_ic,
                terrain_img = terrain_img,
                date = ee.Date(date)
            )
        )
    )

    # filter out images with incomplete bands and mask incomplete pixels
    expected_bands = ['pm25_tdy', 'u_component_of_wind_10m',
                    'v_component_of_wind_10m', 'dewpoint_temperature_2m',
                    'temperature_2m', 'surface_pressure',
                    'total_precipitation_sum', 'log_precip', 'log_max_frp',
                    'elevation', 'pm25_tmr']

    wanted_bands = ['log_pm25_tdy', 'u_component_of_wind_10m',
                'v_component_of_wind_10m', 'dewpoint_temperature_2m',
                'temperature_2m', 'surface_pressure',
                'log_precip', 'log_max_frp', 'elevation', 'delta_pm25']

    rename_bands = ['log_pm25_tdy', 'u_wind_10m', 'v_wind_10m', 'dew_temp_2m',
                    'temp_2m', 'surface_pressure', 'log_precip',
                    'log_max_frp', 'elevation', 'delta_pm25']

    images_cleaned = (images
                    .filter(ee.Filter.eq('system:band_names', expected_bands))
                    .map(lambda image: image.updateMask(
                        image.mask().reduce(ee.Reducer.min())
                        )
                    )
                    .map(lambda image: create_delta_pm25(image))
                    .map(lambda image: create_log_pm25(image))
                    .select(wanted_bands)
                    .map(lambda image: image.rename(rename_bands))
    )


    num_images += images_cleaned.size().getInfo()

    tasks = ee.batch.Export.geetools.imagecollection.toDrive(
        imagecollection = images_cleaned,
        index_property = 'date',
        description = 'dataset_6',
        scale = RES,
        crs = CRS,
        region = REGION,
        folder = 'dataset_6',
    )

    for task in tasks:
        time.sleep(0.01)
        task.start()

    current_start = current_end.advance(1, 'day')


Map

2022-12-16 2022-12-23 0
2022-12-24 2022-12-31 8
2023-01-01 2023-01-01 15


Map(center=[18.990540620155873, 99.75964904417972], controls=(WidgetControl(options=['position', 'transparent_…

In [None]:
# for task in ee.batch.Task.list():
#     task.cancel()

ee.batch.Task.list()

[<Task X4KYBU3FYQ5J3QHU7Y65VHLV EXPORT_IMAGE: dataset_6_2019-05-15 (READY)>,
 <Task SMQHBTQ2LX4TGHMOVRYW2BI3 EXPORT_IMAGE: dataset_6_2019-05-14 (READY)>,
 <Task PKMBYH7DP6RPX4IHJFONNEM2 EXPORT_IMAGE: dataset_6_2019-05-13 (READY)>,
 <Task QG4WKSP37DGDMV7YWW4DLFYQ EXPORT_IMAGE: dataset_6_2019-05-12 (READY)>,
 <Task S4NORBH5M74MTQP3LLDZGLLJ EXPORT_IMAGE: dataset_6_2019-05-11 (READY)>,
 <Task 6TYWJW4KFFDGGNFUUEWBVCU5 EXPORT_IMAGE: dataset_6_2019-05-10 (READY)>,
 <Task NVKANYKWETEAPOHTRJ7AV6FU EXPORT_IMAGE: dataset_6_2019-05-09 (READY)>,
 <Task PLRO74CSCBMDR45EN5JABKEO EXPORT_IMAGE: dataset_6_2019-05-08 (READY)>,
 <Task Y5J4HGMFS6ODDRLZ5WQTYAPZ EXPORT_IMAGE: dataset_6_2019-05-07 (READY)>,
 <Task OMAFOSJRT4PPJXGVFJCLFHLC EXPORT_IMAGE: dataset_6_2019-05-06 (READY)>,
 <Task 32POUCS6C4MYH2YWNVMGMNVH EXPORT_IMAGE: dataset_6_2019-05-05 (READY)>,
 <Task 6ZPFWJDRNQOBGJHMGWRBVR7V EXPORT_IMAGE: dataset_6_2019-05-04 (READY)>,
 <Task EC6VT43V54U5JMK77MOBQ72V EXPORT_IMAGE: dataset_6_2019-05-03 (READY)>,

In [None]:
status = ee.data.getTaskStatus(['2LEKTRI5L42KRSKVONXEI2A2'])
print(status)

[{'state': 'FAILED', 'description': 'dataset_6_2018-12-20', 'priority': 100, 'creation_timestamp_ms': 1741243869643, 'update_timestamp_ms': 1741243957610, 'start_timestamp_ms': 1741243957145, 'task_type': 'EXPORT_IMAGE', 'attempt': 1, 'error_message': 'Exported bands must have compatible data types; found inconsistent types: Float64 and Float32.', 'id': '2LEKTRI5L42KRSKVONXEI2A2', 'name': 'projects/ee-thailand-pm/operations/2LEKTRI5L42KRSKVONXEI2A2'}]


In [None]:
NULL_IMG = ee.Image.constant(0).updateMask(0)

"""
--------------------------------------------------------------------------------
Helper functions for data preprocessing
--------------------------------------------------------------------------------
"""
def create_date_list(start_date, end_date):
  """
  creates a List of sequential Dates and returns the result

  Args:
    start_date   (ee.Date):  start date to filter by
    end_date     (ee.Date):  end date to filter by

  Returns:
    ee.List[ee.Date]: the list of sequential Dates
  """
  days_seq = ee.List.sequence(0, end_date.difference(start_date, 'day'))
  return days_seq.map(lambda day: start_date.advance(day, 'day'))

"""
--------------------------------------------------------------------------------
Helper functions for weather data (02/01/1950 - present)
--------------------------------------------------------------------------------
"""
def truncate_precip(image):
  total_precip = image.select('total_precipitation_sum')
  corrected_precip = total_precip.max(0)
  return image.addBands(
      corrected_precip.rename('corrected_precipitation'), overwrite=True)

def get_weather_img(weather_ic, date):
  """
  preprocesses ERA5-Land 9km weather data and returns image

  Args:
    weather_ic  (ee.ImageCollection): ERA5 climate ImageCollection
    date        (ee.Date):            data retrieval date

  Returns:
    ee.Image: multi-band image where each band is weather variable
  """
  input_bands = [
    'u_component_of_wind_10m', 'v_component_of_wind_10m', 'dewpoint_temperature_2m',
    'temperature_2m', 'surface_pressure', 'total_precipitation_sum'
  ]
  filtered = (ee.ImageCollection(weather_ic)
          .filterBounds(REGION)
          .filterDate(date, date.advance(1, 'day')))
  return ee.Algorithms.If(
      filtered.size().gt(0),
      truncate_precip(filtered.first().select(input_bands)),
      NULL_IMG.rename('null_weather')
  )


"""
--------------------------------------------------------------------------------
Helper functions for land cover data (01/01/2017 - 12/31/2023)
--------------------------------------------------------------------------------
"""
def class_proportion(lc_img, class_value):
  """
  given a land class, compute its proportion of the area of a RES m pixel

  Args:
    lc_img       (ee.Image): ESRI land cover image
    class_value  (Int): land class number
  """
  class_value = ee.Number(class_value)
  class_name = ee.String('land_class_').cat(class_value.int8())
  mask = lc_img.eq(class_value)
  prop = mask.reduceNeighborhood(
      reducer = ee.Reducer.mean(),
      kernel = ee.Kernel.square(RES / 2, 'meters')
  ).rename(class_name)
  return prop


def get_lc_img(lc_ic, date):
  """
  preprocesses ESRI 10m land cover data and returns image

  Args:
    lc_ic     (ee.ImageCollection): ESRI land cover ImageCollection
    date      (ee.Date):            data retrieval date

  Returns:
    ee.Image: multi-band image where each band is prop. of land cover by class
  """
  year = date.get('year')
  lc_img = (ee.ImageCollection(lc_ic)
            .filterBounds(REGION)
            .filter(ee.Filter.calendarRange(year, year, 'year'))
            .mosaic()
  )
  classes = ee.List([1, 2, 4, 5, 7, 8, 9, 10, 11])
  class_proportions = classes.map(lambda c: class_proportion(lc_img, c))
  combined_image = ee.ImageCollection(class_proportions).toBands()
  return remove_prefixes(combined_image)


"""
--------------------------------------------------------------------------------
Helper functions for fire data (20/01/2012 - present)
--------------------------------------------------------------------------------
"""

def get_fire_img(fire_ic, date):
  filtered = (ee.ImageCollection(fire_ic)
          .filterBounds(REGION)
          .filterDate(date, date.advance(1, 'day')))
  return ee.Algorithms.If(
      filtered.size().gt(0),
      filtered.first().updateMask(
          filtered.first().select('FireMask').gte(7)
      ).select('MaxFRP').unmask(0),
      NULL_IMG.rename('null_fire')
  )



In [None]:
"""
--------------------------------------------------------------------------------
Helper functions for visualization
--------------------------------------------------------------------------------
"""
palette = [
          '000080', '0000d9', '4000ff', '8000ff', '0080ff', '00ffff',
          '00ff80', '80ff00', 'daff00', 'ffff00', 'fff500', 'ffda00',
          'ffb000', 'ffa400', 'ff4f00', 'ff2500', 'ff0a00', 'ff00ff',
      ]

def vis_fire_raw(image):
  fire_raw_vis = {
      'bands': ['MaxFRP'],
      'min': 0,
      'palette': ['black', 'yellow', 'orange', 'red']
  }
  Map.addLayer(image, fire_raw_vis, 'fire_raw')

def vis_land_cover(image):
  land_cover_vis = {
      'min': 0,
      'max': 1
  }
  bands = ['land_class_1', 'land_class_2', 'land_class_3',
           'land_class_4', 'land_class_5', 'land_class_6', 'land_class_9']
  for band in bands:
    Map.addLayer(image.select(band), land_cover_vis, band)

def vis_aerosol(image):
  aai_vis = {
      'bands': ['absorbing_aerosol_index'],
      'min': 0,
      'max': 1,
      'palette': palette
  }
  Map.addLayer(image, aai_vis, 'aai')

def vis_weather(image):
  temp_vis = {
      'bands': ['temperature_2m'],
      'min': 300,
      'max': 305,
      'palette': palette
  }
  Map.addLayer(image, temp_vis, 'temp')

  dewpoint_vis = {
      'bands': ['dewpoint_temperature_2m'],
      'min': 284,
      'max': 290,
      'palette': palette
  }
  Map.addLayer(image, dewpoint_vis, 'dewpoint')

  pressure_vis = {
      'bands': ['surface_pressure'],
      'min': 90000,
      'max': 96000,
      'palette': palette
  }
  Map.addLayer(image, pressure_vis, 'surf_pressure')

def vis_pm25(image):
  pm_vis = {
      'min': 0,
      'max': 100,
      'palette': ['#313695', '#4575b4', '#74add1', '#abd9e9',
                  '#e0f3f8', '#ffffbf', '#fee090', '#fdae61',
                  '#f46d43','#d73027','#a50026']
  }
  delta_vis = {
      'min': 0,
      'max': 30,
      'palette': ['#313695', '#4575b4', '#74add1', '#abd9e9',
                  '#e0f3f8', '#ffffbf', '#fee090', '#fdae61',
                  '#f46d43','#d73027','#a50026']
  }
  Map.addLayer(image.select('pm25_tdy'), pm_vis, 'pm25_tdy')
#   Map.addLayer(image.select('delta_pm25'), delta_vis, 'delta_pm25')

def vis_all(image):
  """
  visualize all datasets on a single image
  """
  image_clipped = image.clip(REGION)
  # vis_land_cover(image_clipped)
  vis_aerosol(image_clipped)
  vis_weather(image_clipped)
  vis_fire_raw(image_clipped)
  vis_pm25(image_clipped)

  Map.addLayer(REGION, {}, 'region')

[<Task NVW4QMVNXPCVHP7EN3EGG6EO EXPORT_IMAGE: dataset_4_2019-06-30 (CANCELLED)>,
 <Task YL2VCD43MYCV3WWCPZ3FZHOM EXPORT_IMAGE: dataset_4_2019-06-29 (CANCELLED)>,
 <Task EPDY3OW4QONILIPMLBJNM5EP EXPORT_IMAGE: dataset_4_2019-06-28 (CANCELLED)>,
 <Task 5JZODCZ4ZIGNQHZJAL7VFA3Z EXPORT_IMAGE: dataset_4_2019-06-27 (CANCELLED)>,
 <Task JCWCZ2NNL5FJ2SN2CUJ4RNRU EXPORT_IMAGE: dataset_4_2019-06-26 (CANCELLED)>,
 <Task MGOH3EBI7VL6N42S3J6LGHKW EXPORT_IMAGE: dataset_4_2019-06-25 (CANCELLED)>,
 <Task OX7YS44OJ64YASVCSNTFRVDN EXPORT_IMAGE: dataset_4_2019-06-24 (CANCELLED)>,
 <Task BY4GLJJ6NVSEBQNPECULVNPR EXPORT_IMAGE: dataset_4_2019-06-23 (CANCELLED)>,
 <Task RZOREWCMDSNOS2DN7UQ5K6ZC EXPORT_IMAGE: dataset_4_2019-06-22 (CANCELLED)>,
 <Task SZGGF3T7OFYEA5S7KNABEKD6 EXPORT_IMAGE: dataset_4_2019-06-21 (CANCELLED)>,
 <Task F5VEIFVRS3SDOHURX3XW4MYY EXPORT_IMAGE: dataset_4_2019-06-20 (CANCELLED)>,
 <Task F3RCJJXZAUP3FZ6V5PFQM4VO EXPORT_IMAGE: dataset_4_2019-06-19 (CANCELLED)>,
 <Task V6ICLBQV2NQ3FREENIVUQ