In [None]:
import json
from datetime import datetime, timezone

from pyspark.sql import SparkSession

# Spark session with the Delta Lake package, SQL extension and catalog
# configured so that format("delta") reads and writes work in this notebook.
spark = (
    SparkSession.builder
    .appName("GenomicDQ")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config(
        "spark.sql.catalog.spark_catalog",
        "org.apache.spark.sql.delta.catalog.DeltaCatalog",
    )
    .getOrCreate()
)

# Source data: the Parquet export.
df = spark.read.parquet("output/genomic_data.parquet")

# Materialize it as a Delta table; "overwrite" replaces any previous contents,
# so re-running this cell does not duplicate rows.
df.write.format("delta").mode("overwrite").save("/delta/genomic_data")


In [None]:
# Read the Delta table back to confirm the write and inspect its schema.
df_delta = spark.read.load("/delta/genomic_data", format="delta")
df_delta.printSchema()


In [None]:
# Preview the first 20 rows without truncating long values (e.g. sequences).
df_delta.show(n=20, truncate=False)


In [None]:
# Expose the Delta-backed frame to Spark SQL, then list genomes whose
# start_position is greater than 500.
df_delta.createOrReplaceTempView("genomic_data")
spark.sql(
    "SELECT genome_id, start_position "
    "FROM genomic_data "
    "WHERE start_position > 500"
).show()


In [None]:
import great_expectations as gx
import pandas as pd

context = gx.get_context()

# Validate the Delta table that was just written and read back (the data the
# validation log below reports on), not the source Parquet frame `df`.
# NOTE: toPandas() collects every row onto the driver; fine for a small
# dataset, but a large table would need a Spark-backed datasource instead.
pdf = df_delta.toPandas()

# Each expect_* call on `batch` evaluates an expectation; validate() then
# produces the combined result (in the GX fluent API `read_dataframe` returns
# a Validator, despite the variable name).
batch = context.sources.pandas_default.read_dataframe(pdf)
batch.expect_column_values_to_be_unique("genome_id")
batch.expect_column_values_to_not_be_null("sequence")
results = batch.validate()


In [None]:
# Display the GX validation result returned by batch.validate().
results

In [None]:
from datetime import datetime
from pyspark.sql.types import StructType, StructField, StringType, BooleanType
import uuid

# Run metadata: every log row written for this validation run shares these.
run_id = str(uuid.uuid4())
# Naive UTC ISO-8601 string, the same format datetime.utcnow().isoformat()
# produced; utcnow() is deprecated since Python 3.12.
run_time = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

# Schema of the Delta validation log: one row per expectation per run.
schema = StructType([
    StructField("run_id", StringType(), False),
    StructField("run_time", StringType(), False),
    StructField("expectation", StringType(), True),
    StructField("column", StringType(), True),
    StructField("success", BooleanType(), True),
    StructField("observed_value", StringType(), True),  # observed value serialized to a string
])


In [None]:
# Flatten the GX validation result into one log row per expectation.
rows = []
for r in results["results"]:
    result_detail = r["result"] or {}
    observed = result_detail.get("observed_value")
    if observed is None and "unexpected_count" in result_detail:
        # Column-map expectations (e.g. unique / not-null) typically report
        # unexpected counts instead of an observed_value; log those rather
        # than a null so a failure is diagnosable from the log alone.
        observed = {
            key: result_detail[key]
            for key in ("unexpected_count", "unexpected_percent")
            if key in result_detail
        }
    rows.append({
        "run_id": run_id,
        "run_time": run_time,
        "expectation": r["expectation_config"]["expectation_type"],
        "column": r["expectation_config"]["kwargs"].get("column"),
        "success": r["success"],
        # JSON (not a Python repr) so the string can be parsed back later;
        # default=str covers values json cannot encode natively.
        "observed_value": json.dumps(observed, default=str) if observed is not None else None,
    })


In [None]:
# Inspect the log rows built above before they are written to Delta.
rows

In [None]:
# Append this run's expectation outcomes to the Delta validation log.
log_df = spark.createDataFrame(data=rows, schema=schema)
(
    log_df.write
    .format("delta")
    .mode("append")
    .save("/delta/validation_log")
)


In [None]:
# Load the validation log and list every logged expectation that failed.
df_log = spark.read.load("/delta/validation_log", format="delta")
df_log.where("success = false").show(truncate=False)


In [None]:
# Confirm the validation log's schema as read back from Delta.
df_log.printSchema()


In [None]:
# Per-run view of the log: expectation, target column and outcome.
df_log.select(
    "run_id",
    "expectation",
    "column",
    "success",
    "observed_value",
).show(truncate=False)


In [None]:
# Add a lower-bound check on start_position and validate again.
# NOTE(review): the 9,999,999 floor looks deliberately strict, presumably to
# produce a failing row for the log demo; confirm before reusing elsewhere.
batch.expect_column_values_to_be_between(column="start_position", min_value=9_999_999)
results = batch.validate()


In [None]:
# Log the latest validation run (now including the start_position range
# check) under a fresh run_id.
run_id = str(uuid.uuid4())
# Naive UTC ISO-8601 string, the same format datetime.utcnow().isoformat()
# produced; utcnow() is deprecated since Python 3.12.
run_time = datetime.now(timezone.utc).replace(tzinfo=None).isoformat()

rows = []
for r in results["results"]:
    result_detail = r["result"] or {}
    observed = result_detail.get("observed_value")
    if observed is None and "unexpected_count" in result_detail:
        # Column-map expectations (e.g. unique / not-null / between) typically
        # report unexpected counts instead of an observed_value; log those
        # rather than a null so a failure is diagnosable from the log alone.
        observed = {
            key: result_detail[key]
            for key in ("unexpected_count", "unexpected_percent")
            if key in result_detail
        }
    rows.append({
        "run_id": run_id,
        "run_time": run_time,
        "expectation": r["expectation_config"]["expectation_type"],
        "column": r["expectation_config"]["kwargs"].get("column"),
        "success": r["success"],
        # JSON (not a Python repr) so the string can be parsed back later;
        # default=str covers values json cannot encode natively.
        "observed_value": json.dumps(observed, default=str) if observed is not None else None,
    })

log_df = spark.createDataFrame(rows, schema=schema)
log_df.write.format("delta").mode("append").save("/delta/validation_log")


In [None]:
# Read the log fresh from storage (so the latest run is included) and list
# every failed expectation.
(
    spark.read.load("/delta/validation_log", format="delta")
    .where("success = false")
    .show(truncate=False)
)


In [None]:
# Re-read the log: the `df_log` created earlier predates the latest append,
# so reusing it is hidden state and may not show the newest run.
df_log = spark.read.format("delta").load("/delta/validation_log")
(
    df_log
    # Newest run first so show()'s default 20-row limit includes it
    # (run_time is an ISO string, so lexicographic order is chronological).
    .orderBy(df_log["run_time"].desc(), "expectation")
    .select("run_id", "expectation", "column", "success", "observed_value")
    .show(truncate=False)
)