# Processing Plot Lidar

In this notebook we will create a lidar point cloud for each plot. This includes
- Cropping to the plot geometry
- Filtering out noise and any other points that are not ground, vegetation or unclassified
- Saving cloud as a cloud optimised point cloud (COPC)

To process the data we will use PDAL pipelines.
We will also use dask to run the processing in parallel.

NOTE: Processing of plot lidar is really only done for visualisation purposes.

In [1]:
from pathlib import Path
import json

import geopandas as gpd
import pdal
import pandas as pd

## Pipeline Template

In [None]:
# Note: the template could be a plain JSON string, but building it from Python dicts lets us comment each stage
def create_plot_pipeline(input_path: str = '', output_path: str = '', polygon_wkt: str = ''):
    """Build the PDAL pipeline (as a JSON string) that extracts one plot's point cloud.

    The pipeline has three stages: a COPC reader restricted to the plot polygon,
    a classification range filter, and a COPC writer.

    Args:
        input_path (str): Path of the COPC file to read.
        output_path (str): Path of the COPC file to write.
        polygon_wkt (str): Plot polygon as WKT, passed to the reader's "polygon" option.

    Returns:
        str: The list of pipeline stages serialised as JSON with a 2-space indent.
    """
    # Stage 1: read the source COPC file, cropped to the plot polygon.
    reader_stage = {
        "type": "readers.copc",
        "filename": input_path,
        "polygon": polygon_wkt,
    }

    # Stage 2: keep only unclassified, ground and vegetation points.
    classification_stage = {
        "type": "filters.range",
        "limits": "Classification[0:5]",
    }

    # Stage 3: write the result as a COPC file.
    writer_stage = {
        "type": "writers.copc",
        "filename": output_path,
        "forward": "scale,offset",
        "extra_dims": "all",
    }

    stages = [reader_stage, classification_stage, writer_stage]
    return json.dumps(stages, indent=2)

### Plot pipelines

In [3]:
# Load the plot polygons and index them by plot id in a single step.
plots_gdf = gpd.read_file("../data/outputs/plots/plots.geojson").set_index('id')
plots_gdf.head()

Unnamed: 0_level_0,site,plot_number,site_plot_id,geometry
id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1
AGG_O_01_P1,AGG_O_01,1,AGG_O_01_P1,"POLYGON ((463042.83 5259846.736, 463025.797 52..."
AGG_O_01_P2,AGG_O_01,2,AGG_O_01_P2,"POLYGON ((463124.556 5259819.234, 463116.068 5..."
AGG_O_01_P3,AGG_O_01,3,AGG_O_01_P3,"POLYGON ((463201.174 5259815.806, 463200.551 5..."
AGG_O_01_P4,AGG_O_01,4,AGG_O_01_P4,"POLYGON ((463257.777 5259801.962, 463245.303 5..."
AGG_O_01_P5,AGG_O_01,5,AGG_O_01_P5,"POLYGON ((463303.022 5259789.552, 463289.794 5..."


In [4]:
# Site clouds (inputs) and plot clouds (outputs) both live under the shared outputs directory.
outputs_dir = Path("../data/outputs")
sites_lidar_dir = outputs_dir.joinpath("sites", "lidar")
plots_lidar_dir = outputs_dir.joinpath("plots", "lidar")

# Make sure the plot output directory exists before any pipeline writes to it.
plots_lidar_dir.mkdir(parents=True, exist_ok=True)


def create_pipeline_from_plot(plot_row):
    """Build the PDAL pipeline JSON for a single plot row.

    Args:
        plot_row: A row of the plots table. Its "site" and "site_plot_id" values
            name the input and output files, and its geometry supplies the
            crop polygon as WKT.

    Returns:
        pd.Series: A one-element series whose "pipeline" entry is the pipeline
        JSON string, so a row-wise ``apply`` yields a "pipeline" column.
    """
    # Input is the site-level cloud; output is the per-plot cloud.
    site_cloud = sites_lidar_dir / f"{plot_row['site']}.copc.laz"
    plot_cloud = plots_lidar_dir / f"{plot_row['site_plot_id']}.copc.laz"

    pipeline_json = create_plot_pipeline(
        input_path=str(site_cloud),
        output_path=str(plot_cloud),
        polygon_wkt=plot_row.geometry.wkt,
    )
    return pd.Series({"pipeline": pipeline_json})


# Row-wise apply: each plot row yields a one-element Series, so the result
# has a single "pipeline" column indexed by plot id.
pipelines = plots_gdf.apply(create_pipeline_from_plot, axis=1)
pipelines

Unnamed: 0_level_0,pipeline
id,Unnamed: 1_level_1
AGG_O_01_P1,"[\n {\n ""type"": ""readers.copc"",\n ""file..."
AGG_O_01_P2,"[\n {\n ""type"": ""readers.copc"",\n ""file..."
AGG_O_01_P3,"[\n {\n ""type"": ""readers.copc"",\n ""file..."
AGG_O_01_P4,"[\n {\n ""type"": ""readers.copc"",\n ""file..."
AGG_O_01_P5,"[\n {\n ""type"": ""readers.copc"",\n ""file..."
...,...
ULY_Y_96_P1,"[\n {\n ""type"": ""readers.copc"",\n ""file..."
ULY_Y_96_P2,"[\n {\n ""type"": ""readers.copc"",\n ""file..."
ULY_Y_96_P3,"[\n {\n ""type"": ""readers.copc"",\n ""file..."
ULY_Y_96_P4,"[\n {\n ""type"": ""readers.copc"",\n ""file..."


## Processing

In [5]:
def process_pdal_pipeline(pipeline: str, return_data: bool = False):
    """
    Execute a PDAL pipeline supplied as a JSON string.

    Args:
        pipeline (str): The PDAL pipeline JSON string.
        return_data (bool): When True, the executed PDAL Pipeline object is returned
            alongside the count. That object holds the metadata and every point the
            pipeline processed, so it can be large; hence the default of False.

    Returns:
        tuple: ``(count, pipeline_obj)`` where ``count`` is the value returned by
        ``Pipeline.execute()`` (the number of points processed) and ``pipeline_obj``
        is the executed pipeline if ``return_data`` is True, otherwise ``None``.
    """
    executed = pdal.Pipeline(pipeline)
    n_points = executed.execute()

    # Only hand back the (potentially large) pipeline object on request.
    if return_data:
        return (n_points, executed)
    return (n_points, None)

In [6]:
%%time

# Smoke-test the processing on a single plot before running everything in parallel.
test_pipeline = pipelines.loc['AGG_O_01_P1']

count, pl = process_pdal_pipeline(test_pipeline["pipeline"], return_data=True)
print(f"Processed {count} points.")

# Inspect the first array of the executed pipeline as a DataFrame.
points = pl.arrays[0]
points_df = pd.DataFrame(points)
points_df.head()

Processed 252716 points.
CPU times: user 940 ms, sys: 59.5 ms, total: 1e+03 ms
Wall time: 883 ms


Unnamed: 0,X,Y,Z,Intensity,ReturnNumber,NumberOfReturns,ScanDirectionFlag,EdgeOfFlightLine,Classification,Synthetic,...,UserData,PointSourceId,GpsTime,ScanChannel,Red,Green,Blue,Infrared,HeightAboveGround,Altitude
0,462992.467,5259864.165,0.0,30703,2,2,0,0,2,0,...,4,1,411879500.0,0,10280,12079,17733,18852,0.0,507.014146
1,462992.616,5259864.525,0.0,29779,2,2,0,0,2,0,...,4,1,411879500.0,0,13878,15677,21331,15782,0.0,506.996145
2,462991.933,5259862.205,0.0,31499,1,1,0,0,2,0,...,4,1,411879500.0,0,10023,11822,18504,10908,0.0,507.256154
3,462992.514,5259863.611,0.0,30821,2,2,0,0,2,0,...,4,1,411879500.0,0,10794,12593,18247,17575,0.0,507.021149
4,462992.42,5259863.384,1.624,29910,1,2,0,0,0,0,...,12,1,411879500.0,0,11051,12850,18247,14702,1.624,508.72215


In [7]:
from dask.distributed import Client

# With no arguments this starts a local cluster (see the LocalCluster summary below);
# the client is shown as the cell output so the dashboard link is visible.
client = Client()  # Start a Dask client
client

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 4
Total threads: 8,Total memory: 16.00 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:61175,Workers: 0
Dashboard: http://127.0.0.1:8787/status,Total threads: 0
Started: Just now,Total memory: 0 B

0,1
Comm: tcp://127.0.0.1:61187,Total threads: 2
Dashboard: http://127.0.0.1:61189/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:61178,
Local directory: /var/folders/37/j4yld2bd7pz4_0p7b249nvv40000gn/T/dask-scratch-space/worker-q_8lkqzc,Local directory: /var/folders/37/j4yld2bd7pz4_0p7b249nvv40000gn/T/dask-scratch-space/worker-q_8lkqzc

0,1
Comm: tcp://127.0.0.1:61188,Total threads: 2
Dashboard: http://127.0.0.1:61190/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:61180,
Local directory: /var/folders/37/j4yld2bd7pz4_0p7b249nvv40000gn/T/dask-scratch-space/worker-giwhptt8,Local directory: /var/folders/37/j4yld2bd7pz4_0p7b249nvv40000gn/T/dask-scratch-space/worker-giwhptt8

0,1
Comm: tcp://127.0.0.1:61193,Total threads: 2
Dashboard: http://127.0.0.1:61195/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:61182,
Local directory: /var/folders/37/j4yld2bd7pz4_0p7b249nvv40000gn/T/dask-scratch-space/worker-jmguag7t,Local directory: /var/folders/37/j4yld2bd7pz4_0p7b249nvv40000gn/T/dask-scratch-space/worker-jmguag7t

0,1
Comm: tcp://127.0.0.1:61194,Total threads: 2
Dashboard: http://127.0.0.1:61197/status,Memory: 4.00 GiB
Nanny: tcp://127.0.0.1:61184,
Local directory: /var/folders/37/j4yld2bd7pz4_0p7b249nvv40000gn/T/dask-scratch-space/worker-78nxmimf,Local directory: /var/folders/37/j4yld2bd7pz4_0p7b249nvv40000gn/T/dask-scratch-space/worker-78nxmimf


In [8]:
%%time

# Submit one task per plot pipeline; the plot ids from the index are passed as the task keys.
futures = client.map(
    process_pdal_pipeline,
    pipelines['pipeline'].to_list(),
    key=pipelines.index.to_list(),
)

# Wait for all tasks and collect their (count, None) results
# (return_data is left at its default of False, so no point data comes back).
results = client.gather(futures)

CPU times: user 2.15 s, sys: 522 ms, total: 2.67 s
Wall time: 51.3 s


In [9]:
# All results have been gathered, so the Dask client is no longer needed.
client.close()

In [10]:
# Each result is a (count, pipeline_obj) tuple; add up the point counts across all plots.
total_points = sum(result[0] for result in results)

f"Total points: {total_points:,}"

'Total points: 52,940,785'