In [None]:
import os

from utils import date_from_file_name
import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq
from lxml import etree
from pathlib import Path
import time
import sys

from utils import date_from_file_name

In [7]:
# Show the raw XML exports ordered by the key that date_from_file_name derives
# from each file name.
sorted(os.listdir('data/datove_sady_stk'), key=date_from_file_name)

['Prohlídky vozidel STK a SME za 01-01-2019.xml',
 'Prohlídky vozidel STK a SME za 02-01-2019.xml',
 'Prohlídky vozidel STK a SME za 03-01-2019.xml',
 'Prohlídky vozidel STK a SME za 04-01-2019.xml',
 'Prohlídky vozidel STK a SME za 05-01-2019.xml',
 'Prohlídky vozidel STK a SME za 06-01-2019.xml',
 'Prohlídky vozidel STK a SME za 07-01-2019.xml',
 'Prohlídky vozidel STK a SME za 08-01-2019.xml',
 'Prohlídky vozidel STK a SME za 09-01-2019.xml',
 'Prohlídky vozidel STK a SME za 10-01-2019.xml',
 'Prohlídky vozidel STK a SME za 11-01-2019.xml',
 'Prohlídky vozidel STK a SME za 12-01-2019.xml',
 'Prohlídky vozidel STK a SME za 13-01-2019.xml',
 'Prohlídky vozidel STK a SME za 14-01-2019.xml',
 'Prohlídky vozidel STK a SME za 15-01-2019.xml',
 'Prohlídky vozidel STK a SME za 16-01-2019.xml',
 'Prohlídky vozidel STK a SME za 17-01-2019.xml',
 'Prohlídky vozidel STK a SME za 18-01-2019.xml',
 'Prohlídky vozidel STK a SME za 19-01-2019.xml',
 'Prohlídky vozidel STK a SME za 20-01-2019.xml',


In [None]:
# Directory that holds the source XML files
DATA_DIR = Path("data/datove_sady_stk")

# Root directory for all Parquet output
BASE_OUTPUT_DIR = Path("data/parquet_output_full")

# One output sub-directory per table
PROHLIDKY_OUT_DIR = BASE_OUTPUT_DIR / "prohlidky"
ZAVADY_OUT_DIR = BASE_OUTPUT_DIR / "zavady"
UKONY_OUT_DIR = BASE_OUTPUT_DIR / "ukony"
ADRTYPY_OUT_DIR = BASE_OUTPUT_DIR / "adr_typy"

# A batch is flushed to disk once the main 'prohlidky' table has collected
# this many records.
BATCH_SIZE = 50_000

# XML namespace prefixes used when searching the documents:
#   'p' -> ProhlidkaSeznam schema (inspection data)
#   'd' -> DatovaSada schema (wrapper/container data)
NS = {
    'p': 'istp:opendata:schemas:ProhlidkaSeznam:v1',
    'd': 'istp:opendata:schemas:DatovaSada:v1',
}

# Fully qualified ("{namespace}local-name") tag of the repeating record element
TAG_NAME_PROHLIDKA = f"{{{NS['p']}}}Prohlidka"
# Fully qualified tag of the wrapper element
TAG_NAME_DAT_SADA = f"{{{NS['d']}}}DatovaSada"

# --- Helper Functions ---










def write_batch(batch_data, output_dir, table_name, part_num):
    """
    Store one batch of records as a single Parquet part file.

    Args:
        batch_data: list of dictionaries, one per row. An empty (falsy) batch
            is ignored and no file is produced.
        output_dir: directory the part file is written into; joined to the
            file name with the ``/`` operator.
        table_name: table name, used as the prefix of the file name.
        part_num: integer part number, zero-padded to five digits in the
            file name.

    Returns:
        None. A failure while converting or writing is reported with a
        printed message and is not re-raised.
    """
    if batch_data:
        file_name = f"{table_name}_part_{part_num:05d}.parquet"
        try:
            # records -> pandas DataFrame -> Arrow table (index dropped) -> Parquet
            arrow_table = pa.Table.from_pandas(
                pd.DataFrame(batch_data),
                preserve_index=False,
            )
            pq.write_table(arrow_table, output_dir / file_name)
        except Exception as err:
            print(f"  Error writing batch {file_name}: {err}")

# --- Main Processing Logic ---

def _flush_tables(batches, out_dirs, part_num):
    """
    Write every table's pending batch as part ``part_num`` and reset it.

    Args:
        batches: dict mapping table name -> list of pending record dicts.
            Each list is replaced by a new empty list once it has been
            handed to write_batch.
        out_dirs: dict mapping table name -> output directory. Its iteration
            order is the order in which the tables are written.
        part_num: part number shared by all tables for this flush.
    """
    for table_name, out_dir in out_dirs.items():
        # write_batch returns without writing when the batch is empty, so a
        # table with no pending rows simply gets no file for this part.
        write_batch(batches[table_name], out_dir, table_name, part_num)
        batches[table_name] = []


def _iter_file_records(xml_file):
    """
    Yield the parsed records of every p:Prohlidka element of one XML file.

    The file is parsed completely before the first record is yielded. Parsing
    lives in this generator so that every reference to the parsed document
    (the tree and the element proxies) is dropped when the file is finished.

    Args:
        xml_file: pathlib.Path of the XML file to read.

    Yields:
        The 4-tuple returned by parse_prohlidka_element:
        (prohlidka_rec, zavady_recs, ukony_recs, adr_typy_recs).

    Raises:
        etree.XMLSyntaxError: when the file cannot be parsed as XML.
    """
    tree = etree.parse(str(xml_file))

    # d:DatovyObsah is looked up as a child of the document's root element.
    datovy_obsah = tree.find('d:DatovyObsah', NS)
    if datovy_obsah is None or len(datovy_obsah) == 0:
        print(f"  Error: Missing d:DatovyObsah tag in {xml_file.name}. Skipping file.")
        return

    # The first child of d:DatovyObsah is taken to be the ProhlidkaSeznam
    # container whose children are the p:Prohlidka records.
    for elem in datovy_obsah[0].iterchildren(tag=TAG_NAME_PROHLIDKA):
        prohlidka_rec, zavady_recs, ukony_recs, adr_typy_recs = parse_prohlidka_element(elem, NS)
        # The element's content is not needed any more once it has been
        # converted; clearing it lets the memory be reclaimed early.
        elem.clear()
        yield prohlidka_rec, zavady_recs, ukony_recs, adr_typy_recs


def process_files():
    """
    Convert every XML file in DATA_DIR into batched Parquet part files.

    Files are processed in the order given by date_from_file_name. Each
    p:Prohlidka element contributes one row to the main 'prohlidky' table and
    any number of rows to the child tables 'zavady', 'ukony' and 'adr_typy'.
    Whenever the main batch reaches BATCH_SIZE rows, all four tables are
    flushed together under the same part number, so the child rows of an
    inspection are always in the part with the same number as the inspection
    itself. Remaining rows are flushed after the last file.

    A file that cannot be processed is reported and the run continues with
    the next file. If the failure happens part-way through a file, the
    records parsed before it stay in the output; this is reported as well.

    Returns:
        None. Progress, errors and final totals are printed.
    """
    # Insertion order is the order in which directories are created and
    # tables are written: the main table first, then the child tables.
    out_dirs = {
        "prohlidky": PROHLIDKY_OUT_DIR,
        "zavady": ZAVADY_OUT_DIR,
        "ukony": UKONY_OUT_DIR,
        "adr_typy": ADRTYPY_OUT_DIR,
    }
    # Child tables in the order parse_prohlidka_element returns their rows.
    child_tables = ("zavady", "ukony", "adr_typy")

    for out_dir in out_dirs.values():
        out_dir.mkdir(parents=True, exist_ok=True)

    xml_files = sorted(DATA_DIR.glob("*.xml"), key=lambda path: date_from_file_name(str(path)))
    if not xml_files:
        print(f"No .xml files found in {DATA_DIR}")
        return

    print(f"Found {len(xml_files)} XML files. Starting processing...")

    batches = {table_name: [] for table_name in out_dirs}
    totals = {table_name: 0 for table_name in out_dirs}
    part_num = 0

    # perf_counter is monotonic, so the reported duration cannot be distorted
    # by system clock adjustments during a long run.
    start_time = time.perf_counter()

    for i, xml_file in enumerate(xml_files):
        print(f"({i+1}/{len(xml_files)}) Processing file: {xml_file.name}...")
        kept_before = totals["prohlidky"]
        try:
            for prohlidka_rec, *child_recs in _iter_file_records(xml_file):
                if prohlidka_rec:
                    batches["prohlidky"].append(prohlidka_rec)
                    totals["prohlidky"] += 1
                    for table_name, recs in zip(child_tables, child_recs):
                        batches[table_name].extend(recs)
                        totals[table_name] += len(recs)

                # Only the main table decides when a flush happens; the child
                # tables are flushed along with it.
                if len(batches["prohlidky"]) >= BATCH_SIZE:
                    print(f"  Writing Batch {part_num}...")
                    _flush_tables(batches, out_dirs, part_num)
                    part_num += 1

        except etree.XMLSyntaxError as e:
            print(f"  Syntax error parsing {xml_file.name}: {e}. Skipping file.")
        except Exception as e:
            print(f"  An unexpected error occurred with {xml_file.name}: {e}. Skipping file.")
            # Records parsed before the failure are already in the batches
            # (or on disk), so make the partial import visible.
            kept = totals["prohlidky"] - kept_before
            if kept:
                print(f"  Note: {kept} record(s) read from {xml_file.name} before the error were kept.")

    # Write any remaining records
    print("\nWriting final batches...")
    _flush_tables(batches, out_dirs, part_num)

    elapsed = time.perf_counter() - start_time
    print("\n--- Processing Complete ---")
    print(f"Total time: {elapsed:.2f} seconds")
    print(f"Output saved to: {BASE_OUTPUT_DIR.resolve()}")
    print("\n--- Record Totals ---")
    print(f"Inspections (prohlidky): {totals['prohlidky']}")
    print(f"Defects (zavady):         {totals['zavady']}")
    print(f"Control Actions (ukony):  {totals['ukony']}")
    print(f"ADR Types (adr_typy):     {totals['adr_typy']}")

In [13]:
# Run the XML -> Parquet conversion for every file found in DATA_DIR.
# Progress is printed per file; over the full data set this is a long-running cell.
process_files()

Found 2183 XML files. Starting processing...
(1/2183) Processing file: Prohlídky vozidel STK a SME za 01-01-2019.xml...
(2/2183) Processing file: Prohlídky vozidel STK a SME za 02-01-2019.xml...
(3/2183) Processing file: Prohlídky vozidel STK a SME za 03-01-2019.xml...
(4/2183) Processing file: Prohlídky vozidel STK a SME za 04-01-2019.xml...
  Writing Batch 0...
(5/2183) Processing file: Prohlídky vozidel STK a SME za 05-01-2019.xml...
(6/2183) Processing file: Prohlídky vozidel STK a SME za 06-01-2019.xml...
(7/2183) Processing file: Prohlídky vozidel STK a SME za 07-01-2019.xml...
(8/2183) Processing file: Prohlídky vozidel STK a SME za 08-01-2019.xml...
  Writing Batch 1...
(9/2183) Processing file: Prohlídky vozidel STK a SME za 09-01-2019.xml...
(10/2183) Processing file: Prohlídky vozidel STK a SME za 10-01-2019.xml...
(11/2183) Processing file: Prohlídky vozidel STK a SME za 11-01-2019.xml...
  Writing Batch 2...
(12/2183) Processing file: Prohlídky vozidel STK a SME za 12-01-2

KeyboardInterrupt: 