# Nodes

In [2]:
import time
import torch as th
th.set_num_threads(1)
import syft as sy
from syft.grid.clients.data_centric_fl_client import DataCentricFLClient
hook = sy.TorchHook(th)
alice = DataCentricFLClient(hook, "ws://localhost:7600")
bob = DataCentricFLClient(hook, "ws://localhost:7601")
my_grid = sy.PrivateGridNetwork(alice,bob)

In [3]:
import argparse
import ast
import threading
import time

import pyarrow
import pyarrow.flight
import pyarrow.csv as csv

In [4]:
def list_flights(args, client, connection_args={}):
    print('Flights\n=======')
    for flight in client.list_flights():
        descriptor = flight.descriptor
        if descriptor.descriptor_type == pyarrow.flight.DescriptorType.PATH:
            print("Path:", descriptor.path)
        elif descriptor.descriptor_type == pyarrow.flight.DescriptorType.CMD:
            print("Command:", descriptor.command)
        else:
            print("Unknown descriptor type")

        print("Total records:", end=" ")
        if flight.total_records >= 0:
            print(flight.total_records)
        else:
            print("Unknown")

        print("Total bytes:", end=" ")
        if flight.total_bytes >= 0:
            print(flight.total_bytes)
        else:
            print("Unknown")

        print("Number of endpoints:", len(flight.endpoints))
        print("Schema:")
        print(flight.schema)
        print('---')

    print('\nActions\n=======')
    for action in client.list_actions():
        print("Type:", action.type)
        print("Description:", action.description)
        print('---')


def do_action(args, client, connection_args={}):
    try:
        buf = pyarrow.allocate_buffer(0)
        action = pyarrow.flight.Action(args.action_type, buf)
        print('Running action', args.action_type)
        for result in client.do_action(action):
            print("Got result", result.body.to_pybytes())
    except pyarrow.lib.ArrowIOError as e:
        print("Error calling action:", e)


def push_data(args, client, connection_args={}):
    print('File Name:', args.file)
    my_table = csv.read_csv(args.file)
    print('Table rows=', str(len(my_table)))
    df = my_table.to_pandas()
    print(df.head())
    writer, _ = client.do_put(
        pyarrow.flight.FlightDescriptor.for_path(args.file), my_table.schema)
    writer.write_table(my_table)
    writer.close()


def get_flight(args, client, connection_args={}):
    if args.path:
        descriptor = pyarrow.flight.FlightDescriptor.for_path(*args.path)
    else:
        descriptor = pyarrow.flight.FlightDescriptor.for_command(args.command)

    info = client.get_flight_info(descriptor)
    for endpoint in info.endpoints:
        print('Ticket:', endpoint.ticket)
        for location in endpoint.locations:
            print(location)
            get_client = pyarrow.flight.FlightClient(location,
                                                     **connection_args)
            reader = get_client.do_get(endpoint.ticket)
            df = reader.read_pandas()
            print(df)

In [5]:
scheme = "grpc+tcp"
host="localhost"
port="7604"
 
client = pyarrow.flight.FlightClient(f"{scheme}://{host}:{port}")
print(client)

<pyarrow._flight.FlightClient object at 0x7fc68c7808d0>


In [6]:
action = pyarrow.flight.Action("healthcheck", b"")
print(action)
options = pyarrow.flight.FlightCallOptions(timeout=1)
print(options)
list(client.do_action(action, options=options))

<pyarrow._flight.Action object at 0x7fc68c758270>
<pyarrow._flight.FlightCallOptions object at 0x7fc68b63c150>


FlightUnavailableError: gRPC returned unavailable error, with message: failed to connect to all addresses

In [7]:
list_flights({}, client)

Flights


FlightUnavailableError: gRPC returned unavailable error, with message: failed to connect to all addresses

In [78]:
import pandas as pd
import numpy as np
import pyarrow as pa
import struct
import json

In [10]:
n = np.random.randint(255, size=(1, 1_000_000), dtype=np.uint8)
print(n)
print(n.nbytes)

[[189  97 145 ... 101  36 110]]
1000000


In [11]:
d = pd.DataFrame(n)
print(d.head())
print(np.asarray(d))

   0       1       2       3       4       5       6       7       8       \
0     189      97     145     119      95     138     130     148     186   

   9       ...  999990  999991  999992  999993  999994  999995  999996  \
0     224  ...      28      66     132      90     178     245     122   

   999997  999998  999999  
0     101      36     110  

[1 rows x 1000000 columns]
[[189  97 145 ... 101  36 110]]


In [12]:
# a = pyarrow.array(n)
# a

In [13]:
# t = pyarrow.Table.from_pandas(d)
# # t = pyarrow.Tensor.from_numpy(n)
# # print(t)
# print(t.nbytes)

In [14]:
# writer, _ = client.do_put(
#         pyarrow.flight.FlightDescriptor.for_path("/hee"), t.schema)

# # writer, _ = client.do_put(
# #         pyarrow.flight.FlightDescriptor.for_path("/hee"), schema=pa.schema([("tensor", pa.uint8())]))

# print(writer)

In [15]:
# writer.write_table(t)
# writer.close()

In [16]:
# writer_2, _ = client.do_put(
#         pyarrow.flight.FlightDescriptor.for_command("feed_crypto_store_fssb"), t.schema)

In [17]:
# writer_2.write_table(t)
# writer_2.close()

In [18]:
# writer_2, _ = client.do_put(
#         pyarrow.flight.FlightDescriptor.for_command("feed_crypto_store_fssb"), t.schema)
# # writer_2.write_batch(pa.record_batch(pa.serialize(n).to_buffer())
# writer_2.close()

In [19]:
sn = pa.serialize(n)
sn

<pyarrow.lib.SerializedPyObject at 0x7fc689dae4a0>

In [83]:
a = pa.array([pa.serialize(n).to_buffer().to_pybytes()])
print(a.nbytes)
print(type(a))

1000776
<class 'pyarrow.lib.BinaryArray'>


In [88]:
b = json.dumps({"message_dict": 0}).encode("utf-8")
r = pa.RecordBatch.from_arrays([pa.array([b])], names=[""])
r

pyarrow.RecordBatch
: binary

In [80]:
# r = pa.RecordBatch.from_arrays([a], names=[""])
# r

pyarrow.RecordBatch
: int64

In [97]:
writer_2, reader = client.do_put(
        pyarrow.flight.FlightDescriptor.for_command("feed_crypto_store_fssb"), r.schema)

In [98]:
# metadata = struct.pack('<i', 1)
# writer_2.write_with_metadata(r, metadata)
writer_2.write_batch(r)
response_buf = reader.read().to_pybytes()
print(type(response_buf))
print(response_buf)
writer_2.close()

# print(type(response_buf))
# i, = struct.unpack('<i', response_buf.to_pybytes())
# i

<class 'bytes'>
b'\x01\x00\x00\x00\x00\x00\x00\x00'


In [26]:
a = reader
a
print(a.read())

None


In [27]:
writer_2.write_batch(r)
writer_2.close()

FlightInternalError: Could not write record batch to stream

In [None]:
r

In [None]:
r[0].nbytes

In [None]:
r[0].buffers()

In [None]:
r[0].buffers()[1].size

In [None]:
r[0].buffers()[2].size

In [None]:
nn = pa.deserialize(r[0].buffers()[2])

In [None]:
type(nn)

In [None]:
nn.size

In [None]:
r[0])

In [None]:
# batch = pa.record_batch(pa.array(sn.to_buffer()))
# batch

In [None]:
buffer = pa.serialize(n).to_buffer()
type(buffer)

In [None]:
stream = pa.input_stream(buffer)
print(stream)
print(stream.read(2))

In [None]:
pa.ipc.RecordBatchFileReader(stream)