# Node

In [13]:
import time
import torch as th
# Limit torch's intra-op CPU parallelism to a single thread.
th.set_num_threads(1)
import syft as sy
from syft.grid.clients.data_centric_fl_client import DataCentricFLClient
# Hook PySyft into torch, then open one websocket client per node.
# Both addresses are on localhost (ports 7600 and 7601); presumably the nodes
# have to be started separately before this cell is run — TODO confirm.
hook = sy.TorchHook(th)
alice = DataCentricFLClient(hook, "ws://localhost:7600")
bob = DataCentricFLClient(hook, "ws://localhost:7601")
# Bundle the two node clients into a single private grid handle.
my_grid = sy.PrivateGridNetwork(alice,bob)



In [14]:
import argparse
import ast
import threading
import time

import pyarrow
import pyarrow.flight
import pyarrow.csv as csv

In [15]:
def list_flights(args, client, connection_args={}):
    """Print every flight and every action that ``client`` reports.

    Args:
        args: Not used by this handler.
        client: Object providing ``list_flights()`` and ``list_actions()``
            (a ``pyarrow.flight.FlightClient`` in this notebook).
        connection_args: Not used by this handler.

    Returns:
        None. All information is written to stdout.
    """
    print('Flights\n=======')
    for info in client.list_flights():
        desc = info.descriptor
        kind = desc.descriptor_type
        if kind == pyarrow.flight.DescriptorType.PATH:
            print("Path:", desc.path)
        elif kind == pyarrow.flight.DescriptorType.CMD:
            print("Command:", desc.command)
        else:
            print("Unknown descriptor type")

        # Negative totals are displayed as "Unknown".
        records = info.total_records
        print("Total records:", records if records >= 0 else "Unknown")
        size = info.total_bytes
        print("Total bytes:", size if size >= 0 else "Unknown")

        print("Number of endpoints:", len(info.endpoints))
        print("Schema:", info.schema, sep="\n")
        print('---')

    print('\nActions\n=======')
    for entry in client.list_actions():
        print("Type:", entry.type)
        print("Description:", entry.description)
        print('---')


def do_action(args, client, connection_args={}):
    """Run the action named by ``args.action_type`` and print each result.

    The action is sent with a zero-length body. An ``ArrowIOError`` raised
    at any point is reported on stdout instead of propagating.

    Args:
        args: Object with an ``action_type`` attribute.
        client: Object providing ``do_action(action)``.
        connection_args: Not used by this handler.
    """
    try:
        empty_body = pyarrow.allocate_buffer(0)
        request = pyarrow.flight.Action(args.action_type, empty_body)
        print('Running action', args.action_type)
        for reply in client.do_action(request):
            print("Got result", reply.body.to_pybytes())
    except pyarrow.lib.ArrowIOError as err:
        print("Error calling action:", err)


def push_data(args, client, connection_args={}):
    """Read a CSV file and upload it to the Flight server as one flight.

    The table is stored under a path descriptor equal to ``args.file``.
    Before uploading, the row count and the first rows are printed.

    Args:
        args: Object with a ``file`` attribute naming the CSV file to read.
        client: Object providing ``do_put(descriptor, schema)`` that returns
            a ``(writer, metadata_reader)`` pair.
        connection_args: Not used by this handler.

    Raises:
        Anything raised by ``csv.read_csv``, ``client.do_put`` or the writer.
        The upload stream is closed before a write error propagates.
    """
    print('File Name:', args.file)
    my_table = csv.read_csv(args.file)
    print('Table rows=', len(my_table))
    print(my_table.to_pandas().head())
    writer, _ = client.do_put(
        pyarrow.flight.FlightDescriptor.for_path(args.file), my_table.schema)
    # Always close the stream, even when the write fails, so the upload
    # is not left half-open.
    try:
        writer.write_table(my_table)
    finally:
        writer.close()


def get_flight(args, client, connection_args={}):
    """Fetch a flight and print its contents as pandas data.

    The flight is looked up by path when ``args.path`` is truthy and by
    command (``args.command``) otherwise. For every endpoint the ticket is
    printed; then, for every location of that endpoint, a new client is
    opened, the ticket is redeemed and the resulting frame is printed.

    Args:
        args: Object with ``path`` and ``command`` attributes.
        client: Object providing ``get_flight_info(descriptor)``.
        connection_args: Keyword arguments forwarded to each per-location
            ``pyarrow.flight.FlightClient``.
    """
    if not args.path:
        descriptor = pyarrow.flight.FlightDescriptor.for_command(args.command)
    else:
        descriptor = pyarrow.flight.FlightDescriptor.for_path(*args.path)

    flight_info = client.get_flight_info(descriptor)
    for endpoint in flight_info.endpoints:
        print('Ticket:', endpoint.ticket)
        for location in endpoint.locations:
            print(location)
            remote = pyarrow.flight.FlightClient(location, **connection_args)
            stream = remote.do_get(endpoint.ticket)
            print(stream.read_pandas())

In [75]:
# Address of the Arrow Flight server used by the remaining cells:
# gRPC over plain TCP on localhost, port 7604.
scheme = "grpc+tcp"
host="localhost"
port="7604"
 
# `client` is the notebook-wide Flight client; later cells reuse it.
client = pyarrow.flight.FlightClient(f"{scheme}://{host}:{port}")
print(client)

<pyarrow._flight.FlightClient object at 0x7f826723f7b0>


In [76]:
# Call the "healthcheck" action with an empty body.
action = pyarrow.flight.Action("healthcheck", b"")
print(action)
# Per-call options: give up after a 1 second timeout.
options = pyarrow.flight.FlightCallOptions(timeout=1)
print(options)
# Drain the result stream into a list so it is shown as the cell output
# (the recorded run returned an empty list).
list(client.do_action(action, options=options))

<pyarrow._flight.Action object at 0x7f82577c0cf0>
<pyarrow._flight.FlightCallOptions object at 0x7f82670629c0>


[]

In [77]:
# list_flights never reads its first argument, so an empty dict stands in for it.
list_flights({}, client)

Flights

Actions
Type: clear
Description: Clear the stored flights.
---
Type: shutdown
Description: Shut down this server.
---


In [122]:
import pandas as pd
import numpy as np
import pyarrow as pa

In [123]:
# Random 100,000 x 1,000 uint8 matrix (100,000,000 bytes) used as the payload
# for the upload cells below.
# NOTE(review): randint's upper bound is exclusive, so values fall in 0..254;
# pass 256 if the full uint8 range is intended. No seed is set, so the values
# differ on every run.
n = np.random.randint(255, size=(100_000, 1_000), dtype=np.uint8)
print(n)
print(n.nbytes)

[[207 130 118 ... 104  50  89]
 [ 78  64 231 ...  18 183 212]
 [237 222 209 ...  23 134 115]
 ...
 [102 217 159 ... 149 109 230]
 [158  47 169 ... 128  11  72]
 [105 201  41 ... 126  95 187]]
100000000


In [124]:
# Wrap the matrix in a DataFrame with default integer row and column labels.
d = pd.DataFrame(n)
print(d.head())
# Convert back to a NumPy array to show the frame's values.
print(np.asarray(d))

   0    1    2    3    4    5    6    7    8    9    ...  990  991  992  993  \
0  207  130  118  105   41   73   35  132   21  185  ...  161  236  167  143   
1   78   64  231   13  171   70  237   37  141   42  ...  178  199   40  219   
2  237  222  209  138  247  204   83  128  215  124  ...   58   61  122   86   
3  225   38  138  140    4  252  163  132  220  181  ...  203  225  136  191   
4   53  208    5  245  155  149  164   77    8  216  ...   74   94  224   51   

   994  995  996  997  998  999  
0  230  135  227  104   50   89  
1   37  146    6   18  183  212  
2  173  151  159   23  134  115  
3   10  219   38  252  186   21  
4  130    8  241  195   99   20  

[5 rows x 1000 columns]
[[207 130 118 ... 104  50  89]
 [ 78  64 231 ...  18 183 212]
 [237 222 209 ...  23 134 115]
 ...
 [102 217 159 ... 149 109 230]
 [158  47 169 ... 128  11  72]
 [105 201  41 ... 126  95 187]]


In [125]:
# a = pyarrow.array(n)
# a

In [126]:
# Convert the DataFrame into an Arrow table; this is what the upload cells send.
t = pyarrow.Table.from_pandas(d)
# t = pyarrow.Tensor.from_numpy(n)
# print(t)
# Size of the table's data in bytes.
print(t.nbytes)

100000000


In [127]:
# Open an upload (DoPut) stream for the table's schema under the path
# descriptor "/hee". The second element returned by do_put is discarded.
writer, _ = client.do_put(
        pyarrow.flight.FlightDescriptor.for_path("/hee"), t.schema)

# writer, _ = client.do_put(
#         pyarrow.flight.FlightDescriptor.for_path("/hee"), schema=pa.schema([("tensor", pa.uint8())]))

print(writer)

<pyarrow._flight.FlightStreamWriter object at 0x7f82658421e0>


In [128]:
# Send the whole table over the stream stored in `writer`, then close it.
# NOTE(review): relies on `writer` left open by an earlier cell; re-running
# only this cell after close() will presumably fail — confirm.
writer.write_table(t)
writer.close()

In [129]:
# Open a second upload stream, addressed by the command descriptor
# "feed_crypto_store_fssb" instead of a path.
# NOTE(review): how the server interprets this command is not visible in this notebook.
writer_2, _ = client.do_put(
        pyarrow.flight.FlightDescriptor.for_command("feed_crypto_store_fssb"), t.schema)

In [130]:
# Send the table over the command-addressed stream stored in `writer_2`, then close it.
writer_2.write_table(t)
writer_2.close()

In [133]:
# Re-open the command-addressed stream; with the write disabled below, the
# stream is closed again without any data being sent.
writer_2, _ = client.do_put(
        pyarrow.flight.FlightDescriptor.for_command("feed_crypto_store_fssb"), t.schema)
# NOTE(review): the disabled line below has unbalanced parentheses and hands a
# serialized buffer to pa.record_batch. The TypeError recorded under this cell
# presumably comes from an earlier run of that call — confirm before re-enabling.
# writer_2.write_batch(pa.record_batch(pa.serialize(n).to_buffer())
writer_2.close()

TypeError: Expected pandas DataFrame or list of arrays