This notebook defines the infrastructure operations for the first deployment and, when a re-deploy is needed, for updating the existing infrastructure:

* creation of the log group and the custom log used by the Data Science pipeline
* creation of the Data Science pipeline (in the future, we should add a wrapper that lets data scientists choose whether to use one or more steps)
* creation of the Data Integration project, REST task and application; publication of the task to the application; creation of the scheduler and the scheduled task

In the future, infrastructure monitoring may also be added.

In [2]:
import oci
from oci.logging.models import CreateLogDetails, UpdateLogDetails, CreateLogGroupDetails
import requests
import json
import ads
from ads.pipeline import  Pipeline, PipelineStep, PipelineRun, GitPythonRuntime, CustomScriptStep
from ads.jobs import DataScienceJob

# Authentication: everything in this notebook runs as a Resource Principal.
# The raw OCI SDK clients are built with this explicit signer object...
resource_principal_signer = oci.auth.signers.get_resource_principals_signer()

# ...while the ads library is switched to the same auth mode globally.
ads.set_auth(auth="resource_principal")

# LOAD CONFIG

In [3]:
# Load the deployment settings (the env_config / log_config / ds_config /
# odi_config sections read by the cells below) from infra_config.json in the
# current working directory. JSON text is UTF-8, so decode it explicitly
# instead of relying on the platform's default encoding.
with open("infra_config.json", "r", encoding="utf-8") as config_file:
    config = json.load(config_file)

# CLIENT CONFIGURATION

In [None]:
# Service clients. Authentication comes from the Resource Principal signer,
# so an empty config dict is passed instead of a config-file profile.
log_client = oci.logging.LoggingManagementClient(
    config={},
    signer=resource_principal_signer,
)
data_integration_client = oci.data_integration.DataIntegrationClient(
    config={},
    signer=resource_principal_signer,
)

# LOGGING STEP

In [None]:
# Create the logging resources used by the Data Science pipeline: a log group
# in the target compartment and a CUSTOM log inside it. Exposes `log_group_id`
# and `log_id` for the pipeline-definition cell.
#
# create_log_group / create_log do not return the new resource's OCID; the
# service processes them asynchronously via a work request (id returned in the
# `opc-work-request-id` header). We therefore wait for that work request
# before looking the resource up by display name.


def _wait_for_logging_work_request(response):
    """Block until the work request behind `response` ends; raise if it did not succeed."""
    work_request_id = response.headers.get("opc-work-request-id")
    if not work_request_id:
        # No work request to track: keep the previous best-effort behaviour.
        return
    final_response = oci.wait_until(
        log_client,
        log_client.get_work_request(work_request_id),
        evaluate_response=lambda r: r.data.status in ("SUCCEEDED", "FAILED", "CANCELED"),
    )
    if final_response.data.status != "SUCCEEDED":
        raise RuntimeError(
            f"Logging work request {work_request_id} ended with status "
            f"{final_response.data.status}"
        )


def _find_id_by_display_name(items, display_name, kind):
    """Return the `id` of the first item named `display_name`; raise LookupError if none matches."""
    for item in items:
        if item.display_name == display_name:
            return item.id
    raise LookupError(f"No {kind} named {display_name!r} was found")


# Step 1: Create a log group inside the compartment
create_log_group_response = log_client.create_log_group(
    CreateLogGroupDetails(
        compartment_id=config['env_config']['compartment_id'],
        display_name=config['log_config']['log_group_display_name'],
        defined_tags=config['env_config']['defined_tags'],
    )
)
_wait_for_logging_work_request(create_log_group_response)

# Step 2: Retrieve the OCID of the created log group (across all result pages)
log_groups = oci.pagination.list_call_get_all_results(
    log_client.list_log_groups,
    compartment_id=config['env_config']['compartment_id'],
    sort_by="displayName",
    sort_order="DESC",
).data
log_group_id = _find_id_by_display_name(
    log_groups, config['log_config']['log_group_display_name'], "log group"
)

# Step 3: Create a custom log inside the created log group.
# Retention defaults to 30 (the previously hardcoded value) unless
# log_config provides `retention_duration`.
create_log_response = log_client.create_log(
    log_group_id=log_group_id,
    create_log_details=CreateLogDetails(
        display_name=config['log_config']['custom_log_display_name'],
        log_type="CUSTOM",
        retention_duration=config['log_config'].get('retention_duration', 30),
        defined_tags=config['env_config']['defined_tags'],
    ),
)
_wait_for_logging_work_request(create_log_response)

# Step 4: Retrieve the custom log OCID (across all result pages)
logs = oci.pagination.list_call_get_all_results(
    log_client.list_logs,
    log_group_id=log_group_id,
    sort_by="displayName",
    sort_order="DESC",
).data
log_id = _find_id_by_display_name(
    logs, config['log_config']['custom_log_display_name'], "custom log"
)

# PIPELINE DEFINITION

In [None]:
# Define and create the Data Science pipeline. All five steps share the same
# log, shape family, conda environment and git source. They differ only in the
# per-step resources and entrypoint read from config['ds_config'] under the
# step's kind key ("model", "kafka", "aep", "evidently", "ethic").


def _step_name(kind):
    """Return the pipeline step name for `kind`, i.e. "<project_name>_<kind>_step"."""
    return f"{config['env_config']['project_name']}_{kind}_step"


def _build_pipeline_step(kind):
    """Build the PipelineStep for `kind` from config['ds_config'].

    Shape (RAM, OCPUs, block volume) comes from ds_config['step_shape'][kind],
    and the entrypoint comes from ds_config['entrypoint'][kind].
    """
    env_config = config['env_config']
    ds_config = config['ds_config']
    shape = ds_config['step_shape'][kind]
    name = _step_name(kind)
    return (
        PipelineStep(name)
        .with_description(name)
        .with_infrastructure(
            CustomScriptStep()
            .with_log_group_id(log_group_id)
            .with_log_id(log_id)
            .with_shape_name("VM.Standard.E4.Flex")  # hardcoded
            .with_shape_config_details(
                memory_in_gbs=shape['step_ram'],
                ocpus=shape['step_cpu'],
            )
            # Read from the same 'step_shape' entry as RAM/CPU (the former
            # 'steph_shape' key was a typo and raised KeyError).
            .with_block_storage_size(shape['step_volume'])
            .with_defined_tag(**env_config['defined_tags'])
        )
        .with_runtime(
            GitPythonRuntime()
            .with_environment_variable(GREETINGS="Welcome to OCI Data Science")
            .with_custom_conda(ds_config['custom_conda'])
            .with_source(ds_config['source_url'])
            .with_entrypoint(ds_config['entrypoint'][kind])
            .with_defined_tag(**env_config['defined_tags'])
        )
    )


step_1_name = _step_name("model")
step_2_name = _step_name("kafka")
step_3_name = _step_name("aep")
step_4_name = _step_name("evidently")
step_5_name = _step_name("ethic")

# pipeline steps definition
step_1 = _build_pipeline_step("model")
step_2 = _build_pipeline_step("kafka")
step_3 = _build_pipeline_step("aep")
step_4 = _build_pipeline_step("evidently")
step_5 = _build_pipeline_step("ethic")

# pipeline definition: the model step runs first, then the other four in parallel
pipeline = (
    Pipeline(config['env_config']['project_name'] + "_pipeline")
    .with_compartment_id(config['env_config']['compartment_id'])
    .with_project_id(config['ds_config']['project_id'])
    .with_log_group_id(log_group_id)
    .with_log_id(log_id)
    .with_enable_service_log(True)
    .with_step_details([step_1, step_2, step_3, step_4, step_5])
    .with_dag([f"{step_1_name} >> ({step_2_name} , {step_3_name} , {step_4_name} , {step_5_name})"])
    .with_defined_tags(config['env_config']['defined_tags'])
)

# Create the pipeline in OCI so it receives an OCID; the Data Integration
# REST task needs `pipeline.id`.
pipeline.create()

# DATA INTEGRATION 

In [None]:
# Step 1: Create a Data Integration project.
# The identifier is the upper-cased project name. Plain `str` has no `.str`
# accessor (that is a pandas Series API), so call `.upper()` directly.
project = data_integration_client.create_project(
    workspace_id=config['odi_config']['workspace_id'],
    create_project_details=oci.data_integration.models.CreateProjectDetails(
        name=config['odi_config']['odi_project_name'],
        identifier=config['odi_config']['odi_project_name'].upper(),
        description=config['odi_config']['odi_project_name'],
    ),
)


# Step 2: Create a REST Task that points to the Data Science Pipeline.
# TODO fix creation of rest task: it does not yet set the endpoint and the
# request body dictionary.
#
# The pipeline must already exist in OCI (pipeline.create() has run), because
# its OCID is passed to the task. The ADS Pipeline object is not a dict, so
# its OCID and display name are read through the `id` / `name` attributes.
if pipeline.id is None:
    raise RuntimeError(
        "The Data Science pipeline has no OCID yet; call pipeline.create() "
        "before creating the Data Integration REST task."
    )

rest_task = data_integration_client.create_task(
    workspace_id=config['odi_config']['workspace_id'],
    create_task_details=oci.data_integration.models.CreateTaskFromRestTask(
        name=config['odi_config']['odi_rest_task_name'],
        identifier=config['odi_config']['odi_rest_task_name'].upper(),
        description=config['odi_config']['odi_rest_task_name'],
        model_type="REST_TASK",  # hardcoded
        auth_config=oci.data_integration.models.ResourcePrincipalAuthConfig(
            resource_principal_source="WORKSPACE"  # hardcoded
        ),
        execute_rest_call_config=oci.data_integration.models.ExecuteRestCallConfig(
            method_type="POST",  # hardcoded
            config_values=oci.data_integration.models.ConfigValues(
                config_param_values=dict(
                    projectId=oci.data_integration.models.ConfigParameterValue(
                        string_value=config['ds_config']['project_id']
                    ),
                    compartmentId=oci.data_integration.models.ConfigParameterValue(
                        string_value=config['env_config']['compartment_id']
                    ),
                    pipelineId=oci.data_integration.models.ConfigParameterValue(
                        string_value=pipeline.id
                    ),
                    displayName=oci.data_integration.models.ConfigParameterValue(
                        string_value=pipeline.name
                    ),
                    # string_value must be a string, so serialize the tag dict as JSON.
                    definedTags=oci.data_integration.models.ConfigParameterValue(
                        string_value=json.dumps(config['env_config']['defined_tags'])
                    ),
                )
            ),
        ),
        registry_metadata=oci.data_integration.models.RegistryMetadata(
            aggregator_key=project.data.key
        ),
    ),
)


# Step 3: Create an Application.
# Identifier and display name are the upper-cased application name
# (`.upper()` on the str itself; `str` has no pandas-style `.str` accessor).
create_application_response = data_integration_client.create_application(
    workspace_id=config['odi_config']['workspace_id'],
    create_application_details=oci.data_integration.models.CreateApplicationDetails(
        name=config['odi_config']['odi_application_name'],
        identifier=config['odi_config']['odi_application_name'].upper(),
        description=config['odi_config']['odi_application_name'],
        display_name=config['odi_config']['odi_application_name'].upper(),
        model_type="INTEGRATION_APPLICATION",  # hardcoded
        defined_tags=config['env_config']['defined_tags'],
    ),
)


# Step 4: Publish the REST task in the application via a PUBLISH patch.
# TODO the patch identifier must change on every publish (e.g. append an
# incremental suffix); otherwise patch creation fails.
create_patch_response = data_integration_client.create_patch(
    workspace_id=config['odi_config']['workspace_id'],
    application_key=create_application_response.data.key,
    create_patch_details=oci.data_integration.models.CreatePatchDetails(
        name=config['odi_config']['odi_publish_task_patch_name'],
        identifier=config['odi_config']['odi_publish_task_patch_name'].upper(),
        description=config['odi_config']['odi_publish_task_patch_name'],
        patch_type="PUBLISH",  # hardcoded
        object_keys=[rest_task.data.key],
    ),
)


# Step 5: Create a scheduler in the application.
# TODO it may be better to define a set of schedulers a priori and add their keys to the config.
# TODO for now, assume a dedicated scheduler is created for every project deployment.
# The schedule below fires daily at 06:30:00 UTC (all schedule values are hardcoded).
create_schedule_response = data_integration_client.create_schedule(
    workspace_id=config['odi_config']['workspace_id'],
    application_key=create_application_response.data.key,
    create_schedule_details=oci.data_integration.models.CreateScheduleDetails(
        name=config['odi_config']['odi_app_scheduler_name'],
        identifier=config['odi_config']['odi_app_scheduler_name'].upper(),
        description=config['odi_config']['odi_app_scheduler_name'],
        # one of ["MonthlyFrequencyDetails","MonthlyRuleFrequencyDetails","CustomFrequencyDetails",
        #         "DailyFrequencyDetails","WeeklyFrequencyDetails","HourlyFrequencyDetails"]
        frequency_details=oci.data_integration.models.DailyFrequencyDetails(
            model_type="DAILY",  # hardcoded
            time=oci.data_integration.models.Time(
                hour=6,  # hardcoded
                minute=30,  # hardcoded
                second=0,  # hardcoded
            ),
        ),
        timezone="UTC",  # hardcoded
        is_daylight_adjustment_enabled=True,  # hardcoded
    ),
)


# Step 6: Create the scheduled task, combining the published task with the scheduler.
create_task_schedule_response = data_integration_client.create_task_schedule(
    workspace_id=config['odi_config']['workspace_id'],
    application_key=create_application_response.data.key,
    create_task_schedule_details=oci.data_integration.models.CreateTaskScheduleDetails(
        name=config['odi_config']['odi_rest_task_name'] + "_scheduled",
        identifier=config['odi_config']['odi_rest_task_name'].upper() + "_SCHEDULED",
        description=config['odi_config']['odi_rest_task_name'] + "_scheduled",
        # NOTE(review): confirm whether this should be the key of the task as
        # published in the application rather than the design-time task key.
        registry_metadata=oci.data_integration.models.RegistryMetadata(
            aggregator_key=rest_task.data.key
        ),
        schedule_ref=oci.data_integration.models.Schedule(
            key=create_schedule_response.data.key,
            name=config['odi_config']['odi_app_scheduler_name'],
            identifier=config['odi_config']['odi_app_scheduler_name'].upper(),
        ),
        is_enabled=True,  # hardcoded
        auth_mode="RESOURCE_PRINCIPAL",  # hardcoded
    ),
)