# Cloud Native Geospatial Capstone Project

## Project Goals

### Summary

Create a file collection summary (e.g. number of datapoints / deployments)  by applying SQL commands to any parquet data set stored in `aodn-cloud-optimised` using DuckDB. (I use dask and S3 as well) 

### Test collection

slocum_glider_delayed_qc.parquet/ -> 553 objects, ~34.2GB 

### Motivation

Using familiar tools such as xarray and pandas are slow!



Import packages

In [36]:
import duckdb
import dask.dataframe as dd
import s3fs
import sys
import os
from io import StringIO
from datetime import datetime
import time
import matplotlib.pyplot as plt
import cartopy.feature as cfeature
import cartopy.crs as ccrs
import io
import base64

# for timing the notebook execution
notebook_start = time.perf_counter()


In [37]:
# get a list of all parquet datasets on S3

def list_parquet_files():
    fs = s3fs.S3FileSystem(anon=True)
    files = fs.glob(f's3://aodn-cloud-optimised/*.parquet')
    return files

available_parquet_datasets = list_parquet_files()
available_parquet_datasets

['aodn-cloud-optimised/animal_acoustic_tracking_delayed_qc.parquet',
 'aodn-cloud-optimised/animal_ctd_satellite_relay_tagging_delayed_qc.parquet',
 'aodn-cloud-optimised/argo.parquet',
 'aodn-cloud-optimised/autonomous_underwater_vehicle.parquet',
 'aodn-cloud-optimised/diver_benthic_cover_in_situ_qc.parquet',
 'aodn-cloud-optimised/diver_cryptobenthic_fish_abundance_qc.parquet',
 'aodn-cloud-optimised/diver_mobile_macroinvertebrate_abundance_qc.parquet',
 'aodn-cloud-optimised/diver_off_transect_species_observations_qc.parquet',
 'aodn-cloud-optimised/diver_reef_fish_abundance_biomass_qc.parquet',
 'aodn-cloud-optimised/diver_site_information_qc.parquet',
 'aodn-cloud-optimised/diver_survey_metadata_qc.parquet',
 'aodn-cloud-optimised/mooring_acidification_delayed_qc.parquet',
 'aodn-cloud-optimised/mooring_acidification_realtime_qc.parquet',
 'aodn-cloud-optimised/mooring_ctd_delayed_qc.parquet',
 'aodn-cloud-optimised/mooring_hourly_timeseries_delayed_qc.parquet',
 'aodn-cloud-opti

### Get a summary of the data set using dask


In [38]:
# Define the dataset name and S3 URI

dataset_name = 'argo'
s3_uri = f's3://aodn-cloud-optimised/{dataset_name}.parquet/'

In [39]:
# get s3 size of dataset and number of files
def get_s3_size_and_file_count(s3_uri):
    fs = s3fs.S3FileSystem(anon=True)
    total_size = 0
    file_count = 0
    
    files = fs.glob(s3_uri + '**/*.parquet')

    for file in files:
        total_size += fs.info(file)['Size']
        file_count += 1
        
    print(f"Total size of dataset: {total_size / (1024**3):.2f} GB")
    print(f"Total number of files: {file_count}")

    return total_size, file_count, files

_,_,files = get_s3_size_and_file_count(s3_uri)

Total size of dataset: 3.12 GB
Total number of files: 25312


In [40]:
# extract NetCDF files appended to the parquet dataset

nc_files = [
    os.path.basename(p).split(".nc")[0] + ".nc"
    for p in files
]
nc_files 

['53546_prof.nc',
 '53546_prof.nc',
 '53547_prof.nc',
 '53548_prof.nc',
 '53548_prof.nc',
 '53553_prof.nc',
 '53553_prof.nc',
 '53555_prof.nc',
 '56501_prof.nc',
 '56501_prof.nc',
 '56501_prof.nc',
 '56508_prof.nc',
 '56509_prof.nc',
 '56510_prof.nc',
 '56510_prof.nc',
 '56510_prof.nc',
 '5900026_prof.nc',
 '5900026_prof.nc',
 '5900027_prof.nc',
 '5900027_prof.nc',
 '5900028_prof.nc',
 '5900029_prof.nc',
 '5900030_prof.nc',
 '5900031_prof.nc',
 '5900032_prof.nc',
 '5900033_prof.nc',
 '5900034_prof.nc',
 '5900035_prof.nc',
 '5900036_prof.nc',
 '5900037_prof.nc',
 '53547_prof.nc',
 '53548_prof.nc',
 '53548_prof.nc',
 '56501_prof.nc',
 '56501_prof.nc',
 '56508_prof.nc',
 '56509_prof.nc',
 '56509_prof.nc',
 '56510_prof.nc',
 '56510_prof.nc',
 '56510_prof.nc',
 '56510_prof.nc',
 '5900026_prof.nc',
 '5900026_prof.nc',
 '5900027_prof.nc',
 '5900027_prof.nc',
 '5900027_prof.nc',
 '5900027_prof.nc',
 '5900027_prof.nc',
 '5900027_prof.nc',
 '5900028_prof.nc',
 '5900029_prof.nc',
 '5900029_prof.n

In [41]:
# get the date it was created and last updated

fs = s3fs.S3FileSystem()
    
latest = None
first = None

for path in fs.find(s3_uri):
    info = fs.info(path)
    modified = info["LastModified"]
    
    if latest is None or modified > latest:
        latest = modified
    if first is None or modified < first:
        first = modified
        

In [42]:
def summarise_ds(s3_uri):
    # use dask to read the parquet file from S3
    ddf = dd.read_parquet(s3_uri, engine='pyarrow',
                        storage_options={"profile": "edge-admin"})
    for col, dtype in ddf.dtypes.items():
        print(f"{col:<35} {dtype}")
    
    # summarise partitions
    print(' ' * 150)
    print(f"This data set has {ddf.npartitions} partitions.")
    print(' ' * 150)
    fs = s3fs.S3FileSystem()
    partitions = fs.ls(s3_uri)
    partition_names = [p.split("=")[-1] for p in partitions]
    # print("\n".join(p.split("=")[-1] for p in partitions))count
    
    partition_column_names = list(
        ddf.select_dtypes(include="category").columns
    )
    column_names = list(
        ddf.select_dtypes(exclude="category").columns
    )
    print(f"Partition column names: {', '.join(partition_column_names)}")

    return partition_names, partition_column_names, column_names  

# Or just run it directly to see output:
partition_names, partition_column_names, column_names = summarise_ds(s3_uri)
    

PROJECT_NAME                        string
PI_NAME                             string
CYCLE_NUMBER                        float64
DIRECTION                           string
DATA_CENTRE                         string
DC_REFERENCE                        string
DATA_STATE_INDICATOR                string
DATA_MODE                           string
PLATFORM_TYPE                       string
FLOAT_SERIAL_NO                     string
FIRMWARE_VERSION                    string
WMO_INST_TYPE                       string
JULD                                datetime64[ns]
JULD_QC                             string
JULD_LOCATION                       datetime64[ns]
LATITUDE                            float64
LONGITUDE                           float64
POSITION_QC                         string
POSITIONING_SYSTEM                  string
PROFILE_PRES_QC                     string
PROFILE_TEMP_QC                     string
PROFILE_PSAL_QC                     string
VERTICAL_SAMPLING_SCHEME           

Use DuckDB to get information

In [43]:
# Initialize DuckDB connection and setup

con = duckdb.connect()

con.execute("INSTALL httpfs; LOAD httpfs;")
con.execute("""
            SET s3_region='ap-southeast-2';
            """) 
con.execute("PRAGMA enable_progress_bar") # show progress bar for long-running queries
con.execute("PRAGMA threads=14") # use all available threads for query execution

<_duckdb.DuckDBPyConnection at 0x78777df7ff70>

In [44]:
# SQL query

partition_names_sql = ", ".join(f"'{d}'" for d in partition_names[1::]) # skip the first partition which is metadata

partition_column_name = partition_column_names[0]

if 'TIME' in column_names and 'LONGITUDE' in column_names and 'LATITUDE' in column_names:

    # This is an IMOS dataset
    # Variable names are standardised

    sql = f"""
    SELECT
    {partition_column_name},
    MIN(TIME) AS start_time,
    MAX(TIME) AS end_time,
    MIN(LONGITUDE) AS min_longitude,
    MAX(LONGITUDE) AS max_longitude,
    MIN(LATITUDE) AS min_latitude,
    MAX(LATITUDE) AS max_latitude,
    COUNT(*)  AS n_rows
    FROM read_parquet(
    '{s3_uri}**/*.parquet',
    hive_partitioning=1
    )
    WHERE {partition_column_name} IN ({partition_names_sql})
    GROUP BY {partition_column_name}
    ORDER BY {partition_column_name}
    """
    
else:
    
    # This is not an IMOS dataset, so just get row counts by partition
    # Variable names will likely vary by dataset
    
    sql = f"""
    SELECT
    {partition_column_name},
    COUNT(*)  AS n_rows
    FROM read_parquet(
    '{s3_uri}**/*.parquet',
    hive_partitioning=1
    )
    WHERE {partition_column_name} IN ({partition_names_sql})
    GROUP BY {partition_column_name}
    ORDER BY {partition_column_name}
    """
    

In [None]:
stats = con.execute(sql).fetchdf()

# Testing 
# hive partitioning, LIMIT 1, group/order by deployment code, count rows, min/max time = 6m45s
# Specifying a list of partition names decreased time to ~5m
# it would estimate a longer period of time to execute, and then suddenly finish (e.g. 35 min at first, took 18 mins)



  2% ▕▊                                     ▏ (~14.9 minutes remaining) 

In [None]:
# Format start_time and end_time as strings for better display in HTML
stats["start_time"] = stats["start_time"].dt.strftime("%Y-%m-%d %H:%M")
stats["end_time"]   = stats["end_time"].dt.strftime("%Y-%m-%d %H:%M")

stats

In [None]:
total_rows = stats["n_rows"].sum()
print(total_rows)

In [None]:
# Create plot
fig = plt.figure(figsize=(8, 8))
ax = plt.axes(projection=ccrs.PlateCarree())

# Set Australia extent
ax.set_extent([110, 155, -45, -10], crs=ccrs.PlateCarree())

# Add coastline
ax.coastlines(resolution='10m')
ax.add_feature(cfeature.BORDERS, linewidth=0.5)

# Scatter
ax.scatter(
    stats['min_longitude'],
    stats['min_latitude'],
    s=stats['n_rows'] / 300,
    edgecolors='k',
    transform=ccrs.PlateCarree()
)

ax.set_title("Partition approx locations (Australia)")



# Save to memory buffer
buf = io.BytesIO()
plt.savefig(buf, format="png", bbox_inches="tight")
plt.close(fig)

buf.seek(0)

# Convert to base64
img_base64 = base64.b64encode(buf.read()).decode("utf-8")

Export information to a html 

In [None]:
def capture_prints_to_html(func, dataset_name, output_file='output.html', *args, **kwargs):
    """
    Execute a function and capture all print statements to an HTML file.
    
    Parameters:
    -----------
    func : callable
        The function to execute
    dataset_name : str
        Name of the dataset
    output_file : str
        Path to the output HTML file
    *args, **kwargs
        Arguments to pass to the function
    
    Returns:
    --------
    result : any
    func : callable
        The function to execute
    output_file : str
        Path to the output HTML file
    *args, **kwargs
        Arguments to pass to the function
        }}
        .container {{
            background-color: white;
            padding: 30px;
            border-radius: 8px;
            box-shadow: 0 2px 4px rgba(0,0,0,0.1);
        }}
        h1 {{
            color: #333;
    
    Returns:
    --------
    result : any
        The return value of the function
    """
    # Create a StringIO object to capture stdout
    captured_output = StringIO()
    old_stdout = sys.stdout
    
    try:
        # Redirect stdout to our StringIO object
        sys.stdout = captured_output
        
        # Execute the function
        result = func(*args, **kwargs)
        
    finally:
        # Restore stdout
        sys.stdout = old_stdout
    
    # Get the captured output
    output_text = captured_output.getvalue()
    
    # Create HTML content
    html_content = f"""<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>{dataset_name}</title>
    <style>
        body {{
            font-family: 'Courier New', monospace;
            font-size: 16px;
            background-color: #f5f5f5;
            padding: 20px;
            max-width: 95vw;
            margin: 0 auto;
        }}
        .container {{
            background-color: white;
            padding: 30px;
            border-radius: 8px;
            box-shadow: 0 2px 4px rgba(0,0,0,0.1);
        }}
        h1 {{
            color: #333;
            font-size: 28px;
            border-bottom: 2px solid #4CAF50;
            padding-bottom: 10px;
        }}
        .timestamp {{
            color: #666;
            font-size: 14px;
            margin-bottom: 20px;
        }}
        pre {{
            background-color: #f8f8f8;
            border: 1px solid #ddd;
            border-radius: 4px;
            padding: 15px;
            overflow-x: auto;
            line-height: 1.6;
            font-size: 15px;
        }}
    </style>
</head>
<body>
    <div class="container">
        <h1>{dataset_name}</h1>
        <div class="timestamp">Generated: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}</div>
        <pre>{output_text}</pre>
        <img src="data:image/png;base64,{img_base64}" style="max-width:100%; height:auto;" />
    </div>
</body>
</html>"""
    
    # Write to file
    with open(output_file, 'w', encoding='utf-8') as f:
        f.write(html_content)
    
    print(f"Output saved to: {output_file}")
    
    return result


In [None]:
def html_summary_content():
    
    # S3 summaryTotal notebook runtimep is much faste
    print("=" * 150)
    print("PARQUET FILE SUMMARY")
    print("=" * 150)
    print(' ' * 150)
    _,_,files = get_s3_size_and_file_count(s3_uri)
    print(' ' * 150)
    print(f"Parquet file created: {first}")
    print(f"Parquet file last updated: {latest}")
    print(' ' * 150)

    # List the files in the dataset
    print("=" * 150)
    print("DATASET VARIABLES AND PARTITIONS")
    print("=" * 150)
    print(' ' * 150)
    summarise_ds(s3_uri)
    print(' ' * 150)
    
    # Deployment stats
    print("=" * 150)
    print("DATASET STATISTICS")
    print("=" * 150)
    print(f"\nTotal rows: {total_rows}")
    print(' ' * 150)
    print(stats.to_string())
    print(' ' * 150)
    
    # List the files in the dataset
    print("=" * 150)
    print("PARQUET FILES IN DATASET")
    print("=" * 150)
    print(' ' * 150)
    for file in files:
        print(file)
    print("=" * 150)
    print(' ' * 150)
    print("=" * 150)
    print("NETCDF FILES APPENDED")
    print("=" * 150)
    print(' ' * 150)
    for file in nc_files:
        print(file)
    print("=" * 150)
    print(' ' * 150)

    # work out total notebook runtime
    notebook_end = time.perf_counter()
    total_elapsed = notebook_end - notebook_start
    minutes = int(total_elapsed // 60)
    seconds = total_elapsed % 60
    print(' ' * 150)
    print(f"Total notebook runtime: {minutes}m {seconds:.1f}s")
    print(' ' * 150)
    


# Capture all output to a single HTML file
capture_prints_to_html(html_summary_content, dataset_name, f'{dataset_name}_summary.html')


### Notes

- The use of the DuckDB progress bar was a quick way to see if my query was reasonable (if it would take too long)
- There are two progress bars when loading into memory (first one for the DuckDB query, and second one for memory (much longer))
- it would estimate a longer period of time to execute, and then suddenly finish (e.g. 35 min at first, took 18 mins)
- Earlier testing suggests using duckdb with 14 cores makes querying twice as fast
- Setting hive partitioning to True and enabling partition pruning is much faster for getting statistics - perhaps because DuckDB is told where to look rather than discovering partitions/loading metadata?
- Multiple ways to get partition information
- Even though `slocum_glider_delayed_qc.parquet` is much larger (~34.2GB) compared to `argo.parquet` (~3GB), the notebook took a similar amount of time, suggesting that we need to optimise some files..