In [215]:
from grid.ipfsapi.client import Client
import json
import numpy as np
import pickle
import base64
client = Client()

In [229]:
""" This module contains an implementation of an IPFS
version-control system, which is structured as a directed in-tree with nodes
represented by the bytes representation of the VersionTreeNode class. """
from typing import Optional, Iterator

from grid import ipfsapi

# TODO: Unit tests.
# TODO: Do we want to store the hash on the node after it's been committed?
class VersionTreeNode:
    """ Thin wrapper around a piece of IPFS-versioned data and the
    IPFS multihash of its parent. """
    # Delimiter for serializing packed object. Should not be alphanumeric.
    DELIMITER = b"|"

    def __init__(self,
                 contents: bytes,
                 id_hash: Optional[str] = None,
                 parent_hash: Optional[str] = None,
                 ipfs_client: ipfsapi.Client = None):
        """ parent_hash is a UTF-8 IPFS multihash identifying
        this node's parent in the version tree. If parent_hash is None,
        this node is the root of a version tree. """
        self.contents = contents
        # Convert empty string to None to minimize typing bugs.
        self.id_hash = id_hash or None
        self.parent_hash = parent_hash or None
        self.ipfs_client = ipfs_client

    def commit(self, ipfs_client: ipfsapi.Client = None) -> str:
        """ Commits the node to the version tree, and returns the
        UTF-8 multihash representing its IPFS ID"""
        
        self.id_hash = (ipfs_client or self.ipfs_client).add_bytes(self.to_bytes())
        return self.id_hash
    
    @classmethod
    def get_node_by_hash(cls,
                         multihash: str,
                         ipfs_client: ipfsapi.Client) -> "VersionTreeNode":
        """ Retrieve and deserialize a VersionTreeNode addressed
        by it's UTF-8 multihash IPFS ID. """
        return cls.from_bytes(ipfs_client.cat(multihash))

    def get_with_ancestors(
            self,
            ipfs_client: ipfsapi.Client = None) -> Iterator["VersionTreeNode"]:
        """ Return an iterator containing this node and all its
        direct ancestors in the version tree, in that order. """
        yield self
        parent_hash = self.parent_hash
        while parent_hash is not None:
            parent_node = self.get_node_by_hash(
                parent_hash,
                (ipfs_client or self.ipfs_client))
            parent_hash = parent_node.parent_hash
            yield parent_node

    @classmethod
    def get_node_with_ancestors_by_hash(
            cls,
            multihash: str,
            ipfs_client: ipfsapi.Client) -> Iterator["VersionTreeNode"]:
        """ Convenience method to get an iterator of the node identified by the
        provided UTF-8 IPFS multihash, along with all of its ancestors, in
        that order."""
        return cls.get_node_by_hash(
            multihash, ipfs_client).get_with_ancestors(ipfs_client)

    def to_bytes(self) -> bytes:
        """ For contents b"foo", parent_hash "bar", and DELIMITER b"|",
        returns b"foo|bar" """
        parent_hash_bytes = self.parent_hash.encode("utf-8") if \
            self.parent_hash else \
            b""
        id_hash_bytes = self.id_hash.encode("utf-8") if \
            self.id_hash else \
            b""
        return self.DELIMITER.join((self.contents, id_hash_bytes, parent_hash_bytes))

    @classmethod
    def from_bytes(cls, b: bytes) -> "VersionTreeNode":
        """ In case the contents section happens to contain the DELIMITER
        string, only splits on the final occurrence of DELIMITER. The
        multihash is hexadecimal, so it won't contain the non-hex DELIMITER."""
        contents, id_hash_bytes, parent_hash_bytes = b.rsplit(cls.DELIMITER, maxsplit=1)
        return cls(contents, id_hash_bytes.decode("utf-8"), parent_hash_bytes.decode("utf-8"))
    
    def to_json(self):
        message = {'id_hash': self.id_hash, 'parent_hash': self.parent_hash}
        return json.dumps(message)
    
    def publish_children(self, ipfs_client: ipfsapi.Client):
        
        channel = 'children_of_' + str(self.parent_hash)
        
        ipfs_client.pubsub_pub(topic = channel, payload=self.to_json())
        
    def listen_for_children(self, ipfs_client: ipfsapi.Client):
        channel = 'children_of_' + str(self.parent_hash)
        
        new_messages = ipfs_client.pubsub_sub(topic=channel, stream=True)
        #message = self.decode_message(new_messages)
        
        for m in new_messages:
            message = self.decode_message(m)
            if message is not None:
                print(message)
        
    
    def decode_message(self, encoded):
        if ('from' in encoded):
            decoded = {}
            decoded['from'] = base64.standard_b64decode(encoded['from'])
            decoded['data'] = base64.standard_b64decode(
                encoded['data']).decode('ascii')
            decoded['seqno'] = base64.standard_b64decode(encoded['seqno'])
            decoded['topicIDs'] = encoded['topicIDs']
            decoded['encoded'] = encoded
            return decoded
        else:
            return None 

In [4]:
# When commit publish on channel children of
# When commit listen for children
# Will have to include in the VersionTreeNode the ipfs address

In [5]:
def publish(self, channel, message):
        """
        This method sends a message over an IPFS channel. The number of people
        who receive it is purely based on the number of people who happen
        to be listening.
        """

        if isinstance(message, dict) or isinstance(message, list):
            self.api.pubsub_pub(topic=channel, payload=json.dumps(message))
        else:
            self.api.pubsub_pub(topic=channel, payload=message)

    def request_response(self, channel, message, response_handler, timeout=10):
        """
        This method makes a request over a channel to a specific node and
        will hang until it receives a response from that node. Note that
        the channel used for the response is random.
        """

        random_channel = self.id + "_" + str(random.randint(0, 1e10))

        def timeout_message(seconds):
            time.sleep(int(seconds))
            self.publish(
                channel=random_channel,
                message=["timeout after " + str(seconds) + " seconds"])

        def send():
            self.publish(channel=channel, message=[message, random_channel])
            t1 = Thread(target=timeout_message, args={timeout})
            t1.start()

        response = self.listen_to_channel_sync(random_channel,
                                               response_handler, send)

        if (len(response) == 1):
            if ('timeout' in response[0]):
                raise TimeoutError(response[0])
        return response

    def listen_to_channel_sync(self, *args):
        """
        Synchronous version of listen_to_channel
        """

        return self.listen_to_channel_impl(*args)

    def listen_to_channel(self, *args):
        """
        Listens for IPFS pubsub sub messages asynchronously.
        This function will create the listener and call back your handler
        function on a new thread.
        """
        t1 = Thread(target=self.listen_to_channel_impl, args=args)
        t1.start()

    def listen_to_channel_impl(self,
                               channel,
                               handle_message,
                               init_function=None,
                               ignore_from_self=False):
        """
        Do not call directly.  Use listen_to_channel or listen_to_channel_sync instead.
        """

        first_proc = True

        if channel not in self.subscribed:
            new_messages = self.api.pubsub_sub(topic=channel, stream=True)
            self.subscribed.append(channel)

        else:
            return

        # new_messages is a generator which will keep yield new messages until
        # you return from the loop. If you do return from the loop, we will no
        # longer be subscribed.
        for m in new_messages:
            if init_function is not None and first_proc:
                init_function()
                first_proc = False

            message = self.decode_message(m)
            if message is not None:
                fr = base58.encode(message['from'])
                if not ignore_from_self or fr != self.id:
                    out = handle_message(message)
                    if out is not None:
                        return out
                else:
                    print('ignore mssage from self')

    def decode_message(self, encoded):
        if ('from' in encoded):
            decoded = {}
            decoded['from'] = base64.standard_b64decode(encoded['from'])
            decoded['data'] = base64.standard_b64decode(
                encoded['data']).decode('ascii')
            decoded['seqno'] = base64.standard_b64decode(encoded['seqno'])
            decoded['topicIDs'] = encoded['topicIDs']
            decoded['encoded'] = encoded
            return decoded
        else:
            return None

IndentationError: unindent does not match any outer indentation level (<tokenize>, line 13)

In [230]:
om = 'openmined'

#children_of = f'{om}:children_of'
def children_of(id):
    channel = "children_of_" + str(id)
    return channel

In [231]:
import grid.ipfsapi.client

In [232]:
children_of(4353)

'children_of_4353'

In [233]:
message = {'content' : [34,6,624,546], 'parent_id': '4353'}

In [234]:
c = Client()

In [235]:
c.pubsub_pub(topic = children_of(4353), payload=json.dumps(message))

[]

In [236]:
data = np.array([45,356])
pickle.dumps(data)

b'\x80\x03cnumpy.core.multiarray\n_reconstruct\nq\x00cnumpy\nndarray\nq\x01K\x00\x85q\x02C\x01bq\x03\x87q\x04Rq\x05(K\x01K\x02\x85q\x06cnumpy\ndtype\nq\x07X\x02\x00\x00\x00i8q\x08K\x00K\x01\x87q\tRq\n(K\x03X\x01\x00\x00\x00<q\x0bNNNJ\xff\xff\xff\xffJ\xff\xff\xff\xffK\x00tq\x0cb\x89C\x10-\x00\x00\x00\x00\x00\x00\x00d\x01\x00\x00\x00\x00\x00\x00q\rtq\x0eb.'

In [237]:
n1 = VersionTreeNode(contents=pickle.dumps(data),parent_hash='4353',ipfs_client=c)

In [238]:
n1.to_json()

'{"id_hash": null, "parent_hash": "4353"}'

In [239]:
n1.commit()

'QmW12KVNRP83k4hUaKARWbse32wbocrZvKTgUkKsf8KG1a'

In [240]:
n1.publish_children(c)

In [241]:
n1.listen_for_children(c)

{'from': b'\x12 @\x9d\xbe]8\xb4R)\xb7Z\xfdj\xfd\xf41\xc5\xf5\xc4\x8d;^\xf3\xd4\x12\xeeZ\xa40\x8b\x9d\xd8|', 'data': '{"id_hash": "QmW12KVNRP83k4hUaKARWbse32wbocrZvKTgUkKsf8KG1a", "parent_hash": "4353"}', 'seqno': b"\x15!\xca\xef\xca\xd9('", 'topicIDs': ['children_of_4353'], 'encoded': {'from': 'EiBAnb5dOLRSKbda/Wr99DHF9cSNO17z1BLuWqQwi53YfA==', 'data': 'eyJpZF9oYXNoIjogIlFtVzEyS1ZOUlA4M2s0aFVhS0FSV2JzZTMyd2JvY3JadktUZ1VrS3NmOEtHMWEiLCAicGFyZW50X2hhc2giOiAiNDM1MyJ9', 'seqno': 'FSHK78rZKCc=', 'topicIDs': ['children_of_4353']}}
{'from': b'\x12 @\x9d\xbe]8\xb4R)\xb7Z\xfdj\xfd\xf41\xc5\xf5\xc4\x8d;^\xf3\xd4\x12\xeeZ\xa40\x8b\x9d\xd8|', 'data': '{"id_hash": "QmW12KVNRP83k4hUaKARWbse32wbocrZvKTgUkKsf8KG1a", "parent_hash": "4353"}', 'seqno': b'\x15!\xcb:\x06\x82,\xf1', 'topicIDs': ['children_of_4353'], 'encoded': {'from': 'EiBAnb5dOLRSKbda/Wr99DHF9cSNO17z1BLuWqQwi53YfA==', 'data': 'eyJpZF9oYXNoIjogIlFtVzEyS1ZOUlA4M2s0aFVhS0FSV2JzZTMyd2JvY3JadktUZ1VrS3NmOEtHMWEiLCAicGFyZW50X2hhc2giOiAiNDM1MyJ9'

KeyboardInterrupt: 

In [173]:
for i in yo:
    print(i)

{}
{'from': 'EiBAnb5dOLRSKbda/Wr99DHF9cSNO17z1BLuWqQwi53YfA==', 'data': 'eyJjb250ZW50IjogWzM0LCA2LCA2MjQsIDU0Nl0sICJwYXJlbnRfaWQiOiAiNDM1MyJ9', 'seqno': 'FSHFXsFIDU4=', 'topicIDs': ['children_of_4353']}
{'from': 'EiBAnb5dOLRSKbda/Wr99DHF9cSNO17z1BLuWqQwi53YfA==', 'data': 'eyJpZF9oYXNoIjogIlFtVzEyS1ZOUlA4M2s0aFVhS0FSV2JzZTMyd2JvY3JadktUZ1VrS3NmOEtHMWEiLCAicGFyZW50X2hhc2giOiAiNDM1MyJ9', 'seqno': 'FSHFYAGbVcI=', 'topicIDs': ['children_of_4353']}
{'from': 'EiBAnb5dOLRSKbda/Wr99DHF9cSNO17z1BLuWqQwi53YfA==', 'data': 'eyJpZF9oYXNoIjogIlFtVzEyS1ZOUlA4M2s0aFVhS0FSV2JzZTMyd2JvY3JadktUZ1VrS3NmOEtHMWEiLCAicGFyZW50X2hhc2giOiAiNDM1MyJ9', 'seqno': 'FSHFZj+w7AY=', 'topicIDs': ['children_of_4353']}


KeyboardInterrupt: 

In [170]:
for m in n1.listen_for_children():
    print(m)

TypeError: listen_for_children() missing 1 required positional argument: 'ipfs_client'

In [76]:
n1.listen_for_children()

TypeError: 'generator' object is not subscriptable