# LAZ Files At Scale

0. Imports, session creation, etc.
1. Set up the AWS S3 stage
2. Load synchronously, then load asynchronously
3. Enrich with H3, including performance tests (or at least a nod to them)


# 0. Imports, Create Session Etc.

In [1]:
import json
import numpy as np
from snowflake.snowpark.session import Session
from snowflake.snowpark.functions import sproc, col, round
import snowflake.snowpark.functions as F
import snowflake.snowpark.types as T
import pandas as pd
import os
import sys

In [2]:
from snowflake.snowpark import DataFrame

In [3]:
# Credential files live outside the repository. Set the LAZ_CREDS_DIR environment
# variable to point at your own copy instead of editing this cell.
CREDS_DIR = os.environ.get("LAZ_CREDS_DIR", "/Users/mitaylor/Documents/creds")


def _load_json_config(file_name):
    """Parse one JSON config file from CREDS_DIR, closing the file handle afterwards."""
    with open(os.path.join(CREDS_DIR, file_name)) as cfg_file:
        return json.load(cfg_file)


snowflake_connection_cfg = _load_json_config("creds.json")
aws_connection_cfg = _load_json_config("aws_creds.json")

# The AWS file is read with the keys 'account' (used as the access key id)
# and 'password' (used as the secret access key).
AWS_SECRET_KEY = aws_connection_cfg['password']
AWS_KEY_ID = aws_connection_cfg['account']
# Creating Snowpark Session
session = Session.builder.configs(snowflake_connection_cfg).create()

# (Re)create the virtual warehouse and select the database used by the LAZ pipeline.
session.sql("CREATE OR REPLACE WAREHOUSE LAZ_VH WITH WAREHOUSE_SIZE='X-SMALL'").collect()
session.sql("USE DATABASE LAZ_DB").collect()
# One-time setup, left disabled: LAZ_DB and the PIPELINE stage (the stored procedure's
# stage_location) must already exist before the rest of the notebook is run.
#session.sql("CREATE OR REPLACE DATABASE LAZ_DB").collect()
#session.sql("CREATE OR REPLACE STAGE PIPELINE").collect()

[Row(status='Statement executed successfully.')]

# 1. Setup AWS S3 Stage

In [4]:
# Work in the PUBLIC schema, then (re)create the external stage that points at the
# S3 prefix holding the .laz files. The AWS key pair loaded earlier is interpolated
# into the statement, so never echo `create_stage_sql` in a shared notebook.
session.use_schema('PUBLIC')
create_stage_sql = f"""
CREATE OR REPLACE STAGE LAZ_S3_STAGE 
URL = 's3://mtaylor-raw-data-store/laz'
CREDENTIALS = (AWS_KEY_ID = '{AWS_KEY_ID}'  AWS_SECRET_KEY = '{AWS_SECRET_KEY}')
file_format = (type = 'CSV' field_delimiter = ',');
"""
session.sql(create_stage_sql).collect()

[Row(status='Stage area LAZ_S3_STAGE successfully created.')]

In [5]:
# List every file visible through the stage (name, size, md5, last_modified) as a sanity check.
session.sql('LS @LAZ_S3_STAGE').collect()

[Row(name='s3://mtaylor-raw-data-store/laz/TQ2080_P_9983_20150206_20150206.laz', size=12135892, md5='160f236219c5b34354508702cde549a2', last_modified='Mon, 31 Jul 2023 14:25:11 GMT'),
 Row(name='s3://mtaylor-raw-data-store/laz/TQ2082_P_9983_20150206_20150206.laz', size=11113655, md5='4dba0ace4ccbfa21f825fda13b2f3a8c', last_modified='Mon, 31 Jul 2023 14:27:51 GMT'),
 Row(name='s3://mtaylor-raw-data-store/laz/TQ2084_P_9983_20150206_20150206.laz', size=552357, md5='c89abbdf9da9e25a2b232210447b436f', last_modified='Mon, 31 Jul 2023 14:27:31 GMT'),
 Row(name='s3://mtaylor-raw-data-store/laz/TQ2280_P_9983_20150206_20150206.laz', size=7218310, md5='664d0c2b9963ba7d32ecc2fdd983ea92', last_modified='Mon, 31 Jul 2023 14:27:16 GMT'),
 Row(name='s3://mtaylor-raw-data-store/laz/TQ2282_P_9983_20150206_20150206.laz', size=35108512, md5='fb084d58a1acc43f890fffb8d54c2f45-3', last_modified='Mon, 31 Jul 2023 14:25:07 GMT'),
 Row(name='s3://mtaylor-raw-data-store/laz/TQ2284_P_9983_20150206_20150206.laz', 

# 2.1 Synchronous SPROC (fine for small file volumes)

In [8]:
@sproc(name='import_laz_sync', 
       packages=['snowflake-snowpark-python','laspy', 'lazrs-python', 'pyproj'], 
       is_permanent=True, 
       replace=True,
       stage_location='@PIPELINE', 
       session=session)
def import_laz_sync(session: Session, df_iloc_start: int, df_iloc_end: int) -> T.Variant:
    """Ingest a slice of the staged LAZ files into the LIDAR_DATASET table.

    The files on the stage are listed with ``LS``; the rows at positions
    ``[df_iloc_start, df_iloc_end)`` of that listing are read one by one,
    converted to a pandas DataFrame, enriched with ``longitude``/``latitude``
    columns and appended to ``LIDAR_DATASET``.

    Args:
        session: Snowpark session supplied by Snowflake when the procedure runs.
        df_iloc_start: Position of the first listed file to ingest (inclusive).
        df_iloc_end: Position one past the last listed file to ingest (exclusive).
            Values beyond the end of the listing are clipped by the slice.

    Returns:
        The literal status string "LAZ FILES INGESTED".
    """
    # Imports are local so the procedure body is self-contained when it is
    # shipped to and executed on the Snowflake side.
    from snowflake.snowpark.files import SnowflakeFile
    import numpy as np
    import pandas as pd
    from pyproj import Proj, transform
    import laspy

    # Single source of truth for the stage: the listing and the scoped URLs
    # must refer to the same stage, otherwise the relative paths do not match.
    stage_name = "LAZ_S3_STAGE"

    # NOTE(review): `pyproj.transform` and `Proj(init=...)` are deprecated pyproj APIs;
    # migrating to `pyproj.Transformer` should be validated against these results first.
    v84 = Proj(proj="latlong",towgs84="0,0,0",ellps="WGS84")
    v36 = Proj(proj="latlong", k=0.9996012717, ellps="airy",
            towgs84="446.448,-125.157,542.060,0.1502,0.2470,0.8421,-20.4894")
    vgrid = Proj(init="world:bng")

    def vectorized_convert(df):
        """Add 'longitude'/'latitude' columns derived from the frame's 'x'/'y' columns."""
        # Inverse-project the grid coordinates, then shift from the Airy-based
        # datum (v36) to WGS84 (v84).
        vlon36, vlat36 = vgrid(df['x'].values, 
                               df['y'].values, 
                               inverse=True)
        converted = transform(v36, v84, vlon36, vlat36)
        df['longitude'] = converted[0]
        df['latitude'] = converted[1]
        return df

    listing = pd.DataFrame(session.sql(f'LS @{stage_name}').collect())
    scope = listing.iloc[df_iloc_start:df_iloc_end,:]

    for row in range(len(scope)):
        # First column of the LS output is the full s3:// name; keep the part
        # after the stage's '/laz' prefix as the path relative to the stage.
        raw_url = scope.iloc[row, 0].split('/laz')[1]
        scoped_url = session.sql(
            f"SELECT BUILD_SCOPED_FILE_URL( @{stage_name} , '{raw_url}')"
        ).collect()[0][0]

        # Only the read needs the file handle; release it before the heavier work.
        with SnowflakeFile.open(scoped_url, 'rb') as f:
            inFile = laspy.read(f)

        #Import LAS into numpy array (X=raw integer value x=scaled float value)
        lidar_points = np.array((inFile.x,inFile.y,inFile.z,inFile.intensity,
                       inFile.raw_classification,inFile.scan_angle_rank)).transpose()

        #Transform to pandas DataFrame
        lidar_df = pd.DataFrame(lidar_points, columns=["x","y","z","intensity","classification", "scan_angle_rank"])

        #Update with Long/Lat
        lidar_df = vectorized_convert(lidar_df)

        # Append so that several (possibly concurrent) calls can fill the same table.
        sdf = session.create_dataframe(lidar_df)
        sdf.write.save_as_table("LIDAR_DATASET", mode="append")

    return ("LAZ FILES INGESTED")

Package 'lazrs-python' is not installed in the local environment. Your UDF might not work when the package is installed on the server but not on your local environment.


In [9]:
%%time
# Synchronously ingest the files at positions 0, 1 and 2 of the stage listing (end index is exclusive).
import_laz_sync(0,3)

CPU times: user 27 ms, sys: 4.59 ms, total: 31.6 ms
Wall time: 1min 1s


'"LAZ FILES INGESTED"'

# 2.2 Asynchronous SPROC (better for larger file volumes)

Note: we could be smarter here and divvy up the work based on file sizes, but with a long enough file list the per-chunk workload should even out.

In [15]:
from math import ceil

# Count the files on the same stage that import_laz_sync lists, so the
# start/end positions computed from this count line up with its listing.
file_count = len(session.sql('LS @LAZ_S3_STAGE').collect())
# Number of stored-procedure calls to fan the work out over.
chunks = 4
# Files handled per call. Floor at 1 so an empty stage cannot produce a zero
# step, which range() would reject.
step_size = max(1, ceil(file_count/chunks))

8

In [17]:
# Submit one non-blocking stored-procedure call per chunk of the stage listing.
# The handles are kept so the calls can be awaited below instead of being fired
# and forgotten.
ingest_jobs = []
for i in range(0,file_count,step_size):
    end = i + step_size  # exclusive; may overshoot file_count, the procedure's slice clips it
    ingest_jobs.append(
        session.sql(f'''CALL import_laz_sync({i},{end})''').collect(block=False)
    )

# Block until every call has finished: a failed call raises here rather than
# being lost, and later cells do not read a half-populated table.
ingest_results = [job.result() for job in ingest_jobs]

# 3. Enrich With H3

In [18]:
# H3 resolutions to compute, finest first; each becomes a column named HEX<resolution>.
H3_RESOLUTIONS = range(12, 5, -1)

# Snowflake's signature is H3_LATLNG_TO_CELL(<latitude>, <longitude>, <resolution>):
# latitude must come first, otherwise the points are indexed at the wrong location.
hex_columns = ",\n".join(
    f'H3_LATLNG_TO_CELL(lidar_dataset_gis."latitude", lidar_dataset_gis."longitude", {resolution}) as HEX{resolution}'
    for resolution in H3_RESOLUTIONS
)

# NOTE(review): this reads from lidar_dataset_gis, whereas the ingest procedure in this
# notebook appends to LIDAR_DATASET. Confirm that the GIS table is built elsewhere.
h3_sdf = session.sql(f'''
select
{hex_columns},
"z","intensity","classification", "scan_angle_rank" from lidar_dataset_gis''')

In [None]:
%%time
# Execute the H3 query and store its result as LIDAR_DATASET_H3, replacing any existing table.
h3_sdf.write.mode("overwrite").save_as_table("LIDAR_DATASET_H3")