In [1]:
import sys

# Set up redis credentials
redis_host = 'redis'
redis_port = 6379
redis_password = ''

# redis_host = 'redis-19669.c51.ap-southeast-2-1.ec2.cloud.redislabs.com'
# redis_port = 19669
# redis_password = 'yyoTYWdXJwZJLvpufud1eeZ2PwaznuOY'

# When your PluginService is running, you can get your channel value from the Logs, or from the query parameter in your open browser.
# Update this value to match that, so that your commands will run against your live workspace.
redis_channel = 'a5100bcd-3b33-48dc-a8b0-67f88182f24e'

# Increase the recursion limit in order to properly serialize Complexes
recursion_limit = 10000
sys.setrecursionlimit(recursion_limit)


print("Hello")

Hello


In [2]:
import base64
import io
import json
import dill
import redis
import time
import uuid

from nanome import PluginInstance
from nanome.util import Logs


class StreamRedisInterface:
    """Gets wrapped around a stream object on creation, and is used to send data to the stream through redis.

    The PluginService has functions set up to handle streams, because streams on the client side aren't networked.
    This should not be called explicitly, but used through the RedisPluginInterface class.
    """

    def __init__(self, stream, plugin_interface):
        self._stream = stream
        self._plugin_interface = plugin_interface

    @property
    def stream_id(self):
        return self._stream._Stream__id

    def update(self, stream_data):
        response_channel = self._plugin_interface._publish_message(
            'stream_update', args=[self.stream_id, stream_data])
        response = self._plugin_interface._await_response(response_channel)
        return response
    
    def destroy(self):
        response_channel = self._plugin_interface._publish_message(
            'stream_destroy', args=[self.stream_id])
        response = self._plugin_interface._await_response(response_channel)
        return response


class PluginInstanceRedisInterface:
    """Provides interface for publishing PluginInstance requests over Redis.

    The idea is to feel like you're using the standard
    PluginInstance, but all calls are being made through Redis.
    """

    def __init__(self, redis_host, redis_port, redis_password, redis_channel=None):
        """Initialize the Connection to Redis."""
        self.redis = redis.Redis(host=redis_host, port=redis_port, password=redis_password)
        self.plugin_class = PluginInstance
        self.channel = redis_channel

    def set_channel(self, value):
        self.channel = value

    def __getattr__(self, name):
        """Override superclass getattr to provide a proxy for the PluginInstance class.

        If a user calls an attribute on the Interface that exists on the PluginInstance,
        return a proxy call to Redis.
        """
        plugin_instance_api = iter(attr for attr in dir(self.plugin_class) if not attr.startswith('_'))
        interface_override = iter(attr for attr in dir(self) if not attr.startswith('_'))
        # Only intercept if the property is a public property of a PluginInstance,
        # and theres no override on this class.
        if name in plugin_instance_api and name not in interface_override:
            def proxy_redis_message(*args, **kwargs):
                response_channel = self._publish_message(name, args, kwargs)
                response = self._await_response(response_channel)
                return response
            return proxy_redis_message
        return getattr(self, name)

    def create_writing_stream(self, atom_indices, stream_type):
        """Return a stream wrapped in the RedisStreamInterface"""
        function_name = 'create_writing_stream'
        args = [atom_indices, stream_type]
        response_channel = self._publish_message(function_name, args=args)
        stream, error = self._await_response(response_channel)
        if stream:
            stream_interface = StreamRedisInterface(stream, self)
            response = (stream_interface, error)
        return response

    def _publish_message(self, function_name, args=None, kwargs=None):
        """Publish a function request to Redis.

        :rtype: str. Name of response channel to subscribe to for results. 
        """
        args = args or []
        kwargs = kwargs or {}

        # Set random channel name for response
        response_channel = str(uuid.uuid4())
        message = json.dumps({
            'function': function_name,
            'args': args,
            'kwargs': kwargs,
            'args': self.pickle_data(args),
            'kwargs': self.pickle_data(kwargs),
            'response_channel': response_channel
        })
        Logs.message(f"Sending {function_name} Request to Redis Channel {self.channel}")
        # Subscribe to response channel before publishing message
        pubsub = self.redis.pubsub(ignore_subscribe_messages=True)
        pubsub.subscribe(response_channel)
        self.redis.publish(self.channel, message)
        return pubsub

    def _await_response(self, pubsub):
        """Subscribe to provided redis channel, and unpack response data.
        
        :arg: pubsub: Redis pubsub object subscribed to response channel.
        :rtype: dict. Unpickled data from Redis message.
        """
        timeout = time.time() + (30 * 60)
        while time.time() < timeout:
            try:
                message = pubsub.get_message()
            except redis.ConnectionError:
                # Do reconnection attempts here such as sleeping and retrying
                Logs.error("Redis Connection Failure")
                raise Exception
            if message:
                response_channel = next(iter(pubsub.channels.keys())).decode('utf-8')
                Logs.message(f"Response received on channel {response_channel}")
                response_data = self.unpickle_message(message)
                return response_data

    def unpickle_message(self, message):
        """Unpickle data from Redis message, and return contents."""
        pickled_data = message['data']
        response_data = self.unpickle_data(pickled_data)
        return response_data
    
    @staticmethod
    def pickle_data(data):
        """Return the stringified bytes of pickled data."""
        bytes_output = io.BytesIO()
        dill.dump(data, bytes_output)
        bytes_output_base64 = base64.b64encode(bytes_output.getvalue()).decode()
        bytes_output.close()
        return bytes_output_base64

    @staticmethod
    def unpickle_data(pickled_data):
        """Unpickle data into its original python version."""
        pickle_bytes = io.BytesIO(base64.b64decode(pickled_data))
        unpickled_data = dill.loads(pickle_bytes.read())
        pickle_bytes.close()
        return unpickled_data
    
    def upload_shapes(self, shape_list):
        """Upload a list of shapes to the server.

        :arg: shape_list: List of shapes to upload.
        :rtype: list. List of shape IDs.
        """
        function_name = 'upload_shapes'
        args = [shape_list]
        response_channel = self._publish_message(function_name, args=args)
        response = self._await_response(response_channel)
        return response

In [3]:
from nanome.util.enums import StreamType


class ColorStreamPlugin(PluginInstanceRedisInterface):

    def __init__(self, redis_host, redis_port, redis_password, redis_channel=None):
        super().__init__(redis_host, redis_port, redis_password, redis_channel)
        # RGB values of the rainbow
        self.color_index = 0
        self.roygbiv = [
            (255, 0, 0),  # Red
            (255, 127, 0),  # Orange
            (255, 255, 0),  # Yellow
            (0, 255, 0),  # Green
            (0, 0, 255),  # Blue
            (75, 0, 130),  # Indigo
            (148, 0, 211),  # Violet
        ]
    
    def cycle_color(self, comp_indices):
        """For all atom in selected complex, change color."""
        comps = self.request_complexes(comp_indices)
        new_color_rgba = self.roygbiv[self.color_index]

        # Create a writing stream to set colors for every atom in the complexes.
        stream_type = StreamType.color
        atom_indices = []
        for comp in comps:
            atom_indices.extend([atom.index for atom in comp.atoms])

        stream, error = self.create_writing_stream(atom_indices, stream_type)
        if error:
            raise Exception(f"Stream failed to initialize, Please try again. {self.error}")

        # Set the color for every atom in the stream.
        stream_data = []
        for _ in atom_indices:
            stream_data.extend(new_color_rgba)
        self.color_index = (self.color_index + 1) % len(self.roygbiv)
        stream.update(stream_data)


In [4]:
plugin_instance = ColorStreamPlugin(redis_host, redis_port, redis_password, redis_channel=redis_channel)

In [5]:
comps = plugin_instance.request_complex_list()
comp = comps[0]
print(comp)
             

[0mSending request_complex_list Request to Redis Channel 0c66e681-ce60-4f64-8992-4e3a714f7962[0m
[0mResponse received on channel 6b226ffd-9e4e-470e-8b18-5e3d2c8d92d2[0m
<nanome.api.structure.complex.Complex object at 0x7facbafe8760>


In [7]:
# Changing complex color
print("changing comp color")
plugin_instance.cycle_color([comp.index])

changing comp color
[0mSending request_complexes Request to Redis Channel 0c66e681-ce60-4f64-8992-4e3a714f7962[0m
[0mResponse received on channel 3d0a4093-2f57-44a9-be16-8946bb9b28a7[0m
[0mSending create_writing_stream Request to Redis Channel 0c66e681-ce60-4f64-8992-4e3a714f7962[0m
[0mResponse received on channel 06a66578-a1eb-4e69-9454-88e3a0fb3b24[0m
[0mSending stream_update Request to Redis Channel 0c66e681-ce60-4f64-8992-4e3a714f7962[0m
[0mResponse received on channel 797a1647-bce9-46aa-94eb-7365d2c7438a[0m
