# Dataplex Metadata Export Demo (Self-Contained)

This notebook demonstrates how to export Dataplex metadata and create a BigQuery reporting table. 
All logic is contained within this notebook for ease of use.

**Configuration is defined via variables in the "Configuration Variables" cell below.**

In [None]:
import sys
import os
import json
import time
from dataclasses import dataclass
from typing import List, Dict, Any

from google.cloud import dataplex_v1
from google.cloud import storage
from google.cloud import bigquery

## Configuration Variables

Set your project details here.

In [None]:
# --- CONFIGURATION ---
# Project used for the Dataplex job parent, the GCS client and the BigQuery client.
PROJECT_ID = "your-project-id"
# Location used for the Dataplex job parent and for the BigQuery client/dataset.
LOCATION = "us-central1"

# Dataplex Export Scope
# Project IDs added to the export job's scope.
DATAPLEX_PROJECT_IDS = ["your-project-id"]
# Item types added to the export job's scope; names outside the options
# listed on the right are skipped with a warning.
DATAPLEX_ITEM_TYPES = ["ASPECT_TYPE", "ENTRY"] # Options: ASPECT_TYPE, ENTRY, ASPECT, FULL

# GCS Output
# Bucket name only (no "gs://" scheme): the code builds gs://<bucket>/<path> itself.
GCS_BUCKET_NAME = "your-bucket-name"
# Object prefix inside the bucket: used as the export destination and as the
# prefix that is listed when the exported files are read back.
GCS_OUTPUT_PATH = "dataplex-export/"

# BigQuery Reporting Table
# Dataset that holds the reporting table; it is created when it cannot be fetched.
BQ_DATASET_ID = "your_dataset_id"
# ---------------------

## Config Classes

Helper classes to structure the configuration.

In [None]:
@dataclass
class DataplexConfig:
    """Export-scope settings for the Dataplex metadata export job."""

    # Free-form mapping; the export code reads the optional keys
    # "project_ids" and "item_types" from it.
    scope: Dict[str, Any]

@dataclass
class GCSConfig:
    """Cloud Storage location that the metadata export is written to and read from."""

    # Bucket name without the "gs://" scheme (the scheme is added where URIs are built).
    bucket_name: str
    # Object prefix inside the bucket under which the exported files live.
    output_path: str

@dataclass
class BigQueryConfig:
    """BigQuery destination for the metadata reporting table."""

    # Dataset ID; combined with the project ID to build the dataset reference.
    dataset_id: str

# Wrap the notebook-level variables in the typed config objects that the
# helper functions in the following cells accept.
dataplex_config = DataplexConfig(
    scope={"project_ids": DATAPLEX_PROJECT_IDS, "item_types": DATAPLEX_ITEM_TYPES}
)
gcs_config = GCSConfig(bucket_name=GCS_BUCKET_NAME, output_path=GCS_OUTPUT_PATH)
bq_config = BigQueryConfig(dataset_id=BQ_DATASET_ID)

# Confirmation so the user can see which project the notebook targets.
print(f"Configuration loaded for project: {PROJECT_ID}")

## Dataplex Logic

Functions to trigger the metadata export job.

In [None]:
def trigger_export_job(project_id: str, location: str, dataplex_config: DataplexConfig, gcs_config: GCSConfig) -> str:
    """Create a Dataplex metadata export job and block until it finishes.

    Args:
        project_id: Project used to build the job's parent resource name.
        location: Location used to build the job's parent resource name.
        dataplex_config: Export scope; the optional ``project_ids`` and
            ``item_types`` keys of ``scope`` are applied to the job.
        gcs_config: Bucket and object prefix the export is written to.

    Returns:
        The ``name`` of the result returned by the long-running operation.
    """
    # NOTE(review): the public google-cloud-dataplex reference describes the
    # export spec as MetadataJob.ExportJobSpec with an ExportJobScope
    # (projects / entry_groups / entry_types / aspect_types). Confirm that
    # ExportSpec, Scope, project_ids and ItemType exist in the installed
    # library version before relying on this function.
    client = dataplex_v1.CatalogServiceClient()
    parent = f"projects/{project_id}/locations/{location}"

    # Build the scope message field by field so item types can be translated
    # from their string names.
    scope_msg = dataplex_v1.MetadataJob.ExportSpec.Scope()

    if 'project_ids' in dataplex_config.scope:
        scope_msg.project_ids.extend(dataplex_config.scope['project_ids'])

    if 'item_types' in dataplex_config.scope:
        item_type_enum = dataplex_v1.MetadataJob.ExportSpec.Scope.ItemType
        # Accepted string names mapped to their enum members.
        item_type_map = {
            "ASPECT_TYPE": item_type_enum.ASPECT_TYPE,
            "ENTRY": item_type_enum.ENTRY,
            "ASPECT": item_type_enum.ASPECT,
            "FULL": item_type_enum.FULL,
        }

        for it in dataplex_config.scope['item_types']:
            if it in item_type_map:
                scope_msg.item_types.append(item_type_map[it])
            else:
                # Best effort: an unknown name is reported and ignored.
                print(f"Warning: Unknown item type {it}, skipping.")

    export_spec = dataplex_v1.MetadataJob.ExportSpec()
    export_spec.output_path = f"gs://{gcs_config.bucket_name}/{gcs_config.output_path}"
    export_spec.scope = scope_msg

    metadata_job = dataplex_v1.MetadataJob()
    metadata_job.type_ = dataplex_v1.MetadataJob.Type.EXPORT
    metadata_job.export_spec = export_spec

    # The job must be fully populated BEFORE the request is built: nesting a
    # message into another message copies it, so fields assigned to
    # `metadata_job` afterwards would not reach the request that is sent.
    request = dataplex_v1.CreateMetadataJobRequest(
        parent=parent,
        metadata_job=metadata_job
    )

    operation = client.create_metadata_job(request=request)
    print("Triggered metadata export job...")
    response = operation.result()  # Blocks until the long-running operation completes.

    return response.name

## GCS Logic

Functions to read exported metadata.

In [None]:
def read_exported_metadata(project_id: str, config: GCSConfig) -> List[Dict[str, Any]]:
    """Load every JSON record from the exported ``.jsonl`` objects in GCS.

    Args:
        project_id: Project the storage client is created for.
        config: Bucket name and object prefix to scan.

    Returns:
        One parsed object per non-blank line, in listing order, across all
        objects under the prefix whose name ends in ``.jsonl``.
    """
    gcs_bucket = storage.Client(project=project_id).bucket(config.bucket_name)

    records = []
    for obj in gcs_bucket.list_blobs(prefix=config.output_path):
        # Anything that is not a JSON Lines object is ignored.
        if not obj.name.endswith('.jsonl'):
            continue

        print(f"Reading {obj.name}...")
        text = obj.download_as_text()
        # Blank (whitespace-only) lines carry no record and are skipped.
        records.extend(json.loads(row) for row in text.splitlines() if row.strip())

    return records

## BigQuery Logic

Functions to create the BigLake reporting table.

In [None]:
def create_metadata_reporting_table(project_id: str, location: str, bq_config: BigQueryConfig, gcs_config: GCSConfig):
    """Create an external BigQuery table over the exported Dataplex metadata files.

    The table ``dataplex_metadata_export`` reads newline-delimited JSON from
    ``gs://<bucket>/<output_path>/*`` with Hive partitioning in ``AUTO`` mode.
    The dataset is created first when it cannot be fetched.

    Args:
        project_id: Project that owns the dataset and table.
        location: Location for the BigQuery client and a newly created dataset.
        bq_config: Target dataset.
        gcs_config: Bucket and object prefix holding the exported files.

    Returns:
        None. A table-creation failure is printed, not raised.
    """
    # NOTE(review): no connection is attached to the external configuration,
    # although the messages below call the table "BigLake" - confirm whether a
    # connection should be set.
    # NOTE(review): neither a schema nor autodetect is configured - confirm
    # BigQuery accepts a JSON external table defined this way.
    client = bigquery.Client(project=project_id, location=location)

    dataset_ref = bigquery.DatasetReference(project_id, bq_config.dataset_id)

    # Ensure dataset exists. Any lookup failure is treated as "missing"; a
    # genuine problem (e.g. permissions) then surfaces from create_dataset.
    try:
        client.get_dataset(dataset_ref)
    except Exception:
        dataset = bigquery.Dataset(dataset_ref)
        dataset.location = location
        client.create_dataset(dataset)
        print(f"Created dataset {bq_config.dataset_id}")

    table_name = "dataplex_metadata_export"
    table_ref = dataset_ref.table(table_name)

    # Common prefix of all exported objects, without a trailing slash.
    uri_prefix = f"gs://{gcs_config.bucket_name}/{gcs_config.output_path.rstrip('/')}"

    # The source format has to be given to the constructor: it is exposed as a
    # read-only property afterwards, and "HIVE_PARTITIONING" is not a format.
    external_config = bigquery.ExternalConfig(bigquery.SourceFormat.NEWLINE_DELIMITED_JSON)
    external_config.source_uris = [f"{uri_prefix}/*"]

    # Populate the Hive options completely before attaching them; values set
    # through `external_config.hive_partitioning` after assignment may land on
    # a copy and never reach the table definition.
    hive_options = bigquery.HivePartitioningOptions()
    hive_options.mode = "AUTO"
    hive_options.source_uri_prefix = uri_prefix
    external_config.hive_partitioning = hive_options

    table = bigquery.Table(table_ref)
    table.external_data_configuration = external_config

    # Best effort: with exists_ok=True an existing table is left in place, and
    # any failure is reported without aborting the notebook.
    try:
        client.create_table(table, exists_ok=True)
        print(f"Created/Updated BigLake table {bq_config.dataset_id}.{table_name}")
    except Exception as e:
        print(f"Failed to create table {table_name}: {e}")

## Execution

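Trigger the export job (the call is commented out by default; uncomment it to start a new export), verify the exported data, and create the reporting table.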

In [None]:
# Trigger Job
# NOTE: the trigger call below is commented out, so this cell only prints the
# banner and then works on whatever files already exist under the GCS prefix.
# Uncomment the two lines to start a new export; the call blocks until the
# job's long-running operation completes.
print("Triggering Dataplex metadata export job...")
# job_name = trigger_export_job(PROJECT_ID, LOCATION, dataplex_config, gcs_config)
# print(f"Job triggered: {job_name}")

# Verify Data
# Reads every exported .jsonl object into memory and shows the first record
# as a quick sanity check.
print("Reading exported metadata from GCS...")
metadata = read_exported_metadata(PROJECT_ID, gcs_config)
print(f"Read {len(metadata)} metadata items.")
if metadata:
    print("Sample item:")
    print(json.dumps(metadata[0], indent=2))

# Create Table
# Defines the reporting table over the same GCS prefix; failures during table
# creation are printed by the helper rather than raised.
print("Creating BigLake table for metadata reporting...")
create_metadata_reporting_table(PROJECT_ID, LOCATION, bq_config, gcs_config)
print("Done.")