In [None]:
Oracle AI Data Platform v1.0

Copyright © 2025, Oracle and/or its affiliates.

Licensed under the Universal Permissive License v 1.0 as shown at https://oss.oracle.com/licenses/upl/

# YAML Driven Data Ingestion

You have multiple data sources (CSV, JSON, other file formats, or even JDBC) that need to be ingested into Delta tables on AI Data Platform. Instead of hardcoding file paths, schemas, and other parameters in the PySpark script, you define them in a YAML configuration file. You can also ingest from other cloud storage platforms such as Amazon S3 and Azure ADLS, provided the corresponding connectors are installed in the cluster. This makes the pipeline flexible and easy to maintain.

This notebook demonstrates this approach using the PyYAML library (https://pyyaml.org/wiki/PyYAMLDocumentation). It covers:
 
 1. **Ingesting from cloud storage (including external volumes)**
 2. **Schema validation and data rules - simple demo**
 3. **Data preparation in Spark SQL as part of ingest**
 4. **Ingesting from a JDBC data source**

 **Prerequisites**

Before you begin, ensure you have:
 - The necessary IAM policies for accessing AI Data Platform (see the AI Data Platform documentation on permissions).
 - A configured AI Data Platform environment with a compute cluster, with the requirements file installed into the cluster libraries. It includes:
   - pyyaml
 - Config files updated with your file locations (your bucket and namespace, or external volume paths).

 **Key Benefits:**
 - Config-Driven – No need to modify the script for new data sources.
 - Scalable – Easily add more sources to the YAML file.
 - Flexible – Supports multiple file formats dynamically.
 - Delta Lake Benefits – ACID transactions, schema evolution, and time travel.


**Next Steps**

Now that you’ve explored this sample YAML-driven ingestion notebook, try it with your own data sources by changing the config files!

In [None]:
%sql
create catalog if not exists lake;
create schema if not exists lake.bronze;

In [1]:
import yaml
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType
from pyspark.sql.functions import col

# Note: `spark` is the SparkSession provided by the notebook environment.

# Load a YAML configuration file into Python dicts and lists
def load_config(config_path):
    with open(config_path, "r") as file:
        return yaml.safe_load(file)

# Build a Spark StructType from YAML entries of the form "name:type"
def get_spark_schema(schema_config):
    type_mapping = {
        "integer": IntegerType(),
        "string": StringType(),
        "double": DoubleType(),
    }
    fields = []
    for col_def in schema_config:
        if ":" not in col_def:
            raise ValueError(f"Schema entry must look like 'name:type', got: {col_def}")
        # Split on the first ":" only and trim surrounding whitespace
        name, dtype = (part.strip() for part in col_def.split(":", 1))
        if dtype not in type_mapping:
            raise ValueError(f"Unsupported data type: {dtype}")
        fields.append(StructField(name, type_mapping[dtype], True))
    return StructType(fields)

# Data quality checks: raise on the first rule that fails
def validate_data(df, quality_checks):
    # not_null: none of the listed columns may contain nulls
    for column in quality_checks.get("not_null", []):
        if df.filter(col(column).isNull()).count() > 0:
            raise ValueError(f"Null values found in required column: {column}")

    # min_value: no value may be below the given minimum (nulls are not compared)
    for column, min_val in quality_checks.get("min_value", {}).items():
        if df.filter(col(column) < min_val).count() > 0:
            raise ValueError(f"Values in column {column} are below minimum allowed: {min_val}")

# Register the input as the temp view "<name>_stg" and run the preprocessing SQL on it
def apply_sql_transformations(df, name, sql_query):
    df.createOrReplaceTempView(name + "_stg")
    return spark.sql(sql_query)

# Minimal alerting: print the message
def send_alert(message):
    print(f"ALERT: {message}")

# Ingest one source: read, validate schema, preprocess, check quality, write to Delta
def ingest_data(source):
    try:
        # Expected columns from the optional "schema" list (empty list if absent)
        expected_schema = get_spark_schema(source.get("schema", []))

        # Read the data; JDBC sources have no path because the location is in options
        reader = spark.read.format(source["format"]).options(**source.get("options", {}))
        path = source.get("path")
        df = reader.load(path) if path is not None else reader.load()

        # Validate schema: only column names are compared, not data types
        if len(expected_schema) > 0:
            df_columns = set(df.schema.names)
            expected_columns = set(expected_schema.names)
            if df_columns != expected_columns:
                raise ValueError(
                    f"Schema mismatch for {source['name']}. "
                    f"Expected: {sorted(expected_columns)}, Found: {sorted(df_columns)}"
                )

        # Apply preprocessing SQL if defined
        if "preprocessing_sql" in source:
            df = apply_sql_transformations(df, source["name"], source["preprocessing_sql"])

        # Perform data quality checks
        validate_data(df, source.get("quality_checks", {}))

        # Overwrite the target Delta table, partitioned if partition_columns is set
        writer = df.write.format("delta").mode("overwrite")
        partition_cols = source.get("partition_columns", [])
        if partition_cols:
            writer = writer.partitionBy(*partition_cols)
        writer.saveAsTable(source["target_table"])

    except Exception as e:
        # Report the failure and return, so the caller can move on to the next source
        send_alert(f"Error processing {source.get('name', '<unnamed>')}: {e}")
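
The column-name check in ingest_data can be tried without a cluster. The following is a plain-Python sketch, not part of the notebook: parse_schema_spec and column_name_diff are hypothetical helpers that mirror the "name:type" parsing in get_spark_schema and the name comparison in ingest_data, reporting missing and unexpected columns separately.

```python
# Hypothetical helpers: a plain-Python mirror of get_spark_schema's "name:type"
# parsing and of the column-name check in ingest_data (no Spark needed).
SUPPORTED_TYPES = {"integer", "string", "double"}  # same keys as type_mapping

def parse_schema_spec(schema_config):
    """Turn ["Col:type", ...] into [(name, type), ...], rejecting unknown types."""
    fields = []
    for col_def in schema_config:
        if ":" not in col_def:
            raise ValueError(f"Schema entry must look like 'name:type', got: {col_def}")
        # Split on the first ":" only, so the type is everything after it
        name, dtype = (part.strip() for part in col_def.split(":", 1))
        if dtype not in SUPPORTED_TYPES:
            raise ValueError(f"Unsupported data type: {dtype}")
        fields.append((name, dtype))
    return fields

def column_name_diff(expected_fields, found_columns):
    """Return (missing, unexpected) column names; both empty means the names match."""
    expected = {name for name, _ in expected_fields}
    found = set(found_columns)
    return sorted(expected - found), sorted(found - expected)

spec = parse_schema_spec(["TransactionID:integer", "Date:string", "TotalAmount:double"])
missing, unexpected = column_name_diff(spec, ["TransactionID", "Date", "Amount"])
print(missing, unexpected)  # ['TotalAmount'] ['Amount']
```

Splitting the difference into two lists can make a mismatch easier to diagnose than printing both full column sets.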

# Ingest CSV, JSON data easily

1. You can define the data sources to import in the YAML file.
2. A source can be any file format that Spark supports, with format-specific options (see CSV and JSON below).
3. The target delta table should be specified as catalog.schema.table.

```
data_sources:
  - name: sales_data
    path: "oci://your_bucket@your_namespace/sales.csv"
    format: "csv"
    options:
      header: "true"
      inferSchema: "true"
    target_table: "lake.bronze.sales"

  - name: user_activity
    path: "oci://your_bucket@your_namespace/user_activity.json"
    format: "json"
    options:
      multiline: "true"
    target_table: "lake.bronze.user_activity"
```
4. If you have configured connectors for other object stores such as Amazon S3 or Azure ADLS, change the path scheme accordingly, for example to s3a:// or abfss://.
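
Because each entry of data_sources is just a dictionary once yaml.safe_load has parsed it, a config can be sanity-checked before any Spark job starts. The sketch below is hypothetical and not part of the notebook: check_source and its rules (required name, format, and target_table keys, a three-part catalog.schema.table name, and a path for every non-JDBC source) are assumptions based on how ingest_data reads the config.

```python
# Hypothetical pre-flight check for one data_sources entry (a plain dict, as
# produced by yaml.safe_load). These rules are assumptions, not notebook behavior.
REQUIRED_KEYS = ("name", "format", "target_table")

def check_source(source):
    """Return a list of problems found in one source entry; empty means it looks OK."""
    problems = [f"missing key: {key}" for key in REQUIRED_KEYS if key not in source]

    # Target tables are expected as catalog.schema.table
    target = source.get("target_table", "")
    if target and len(target.split(".")) != 3:
        problems.append(f"target_table should be catalog.schema.table, got: {target}")

    # File sources need a path; JDBC sources take their location from options
    if source.get("format") != "jdbc" and "path" not in source:
        problems.append("missing key: path")
    return problems

# Example entries, written as the dicts yaml.safe_load would return
sources = [
    {"name": "sales_data", "path": "<path to sales.csv>", "format": "csv",
     "target_table": "lake.bronze.sales"},
    {"name": "user_activity", "format": "json", "target_table": "bronze.user_activity"},
]
for source in sources:
    print(source["name"], check_source(source))
# sales_data []
# user_activity ['target_table should be catalog.schema.table, got: bronze.user_activity', 'missing key: path']
```

Running a check like this over config["data_sources"] before the ingest loop would surface YAML typos up front, rather than as alerts raised from inside ingest_data.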

In [1]:
config = load_config("/Workspace/config/config_file_data.yaml")

for source in config["data_sources"]:
    ingest_data(source)

In [1]:
spark.sql("select * from lake.bronze.sales").show()

# Schema enforcement, validation and rules

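1. You can define a schema listing the names and data types of the columns to be validated. Below, TransactionID is declared as an integer. Note that ingest_data compares column names only; the declared data types are not checked against the input.
2. Data quality can be checked with rules such as not_null (the listed columns must not contain nulls) and min_value (values in a column must not be below the given minimum).
3. The data can be written to the target table partitioned by the columns listed in partition_columns.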

```
data_sources:
  - name: sales_data
    path: "oci://your_bucket@your_namespace/sales.csv"
    format: "csv"
    options:
      header: "true"
      inferSchema: "true"
    target_table: "lake.bronze.salesv2"
    schema:
      - "TransactionID:integer"
      - "Date:string"
      - "CustomerID:string"
      - "ProductCategory:string"
      - "Quantity:integer"
      - "PricePerUnit:double"
      - "TotalAmount:double"
    partition_columns: ["Date"]
    quality_checks:
      not_null: ["TransactionID", "CustomerID", "TotalAmount"]
      min_value: {"TotalAmount": 0.01}
```
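
The two rules in quality_checks can be reasoned about without Spark. The following plain-Python sketch uses a hypothetical helper, quality_violations, and made-up sample rows to apply not_null and min_value to a list of row dictionaries. Unlike validate_data, which raises on the first failing rule, it collects every violation; like the Spark filter col(column) < min_val, it does not count nulls as below the minimum.

```python
# Hypothetical helper: apply the not_null and min_value rules to a list of row
# dicts and collect every violation instead of raising on the first one.
def quality_violations(rows, quality_checks):
    violations = []
    for column in quality_checks.get("not_null", []):
        nulls = sum(1 for row in rows if row.get(column) is None)
        if nulls:
            violations.append(f"{column}: {nulls} null value(s)")
    for column, min_val in quality_checks.get("min_value", {}).items():
        # As with Spark's col(column) < min_val, a null never counts as below the minimum
        low = sum(1 for row in rows
                  if row.get(column) is not None and row[column] < min_val)
        if low:
            violations.append(f"{column}: {low} value(s) below {min_val}")
    return violations

# Same rules as the YAML above; the rows are made-up sample data
checks = {"not_null": ["TransactionID", "CustomerID", "TotalAmount"],
          "min_value": {"TotalAmount": 0.01}}
rows = [
    {"TransactionID": 1, "CustomerID": "CUST001", "TotalAmount": 150.0},
    {"TransactionID": 2, "CustomerID": None, "TotalAmount": 0.0},
]
print(quality_violations(rows, checks))
# ['CustomerID: 1 null value(s)', 'TotalAmount: 1 value(s) below 0.01']
```

In validate_data, each rule calls count(), so each rule is a separate Spark job over the data; with many rules, computing all the counts in a single aggregation may be cheaper.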

In [1]:
config = load_config("/Workspace/config/config_validation.yaml")

for source in config["data_sources"]:
    ingest_data(source)

In [1]:
spark.sql("select * from lake.bronze.salesv2").show()

# Data Preparation in SQL

You can prepare data in Spark SQL as part of the ingest. The incoming data is registered as a temporary view named {data_source_name}_stg, which the preprocessing_sql query can select from.

In the example here, the query reads from the sales_slice_data_stg temporary view and keeps only the CustomerID column plus a NewQuantity column computed as Quantity multiplied by 10.
```
data_sources:
  - name: sales_slice_data
    path: "oci://your_bucket@your_namespace/sales.csv"
    format: "csv"
    options:
      header: "true"
      inferSchema: "true"
    preprocessing_sql: |
      SELECT 
          CustomerID, 
          Quantity*10 NewQuantity
      FROM sales_slice_data_stg
    target_table: "lake.bronze.sales_slicev3"
```
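
The staging-view naming contract can be illustrated outside Spark. In this sketch, Python's built-in sqlite3 module stands in for Spark SQL (an assumption for illustration only): made-up rows are loaded into a table named with the same {data_source_name}_stg rule as apply_sql_transformations, and the preprocessing_sql text from the YAML above is run against it unchanged. SQLite and Spark SQL differ in many other respects, so this only shows how the config name and the query's FROM clause have to line up.

```python
import sqlite3

# sqlite3 stands in for Spark SQL here; this is an illustration, not the notebook's engine
source = {
    "name": "sales_slice_data",
    "preprocessing_sql": """
        SELECT
            CustomerID,
            Quantity*10 NewQuantity
        FROM sales_slice_data_stg
    """,
}
rows = [("CUST001", 3), ("CUST002", 2)]  # made-up (CustomerID, Quantity) sample rows

conn = sqlite3.connect(":memory:")
# Same naming rule as apply_sql_transformations: "<name>_stg"
staging = source["name"] + "_stg"
conn.execute(f"CREATE TABLE {staging} (CustomerID TEXT, Quantity INTEGER)")
conn.executemany(f"INSERT INTO {staging} VALUES (?, ?)", rows)

# Run the YAML query unchanged against the staging table
cursor = conn.execute(source["preprocessing_sql"])
print([d[0] for d in cursor.description])  # ['CustomerID', 'NewQuantity']
print(cursor.fetchall())
```

If the source were renamed in the YAML without updating the FROM clause, the query would fail because no table with the new staging name exists; the same holds for the temporary view in Spark.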

In [1]:
config = load_config("/Workspace/config/config_sql_prep.yaml")

for source in config["data_sources"]:
    ingest_data(source)

In [1]:
spark.sql("select * from lake.bronze.sales_slicev3").show()

# Ingest data from JDBC easily

1. You can define the data sources to import in the YAML file, including files (as above) or JDBC sources.
2. The source can be any JDBC database whose driver (named in the driver option) is installed on the cluster, and dbtable can be either a table name or a parenthesized query.
3. The target delta table should be specified as catalog.schema.table.

```
data_sources:
  - name: user_data
    format: "jdbc"
    options:
      driver: "org.sqlite.JDBC"
      user: "sa"
      password: ""
      fetchsize: "100"
      dbtable: "(SELECT 1 c1, 2 c2)"
      url: "jdbc:sqlite:memory:myDb"
    target_table: "lake.bronze.user_data"
```
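
Per the Spark JDBC data source documentation, anything that is valid in a FROM clause can be used as dbtable when reading, which is why a parenthesized query works in the config above. The sketch below uses Python's built-in sqlite3 module (not Spark and not the SQLite JDBC driver) only to show that this dbtable value behaves like a table when placed after FROM.

```python
import sqlite3

# dbtable value copied from the YAML above
dbtable = "(SELECT 1 c1, 2 c2)"

# Illustration only: place the dbtable value after FROM, as a table name would be
conn = sqlite3.connect(":memory:")
cursor = conn.execute(f"SELECT * FROM {dbtable}")
print([d[0] for d in cursor.description], cursor.fetchall())
# ['c1', 'c2'] [(1, 2)]
```

Some databases, MySQL for example, require an alias on such a subquery, e.g. (SELECT 1 c1, 2 c2) t. Spark also offers a separate query option, which it parenthesizes and aliases itself; dbtable and query cannot be set at the same time.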

In [1]:
config = load_config("/Workspace/config/config_jdbc.yaml")

for source in config["data_sources"]:
    ingest_data(source)

In [1]:

spark.sql("select * from lake.bronze.user_data").show()
