In [None]:
# =======================================
# Setup: Imports & Configuration
# =======================================
import os
import json
import requests
from datetime import datetime
from pathlib import Path

# Spark libraries
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, ArrayType

# Initialize Spark session.
# getOrCreate() hands back the session that is already active when there is
# one, and only builds a new session named "ADME Ingestion" otherwise.
# NOTE(review): no other cell visible in this notebook references `spark`.
spark = SparkSession.builder.appName("ADME Ingestion").getOrCreate()

# ========== USER CONFIGURATION ==========
# Identity and instance settings. Each value is taken from the named
# environment variable when it is set and otherwise falls back to the
# placeholder literal, so real credentials (CLIENT_SECRET in particular) do not
# have to be saved inside the notebook.
TENANT_ID = os.environ.get("ADME_TENANT_ID", "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx")
CLIENT_ID = os.environ.get("ADME_CLIENT_ID", "xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx")
CLIENT_SECRET = os.environ.get("ADME_CLIENT_SECRET", "xxxxxxxxxx")
RESOURCE = os.environ.get("ADME_RESOURCE", "https://<your_adme_instance>.energy.azure.com")
PARTITION_ID = os.environ.get("ADME_PARTITION_ID", "xxxxx")
# Strip a trailing slash so a RESOURCE such as "https://host/" does not
# produce "https://host//api".
BASE_URL = f"{RESOURCE.rstrip('/')}/api"
SCOPE = f"{CLIENT_ID}/.default"

LEGAL_TAG = os.environ.get("ADME_LEGAL_TAG", "xxxxx")

# Input files: everything lives under DATA_ROOT.
DATA_ROOT = "/lakehouse/default/Files/nz-data-pack-small"
REFERENCE_DATA_FOLDER = f"{DATA_ROOT}/reference-data"
MASTER_DATA_WELL = f"{DATA_ROOT}/master-data/Well.1.3.0.json"
MASTER_DATA_WELLBORE = f"{DATA_ROOT}/master-data/Wellbore.1.5.0.json"
METADATA_WELLLOG = f"{DATA_ROOT}/work-product-components/WellLog.1.4.0.json"
METADATA_MARKERSET = f"{DATA_ROOT}/work-product-components/WellboreMarkerSet.1.4.0.json"
METADATA_TRAJECTORY = f"{DATA_ROOT}/work-product-components/WellboreTrajectory.1.3.0.json"
PARQUET_LOGS_FOLDER = f"{DATA_ROOT}/work-product-components/logs"
PARQUET_TRAJECTORIES_FOLDER = f"{DATA_ROOT}/work-product-components/trajectories"

# Construct endpoint URLs
STORAGE_URL = f"{BASE_URL}/storage/v2/records"
WELLLOG_DMS_URL = f"{BASE_URL}/os-wellbore-ddms/ddms/v3/welllogs"
WELL_DMS_URL = f"{BASE_URL}/os-wellbore-ddms/ddms/v3/wells"
WELLBORE_DMS_URL = f"{BASE_URL}/os-wellbore-ddms/ddms/v3/wellbores"
TRAJECTORY_DMS_URL = f"{BASE_URL}/os-wellbore-ddms/ddms/v3/wellboretrajectories"
MARKERSET_DMS_URL = f"{BASE_URL}/os-wellbore-ddms/ddms/v3/wellboremarkersets"

# Placeholder only: the Authentication cell rebinds `headers` to the
# authenticated request headers that the ingestion functions send.
headers = {}


StatementMeta(, 9ff45185-ec9c-4107-a1a2-7a048ed9bf7d, 15, Finished, Available, Finished)

In [14]:
# =======================================
# Authentication
# =======================================
def get_adme_token(tenant_id, client_id, client_secret, scope, timeout=30):
    """Request an OAuth2 access token with the client-credentials grant.

    Sends a form-encoded POST to the v2.0 token endpoint of the given tenant
    on login.microsoftonline.com.

    Args:
        tenant_id: Tenant ID used to build the token endpoint URL.
        client_id: Value sent as the ``client_id`` form field.
        client_secret: Value sent as the ``client_secret`` form field.
        scope: Value sent as the ``scope`` form field.
        timeout: Seconds to wait for the token endpoint before giving up.
            Without it a stalled connection would block the cell forever.

    Returns:
        The ``access_token`` entry of the JSON response.

    Raises:
        Exception: If the request fails, the endpoint answers with an error
            status, or the response has no ``access_token``. The underlying
            error is attached as ``__cause__``.
    """
    url = f"https://login.microsoftonline.com/{tenant_id}/oauth2/v2.0/token"
    payload = {
        "grant_type": "client_credentials",
        "client_id": client_id,
        "client_secret": client_secret,
        "scope": scope
    }
    # Named differently from the module-level `headers` so the two are not
    # confused: these are only for the token request.
    form_headers = {
        "Content-Type": "application/x-www-form-urlencoded"
    }
    try:
        response = requests.post(url, data=payload, headers=form_headers, timeout=timeout)
        response.raise_for_status()
        return response.json()["access_token"]
    except Exception as e:
        # Chain explicitly so the original traceback stays attached.
        raise Exception(f"Failed to get token: {str(e)}") from e

# Fetch one token up front, then build the headers that every ADME request in
# the cells below sends.
access_token = get_adme_token(
    tenant_id=TENANT_ID,
    client_id=CLIENT_ID,
    client_secret=CLIENT_SECRET,
    scope=SCOPE,
)
headers = {
    "Authorization": f"Bearer {access_token}",
    "data-partition-id": PARTITION_ID,
    "Content-Type": "application/json",
}
# The token is a bearer credential: do not print it into the cell output.

StatementMeta(, 9ff45185-ec9c-4107-a1a2-7a048ed9bf7d, 16, Finished, Available, Finished)

In [15]:
# =======================================
# Ingest Reference Data by file
# =======================================
def ingest_reference_data_file(file_path, timeout=60):
    """Ingest the records of one reference-data JSON file, one request each.

    The file is expected to hold a JSON array of records. Each record is sent
    to STORAGE_URL on its own with the module-level `headers`.

    Args:
        file_path: Path of the JSON file to read.
        timeout: Seconds to wait for each request before giving up.

    Returns:
        None. Progress and failures are reported with print().
    """
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            records = json.load(f)
    except Exception as e:
        print(f"Reference Data Error: {e}")
        return

    failures = 0
    for record in records:
        try:
            # The storage records endpoint is written with PUT and a JSON
            # array body (as ingest_reference_data_folder does), so a single
            # record is wrapped in a one-element list.
            response = requests.put(STORAGE_URL, headers=headers, json=[record], timeout=timeout)
        except Exception as e:
            # One failed request must not stop the remaining records.
            failures += 1
            print(f"Reference Data Error: {e}")
            continue
        if response.status_code not in [200, 201, 409]:
            failures += 1
            print(f"Failed to ingest reference record: {response.text}")

    # Only claim plain success when every record was accepted.
    if failures:
        print(f"Ingested reference data from {file_path} with {failures} failed record(s)")
    else:
        print(f"Ingested reference data from {file_path}")

StatementMeta(, 9ff45185-ec9c-4107-a1a2-7a048ed9bf7d, 17, Finished, Available, Finished)

In [16]:
# =======================================
# Ingest Reference Data
# =======================================
def ingest_reference_data_folder(directory_path, timeout=60):
    """Ingest every ``.json`` file found directly inside a folder.

    Each file is loaded as JSON and its whole content is sent in a single PUT
    to STORAGE_URL with the module-level `headers`. Sub-folders and files
    without a ``.json`` suffix are ignored.

    Args:
        directory_path: Folder to scan (not recursive).
        timeout: Seconds to wait for each request before giving up.

    Returns:
        None. One line is printed per file: success, rejection, or error.
    """
    try:
        # Sorted so repeated runs process files in the same order.
        filenames = sorted(os.listdir(directory_path))
    except Exception as e:
        print(f"Reference Data Error: {e}")
        return

    for filename in filenames:
        file_path = os.path.join(directory_path, filename)
        if not (filename.endswith('.json') and os.path.isfile(file_path)):
            continue
        try:
            with open(file_path, "r", encoding="utf-8") as f:
                records = json.load(f)
            # Send the entire list of records at once
            response = requests.put(STORAGE_URL, headers=headers, data=json.dumps(records), timeout=timeout)
        except Exception as e:
            # A malformed or unreadable file must not stop the remaining files.
            print(f"Reference Data Error ({file_path}): {e}")
            continue
        if response.status_code not in [200, 201, 409]:
            print(f"Failed to ingest reference data from {file_path}: {response.text}")
        else:
            print(f"Ingested reference data from {file_path}")


StatementMeta(, 9ff45185-ec9c-4107-a1a2-7a048ed9bf7d, 18, Finished, Available, Finished)

In [17]:
# =======================================
# Ingest Master Data: Well & Wellbore
# =======================================
def ingest_master_data(file_path, url, label):
    """Load one master-data JSON file and POST its content to `url`.

    The parsed file content is sent as the JSON body together with the
    module-level `headers`. Any failure (reading, parsing, request, error
    status) is caught and reported with print(); nothing is raised or returned.

    Args:
        file_path: Path of the JSON file to send.
        url: Endpoint that receives the POST.
        label: Name used in the printed messages (e.g. "Well").
    """
    try:
        with open(file_path, "r") as source:
            payload = json.load(source)
        # raise_for_status() turns an error status into an exception so it is
        # reported by the handler below.
        requests.post(url, headers=headers, json=payload).raise_for_status()
        print(f"Ingested {label} data successfully.")
    except Exception as problem:
        print(f"Master Data Error ({label}): {problem}")


StatementMeta(, 9ff45185-ec9c-4107-a1a2-7a048ed9bf7d, 19, Finished, Available, Finished)

In [18]:
# =======================================
# Ingest Meta Data
# =======================================
def ingest_metadata(file_path, url, label, timeout=60):
    """POST every record of a metadata JSON file to a DDMS endpoint.

    The file is expected to hold a JSON array. Each record is sent on its own,
    wrapped in a one-element list, with the module-level `headers`.

    Args:
        file_path: Path of the JSON file holding the records.
        url: Endpoint that receives each POST.
        label: Name used in the printed messages (e.g. "Well Log").
        timeout: Seconds to wait for each request before giving up.

    Returns:
        None. Progress and failures are reported with print().
    """
    try:
        with open(file_path, "r", encoding="utf-8") as f:
            records = json.load(f)  # Load the full JSON array
    except Exception as e:
        print(f"{label} Ingestion Error: {e}")
        return

    for record in records:
        try:
            # Wrap the record in a list to match the expected payload
            response = requests.post(url, headers=headers, json=[record], timeout=timeout)
            if response.status_code in [200, 201, 409]:
                # Statuses in the accepted list (409 in particular) may come
                # back without "recordIds"; that must not raise and abort the
                # rest of the file.
                try:
                    record_id = response.json()["recordIds"][0]
                except (ValueError, KeyError, IndexError, TypeError):
                    print(
                        f"{label} record returned status {response.status_code} "
                        f"without a record ID: {response.text}"
                    )
                else:
                    print(f"Ingested {label} record with ID: {record_id}")
            else:
                print(f"Failed to ingest {label} record: {response.text}")
        except Exception as e:
            # Keep going: one failing record should not stop the others.
            print(f"{label} Ingestion Error: {e}")

    print(f"Finished ingesting {label} data from {file_path}")

StatementMeta(, 9ff45185-ec9c-4107-a1a2-7a048ed9bf7d, 20, Finished, Available, Finished)

In [19]:
# =======================================
# Helper functions to get and delete records
# =======================================

def get_record(url, record_id, timeout=60):
    """Fetch a single record with GET ``{url}/{record_id}``.

    Args:
        url: Collection endpoint the record ID is appended to.
        record_id: ID of the record to fetch.
        timeout: Seconds to wait for the request before giving up.

    Returns:
        The response object when the request succeeded with a non-error
        status; None otherwise (the error is printed, not raised).
    """
    record_url = f"{url}/{record_id}"
    try:
        # Send the module-level `headers`: they carry the bearer token and the
        # data-partition-id. A local Content-Type-only dict would send the
        # request without any authorization.
        response = requests.get(record_url, headers=headers, timeout=timeout)
        response.raise_for_status()
        return response
    except requests.exceptions.HTTPError as http_err:
        print(f"HTTP error occurred while getting record {record_id}: {http_err}")
    except Exception as err:
        print(f"Unexpected error occurred while getting record {record_id}: {err}")
    return None

def delete_record(url, record_id, timeout=60):
    """Delete a single record with DELETE ``{url}/{record_id}``.

    Args:
        url: Collection endpoint the record ID is appended to.
        record_id: ID of the record to delete.
        timeout: Seconds to wait for the request before giving up.

    Returns:
        The response object when the request succeeded with a non-error
        status; None otherwise (the error is printed, not raised).
    """
    record_url = f"{url}/{record_id}"
    try:
        # Send the module-level `headers`: they carry the bearer token and the
        # data-partition-id. A local Content-Type-only dict would send the
        # request without any authorization.
        response = requests.delete(record_url, headers=headers, timeout=timeout)
        response.raise_for_status()
        return response
    except requests.exceptions.HTTPError as http_err:
        print(f"HTTP error occurred while deleting record {record_id}: {http_err}")
    except Exception as err:
        print(f"Unexpected error occurred while deleting record {record_id}: {err}")
    return None



StatementMeta(, 9ff45185-ec9c-4107-a1a2-7a048ed9bf7d, 21, Finished, Available, Finished)

In [20]:
# =======================================
# Ingest metadata, handling a bulkURI that already exists [provided for convenience, not used in this example]
# OSDU adds 'bulkURI' the first time you ingest data. When re-ingesting,
# make sure to remove the record before adding it again.
# =======================================
def ingest_metadata_handle_bulk_uri(file_path, url, label, timeout=60):
    """Ingest metadata records, deleting an existing record that has a bulkURI.

    For every record in the JSON array at `file_path`:
      1. ``data.ExtensionProperties`` is removed from the record to be sent.
      2. The existing record is looked up with get_record().
      3. If it exists and carries
         ``data.ExtensionProperties.wdms.bulkURI``, it is deleted first.
      4. The record is POSTed to `url`, wrapped in a one-element list.

    Args:
        file_path: Path of the JSON file holding the records.
        url: Endpoint used for the lookup, the delete and the POST.
        label: Name used in the printed messages.
        timeout: Seconds to wait for each POST before giving up.

    Returns:
        List of the record IDs reported by the service. The list holds
        whatever was collected so far when a record fails, and is empty when
        the file cannot be read.
    """
    key_to_remove = "ExtensionProperties"
    record_ids = []

    try:
        with open(file_path, "r", encoding="utf-8") as f:
            records = json.load(f)
    except Exception as e:
        print(f"{label} Ingestion Error: {e}")
        return record_ids

    for record in records:
        try:
            # Remove ExtensionProperties if it exists
            if key_to_remove in record.get("data", {}):
                record["data"].pop(key_to_remove)

            record_id = record["id"]
            existing = get_record(url, record_id)

            # get_record() returns None on any failure (e.g. the record does
            # not exist yet), so guard before touching status_code.
            if existing is not None and existing.status_code == 200:
                try:
                    # Check if bulkURI exists in the existing record
                    bulk_uri = existing.json()["data"]["ExtensionProperties"]["wdms"]["bulkURI"]
                except (KeyError, TypeError):
                    print("No bulkURI found, updating record without delete.")
                else:
                    print(f"Existing record has bulkURI: {bulk_uri}")
                    print("Deleting and recreating the record...")
                    delete_record(url, record_id)

            # Every path ends with the same POST, so it is issued once here.
            response = requests.post(url, headers=headers, json=[record], timeout=timeout)

            if response.status_code in [200, 201, 409]:
                # Statuses in the accepted list (409 in particular) may come
                # back without "recordIds"; that must not raise and abort the
                # rest of the file.
                try:
                    new_record_id = response.json()["recordIds"][0]
                except (ValueError, KeyError, IndexError, TypeError):
                    print(
                        f"{label} record returned status {response.status_code} "
                        f"without a record ID: {response.text}"
                    )
                else:
                    print(f"Ingested {label} record with ID: {new_record_id}")
                    record_ids.append(new_record_id)
            else:
                print(f"Failed to ingest {label} record: {response.text}")
        except Exception as e:
            # Keep going: one failing record should not stop the others.
            print(f"{label} Ingestion Error: {e}")

    print(f"Finished ingesting {label} data from {file_path}")
    return record_ids



StatementMeta(, 9ff45185-ec9c-4107-a1a2-7a048ed9bf7d, 22, Finished, Available, Finished)

In [21]:

# =======================================
# Ingest Parquet Time Series for Well Logs
# =======================================
import glob

def ingest_welllog_parquet_data(welllog_json_path, parquet_folder, timeout=300):
    """Upload the parquet file belonging to each well-log metadata record.

    For every record in the JSON array, the text after the last ":" of
    ``data.Id`` is used both as the parquet file name
    (``<parquet_folder>/<id>.parquet``) and in the upload URL
    (``<WELLLOG_DMS_URL>/<id>/data``). The file bytes are POSTed with the
    module-level `headers`, with Content-Type switched to
    ``application/octet-stream``.

    Args:
        welllog_json_path: Path of the well-log metadata JSON file.
        parquet_folder: Folder holding the ``<id>.parquet`` files.
        timeout: Seconds to wait for each upload request before giving up.

    Returns:
        None. Progress, skips and failures are reported with print().
    """
    try:
        with open(welllog_json_path, "r", encoding="utf-8") as f:
            records = json.load(f)
    except Exception as e:
        print(f"Parquet Ingestion Error: {e}")
        return

    # Same headers for every upload, so build them once. Copying keeps the
    # shared module-level `headers` untouched.
    headers_bin = headers.copy()
    headers_bin["Content-Type"] = "application/octet-stream"

    for record in records:
        try:
            log_id_full = record.get("data", {}).get("Id")
            # isinstance() guards against a non-string Id, which would make
            # the ":" membership test raise.
            if not isinstance(log_id_full, str) or ":" not in log_id_full:
                print("Skipping record with missing or malformed ID.")
                continue

            log_id = log_id_full.split(":")[-1]
            parquet_path = os.path.join(parquet_folder, f"{log_id}.parquet")

            if not os.path.exists(parquet_path):
                print(f"Parquet file not found for log ID: {log_id}")
                continue

            with open(parquet_path, "rb") as parquet_file:
                file_bytes = parquet_file.read()

            upload_url = f"{WELLLOG_DMS_URL}/{log_id}/data"
            response = requests.post(upload_url, headers=headers_bin, data=file_bytes, timeout=timeout)
            if response.status_code in [200, 201]:
                print(f"Successfully uploaded parquet for log ID: {log_id}")
            else:
                print(f"Failed to upload parquet for {log_id}: {response.status_code} - {response.text}")
        except Exception as e:
            # Keep going: one failing record should not stop the other uploads.
            print(f"Parquet Ingestion Error: {e}")

StatementMeta(, 9ff45185-ec9c-4107-a1a2-7a048ed9bf7d, 23, Finished, Available, Finished)

In [22]:

# =======================================
# Ingest Parquet Time Series for Trajectories
# =======================================
def ingest_trajectory_parquet_data(trajectory_json_path, parquet_folder, timeout=300):
    """Upload the parquet file belonging to each trajectory metadata record.

    For every record in the JSON array, the text after the last ":" of
    ``data.Id`` is used both as the parquet file name
    (``<parquet_folder>/<id>.parquet``) and in the upload URL
    (``<TRAJECTORY_DMS_URL>/<id>/data``). The file bytes are POSTed with the
    module-level `headers`, with Content-Type switched to
    ``application/octet-stream``.

    Args:
        trajectory_json_path: Path of the trajectory metadata JSON file.
        parquet_folder: Folder holding the ``<id>.parquet`` files.
        timeout: Seconds to wait for each upload request before giving up.

    Returns:
        None. Progress, skips and failures are reported with print().
    """
    try:
        with open(trajectory_json_path, "r", encoding="utf-8") as f:
            records = json.load(f)
    except Exception as e:
        print(f"Trajectory Parquet Ingestion Error: {e}")
        return

    # Same headers for every upload, so build them once. Copying keeps the
    # shared module-level `headers` untouched.
    headers_bin = headers.copy()
    headers_bin["Content-Type"] = "application/octet-stream"

    for record in records:
        try:
            traj_id_full = record.get("data", {}).get("Id")
            # isinstance() guards against a non-string Id, which would make
            # the ":" membership test raise.
            if not isinstance(traj_id_full, str) or ":" not in traj_id_full:
                print("Skipping record with missing or malformed ID.")
                continue

            traj_id = traj_id_full.split(":")[-1]
            parquet_path = os.path.join(parquet_folder, f"{traj_id}.parquet")

            if not os.path.exists(parquet_path):
                print(f"Parquet file not found for trajectory ID: {traj_id}")
                continue

            with open(parquet_path, "rb") as parquet_file:
                file_bytes = parquet_file.read()

            upload_url = f"{TRAJECTORY_DMS_URL}/{traj_id}/data"
            response = requests.post(upload_url, headers=headers_bin, data=file_bytes, timeout=timeout)
            if response.status_code in [200, 201]:
                print(f"Successfully uploaded parquet for trajectory ID: {traj_id}")
            else:
                print(f"Failed to upload parquet for {traj_id}: {response.status_code} - {response.text}")
        except Exception as e:
            # Keep going: one failing record should not stop the other uploads.
            print(f"Trajectory Parquet Ingestion Error: {e}")

StatementMeta(, 9ff45185-ec9c-4107-a1a2-7a048ed9bf7d, 24, Finished, Available, Finished)

In [None]:

# =======================================
# FINAL INGESTION PIPELINE
# =======================================
print("Starting ADME Ingestion Workflow...")

# Step 1: reference data - every JSON file in the reference-data folder.
ingest_reference_data_folder(directory_path=REFERENCE_DATA_FOLDER)

# Step 2: master data - the Well file first, then the Wellbore file.
ingest_master_data(file_path=MASTER_DATA_WELL, url=WELL_DMS_URL, label="Well")
ingest_master_data(file_path=MASTER_DATA_WELLBORE, url=WELLBORE_DMS_URL, label="Wellbore")

# Step 3: metadata records for well logs, marker sets and trajectories.
ingest_metadata(file_path=METADATA_WELLLOG, url=WELLLOG_DMS_URL, label="Well Log")
ingest_metadata(file_path=METADATA_MARKERSET, url=MARKERSET_DMS_URL, label="Wellbore Marker Set")
ingest_metadata(file_path=METADATA_TRAJECTORY, url=TRAJECTORY_DMS_URL, label="Trajectory")

# Step 4: parquet uploads, driven by the same metadata files as step 3.
ingest_welllog_parquet_data(METADATA_WELLLOG, PARQUET_LOGS_FOLDER)
ingest_trajectory_parquet_data(METADATA_TRAJECTORY, PARQUET_TRAJECTORIES_FOLDER)

print("ADME Ingestion Workflow Complete.")
