# 合约消息机制及自定义合约消息

合约消息机制用于 AlphaMed 平台上不同节点之间互相传递信号，比如控制信息、状态信号等数据量比较小的场景。在管理控制场景中，多用于与平台其它管理模块的交互。在需要多个参与方配合计算的场景中，多用于控制计算流程、状态切换、传递核心参数等操作。

## 合约消息机制简介

目前 AlphaMed 平台上的合约分为两种：
- 系统级合约，主要用于任务管理器管理、控制任务流程，比如启动任务、关闭任务、上传任务运行结果等。系统级合约由 BASS 层预定义，算法模块在任务的必要阶段调用接口，不能定制接口行为。
- 任务级合约，主要用于指定任务内的相关节点互相协同，比如任务内节点同步状态等。任务级合约内部携带的文本内容可以不同，通过不同的文本内容传递不同的信息。任务级合约只在指定的任务中有效，其它任务中的节点，即使是同一批节点，也无法收到当前任务的任何消息。

## 系统级合约接口

系统级合约分为两类：BASS 系统合约和任务运行时服务合约。

BASS 系统合约可以理解为全局性的控制合约。BASS 系统合约的主要使用场景是：控制学习任务的使用者通过代码触发相关操作，所以在接口调用时需要指定对应任务的 `task_id`，`task_id` 不同操作对象就不同。因此其操作对象不限于某一个特定的任务。

任务运行时服务合约可以理解为处理特定任务内部的控制消息。任务运行时服务合约的使用对象是调度器实现代码，由代码逻辑控制，不由人工控制。所以合约工具初始化时需要指定所属任务的 `task_id`，初始化完成后将与此任务绑定，只针对此任务的上下文操作。

### BASS 系统合约接口介绍

BASS 系统合约接口定义在 `bass.BassProxy` 中，目前有两个接口，一个用于数据验证，一个用于启动计算任务。

In [1]:
def notify_dataset_state(self, task_id: str, verified: bool, cause_of_failuer: str):
    """向任务管理器通报本地数据验证结果。
    
    参数说明:
        task_id: 数据验证相关的任务 ID。
        verified: 验证结果是否成功，成功为 True。
        cause_of_failuer: 失败时提供失败原因说明，成功时可忽略。
    """

***`notify_dataset_state` 接口目前仅有异构联邦任务使用，后续版本迭代时可能会统一修改。***

In [2]:
def launch_task(self, task_id: str, pickle_file_key: str) -> bool:
    """通知任务管理器启动计算任务。

    参数说明:
        task_id: 数据验证相关的任务 ID。
        pickle_file_key: 任务调度器启动任务所需的文件压缩包文件 key，其中包含：开发者在
        Notebook 中编写的任务代码、开发者在 Notebook 环境中上传的相关代码文件（夹）、
        可选的 requirements.txt 依赖文件。调度器事先将这些文件打包上传（通过任务管理器
        提供的上传接口），上传成功后接口返回的访问 key。

    返回值:
        是否启动成功。
    """

### BASS 系统合约接口的使用和调试

要调用 BASS 系统合约接口，需要首先初始化一个 BassProxy 对象，初始化不需要任何参数。

In [None]:
from alphafed.bass import BassProxy

proxy = BassProxy()
proxy.notify_dataset_state(task_id='TASK_ID', verified=False, cause_of_failuer='加载本地数据失败')
proxy.launch_task(task_id='TASK_ID', pickle_file_key='pickle_file_key')

BASS 系统合约接口可以运行在模拟环境中，但是在模拟环境中时不会触发实际操作。因此开发者可以在代码中必要的位置预置好代码，在模拟环境中运行不会产生副作用。需要实际执行的时候，只需要移除模拟环境，接口实现会自动开始工作，减少切换环境时的代码迁移工作量。

在模拟环境下运行的代码如下：

In [3]:
from alphafed import mock_context
from alphafed.bass import BassProxy


with mock_context():
    proxy = BassProxy()
    # 通知任务管理器数据验证状态
    proxy.notify_dataset_state(task_id='TASK_ID', verified=False, cause_of_failuer='加载本地数据失败')
    # 启动指定学习任务
    proxy.launch_task(task_id='TASK_ID', pickle_file_key='pickle_file_key')

Without specifying nodes, query_nodes returns an empty list.


do something before
do something after


### 任务运行时服务合约接口介绍

任务运行时服务合约接口定义在 `contractor.TaskContractor` 中，以下是任务运行时服务合约接口的列表和介绍：

In [4]:
from typing import List, Optional

In [6]:
def query_nodes(self) -> List[str]:
    """查询当前任务所有参与方的 ID 列表。

    模拟模式下返回的列表，是进入模拟环境时通过 nodes 配置的列表，没有配置的话返回空列表。
    """

In [7]:
def upload_file(self, fp, persistent: bool = False, upload_name: str = None) -> str:
    """上传文件。

    模拟模式下调用上传接口依然会执行上传操作，且返回一个可访问的 URL。但是此时不支持长期保存，
    无论 persistent 设置为何值，都只会临时保存文件，很快会被自动清除。

    参数说明:
        fp: 文件指针，可以是文件路径的字符串，也可以是已打开的文件流对象。
        persistent: 是否长期保存？非长期保存的文件会在一定时间后被自动清除。
        upload_name: 上传后使用的文件名，仅在上传文件流是有效。

    返回值:
        可访问的文件 URL。
    """

In [8]:
def report_progress(self, percent: int):
    """上报任务进度，进度值为 0 - 100 之间的整数，代表 N%。

    模拟模式下调用会直接返回。
    """

### 任务运行时服务合约接口的使用和调试

要调用任务运行时服务合约接口，需要首先初始化一个 `TaskContractor` 对象，初始化必须指定所属的任务 ID。如果任务 ID 指定错误，所有接口都不会正常工作。也正是因为初始化 `TaskContractor` 对象时指定了任务 ID，所以里面的所有接口都不需要传递 `task_id` 参数。

In [None]:
from alphafed.contractor import TaskContractor

contractor = TaskContractor(task_id='TASK_ID')
contractor.query_nodes()
contractor.upload_file('LOCAL_FILE', persistent=False, upload_name='NEW_NAME')
contractor.report_progress(35)

模拟环境下运行的行为已经在接口说明中阐述，下面只展示示例代码：

In [9]:
from alphafed import mock_context
from alphafed.contractor import TaskContractor


with mock_context(nodes=['node 1', 'node 2', 'node 3']):
    contractor = TaskContractor(task_id='TASK_ID')
    node_list = contractor.query_nodes()
    print(f'参加任务的节点包括: {node_list}')
    file_url = contractor.upload_file('./6. 合约消息机制简介.ipynb', persistent=False)
    print(f'文件地址为: {file_url}')
    contractor.report_progress(35)

do something before
do something after


## 任务级合约接口

任务级合约主要用来在节点间传递数据，但也有两个用于任务控制的接口，分别用于上传任务运行结果和通知任务管理器任务结束。

In [10]:
from typing import Union
from alphafed.contractor import ContractEvent

In [11]:
def apply_sending_data(self, source: str, target: Union[str, List[str]], **kwargs) -> str:
    """申请发送数据。

    主要用于数据传输流程的控制。AlphaMed 平台已经提供了数据传输工具，因此大部分情况下
    开发者不会用到这个接口。

    参数说明:
        source:
            数据发送源节点 ID。
        target:
            数据发送目标节点 ID 或 ID 列表。
        kwargs:
            其它参数，由具体实现使用。
    """

def deny_sending_data(self,
                      target: str,
                      session_id: str,
                      rejecter: str,
                      cause: str = None) -> None:
    """拒绝接收数据。

    主要用于数据传输流程的控制。AlphaMed 平台已经提供了数据传输工具，因此大部分情况下
    开发者不会用到这个接口。

    参数说明:
        target:
            消息发送目标节点 ID。
        session_id:
            数据发送 Session ID，从发送申请消息中获取。
        rejecter:
            当前拒绝节点 ID。
        cause:
            拒绝原因。
    """

def accept_sending_data(self, target: str, session_id: str, **kwargs) -> None:
    """接受数据传输。

    主要用于数据传输流程的控制。AlphaMed 平台已经提供了数据传输工具，因此大部分情况下
    开发者不会用到这个接口。

    参数说明:
        target:
            消息发送目标节点 ID。
        session_id:
            数据发送 Session ID，从发送申请消息中获取。
        kwargs:
            其它参数，由具体实现使用。
    """

In [12]:
def contract_events(self, timeout: int = 0) -> ContractEvent:
    """返回收到合约消息的迭代器接口。
    
    参数说明:
        timeout: 接收消息的超时时间，达到超时时间后退出消息监听。

    模拟环境中底层实现机制不同，但对调用者而言功能与正式环境一致。
    """

In [13]:
def upload_task_achivement(self,
                           aggregator: str,
                           model_file: str,
                           report_file: str = ''):
    """向任务管理器上传任务运行结果，包括模型文件和/或指标文件。

    参数说明:
        aggregator: 聚合节点 ID。
        model_file: 模型文件地址。
        report_file: 评估指标文件地址。

    模拟环境中依然会发送消息通知各相关方，但不会触发任务管理器执行实际操作。
    """

In [14]:
def notify_task_completion(self, result: bool):
    """通知任务管理器任务完成。

    参数说明:
        result: 是否成功结束。

    模拟环境中依然会发送消息通知各相关方，但不会触发任务管理器执行实际操作。
    """

In [15]:
# 以下三个接口是 TaskContractor 中对应接口的 shortcut，直接转发调用，不再赘述。

def query_nodes(self) -> List[str]:
    ...
def report_progress(self, percent: int):
    ...
def upload_file(self, fp, persistent: bool = False, upload_name: str = None) -> str:
    ...

## 任务级合约接口的使用和调试

要调用任务级合约接口，需要首先初始化一个 `TaskMessageContractor` 对象，初始化必须指定所属的任务 ID。如果任务 ID 指定错误，所有接口都不会正常工作。也正是因为初始化 `TaskMessageContractor` 对象时指定了任务 ID，所以里面的所有接口都不需要传递 `task_id` 参数。

In [None]:
from alphafed.contractor import TaskMessageContractor

contractor = TaskMessageContractor(task_id='TASK_ID')
contractor.upload_task_achivement(aggregator='NODE_ID',
                                  model_file='MODEL_FILE',
                                  report_file='REPORT_FILE')
contractor.notify_task_completion(result=True)

# 调用 contract_events 时如果不指定 timeout，会监听新消息直至永远
for event in contractor.contract_events(timeout=30):
    print(f'收到了一个新消息: {event}')
    break  # 收到新消息后可通过 break、continue、return 等关键字控制跳出循环或继续接收新消息

模拟环境中运行合约消息接口，能够收到发给自己的消息。

In [2]:
from alphafed import mock_context
from alphafed.contractor import TaskMessageContractor


self_node = 'NODE_ID_1'
partner_a = 'NODE_ID_2'
partner_b = 'NODE_ID_3'
with mock_context(id=self_node, nodes=[self_node, partner_a, partner_b]):
    contractor = TaskMessageContractor(task_id='TASK_ID')
    contractor.upload_task_achivement(aggregator=self_node,
                                      model_file='res/model.pt',
                                      report_file='res/report.zip')
    contractor.notify_task_completion(result=True)

    # 调用 contract_events 是如果不指定 timeout，会监听新消息直至永远
    for event in contractor.contract_events(timeout=3):
        print(f'收到了一个新消息: {event}')
        # 收到新消息后可通过 break、continue、return 等关键字控制跳出循环或继续接收新消息

收到了一个新消息: NoticeTaskCompletionEvent(type='notice_task_completion', result=True)


## 自定义任务合约消息

平台定义的消息提供了一套基本功能的实现。实际运行联邦学习时，不同算法的流程不同，就需要使用不同的消息来控制交互流程。即使是同一个算法，在不同的实现中、支持不同的细分功能时，也需要定义各自的流程细节。所以 AlphaMed 平台提供了自定义任务合约消息的机制，已突破平台预置消息类型有限的制约，支持庞大的算法实现。

自定义消息只能是任务级合约消息，且仅有数据传递功能，不会触发任务管理器等其它管理功能模块的操作。

自定义消息机制由三个核心组件组成：`ContractEvent`、`TaskMessageEventFactory`、`TaskMessageContractor`，均位于 `alphafed.contractor` 模块中。下面分别介绍。

### 通过 `ContractEvent` 定义消息内容

需要自定义一个消息类型时，自定义的消息实现要继承 `ContractEvent`，且需要注释为 `@dataclass` 类型。消息类里面定义了消息中包含的数据字段和字段类型。比如定义一个发起方发送给参与方的状态同步消息：

In [5]:
from dataclasses import dataclass
from alphafed.contractor import ContractEvent


@dataclass
class SyncEvent(ContractEvent):

    TYPE = 'sync'  # 消息类型标识，可以是任意字符串，但要保证在算法流程内部唯一

    aggregator: str  # 发起方 ID
    current_round: int  # 当前训练轮次
    participants: List[str]  # 当前参与者的 ID 列表

框架会自动解析消息定义的字段和数据类型，将其转化为字符串形式的合约文本内容，然后通过合约网络发送出去。目前 Python 的常见基础类型都支持自动处理。但是如果使用了复杂类型，比如自定义的类，就不能自动处理了，此时可以通过重新实现 `event_to_contract` 接口将消息内容转化为可以被 json 模块处理的字典类型。

In [6]:
def event_to_contract(self) -> dict:
    """将消息对象转化为可以 jsonify 的字典类型数据，以备发送。"""

上面定义的 `SyncEvent` 消息中的字段都是基本数据类型，所以可以忽略 `event_to_contract`。这样 `SyncEvent` 经过注册后就已经可以使用了。（注册的方法接下来会介绍，暂时略过。）但是这个状态的消息只能发送，不能通过 `contract_events()` 接口接收消息。要使消息能够被接收，还需要实现 `contract_to_event` 接口，将合约中的文本内容反序列化为消息对象。完整的 `SyncEvent` 代码如下：

In [7]:
@dataclass
class SyncEvent(ContractEvent):

    TYPE = 'sync'  # 消息类型标识，可以是任意字符串，但要保证在算法流程内部唯一

    aggregator: str  # 发起方 ID
    current_round: int  # 当前训练轮次
    participants: List[str]  # 当前参与者的 ID 列表

    @classmethod
    def contract_to_event(cls, contract: dict) -> 'SyncEvent':
        event_type = contract.get('type')
        aggregator = contract.get('aggregator')
        current_round = contract.get('current_round')
        participants = contract.get('participants')
        assert event_type == cls.TYPE, f'合约类型错误: {event_type}'
        assert aggregator and isinstance(aggregator, str), f'invalid aggregator: {aggregator}'
        assert current_round and isinstance(current_round, int), f'invalid current_round: {current_round}'
        assert participants and isinstance(participants, list), f'invalid participants: {participants}'
        return SyncEvent(aggregator=aggregator, current_round=current_round, participants=participants)

### 通过 `TaskMessageEventFactory` 注册自定义消息

要注册自定义的合约事件，需要定义一个继承了 `TaskMessageEventFactory` 的事件工厂类。`TaskMessageEventFactory` 中已经注册了一些预置的基础消息类型，把自己定义的事件类型加进列表里就完成了注册。

In [8]:
from alphafed.contractor import TaskMessageEventFactory


class SyncProcessEventFactory(TaskMessageEventFactory):

    _CLASS_MAP = {  # 在这里添加新消息类型
        SyncEvent.TYPE: SyncEvent,  # 自定义新消息类型
        **TaskMessageEventFactory._CLASS_MAP  # 保留支持 TaskMessageEventFactory 中已有的消息类型
    }

**注意如果没有底部的 `**TaskMessageEventFactory._CLASS_MAP` 这一行，`TaskMessageEventFactory` 中预置的消息类型将会丢失，不再支持。**

### 通过 `TaskMessageContractor` 使用自定义消息

最后一步是定义合约收发工具，对业务提供发送合约消息的接口，隐藏合约实现细节。合约收发工具需要继承 `TaskMessageContractor` 类，`TaskMessageContractor` 类提供了绝大部分需要的实现，自定义的工具类只需要补充做两件事情：
1. 将 `TaskMessageContractor` 中的消息工厂替换为前面自定义的消息工厂，这个操作在初始化方法中实现。

In [9]:
from alphafed.contractor import TaskMessageContractor


class SyncProcessContractor(TaskMessageContractor):

    def __init__(self, task_id: str):
        super().__init__(task_id=task_id)
        self._event_factory = SyncProcessEventFactory  # 替换为自定义事件工厂

2. 为自定义的消息类型提供发送接口，这样业务逻辑就可以和合约消息机制充分隔离了。一般不需要定义接收接口，`contract_events()` 会处理，除非有特殊需要。

In [10]:
from alphafed.contractor import TaskMessageContractor


class SyncProcessContractor(TaskMessageContractor):

    def __init__(self, task_id: str):
        super().__init__(task_id=task_id)
        self._event_factory = SyncProcessEventFactory  # 替换为自定义事件工厂

    def sync_state(self, aggregator: str, current_round: int, participants: List[str], querier: str):
        """发送状态同步消息。"""
        event = SyncEvent(aggregator=aggregator,
                          current_round=current_round,
                          participants=participants)
        self._new_contract(targets=[querier], event=event)

### 通过自定义消息支持自定义流程的示例

下面通过一个实例演示一遍完整的自定义流程设计实现。假设设计一个同步状态的流程，参与方先向全网广播状态查询消息，发起方收到消息后向参与方发送状态同步消息，参与方收到后向发起方发送状态同步响应，发起方收到后确认参与方准备就绪。

首先需要定义流程中使用的三个自定义消息。

In [11]:
@dataclass
class QueryStateEvent(ContractEvent):
    """状态查询消息。"""

    TYPE = 'query'

    querier: str  # 查询者 ID

    @classmethod
    def contract_to_event(cls, contract: dict) -> 'QueryStateEvent':
        event_type = contract.get('type')
        querier = contract.get('querier')
        assert event_type == cls.TYPE, f'合约类型错误: {event_type}'
        assert querier and isinstance(querier, str), f'invalid querier: {querier}'
        return QueryStateEvent(querier=querier)


@dataclass
class SyncEvent(ContractEvent):
    """状态同步消息。"""

    TYPE = 'sync'  # 消息类型标识，可以是任意字符串，但要保证在算法流程内部唯一

    aggregator: str  # 发起方 ID
    current_round: int  # 当前训练轮次
    participants: List[str]  # 当前参与者的 ID 列表

    @classmethod
    def contract_to_event(cls, contract: dict) -> 'SyncEvent':
        event_type = contract.get('type')
        aggregator = contract.get('aggregator')
        current_round = contract.get('current_round')
        participants = contract.get('participants')
        assert event_type == cls.TYPE, f'合约类型错误: {event_type}'
        assert aggregator and isinstance(aggregator, str), f'invalid aggregator: {aggregator}'
        assert current_round and isinstance(current_round, int), f'invalid current_round: {current_round}'
        assert participants and isinstance(participants, list), f'invalid participants: {participants}'
        return SyncEvent(aggregator=aggregator, current_round=current_round, participants=participants)


@dataclass
class SyncRespEvent(ContractEvent):
    """状态同步响应。"""

    TYPE = 'sync_resp'

    querier: str  # 查询者 ID

    @classmethod
    def contract_to_event(cls, contract: dict) -> 'SyncRespEvent':
        event_type = contract.get('type')
        querier = contract.get('querier')
        assert event_type == cls.TYPE, f'合约类型错误: {event_type}'
        assert querier and isinstance(querier, str), f'invalid querier: {querier}'
        return SyncRespEvent(querier=querier)

注册三个新定义的消息。

In [12]:
class SyncProcessEventFactory(TaskMessageEventFactory):

    _CLASS_MAP = {  # 在这里添加新消息类型
        QueryStateEvent.TYPE: QueryStateEvent,
        SyncEvent.TYPE: SyncEvent,
        SyncRespEvent.TYPE: SyncRespEvent,
        **TaskMessageEventFactory._CLASS_MAP  # 保留支持 TaskMessageEventFactory 中已有的消息类型
    }

设计扩充合约工具。

In [13]:
class SyncProcessContractor(TaskMessageContractor):

    def __init__(self, task_id: str):
        super().__init__(task_id=task_id)
        self._event_factory = SyncProcessEventFactory  # 替换为自定义事件工厂

    def query_state(self, querier: str):
        """发送状态查询消息。"""
        event = QueryStateEvent(querier=querier)
        self._new_contract(targets=self.EVERYONE, event=event)  # 指定 self.EVERYONE 广播消息

    def sync_state(self, aggregator: str, current_round: int, participants: List[str], querier: str):
        """发送状态同步消息。"""
        event = SyncEvent(aggregator=aggregator,
                          current_round=current_round,
                          participants=participants)
        self._new_contract(targets=[querier], event=event)  # 指定 querier 消息只会发送给 querier

    def response_sync(self, aggregator: str, querier: str):
        """发送状态同步响应消息。"""
        event = SyncRespEvent(querier=querier)
        self._new_contract(targets=[aggregator], event=event)  # 指定 aggregator 消息只会发送给 aggregator

同步流程的消息和工具定义完成。下面设计一个发起方、一个参与方，模拟运行一下同步流程。

In [None]:
# 发起方模拟脚本

task_id = '79ce0d22-22f8-4f5d-8d0f-35ad0b26db7b'  # 随机指定一个 ID
self_node = 'd54a3af2-83e2-4da6-bc8d-2dc03634612c'  # 随机指定一个 ID
# 模拟指定两个已经完成同步的参与方节点
some_others = [
    '8983ba98-74ac-41eb-9588-2c2d57ecf8cb',  # 随机指定一个 ID
    '4c6db456-3ece-4e2f-a165-585d0a6f175c'  # 随机指定一个 ID
]

def sync_state():
    # 实例化合约工具
    contractor = SyncProcessContractor(task_id=task_id)

    print('等待状态同步请求中 ...')
    for _event in contractor.contract_events():
        assert isinstance(_event, QueryStateEvent), '消息类型错误，期望收到状态查询消息'
        querier = _event.querier
        print(f'收到状态查询消息，查询者为: {querier}')
        break  # 退出消息监听
    # 发送状态同步消息
    contractor.sync_state(aggregator=self_node,
                          current_round=11,
                          participants=[self_node, *some_others],
                          querier=querier)
    print('等待状态同步响应中 ...')
    for _event in contractor.contract_events():
        assert isinstance(_event, SyncRespEvent), '消息类型错误，期望收到同步状态响应消息'
        assert querier == _event.querier, '同步异常，请求者匹配失败'
    print(f'与 {querier} 同步状态完成.')

# 模拟调试流程
nodes = [  # 消息广播会发给 nodes 中所有节点，所以需要配置 nodes 才能正常广播消息
    self_node,
    *some_others,
    '2d7514b7-f2fc-4f6c-8d88-849339bd268a' # 参与方ID
]
with mock_context(id=self_node, nodes=nodes):
    sync_state()

In [None]:
# 参与方模拟脚本

task_id = '79ce0d22-22f8-4f5d-8d0f-35ad0b26db7b'  # 必须与发起方的 task_id 相同
self_node = '2d7514b7-f2fc-4f6c-8d88-849339bd268a'  # 随机指定一个 ID

def sync_state():
    # 实例化合约工具
    contractor = SyncProcessContractor(task_id=task_id)

    print('发送状态查询请求')
    contractor.query_state(querier=self_node)
    print('等待状态同步消息 ...')
    for _event in contractor.contract_events():
        assert isinstance(_event, SyncEvent), '消息类型错误，期望收到状态同步消息'
        aggregator = _event.aggregator
        print('收到状态同步消息')
        print(f'任务发起方为: {aggregator}')
        print(f'当前训练轮次为: {_event.current_round}')
        print(f'当前任务参与方有: {_event.participants}')
        break  # 退出消息监听
    # 发送状态同步响应
    contractor.response_sync(aggregator=aggregator, querier=self_node)
    print(f'与 {aggregator} 同步状态完成.')

# 模拟调试流程
some_others = [
    '8983ba98-74ac-41eb-9588-2c2d57ecf8cb',
    '4c6db456-3ece-4e2f-a165-585d0a6f175c'
]
nodes = [  # 消息广播会发给 nodes 中所有节点，所以需要配置 nodes 才能正常广播消息
    self_node,
    *some_others,
    'd54a3af2-83e2-4da6-bc8d-2dc03634612c' # 发起方ID
]
with mock_context(id=self_node, nodes=nodes):
    sync_state()

将发起方脚本和参与方脚本分别复制到独立的 Notebook 脚本文件中，加载自定义的消息工具代码，然后运行脚本。或者可以使用整理好的[发起方脚本](res/6_initiator.ipynb)和[参与方脚本](res/6_collaborator.ipynb)，脚本启动顺序随意，看看是否能够成功完成同步流程。