## Load Data

In [1]:
import os
import time
from minio import Minio
from pyspark import SparkContext, HiveContext
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession, SQLContext
from delta.tables import DeltaTable
from spark.utils import get_spark_session

### Get filenames from Minio

In [2]:
# -------------------------------------
# Discover TSV and Parquet files in MinIO
# -------------------------------------
minio_client = get_minio_client()

bucket         = "cdm-lake"
source_prefix  = "ontology-source/"          # keep the trailing slash

# → pull the generator into a list so we can iterate more than once
objects = list(
    minio_client.list_objects(bucket, prefix=source_prefix, recursive=True)
)

tsv_files = [obj.object_name for obj in objects if obj.object_name.endswith(".tsv")]
parquet_files = [obj.object_name for obj in objects if obj.object_name.endswith(".parquet")]

print("TSV files   :", tsv_files)
print("Parquet files:", parquet_files)

TSV files   : []
Parquet files: ['ontology-source/entailed_edge.parquet', 'ontology-source/feature_annotation.parquet', 'ontology-source/prefix.parquet', 'ontology-source/statements.parquet', 'ontology-source/term_association.parquet']


### Create spark session

In [3]:
try:
    spark.stop()
except:
    pass
spark = get_spark_session()

25/07/08 06:03:17 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/08 06:03:19 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/07/08 06:03:19 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
25/07/08 06:03:21 WARN S3ABlockOutputStream: Application invoked the Syncable API against stream writing to spark-job-logs/jplfaria/app-20250708060319-0012.inprogress. This is unsupported
25/07/08 06:03:21 WARN Utils: spark.executor.instances less than spark.dynamicAllocation.minExecutors is invalid, ignoring its setting, please update your configs.


### Create database

In [4]:
namespace = 'ontology_data'
create_namespace_if_not_exists(spark, namespace)

Namespace ontology_data is ready to use.


In [5]:
spark.sql(f"SHOW TABLES in {namespace}").show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
+---------+---------+-----------+



### DROP TABLE CELL BE CAREFUL- DONT RUN

In [17]:
# Get the list of tables
tables = spark.sql(f"SHOW TABLES IN {namespace}").collect()

# Drop each table
for row in tables:
    table_name = row.tableName
    spark.sql(f"DROP TABLE IF EXISTS {namespace}.{table_name}")
    print(f"Dropped table: {namespace}.{table_name}")

Dropped table: ontology_data.entailed_edge
Dropped table: ontology_data.prefix
Dropped table: ontology_data.statements
Dropped table: ontology_data.term_association


In [6]:
spark.sql(f"SHOW TABLES in {namespace}").show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
+---------+---------+-----------+



### Define and execute Delta table

#### Load parquet files

In [7]:
def create_delta_table(spark, file_name, bucket="cdm-lake"):
    """
    Load a *Parquet* file from MinIO and materialise it as a Delta table.

    Only the lines that changed compared with the TSV version are marked
    with  # <-- changed
    """
    # ---------- table / path handling ----------------------------------------------------------
    table_name = os.path.splitext(os.path.basename(file_name))[0]        

    print(f"Processing {table_name}...")
    full_path = f"s3a://{bucket}/{file_name}"
    print(f"Reading from {full_path}")

    # ---------- READ (now Parquet) --------------------------------------------------------------
    df = (spark.read
                .format("parquet")                                       
                .load(full_path))                                        

    # ---------- WRITE as Delta & register -------------------------------------------------------
    delta_path = f"s3a://{bucket}/ontology-deltalake/{table_name}"
    print(f"Writing to {delta_path}")

    (df.write
       .mode("overwrite")
       .option("compression", "snappy")
       .format("delta")
       .saveAsTable(f"{namespace}.{table_name}"))

    return table_name, df.count()


# ------------------------------------------------------------------
# Example driver loop (now over a list of .parquet files)           |
# ------------------------------------------------------------------
results = []
for parquet_file in parquet_files:      
    try:
        table, rows = create_delta_table(spark, parquet_file)
        results.append({"table": table, "rows": rows})
        print(f"✔  {table}: {rows} rows")
    except Exception as e:
        print(f"✖  Error processing {parquet_file}: {e}")

print("\nSummary:")
for r in results:
    print(f"• {r['table']}: {r['rows']} rows")

Processing entailed_edge...
Reading from s3a://cdm-lake/ontology-source/entailed_edge.parquet


                                                                                

Writing to s3a://cdm-lake/ontology-deltalake/entailed_edge


                                                                                

✔  entailed_edge: 117545336 rows
Processing feature_annotation...
Reading from s3a://cdm-lake/ontology-source/feature_annotation.parquet
Writing to s3a://cdm-lake/ontology-deltalake/feature_annotation


                                                                                

✔  feature_annotation: 236843 rows
Processing prefix...
Reading from s3a://cdm-lake/ontology-source/prefix.parquet
Writing to s3a://cdm-lake/ontology-deltalake/prefix


                                                                                

✔  prefix: 1221 rows
Processing statements...
Reading from s3a://cdm-lake/ontology-source/statements.parquet
Writing to s3a://cdm-lake/ontology-deltalake/statements


                                                                                

✔  statements: 42373349 rows
Processing term_association...
Reading from s3a://cdm-lake/ontology-source/term_association.parquet
Writing to s3a://cdm-lake/ontology-deltalake/term_association


                                                                                

✔  term_association: 3271 rows

Summary:
• entailed_edge: 117545336 rows
• feature_annotation: 236843 rows
• prefix: 1221 rows
• statements: 42373349 rows
• term_association: 3271 rows


#### Load as .tsv (if needed)

In [27]:
def create_delta_table(spark, file_name, bucket="cdm-lake"):
    # Extract table name from file path
    table_name = os.path.basename(file_name).replace('.tsv', '')
    
    print(f"Processing {table_name}...")
    
    # Read TSV file from MinIO
    print(f"Reading from s3a://{bucket}/{file_name}")
    df = spark.read.format("csv") \
        .option("header", "true") \
        .option("delimiter", "\t") \
        .option("inferSchema", "true") \
        .load(f"s3a://{bucket}/{file_name}")
    
    # Write as Delta table in MinIO
    delta_path = f"s3a://{bucket}/ontology-deltalake/{table_name}"
    print(f"Writing to {delta_path}")
    
#    df.write \
#        .format("delta") \
#        .mode("overwrite") \
#        .option("overwriteSchema", "true") \
#        .save(delta_path)
    
    # Create table in the database
#    spark.sql(f"""
#    CREATE TABLE IF NOT EXISTS {namespace}.{table_name}
#    USING DELTA
#    LOCATION '{delta_path}'
#    """)

    df.write.mode(
        "overwrite"
        ).option("compression", "snappy"
        ).format("delta"
        ).saveAsTable(f"{namespace}.{table_name}"
    )
    
    return table_name, df.count()

# Process each TSV file
results = []
for tsv_file in tsv_files:
    try:
        table_name, row_count = create_delta_table(spark, tsv_file)
        results.append({"table": table_name, "rows": row_count})
        print(f"Successfully processed {table_name} with {row_count} rows")
    except Exception as e:
        print(f"Error processing {tsv_file}: {str(e)}")

# Print summary
print("\nProcessing Summary:")
for result in results:
    print(f"Table {result['table']}: {result['rows']} rows")

Processing entailed_edge...
Reading from s3a://cdm-lake/ontology-source/entailed_edge.tsv


                                                                                

Writing to s3a://cdm-lake/ontology-deltalake/entailed_edge


                                                                                

Successfully processed entailed_edge with 101472614 rows
Processing prefix...
Reading from s3a://cdm-lake/ontology-source/prefix.tsv
Writing to s3a://cdm-lake/ontology-deltalake/prefix


                                                                                

Successfully processed prefix with 1216 rows
Processing statements...
Reading from s3a://cdm-lake/ontology-source/statements.tsv


                                                                                

Writing to s3a://cdm-lake/ontology-deltalake/statements




Successfully processed statements with 37960705 rows

Processing Summary:
Table entailed_edge: 101472614 rows
Table prefix: 1216 rows
Table statements: 37960705 rows


                                                                                

### Verify the tables were created

In [8]:
# List all tables
print("Available tables in database:")
spark.sql(f"SHOW TABLES IN {namespace}").show()

# Sample query to verify data
for table in ['statements', 'entailed_edge', 'prefix']:
    print(f"\nSample data from {table}:")
    spark.sql(f"SELECT * FROM {namespace}.{table} LIMIT 5").show(truncate=False)

Available tables in database:
+-------------+------------------+-----------+
|    namespace|         tableName|isTemporary|
+-------------+------------------+-----------+
|ontology_data|     entailed_edge|      false|
|ontology_data|feature_annotation|      false|
|ontology_data|            prefix|      false|
|ontology_data|        statements|      false|
|ontology_data|  term_association|      false|
+-------------+------------------+-----------+


Sample data from statements:


                                                                                

+-----------+-----------+----------------+--------------+------------------------------------------------+--------+--------+-----+
|stanza     |subject    |predicate       |object        |value                                           |datatype|language|graph|
+-----------+-----------+----------------+--------------+------------------------------------------------+--------+--------+-----+
|EC:2.8.3.23|EC:2.8.3.23|rdfs:subClassOf |EC:2.8.3      |NULL                                            |NULL    |NULL    |NULL |
|EC:2.8.3.23|EC:2.8.3.23|rdf:type        |owl:Class     |NULL                                            |NULL    |NULL    |NULL |
|EC:2.8.3.24|EC:2.8.3.24|rdfs:label      |NULL          |(R)-2-hydroxy-4-methylpentanoate CoA-transferase|NULL    |NULL    |NULL |
|EC:2.8.3.24|EC:2.8.3.24|rdfs:isDefinedBy|obo:eccode.owl|NULL                                            |NULL    |NULL    |NULL |
|EC:2.8.3.24|EC:2.8.3.24|oio:id          |NULL          |eccode:2.8.3.24           

                                                                                

+----------------------+---------------+----------------------+
|subject               |predicate      |object                |
+----------------------+---------------+----------------------+
|NCBITaxon:250646      |rdfs:subClassOf|NCBITaxon:250646      |
|NCBITaxon:2762532     |rdfs:subClassOf|NCBITaxon:2762532     |
|seed.compound:cpd25778|rdfs:subClassOf|seed.compound:cpd25778|
|NCBITaxon:912255      |rdfs:subClassOf|NCBITaxon:912255      |
|UniProtKB:Q62MS6      |rdfs:subClassOf|UniProtKB:Q62MS6      |
+----------------------+---------------+----------------------+


Sample data from prefix:
+------+-------------------------------------------+
|prefix|base                                       |
+------+-------------------------------------------+
|prefix|base                                       |
|rdf   |http://www.w3.org/1999/02/22-rdf-syntax-ns#|
|rdfs  |http://www.w3.org/2000/01/rdf-schema#      |
|xsd   |http://www.w3.org/2001/XMLSchema#          |
|owl   |http://www.w3.org/

### Verify MinIO structure

In [9]:
def verify_minio_structure():
    # ----------------------------
    # 1)  show all source files
    # ----------------------------
    print("Source files:")
    for obj in minio_client.list_objects(bucket,
                                         prefix="ontology-source/",
                                         recursive=True):        # <- True
        if obj.object_name.endswith((".tsv", ".parquet")):        # <- both
            print(f"  {os.path.basename(obj.object_name)}")

    # ----------------------------
    # 2)  show Delta-table folders
    # ----------------------------
    print("\nDelta tables (warehouse):")
    delta_prefix = "warehouse/ontology_data.db/"                  # <- new root
    current_table = None

    for obj in minio_client.list_objects(bucket,
                                         prefix=delta_prefix,
                                         recursive=True):
        parts = obj.object_name[len(delta_prefix):].split("/", 1) # strip prefix
        table = parts[0]

        if table != current_table:                                # new table dir
            current_table = table
            print(f"\n  {table}/")

        if len(parts) > 1:                                        # show leaf path
            print(f"    ├── {parts[1]}")

In [10]:
verify_minio_structure()

Source files:
  entailed_edge.parquet
  feature_annotation.parquet
  prefix.parquet
  statements.parquet
  term_association.parquet

Delta tables (warehouse):

  entailed_edge/
    ├── _delta_log/00000000000000000000.json
    ├── _delta_log/_commits/
    ├── part-00000-5ecf2959-1831-402b-9473-7961c899f7f0-c000.snappy.parquet
    ├── part-00001-767e8d37-a182-4cc4-a224-f41a50d2c812-c000.snappy.parquet
    ├── part-00002-ec7a1702-48ad-4a71-92f7-4124c8c835b0-c000.snappy.parquet

  feature_annotation/
    ├── _delta_log/00000000000000000000.json
    ├── _delta_log/_commits/
    ├── part-00000-061dc51d-3e96-499e-8395-2a8261f9e82b-c000.snappy.parquet

  prefix/
    ├── _delta_log/00000000000000000000.json
    ├── _delta_log/_commits/
    ├── part-00000-c4ecc9ad-31c7-4d02-a41b-64726a765343-c000.snappy.parquet

  statements/
    ├── _delta_log/00000000000000000000.json
    ├── _delta_log/_commits/
    ├── part-00000-71b8bd1b-59ef-4625-b71a-1413d0bb1cfe-c000.snappy.parquet
    ├── part-00001-885