# CSV to parquet

We use pyarrow here.

We also rename column names to lowercase.

In [2]:
import os
from multiprocessing import Pool
import re
import gzip
import json
import importlib
from itertools import zip_longest

from tqdm import tqdm # progress bar animation
import pandas as pd
import numpy as np

# pyarrow is like pandas, but works for datasets too big for memory.
import pyarrow
import pyarrow as pa
import pyarrow.csv
import pyarrow.compute as pc
import pyarrow.dataset as ds
import pyarrow.parquet

# utils is our local utility module
# if we change utils.py, and re-run a normal 'import'
# python won't reload it by default. (Since it's already loaded.)
# So we force a reload
import utils
importlib.reload(utils)

<module 'utils' from '/home/matthew/Documents/TSE/AppliedEconometrics/repo/utils.py'>

## Constants and Configuration

In [3]:
# Root folders: small files tracked alongside the repository,
# and the bulk data kept on the laptop.
repo_data_dir = '/home/matthew/Documents/TSE/AppliedEconometrics/repo/data/'
laptop_data_dir = '/home/matthew/data/'

# Table schemas (JSON) live with the repository data.
schema_path = os.path.join(repo_data_dir, 'schemas.json')

# Input: the output of the previous script.
source_dir = os.path.join(laptop_data_dir, '01-C-split-mapped-csv')

# Output: the parquet files go here.
dest_dir = os.path.join(laptop_data_dir, '01-D-parquet-pyarrow')

# When move_when_done is set, each table's source folder is moved
# into archive_dir once that table has been converted.
move_when_done = True
archive_dir = os.path.join(laptop_data_dir, '01-D-split-mapped-csv-done')


In [5]:
# Names of the two metadata values encoded in the source folder names
# (folders look like SCHEMA_VERSION=<n> / TOP_TIMESTAMP=<yyyy_mm_dd_HH_MM_SS>).
# Their lowercase forms are used as the names of the two extra parquet columns.
version_col_name = 'SCHEMA_VERSION'
top_timestamp_col_name = 'TOP_TIMESTAMP'

In [6]:
# Logger from our local utils module, constructed with the path
# <repo_data_dir>/logs.txt (presumably the file it writes to - see utils.py).
logger = utils.Logger(os.path.join(repo_data_dir, 'logs.txt'))
logger.info("Initialising Logger")

## Prepare Schemas

In [7]:
# Load the table schemas. convert_csv_parquet reads
# schemas[table]['columns'][column]['AEMO_type'] from this structure.
# JSON is read as UTF-8 explicitly rather than with the platform default encoding.
with open(schema_path, encoding='utf-8') as f:
    schemas = json.load(f)

In [8]:
def aemo_type_to_arrow_type(t: str, date_as_str=False) -> pa.DataType:
    """Map an AEMO (Oracle SQL) column type to a pyarrow data type.

    Examples:
        VARCHAR2(10), CHAR(1) -> pa.string()
        NUMBER(2,0), NUMBER(2) -> pa.int8()
        NUMBER(15,5)           -> pa.float64()
        DATE, TIMESTAMP(3)     -> pa.timestamp('s'), or pa.string() if date_as_str

    Args:
        t: the Oracle type as written in the schema (matched case-insensitively).
        date_as_str: if True, return a string type for DATE/TIMESTAMP columns
            instead of a timestamp. Used for the type given to the CSV reader,
            with the strings parsed into timestamps in a later step.

    Returns:
        The pyarrow data type to use for the column.

    Raises:
        ValueError: if the type is not recognised.
    """
    t = t.upper()

    # VARCHAR(n) / VARCHAR2(n) are strings.
    # CHAR(n) is too: arrow has no dedicated character type
    # (a dictionary/categorical type could be an alternative).
    if re.match(r"VARCHAR(2)?\(\d+\)", t) or re.match(r"CHAR\((\d+)\)", t):
        return pa.string()

    if t.startswith("NUMBER"):
        match = re.match(r"NUMBER ?\((\d+), ?(\d+)\)", t)
        if match:
            # NUMBER(precision, scale)
            precision = int(match.group(1))
            scale = int(match.group(2))
        else:
            # e.g. NUMBER(2): no scale given, so it's an integer
            match = re.match(r"NUMBER ?\((\d+)", t)
            if not match:
                # raise (not assert) so this still fires under `python -O`,
                # matching the error raised for other unknown types below
                raise ValueError(f"Unsure how to cast {t} to arrow type")
            precision = int(match.group(1))
            scale = 0

        if scale != 0:
            # we could use pa.decimal128(precision, scale)
            # but we don't need that much accuracy
            return pa.float64()

        # Integer. We assume signed (can't tell unsigned from the schema)
        # and pick the narrowest width whose positive range exceeds 10**precision.
        max_val = 10**precision
        for bits, make_type in ((8, pa.int8), (16, pa.int16), (32, pa.int32)):
            if 2**(bits - 1) > max_val:
                return make_type()
        return pa.int64()

    if (t == 'DATE') or re.match(r"TIMESTAMP\((\d)\)", t):
        # watch out, when AEMO say "date" they mean "datetime"
        # for both dates and datetimes they say "date",
        # but both have a time component. (For actual dates, it's always midnight.)
        # and some dates go out as far as 9999-12-31 23:59:59.999
        # (and some dates are 9999-12-31 23:59:59.997)
        if date_as_str:
            return pa.string()
        # pyarrow doesn't support parsing into a given timezone when reading from CSV
        # it does for batched chunks of CSV, but we want to stream to avoid using up all memory
        # so we'll treat these as timezone unaware datetimes
        # (the tz-aware alternative would be pa.timestamp('s', tz='Australia/Brisbane'))
        return pa.timestamp('s')

    raise ValueError(f"Unsure how to convert AEMO type {t} to arrow type")


In [9]:
# Make sure the output folder exists before any parquet file is written into it.
# (utils.create_dir is a local helper; presumably harmless if the folder already exists - confirm in utils.py)
utils.create_dir(dest_dir)

In [10]:
# DISPATCHLOAD is quite large, and we only need some of its columns for our analysis.
# Dropping the columns we don't need makes the whole thing far faster.
# (Columns we *might* use are kept.)
def get_cols_to_skip(table):
    """Return the list of names to leave out when converting `table`.

    Only 'DISPATCHLOAD' has anything skipped: the keys of schemas['DISPATCHLOAD']
    whose upper-cased name starts with RAISE, LOWER, VIOLATION or MARGINAL.
    Every other table gets an empty list.
    """
    if table != 'DISPATCHLOAD':
        return []

    # NOTE(review): this iterates the keys of schemas['DISPATCHLOAD'] itself,
    # whereas convert_csv_parquet takes column names from schemas[table]['columns'].
    # Confirm which level actually holds the column names.
    skipped_prefixes = ('RAISE', 'LOWER', 'VIOLATION', 'MARGINAL')
    return [
        name
        for name in schemas['DISPATCHLOAD'].keys()
        if name.upper().startswith(skipped_prefixes)
    ]

In [11]:
# Sanity check: list the entries of source_dir that have no entry in schemas.
# An empty list means every folder waiting to be converted has a schema.
[t for t in os.listdir(source_dir) if t not in schemas]

[]

In [None]:
def convert_csv_parquet(table):
    """Convert every gzipped CSV of one table into a single parquet file.

    The input files for `table` are laid out like
    source_dir / table / SCHEMA_VERSION=2 / TOP_TIMESTAMP=2019_03_02_00_45_12 / something.CSV.gz
    and the output is dest_dir / <table>.parquet, overwriting any file left by
    a previous run.

    Column names are lowercased, the columns returned by get_cols_to_skip(table)
    are neither read nor written, and the schema version and top timestamp taken
    from the folder names are appended as two extra columns.

    If move_when_done is set, the table's source folder is moved into
    archive_dir once the parquet file has been closed.

    Args:
        table: name of the table, i.e. a key of `schemas` and a folder in source_dir.

    Raises:
        AssertionError: if a sub-folder name does not match the expected
            SCHEMA_VERSION=... / TOP_TIMESTAMP=... pattern, or the output schema
            does not line up with the columns being read.
        pyarrow.ArrowInvalid: re-raised after logging the offending file and
            both schemas.
    """
    table_dir = os.path.join(source_dir, table)
    parquet_file = os.path.join(dest_dir, table + '.parquet')

    logger.info(f"Preparing to process {table} from {table_dir} to {parquet_file}")

    expected_source_columns = list(schemas[table]['columns'].keys())
    columns_to_skip = get_cols_to_skip(table)
    # the columns we actually read from the CSVs and write to parquet
    source_columns_to_read = [c for c in expected_source_columns if c not in columns_to_skip]

    # input_schema: how the CSV reader should type each column (dates stay strings)
    # output_schema: what is written to parquet (lowercase names, dates as timestamps)
    input_schema = {}
    output_schema = {}
    datetime_columns = set()
    for (c, s) in schemas[table]['columns'].items():
        t = s['AEMO_type']
        input_schema[c] = aemo_type_to_arrow_type(t, date_as_str=True)
        if c not in columns_to_skip:
            output_schema[c.lower()] = aemo_type_to_arrow_type(t, date_as_str=False)
            if isinstance(output_schema[c.lower()], pa.TimestampType):
                datetime_columns.add(c)
    output_schema.update({
        version_col_name.lower(): pa.uint8(),
        # leave as string, don't bother converting
        # we only use this as a sort key later for deduplication
        top_timestamp_col_name.lower(): pa.string(),
    })

    input_schema = pyarrow.schema(input_schema)
    output_schema = pyarrow.schema(output_schema)

    # Sanity check, done once up front rather than for every batch:
    # apart from the two metadata columns at the end, the output schema must
    # line up one-to-one (and in order) with the columns we read.
    for (oc, ic) in zip_longest(output_schema.names[:-2], source_columns_to_read):
        assert oc == ic.lower(), f"{oc=} {ic=}"

    logger.info(f"Starting to process {table} from {table_dir} to {parquet_file}")

    # this will overwrite an existing file (from a previous run)
    with pyarrow.parquet.ParquetWriter(
        where=parquet_file,
        schema=output_schema,
        dictionary_pagesize_limit=1,
        use_dictionary=False, #[c for c in schemas[table]['primary_keys'] if 'DATE' not in c.upper()],
        write_statistics=False) as writer:
        for schema_subdir in os.listdir(table_dir):
            match = re.match(rf"{version_col_name}=(\d+)", schema_subdir)
            assert match, f"Unable to extract schema version from {os.path.join(table_dir, schema_subdir)}"
            schema_version = int(match.group(1))

            for top_timestamp_subdir in os.listdir(os.path.join(table_dir, schema_subdir)):
                match = re.match(rf"{top_timestamp_col_name}=([\d_]+)", top_timestamp_subdir)
                assert match, f"Unable to extract top_timestamp from {os.path.join(table_dir, schema_subdir, top_timestamp_subdir)}"
                top_timestamp = match.group(1)

                for csv_file in os.listdir(os.path.join(table_dir, schema_subdir, top_timestamp_subdir)):
                    # bound before the try, so the except block can always refer to it
                    csv_path = os.path.join(table_dir, schema_subdir, top_timestamp_subdir, csv_file)
                    try:
                        logger.info(f"Processing {csv_path}")

                        # peek at the header row, to warn about columns the schema doesn't know
                        with gzip.open(csv_path, 'rt') as f:
                            first_line = f.readline()
                            assert isinstance(first_line, str), f"file openned in wrong mode. {type(first_line)=}"
                            actual_source_columns = first_line.strip().split(',')

                        unexpected_source_columns = [c for c in actual_source_columns if c not in expected_source_columns]
                        if unexpected_source_columns:
                            logger.warning(f"Found unexpected columns {unexpected_source_columns} in {csv_file}")

                        # Stream the CSV in batches, to avoid loading the whole file into memory.
                        # Only the columns we keep are read; columns missing from
                        # this file are still included in each batch.
                        csv_reader = pyarrow.csv.open_csv(
                            csv_path,
                            convert_options=pyarrow.csv.ConvertOptions(
                                column_types=input_schema,
                                include_missing_columns=True,
                                include_columns=source_columns_to_read,
                            ),
                        )

                        for batch in csv_reader:
                            columns = []

                            for column_name in source_columns_to_read:
                                column_data = batch[column_name]
                                if column_name in datetime_columns:
                                    # Dates were read as strings; parse them here.
                                    # Unparseable values become null.
                                    # NOTE(review): the result is tagged as Australia/Brisbane,
                                    # while output_schema declares these columns as timezone-naive
                                    # timestamp('s'). Confirm what Table.from_arrays does with
                                    # that mismatch, and that the stored values are as intended.
                                    column_data = pc.strptime(column_data, format="%Y/%m/%d %H:%M:%S", unit="s", error_is_null=True)
                                    column_data = pc.assume_timezone(column_data, timezone='Australia/Brisbane')
                                columns.append(column_data)

                            # now add the metadata from folder names as columns
                            # NOTE(review): this array is built from Python ints, while
                            # output_schema declares the column as uint8 - confirm the conversion.
                            columns.append(pa.Array.from_pandas(np.repeat(schema_version, batch.num_rows)))

                            # convert top timestamp to datetime too
                            # NOTE(review): output_schema declares this column as a string
                            # (see the comment where it is defined), yet a timestamp is built
                            # here - confirm which of the two is intended.
                            tt = pa.array(np.repeat(top_timestamp, batch.num_rows), type=pa.string(), size=batch.num_rows)
                            tt = pc.strptime(tt, format="%Y_%m_%d_%H_%M_%S", unit="s", error_is_null=True)
                            tt = pc.assume_timezone(tt, timezone='Australia/Brisbane')
                            columns.append(tt)

                            updated_table = pyarrow.Table.from_arrays(
                                columns,
                                schema=output_schema
                            )
                            writer.write(updated_table)
                    except pa.ArrowInvalid:
                        logger.error(f"Issue with {csv_path}")
                        logger.info(f"INPUT SCHEMA:\n{input_schema}")
                        logger.info(f"OUTPUT SCHEMA:\n{output_schema}")
                        raise
        logger.info(f"Closing parquet for {table}")
    logger.info(f"Closed parquet for {table}")
    if move_when_done:
        # move away,
        # so if the next table fails, and we re-run the script
        # we don't waste time re-doing this one
        table_archive_dir = os.path.join(archive_dir, table)
        logger.info(f"Finished with {table}, moving {table_dir} to {table_archive_dir}")
        utils.create_dir(archive_dir)
        os.rename(table_dir, table_archive_dir)

# Only convert tables that have a schema and a folder still waiting in source_dir.
tables = [t for t in schemas if os.path.exists(os.path.join(source_dir, t))]
# NOTE(review): a bare `tables[::-1]` expression used to follow here. It built a
# reversed copy and threw it away, so it has been dropped. If the tables are
# meant to be processed in reverse order, use `tables.reverse()` instead.
logger.reset()
logger.info("Tables listed")

logger.flush = True

# no multiprocessing
# because some tables require almost all memory to process
# (even when streaming)
for table in tqdm(tables):
    convert_csv_parquet(table)

  0%|                                                                                                                                                            | 0/2 [00:00<?, ?it/s]

In [None]:
# try this next
# https://arrow.apache.org/docs/python/dataset.html#configuring-rows-per-group-during-a-write

In [None]:
# Experiment: read one table's CSVs as a hive-partitioned pyarrow dataset,
# taking SCHEMA_VERSION and TOP_TIMESTAMP from the folder names.
import pyarrow.dataset as ds

table = "DUDETAIL"
# NOTE(review): this rebinds the module-level source_dir, and the folder name
# differs from both source_dir and archive_dir defined earlier - confirm the path.
source_dir = f"/home/matthew/data/01-C-split-mapped-done/{table}"
part = ds.partitioning(
    pa.schema([("SCHEMA_VERSION", pa.int8()), ("TOP_TIMESTAMP", pa.string())]),
    flavor="hive",
)
# dates are left as strings here, as in the CSV reading step of convert_csv_parquet
schema = {
    c: aemo_type_to_arrow_type(t['AEMO_type'], date_as_str=True)
    for (c, t) in schemas[table]['columns'].items()
}
dataset = ds.dataset(
    source=source_dir,
    format="csv",
    partitioning=part,
    schema=pyarrow.schema(schema),
)