In [1]:
import ai_flow as af
from ai_flow.api.ai_flow_context import init_notebook_context
from ai_flow_plugins.job_plugins import flink

from color_processor.push_processor import ModelPushProcessor
from color_processor.sample_processor import SampleProcessor, RawInputReader, QueueSinkProcessor, \
    FileSinkProcessor, DataStreamEnv, UserProfileReader, UserClickReader
from color_processor.train_procssor import BatchTrainDataReader, BatchTrainProcessor, StreamTrainProcessor, \
    TrainFlinkEnv
from color_processor.validate_processor import BatchValidateProcessor, StreamValidateProcessor, ValidateDataReader
from recommendation import config
from ai_flow.project.project_config import ProjectConfig
from ai_flow.workflow.workflow_config import WorkflowConfig
from ai_flow.workflow.job_config import JobConfig


def _init_context():
    """Build the project and workflow configs and initialise the AI Flow notebook context."""
    project_config: ProjectConfig = ProjectConfig()
    project_config.set_project_name('color_project')
    project_config.set_server_uri('localhost:50051')
    project_config.set_notification_server_uri('localhost:50052')
    project_config['blob'] = {
        'blob_manager_class': 'ai_flow_plugins.blob_manager_plugins.local_blob_manager.LocalBlobManager',
    }

    workflow_config: WorkflowConfig = WorkflowConfig('color_new')
    # Jobs registered with job_type='flink'; each gets its own properties dict.
    for job_name in ('data_process', 'batch_train', 'stream_train'):
        workflow_config.add_job_config(
            job_name,
            JobConfig(job_name=job_name, job_type='flink', properties={'run_mode': 'local'}))
    # Jobs registered with job_type='python' and default properties.
    for job_name in ('batch_validate', 'stream_validate', 'model_push'):
        workflow_config.add_job_config(job_name, JobConfig(job_name=job_name, job_type='python'))

    af.init_notebook_context(project_config, workflow_config)


def _register_metadata():
    """Register the datasets, metric metas and models referenced by the jobs."""
    # (dataset name, uri) pairs, kept in the order they must be registered.
    # Queue datasets use a "<kafka connection>,<queue name>" uri.
    datasets = (
        (config.RawQueueDataset, f"{config.KafkaConn},{config.RawQueueName}"),
        (config.SampleFileDataset, config.SampleFileDir),
        (config.SampleQueueDataset, f"{config.KafkaConn},{config.SampleQueueName}"),
        (config.ValidateDataset, config.ValidateFileDir),
        (config.UserProfileDataset, config.UserProfileTableName),
        (config.UserClickSnapshotDataset, config.UserClickSnapshotTableName),
        (config.UserClickDataset, config.UserClickTableName),
    )
    for dataset_name, dataset_uri in datasets:
        af.register_dataset(name=dataset_name, uri=dataset_uri)

    project_name = af.current_project_config().get_project_name()
    for metric_name in (config.BatchACC, config.StreamACC):
        af.register_metric_meta(metric_name=metric_name,
                                metric_type=af.MetricType.MODEL,
                                project_name=project_name)

    for model_name in (config.BatchModelName, config.StreamModelName):
        af.register_model(model_name=model_name)


def _define_jobs():
    """Declare the operations of each job inside its ``af.job_config`` scope."""
    with af.job_config(job_name='data_process'):
        flink.set_flink_env(DataStreamEnv())
        raw_input = af.read_dataset(dataset_info=config.RawQueueDataset,
                                    read_dataset_processor=RawInputReader())
        user_profile = af.read_dataset(dataset_info=config.UserProfileDataset,
                                       read_dataset_processor=UserProfileReader())
        user_click = af.read_dataset(dataset_info=config.UserClickDataset,
                                     read_dataset_processor=UserClickReader())
        # SampleProcessor yields two outputs: validation samples first, training samples second.
        validate_sample, train_sample = af.user_define_operation(
            input=[raw_input, user_profile, user_click],
            processor=SampleProcessor(),
            output_num=2)

        # Training samples go to both the queue and the file dataset.
        af.write_dataset(input=train_sample, dataset_info=config.SampleQueueDataset,
                         write_dataset_processor=QueueSinkProcessor())
        af.write_dataset(input=train_sample, dataset_info=config.SampleFileDataset,
                         write_dataset_processor=FileSinkProcessor())
        af.write_dataset(input=validate_sample, dataset_info=config.ValidateDataset,
                         write_dataset_processor=FileSinkProcessor())

    with af.job_config(job_name='batch_train'):
        flink.set_flink_env(TrainFlinkEnv())
        batch_sample = af.read_dataset(config.SampleFileDataset,
                                       read_dataset_processor=BatchTrainDataReader())
        af.train(batch_sample, model_info=config.BatchModelName,
                 training_processor=BatchTrainProcessor(200))

    with af.job_config(job_name='batch_validate'):
        batch_validate_sample = af.read_dataset(dataset_info=config.ValidateDataset,
                                                read_dataset_processor=ValidateDataReader())
        af.user_define_operation(input=batch_validate_sample, processor=BatchValidateProcessor())

    with af.job_config(job_name='stream_train'):
        flink.set_flink_env(TrainFlinkEnv())
        # NOTE(review): the queue dataset is read with BatchTrainDataReader here, the same
        # reader the batch job uses for the file dataset -- confirm this is intended.
        stream_sample = af.read_dataset(config.SampleQueueDataset,
                                        read_dataset_processor=BatchTrainDataReader())
        af.train(stream_sample, model_info=config.StreamModelName,
                 base_model_info=config.BatchModelName,
                 training_processor=StreamTrainProcessor())

    with af.job_config(job_name='stream_validate'):
        stream_validate_sample = af.read_dataset(dataset_info=config.SampleQueueDataset,
                                                 read_dataset_processor=ValidateDataReader())
        af.user_define_operation(input=stream_validate_sample, processor=StreamValidateProcessor())

    with af.job_config(job_name='model_push'):
        af.push_model(model_info=config.StreamModelName,
                      pushing_model_processor=ModelPushProcessor())


def _define_job_dependencies():
    """Declare the status- and event-based rules that connect the jobs."""
    # batch_validate is tied to the status of batch_train (AI Flow's default status/action
    # apply -- presumably "start when finished"; confirm against the AI Flow API).
    af.action_on_job_status("batch_validate", "batch_train")

    # (job, model, model-version event type, sender job) for each model-event rule.
    model_event_rules = (
        ("stream_train", config.BatchModelName, 'MODEL_VALIDATED', "batch_validate"),
        ("stream_validate", config.StreamModelName, 'MODEL_GENERATED', "stream_train"),
        ("model_push", config.StreamModelName, 'MODEL_VALIDATED', "stream_validate"),
    )
    for job_name, model_name, event_type, sender in model_event_rules:
        af.action_on_model_version_event(job_name, model_name, event_type)
        # The same event, restricted to the given sender, is additionally registered
        # with JobAction.NONE.
        af.action_on_event(job_name, model_name, "*", event_type=event_type,
                           sender=sender, action=af.JobAction.NONE)


def _submit_and_start():
    """Stop running executions, submit the workflow, start a new execution and print it."""
    workflow_name = af.current_workflow_config().workflow_name
    af.workflow_operation.stop_all_workflow_executions(workflow_name)
    af.workflow_operation.submit_workflow(workflow_name)
    workflow_execution = af.workflow_operation.start_new_workflow_execution(workflow_name)
    print(workflow_execution)


def workflow():
    """Define and launch the 'color_new' workflow of the 'color_project' project.

    Steps, in order: initialise the AI Flow context, register datasets /
    metrics / models, declare the six jobs, declare the rules linking them,
    then submit the workflow and start a new execution. The resulting
    workflow execution info is printed; nothing is returned.
    """
    _init_context()
    _register_metadata()
    _define_jobs()
    _define_job_dependencies()
    _submit_and_start()


# Define and launch the workflow only when this file is executed directly
# (not when it is imported as a module).
if __name__ == '__main__':
    workflow()

{"__af_object_type__": "jsonable", "__class__": "WorkflowExecutionInfo", "__module__": "ai_flow.plugin_interface.scheduler_interface", "_context": null, "_end_date": "0", "_properties": {}, "_start_date": "0", "_status": "INIT", "_workflow_execution_id": "7", "_workflow_info": {"__af_object_type__": "jsonable", "__class__": "WorkflowInfo", "__module__": "ai_flow.plugin_interface.scheduler_interface", "_namespace": "color_project", "_properties": {}, "_scheduling_rules": null, "_workflow_name": "color_new"}}
