# Federated Learning Training Plan: Host Plan & Model

### Note: This notebook is based on [this PySyft example](https://github.com/OpenMined/PySyft/blob/master/examples/experimental/FL%20Training%20Plan/Host%20Plan.ipynb)

Here we load the Plan and Model parameters created earlier in the "Create Plan" notebook
and host them on PyGrid.

After that, it should be possible to run an FL worker using
SwiftSyft, KotlinSyft, syft.js, or the Python FL worker
and train the hosted model on the worker's local data.

In [1]:
%load_ext autoreload
%autoreload 2
import warnings
warnings.filterwarnings("ignore")

import websockets
import json
import requests
import torch

import syft as sy
from syft.grid.grid_client import GridClient
from syft.serde import protobuf
from syft_proto.execution.v1.plan_pb2 import Plan as PlanPB
from syft_proto.execution.v1.state_pb2 import State as StatePB

sy.make_hook(globals())
# force protobuf serialization for tensors
hook.local_worker.framework = None

Setting up Sandbox...
Done!


In [2]:
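async def sendWsMessage(data):
    # `gatewayWsUrl` is a global defined in a later cell, before the first call.
    async with websockets.connect('ws://' + gatewayWsUrl) as websocket:
        # Send one JSON message and wait for a single JSON reply.
        await websocket.send(json.dumps(data))
        message = await websocket.recv()
        return json.loads(message)

def deserializeFromBin(worker, filename, pb):
    # Read the serialized protobuf bytes from disk.
    with open(filename, "rb") as f:
        data = f.read()  # named `data` to avoid shadowing the built-in `bin`
    # Fill the empty protobuf message `pb`, then convert it to a syft object.
    pb.ParseFromString(data)
    return protobuf.serde._unbufferize(worker, pb)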

## Step 4a: Host in PyGrid

Here we load the "ops list" Plan.
PyGrid should translate it to other types (e.g. torchscript) automatically.

In [3]:
# Load the protobuf files created in the "Create Plan" notebook.
training_plan = deserializeFromBin(hook.local_worker, "cryptly_training_plan.pb", PlanPB())
model_params_state = deserializeFromBin(hook.local_worker, "cryptly_model_params.pb", StatePB())

Follow the PyGrid README.md to build the `openmined/grid-gateway` image from the latest `dev` branch
and spin up PyGrid using `docker-compose up --build`.

In [4]:
# Default gateway address when running locally 
gatewayWsUrl = "127.0.0.1:5001"
grid = GridClient(id="test", address=gatewayWsUrl, secure=False)
grid.connect()

Define the name, version, and configs.

In [5]:
# Use the same name/version in the worker
name = "Cryptly"
version = "1.0.0"
client_config = {
    "name": name,
    "version": version,
    "batch_size": 1,  # adjust as needed
    "lr": 0.005,
    "max_updates": 100,  # custom syft.js option that limits number of training loops per worker
}

server_config = {
    # temporarily acts as the minimum number of worker diffs needed to trigger the cycle-end event
    "min_workers": 3,
    "max_workers": 3,
    "pool_selection": "random",
    "num_cycles": 5,
    "do_not_reuse_workers_until_cycle": 4,
    "cycle_length": 28800,
    "minimum_upload_speed": 0,
    "minimum_download_speed": 0,
}
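
Before hosting, a quick local sanity check of the two dictionaries can catch typos. The sketch below is only an illustration: `check_configs` is a hypothetical helper, and the rules it applies (name and version present, `min_workers` not above `max_workers`, positive `num_cycles` and `cycle_length`) are assumptions made for this example, not checks that PyGrid is known to enforce.

```python
def check_configs(client_config, server_config):
    """Return a list of problems found; an empty list means nothing was flagged.

    Hypothetical helper: the rules below are assumptions for illustration only.
    """
    problems = []
    # The worker looks the model up by name and version, so both should be set.
    for key in ("name", "version"):
        if not client_config.get(key):
            problems.append(f"client_config is missing '{key}'")
    # Assumed rule: the minimum pool size cannot exceed the maximum.
    if server_config.get("min_workers", 0) > server_config.get("max_workers", 0):
        problems.append("min_workers is greater than max_workers")
    # Assumed rule: cycle count and cycle length must be positive.
    for key in ("num_cycles", "cycle_length"):
        if server_config.get(key, 0) <= 0:
            problems.append(f"server_config '{key}' must be positive")
    return problems


# Example values copied from the cell above (separate names, so nothing is overwritten).
example_client = {"name": "Cryptly", "version": "1.0.0", "batch_size": 1,
                  "lr": 0.005, "max_updates": 100}
example_server = {"min_workers": 3, "max_workers": 3, "num_cycles": 5,
                  "cycle_length": 28800}
print(check_configs(example_client, example_server))
```

An empty list means none of the assumed rules were violated; it says nothing about whether PyGrid will accept the configuration.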

Shoot!

If everything is fine, a success status is returned.
If the name/version already exists in PyGrid, change them above or clean up the PyGrid database by re-creating the docker containers (e.g. `docker-compose up --force-recreate`).


In [6]:
response = grid.host_federated_training(
    model=model_params_state,
    client_plans={'training_plan': training_plan},
    client_protocols={},
    server_averaging_plan=None,
    client_config=client_config,
    server_config=server_config
)

print("Host response:", response)

Host response: {'type': 'federated/host-training', 'data': {'status': 'success'}}


Let's double-check that the data is loaded by requesting a cycle.

(The request is made directly here; in the future these will be methods on the grid client.)

In [7]:
auth_request = {
    "type": "federated/authenticate",
    "data": {}
}
auth_response = await sendWsMessage(auth_request)
print('Auth response: ', json.dumps(auth_response, indent=2))

cycle_request = {
    "type": "federated/cycle-request",
    "data": {
        "worker_id": auth_response['data']['worker_id'],
        "model": name,
        "version": version,
        "ping": 1,
        "download": 10000,
        "upload": 10000,
    }
}
cycle_response = await sendWsMessage(cycle_request)
print('Cycle response:', json.dumps(cycle_response, indent=2))

worker_id = auth_response['data']['worker_id']
request_key = cycle_response['data']['request_key']
model_id = cycle_response['data']['model_id'] 
training_plan_id = cycle_response['data']['plans']['training_plan']

Auth response:  {
  "type": "federated/authenticate",
  "data": {
    "status": "success",
    "worker_id": "3fc8bf58-f7c7-45a8-91b7-bf4a95eba306"
  }
}
Cycle response: {
  "type": "federated/cycle-request",
  "data": {
    "status": "accepted",
    "request_key": "291b6118ff361d496d349bea4ff8cd78244480adac5b4a4c07a916889bbeb0dd",
    "version": "1.0.0",
    "model": "Cryptly",
    "plans": {
      "training_plan": 4
    },
    "protocols": {},
    "client_config": {
      "name": "Cryptly",
      "version": "1.0.0",
      "batch_size": 1,
      "lr": 0.005,
      "max_updates": 100
    },
    "model_id": 2
  }
}
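
The two messages sent above can also be built by small helper functions, which keeps the field names in one place. This is a sketch only: `make_auth_request` and `make_cycle_request` are hypothetical names, not part of the grid client, and the field names and default values are copied from the cell above.

```python
import json


def make_auth_request():
    """Build the authentication message (hypothetical helper)."""
    return {"type": "federated/authenticate", "data": {}}


def make_cycle_request(worker_id, model, version, ping=1, download=10000, upload=10000):
    """Build the cycle request message (hypothetical helper).

    The defaults for ping/download/upload mirror the values used in the notebook.
    """
    return {
        "type": "federated/cycle-request",
        "data": {
            "worker_id": worker_id,
            "model": model,
            "version": version,
            "ping": ping,
            "download": download,
            "upload": upload,
        },
    }


# "some-worker-id" is a placeholder; a real id comes from the auth response.
example = make_cycle_request("some-worker-id", "Cryptly", "1.0.0")
print(json.dumps(example, indent=2))
```

The resulting dictionaries can be passed to `sendWsMessage` exactly like the hand-written ones.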


Let's download the model and the plan (both versions) and check that they are actually usable.

In [8]:
# Model
req = requests.get(f"http://{gatewayWsUrl}/federated/get-model?worker_id={worker_id}&request_key={request_key}&model_id={model_id}")
model_data = req.content
pb = StatePB()
pb.ParseFromString(model_data)
model_params_downloaded = protobuf.serde._unbufferize(hook.local_worker, pb)
print("Params shapes:", [p.shape for p in model_params_downloaded.tensors()])

Params shapes: [torch.Size([120, 6000]), torch.Size([120]), torch.Size([2, 120]), torch.Size([2])]
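
The printed shapes are consistent with a two-layer fully connected network (6000 inputs, 120 hidden units, 2 outputs), although that reading is an inference from the shapes rather than something stated here. As a quick check, the number of trainable values can be computed from the shapes alone; the list below is copied by hand from the output above.

```python
from math import prod

# Shapes copied from the "Params shapes" output: weight and bias of each layer.
shapes = [(120, 6000), (120,), (2, 120), (2,)]

# Number of values in each tensor is the product of its dimensions.
counts = [prod(shape) for shape in shapes]
total = sum(counts)

print(counts)  # [720000, 120, 240, 2]
print(total)   # 720362
```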


In [9]:
# Plan "list of ops"
req = requests.get(f"http://{gatewayWsUrl}/federated/get-plan?worker_id={worker_id}&request_key={request_key}&plan_id={training_plan_id}&receive_operations_as=list")
pb = PlanPB()
pb.ParseFromString(req.content)
plan_ops = protobuf.serde._unbufferize(hook.local_worker, pb)
print(plan_ops.code)
print(plan_ops.torchscript)

def training_plan(arg_1, arg_2, arg_3, arg_4, out_3, out_4, out_5, out_6):
    2 = arg_1.dim()
    var_0 = out_3.t()
    var_1 = arg_1.matmul(var_0)
    var_2 = out_4.add(var_1)
    var_3 = var_2.relu()
    2 = var_3.dim()
    var_4 = out_5.t()
    var_5 = var_3.matmul(var_4)
    var_6 = out_6.add(var_5)
    var_7 = var_6.max()
    var_8 = var_6.sub(var_7)
    var_9 = var_8.exp()
    var_10 = var_9.sum(dim=1, keepdim=True)
    var_11 = var_10.log()
    var_12 = var_8.sub(var_11)
    var_13 = arg_2.mul(var_12)
    var_14 = var_13.sum()
    var_15 = var_14.neg()
    out_1 = var_15.div(arg_3)
    var_16 = out_1.mul(0)
    var_17 = var_16.add(1)
    var_18 = var_17.div(arg_3)
    var_19 = var_18.mul(-1)
    var_20 = var_13.mul(0)
    var_21 = var_20.add(1)
    var_22 = var_21.mul(var_19)
    var_23 = var_22.mul(var_12)
    var_24 = var_22.mul(arg_2)
    var_25 = var_23.copy()
    var_26 = var_24.add(0)
    var_27 = var_24.mul(-1)
    var_28 = var_27.sum(dim=[1], keepdim=True)
    var_29 = 
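
The download URLs in these cells all follow the same pattern: a `federated/...` endpoint plus a query string. A minimal sketch of building them with the standard library is shown below; `build_download_url` is a hypothetical helper, and the worker id and request key are placeholders rather than real values.

```python
from urllib.parse import urlencode


def build_download_url(host, endpoint, **params):
    """Hypothetical helper: build 'http://<host>/federated/<endpoint>?<query>'."""
    # urlencode escapes the values and joins them as key=value pairs with '&'.
    return f"http://{host}/federated/{endpoint}?{urlencode(params)}"


url = build_download_url(
    "127.0.0.1:5001",
    "get-plan",
    worker_id="example-worker",   # placeholder, not a real worker id
    request_key="example-key",    # placeholder, not a real request key
    plan_id=4,
    receive_operations_as="list",
)
print(url)
```

Alternatively, `requests.get` accepts a `params=` dictionary and builds the query string itself.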

In [10]:
# Plan "torchscript"
req = requests.get(f"http://{gatewayWsUrl}/federated/get-plan?worker_id={worker_id}&request_key={request_key}&plan_id={training_plan_id}&receive_operations_as=torchscript")
pb = PlanPB()
pb.ParseFromString(req.content)
plan_ts = protobuf.serde._unbufferize(hook.local_worker, pb)
print(plan_ts.code)
print(plan_ts.torchscript.code)

def training_plan(arg_1, arg_2, arg_3, arg_4, out_3, out_4, out_5, out_6):
    return out_1, out_2, out_3, out_4, out_5, out_6
def forward(self,
    argument_1: Tensor,
    argument_2: Tensor,
    argument_3: Tensor,
    argument_4: Tensor,
    argument_5: List[Tensor]) -> Tuple[Tensor, Tensor, Tensor, Tensor, Tensor, Tensor]:
  _0, _1, _2, _3, = argument_5
  _4 = torch.add(_1, torch.matmul(argument_1, torch.t(_0)), alpha=1)
  _5 = torch.matmul(torch.relu(_4), torch.t(_2))
  _6 = torch.add(_3, _5, alpha=1)
  _7 = torch.sub(_6, torch.max(_6), alpha=1)
  _8 = torch.sum(torch.exp(_7), [1], True, dtype=None)
  _9 = torch.mul(argument_2, torch.sub(_7, torch.log(_8), alpha=1))
  _10 = torch.sum(_9, dtype=None)
  _11 = torch.eq(torch.argmax(_6, 1, False), torch.argmax(argument_2, 1, False))
  _12 = torch.to(torch.sum(_11, dtype=None), 6, False, False, None)
  _13 = (torch.div(torch.neg(_10), argument_3), torch.div(_12, argument_3), _0, _1, _2, _3)
  return _13
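
Reading the torchscript above, the first two returned values appear to be a cross-entropy loss and an accuracy, both divided by `argument_3` (assumed here to be the batch size). The sketch below reproduces that arithmetic in plain Python on a tiny hand-made example, starting from the logits (`_6`) and one-hot targets (`argument_2`). The function name and the sample values are illustrative assumptions, not part of the hosted plan.

```python
import math


def loss_and_accuracy(logits, targets, batch_size):
    """Mirror lines _7 to _13 of the torchscript on nested Python lists."""
    # _7: subtract the global maximum of the logits for numerical stability.
    global_max = max(max(row) for row in logits)
    shifted = [[v - global_max for v in row] for row in logits]
    # _8 and the inner torch.sub of _9: row-wise log-softmax.
    log_probs = []
    for row in shifted:
        log_denominator = math.log(sum(math.exp(v) for v in row))
        log_probs.append([v - log_denominator for v in row])
    # _9, _10: sum of target * log-probability over all entries.
    total = sum(
        t * lp
        for t_row, lp_row in zip(targets, log_probs)
        for t, lp in zip(t_row, lp_row)
    )
    loss = -total / batch_size
    # _11, _12: count rows where the predicted class matches the target class.
    correct = sum(
        row.index(max(row)) == t_row.index(max(t_row))
        for row, t_row in zip(logits, targets)
    )
    return loss, correct / batch_size


# Two samples, two classes: the first is predicted correctly, the second is not.
logits = [[0.0, 0.0], [2.0, 0.0]]
targets = [[1.0, 0.0], [0.0, 1.0]]
loss, acc = loss_and_accuracy(logits, targets, batch_size=2)
print(loss, acc)
```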



## Step 5a: Train

To train the hosted model, use one of the existing FL workers:
 * Python FL Client: see the "[Execute Plan with Python FL Client](Execute%20Plan%20with%20Python%20FL%20Client.ipynb)" notebook, which has an example of using the Python FL worker.
 * [SwiftSyft](https://github.com/OpenMined/SwiftSyft)
 * [KotlinSyft](https://github.com/OpenMined/KotlinSyft)
 * [syft.js](https://github.com/OpenMined/syft.js)


