# SparkSession initialization and MinIO/S3 access configuration.

Static configuration constants:

In [1]:
# --- Spark standalone cluster ---
SPARK_MASTER = "spark://server:7077"
HOST_IP = "station"  # host name passed to spark.driver.host

# --- MinIO / S3 storage ---
S3 = "http://server:9000"  # S3A endpoint
# Topic names are appended directly to this prefix, so it must keep its trailing "/".
S3_BUCKET = "s3a://local-vqe-results/experiments/"
S3_WAREHOUSE = "s3a://local-features/warehouse/"  # Iceberg warehouse of quantum_catalog

Access to MinIO/S3 is authenticated with the `MINIO_ACCESS_KEY` and `MINIO_SECRET_KEY` environment variables; both must be set.

In [2]:
import os

# An unset or empty variable counts as missing. Fail fast, naming exactly which
# credential(s) the Spark session below would be missing.
_access_key_set = bool(os.environ.get('MINIO_ACCESS_KEY'))
_secret_key_set = bool(os.environ.get('MINIO_SECRET_KEY'))

if not (_access_key_set or _secret_key_set):
    raise ValueError('Both MINIO_ACCESS_KEY and MINIO_SECRET_KEY are not set.')
if not _access_key_set:
    raise ValueError('MINIO_ACCESS_KEY is not set.')
if not _secret_key_set:
    raise ValueError('MINIO_SECRET_KEY is not set.')

In [3]:
from pyspark.sql import SparkSession

# Maven coordinates resolved when the session starts: the Avro reader, the S3A
# filesystem (hadoop-aws / hadoop-common), the Iceberg Spark runtime, and
# explicitly pinned slf4j, commons-codec and j2objc-annotations artifacts.
# NOTE(review): hadoop-aws / hadoop-common are pinned to 3.3.1 — confirm this
# matches the Hadoop version bundled with the installed PySpark build.
SPARK_PACKAGES = (
    'org.slf4j:slf4j-api:2.0.17',
    'commons-codec:commons-codec:1.18.0',
    'com.google.j2objc:j2objc-annotations:3.0.0',
    'org.apache.spark:spark-avro_2.12:3.5.5',
    'org.apache.hadoop:hadoop-aws:3.3.1',
    'org.apache.hadoop:hadoop-common:3.3.1',
    'org.apache.iceberg:iceberg-spark-runtime-3.5_2.12:1.4.2',
)

spark = (
    SparkSession.builder.appName('Quantum Pipeline Feature Processing')
    .master(SPARK_MASTER)
    .config("spark.driver.host", HOST_IP)
    # Comma-separated with no trailing separator. The previous literal ended in
    # ",", which yields an empty coordinate that only worked because the JVM-side
    # split discards trailing empty strings.
    .config("spark.jars.packages", ",".join(SPARK_PACKAGES))
    # Iceberg SQL extensions plus a Hadoop-type catalog `quantum_catalog` rooted at S3_WAREHOUSE
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.quantum_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.quantum_catalog.type", "hadoop")
    .config("spark.sql.catalog.quantum_catalog.warehouse", S3_WAREHOUSE)
    # S3A access to the MinIO endpoint: static key credentials, path-style URLs, no SSL
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.access.key", os.environ.get('MINIO_ACCESS_KEY'))
    .config("spark.hadoop.fs.s3a.secret.key", os.environ.get('MINIO_SECRET_KEY'))
    .config("spark.hadoop.fs.s3a.endpoint", S3)
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
    .config("spark.hadoop.fs.s3a.aws.credentials.provider",
            "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")
    .getOrCreate()
)

:: loading settings :: url = jar:file:/home/zweiss/.pyenv/versions/3.12.9/envs/quantum-pipeline/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/zweiss/.ivy2/cache
The jars for the packages stored in: /home/zweiss/.ivy2/jars
org.slf4j#slf4j-api added as a dependency
commons-codec#commons-codec added as a dependency
com.google.j2objc#j2objc-annotations added as a dependency
org.apache.spark#spark-avro_2.12 added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
org.apache.hadoop#hadoop-common added as a dependency
org.apache.iceberg#iceberg-spark-runtime-3.5_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-c1c1aaed-5f20-42a6-a7fe-b1893ab793e4;1.0
	confs: [default]
	found org.slf4j#slf4j-api;2.0.17 in central
	found commons-codec#commons-codec;1.18.0 in central
	found com.google.j2objc#j2objc-annotations;3.0.0 in central
	found org.apache.spark#spark-avro_2.12;3.5.5 in central
	found org.tukaani#xz;1.9 in central
	found org.apache.hadoop#hadoop-aws;3.3.1 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.901 in central
	found org.wi

# Reading the data from MinIO/S3

Read the available topics from MinIO/S3. Different experiments are stored under different topics, e.g.:

`vqe_decorated_result_mol0_HH_it1_bs_sto3g_bk_aer_simulator_statevector_gpu` 

This topic holds an experiment on the molecule with id 0 ($H_2$), containing a single VQE iteration, using the sto3g basis set and a backend that uses GPU acceleration with the statevector simulation method.

In [4]:
import requests

def list_available_topics():
    """Return the names of the directories directly under S3_BUCKET (one per topic).

    Returns an empty list when S3_BUCKET does not exist or is not a directory.
    An exception raised while checking or listing it is also turned into an
    empty list; the error is printed.
    """
    jvm = spark._jvm
    hadoop_conf = spark._jsc.hadoopConfiguration()

    # make sure the s3a:// scheme is served by S3AFileSystem with key-based credentials
    hadoop_conf.set("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    hadoop_conf.set("fs.s3a.aws.credentials.provider",
                    "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")

    # filesystem handle bound to the bucket URI
    fs = jvm.org.apache.hadoop.fs.FileSystem.get(jvm.java.net.URI.create(S3_BUCKET), hadoop_conf)
    root = jvm.org.apache.hadoop.fs.Path(S3_BUCKET)

    try:
        if not (fs.exists(root) and fs.isDirectory(root)):
            return []
        topics = []
        for status in fs.listStatus(root):
            if status.isDirectory():
                topics.append(status.getPath().getName())
        return topics
    except Exception as e:
        print(f"Error accessing S3: {e}")
        return []

def read_experiments_by_topic(topic_name):
    """Load all Avro files under ``<S3_BUCKET><topic_name>/partition=*/`` as one DataFrame.

    Args:
        topic_name: directory name of the topic, as returned by list_available_topics().

    Returns:
        A lazily evaluated Spark DataFrame over the matched Avro files.
    """
    # S3_BUCKET already ends with "/", so the topic name is appended directly
    avro_glob = "{}{}/partition=*/*.avro".format(S3_BUCKET, topic_name)
    return spark.read.load(avro_glob, format="avro")

Create a DataFrame from the experiment data files in storage. For VQE, currently the only implemented algorithm, the output of `df.show(1)` looks like this:
```
+--------------------+--------------------+---------+------------------+-------------------+------------------+------------------+-----------+
|          vqe_result|            molecule|basis_set|  hamiltonian_time|       mapping_time|          vqe_time|        total_time|molecule_id|
+--------------------+--------------------+---------+------------------+-------------------+------------------+------------------+-----------+
|{{aer_simulator_s...|{{[H, H], [[0.0, ...|    sto3g|0.4875204563140869|0.02751922607421875|0.4407076835632324|0.9557473659515381|          0|
+--------------------+--------------------+---------+------------------+-------------------+------------------+------------------+-----------+
only showing top 1 row
```

In [5]:
# get topics available in the storage (one directory per experiment configuration)
available_topics = list_available_topics()
print("Available Topics:", available_topics)

# Fail with an explicit message instead of an opaque IndexError on [0].
# Note that list_available_topics() also returns [] when S3 access fails.
if not available_topics:
    raise RuntimeError(f"No experiment topics found under {S3_BUCKET}")

# read one particular topic, collapsed into a single partition
df = read_experiments_by_topic(available_topics[0]).repartition(1)
df.show(1)

25/04/14 17:09:34 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties


Available Topics: ['vqe_decorated_result_mol0_HH_it1_bs_sto3g_bk_aer_simulator_statevector_gpu', 'vqe_decorated_result_mol1_OHH_it1_bs_sto3g_bk_aer_simulator_statevector_gpu', 'vqe_decorated_result_mol2_HeHe_it1_bs_sto3g_bk_aer_simulator_statevector_gpu', 'vqe_decorated_result_mol3_LiH_it1_bs_sto3g_bk_aer_simulator_statevector_gpu', 'vqe_decorated_result_mol4_BeH_it1_bs_sto3g_bk_aer_simulator_statevector_gpu', 'vqe_decorated_result_mol5_BH_it1_bs_sto3g_bk_aer_simulator_statevector_gpu', 'vqe_decorated_result_mol6_CHHHH_it1_bs_sto3g_bk_aer_simulator_statevector_gpu', 'vqe_decorated_result_mol7_NHHH_it1_bs_sto3g_bk_aer_simulator_statevector_gpu', 'vqe_decorated_result_mol8_NN_it1_bs_sto3g_bk_aer_simulator_statevector_gpu']


                                                                                

+--------------------+--------------------+---------+------------------+--------------------+-------------------+-------------------+-----------+
|          vqe_result|            molecule|basis_set|  hamiltonian_time|        mapping_time|           vqe_time|         total_time|molecule_id|
+--------------------+--------------------+---------+------------------+--------------------+-------------------+-------------------+-----------+
|{{aer_simulator_s...|{{[H, H], [[0.0, ...|    sto3g|0.0638587474822998|0.022032976150512695|0.33730435371398926|0.42319607734680176|          0|
+--------------------+--------------------+---------+------------------+--------------------+-------------------+-------------------+-----------+
only showing top 1 row



25/04/14 17:12:43 ERROR TaskSchedulerImpl: Lost executor 0 on 172.22.0.6: worker lost: 172.22.0.6:34979 got disassociated
25/04/14 17:12:53 WARN StandaloneAppClient$ClientEndpoint: Connection to 640653264d32:7077 failed; waiting for master to reconnect...
25/04/14 17:12:53 WARN StandaloneSchedulerBackend: Disconnected from Spark cluster! Waiting for reconnection...


# Processing the data into ML features

#### To make data tracking as efficient as possible, metadata columns are added to each DataFrame.

In [None]:
from pyspark.sql.functions import udf, current_timestamp, current_date, lit, col, exp, size, explode, expr
from pyspark.sql.types import StringType
import uuid

def add_metadata_columns(dataframe, processing_name):
    """Add processing-metadata columns to a DataFrame.

    Args:
        dataframe: Spark DataFrame to decorate.
        processing_name: label written to the ``processing_name`` column.

    Returns:
        The DataFrame with five extra columns:
          - ``experiment_id``: a UUID string per row,
          - ``processing_timestamp`` / ``processing_date``: current timestamp and date,
          - ``processing_batch_id``: one UUID shared by every row of this call,
          - ``processing_name``: the given label.
    """
    # Spark's built-in uuid() replaces a Python UDF calling uuid.uuid4(). PySpark
    # UDFs are treated as deterministic by default, so Spark may re-run or
    # duplicate them. Every re-evaluation of this lazy plan (one per feature-table
    # write, count(), self-join, ...) therefore produced *different* experiment_ids,
    # and rows of the same experiment did not line up across tables.
    # uuid() is a nondeterministic built-in whose random seed Spark fixes when the
    # plan is analysed. Re-executing the same DataFrame normally regenerates the
    # same ids, provided the input rows arrive in the same partitions and order.
    # It also avoids a round trip through a Python worker.
    # NOTE(review): experiment_id is still new on every run, so key columns built
    # from it cannot detect re-ingested source rows.
    return (
        dataframe
        .withColumn("experiment_id", expr("uuid()"))
        .withColumn("processing_timestamp", current_timestamp())
        .withColumn("processing_date", current_date())
        .withColumn("processing_batch_id", lit(str(uuid.uuid4())))
        .withColumn("processing_name", lit(processing_name))
    )

#### Create an Iceberg table with the specified parameters, e.g. partitioning to group similar rows together.

In [None]:
def create_iceberg_table(dataframe, table_name, partition_columns=None, comment=None):
    """Create (or overwrite) an Iceberg feature table and tag the resulting snapshot.

    Args:
        dataframe: DataFrame to write; must contain a ``processing_batch_id`` column.
        table_name: table name inside ``quantum_catalog.quantum_features``.
        partition_columns: optional list of columns to partition by.
        comment: optional table comment.

    Returns:
        str: the created tag, ``v_<processing_batch_id without dashes>``.

    Raises:
        ValueError: if ``dataframe`` is empty (there is no batch id to name the tag).
    """
    full_name = f"quantum_catalog.quantum_features.{table_name}"

    # Read the batch id *before* writing. An empty input is rejected before the
    # table is created (previously first() returned None after the write), and
    # the tag name does not require re-evaluating the DataFrame afterwards.
    first_row = dataframe.limit(1).collect()
    if not first_row:
        raise ValueError(f"Cannot create table {full_name} from an empty DataFrame")
    version_tag = f"v_{first_row[0]['processing_batch_id'].replace('-', '')}"

    writer = dataframe.write.format("iceberg").option("write-format", "parquet")
    if partition_columns:
        writer = writer.partitionBy(*partition_columns)
    if comment:
        writer = writer.option("comment", comment)

    writer.mode("overwrite").saveAsTable(full_name)
    print(f"Created table: {full_name}")

    # tag the most recently committed snapshot, i.e. the one produced by the write above
    snapshot_id = spark.sql(
        f"SELECT snapshot_id FROM {full_name}.snapshots ORDER BY committed_at DESC LIMIT 1"
    ).collect()[0][0]
    spark.sql(f"""
    ALTER TABLE {full_name}
    CREATE TAG {version_tag} AS OF VERSION {snapshot_id}
    """)

    print(f"Created version tag: {version_tag} for table {table_name}")
    return version_tag

#### Key part of the incremental updates: the function checks whether the target table exists and, if it does, compares the key columns against the table to keep only the records that do not exist in it yet.

In [None]:
def identify_new_records(new_data_df, table_name, key_columns):
    """
    Identifies records in new_data_df that don't exist in the target table.

    Uses a single left-anti join of new_data_df against the distinct key
    combinations already stored in the table. The result is lazy: no Spark job
    runs here, and runtime (not analysis) errors surface at the caller's action.

    Args:
        new_data_df: DataFrame containing potentially new data
        table_name: Name of the table to check against
        key_columns: List of column names that uniquely identify records

    Returns:
        DataFrame: rows of new_data_df whose key combination is not present in
        the target table, with new_data_df's columns in their original order.
        Rows whose key contains NULL never match and are therefore returned as new.

    Raises:
        RuntimeError: wrapping any error raised while checking the table or
        building the join (e.g. an unknown key column).
    """
    full_name = f"quantum_catalog.quantum_features.{table_name}"
    try:
        # a table that does not exist yet holds no keys, so every record is new
        if not spark.catalog.tableExists(full_name):
            return new_data_df

        # column selection via the DataFrame API instead of string-built SQL
        existing_keys = spark.table(full_name).select(*key_columns).distinct()

        # left_anti keeps rows of new_data_df without a matching key. It evaluates
        # new_data_df once: the former "distinct keys -> left join -> join back"
        # evaluated it twice, which breaks when the data contains
        # nondeterministic ids. It also needs no extra isEmpty() jobs, because an
        # empty input or empty key set is handled by the join itself.
        return new_data_df.join(existing_keys, on=key_columns, how="left_anti")

    except Exception as e:
        raise RuntimeError(f"Error identifying new records: {str(e)}") from e

#### Incremental update, i.e. process only the features that were not processed before. If the table exists, new data is appended to the existing features; otherwise the table is created.

In [None]:
def _feature_table_name(table_name):
    """Return the fully qualified Iceberg identifier of a feature table."""
    return f"quantum_catalog.quantum_features.{table_name}"


def _write_feature_table(data_df, full_name, mode, partition_columns=None, comment=None):
    """Write ``data_df`` to Iceberg table ``full_name`` as Parquet using save ``mode``."""
    writer = data_df.write.format("iceberg").option("write-format", "parquet")
    if partition_columns:
        writer = writer.partitionBy(*partition_columns)
    if comment:
        writer = writer.option("comment", comment)
    writer.mode(mode).saveAsTable(full_name)


def _tag_latest_snapshot(table_name, version_tag):
    """Create ``version_tag`` on the most recently committed snapshot of ``table_name``."""
    full_name = _feature_table_name(table_name)
    snapshot_id = spark.sql(
        f"SELECT snapshot_id FROM {full_name}.snapshots ORDER BY committed_at DESC LIMIT 1"
    ).collect()[0][0]
    spark.sql(f"""
    ALTER TABLE {full_name}
    CREATE TAG {version_tag} AS OF VERSION {snapshot_id}
    """)


def process_incremental_data(new_data_df, table_name, key_columns, partition_columns=None, comment=None):
    """
    Process the data incrementally - only data that wasn't processed yet.

    If the target table does not exist, it is created from all of new_data_df
    and tagged ``v_<batch>``. Otherwise only rows whose key_columns are not yet
    in the table are appended, and the new snapshot is tagged ``v_incr_<batch>``.
    ``<batch>`` is the input's processing_batch_id with dashes removed.

    Args:
      new_data_df: DataFrame containing potentially new data
      table_name: Name of the target table
      key_columns: List of column names that uniquely identify records
      partition_columns: Optional list of columns to partition by
      comment: Optional comment for the table (used only when creating it)

    Returns:
        tuple: (version tag, count of records written). Returns (None, 0) when
        the input is empty or contains no new records.
    """
    full_name = _feature_table_name(table_name)

    # Read the batch id up front: both branches need it for the tag name, and an
    # empty input must be rejected *before* a table is created from it (the
    # create branch used to write the table and then fail on first_row[0]).
    first_row = new_data_df.limit(1).collect()
    if not first_row:
        print(f"Input dataset is empty for {table_name}")
        return None, 0
    processing_batch_id = first_row[0]["processing_batch_id"].replace("-", "")

    # public Catalog API, consistent with identify_new_records
    if not spark.catalog.tableExists(full_name):
        _write_feature_table(new_data_df, full_name, "overwrite", partition_columns, comment)
        print(f"Created table: {full_name}")

        version_tag = f"v_{processing_batch_id}"
        _tag_latest_snapshot(table_name, version_tag)
        print(f"Created version tag: {version_tag} for table {table_name}")

        # the table was just created, so its row count is exactly what was written
        return version_tag, spark.table(full_name).count()

    truly_new_data = identify_new_records(new_data_df, table_name, key_columns)

    # Persist so that count() and the append normally see the same rows, rather
    # than evaluating the anti-join plan twice (best effort: evicted blocks are
    # recomputed).
    truly_new_data = truly_new_data.persist()
    try:
        new_record_count = truly_new_data.count()
        if new_record_count == 0:
            print(f"No new records found for table {table_name}")
            return None, 0

        # append only the new rows; the table comment is not re-applied
        _write_feature_table(truly_new_data, full_name, "append", partition_columns)
    finally:
        truly_new_data.unpersist()

    print(f"Appended {new_record_count} new records to table {table_name}")

    version_tag = f"v_incr_{processing_batch_id}"
    _tag_latest_snapshot(table_name, version_tag)
    print(f"Created incremental version tag: {version_tag} for table {table_name}")

    return version_tag, new_record_count

#### Unpack the data into DataFrames and add unique fields that identify each experiment and iteration.

In [None]:
def transform_quantum_data(df):
    """
    Transforms the original quantum data into various feature tables.

    The nested VQE records are flattened into one DataFrame per feature table.
    Array fields are expanded with ``posexplode``, and each element's position
    in its source array becomes its index:
      - ``parameter_index`` for initial, optimal and per-iteration parameters,
      - ``term_index`` for Hamiltonian terms,
      - the position inside ``iteration_list`` for ``iteration_id``.
    Because the same position is used in ``vqe_iterations`` and
    ``iteration_parameters``, their ``iteration_id`` values line up.

    Positions replace ``hash(...) % 1000000``. That form gave two parameters
    with the same value in one experiment the same ``parameter_id``. It also
    squeezed ids into about two million (possibly negative) buckets, so
    distinct iterations or terms could collide in long runs or large
    Hamiltonians. Column names, types and order of every table are unchanged.

    Args:
        df: Original dataframe with quantum simulation data

    Returns:
        dict: Dictionary of transformed dataframes keyed by feature-table name,
        plus ``"base_df"`` (the flattened source with metadata columns).
    """
    from pyspark.sql.functions import posexplode  # not among the notebook's cell imports

    # processing-metadata columns carried through to every feature table
    meta_cols = ["processing_timestamp", "processing_date", "processing_batch_id", "processing_name"]

    # original dataframe with metadata attached, flattened one level
    base_df = add_metadata_columns(df, "quantum_base_processing").select(
        col("experiment_id"),
        col("molecule_id"),
        col("basis_set"),
        col("vqe_result.initial_data").alias("initial_data"),
        col("vqe_result.iteration_list").alias("iteration_list"),
        col("vqe_result.minimum").alias("minimum_energy"),
        col("vqe_result.optimal_parameters").alias("optimal_parameters"),
        col("vqe_result.maxcv").alias("maxcv"),
        col("vqe_result.minimization_time").alias("minimization_time"),
        col("hamiltonian_time"),
        col("mapping_time"),
        col("vqe_time"),
        col("total_time"),
        col("molecule.molecule_data").alias("molecule_data"),
        *meta_cols,
    )

    # columns reused by several of the exploded tables
    backend_col = col("initial_data.backend").alias("backend")
    num_qubits_col = col("initial_data.num_qubits").alias("num_qubits")
    # iteration id keyed on the iteration's position in iteration_list (column iteration_pos)
    iteration_id_col = expr(
        "concat(experiment_id, '_iter_', cast(iteration_pos as string))"
    ).alias("iteration_id")

    # molecule information dataframe
    df_molecule = base_df.select(
        col("experiment_id"),
        col("molecule_id"),
        col("molecule_data.symbols").alias("atom_symbols"),
        col("molecule_data.coords").alias("coordinates"),
        col("molecule_data.multiplicity").alias("multiplicity"),
        col("molecule_data.charge").alias("charge"),
        col("molecule_data.units").alias("coordinate_units"),
        col("molecule_data.masses").alias("atomic_masses"),
        *meta_cols,
    )

    # ansatz information dataframe
    df_ansatz = base_df.select(
        col("experiment_id"),
        col("molecule_id"),
        col("basis_set"),
        col("initial_data.ansatz").alias("ansatz"),
        col("initial_data.ansatz_reps").alias("ansatz_reps"),
        *meta_cols,
    )

    # metrics information dataframe
    df_metrics = base_df.select(
        col("experiment_id"),
        col("molecule_id"),
        col("basis_set"),
        col("hamiltonian_time"),
        col("mapping_time"),
        col("vqe_time"),
        col("total_time"),
        col("minimization_time"),
        (col("hamiltonian_time") + col("mapping_time") + col("vqe_time")).alias("computed_total_time"),
        *meta_cols,
    )

    # VQE results dataframe
    df_vqe = base_df.select(
        col("experiment_id"),
        col("molecule_id"),
        col("basis_set"),
        backend_col,
        num_qubits_col,
        col("initial_data.optimizer").alias("optimizer"),
        col("initial_data.noise_backend").alias("noise_backend"),
        col("initial_data.default_shots").alias("default_shots"),
        col("initial_data.ansatz_reps").alias("ansatz_reps"),
        col("minimum_energy"),
        col("maxcv"),
        size(col("iteration_list")).alias("total_iterations"),
        *meta_cols,
    )

    # initial parameters: one row per array element, indexed by its position
    df_initial_parameters = base_df.select(
        "experiment_id", "molecule_id", "basis_set", backend_col, num_qubits_col,
        posexplode(col("initial_data.initial_parameters")).alias("parameter_index", "initial_parameter_value"),
        *meta_cols,
    ).select(
        "experiment_id", "molecule_id", "basis_set", "backend", "num_qubits",
        "initial_parameter_value",
        *meta_cols,
        "parameter_index",
        expr("concat(experiment_id, '_init_', cast(parameter_index as string))").alias("parameter_id"),
    )

    # optimal parameters: one row per array element, indexed by its position
    df_optimal_parameters = base_df.select(
        "experiment_id", "molecule_id", "basis_set", backend_col, num_qubits_col,
        posexplode(col("optimal_parameters")).alias("parameter_index", "optimal_parameter_value"),
        *meta_cols,
    ).select(
        "experiment_id", "molecule_id", "basis_set", "backend", "num_qubits",
        "optimal_parameter_value",
        *meta_cols,
        "parameter_index",
        expr("concat(experiment_id, '_opt_', cast(parameter_index as string))").alias("parameter_id"),
    )

    # iterations: one row per iteration_list element
    df_iterations = base_df.select(
        "experiment_id", "molecule_id", "basis_set", backend_col, num_qubits_col,
        posexplode(col("iteration_list")).alias("iteration_pos", "iteration"),
        *meta_cols,
    ).select(
        "experiment_id", "molecule_id", "basis_set", "backend", "num_qubits",
        col("iteration.iteration").alias("iteration_step"),
        col("iteration.result").alias("iteration_energy"),
        col("iteration.std").alias("energy_std_dev"),
        *meta_cols,
        iteration_id_col,
    )

    # iteration parameters: one row per parameter of each iteration
    df_iteration_parameters = base_df.select(
        "experiment_id", "molecule_id", "basis_set", backend_col, num_qubits_col,
        posexplode(col("iteration_list")).alias("iteration_pos", "iteration"),
        *meta_cols,
    ).select(
        "experiment_id", "molecule_id", "basis_set", "backend", "num_qubits",
        "iteration_pos",
        col("iteration.iteration").alias("iteration_step"),
        posexplode(col("iteration.parameters")).alias("parameter_index", "parameter_value"),
        *meta_cols,
    ).select(
        "experiment_id", "molecule_id", "basis_set", "backend", "num_qubits",
        "iteration_step", "parameter_value",
        *meta_cols,
        "parameter_index",
        iteration_id_col,
    ).withColumn(
        "parameter_id",
        expr("concat(iteration_id, '_param_', cast(parameter_index as string))"),
    )

    # hamiltonian terms: one row per term, indexed by its position
    df_hamiltonian = base_df.select(
        "experiment_id", "molecule_id", "basis_set", backend_col,
        posexplode(col("initial_data.hamiltonian")).alias("term_index", "hamiltonian_term"),
        *meta_cols,
    ).select(
        "experiment_id", "molecule_id", "basis_set", "backend",
        col("hamiltonian_term.label").alias("term_label"),
        col("hamiltonian_term.coefficients.real").alias("coeff_real"),
        col("hamiltonian_term.coefficients.imaginary").alias("coeff_imag"),
        *meta_cols,
        "term_index",
        expr("concat(experiment_id, '_term_', cast(term_index as string))").alias("term_id"),
    )

    # return all the transformed dataframes
    return {
        "molecules": df_molecule,
        "ansatz_info": df_ansatz,
        "performance_metrics": df_metrics,
        "vqe_results": df_vqe,
        "initial_parameters": df_initial_parameters,
        "optimal_parameters": df_optimal_parameters,
        "vqe_iterations": df_iterations,
        "iteration_parameters": df_iteration_parameters,
        "hamiltonian_terms": df_hamiltonian,
        "base_df": base_df
    }

#### Create a separate metadata table for tracking processing runs, if it does not exist yet.

In [None]:
def create_metadata_table_if_not_exists():
    """Ensure the Iceberg table that logs processing runs exists.

    The DDL uses CREATE TABLE IF NOT EXISTS, so an existing table is left
    untouched. Each row stores a batch id, name, timestamp and date, plus
    parallel arrays of table names, tag versions and record counts.
    """
    metadata_ddl = """
    CREATE TABLE IF NOT EXISTS quantum_catalog.quantum_features.processing_metadata (
        processing_batch_id STRING,
        processing_name STRING,
        processing_timestamp TIMESTAMP,
        processing_date DATE,
        table_names ARRAY<STRING>,
        table_versions ARRAY<STRING>,
        record_counts ARRAY<BIGINT>,
        source_data_info STRING
    ) USING iceberg
    """
    spark.sql(metadata_ddl)

#### Update the metadata table with new processing information

In [None]:
def update_metadata_table(dfs, table_names, table_versions, record_counts, source_info):
    """Append one row describing this processing run to the metadata table.

    Args:
        dfs: dict of DataFrames; only ``dfs["base_df"]`` is read, to take the
            batch id, processing name, timestamp and date of the run.
        table_names: names of the feature tables processed in this run.
        table_versions: tag (or placeholder) per table, parallel to table_names.
        record_counts: records written per table, parallel to table_names.
        source_info: free-text description of the source data.

    Raises:
        ValueError: if ``dfs["base_df"]`` has no rows.
    """
    # Evaluate base_df once. Every .first() is a separate Spark job, and the
    # current_timestamp()/current_date() columns in base_df are recomputed per
    # job. Calling .first() per field re-ran the whole plan each time and could
    # record values from different evaluations.
    run_row = dfs["base_df"].first()
    if run_row is None:
        raise ValueError("Cannot record processing metadata: base_df is empty")

    processing_info = spark.createDataFrame([{
        "processing_batch_id": run_row.processing_batch_id,
        "processing_name": run_row.processing_name,
        "processing_timestamp": run_row.processing_timestamp,
        "processing_date": run_row.processing_date,
        "table_names": table_names,
        "table_versions": table_versions,
        "record_counts": record_counts,
        "source_data_info": source_info
    }])

    processing_info.write \
        .format("iceberg") \
        .mode("append") \
        .saveAsTable("quantum_catalog.quantum_features.processing_metadata")

    print(f"Updated metadata table with processing batch {run_row.processing_batch_id}")

### Process each experiment and update metadata accordingly

In [None]:
def process_experiments_incrementally(df):
    """
    Main function to process quantum data incrementally.

    Creates the metadata table if needed, transforms ``df`` into the feature
    DataFrames, incrementally writes each one to its Iceberg table, and
    records the run in the metadata table.

    Args:
        df: Original dataframe with quantum simulation data

    Returns:
        dict: table name -> number of new records written in this run.
    """
    # create metadata tracking table if it doesn't exist
    create_metadata_table_if_not_exists()

    # transform the data
    dfs = transform_quantum_data(df)

    # table configurations for each dataframe
    table_configs = {
        "molecules": {
            "key_columns": ["experiment_id", "molecule_id"],
            "partition_columns": ["processing_date"],
            "comment": "Molecule information for quantum simulations"
        },
        "ansatz_info": {
            "key_columns": ["experiment_id", "molecule_id"],
            "partition_columns": ["processing_date", "basis_set"],
            "comment": "Ansatz configurations for quantum simulations"
        },
        "performance_metrics": {
            "key_columns": ["experiment_id", "molecule_id", "basis_set"],
            "partition_columns": ["processing_date", "basis_set"],
            "comment": "Performance metrics for quantum simulations"
        },
        "vqe_results": {
            "key_columns": ["experiment_id", "molecule_id", "basis_set"],
            "partition_columns": ["processing_date", "basis_set", "backend"],
            "comment": "VQE optimization results for quantum simulations"
        },
        "initial_parameters": {
            "key_columns": ["parameter_id"],
            "partition_columns": ["processing_date", "basis_set"],
            "comment": "Initial parameters for VQE optimization"
        },
        "optimal_parameters": {
            "key_columns": ["parameter_id"],
            "partition_columns": ["processing_date", "basis_set"],
            "comment": "Optimal parameters found by VQE optimization"
        },
        "vqe_iterations": {
            "key_columns": ["iteration_id"],
            "partition_columns": ["processing_date", "basis_set", "backend"],
            "comment": "VQE optimization iterations and energy values"
        },
        "iteration_parameters": {
            "key_columns": ["parameter_id"],
            "partition_columns": ["processing_date", "basis_set"],
            "comment": "Parameters at each iteration of VQE optimization"
        },
        "hamiltonian_terms": {
            "key_columns": ["term_id"],
            "partition_columns": ["processing_date", "basis_set", "backend"],
            "comment": "Hamiltonian terms for quantum simulations"
        }
    }

    # Every feature DataFrame is derived from base_df, and each table triggers
    # several Spark actions (collect, count, write, tag lookup). Without caching,
    # each action re-reads the source and regenerates the per-row metadata
    # (experiment_id, processing_timestamp), so the tables may not share the same
    # ids. Persisting makes them reuse one materialised copy. This is best effort:
    # Spark recomputes any evicted partitions.
    base_df = dfs["base_df"].persist()
    try:
        # process each table incrementally
        table_versions = []
        record_counts = []
        table_names = []

        for table_name, config in table_configs.items():
            print(f"Processing table: {table_name}")
            version_tag, count = process_incremental_data(
                dfs[table_name],
                table_name,
                config["key_columns"],
                config["partition_columns"],
                config["comment"]
            )

            table_names.append(table_name)
            table_versions.append(version_tag if version_tag else "no_changes")
            record_counts.append(count)

        # update metadata tracking
        update_metadata_table(
            dfs,
            table_names,
            table_versions,
            record_counts,
            "Incremental VQE simulation data processing"
        )
    finally:
        base_df.unpersist()

    print("Incremental processing completed!")

    # summary of processed records
    return dict(zip(table_names, record_counts))

In [None]:
# run the full incremental pipeline; maps feature table -> new records written
processed_counts = process_experiments_incrementally(df)

print("\nProcessing Summary:")
for feature_table, new_records in processed_counts.items():
    print(f"{feature_table}: {new_records} new records processed")