# Download and run zonal stats on WOfS water statistics

https://docs.digitalearthafrica.org/en/latest/data_specs/Landsat_WOfS_specs.html

For the WOfS daily data, the data need to be downloaded and then processed.

In [1]:
import sys, os, importlib, json, multiprocessing
import rasterio, boto3

import pandas as pd
import geopandas as gpd
import GOSTRocks.rasterMisc as rMisc

from tqdm import tqdm
from osgeo import gdal
from shapely.geometry import shape
from botocore import UNSIGNED
from botocore.config import Config
from GOSTRocks.misc import tPrint

# Location of the Digital Earth Africa WOfS data on S3.
bucket = 'deafrica-services'
prefix = 'wofs_ls' # 'wofs_ls_summary_annual'
region = 'af-south-1'
# Requests are sent unsigned, so no AWS credentials are required.
# The client reuses `region` so the bucket region is defined in one place only.
s3client = boto3.client('s3', region_name=region, config=Config(signature_version=UNSIGNED))

In [2]:
in_folder = "/home/wb411133/projects/WOFS_Walker"
# Every shapefile sitting directly inside the project folder is a points file to process
pt_files = [
    os.path.join(in_folder, shp_name)
    for shp_name in os.listdir(in_folder)
    if shp_name.endswith(".shp")
]

# WOfS region footprints; region_code is split on "_" into two integer parts
in_extents = gpd.read_file("wofs_ls_summary_alltime-regions-deafrica-data.geojson")
code_parts = in_extents['region_code'].apply(lambda code: code.split("_"))
# 181 These additions transform numbers for downloads
in_extents['COL'] = code_parts.apply(lambda parts: int(parts[0]) + 84)
in_extents['ROW'] = code_parts.apply(lambda parts: int(parts[1]) + 96)
in_extents['COL_ROW'] = in_extents.apply(lambda rec: f"{rec['COL']}_{rec['ROW']}", axis=1)
extents_index = in_extents.sindex

# Landsat WRS-2 footprints, used to find the PATH/ROW tiles covering each points file
landsat_extents = gpd.read_file("/home/public/Data/GLOBAL/WRS2_descending.shp")
landsat_index = landsat_extents.sindex


In [3]:
# Folder that receives the per-file / per-grid summary CSVs.
final_folder = os.path.join(in_folder, "WOFS_Daily_summaries")
# exist_ok makes the call idempotent and avoids a check-then-create race
os.makedirs(final_folder, exist_ok=True)

In [4]:
# Inventory of WOfS objects on S3, one row per object key
wofs_tiles = pd.read_csv("WOFS_AWS_SUMMARY.csv", index_col=0)
# Keys end in .../<month>/<day>/<file>, so month and day are the 3rd- and
# 2nd-to-last path components; they are kept as strings (e.g. "01").
key_parts = wofs_tiles['Key'].apply(lambda key: key.split("/"))
wofs_tiles['MONTH'] = key_parts.apply(lambda parts: parts[-3])
wofs_tiles['DAY'] = key_parts.apply(lambda parts: parts[-2])
wofs_tiles.head()

  mask |= (ar1 == a)


Unnamed: 0,Key,LastModified,ETag,Size,StorageClass,COL,ROW,COL_ROW,TYPE,YEAR,MONTH,DAY
0,wofs_ls/1-0-0/148/072/2005/11/21/wofs_ls_14807...,2021-09-01 04:58:11+00:00,"""865518daa97eb5abada45389ca8275b0""",3.961378,STANDARD,148,72,148_72,water,2005,11,21
1,wofs_ls/1-0-0/148/072/2005/12/23/wofs_ls_14807...,2021-09-01 04:06:08+00:00,"""ada110dbc2125e9f887503c1d1eb6099""",3.956423,STANDARD,148,72,148_72,water,2005,12,23
2,wofs_ls/1-0-0/148/072/2006/01/24/wofs_ls_14807...,2021-09-01 04:09:59+00:00,"""8baba74aab96bd79fdf4e1e598f5eba0""",4.412727,STANDARD,148,72,148_72,water,2006,1,24
3,wofs_ls/1-0-0/148/072/2006/02/25/wofs_ls_14807...,2021-09-01 03:58:38+00:00,"""d960813048736658c36542711939e9f7-2""",8.448545,STANDARD,148,72,148_72,water,2006,2,25
4,wofs_ls/1-0-0/148/072/2006/04/30/wofs_ls_14807...,2021-09-01 04:10:04+00:00,"""58b52ddc11ff59c1b56d7d20334d0052""",3.539613,STANDARD,148,72,148_72,water,2006,4,30


# Download and create VRT files for WOFS data

In [6]:
# For every points shapefile: download the WOfS scenes of `sel_year` that cover
# it, mosaic each day's scenes into a VRT, sample every VRT at the point
# locations and write one CSV (one column per day) to `final_folder`.
sel_year = 2011
out_folder = os.path.join(in_folder, "IMAGES", str(sel_year))
os.makedirs(out_folder, exist_ok=True)

for pt_file in pt_files:
    # Read in the current points file
    pt_name = os.path.basename(pt_file)[:-4]
    final_year = os.path.join(final_folder, f'{pt_name}_{sel_year}.csv')
    if os.path.exists(final_year):
        # An existing CSV means this points file was already processed
        continue

    curPt = gpd.read_file(pt_file)
    pt_geom = [(x.x, x.y) for x in curPt['geometry']]
    pt_final = curPt.copy()

    # Find intersecting Landsat tiles
    potential_tiles = landsat_extents.iloc[list(landsat_index.intersection(curPt.total_bounds))].sort_values(['PATH','ROW'])
    tPrint(f"Downloading imagery for {pt_name} in {sel_year}")
    for idx, row in potential_tiles.iterrows():
        # For each Landsat tile, identify the WOFS tiles
        sel_wofs = wofs_tiles.loc[(wofs_tiles['ROW'] == row['ROW']) &
                                  (wofs_tiles['COL'] == row['PATH']) &
                                  (wofs_tiles['TYPE'] == 'water') &
                                  (wofs_tiles['YEAR'] == sel_year)]
        # Download each image in the selected tile, organized by month and day
        # (iterating an empty selection is simply a no-op)
        for sel_idx, sel_row in sel_wofs.iterrows():
            image_folder = os.path.join(out_folder, sel_row['MONTH'], sel_row['DAY'])
            os.makedirs(image_folder, exist_ok=True)
            out_file = os.path.join(image_folder, os.path.basename(sel_row['Key']))
            if not os.path.exists(out_file):
                s3client.download_file(bucket, sel_row['Key'], out_file)
    tPrint('Download Complete')

    # Collect the day folders: sub-directories of every folder below the year
    # folder (i.e. the children of the month folders).
    all_dirs = []
    for root, dirs, files in os.walk(out_folder):
        if os.path.basename(root) != str(sel_year):
            all_dirs.extend(os.path.join(root, d) for d in dirs)

    # Create one VRT per YEAR_MONTH_DAY. This loop runs once, after the walk
    # has finished; nesting it inside the walk rebuilt every VRT on each step
    # and filled all_vrts with duplicates.
    all_vrts = []
    for c_dir in all_dirs:
        vrt_files = [os.path.join(c_dir, x) for x in os.listdir(c_dir)]
        if not vrt_files:
            # Nothing was downloaded into this day folder; no VRT to build
            continue
        out_vrt = os.path.join(os.path.dirname(c_dir), f'WOFS_VRT_{"_".join(c_dir.split("/")[-3:])}.vrt')
        # NOTE(review): 'cubic' resampling on what look like categorical flag
        # values may not be appropriate - confirm whether 'nearest' is intended.
        vrt_options = gdal.BuildVRTOptions(resampleAlg='cubic', addAlpha=True)
        my_vrt = gdal.BuildVRT(out_vrt, vrt_files, options=vrt_options)
        my_vrt = None  # drop the reference so GDAL closes the dataset and writes the VRT
        all_vrts.append(out_vrt)

    # Run zonal stats on created VRT files
    all_vrts.sort()
    xy_by_crs = {}  # reprojected point coordinates, cached per raster CRS
    for vrt in all_vrts:
        col_name = os.path.basename(vrt)[:-4]
        if col_name in pt_final.columns:
            continue
        # The context manager closes the raster once it has been sampled
        with rasterio.open(vrt) as curR:
            sample_xy = pt_geom
            # Sample in the raster's CRS, mirroring the reprojection done in
            # the grid-based zonal statistics further down.
            if curPt.crs is not None and curR.crs is not None and curPt.crs != curR.crs:
                crs_key = curR.crs.to_wkt()
                if crs_key not in xy_by_crs:
                    xy_by_crs[crs_key] = [(g.x, g.y) for g in curPt.to_crs(curR.crs)['geometry']]
                sample_xy = xy_by_crs[crs_key]
            # Keep only the first band; the VRT also carries an alpha band
            pt_final[col_name] = [vals[0] for vals in curR.sample(sample_xy)]

    pd.DataFrame(pt_final.drop(['geometry'], axis=1)).to_csv(final_year)




In [7]:
# Preview the sampled values of the last processed points file, without geometry
attrs_only = pt_final.drop(columns=['geometry'])
xx = pd.DataFrame(attrs_only)
xx.head()

NameError: name 'pt_final' is not defined

In [5]:
# Show the output CSV path; it is only assigned inside the per-points-file
# loop, so it is undefined until that loop has run for at least one file.
final_year

NameError: name 'final_year' is not defined

# Run grid-based zonal stats

In [8]:
# Raster values that are tallied per grid cell by the categorical zonal statistics.
# NOTE(review): presumably WOfS bit-flag codes - confirm against the WOfS spec.
unq_vals = [128, 130, 160]

# Grid shapefiles to summarise
grid_folder = "/home/wb411133/projects/WOFS_Walker/GRIDS"
grids = [
    os.path.join(grid_folder, shp_name)
    for shp_name in os.listdir(grid_folder)
    if shp_name.endswith(".shp")
]

# Every .vrt file found anywhere below the 2011 imagery folder
vrt_folder = "/home/wb411133/projects/WOFS_Walker/IMAGES/2011"
vrts = [
    os.path.join(cur_root, fname)
    for cur_root, _sub_dirs, fnames in os.walk(vrt_folder)
    for fname in fnames
    if fname.endswith(".vrt")
]


In [9]:
# Group VRTs into months
vrt_months = {}
for vrt in vrts:
    # VRT names end in ..._<year>_<month>_<day>.vrt, so the second-to-last
    # underscore-separated token of the path is the month.
    cur_month = vrt.split("_")[-2]
    # setdefault replaces the bare try/except that hid every other error
    vrt_months.setdefault(cur_month, []).append(vrt)

In [10]:
# Inspect the month -> list-of-VRT-paths mapping
vrt_months

{'01': ['/home/wb411133/projects/WOFS_Walker/IMAGES/2011/01/WOFS_VRT_2011_01_07.vrt',
  '/home/wb411133/projects/WOFS_Walker/IMAGES/2011/01/WOFS_VRT_2011_01_23.vrt',
  '/home/wb411133/projects/WOFS_Walker/IMAGES/2011/01/WOFS_VRT_2011_01_15.vrt',
  '/home/wb411133/projects/WOFS_Walker/IMAGES/2011/01/WOFS_VRT_2011_01_31.vrt',
  '/home/wb411133/projects/WOFS_Walker/IMAGES/2011/01/WOFS_VRT_2011_01_14.vrt',
  '/home/wb411133/projects/WOFS_Walker/IMAGES/2011/01/WOFS_VRT_2011_01_22.vrt',
  '/home/wb411133/projects/WOFS_Walker/IMAGES/2011/01/WOFS_VRT_2011_01_30.vrt',
  '/home/wb411133/projects/WOFS_Walker/IMAGES/2011/01/WOFS_VRT_2011_01_05.vrt',
  '/home/wb411133/projects/WOFS_Walker/IMAGES/2011/01/WOFS_VRT_2011_01_21.vrt',
  '/home/wb411133/projects/WOFS_Walker/IMAGES/2011/01/WOFS_VRT_2011_01_12.vrt',
  '/home/wb411133/projects/WOFS_Walker/IMAGES/2011/01/WOFS_VRT_2011_01_20.vrt',
  '/home/wb411133/projects/WOFS_Walker/IMAGES/2011/01/WOFS_VRT_2011_01_28.vrt',
  '/home/wb411133/projects/WOFS_Wa

### Run sequentially (single process)

In [11]:
# Sequential grid-based zonal statistics: for every grid shapefile and month,
# tally the `unq_vals` raster values per grid cell for each daily VRT and write
# one CSV per grid/month (one column per date and value).
for cur_grid in grids:
    grid_name = os.path.basename(cur_grid).replace(".shp", "")
    # NOTE: this rebinds the module-level `grid_folder` (the input GRIDS folder)
    # to the per-grid output folder; the name is kept for compatibility.
    grid_folder = os.path.join(final_folder, grid_name)
    os.makedirs(grid_folder, exist_ok=True)
    # NOTE: the loop variable `vrts` shadows the module-level list of all VRTs
    for month, vrts in vrt_months.items():
        out_file = os.path.join(grid_folder, os.path.basename(cur_grid).replace(".shp", f"_{month}.csv"))
        if os.path.exists(out_file):
            # Already computed in a previous run
            continue

        curD = gpd.read_file(cur_grid)
        pbar = tqdm(vrts)
        pbar.set_description(f'Processing {grid_name} {month}')

        # Explicit sentinel instead of `del final` plus bare try/except blocks,
        # which also swallowed unrelated errors.
        final = None
        for cur_vrt in pbar:
            date = "_".join(cur_vrt.split("_")[-3:]).replace(".vrt", "")
            # The context manager closes each raster after it has been summarised
            with rasterio.open(cur_vrt) as curR:
                if curD.crs != curR.crs:
                    curD = curD.to_crs(curR.crs)
                res = rMisc.zonalStats(curD, curR, rastType='C', unqVals=unq_vals)
            res = pd.DataFrame(res, columns=[f'{date}_{x}' for x in unq_vals])
            final = res if final is None else final.join(res)

        if final is None:
            # No VRTs for this month: nothing to write
            continue
        final['GRID_ID'] = curD['GRID_ID']
        final.to_csv(out_file)

Processing grid_BFA01_ply 01:   0%|          | 0/29 [01:12<?, ?it/s]


KeyboardInterrupt: 

### Multi-processing

In [12]:
# Raster values tallied per grid cell by run_zonal
unq_vals = [128, 130, 160]


def run_zonal(x_cur_grid, x_vrts, x_out_file):
    """Run categorical zonal statistics for one grid over a list of VRTs.

    For every VRT in ``x_vrts`` the occurrences of each value in the
    module-level ``unq_vals`` are computed per grid feature, giving one column
    per date and value. The combined table, plus the grid's ``GRID_ID``
    column, is written to ``x_out_file``.

    Args:
        x_cur_grid: Path of the grid shapefile to summarise.
        x_vrts: Paths of the VRT files to process; the date is taken from the
            last three underscore-separated tokens of each path.
        x_out_file: Path of the CSV file to write.

    Returns:
        None. Nothing is written when ``x_vrts`` is empty.
    """
    curD = gpd.read_file(x_cur_grid)
    final = None
    # Iterate the VRTs passed in. The previous code looped over the global
    # `vrts`, so every worker processed whatever list that global last held.
    for cur_vrt in x_vrts:
        date = "_".join(cur_vrt.split("_")[-3:]).replace(".vrt", "")
        # The context manager closes each raster after it has been summarised
        with rasterio.open(cur_vrt) as curR:
            if curD.crs != curR.crs:
                curD = curD.to_crs(curR.crs)
            res = rMisc.zonalStats(curD, curR, rastType='C', unqVals=unq_vals)
        res = pd.DataFrame(res, columns=[f'{date}_{x}' for x in unq_vals])
        # Explicit sentinel instead of a bare try/except around the join
        final = res if final is None else final.join(res)

    if final is None:
        tPrint(f'No VRTs supplied for {os.path.basename(x_out_file).replace(".csv", "")}; nothing written')
        return
    final['GRID_ID'] = curD['GRID_ID']
    final.to_csv(x_out_file)
    tPrint(f'Completed {os.path.basename(x_out_file).replace(".csv", "")}')
    

In [13]:
# Drop any `final` table left over from the sequential run; only the
# "not defined" case is ignored instead of swallowing every exception.
try:
    del final
except NameError:
    pass

# One argument triple [grid path, VRT list, output CSV] per grid and month
# whose output CSV does not exist yet.
mp_args = []
for cur_grid in grids:
    # NOTE: the loop variable `vrts` shadows the module-level list of all VRTs
    for month, vrts in vrt_months.items():
        # NOTE: the year in the file name is hard-coded to 2011
        out_file = os.path.join(final_folder, os.path.basename(cur_grid).replace(".shp", f"_{month}2011.csv"))
        if not os.path.exists(out_file):
            mp_args.append([cur_grid, vrts, out_file])

print(len(mp_args))
            

1


In [17]:
# Inspect the queued [grid path, VRT list, output CSV] argument triples
mp_args

[['/home/wb411133/projects/WOFS_Walker/GRIDS/grid_BFA01_ply.shp',
  ['/home/wb411133/projects/WOFS_Walker/IMAGES/2011/01/WOFS_VRT_2011_01_07.vrt',
   '/home/wb411133/projects/WOFS_Walker/IMAGES/2011/01/WOFS_VRT_2011_01_23.vrt',
   '/home/wb411133/projects/WOFS_Walker/IMAGES/2011/01/WOFS_VRT_2011_01_15.vrt',
   '/home/wb411133/projects/WOFS_Walker/IMAGES/2011/01/WOFS_VRT_2011_01_31.vrt',
   '/home/wb411133/projects/WOFS_Walker/IMAGES/2011/01/WOFS_VRT_2011_01_14.vrt',
   '/home/wb411133/projects/WOFS_Walker/IMAGES/2011/01/WOFS_VRT_2011_01_22.vrt',
   '/home/wb411133/projects/WOFS_Walker/IMAGES/2011/01/WOFS_VRT_2011_01_30.vrt',
   '/home/wb411133/projects/WOFS_Walker/IMAGES/2011/01/WOFS_VRT_2011_01_05.vrt',
   '/home/wb411133/projects/WOFS_Walker/IMAGES/2011/01/WOFS_VRT_2011_01_21.vrt',
   '/home/wb411133/projects/WOFS_Walker/IMAGES/2011/01/WOFS_VRT_2011_01_12.vrt',
   '/home/wb411133/projects/WOFS_Walker/IMAGES/2011/01/WOFS_VRT_2011_01_20.vrt',
   '/home/wb411133/projects/WOFS_Walker/IMA

In [None]:
# Run the queued grid/month jobs in parallel, one worker per job up to 60.
nCores = min([60, len(mp_args)])
if nCores > 0:
    with multiprocessing.Pool(nCores) as pool:
        pool.starmap(run_zonal, mp_args)
else:
    # multiprocessing.Pool raises ValueError when asked for zero processes,
    # which happens when every output file already exists.
    print("No outstanding grid/month combinations to process")

# DEBUGGING