## This notebook performs the CRS transformation (EPSG:3857+NAVD88) to UTM_13N_G2139_3D during the pdal processing step itself
* This takes 5x amount of time than the no-transformation processing, and was not efficient for large tiles
* we parallelized the process and broke it into smaller tiles
* To avoid EPT streaming errors, we introduced a 20 second delay

In [None]:
# %% [markdown]
# Given an area of interest, we want to query and download available USGS 3DEP data. The data should conform to the bounds and crs of the input shape. Options are provided to filter out noise, and generate DTM/DSM, and saving the associated point cloud. We also provide an option to filter out LIDAR returns that exceed a user specified percentile threshold.

# %%
# PDAL imports
import pdal

# rasterio imports
import rasterio
from rasterio.warp import transform_bounds
from rasterio import warp
from rasterio.merge import merge

# GIS imports
from pyproj import CRS
from shapely.geometry import Polygon

# Dataframe imports
import pandas as pd
import geopandas as gpd

# Misc imports
import json
import requests
from pathlib import Path
import os
import time

In [None]:
# %%
def return_readers(input_aoi, src_crs, pointcloud_resolution = 1, n_rows = 5, n_cols=5, buffer_value=0):
    """
    This method takes a raster file and finds overlapping 3DEP data. It then returns a series of readers
    corresponding to non overlapping areas that can be used as part of further PDAL processing pipelines
    The method also returns the CRS specified i
    """
    xmin, ymin, xmax, ymax = input_aoi.bounds
    x_step = (xmax - xmin) / n_cols
    y_step = (ymax - ymin) / n_rows

    dst_crs = CRS.from_epsg(4326)

    readers = []
    pointcloud_input_crs = []

    for i in range(int(n_cols)):
        for j in range(int(n_rows)):
            aoi = Polygon.from_bounds(xmin+i*x_step, ymin+j*y_step, xmin+(i+1)*x_step, ymin+(j+1)*y_step)

            src_bounds_transformed = transform_bounds(src_crs, dst_crs, *aoi.bounds)
            aoi_4326 = Polygon.from_bounds(*src_bounds_transformed)

            src_bounds_transformed_3857 = transform_bounds(src_crs, CRS.from_epsg(3857), *aoi.bounds)
            aoi_3857 = Polygon.from_bounds(*src_bounds_transformed_3857)
           # print(aoi.bounds, src_bounds_transformed_3857)
            if buffer_value:
                aoi_3857.buffer(buffer_value)

            gdf = gpd.read_file('https://raw.githubusercontent.com/hobuinc/usgs-lidar/master/boundaries/resources.geojson').set_crs(4326) 
            # in the eventuality that the above URL breaks, we store a local copy
            # gdf = gpd.read_file('../data/shapefiles/resources.geojson').set_crs(4326)

            for _, row in gdf.iterrows():
                if row.geometry.intersects(aoi_4326):
                    usgs_dataset_name = row['name']
                    break

            #print("Dataset being used: ", usgs_dataset_name)
            url = f"https://s3-us-west-2.amazonaws.com/usgs-lidar-public/{usgs_dataset_name}/ept.json"
            reader = {
            "type": "readers.ept",
            "filename": url,
            "resolution": pointcloud_resolution,
            "polygon": str(aoi_3857.wkt),
            }
            
            # SRS associated with the 3DEP dataset
            response = requests.get(url)
            data = response.json()
            # srs_wkt = data['srs']['wkt']
            with open("SRS_CRS.wkt", 'r') as f:
                srs_wkt = f.read()
            
            pointcloud_input_crs.append(CRS.from_wkt(srs_wkt))
            # print("SRS CRS: ", srs_wkt)
            readers.append(reader)

    return readers, pointcloud_input_crs

In [None]:
# %%
# function that returns a PDAL pipeline to create a pointcloud based on user flags
def create_pdal_pipeline(filter_low_noise=True, filter_high_noise=True, 
                         filter_road=True, reset_classes=False, reclassify_ground=False,
                         return_only_ground=False, percentile_filter=True, percentile_threshold=0.95,
                         group_filter=None, reproject=True, save_pointcloud=False, 
                         pointcloud_file = 'pointcloud', input_crs=None,
                         output_crs=None, output_type='laz'):
    
    assert abs(percentile_threshold) <= 1, "Percentile threshold must be in range [0, 1]"
    assert output_type in ['las', 'laz'], "Output type must be either 'las' or 'laz'"
    assert output_crs is not None, "Argument 'output_crs' must be explicitly specified!"

    stage_filter_low_noise = {
        "type":"filters.range",
        "limits":"Classification![7:7]"
    }
    stage_filter_high_noise = {
        "type":"filters.range",
        "limits":"Classification![18:18]"
    }
    stage_filter_road = {
        "type":"filters.range",
        "limits":"Classification![11:11]"
    }
    stage_reset_classes = {
        "type":"filters.assign",
        "value":"Classification = 0"
    }
    stage_reclassify_ground = {
        "type":"filters.smrf",
        # added from pdal smrf documentation, in turn from Pingel, 2013
        "scalar":1.2,
        "slope":0.2,
        "threshold":0.45,
        "window":8.0
    }
    stage_group_filter = {
        "type":"filters.returns",
        "groups":group_filter
    }
    stage_percentile_filter =  {
        "type":"filters.python",
        "script":"filter_percentile.py",
        "pdalargs": {"percentile_threshold":percentile_threshold},
        "function":"filter_percentile",
        "module":"anything"
    }
    stage_return_ground = {
        "type":"filters.range",
        "limits":"Classification[2:2]"
    }

    stage_reprojection = {
        "type":"filters.reprojection",
        "out_srs":str(output_crs)
    }
    if input_crs is not None:
        stage_reprojection["in_srs"] = str(input_crs)
    
    stage_save_pointcloud_las = {
        "type": "writers.las",
        "filename": f"{pointcloud_file}.las"
    }
    stage_save_pointcloud_laz = {
        "type": "writers.las",
        "compression": "true",
        "minor_version": "2",
        "dataformat_id": "0",
        "filename": f"{pointcloud_file}.laz"
    }

    # Build pipeline
    pipeline = []

    # resetting the original classifications resets 
    # all point classifications to 0 (Unclassified)    
    if reset_classes:
        pipeline.append(stage_reset_classes)
        if reclassify_ground:
            pipeline.append(stage_reclassify_ground)
    else:
        # we apply the percentile filter first as it 
        # classifies detected outliers as 'high noise'
        if group_filter is not None:
            pipeline.append(stage_group_filter)
        if percentile_filter:
            pipeline.append(stage_percentile_filter)
        if filter_low_noise:
            pipeline.append(stage_filter_low_noise)
        if percentile_filter or filter_high_noise:
            pipeline.append(stage_filter_high_noise)
        if filter_road:
            pipeline.append(stage_filter_road)

    # For creating DTMs, we want to process only ground returns
    if return_only_ground:
        pipeline.append(stage_return_ground)
    
    if reproject:
        pipeline.append(stage_reprojection)
    
    # the pipeline can save the pointclouds to a separate file if needed
    if save_pointcloud:
        if output_type == 'laz':
            pipeline.append(stage_save_pointcloud_laz)
        else:
            pipeline.append(stage_save_pointcloud_las)

    return pipeline

# %%
# function that returns a PDAL pipeline to create a DEM based on user flags
def create_dem_stage(dem_filename='dem_output.tif', pointcloud_resolution=1.,
                        gridmethod='idw', dimension='Z'):
    dem_stage = {
            "type":"writers.gdal",
            "filename":dem_filename,
            "gdaldriver":'GTiff',
            "nodata":-9999,
            "output_type":gridmethod,
            "resolution":float(pointcloud_resolution),
            "gdalopts":"COMPRESS=LZW,TILED=YES,blockxsize=256,blockysize=256,COPY_SRC_OVERVIEWS=YES"
    }

    if dimension == 'Z':
        dem_stage.update({
            'dimension': 'Z',
            'where': 'Z>0'
        })
    else:
        dem_stage.update({
            'dimension':dimension
        })
    
    return [dem_stage]

# %%
# bounds for which pointcloud is created
gdf = gpd.read_file('processing_extent.geojson')
xmin, ymin, xmax, ymax = gdf.iloc[0].geometry.bounds

input_aoi = Polygon.from_bounds(xmin, ymin, xmax, ymax)
input_crs = gdf.crs.to_wkt()

In [None]:
 # specify the input CRS of EPT point clouds
with open('SRS_CRS.wkt', 'r') as f:
    INPUT_PC_CRS = ' '.join(f.read().replace('\n', '').split())

In [None]:
 # specify the output CRS of DEMs
with open('UTM_13N_WGS84_G2139_3D.wkt', 'r') as f:
    OUTPUT_CRS = ' '.join(f.read().replace('\n', '').split())

In [None]:
# The method returns pointcloud readers, as well as the pointcloud file CRS as a WKT string
# Specfying a buffer_value > 0 will generate overlapping DEM tiles, resulting in a seamless
# final mosaicked DEM
readers, POINTCLOUD_CRS = return_readers(input_aoi, input_crs, 
pointcloud_resolution = 1, n_rows=20, n_cols=20, buffer_value=0)

In [None]:
# Set pointcloud processing parameters 
FILTER_LOW_NOISE = True
FILTER_HIGH_NOISE = True
FILTER_ROAD = False
RETURN_ONLY_GROUND = False # Set true for DTM
RESET_CLASSES = False
RECLASSIFY_GROUND = False
PERCENTILE_FILTER = False # Set to True to apply percentile based filtering of Z values
PERCENTILE_THRESHOLD = 0.95 # Percentile value to filter out noisy Z returns

# "first,only" will return top of canopy, "last,only" will return lowest return
# set to None to use default values
GROUP_FILTER="first,only"

REPROJECT = True
SAVE_POINTCLOUD=False
DEM_RESOLUTION = 1
OUTPUT_TYPE='laz'
GRID_METHOD='idw'
DIMENSION='Z' # can be set to options accepted by writers.gdal. Set to 'intensity' to return intensity rasters

output_path = Path('tmp_dem_reproj/')
output_path.mkdir(exist_ok=True)

print(f"Number of readers: {len(readers)}")


In [None]:
from tqdm import tqdm
from multiprocessing import Pool
from functools import partial
import threading
import time
import random
import multiprocessing

In [None]:
def init(lock):
    global starting
    starting = lock


In [None]:
def parallel_pdal_execute(reader_counter_tuple):
    starting.acquire() # no other process can get it until it is released
    threading.Timer(20, starting.release).start() # release in 20 second
    reader,i = reader_counter_tuple
    dem_file = output_path / f'dem_tile_aoi_{str(i).zfill(4)}.tif'
    if os.path.exists(dem_file):
        return 0
    else:
        pipeline = {'pipeline':[reader]}
        pdal_pipeline = create_pdal_pipeline(
            filter_low_noise=FILTER_LOW_NOISE,
            filter_high_noise=FILTER_HIGH_NOISE,
            filter_road=FILTER_ROAD,
            reset_classes=RESET_CLASSES, reclassify_ground=RECLASSIFY_GROUND,
            return_only_ground=RETURN_ONLY_GROUND, 
            percentile_filter=PERCENTILE_FILTER, percentile_threshold=PERCENTILE_THRESHOLD,
            group_filter=GROUP_FILTER,
            reproject=REPROJECT,
            save_pointcloud=SAVE_POINTCLOUD, 
            pointcloud_file='pointcloud', input_crs = INPUT_PC_CRS,
            output_crs=OUTPUT_CRS, output_type=OUTPUT_TYPE
        )
        dem_stage = create_dem_stage(dem_filename=str(dem_file), 
                                        pointcloud_resolution=DEM_RESOLUTION, 
                                        gridmethod=GRID_METHOD, dimension=DIMENSION)
    
        # apply interpolation to fill gaps when generating DSM/DTM
        dem_stage[0]['window_size'] = 4
        
        pipeline['pipeline'] += pdal_pipeline
        pipeline['pipeline'] += dem_stage
        pipeline = pdal.Pipeline(json.dumps(pipeline))
        pipeline.execute()
        return 0

In [None]:
tasks = []
for i, reader in enumerate(readers):
    tuple_input = (reader,i)
    tasks.append(tuple_input)

In [None]:
readers[0]

In [None]:
def run_imap_multiprocessing(func, argument_list, num_processes):

    pool = Pool(processes=num_processes,initializer=init,initargs=[multiprocessing.Lock()])

    result_list_tqdm = []
    for result in tqdm(pool.imap(func=func, iterable=argument_list), total=len(argument_list)):
        result_list_tqdm.append(result)

    return result_list_tqdm

In [None]:
results = run_imap_multiprocessing(parallel_pdal_execute,tasks,num_processes=15)

In [None]:
!gdalbuildvrt dem_mos_utm13N_G2139.vrt tmp_dem_reproj/dem_tile*.tif


In [None]:
# convert to cloud optimised geotiff
#cmd 
!gdal_translate -co TILED=YES -co COMPRESS=LZW -co BIGTIFF=IF_SAFER -co COPY_SRC_OVERVIEWS=YES -co COMPRESS_OVERVIEW=YES -co NUM_THREADS=ALL_CPUS -co PREDICTOR=3 dem_mos_utm13N_G2139.vrt dem_mos_utm13N_G2139.tif

In [None]:
!ls dem_mos_utm13N_G2139.tif -lah

In [None]:
!gdaladdo -r gauss dem_mos_utm13N_G2139.tif 2 4 8 16 32 64 128 256 512 1024

In [None]:
!ls dem_mos_utm13N_G2139.tif -lah