# CMC ADE -- ingest from tar files
`FR`
Le présent notebook permet d'extraire les données d'observations en format [swob-ml](https://dd.weather.gc.ca/observations/doc/Met-ML-SchemaDescriptionV2_f.pdf) qui sont concaténées dans un fichier [tar](https://fr.wikipedia.org/wiki/Tar_(informatique)) [^1]. Les observations sont ajoutées dans un dataframe et sont réécrites sur disque en format [Delta Lake](https://delta-io.github.io/delta-rs/).

&#9658; Les fichiers (ou "tables") Delta Lake que vous trouverez plus bas contiennent des données sur plusieurs jours en mars 2024.

`EN`
This notebook extracts observations encoded in [swob-ml](https://dd.weather.gc.ca/observations/doc/Met-ML-SchemaDescriptionV2_e.pdf) files that are concatenated in [tar files](https://en.wikipedia.org/wiki/Tar_(computing))[^2]. As they are extracted from the swob-ml files, observations are stored in a dataframe and later written to disk as [Delta Lake](https://delta-io.github.io/delta-rs/) tables.

&#9658; The Delta Lake tables found below contain observation data from several days in March 2024.

[^1]: Voir le répertoire `/space/hall6/sitestore/eccc/prod/hubs/ade/rawdata/swob/ca/` pour des exemples.
[^2]: See the `/space/hall6/sitestore/eccc/prod/hubs/ade/rawdata/swob/ca/` directory for examples.

In [None]:
import datetime as dt
import getpass
import os
import pathlib
import tarfile
import time

import daft
import deltalake
import numpy as np
import pandas as pd
import polars as pl
import pyarrow as pa
from deltalake import DeltaTable  # S3FileSystem ??
from deltalake.writer import write_deltalake
from great_tables import GT, html, md
from great_tables.data import islands
from tabulate import tabulate
from tqdm.notebook import tqdm

In [None]:
from my_nb_utils import (
    extract_xml_data_to_pd,
    get_latest_tag_and_date,
    memused,
    print_system_usage,
)

In [None]:
modules = {
    "pandas": {
        "name": pd.__name__,
        "version": pd.__version__,
        "url": "https://github.com/pandas-dev/pandas",
    },
    "polars": {
        "name": pl.__name__,
        "version": pl.__version__,
        "url": "https://github.com/pola-rs/polars",
    },
    "pyarrow": {
        "name": pa.__name__,
        "version": pa.__version__,
        "url": "https://github.com/apache/arrow",
    },
    "daft": {
        "name": daft.__name__,
        "version": daft.__version__,
        "url": "https://github.com/Eventual-Inc/Daft",
    },
    "delta-io Rust": {
        "name": deltalake.__name__,
        "version": deltalake.__version__,
        "url": "https://github.com/delta-io/delta-rs",
    },
}

# for module_info in modules.values():
#     print(f"Current version of {module_info['name']} is {module_info['version']}")
#     print(f"GitHub repository URL: {module_info['url']}")
#     print()

for module_info in modules.values():
    print(f"Module: {module_info['name']}")
    print(f"Current version: {module_info['version']}")
    repo_url = module_info["url"]
    # Use names that do not shadow the `dt` (datetime) module imported above
    _repo_name, latest_version, _latest_date = get_latest_tag_and_date(repo_url)
    print(f"Latest version: {latest_version}")

In [None]:
memused()

In [None]:
print_system_usage()

# Append data in a Delta Lake table

In [None]:
base_dir = getpass.getpass(prompt='Base directory for TAR files : ')

In [None]:
filenames = ["2024032718_tar", "2024032712_tar", "2024032706_tar", "2024032700_tar"]
filenames_18 = ["2024032718_tar"]
filenames_12 = ["2024032712_tar"]
filenames_06 = ["2024032706_tar"]
filenames_00 = ["2024032700_tar"]
# os.path.join inserts the path separator if base_dir was typed without a trailing "/"
tar_files = [os.path.join(base_dir, filename) for filename in filenames]
tar_files_18 = [os.path.join(base_dir, filename) for filename in filenames_18]
tar_files_12 = [os.path.join(base_dir, filename) for filename in filenames_12]
tar_files_06 = [os.path.join(base_dir, filename) for filename in filenames_06]
tar_files_00 = [os.path.join(base_dir, filename) for filename in filenames_00]

In [None]:
tar_files

In [None]:
# Define an empty list to store DataFrames for each XML file
dfs = []
total_bad_data_records = 0

for file in tqdm(tar_files):
    print(f"Processing {file}")
    with tarfile.open(file, "r") as tar:
        for member in tqdm(tar.getmembers()):
            # Check if the member is a file
            if member.isfile():
                # Extract the file content
                xml_data = tar.extractfile(member)
                # Extract data from XML and append to list of DataFrames
                df, bad_data_records = extract_xml_data_to_pd(xml_data)
                total_bad_data_records += bad_data_records
                dfs.append(df)

# Concatenate all DataFrames into a single DataFrame
final_df = pd.concat(dfs, ignore_index=True)
print(
    f"Total number of skipped records due to missing or invalid MEASUREMENT data = {total_bad_data_records}"
)
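
The loop above relies on `extract_xml_data_to_pd` from `my_nb_utils`, which is not shown in this notebook. As a rough illustration of the same idea (read each tar member in memory, pull out the observation elements), here is a standard-library-only sketch. It is not the notebook's helper: `parse_elements` and `build_demo_tar` are hypothetical names, the XML is made up, and the assumption that each measurement is an `<element>` node with `name`, `uom` and `value` attributes should be checked against the swob-ml schema.

```python
import io
import tarfile
import xml.etree.ElementTree as ET


def parse_elements(xml_file):
    """Return one dict per <element name=... uom=... value=...> node (hypothetical helper)."""
    records = []
    for _, node in ET.iterparse(xml_file):
        # Tags look like "{namespace}element"; compare on the local name only
        if node.tag.rsplit("}", 1)[-1] == "element" and "name" in node.attrib:
            records.append(
                {
                    "name": node.attrib["name"],
                    "value": node.attrib.get("value"),
                    "uom": node.attrib.get("uom"),
                }
            )
    return records


def build_demo_tar():
    """Build a tiny in-memory tar holding one made-up XML observation file."""
    xml_bytes = (
        b'<obs xmlns="urn:example">'
        b'<element name="air_temp" uom="degC" value="-3.2"/>'
        b'<element name="rel_hum" uom="%" value="MSNG"/>'
        b"</obs>"
    )
    buffer = io.BytesIO()
    with tarfile.open(fileobj=buffer, mode="w") as tar:
        info = tarfile.TarInfo(name="demo-swob.xml")
        info.size = len(xml_bytes)
        tar.addfile(info, io.BytesIO(xml_bytes))
    buffer.seek(0)
    return buffer


records = []
with tarfile.open(fileobj=build_demo_tar(), mode="r") as tar:
    for member in tar:  # iterating the archive avoids building the full member list first
        if member.isfile():
            records.extend(parse_elements(tar.extractfile(member)))

print(records)
```

Note that every `value` comes back as text, including the placeholder for a missing measurement, which is why an explicit cast is needed later on.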

In [None]:
#                   18         12         06         00
# name            167092     196563     228209     196342
# value           167092     196563     228209     196342
final_df  # Pandas df; does not show column type; see Polars df below
# final_df.count()

In [None]:
# gives
# Data columns (total 13 columns):
# #   Column        Non-Null Count   Dtype
# ---  ------        --------------   -----
# 0   name          792644 non-null  object
# 1   value         753137 non-null  object
#
# Where `value` is not a float !!
final_df.info()

In [None]:
# print(f'Number of unique "station name" values = {len(list(final_df["stn_nam"].unique()))}')
# print(f'Number of unique "name" values = {len(list(final_df["name"].unique()))}')
# print(f'Number of unique "wmo_synop_id" values = {len(list(final_df["wmo_synop_id"].unique()))}')
# print(f'Number of unique "clim_id" values = {len(list(final_df["clim_id"].unique()))}')
# print(f'Number of unique "msc_id" values = {len(list(final_df["msc_id"].unique()))}')
# print(f'Number of unique "latitude" values = {len(list(final_df["lat"].unique()))}')
# print(f'Number of unique "longitude" values = {len(list(final_df["long"].unique()))}')
print(list(final_df['name'].unique()))
# print(list(final_df['date_tm'].unique()))
# type(list(final_df['date_tm'].unique())[0])

In [None]:
# Write parquet files
# final_df.to_parquet('20240327.parquet', engine='pyarrow', compression='snappy')
# final_df.to_parquet('pyarrow_gzip.parquet', engine='pyarrow', compression='gzip')
# final_df.to_parquet('pyarrow_brotli.parquet', engine='pyarrow', compression='brotli')
# final_df.to_parquet('pyarrow_lz4.parquet', engine='pyarrow', compression='lz4')
# final_df.to_parquet('pyarrow_zstd.parquet', engine='pyarrow', compression='zstd')

## Convert the Pandas DataFrame to a Polars DataFrame

In [None]:
# Temporary step: ideally the tar contents would be loaded directly into a Polars DataFrame.
# Each column is then cast to an explicit Polars data type,
# BECAUSE THIS DOES NOT WORK PROPERLY IN PANDAS: `value` is not stored as a float,
# which defeats the purpose of data analysis!

df = pl.from_pandas(final_df)

# Define the column names and their corresponding types; aka data schema
column_types = {
    "name": pl.String,
    "value": pl.Float64,  # **Force casting to float did not work in Pandas !**
    "uom": pl.String,
    "date_tm": pl.Datetime,
    "stn_nam": pl.String,
    "tc_id": pl.String,
    "wmo_synop_id": pl.String,
    "stn_elev": pl.Float64,
    "data_pvdr": pl.String,
    "msc_id": pl.String,
    "clim_id": pl.String,
    "lat": pl.Float64,
    "long": pl.Float64,
}

# Build one cast expression per column that exists and does not yet have the target type.
# with_columns() replaces those columns in place and keeps every other column,
# whereas select() with a single expression would drop all the other columns.
casts = [
    pl.col(col).cast(dtype)
    for col, dtype in column_types.items()
    if col in df.columns and df[col].dtype != dtype
]
df = df.with_columns(casts)

In [None]:
df

# Delta Lake

## What are Delta Lake tables and why use them over individual Parquet files

Delta tables consist of metadata in a transaction log and data stored in Parquet files.  Polars [or any dataframe library] can skip Parquet files based on metadata, but it needs to open up each file and read the metadata, which is slower than grabbing the file-level metadata directly from the transaction log.

## Java vs Rust/Python

"Rust deltalake" refers to the Rust API of delta-rs (no Spark dependency); this is what this notebook uses <br>
"Python deltalake" refers to the Python API of delta-rs (no Spark dependency)

## Ordering and partitioning

From the [Delta Lake best practices page](https://delta-io.github.io/delta-rs/delta-lake-best-practices/)

... optimizing the performance of your Delta tables ... depends on your data ingestion into the Delta table and query patterns. You must understand your data and how users run queries to best leverage Delta Lake.

The idea is to colocate similar data in the same files to make file skipping more effective

Two approaches:

- Z ordering
- Hive-style partitioning

You can use Hive-style partitioning in conjunction with Z Ordering. You can partition a table by one column and Z Order by another. They’re different tactics that aim to help you skip more files and run queries faster.

See also tips on append-only tables, like our observations or any sensor measurement. 

# Delta Lake tests
Writing with and without partitioning

In [None]:
# No optimization i.e. neither partitioning nor Z ordering
# First write (creates the table): df.write_delta("tar_swob_no_optimization")
df.write_delta("tar_swob_no_optimization", mode="append")  # append

In [None]:
!ls -Rlht "tar_swob_no_optimization" | wc

In [None]:
tar_swob_no_optimization_dt = DeltaTable("tar_swob_no_optimization")

In [None]:
# Only the last expression of a cell is displayed, so print the version explicitly
print(tar_swob_no_optimization_dt.version())
tar_swob_no_optimization_dt.history()

# TODO: Optimization tests

In [None]:
# Open "tar_swob_no_optimization" as a Polars df
# Save it back as a Delta table partitioned by station

In [None]:
df = pl.scan_delta("tar_swob_no_optimization").collect()

In [None]:
# Define partition columns
partition_cols = ["stn_nam"]

df.write_delta(
    "tar_swob_P_by_stn_name",
    mode="append",
    delta_write_options={"partition_by": partition_cols},
)

In [None]:
# # Ideal in the long term, since station names will change very little whereas dates will
# !ls -Rlht "tar_swob_P_by_stn_name" | wc
# # !ls -Rlht "tar_swob_P_by_stn_name"

In [None]:
# # Define partition columns
# partition_cols = ["date_tm"]
# 
# # Define Z-order columns
# # zorder_cols = ["stn_nam"]
# df.write_delta(
#     "tar_swob_P_by_date",
#     mode="append",
#     delta_write_options={"partition_by": partition_cols},
# )

In [None]:
# !ls -Rlht "tar_swob_P_by_date" | wc