In [1]:
import ee
import geemap
import hvplot.pandas
import hvplot.xarray
import xarray as xr
import geopandas as gpd
import pandas as pd
import numpy as np
from pathlib import Path
import shapely

import constants as c

## 1. Setup GEE API and Map

In [10]:
# Authenticate against Google Earth Engine, then initialise the client library for this session.
# NOTE(review): the saved output of this cell shows an interactive verification-code prompt,
# which blocks an unattended "Restart & Run All" — confirm whether authentication really
# has to be triggered on every run.
ee.Authenticate()
ee.Initialize()

Enter verification code:  [REDACTED]



Successfully saved authorization token.


## 2. Get MRC Metadata and plot stations

In [11]:
# Show the interactive map widget as the output of this cell.
# NOTE(review): `Map` is only created further down in this notebook (`Map = geemap.Map(...)`),
# so in top-to-bottom order there is nothing to display here yet — confirm the intended cell order.
Map

Map(bottom=7725.0005960197595, center=[16.61512525065161, 103.31540155635611], controls=(WidgetControl(options…

In [3]:
import json
import urllib.request

def get_mrc_metadata(return_gdf=True, verbose=False):
    """Download the time-series inventory of the Mekong River Commission Data Portal.

    The inventory is fetched from the MRC API, stored as ``timeSeriesList.json`` in the
    current working directory and turned into a table with one row per time series.

    Parameters
    ----------
    return_gdf : bool, default True
        If True, return a ``geopandas.GeoDataFrame`` whose point geometries are built
        from the ``longitude``/``latitude`` columns (CRS ``EPSG:4326``). If False,
        return the plain ``pandas.DataFrame``.
    verbose : bool, default False
        If True, print the download URL and the number of datasets and stations found.

    Returns
    -------
    geopandas.GeoDataFrame or pandas.DataFrame
        One row per entry of the downloaded list, in reverse list order, with a fresh
        ``0..n-1`` index. ``longitude`` and ``latitude`` are converted to float.
    """
    url = r'https://api.mrcmekong.org/api/v1/ts/inventory/timeSeriesList'
    json_path = 'timeSeriesList.json'
    urllib.request.urlretrieve(url, json_path)

    if verbose:
        print(f'Downloaded time-series metadata from {url} .')

    # The context manager closes the file in every case (also for an empty list or a
    # parsing error). JSON is read as UTF-8 instead of the platform default encoding.
    with open(json_path, encoding='utf-8') as f:
        data = json.load(f)

    # Build the table in one step instead of concatenating one-row frames (quadratic).
    # The list is reversed so that the last downloaded entry ends up in the first row.
    df_metadata = pd.DataFrame([dict(dataset) for dataset in reversed(data)])
    for column in ('longitude', 'latitude'):
        if column in df_metadata.columns:
            df_metadata[column] = df_metadata[column].astype(float)
    df_metadata = df_metadata.reset_index(drop=True)

    if verbose:
        print(f'Found a total of {df_metadata.shape[0]} time-series datasets from {len(df_metadata.stationCode.unique())} stations of the MRC Data Portal.')

    if return_gdf:
        gdf_metadata = gpd.GeoDataFrame(
            df_metadata, geometry=gpd.points_from_xy(df_metadata.longitude, df_metadata.latitude), crs="EPSG:4326"
        )
        return gdf_metadata
    return df_metadata

# Create the interactive map that receives the station layers added below.
Map = geemap.Map(center=(40, -100), zoom=4)

# Download the MRC inventory and keep only the sediment-concentration time series.
gdf_metadata = get_mrc_metadata(return_gdf=True)
gdf_metadata_sedi = gdf_metadata.loc[gdf_metadata.parameter=='Sediment Concentration']

# Time series whose label mentions 'DSMP' belong to the DSMP stations; all others are
# treated as Hydromet. The mask is computed once, and na=False lets rows with a missing
# label count as "not DSMP" instead of breaking the boolean indexing / negation.
is_dsmp = gdf_metadata_sedi.label.str.contains('DSMP', na=False)
station_columns = ['river', 'stationShortName', 'geometry']

# Get DSMP station metadata (one row per location, first entry of each group)
gdf_metadata_dmsp = gdf_metadata_sedi.loc[is_dsmp]
gdf_stations_dsmp = gdf_metadata_dmsp.groupby('locationIdentifier').first()[station_columns].set_crs('EPSG:4326')

# Get Hydromet stations metadata (one row per location, first entry of each group)
gdf_metadata_hydromet = gdf_metadata_sedi.loc[~is_dsmp]
gdf_stations_hydromet = gdf_metadata_hydromet.groupby('locationIdentifier').first()[station_columns].set_crs('EPSG:4326')

# Add both station sets as separate map layers (DSMP in blue, Hydromet in red)
Map.add_gdf(gdf_stations_dsmp, 'MRC DSMP stations', style={'fillColor': 'blue'})
Map.add_gdf(gdf_stations_hydromet, 'MRC Hydromet stations', style={'fillColor': 'red'})
In [None]:
# # load 3S basin
# json_data = 'geometries/geoms.geojson'
# fc_geoms = geemap.geojson_to_ee(json_data)
# roi_geom = fc_geoms.first().geometry()

# # load dams
# df = pd.read_csv('geometries/3SReservoirs.csv')
# gdf_dams = gpd.GeoDataFrame(df, geometry=gpd.GeoSeries.from_xy(df['X'], df['Y']), crs=4326).drop(columns=['X', 'Y']).set_index('id')
# fc_dams = geemap.geopandas_to_ee(gdf_dams)
# Map.add_gdf(gdf_dams, 'Dams', {'color': 'blue'})

In [None]:
# # Load data from local .csv files (see 01_insitu_preparation.ipynb)
# paths_data_s = list(Path(f'../mrc_webscrapper/outputs/csv/Sediment Concentration/').glob(f'*.csv'))
# paths_data_q = list(Path(f'../mrc_webscrapper/outputs/csv/Discharge/').glob(f'*.csv'))
# paths_data = paths_data_q + paths_data_s
# df_data = pd.DataFrame([])
# for path in paths_data_s:
#     df_temp = pd.read_csv(path, dtype={'station_code':'str'})
#     df_temp['date_utc'] = pd.to_datetime(df_temp['date'])
#     df_temp['med_frq'] = np.median(np.diff(df_temp.date_utc))
#     df_data = pd.concat([df_data, df_temp])

# df_stations = df_data.groupby('station_code').first()
# gdf_stations = gpd.GeoDataFrame(df_stations,
#                  crs={'init': 'epsg:4326'},
#                  geometry=df_stations.apply(lambda row: shapely.geometry.Point((row.lon, row.lat)), axis=1)
#                 )

# # Create geolocated DSMP/Hydromet datasets
# df_data_dsmp = df_data.loc[df_data.identifier.str.contains('DSMP')]
# gdf_data_dsmp = gpd.GeoDataFrame(df_data_dsmp.join(gdf_stations.geometry, on='station_code'))
# df_data_hydromet = df_data.loc[~df_data.identifier.str.contains('DSMP')]
# gdf_data_hydromet = gpd.GeoDataFrame(df_data_hydromet.join(gdf_stations.geometry, on='station_code'))

## 3. Get data

### Setup functions

In [34]:
def process_station(ic_rs, max_diff=1):
    """Build a per-station function that matches and samples an image collection.

    Parameters
    ----------
    ic_rs : image collection handed on to ``get_matchups``.
    max_diff : maximum time difference handed on to ``get_matchups`` (default 1).

    Returns
    -------
    callable
        Function taking ``fc_station`` and returning the match-ups from
        ``get_matchups`` with ``get_sample`` mapped over them.
    """
    def _match_and_sample(fc_station):
        matched = get_matchups(fc_station, ic_rs, max_diff)
        return matched.map(get_sample)

    return _match_and_sample

def run_station_task(ic_rs, sensor, max_diff=1):
    """Build a per-station function that exports the sampled match-ups to Google Drive.

    Parameters
    ----------
    ic_rs : image collection handed on to ``get_matchups``.
    sensor : str
        Sensor name; used in the export description and in the Drive folder name.
    max_diff : maximum time difference handed on to ``get_matchups`` (default 1).

    Returns
    -------
    callable
        Function taking ``fc_station``; it starts a table export task and returns it.
    """
    def _start_export(fc_station):
        sampled = get_matchups(fc_station, ic_rs, max_diff).map(get_sample)
        # NOTE(review): Earth Engine appears to treat a Drive folder name containing "/"
        # as one literal folder name, not as a nested path — confirm this is intended.
        export_task = ee.batch.Export.table.toDrive(
            collection=sampled,
            description=f'TSS_export_{sensor}',
            folder=rf'Earth Engine/TSM/{sensor}',
        )
        export_task.start()
        return export_task

    return _start_export

def get_matchups(fc_station, ic_rs, max_diff=1):
    """Join each feature of ``fc_station`` with its closest image from ``ic_rs``.

    Parameters
    ----------
    fc_station : feature collection with a 'system:time_start' property per feature.
    ic_rs : image collection; it is first restricted to the geometry of ``fc_station``.
    max_diff : largest allowed difference of 'system:time_start' between feature and
        image; the value is multiplied by 24 * 60 * 60 * 1000 (i.e. days to milliseconds).

    Returns
    -------
    The result of a ``saveBest`` join: the matched image is stored under 'bestImage'
    and the join measure under 'timeDiff'.
    """
    # Only images intersecting the station geometry are candidates for the join.
    station_footprint = ee.FeatureCollection(fc_station).geometry()
    candidate_images = ic_rs.filter(ee.Filter.bounds(station_footprint))

    within_time_window = ee.Filter.maxDifference(
        difference=max_diff * 24 * 60 * 60 * 1000,
        leftField='system:time_start',
        rightField='system:time_start',
    )
    closest_image_join = ee.Join.saveBest(matchKey='bestImage', measureKey='timeDiff')

    return closest_image_join.apply(fc_station, candidate_images, within_time_window)

def get_sample(feature):
    """Sample the matched image around a feature and store the result on the feature.

    The image stored under the feature's 'bestImage' property is reduced with a median
    reducer inside a buffer of 200 (units of ``buffer``) around the feature geometry.

    Parameters
    ----------
    feature : feature carrying a 'bestImage' property (as set by ``get_matchups``).

    Returns
    -------
    The feature with these properties set:
    'values_eo' (result of ``reduceRegion``), 'bestImage' (replaced by the image's
    'system:index') and 'sensor'.
    """
    feature = ee.Feature(feature)
    match_img = ee.Image(feature.get('bestImage'))
    geometry = feature.geometry().buffer(200)
    # NOTE(review): no `scale` is passed to reduceRegion — confirm that the image's
    # default resolution is the intended sampling resolution.
    samples_agg = match_img.reduceRegion(reducer=ee.Reducer.median(), geometry=geometry)
    # NOTE(review): 'sensor' is copied from the feature onto itself; presumably the
    # sensor of the matched image was meant — verify against the image properties.
    feature = feature \
        .set('values_eo', samples_agg) \
        .set('bestImage', match_img.get('system:index')) \
        .set('sensor', feature.getString('sensor'))
    return feature

### Define settings

In [146]:
# Time span of the analysis (start and end date as 'YYYY-MM-DD' strings)
start_date = '2000-01-01'
end_date = '2024-12-31'

# Scene-based cloud masking:
# maximum image cloud cover percent allowed in the image collection
cld_filt_thresh = 70

# Water masking switch
mask_water = True

# Pixel-based cloud masking (cloud score+ only): quality band and clear threshold
qa_band = 'cs_cdf'
clear_thresh = 0.75

In [147]:
%load_ext autoreload
%autoreload 2
import functions_process as funcs_process
import functions_turbidity as funcs_turb
import functions_sampling as funcs_sampling
from tqdm.notebook import tqdm

# Prepare the in-situ data: read the CSV as a plain table, rebuild the geometry column
# from its WKT text and drop rows with empty geometry or with source 'WQMN'.
df = gpd.read_file('input/insitu_data.csv', ignore_geometry=True)
df['geometry'] = gpd.GeoSeries.from_wkt(df['geometry']).set_crs('EPSG:4326')
gdf_data = gpd.GeoDataFrame(df)
gdf_data = gdf_data.loc[~(gdf_data.geometry.is_empty)]
gdf_data = gdf_data.loc[~(gdf_data.source=='WQMN')]

# Settings of the export loop
MATCHUP_MAX_DIFF = 3                                    # handed to get_matchups as max_diff
EXPORT_FOLDER = 'SOSW_SPM_Rrs_18042024'                 # Google Drive target folder
FILENAME_STRIP_TABLE = str.maketrans('', '', ' []()')   # characters removed from export names

# Start one Drive export per station identifier and collect the task handles.
tasks = []
for identifier in tqdm(gdf_data.identifier.unique()):
    # The part after the first '@' of the identifier names the export.
    fn = identifier.split('@')[1].translate(FILENAME_STRIP_TABLE)
    # Work on an explicit copy: assigning a column on a `.loc` slice raises pandas'
    # SettingWithCopyWarning and is not guaranteed to write to the slice.
    gdf_data_station = gdf_data.loc[gdf_data.identifier==identifier].copy()
    # Cut the date text to 19 characters so it fits 'YYYY-MM-dd HH:mm:ss'.
    gdf_data_station['date'] = gdf_data_station.date.apply(lambda x: str(x)[:19])
    fc_station = ee.FeatureCollection(geemap.gdf_to_ee(
        gdf_data_station,
        date='date', date_format='YYYY-MM-dd HH:mm:ss'))
    bounds = fc_station.geometry()
    ic_oli = funcs_process.load_rrs_imcoll(sensor='oli', start_date=start_date, end_date=end_date, bounds=bounds, watermask='qa', harmonize_bnames=True)
    ic_etm = funcs_process.load_rrs_imcoll(sensor='etm', start_date=start_date, end_date=end_date, bounds=bounds, watermask='qa', harmonize_bnames=True)
    ic_msi = funcs_process.load_rrs_imcoll(sensor='msi', start_date=start_date, end_date=end_date, bounds=bounds, watermask='index', harmonize_bnames=True)
    fc_matchups_oli = get_matchups(fc_station, ic_oli, max_diff=MATCHUP_MAX_DIFF)
    fc_matchups_msi = get_matchups(fc_station, ic_msi, max_diff=MATCHUP_MAX_DIFF)
    fc_matchups_etm = get_matchups(fc_station, ic_etm, max_diff=MATCHUP_MAX_DIFF)
    # Merge the match-ups of all three sensors, then sample each matched image.
    fc_matchups = ee.FeatureCollection([fc_matchups_oli, fc_matchups_msi, fc_matchups_etm]).flatten().map(get_sample)
    task = ee.batch.Export.table.toDrive(
        collection=fc_matchups,
        description=f'Rrs_{fn}',
        folder=EXPORT_FOLDER)
    task.start()
    tasks.append(task)

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


  0%|          | 0/75 [00:00<?, ?it/s]

In [148]:
import time
from datetime import datetime, timedelta

def check_tasks_status(tasks):
    """Print one coloured status line per task and return the task states.

    Parameters
    ----------
    tasks : iterable of objects with a ``status()`` method returning a dict that
        contains at least the key 'state'. 'id' and 'description' are optional and
        are replaced by 'No id' / 'No description' when missing.

    Returns
    -------
    list of str
        The 'state' of every task, in input order.
    """
    reset = '\033[0m'
    # ANSI colour per task state; states not listed here are printed without colour.
    state_colors = {
        'COMPLETED': '\033[92m',         # green
        'RUNNING': '\033[93m',           # orange
        'READY': '\033[93m',             # orange
        'FAILED': '\033[91m',            # red
        'CANCEL_REQUESTED': '\033[91m',  # red
        'CANCELLED': '\033[91m',         # red
    }
    states = []
    for task in tasks:
        status = task.status()
        state = status['state']
        # Optional keys are read with .get() so that a status without them
        # does not abort the whole report with a KeyError.
        task_id = status.get('id', 'No id')
        time_now = datetime.now()
        color_code = state_colors.get(state, reset)
        status_msg = f"[{str(time_now)[:19]}] Task {task_id}" \
                     f"({status.get('description', 'No description')}): {color_code+state+reset}"
        print(status_msg)
        states.append(state)
    return states

# tasks = []
# for sensor, fc in [('oli', fc_matchups_oli), ('etm', fc_matchups_etm), ('msi', fc_matchups_msi)]:
#     task = ee.batch.Export.table.toDrive(**{
#         'collection': fc, 
#         'description': f'TSS_export_{sensor}',
#         'folder': 'Earth Engine'})
#     task.start()
#     tasks.append(task)
# States in which a task is treated as finished. 'CANCELLED' has to be part of this
# set, otherwise the loop below never ends once a task has been cancelled.
TERMINAL_STATES = {'COMPLETED', 'FAILED', 'CANCEL_REQUESTED', 'CANCELLED'}
FAILURE_STATES = TERMINAL_STATES - {'COMPLETED'}
POLL_INTERVAL_S = 30   # seconds to wait between two status checks

# Poll all export tasks until every one of them is in a terminal state.
all_completed = False
while not all_completed:
    # check_tasks_status prints the report and returns the states, so every task is
    # queried only once per round.
    states = check_tasks_status(tasks)
    all_completed = all(state in TERMINAL_STATES for state in states)
    if not all_completed:
        time.sleep(POLL_INTERVAL_S)
n_failed = sum(state in FAILURE_STATES for state in states)
print(f"All export tasks finished ({n_failed} tasks failed).")

[2024-04-18 16:08:44] Task IKJ3KZZVF2UFACMKVGTNWIBT(Rrs_KH_014501_StungTreng): [93mRUNNING[0m
[2024-04-18 16:08:44] Task PRX7WW3SL74SFE2JVMKITQRG(Rrs_KH_014901_Kratie): [93mRUNNING[0m
[2024-04-18 16:08:45] Task UON4ZXFC2ZBRMYDJB6LJNSZ6(Rrs_KH_019801_ChroyChangVar): [93mREADY[0m
[2024-04-18 16:08:45] Task AEURMZFP3O36WYO5PLPXQZBF(Rrs_KH_020102_PrekKdam): [93mREADY[0m
[2024-04-18 16:08:45] Task WQT3M25C32D5UYNVIS76FT4L(Rrs_KH_KNR_KohNorea): [93mREADY[0m
[2024-04-18 16:08:46] Task J6EMPQU2CGHBBMU5XPQZBI5O(Rrs_KH_OSP_OSP-MRC): [93mREADY[0m
[2024-04-18 16:08:46] Task KDSRTMSMOXQHOFRLVY5VIA52(Rrs_KH_SKB_SekongBridge): [93mREADY[0m
[2024-04-18 16:08:46] Task 2WXD2XK23RUFSYL2VR235GH5(Rrs_LA_013901_Pakse): [93mREADY[0m
[2024-04-18 16:08:47] Task 5ONODZN6DZGJIZGP4SUPGNMH(Rrs_TH_010501_ChiangSaen): [93mREADY[0m
[2024-04-18 16:08:47] Task PXRWQAFKRGJTTEKMQYVIDLMX(Rrs_TH_011903_ChiangKhan): [93mREADY[0m
[2024-04-18 16:08:48] Task 5NNA7XIESQUHLDC5O4NLOS2I(Rrs_TH_012001_NongKhai):

In [151]:
import ast 

# Folder holding the CSV files exported by Earth Engine.
# NOTE(review): the export cell of this notebook writes to the Drive folder
# 'SOSW_SPM_Rrs_18042024', while this path ends in 'SOSW_SPM_Rrs' — confirm the folder.
path_drive = Path(r"G:\Meine Ablage\Earth Engine\SOSW_SPM_Rrs")
# Sorted so that the row order of the combined table is reproducible.
paths_csv = sorted(path_drive.glob('*.csv'))

# Read every export; files without any content are reported and skipped.
frames = []
for path in paths_csv:
    try:
        frames.append(pd.read_csv(path))
    except pd.errors.EmptyDataError:
        print(f'Note: {path.name} was empty. Skipping.')

# Concatenate once at the end instead of growing the table inside the loop;
# pd.concat rejects an empty list, hence the fallback to an empty frame.
df = pd.concat(frames) if frames else pd.DataFrame([])

def parse_str_dict(string):
    """Parse a text of the form ``{key=value, key=value}`` into a dict.

    Parameters
    ----------
    string : str
        Text enclosed in one leading and one trailing character (the braces), with
        entries separated by ', ' and keys separated from values by '='.

    Returns
    -------
    dict
        Keys as strings; values evaluated as Python literals, with ``null`` mapped
        to ``None``. An empty map (``'{}'``) gives an empty dict.

    Raises
    ------
    ValueError or SyntaxError
        If a value is not a valid Python literal.
    """
    body = string[1:-1]
    if not body.strip():
        return {}
    parsed_dict = {}
    for entry in body.split(', '):
        key, _, raw_value = entry.partition('=')
        # Only a value that is exactly 'null' becomes None, so keys that merely
        # contain the text "null" are left untouched.
        parsed_dict[key] = None if raw_value == 'null' else ast.literal_eval(raw_value)
    return parsed_dict

# Turn the text in 'values_eo' into dicts and spread their keys over separate columns.
df['values_eo'] = df['values_eo'].apply(parse_str_dict)
df_values_eo = df['values_eo'].apply(pd.Series)

# Replace the dict column by the expanded columns.
df_without_eo = df.drop(columns='values_eo')
df = pd.concat([df_without_eo, df_values_eo], axis=1)

# Red/green ratio of the sampled values, plotted against the in-situ 'value' column.
df['rg_ratio'] = df['red'] / df['green']
df.hvplot.scatter(x='rg_ratio', y='value')

Note: Rrs_LA_011201_LuangPrabang.csv was empty. Skipping.
Note: Rrs_LA_013401_Savannakhet.csv was empty. Skipping.
Note: Rrs_LA_014101_BanMouang.csv was empty. Skipping.
Note: Rrs_LA_110101_BanSibounhom.csv was empty. Skipping.
Note: Rrs_LA_120101_BanMixai.csv was empty. Skipping.
Note: Rrs_LA_110201_BanKokVan.csv was empty. Skipping.
Note: Rrs_LA_120102_BanPakBakdownstream.csv was empty. Skipping.
Note: Rrs_LA_230102_ThaNgon.csv was empty. Skipping.
Note: Rrs_LA_230103_BanPakNgum.csv was empty. Skipping.
Note: Rrs_LA_320101_SeBangfai.csv was empty. Skipping.
Note: Rrs_LA_230205_MuongKasi.csv was empty. Skipping.
Note: Rrs_LA_350101_BanKengdone.csv was empty. Skipping.
Note: Rrs_LA_350601_Kengkok.csv was empty. Skipping.
Note: Rrs_TH_010801_ChiangKhong.csv was empty. Skipping.
Note: Rrs_TH_011904_PaMongDamSite.csv was empty. Skipping.
Note: Rrs_TH_050104_ChiangRai.csv was empty. Skipping.
Note: Rrs_TH_310102_NamKae.csv was empty. Skipping.
Note: Rrs_TH_370210_BanKaeSiChomphu.csv was em

In [None]:
# import time
# from tqdm.notebook import tqdm

# def monitor_ee_tasks(tasks, check_interval=30):
#     """
#     Monitor a list of GEE tasks and display tqdm.notebook progress bars for each.

#     Parameters:
#     - tasks: A list of ee.task objects to monitor.
#     - check_interval: Time in seconds between status checks.
#     """
#     # Initialize a dictionary to hold our progress bars
#     progress_bars = {}

#     # Initialize progress bars for each task
#     for task in tasks:
#         status = task.status()
#         progress_bars[status['id']] = tqdm(total=100, desc=f"Task {status['description']}")
#     try:
#         tasks_complete = []
#         while len(tasks)!=len(tasks_complete):
#             statuses = [task.status() for task in tasks]            
#             for status in statuses:
#                 id = status['id']
#                 if status['state'] in ['FAILED', 'CANCELLED', 'CANCEL_REQUESTED']:
#                         progress_bars[id].bar_style = 'danger'
#                 elif status['state'] in ['COMPLETED']:
#                     progress_bars[id].n = 100
#                     progress_bars[id].refresh()
#                     progress_bars[id].close()
#                     tasks_complete.append(status['id'])
#                 else:
#                     progress = status.get('progress', 0)
#                     progress_bars[id].n = progress
#                     progress_bars[id].refresh()            
#             # Wait for a bit before checking the status again
#             time.sleep(check_interval)
#     except KeyboardInterrupt:
#         print("Monitoring interrupted.")

# monitor_ee_tasks(tasks)

In [None]:
# compute TSM features
# ic_all = ic_all \
#     .map(funcs_turb.calc_spm_nechad) \
#     .map(funcs_turb.calc_tur_nechad) \
#     .map(funcs_turb.calc_tur_dogliotti) \
#     .map(funcs_turb.calc_indices)

In [None]:
# crs = ic_msi.first().select(0).projection().crs()
# scale = 30

# # wxee convert to xarray
# #ds_msi = ic_msi.select('B4').limit(25).wx.to_xarray(scale=scale, region=bounds)

# # geemap export
# geemap.ee_export_image_collection_to_drive(ee.ImageCollection(ic_msi).select('B4'), folder='export/oli', maxPixels=200000000, region=bounds, scale=30)

In [None]:
# # Export red bands to geotiffs
# geemap.ee_export_image_collection_to_drive(ee.ImageCollection(imcoll_etm).select('B3'), folder='export/msi', maxPixels=200000000, region=bounds, scale=30)
# geemap.ee_export_image_collection_to_drive(ee.ImageCollection(imcoll_oli).select('B4'), folder='export/oli', maxPixels=200000000, region=bounds, scale=30)
# geemap.ee_export_image_collection_to_drive(ee.ImageCollection(imcoll_msi).select('B4'), folder='export/oli', maxPixels=200000000, region=bounds, scale=30)