# Block Model API + Parquet Files



### Prerequisites

To get started with this notebook it's recommended to create a new Python virtual environment and to install `requirements.txt` in the `blockmodels/python` folder, eg.

`> pip install -r requirements.txt`

Apache Arrow is a cross-language development platform for in-memory analytics, and PyArrow is the Python API for Apache Arrow.
PyArrow is used extensively in this notebook and you should become familiar with its capabilities.

### Data types and naming restrictions

Evo supports block models with the following Parquet data types:

* boolean
* int8/16/32/64
* float16/32/64
* utf8
* date32
* timestamp
    * unit = microseconds (us)
    * tz = UTC

**Note:** Apache Arrow natively uses lower case data types, e.g. float64, while the Block Model API uses enumerated versions of these data types with capitalisation. e.g. Float64.

Below are the reserved system attributes (column names):

* `i`, `j`, `k` - parent block indexes (uint32)
* `sidx` - sub-block indexes (uint32)
* `dx`, `dy`, `dz` - block sizes (float64 - is not stored but is calculated on the fly)
* `x`, `y`, `z` - block centroids (float64 - is not stored but is calculated on the fly)

### Convert a block model in CSV format to Parquet

In [None]:
# 1. Convert a regular block model CSV file to an Evo-compatible Parquet file.
# The input CSV contains block model centroids and several data columns with non-UUID column names.

import pprint
import uuid
from pathlib import Path

import pyarrow as pa
import pyarrow.csv as pv
import pyarrow.parquet as pq

# Local copy of the Parquet file being uploaded can have any name since it will be renamed by Evo when saved in the Azure blob store.
# Reserved system column names are: x, y, z, dx, dy, dz, i, j, k, sidx.
# Non-system columns must be given valid UUIDs as their columns names and any API requests made must match the UUIDs generated in the CSV file.

# Part 1 - Convert CSV to Parquet

input_csv = "data/example1/regular-bm-named-columns.csv"
input_csv_path = Path(input_csv)
output_parquet = input_csv_path.parent / f"{input_csv_path.stem}.parquet"

csv_columns = {
    "x": pa.float64(),
    "y": pa.float64(),
    "z": pa.float64(),  # The service expects centroid columns in a Parquet file to be lowercase 'x', 'y', 'z' and must all be float64 Parquet data type
    "Lith": pa.utf8(),  # List the column names and required data types
    "LMS1_Pct": pa.float32(),  # Parquet files do not support white space in column names, be sure to remove any spaces in CSV column names!
    "MV_Pct": pa.float32(),
    "Blocks": pa.utf8(),
}

options = pv.ConvertOptions(column_types=csv_columns)
table = pv.read_csv(str(input_csv_path), convert_options=options)

# Part 2 - Rename original CSV columns names to compatible UUIDs

[csv_columns.pop(name, None) for name in ["x", "y", "z"]]  # Remove system columns names from CSV columns

# Get list of CSV names
csv_names = list(csv_columns.keys())

# Generate list of UUIDs
uuid_names = [str(uuid.uuid4()) for _ in csv_names]

names_new = dict(zip(uuid_names, csv_names))  # Create CSV name : uuid dictionary
merged_new_names = {
    "x": "x",
    "y": "y",
    "z": "z",
}  # Add x, y, z default values back to dictionary
merged_new_names.update(names_new)

# The table.rename_columns function might not be appropriate for large files.
# If this is the case for you, see code cell #5 for an example of how to edit columns names after Parquet creation.
table_rename = table.rename_columns(merged_new_names)  # Replace CSV column names with generated uuid column names

# Evo expects Parquet files to be created with the following parameters:
# - compression='zstd'
# - write_statistics=True
# - version='2.6' (it's recommended to use the latest Parquet version currently available)
pq.write_table(
    table_rename,
    output_parquet,
    compression="zstd",
    row_group_size=100_000,
    version="2.6",
    data_page_version="1.0",
    write_statistics=True,
)

print("Column mapping to use with the Block Model API:")
pprint.pprint(merged_new_names)

In [None]:
# 2. Convert a regular block model CSV file to an Evo-compatible Parquet file.
# The input CSV contains block model centroids and several data columns that already have UUIDs as the column headers.

from pathlib import Path

import pyarrow as pa
import pyarrow.csv as pv
import pyarrow.parquet as pq

# Local copy of the Parquet file being uploaded can have any name since it will be renamed by Evo when saved in the Azure blob store.
# Reserved system column names are: x, y, z, dx, dy, dz, i, j, k, sidx.
# Non-system columns must be given valid UUIDs as their columns names and any API requests made to Evo must match the UUIDs generated in the CSV file.

# Tip: Use the following Excel formula to generate random UUIDs:
# =LOWER(CONCATENATE(DEC2HEX(RANDBETWEEN(0,4294967295),8),"-",DEC2HEX(RANDBETWEEN(0,65535),4),"-",DEC2HEX(RANDBETWEEN(0,65535),4),"-",DEC2HEX(RANDBETWEEN(0,65535),4),"-",DEC2HEX(RANDBETWEEN(0,4294967295),8),DEC2HEX(RANDBETWEEN(0,65535),4)))

input_csv = "data/example2/regular-bm-uuid-columns.csv"
input_csv_path = Path(input_csv)  # UUIDs were generated with the Excel code above
output_parquet = input_csv_path.parent / f"{input_csv_path.stem}.parquet"

csv_columns = {
    "x": pa.float64(),
    "y": pa.float64(),
    "z": pa.float64(),  # Evo expects centroid columns in a Parquet file to be lowercase 'x', 'y', 'z' and to all be float64 pyarrow data type
    "68002a18-5f56-57d2-e6c3-5dd87d3b7ad9": pa.utf8(),  # Lith
    "688d284c-d2be-5219-b38a-0712a8fa6058": pa.float32(),  # LMS1_Pct
    "a88f0e41-2dda-6417-3fce-69f19eeabe29": pa.float32(),  # MV_Pct
    "a1757e57-3fd4-0e65-87fd-bfa98abc3caa": pa.utf8(),  # Blocks
}

options = pv.ConvertOptions(column_types=csv_columns)
table = pv.read_csv(str(input_csv_path), convert_options=options)

# Evo expects Parquet files to be created with the following parameters:
# - compression='zstd'
# - write_statistics=True
# - version='2.6' (it's recommended to use the latest Parquet version currently available)
pq.write_table(
    table,
    output_parquet,
    compression="zstd",
    row_group_size=100_000,
    version="2.6",
    data_page_version="1.0",
    write_statistics=True,
)

print("Data types used were:")
print(table.schema)

In [None]:
# 3. Convert a variable-octree block model CSV file to an Evo-compatible Parquet file.
# The input CSV containing block indices and several data columns with non-UUID column names.

import pprint
import uuid
from pathlib import Path

import pyarrow as pa
import pyarrow.csv as pv
import pyarrow.parquet as pq

# Local client copy of the Parquet file being uploaded can have any name since it will be renamed by Evo when saved in the Azure blob store.
# Reserved system column names are: x, y, z, dx, dy, dz, i, j, k, sidx.
# Non-system columns must be given valid UUIDs as their columns names and any API requests made to Evo must match the UUIDs generated in the CSV file.

# Part 1 - Convert CSV to Parquet

input_csv = "data/example3/variable-octree-bm-named-columns.csv"
input_csv_path = Path(input_csv)
output_parquet = input_csv_path.parent / f"{input_csv_path.stem}.parquet"

csv_columns = {
    "i": pa.uint32(),
    "j": pa.uint32(),
    "k": pa.uint32(),
    "sidx": pa.uint32(),  # Evo expects block model indices in a Parquet file to be lowercase 'i', 'j', 'k', 'sidx' and to all be uint32 pyarrow data type
    "a0": pa.float32(),  # List the column names and required data types
    "a1": pa.utf8(),
    "a2": pa.bool_(),
}

options = pv.ConvertOptions(column_types=csv_columns)
table = pv.read_csv(str(input_csv_path), convert_options=options)

# Part 2 - Rename original CSV columns names to compatible UUIDs

[csv_columns.pop(name, None) for name in ["i", "j", "k", "sidx"]]  # Remove system columns names from CSV columns

# Get list of CSV names
csv_names = list(csv_columns.keys())

# Generate list of UUIDs
uuid_names = [str(uuid.uuid4()) for _ in csv_names]

names_new = dict(zip(uuid_names, csv_names))  # Create CSV name : uuid dictionary
merged_new_names = {
    "i": "i",
    "j": "j",
    "k": "k",
    "sidx": "sidx",
}  # Add x, y, z default values back to dictionary
merged_new_names.update(names_new)

# The table.rename_columns function might not be appropriate for large files.
# If this is the case for you, see code cell #5 for an example of how to edit columns names after Parquet creation.
table_rename = table.rename_columns(merged_new_names)  # Replace csv column names with generated uuid column names

# Evo expects Parquet files to be created with the following parameters:
# - compression='zstd'
# - write_statistics=True
# - version='2.6' (it's recommended to use the latest Parquet version currently available)
pq.write_table(
    table_rename,
    output_parquet,
    compression="zstd",
    row_group_size=100_000,
    version="2.6",
    data_page_version="1.0",
    write_statistics=True,
)

print("Column mapping to use with the Block Model API:")
pprint.pprint(merged_new_names)

In [None]:
# 4. Convert a variable-octree block model CSV file to an Evo-compatible Parquet file.
# The input CSV containing block model indices and several data columns that have UUIDs as the column headers.

from pathlib import Path

import pyarrow as pa
import pyarrow.csv as pv
import pyarrow.parquet as pq

# Local client copy of the Parquet file being uploaded can have any name since it will be renamed by Evo when saved in the Azure blob store.
# Reserved system column names are: x, y, z, dx, dy, dz, i, j, k, sidx.
# Non-system columns must be given valid UUIDs as their columns names and any API requests must match the UUIDs generated in the CSV file.

# Tip: Use the following Excel formula to generate random UUIDs:
# =LOWER(CONCATENATE(DEC2HEX(RANDBETWEEN(0,4294967295),8),"-",DEC2HEX(RANDBETWEEN(0,65535),4),"-",DEC2HEX(RANDBETWEEN(0,65535),4),"-",DEC2HEX(RANDBETWEEN(0,65535),4),"-",DEC2HEX(RANDBETWEEN(0,4294967295),8),DEC2HEX(RANDBETWEEN(0,65535),4)))

input_csv = "data/example4/variable-octree-bm-uuid-columns.csv"  # edit to match CSV path
input_csv_path = Path(input_csv)  # UUIDs were generated with the Excel code above
output_parquet = input_csv_path.parent / f"{input_csv_path.stem}.parquet"

csv_columns = {
    "x": pa.float64(),
    "y": pa.float64(),
    "z": pa.float64(),  # Evo expects centroid columns in a Parquet file to be lowercase 'x', 'y', 'z' and to all be float64 Parquet data type
    "da0f878c-c524-622b-50a5-a1f39b6c2075": pa.float32(),  # a0
    "12b66d3d-838e-2ffe-fd52-9d5144e33e5e": pa.utf8(),  # a1
    "e3f3cfcf-5bc4-f7c9-5364-72151d2f9d70": pa.bool_(),  # a2
}

options = pv.ConvertOptions(column_types=csv_columns)
table = pv.read_csv(str(input_csv_path), convert_options=options)

# Evo expects Parquet files to be created with the following parameters:
# - compression='zstd'
# - write_statistics=True
# - version='2.6' (it's recommended to use the latest Parquet version currently available)
pq.write_table(
    table,
    output_parquet,
    compression="zstd",
    row_group_size=100_000,
    version="2.6",
    data_page_version="1.0",
    write_statistics=True,
)

print("Data types used were:")
print(table.schema)

## Parquet File Handling

The following code cells include functions for common Parquet file handling tasks:
- Rename a column in a Parquet file.
- Recast a column data type in a Parquet file.
- Drop columns from a Parquet file.
- Convert a Parquet file to CSV.

In [None]:
# 5. Rename columns in a Parquet file after it has been created.
# Previous code cells in this notebook renamed columns during the Parquet creation process.

from pathlib import Path

import pyarrow as pa
import pyarrow.parquet as pq


def rename(pq_file, name_update_dict):
    pq_file = Path(pq_file)
    pq_out_path = pq_file.parent / f"{pq_file.stem}-columns-renamed.parquet"
    parquet_file = pq.ParquetFile(pq_file)
    row_groups = parquet_file.num_row_groups
    schema = parquet_file.schema_arrow

    # Check update all keys exist in the schema
    if not any(item in schema.names for item in list(name_update_dict.keys())):
        print("Not all of your update fields were found in the Parquet file schema, exiting now")
        exit()

    # Rename columns in schema
    new_fields = []
    for each in schema:
        if each.name in list(name_update_dict.keys()):
            new_fields.append(pa.field(name=name_update_dict[each.name], type=each.type))
        else:
            new_fields.append(pa.field(name=each.name, type=each.type))
    new_schema = pa.schema(new_fields)

    # For each row group get the table, rename the columns and drop the others
    for grp in range(0, row_groups):
        table_orig = parquet_file.read_row_group(grp)
        table_rename = table_orig.rename_columns(new_schema.names)

        if grp == 0:
            pqwriter = pq.ParquetWriter(
                str(pq_out_path), compression="zstd", schema=new_schema, version="2.6"
            )  # Do this on first group to create the schema
        try:
            pqwriter.write_table(table_rename)
        except Exception:
            print("Error occurred in row_group " + str(grp))
            exit()

        # Close the Parquet writer
    if pqwriter:
        pqwriter.close()
    else:
        print("pqwriter had already closed")


filepath = "data/example5/bm.parquet"
column_name_update = {
    "68002a18-5f56-57d2-e6c3-5dd87d3b7ad9": "renamed_1",
    "a1757e57-3fd4-0e65-87fd-bfa98abc3caa": "renamed_2",
}  # {old: new}

rename(filepath, column_name_update)

In [None]:
# 6. Recast a column data type in a Parquet file to a different pyarrow data type.

from pathlib import Path

import pyarrow as pa
import pyarrow.parquet as pq


def recast(pq_file, update_columns, new_type):
    pq_file = Path(pq_file)
    pq_out_path = pq_file.parent / f"{pq_file.stem}-new-data-type.parquet"
    parquet_file = pq.ParquetFile(pq_file)
    row_groups = parquet_file.num_row_groups
    schema = parquet_file.schema_arrow

    # Check all update_columns exist in the schema
    if not any(item in schema.names for item in update_columns):
        print("Not all of your update fields were found in the Parquet file schema, exiting now")
        exit()

    # Build new schema with new types
    new_fields = []
    for each in schema:
        if each.name in update_columns:
            new_fields.append(pa.field(name=each.name, type=new_type))
        else:
            new_fields.append(pa.field(name=each.name, type=each.type))

    new_schema = pa.schema(new_fields)

    # For each row group get the table, then get the column, recast the column if required
    for grp in range(0, row_groups):
        table_orig = parquet_file.read_row_group(grp)
        chunked_arrays = []

        # Go through each table column, get it's name, then set it's type from the schema
        for idx, col in enumerate(table_orig):
            name = table_orig.column_names[idx]
            chunked_arrays.append(table_orig.column(name).cast(new_schema.field(name).type, safe=True))
        # Build table from list of chunked arrays
        new_table = pa.Table.from_arrays(chunked_arrays, schema=new_schema)

        # Write the new table to file
        if grp == 0:
            pqwriter = pq.ParquetWriter(
                str(pq_out_path), compression="zstd", schema=new_schema, version="2.6"
            )  # Do this on first group to create the schema
        try:
            pqwriter.write_table(new_table)
        except Exception:
            print("Error occurred in row_group " + str(grp))
            exit()

        # Close the Parquet writer
    if pqwriter:
        pqwriter.close()
    else:
        print("pqwriter had already closed")


file_path = "data/example6/bm.parquet"
update_columns = ["a88f0e41-2dda-6417-3fce-69f19eeabe29"]  # UUID header for MV_Pct column
new_type = pa.float64()

recast(file_path, update_columns, new_type)

In [None]:
# 7. Take a list of field names that you want to keep and drops all others from a Parquet file.

from pathlib import Path

import pyarrow.parquet as pq


def reduce_columns(pq_file, fields_to_keep):
    # Read Parquet file
    parquet_file = pq.ParquetFile(str(pq_file))
    all_cols = parquet_file.schema.names
    fields_to_drop = [x for x in all_cols if x not in fields_to_keep]
    schema = parquet_file.schema_arrow

    pq_out_path = pq_file.parent / f"{pq_file.stem}-reduced-columns.parquet"

    # Prepare reduce schema
    for field in fields_to_drop:
        idx = schema.get_field_index(field)
        schema = schema.remove(idx)

    row_groups = parquet_file.num_row_groups

    for i in range(row_groups):
        table_in = parquet_file.read_row_group(i)
        table_in = table_in.drop(fields_to_drop)

        if i == 0:
            pqwriter = pq.ParquetWriter(
                str(pq_out_path), schema, compression="zstd", version="2.6"
            )  # Do this on first group to create the schema
        try:
            pqwriter.write_table(table_in)
        except Exception:
            print("Error occurred in row_group " + str(i))
            exit()

    # Close the Parquet writer
    if pqwriter:
        pqwriter.close()
    else:
        print("pqwriter had already closed")


pq_file = Path("data/example7/bm.parquet")
fields_to_keep = [
    "a88f0e41-2dda-6417-3fce-69f19eeabe29",
    "68002a18-5f56-57d2-e6c3-5dd87d3b7ad9",
]  # Columns not in this list will be removed from the Parquet file

reduce_columns(pq_file, fields_to_keep)

In [None]:
# 8. Convert a Parquet file to CSV.

from pathlib import Path

import pyarrow.parquet as pq


def convert_to_csv(pq_file):
    pq_file = Path(pq_file)
    csv_out_path = pq_file.parent / f"{pq_file.stem}.csv"
    parquet_file = pq.ParquetFile(pq_file)
    row_groups = parquet_file.num_row_groups

    for grp in range(0, row_groups):
        table = parquet_file.read_row_group(grp)
        df = table.to_pandas()
        if grp == 0:
            df.to_csv(
                csv_out_path,
                sep=",",
                header=True,
                index=False,
                mode="w",
                lineterminator="\r\n",
            )
        else:
            df.to_csv(
                csv_out_path,
                sep=",",
                header=False,
                index=False,
                mode="a",
                lineterminator="\r\n",
            )


file_path = "data/example8/bm.parquet"  # Parquet file to be converted to CSV

convert_to_csv(file_path)