In [2]:
import sys
import os
import time

import pandas as pd
import numpy as np
import json as json
import statistics as stat
import sklearn.metrics as skmetrics
import sklearn.model_selection as skms

import pymetry as pym
import pymetry.ds as pym_ds
import pymetry.project as pym_prj
import pymetry.utilities.jobs as pym_jobs
import pymetry.utilities.utils as pym_ut
import pymetry.explore as pym_exp
import pymetry.federation as fed

### Federation & peer names

In [2]:
fed_admin_name = "fed_name_iris_01"

# Peer 0 is the Federation Administrator
peer0_name = "peer0"
peer1_name = "peer1"
peer2_name = "peer2"

### Read config information

In [3]:
pym.init_conf({
    "server" : 'http://YOUR_IP_ADDRESS',
    "sym_key_id" : "YOUR_SYMETRY_KEYID",
    "sym_secret_key" : "YOUR_SYMETRY_SECRET_KEY",
    
    "s3accesskey" : "YOUR_AWS_ACCESS_KEY",
    "s3secretkey" : "YOUR_AWS_S3_SECRET_KEY",
    
    "fedadmin_password" : "YOUR_FEDERATION_PASSWORD",
    "fedadmin_s3_bucket": "YOUR_S3_BUCKET",
    "fedadminuser" : "YOUR_AWS_ACCOUNTID",
    
    "fedpeer1_S3_bucket" : "YOUR_S3_BUCKET",
    "fedpeer1user" : "YOUR_AWS_ACCOUNTID",
    
    "fedpeer2_S3_bucket" : "YOUR_S3_BUCKET",
    "fedpeer2user" : "YOUR_AWS_ACCOUNTID"
})

## Read the data

### Iris Dataset

In [5]:
DATA_FILE = "../../../data/Iris_data.csv"

df = pd.read_csv(DATA_FILE)

# Dropping Columns to avoid Singular Matrix Error
columns_drop = ['sepal_lengt_b2',
                'sepal_width_b2',
                'petal_length_b2',
                'petal_width_b2',
                'Iris_versicolor',
                'Iris_virginica',
                'Iris_setosa']

df = df.drop(columns_drop, 
             axis=1)

# Get list of Attributes & Target
attributes_target = df.columns.tolist()

### Shuffle Dataset & separate into 3 equal parts for each peer of the federation
### Create Dataframes for all 3 peers

In [6]:
shuffled = df.sample(frac=1)
df_out = np.array_split(shuffled, 3) 

In [7]:
def create_train_test(in_df):
    
    train_df, test_df = skms.train_test_split(in_df, 
                                              test_size=0.2, 
                                              random_state=42)

    print(train_df.shape)
    print(test_df.shape)

    smldf_train = pym_ut.pandas_df_to_sml_json(pandas_df=train_df)
    smldf_test = pym_ut.pandas_df_to_sml_json(pandas_df=test_df)
    
    rval = (smldf_train, 
            smldf_test, 
            test_df)
    
    return rval

In [8]:
smldf0_train, smldf0_test, df0_test = create_train_test(df_out[0])
smldf1_train, smldf1_test, df1_test = create_train_test(df_out[1])
smldf2_train, smldf2_test, df2_test = create_train_test(df_out[2])

(40, 8)
(10, 8)
(40, 8)
(10, 8)
(40, 8)
(10, 8)


### Creating Federation Admin

In [9]:
def create_fedadmin_info(in_fed_admin_name,
                         in_fed_peer0_name):
    
    fed_name_tmp = in_fed_admin_name
    fed_local_tmp = in_fed_peer0_name
    fed_local_type_tmp = 'cpu'
    aws_access_key_tmp = pym.conf['s3accesskey']
    aws_secret_key_tmp = pym.conf['s3secretkey']
    aws_region_tmp = 'US_EAST_1'
    s3_bucket_tmp = pym.conf['fedadmin_s3_bucket']
    aws_id_tmp = pym.conf['fedadminuser']
    sync_sched_tmp = 'm1'

    fed_info_out = fed.get_fed_info(fed_name_tmp,
                                    fed_local_tmp,
                                    fed_local_type_tmp,
                                    aws_access_key_tmp,
                                    aws_secret_key_tmp,
                                    aws_region_tmp,
                                    s3_bucket_tmp,
                                    aws_id_tmp,
                                    sync_sched_tmp)
    rval = fed_info_out

    return rval

In [10]:
fed_info_out = create_fedadmin_info(fed_admin_name,
                                    peer0_name)

### Creating Federation Peer & Learning training data

In [11]:
fed_info = fed.create_federation(peer0_name,
                                 fed_info_out)
print(fed_info.content)
histo = pym_prj.enable_histogram(peer0_name)

b'{"statusCode":"CREATED","statusString":"Project Created with id:peer0","values":{}}'
No requestpayload!



In [12]:
pro_learnls = pym_prj.stream_data_to_project(peer0_name, 
                                             smldf0_train, 
                                             client_id='pymetry')
print(pro_learnls.content)

b'{"statusCode":"OK","statusString":"OK","values":{}}'


In [13]:
def fed_join_created(in_fed_encrypt_passwd,
                     in_fed_join_name,
                     in_peer_userid,
                     in_peer_user_s3bucket,
                     in_peer_s3accesskey,
                     in_peer_s3secretkey):
    
    join_local_name_in = in_fed_join_name
    join_local_type_in = 'cpu'
    join_aws_id_in = in_peer_userid
    join_s3_bucket_in =  in_peer_user_s3bucket
    join_access_key_in = in_peer_s3accesskey
    join_secret_key_in = in_peer_s3secretkey
    join_aws_region_in = 'US_EAST_1'

    fed_join_body = fed.get_fed_join_info(in_fed_encrypt_passwd,
                                          join_local_name_in,
                                          join_local_type_in,
                                          join_aws_id_in,
                                          join_s3_bucket_in,
                                          join_access_key_in,
                                          join_secret_key_in,
                                          join_aws_region_in)
    rval = fed_join_body
    return rval

In [14]:
def fed_encrypt_create(in_fed_peer0_name,
                       in_fed_peerx_name,
                       in_peer_userid,
                       in_peer_user_s3bucket,
                       in_peer_s3accesskey,
                       in_peer_s3secretkey):

    fed_encrypt_passwd = pym.conf['fedadmin_password']
    fed_encrypt_rest_host = pym.conf['server']

    fed_pad_passwd = fed.pad_password(fed_encrypt_passwd)

    fed_encrypt_info_out = fed.get_fed_encrypt_info(fed_encrypt_passwd,
                                                    fed_encrypt_rest_host)
    
    fed_encrypt_out = fed.get_fed_encrypt(in_fed_peer0_name, 
                                          fed_encrypt_info_out)

    fed_msg_decrypt = fed_encrypt_out.value['encryptedGroupInfo']

    decrypt_output_tmp = fed.decrypt_fed_info(fed_msg_decrypt, 
                                              fed_pad_passwd)

    fed_join_body_tmp = fed_join_created(fed_encrypt_passwd,
                                         in_fed_peerx_name,
                                         in_peer_userid,
                                         in_peer_user_s3bucket,
                                         in_peer_s3accesskey,
                                         in_peer_s3secretkey)

    fed_json = json.loads(decrypt_output_tmp)

    fed_json.update(fed_join_body_tmp)

    rval = fed_json

    return rval

In [15]:
def join_fed_learn_data(in_fed_peer0_name,
                        in_fed_peerx_name,
                        in_smldf_train,
                        in_peer_userid,
                        in_peer_user_s3bucket,
                        in_peer_s3accesskey,
                        in_peer_s3secretkey):
    
    encrypt_out = fed_encrypt_create(in_fed_peer0_name,
                                     in_fed_peerx_name,
                                     in_peer_userid,
                                     in_peer_user_s3bucket,
                                     in_peer_s3accesskey,
                                     in_peer_s3secretkey)
                                

    join_status = fed.create_join_federation(in_fed_peerx_name, 
                                             encrypt_out)
    
    pym_prj.enable_histogram(in_fed_peerx_name)
    
    print(in_fed_peerx_name + ": %s " % join_status.content)
    time.sleep(2)
    
    pro_learnls = pym_prj.stream_data_to_project(in_fed_peerx_name, 
                                                 in_smldf_train, 
                                                 client_id='pymetry')

### Peer 1 joining Federation

In [16]:
peer1_userid = pym.conf['fedpeer1user']
peer1_s3bucket = pym.conf['fedpeer1_s3_bucket']
peer1_accesskey = pym.conf['s3accesskey']
peer1_secretkey = pym.conf['s3secretkey']

# import ipdb; ipdb.set_trace()

join_fed_learn_data(peer0_name,
                    peer1_name,
                    smldf1_train,
                    peer1_userid,
                    peer1_s3bucket,
                    peer1_accesskey,
                    peer1_secretkey)

No requestpayload!

peer1: b'{"statusCode":"CREATED","statusString":"Fed Project created with join","values":{}}' 


### Peer 2 joining Federation

In [17]:
peer2_userid = pym.conf['fedpeer2user']
peer2_s3bucket = pym.conf['fedpeer2_s3_bucket']
peer2_accesskey = pym.conf['s3accesskey']
peer2_secretkey = pym.conf['s3secretkey']

join_fed_learn_data(peer0_name,
                    peer2_name,
                    smldf2_train,
                    peer2_userid,
                    peer2_s3bucket,
                    peer2_accesskey,
                    peer2_secretkey)

No requestpayload!

peer2: b'{"statusCode":"CREATED","statusString":"Fed Project created with join","values":{}}' 


### Start Pulsing

In [18]:
time.sleep(5)
pulse_out = fed.start_pulse(peer0_name)
print(peer0_name + ": %s " % pulse_out.content)    
    
pulse_out = fed.start_pulse(peer1_name)
print(peer1_name + ": %s " % pulse_out.content)

pulse_out = fed.start_pulse(peer2_name)
print(peer2_name + ": %s " % pulse_out.content)

peer0: b'{"statusCode":"OK","statusString":"OK","values":{}}' 
peer1: b'{"statusCode":"OK","statusString":"OK","values":{}}' 
peer2: b'{"statusCode":"OK","statusString":"OK","values":{}}' 


### Pause for 5 minutes to let all Peers Sync information

In [3]:
pause_time = 300

print("Pausing for %s seconds to allow Federation to sync information \
for all peers " % pause_time)
time.sleep(pause_time)

Pausing for 300 seconds to allow Federation to sync information for all peers 


### Monitor Federation Peers Sync & Error Logs

In [20]:
def get_synclogs(in_project):
    
    rsp = fed.get_fed_sync_log(in_project)
    sync_peer = rsp.json['values']['stringList']['values']
    for i in range(0, len(sync_peer)):
        print(sync_peer[i])

In [21]:
def get_errorlogs(in_project):
    
    rsp = fed.get_fed_error_log(in_project)
    err_peer = rsp.json['values']['stringList']['values']
    for i in range(0, len(err_peer)):
        print(err_peer[i])

#### Peer 0 (Admin) : Sync Logs

In [22]:
get_synclogs(peer0_name)

fedProj rebuilt done name:[peer0] id:[28] type:[com.rtlm.core.bets.DynamicBET] #peers:2 ms:6630 time:2022-03-09T16.58
fedProj rebuilt done name:[peer0] id:[28] type:[com.rtlm.core.bets.DynamicBET] #peers:2 ms:3333 time:2022-03-09T16.59
fedProj rebuilt done name:[peer0] id:[28] type:[com.rtlm.core.bets.DynamicBET] #peers:2 ms:2793 time:2022-03-09T17.00
fedProj rebuilt done name:[peer0] id:[28] type:[com.rtlm.core.bets.DynamicBET] #peers:2 ms:29 time:2022-03-09T17.01
fedProj rebuilt done name:[peer0] id:[28] type:[com.rtlm.core.bets.DynamicBET] #peers:2 ms:11 time:2022-03-09T17.02
fedProj rebuilt done name:[peer0] id:[28] type:[com.rtlm.core.bets.DynamicBET] #peers:2 ms:18 time:2022-03-09T17.03


#### Peer 0 (Admin) : Error Logs

In [23]:
get_errorlogs(peer0_name)

no error


#### Peer 1: Sync Logs

In [24]:
get_synclogs(peer1_name)

fedProj rebuilt done name:[peer1] id:[29] type:[com.rtlm.core.bets.DynamicBET] #peers:2 ms:5806 time:2022-03-09T16.58
fedProj rebuilt done name:[peer1] id:[29] type:[com.rtlm.core.bets.DynamicBET] #peers:2 ms:4624 time:2022-03-09T16.59
fedProj rebuilt done name:[peer1] id:[29] type:[com.rtlm.core.bets.DynamicBET] #peers:2 ms:4250 time:2022-03-09T17.00
fedProj rebuilt done name:[peer1] id:[29] type:[com.rtlm.core.bets.DynamicBET] #peers:2 ms:2070 time:2022-03-09T17.01
fedProj rebuilt done name:[peer1] id:[29] type:[com.rtlm.core.bets.DynamicBET] #peers:2 ms:25 time:2022-03-09T17.02
fedProj rebuilt done name:[peer1] id:[29] type:[com.rtlm.core.bets.DynamicBET] #peers:2 ms:17 time:2022-03-09T17.03


#### Peer 1: Error Logs

In [25]:
get_errorlogs(peer1_name)

no error


#### Peer 2 : Sync Logs

In [26]:
get_synclogs(peer2_name)

fedProj rebuilt done name:[peer2] id:[30] type:[com.rtlm.core.bets.DynamicBET] #peers:2 ms:5590 time:2022-03-09T16.58
fedProj rebuilt done name:[peer2] id:[30] type:[com.rtlm.core.bets.DynamicBET] #peers:2 ms:4832 time:2022-03-09T16.59
fedProj rebuilt done name:[peer2] id:[30] type:[com.rtlm.core.bets.DynamicBET] #peers:2 ms:2491 time:2022-03-09T17.00
fedProj rebuilt done name:[peer2] id:[30] type:[com.rtlm.core.bets.DynamicBET] #peers:2 ms:35 time:2022-03-09T17.01
fedProj rebuilt done name:[peer2] id:[30] type:[com.rtlm.core.bets.DynamicBET] #peers:2 ms:13 time:2022-03-09T17.02
fedProj rebuilt done name:[peer2] id:[30] type:[com.rtlm.core.bets.DynamicBET] #peers:2 ms:10 time:2022-03-09T17.03


#### Peer 2 : Error Logs

In [27]:
get_errorlogs(peer2_name)

no error


### Stop Pulsing

In [28]:
time.sleep(5)
 
pulse_out = fed.stop_pulse(peer0_name)
print(peer0_name + ": %s " % pulse_out.content)

pulse_out = fed.stop_pulse(peer1_name)
print(peer1_name + ": %s " % pulse_out.content)

pulse_out = fed.stop_pulse(peer2_name)
print(peer2_name + ": %s " % pulse_out.content)

peer0: b'{"statusCode":"OK","statusString":"OK","values":{}}' 
peer1: b'{"statusCode":"OK","statusString":"OK","values":{}}' 
peer2: b'{"statusCode":"OK","statusString":"OK","values":{}}' 


### Build Regression Models on all 3 peers

In [29]:
def build_models(project_name):   
    
    model_name = 'regression_one'
    prj_name = project_name
    ml_context = {"targets": ["0"],
                  "inputAttributes": ["1", "2", "3", "4", "5", "6", "7"]}

    rsp = pym_prj.build_model(ml_context=ml_context,
                              pid=prj_name,
                              modelid=model_name,
                              algo='mlr')
    print(rsp.content)
    pym_jobs.wait_for_job_finish(rsp)

#### Peer 0 (Admin) : Build Model 

In [30]:
build_models(peer0_name)

b'{"statusCode":"ACCEPTED","statusString":"Job Created","values":{}}'
{'statusCode': 'OK', 'statusString': 'Job is finished', 'values': {}}


#### Peer 1: Build Model

In [31]:
build_models(peer1_name)

b'{"statusCode":"ACCEPTED","statusString":"Job Created","values":{}}'
{'statusCode': 'OK', 'statusString': 'Job is finished', 'values': {}}


#### Peer 2: Build Model

In [32]:
build_models(peer2_name)

b'{"statusCode":"ACCEPTED","statusString":"Job Created","values":{}}'
{'statusCode': 'OK', 'statusString': 'Job is finished', 'values': {}}


### Make Predictions

In [33]:
def predict(peer_name_in,
            model_name_in,
            smldf_test_in):

    rsp = pym_prj.predict_df(peer_name_in,
                             model_name_in,
                             smldf_test_in)
    print(rsp.content)
    
    rst = rsp.json['values']['KSVSMap']['values']
    pred_values = [ ]
    for i in range(0,len(rst)):
        tt = list(rst[i].values())
        pred_values.append(float(tt[0]))
    rval = pred_values
    return rval

#### Peer 0 (Admin) : Prediction

In [34]:
model_name = 'regression_one'

y_pred0 = predict(peer0_name,
                  model_name,
                  smldf0_test)

b'{"statusCode":"OK","statusString":"OK","values":{"KSVSMap":{"values":[{"res":"6.276775361205365"},{"res":"5.8303662248007955"},{"res":"4.899862900172971"},{"res":"4.971069245100589"},{"res":"6.838514958372939"},{"res":"5.612375572889659"},{"res":"4.937745046020856"},{"res":"5.792312897898156"},{"res":"5.0475867443974884"},{"res":"6.226764491416657"}]}}}'


#### Peer 1 : Prediction

In [35]:
y_pred1 = predict(peer1_name,
                  model_name,
                  smldf1_test)

b'{"statusCode":"OK","statusString":"OK","values":{"KSVSMap":{"values":[{"res":"4.902518477357614"},{"res":"6.977722061221451"},{"res":"4.7934299865821135"},{"res":"5.4699096139751475"},{"res":"7.023367740844205"},{"res":"6.193643490115987"},{"res":"6.268055422050321"},{"res":"5.369541062354703"},{"res":"5.011606968133112"},{"res":"7.193243465587808"}]}}}'


#### Peer 2 : Prediction

In [36]:
y_pred2 = predict(peer2_name,
                  model_name,
                  smldf2_test)

b'{"statusCode":"OK","statusString":"OK","values":{"KSVSMap":{"values":[{"res":"5.351635495008084"},{"res":"6.394194263716995"},{"res":"6.810105959281234"},{"res":"5.015015752918875"},{"res":"4.937745046020844"},{"res":"4.828656555245336"},{"res":"5.55788695039732"},{"res":"6.438690781357381"},{"res":"5.482844814576373"},{"res":"7.267890869500263"}]}}}'


### R Square Measure

#### Peer 0 (Admin) : R Square

In [37]:
y_true0 = df0_test['sepal_length'].values.tolist()
rsquare0 = skmetrics.r2_score(y_true0, 
                              y_pred0)
print("R Square for peer0 : %s " % rsquare0)

R Square for peer0 : 0.8733322289180675 


#### Peer 1 : R Square

In [38]:
y_true1 = df1_test['sepal_length'].values.tolist()
rsquare1 = skmetrics.r2_score(y_true1, 
                              y_pred1)
print("R Square for peer1 : %s " % rsquare1)

R Square for peer1 : 0.8995412920135443 


#### Peer 2 : R Square

In [39]:
y_true2 = df2_test['sepal_length'].values.tolist()
rsquare2 = skmetrics.r2_score(y_true2, 
                              y_pred2)
print("R Square for peer2 : %s " % rsquare2)

R Square for peer2 : 0.8885540855221365 
