
# Changes - Stage 1 (Bronze)

### Import Libraries

In [0]:
import json

from open_commerce_data_pipelines.core.pipelines.bronze.changes import CdcChangesBronzeStage1Pipeline, \
    CdcChangesBronzeStage1PipelineConfig


### Widget Definitions

In [0]:
# Register the notebook's text input widgets.
# Each entry is (widget name, default value, display label).
_WIDGET_SPECS = (
    ("bronze_database_path", "", "Bronze-Tier Database Path"),
    ("changes_raw_input_region", "us-east-1", "Raw Changes Input Stream Region"),
    ("changes_raw_input_stream", "", "Raw Changes Input Stream Name"),
    ("changes_raw_input_stream_opts", "{}", "Raw Changes Input Stream Options"),
)
for _widget_name, _widget_default, _widget_label in _WIDGET_SPECS:
    dbutils.widgets.text(_widget_name, _widget_default, _widget_label)

### Configuration Parameters

In [0]:
def _read_widget(name, *, required=False):
    """Return the value of Databricks widget *name* with surrounding whitespace removed.

    Args:
        name: Widget name, as registered with ``dbutils.widgets.text``.
        required: When true, an empty or whitespace-only value is rejected.

    Raises:
        RuntimeError: If *required* is true and the widget has no value.
    """
    # Strip so that a value made only of spaces cannot slip past the
    # emptiness check, and stray spaces around a pasted path/stream name
    # do not leak into the pipeline configuration.
    value = dbutils.widgets.get(name).strip()
    if required and not value:
        raise RuntimeError(f"{name} is required")
    return value


BRONZE_DATABASE_LOCATION = _read_widget("bronze_database_path", required=True)
print(f"BRONZE_DATABASE_LOCATION={BRONZE_DATABASE_LOCATION}")

# Optional here: the widget is registered with a default of "us-east-1".
CHANGES_RAW_INPUT_REGION = _read_widget("changes_raw_input_region")
print(f"CHANGES_RAW_INPUT_REGION={CHANGES_RAW_INPUT_REGION}")

CHANGES_RAW_INPUT_STREAM = _read_widget("changes_raw_input_stream", required=True)
print(f"CHANGES_RAW_INPUT_STREAM={CHANGES_RAW_INPUT_STREAM}")

# Raw JSON text of extra pipeline-config options; decoded with json.loads
# when the pipeline config is built. May be empty.
CHANGES_RAW_INPUT_STREAM_OPTS = _read_widget("changes_raw_input_stream_opts")
print(f"CHANGES_RAW_INPUT_STREAM_OPTS={CHANGES_RAW_INPUT_STREAM_OPTS}")

### Pipeline Invocation

In [0]:
# Build the Stage 1 bronze pipeline. The changes_raw_input_stream_opts widget
# holds a JSON object whose entries are passed through as extra keyword
# arguments to CdcChangesBronzeStage1PipelineConfig; a blank value means
# "no extra options".
_opts_text = CHANGES_RAW_INPUT_STREAM_OPTS.strip()
try:
    opts = json.loads(_opts_text) if _opts_text else {}
except json.JSONDecodeError as err:
    raise RuntimeError(f"changes_raw_input_stream_opts is not valid JSON: {err}") from err

# `**opts` only works on a mapping; reject null/arrays/scalars with a clear
# message instead of an opaque TypeError at the call site.
if not isinstance(opts, dict):
    raise RuntimeError(
        f"changes_raw_input_stream_opts must be a JSON object, got {type(opts).__name__}"
    )

# These keywords are always supplied from dedicated widgets; letting opts
# repeat them would fail with a "multiple values for keyword" TypeError.
_reserved_keys = {"changes_raw_input_region", "input_location", "output_database_location"}
_clashing_keys = sorted(_reserved_keys & opts.keys())
if _clashing_keys:
    raise RuntimeError(
        f"changes_raw_input_stream_opts must not set {', '.join(_clashing_keys)}"
    )

config = CdcChangesBronzeStage1PipelineConfig(changes_raw_input_region=CHANGES_RAW_INPUT_REGION,
                                              input_location=CHANGES_RAW_INPUT_STREAM,
                                              output_database_location=BRONZE_DATABASE_LOCATION,
                                              **opts)

pipeline = CdcChangesBronzeStage1Pipeline(config)

In [0]:
# Start the pipeline and block on the returned query for at most six hours.
# NOTE(review): presumably run() returns a PySpark StreamingQuery, whose
# awaitTermination(timeout) takes seconds and returns False if the query is
# still running when the timeout expires. Confirm against the pipeline class.
STREAM_TIMEOUT_SECONDS = 6 * 60 * 60
query = pipeline.run()
finished_in_time = query.awaitTermination(STREAM_TIMEOUT_SECONDS)
if not finished_in_time:
    # Timed out: shut the query down explicitly rather than leave it running.
    query.stop()