# Automated Ingestion and Processing of NetCDF Files using PySpark and Databricks Auto Loader

This notebook shows how to load, parse, and process NetCDF files into Spark tables automatically. It uses Databricks Auto Loader (the `cloudFiles` source in PySpark Structured Streaming) to discover new files incrementally, and a custom parsing function to convert the climate data in each NetCDF file into Spark DataFrame rows. The sections below cover environment setup, the output schema, and the ingestion and metadata-enrichment workflow. Because progress is checkpointed, the pipeline can be rerun to pick up only newly arrived files.


### Installation of Essential Libraries for Climate Data Handling

This section installs the Python libraries used to read and process climate data:

- `xarray`: Works with labeled multi-dimensional arrays and datasets. It reads netCDF files, a format widely used for climate data, through pluggable I/O backends.
- `netCDF4` and `h5netcdf`: Backends that xarray can use to read netCDF files. `netCDF4` wraps the netCDF-C library and reads both classic netCDF and netCDF-4 files; `h5netcdf` reads netCDF-4 files, which are stored as HDF5, through `h5py` without requiring netCDF-C.

Running these commands installs the libraries into the notebook's Python environment.

In [0]:
%pip install xarray
%pip install netCDF4 h5netcdf

### Restarting Python Environment

`dbutils.library.restartPython()` restarts the notebook's Python process so that packages installed with `%pip` are loaded cleanly, including newer versions of packages that were already imported. The restart clears all Python state (variables, imports, and function definitions), so run it before the cells that import libraries and define functions.

In [0]:
dbutils.library.restartPython()
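
After the restart, a quick way to confirm that the packages installed above are importable is to look them up with `importlib.util.find_spec`, which locates a module without importing it. The helper `missing_packages` and the constant `REQUIRED` are hypothetical conveniences, not part of Databricks or xarray; the names listed are the import names of the packages from the `%pip` cell.

```python
import importlib.util

# Import names of the packages installed with %pip above (hypothetical constant).
REQUIRED = ["xarray", "netCDF4", "h5netcdf"]


def missing_packages(names):
    """Return the top-level module names that the import system cannot find."""
    return [name for name in names if importlib.util.find_spec(name) is None]
```

In the notebook, `missing_packages(REQUIRED)` should return an empty list; any name it returns points to a failed install or a skipped restart.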

### Importing Libraries and Modules for Data Processing

This cell imports the modules used for data manipulation, file handling, and schema definitions:

- `pandas` (imported as `pd`): Essential for data manipulation and analysis, particularly useful for handling tabular data with heterogeneously-typed columns.
- `xarray` (imported as `xr`): Facilitates working with labeled multi-dimensional arrays and datasets, which is especially useful for manipulating large climate data files like netCDF.
- `os`: Operating-system utilities such as path and file operations. It is imported for convenience; the functions below do not currently use it.
- `pyspark.sql.types`: This module includes classes that define the structure of DataFrames in PySpark, such as `StructType` and `StructField`, along with specific data types (`FloatType`, `StringType`, `TimestampType`, `LongType`, `BinaryType`). These are used to specify schema definitions for Spark DataFrames, ensuring data consistency and structure.
- `datetime`: Standard date and time handling. The parsing code below creates its timestamps with pandas (`pd.Timestamp`, `pd.to_datetime`), so this import is not strictly required.

With these imports in place, the following cells define the ingestion function and the output schema.

In [0]:
import pandas as pd
import xarray as xr
import os 
from pyspark.sql.types import StructType, StructField, FloatType, StringType, TimestampType, LongType, BinaryType
from datetime import datetime

### Function: `netcdf_to_bronze_autoloader`

This function uses Databricks Auto Loader to discover NetCDF files incrementally, parses each file into rows with `mapInPandas`, and writes the rows to a Spark table. Because parsing runs inside Spark tasks, files are processed in parallel across the cluster.

#### Parameters:
- **source_file_location (str):** The directory containing the source NetCDF files.
- **output_schema (`StructType`):** The schema definition for the DataFrame to ensure consistency in data structure.
- **checkpoint_location (str):** Path to store checkpoint information for fault tolerance.
- **streaming_query_name (str):** Name for the streaming query to uniquely identify it.
- **data_format (str):** Currently unused. The reader always uses Auto Loader's `binaryFile` format so that parsing is deferred to `parse_netcdf`.
- **table_name (str):** The name of the destination table where data will be written.
- **schema_name (str):** The database schema name under which the table resides.
- **catalog_name (str):** The name of the catalog in which the database schema is organized.
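- **write_mode (str):** Output mode of the stream. Use `append`: `complete` mode requires a streaming aggregation, which this pipeline does not perform.
- **data_provider (str):** Identifier for the source of the data, included as metadata.
- **date_created_attr (str, default `'date_created'`):** Name of the global NetCDF attribute used to populate the `date_created` column; the column is null if the attribute is missing.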
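
The function builds `bronze_table` by joining `catalog_name`, `schema_name`, and `table_name` with dots. That works for plain names like the ones used later in this notebook, but a name part containing characters such as `-` or spaces would need backtick quoting in Spark SQL, where a literal backtick is escaped by doubling it. The helpers `quote_identifier` and `qualified_table_name` below are a hypothetical sketch of that quoting; the function does not currently do this.

```python
import re

# Letters, digits and underscores, not starting with a digit, are left unquoted.
# Quoting is always allowed, so quoting anything else is the conservative choice.
_SIMPLE_IDENTIFIER = re.compile(r"[A-Za-z_][A-Za-z0-9_]*")


def quote_identifier(name: str) -> str:
    """Quote one identifier part with backticks if needed, doubling embedded backticks."""
    if not name:
        raise ValueError("identifier parts must be non-empty")
    if _SIMPLE_IDENTIFIER.fullmatch(name):
        return name
    return "`" + name.replace("`", "``") + "`"


def qualified_table_name(catalog: str, schema: str, table: str) -> str:
    """Build a three-level catalog.schema.table name with each part quoted as needed."""
    return ".".join(quote_identifier(part) for part in (catalog, schema, table))
```

For the names used in this notebook the result is identical to the f-string in the function; only unusual names change.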

#### Workflow:
1. **Loading the Stream:** Auto Loader (the `cloudFiles` source) discovers files in `source_file_location` and reads each new file once as a binary record. The checkpoint records which files have been processed, so reruns skip them; files overwritten in place are not reprocessed unless `cloudFiles.allowOverwrites` is enabled.
2. **Parsing NetCDF Files:** The custom function `parse_netcdf`, applied through `mapInPandas`, opens each file with xarray, converts it to a pandas DataFrame, and yields it to Spark as rows matching `output_schema`. For files that contain an `expver` (experiment version) variable, only `expver=1` is kept. The function also adds metadata columns: the source file path, an ingest timestamp, the data provider, and selected global attributes of the file.
3. **Writing to the Bronze Table:** The rows are written to `bronze_table`, built as `catalog_name.schema_name.table_name`. The checkpoint location makes the stream restartable, and `mergeSchema` lets the table's schema evolve when `output_schema` gains columns.
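
In ERA5 files that cover the most recent months, `expver=1` typically holds consolidated ERA5 values and `expver=5` holds preliminary ERA5T values, with each timestep populated under only one of them. Selecting `expver=1` and then dropping nulls therefore keeps ERA5 only; timesteps available solely as ERA5T are discarded. The sketch below models this with plain dictionaries and hypothetical toy values (not real data), and contrasts it with a fallback that keeps ERA5T where ERA5 is missing.

```python
import math

NAN = float("nan")

# Hypothetical toy values for one grid cell: timestep -> {expver: value}.
# Each timestep is populated under one experiment version and NaN under the other.
CELLS = {
    "2024-01-01": {1: 1.5, 5: NAN},  # consolidated ERA5
    "2024-03-10": {1: NAN, 5: 3.25},  # preliminary ERA5T only
    "2024-03-11": {1: NAN, 5: NAN},  # no data under either version
}


def select_expver1_then_dropna(cells):
    """Mimic xds.sel(expver=1) followed by dropna(): keep non-null ERA5 values only."""
    return {t: v[1] for t, v in cells.items() if not math.isnan(v[1])}


def prefer_era5_fallback_era5t(cells):
    """Keep ERA5 where present, otherwise fall back to ERA5T; drop timesteps with neither."""
    merged = {}
    for t, v in cells.items():
        value = v[5] if math.isnan(v[1]) else v[1]
        if not math.isnan(value):
            merged[t] = value
    return merged
```

In xarray, a fallback like this could be written with `Dataset.combine_first`, for example `xds.sel(expver=1).combine_first(xds.sel(expver=5))`, followed by dropping `expver`. Whether to ingest preliminary ERA5T rows is a design choice; the current function keeps ERA5 only.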

#### Features:
- **Incremental Ingestion:** Each run processes only files that the checkpoint has not yet recorded, and files are parsed in parallel across 64 partitions.
- **Metadata Management:** Adds the source file path, ingest timestamp, data provider, and file-level attributes such as the creation date, which makes each row traceable to its source file.
- **Schema Evolution:** With `mergeSchema` enabled, adding a field to `output_schema` adds the column to the existing table. Only columns declared in `output_schema` are written, so a new NetCDF variable must also be added to the schema.

The function forms the bronze layer of the pipeline: raw climate data ingested as rows, ready for cleaning and analysis in later steps.
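
The metadata columns come from a lookup-with-fallback on each file's global attributes: a missing or empty attribute becomes null, and `date_created_attr` chooses which attribute fills `date_created` (the implementation cell later in this notebook passes `date_updated`). The sketch below mirrors that logic with the standard library. It uses `datetime.fromisoformat`, which accepts fewer formats than the `pd.to_datetime` call in `parse_netcdf`, so treat it as an illustration rather than a drop-in replacement; `metadata_columns` and the attribute values in the usage are hypothetical.

```python
from datetime import datetime


def parse_optional_datetime(value):
    """Parse an ISO 8601 string; treat None or an empty string as missing."""
    return datetime.fromisoformat(value) if value else None


def metadata_columns(attrs, date_created_attr="date_created"):
    """Map NetCDF global attributes to the metadata columns of the bronze table."""
    return {
        "source_file": attrs.get("source_file"),
        "file_modified_in_s3": parse_optional_datetime(attrs.get("date_modified_in_s3")),
        "date_created": parse_optional_datetime(attrs.get(date_created_attr)),
    }
```

For example, `metadata_columns({"date_updated": "2024-05-01T12:00:00"}, date_created_attr="date_updated")` fills `date_created` and leaves the other two columns null.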


In [0]:
def netcdf_to_bronze_autoloader(source_file_location,
                                output_schema, checkpoint_location,
                                streaming_query_name, data_format, table_name,
                                schema_name, catalog_name, write_mode,
                                data_provider, date_created_attr='date_created'):

    # Load the stream. data_format is accepted but not used: files are always read
    # as binaryFile, so their contents are parsed later by parse_netcdf.
    read_stream = (spark.readStream
        .format("cloudFiles")                       # "cloudFiles" = use Auto Loader
        .option("cloudFiles.format", "binaryFile")  # Defer parsing to parse_netcdf (run via mapInPandas)
        .load(source_file_location)                 # Volume containing the files; *-wildcards allowed
    )
    
    # Parse NetCDF files into pandas DataFrames; mapInPandas turns them into Spark rows.
    def parse_netcdf(iterator, source_file_path='Source_File_Path',
                     ingest_timestamp_column='Ingest_Timestamp',
                     data_provider=data_provider):  # Defaults bind the metadata values
        for pd_df in iterator:
            for _, row in pd_df.iterrows():
                # Each input row holds one file path (selected as 'path' below).
                # The context manager closes the file once it has been converted.
                with xr.open_dataset(row['path']) as xds:
                    if 'expver' in xds.variables:
                        # Files that include recent data carry an experiment version:
                        # expver=1 is consolidated ERA5, expver=5 is preliminary ERA5T.
                        # Only ERA5 is kept; timesteps that exist only as ERA5T become
                        # null here and are removed by dropna() below.
                        pdf = xds.sel(expver=1).drop_vars(['expver']).to_dataframe()
                    else:
                        pdf = xds.to_dataframe()
                    attrs = dict(xds.attrs)  # Global attributes, read before the file closes

                pdf.dropna(inplace=True)  # Drop rows with null values

                # Add metadata columns
                pdf[source_file_path] = row['path']
                pdf[ingest_timestamp_column] = pd.Timestamp.now()
                pdf['data_provider'] = data_provider
                pdf['source_file'] = attrs.get('source_file', None)

                # Parse the S3 modification date attribute; null if absent
                file_modified_in_s3 = attrs.get('date_modified_in_s3', None)
                pdf['file_modified_in_s3'] = (pd.to_datetime(file_modified_in_s3)
                                              if file_modified_in_s3 else pd.NaT)

                # Populate date_created from the configured attribute; null if absent
                file_creation_date = attrs.get(date_created_attr, None)
                pdf['date_created'] = (pd.to_datetime(file_creation_date)
                                       if file_creation_date else pd.NaT)

                # Move time/latitude/longitude from the index into columns
                yield pdf.reset_index()

    bronze_table = f"{catalog_name}.{schema_name}.{table_name}"

    autoload = (read_stream
        .selectExpr("_metadata.file_path as path")  # Keep only the file paths, as /Volumes/...
        .repartition(64)                            # Spread the files over 64 partitions so they are parsed in parallel
        .mapInPandas(parse_netcdf, output_schema)   # Read each file and convert it to DataFrame rows
        .writeStream                                # writeStream configuration must follow all transformations
        .option("checkpointLocation", checkpoint_location)
        .option("mergeSchema", "true")              # Allow new output_schema columns to be added to the table
        .queryName(streaming_query_name)
        .outputMode(write_mode)
        .trigger(availableNow=True)                 # Process all files not yet seen, then stop
        .toTable(bronze_table))

    return autoload  # StreamingQuery handle, e.g. for autoload.awaitTermination()
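
With `trigger(availableNow=True)`, each run processes the files that are new since the last checkpoint and then stops, so rerunning the notebook picks up only newly arrived files. The toy model below illustrates that behaviour with a JSON list of processed paths. Auto Loader keeps its own file-tracking state in the checkpoint location, so this is an analogy, not its implementation; `run_available_now`, `demo`, and the `*.nc` pattern are hypothetical.

```python
import json
import tempfile
from pathlib import Path


def run_available_now(source_dir, checkpoint_file, process):
    """Process files not yet recorded in the checkpoint, record them, then return."""
    checkpoint = Path(checkpoint_file)
    seen = set(json.loads(checkpoint.read_text())) if checkpoint.exists() else set()

    # Discover files that no earlier run has processed.
    new_files = sorted(str(p) for p in Path(source_dir).glob("*.nc") if str(p) not in seen)
    for path in new_files:
        process(path)

    # Persist the updated state so the next run skips these files.
    checkpoint.write_text(json.dumps(sorted(seen | set(new_files))))
    return new_files


def demo():
    """Run three times against a temporary directory; return the file names each run processed."""
    with tempfile.TemporaryDirectory() as tmp:
        src = Path(tmp) / "volume"
        src.mkdir()
        checkpoint = Path(tmp) / "checkpoint.json"
        for name in ("a.nc", "b.nc"):
            (src / name).touch()
        first = run_available_now(src, checkpoint, lambda path: None)
        second = run_available_now(src, checkpoint, lambda path: None)  # nothing new
        (src / "c.nc").touch()
        third = run_available_now(src, checkpoint, lambda path: None)
    return [[Path(p).name for p in run] for run in (first, second, third)]
```

`demo()` returns one list per run: both initial files, then nothing, then only the file added between runs.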
                
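### Defining the Output Schema

The schema below defines the columns of the bronze table and therefore the columns that `parse_netcdf` must yield: the coordinates `time`, `latitude`, and `longitude` (moved out of the index by `reset_index()`), the data variables stored in the NetCDF files, and the metadata columns added during parsing. Judging by their names, the data variables are daily 2 m temperature statistics in °C and total precipitation in mm; they must match the variable names in the source files.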

In [0]:
# Expected columns of the bronze table. Field names must match the columns
# of the pandas DataFrames yielded by parse_netcdf.

output_schema = StructType([
    StructField("time", TimestampType(), True),
    StructField("latitude", FloatType(), True),
    StructField("longitude", FloatType(), True),
    StructField("mean_t2m_c", FloatType(), True),
    StructField("max_t2m_c", FloatType(), True),
    StructField("min_t2m_c", FloatType(), True),
    StructField("sum_tp_mm", FloatType(), True),
    StructField("file_modified_in_s3", TimestampType(), True),
    StructField("source_file", StringType(), True),
    StructField("Source_File_Path", StringType(), True),
    StructField("Ingest_Timestamp", TimestampType(), True),
    StructField("data_provider", StringType(), True),
    StructField("date_created", TimestampType(), True)
])
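
Because the rows are produced by `mapInPandas`, each DataFrame yielded by `parse_netcdf` has to supply every field in `output_schema`. As we understand Spark's behaviour, fields are looked up by column name when the pandas labels are strings, so a file whose variables are named differently (for example a raw `t2m` instead of `mean_t2m_c`) fails the batch. A check such as the hypothetical `column_mismatch` below, run on `list(pdf.columns)` for a sample file, can catch this before the stream starts; in the notebook, `output_schema.fieldNames()` returns the expected names.

```python
# Field names of output_schema as defined above.
EXPECTED_COLUMNS = (
    "time", "latitude", "longitude",
    "mean_t2m_c", "max_t2m_c", "min_t2m_c", "sum_tp_mm",
    "file_modified_in_s3", "source_file", "Source_File_Path",
    "Ingest_Timestamp", "data_provider", "date_created",
)


def column_mismatch(produced, expected=EXPECTED_COLUMNS):
    """Return (missing, extra): schema fields absent from the DataFrame, and columns not in the schema."""
    produced_set, expected_set = set(produced), set(expected)
    missing = [c for c in expected if c not in produced_set]
    extra = [c for c in produced if c not in expected_set]
    return missing, extra
```

An empty `missing` list is the condition that matters; `extra` columns are worth reviewing because they will not reach the table.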

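### Running the Ingestion

The call below ingests the ERA5 daily summary files from the `/Volumes/pilot/bronze_test/era5_daily_summary/` volume into `pilot.aer_era5_global_bronze.aer_era5_bronze_1940_to_present`, taking `date_created` from each file's `date_updated` attribute.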

In [0]:
netcdf_to_bronze_autoloader(
    source_file_location='/Volumes/pilot/bronze_test/era5_daily_summary/',
    output_schema=output_schema,
    checkpoint_location='/Volumes/pilot/aer_era5_global_bronze/checkpoints/source_to_bronze_era5',
    streaming_query_name='Load ERA5 Files',
    data_format='binaryFile',  # Not used by the function; files are always read as binaryFile
    table_name='aer_era5_bronze_1940_to_present',
    schema_name='aer_era5_global_bronze',
    catalog_name='pilot',
    write_mode='append',
    data_provider='Atmospheric and Environmental Research (AER)',
    date_created_attr='date_updated')