Skip to content

Commit

Permalink
Make client.disabled an off-only feature. In the process, split Con…
Browse files Browse the repository at this point in the history
…nection.py into DeltaConnection/Connection, to make WebSocket-specific logic clearer. This makes it easier to write the code to disable streamlit.
  • Loading branch information
tvst committed Nov 13, 2018
1 parent 85786de commit 778e337
Show file tree
Hide file tree
Showing 7 changed files with 265 additions and 222 deletions.
248 changes: 48 additions & 200 deletions lib/streamlit/Connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,27 +9,18 @@
from streamlit.compatibility import setup_2_3_shims
setup_2_3_shims(globals())

import base58
import inspect
import os
import sys
import threading
import time
import urllib
import uuid

from functools import wraps

from tornado import gen
from tornado.ioloop import IOLoop
from tornado.websocket import websocket_connect

from streamlit.util import get_local_id
from streamlit import config
from streamlit.DeltaGenerator import DeltaGenerator
from streamlit.wsutil import write_proto
from streamlit.ReportQueue import ReportQueue, MESSAGE_SIZE_LIMIT
from streamlit.streamlit_msg_proto import new_report_msg
from streamlit import protobuf

from streamlit.logger import get_logger
LOGGER = get_logger()
Expand All @@ -41,73 +32,54 @@
}


def _assert_singleton(method):
"""Assert that method is called on singleton instance of Connection."""
@wraps(method)
def inner(self, *args, **kwargs):
assert self == Connection._connection, \
f'Can only call {method.__name__}() on the singleton Connection.'
return method(self, *args, **kwargs)
return inner


class Connection(object):
"""Represents a single connection to the server for a single report."""
"""Represents/manages the actual websocket connection to the server.
# This is the singleton connection object.
_connection = None
This class should be strictly about handling the actual websocket
connection lifetime and data transfer, without any knowledge of what is
actually being transferred.
"""

# The _proxy_connection_status can take one of these three values:
_PROXY_CONNECTION_DISCONNECTED = 'disconnected'
_PROXY_CONNECTION_CONNECTED = 'connected'
_PROXY_CONNECTION_FAILED = 'failed'

# This is the class through which we can add elements to the Report
def __init__(self, enabled=True):
"""Create a new connection to the server."""
# Create an ID for this Report
self._report_id = base58.b58encode(uuid.uuid4().bytes).decode("utf-8")

# Create a name for this report.
self._name = _create_name(self._report_id)
LOGGER.debug(f'Created a connection with name "{self._name}"')
def __init__(self, uri, new_report_msg, on_connect, on_cleanup):
"""Create a new connection to the server.
Parameters
----------
uri : str
The Proxy URI for this WebSocket connection.
new_report_msg : protobuf.ForwardMsg
First message to send via the connection, as soon as the connection
is established.
on_connect : callable
Function to call when the connection is made.
on_cleanup : callable
Function to call when the connection is destroyed.
"""
# This is the event loop to talk with the proxy.
self._loop = IOLoop(make_current=False)
LOGGER.debug(f'Created io loop {self._loop}.')

root_frame = inspect.stack()[-1]
filename = root_frame[1] # 1 is the filename field in this tuple.

# Full path of the file that caused this connection to be created.
self._source_file_path = '' # Empty string means "no file" to us.

# Check if we're in the REPL by looking at magic filename strings.
# <stdin> is what the basic Python REPL calls the root frame's
# filename, and <string> is what iPython calls it.
if filename not in ('<stdin>', '<string>'):
self._source_file_path = os.path.realpath(filename)

LOGGER.debug(f'source_file_path: {self._source_file_path}.')

# This ReportQueue stores deltas until they're ready to be transmitted
# over the websocket.
#
# VERY IMPORTANT: The key to understanding local threading in Streamlit
# is that self._queue acts like a thread-safe channel for data (in this
# case deltas) from the main thread (e.g. st.write()) to the proxy
# connection thread (e.g. flush_queue()). To ensure the thread-safety of
# this channel, ALL methods called on this ReportQueue must happen in a
# coroutine executed on self._loop. For example, the ReportQueue is
# filled by calling _enqueue_delta which schedules a callback on
# connection thread (e.g. flush_queue()). To ensure the thread-safety
# of this channel, ALL methods called on this ReportQueue must happen
# in a coroutine executed on self._loop. For example, the ReportQueue
# is filled by calling enqueue_delta, which schedules a callback on
# self._loop. The ReportQueue is drained in _transmit_through_websocket
# which also runs on self._loop. Directly manipulating self._queue
# from other threads could cause rare, subtle race conditions!
# which also runs on self._loop. Directly manipulating self._queue from
# other threads could cause rare, subtle race conditions!
self._queue = ReportQueue()

# If True, will not allow adding more items to the queue.
self._allow_enqueueing_items = enabled

# Will stay open until the main thread closes. Then gets set to false to
# cleanly close down the connection. Like self._queue, this variable
# is only ever accessed
Expand All @@ -121,137 +93,51 @@ def __init__(self, enabled=True):
self._proxy_connection_status = (
Connection._PROXY_CONNECTION_DISCONNECTED)

# This is the class through which we can add elements to the Report
self._delta_generator = DeltaGenerator(self._enqueue_delta)

@classmethod
def get_connection(cls, enabled=True):
"""Return the singleton Connection object.
Instantiates one if necessary.
Parameters
----------
enabled : bool
Whether to the connection should be enabled (True) or disabled
(False). The difference is that disabled connections silently drop
all deltas you try to enqueue.
NOTE: If the connection singleton already exists, this sets its
state enabled/disabled state.
"""
# Instantiate the singleton connection if necessary.
if cls._connection is None:
LOGGER.debug('No connection. Registering one.')

# Create the new connection.
Connection(enabled=enabled).register()

else:
cls._connection.set_enabled(enabled)
self._connect(uri, new_report_msg, on_cleanup, on_cleanup)

# Now that we're sure to have a connection, return it.
return cls._connection

def register(self):
"""Set up this connection to be the singleton connection."""
def _connect(self, uri, new_report_msg, on_connect, on_cleanup):
"""Connect to the proxy and set up output thread."""
# Establish this connection and connect to the proxy server.
assert type(self)._connection is None, 'Cannot register two connections'
Connection._connection = self

self._connect_to_proxy()

# Override the default exception handler.
original_excepthook = sys.excepthook
self._connect_to_proxy(uri, new_report_msg)

def streamlit_excepthook(exc_type, exc_value, exc_tb):
self.get_delta_generator().exception(exc_value, exc_tb)
original_excepthook(exc_type, exc_value, exc_tb)
sys.excepthook = streamlit_excepthook
on_connect()

# When the current thread closes, then close down the connection.
main_thread = threading.current_thread()
cleanup_thread = threading.Thread(
target=self._cleanup_on_exit,
args=(main_thread, original_excepthook),
args=(main_thread, on_cleanup),
)
cleanup_thread.daemon = False
cleanup_thread.start()

@_assert_singleton
def unregister(self):
"""Remove this connection from being the singleton connection."""
Connection._connection = None

def set_enabled(self, state):
"""Enable or disable connection.
If you call enqueue() on a disabled connection, it's a no-op.
Parameters
----------
state : bool
Whether to enable (True) or disable (False) the connection.
"""
self._allow_enqueueing_items = state

@_assert_singleton
def get_delta_generator(self):
"""Return the DeltaGenerator for this connection.
This is the object that allows you to dispatch toplevel deltas to the
Report, e.g. adding new elements.
"""
return self._delta_generator

@_assert_singleton
def _enqueue_delta(self, delta):
def enqueue_delta(self, delta):
"""Enqueue the given delta for transmission to the server."""
self._loop.add_callback(lambda: self._queue(delta))

# Read the value of _allow_enqueueing_items at this point rather than
# inside the inner function below because that function gets enqueued
# and executed asynchronously -- at which point the value of
# _allow_enqueueing_items may be changed.
enabled = self._allow_enqueueing_items

def queue_the_delta():
if enabled:
self._queue(delta)
self._loop.add_callback(queue_the_delta)

@_assert_singleton
def _connect_to_proxy(self):
def _connect_to_proxy(self, uri, new_report_msg):
"""Open a connection to the server in a separate thread."""
def connection_thread():
self._loop.make_current()
LOGGER.debug(f'Running proxy on loop {IOLoop.current()}.')
self._loop.run_sync(self._attempt_connection)
self._loop.run_sync(
lambda: self._attempt_connection(uri, new_report_msg))
self._loop.close()
self.unregister()
LOGGER.debug('Exit. (deltas remaining = %s)' % len(self._queue._deltas))
LOGGER.debug(
'Exit. (deltas remaining = %s)' % len(self._queue._deltas))

connection_thread = threading.Thread(target=connection_thread)
connection_thread.daemon = False
connection_thread.start()

@_assert_singleton
@gen.coroutine
def _attempt_connection(self):
def _attempt_connection(self, uri, new_report_msg):
"""Try to establish a connection to the proxy.
Launches the proxy (if necessary), then pumps deltas through the
connection. Updates self._proxy_connection_status to indicate whether
we succeeded in connecting to the proxy.
"""
# Create a connection URI.
server = config.get_option('proxy.server')
port = config.get_option('proxy.port')
local_id = get_local_id()
report_name = urllib.parse.quote_plus(self._name)
uri = f'ws://{server}:{port}/new/{local_id}/{report_name}'

already_connected = (
self._proxy_connection_status ==
Connection._PROXY_CONNECTION_DISCONNECTED)
Expand All @@ -262,7 +148,7 @@ def _attempt_connection(self):
ws = yield websocket_connect(uri, **WS_ARGS)
self._proxy_connection_status = (
Connection._PROXY_CONNECTION_CONNECTED)
yield self._transmit_through_websocket(ws)
yield self._transmit_through_websocket(ws, new_report_msg)
return
except IOError:
LOGGER.info(f'First connection to {uri} failed.')
Expand All @@ -275,15 +161,14 @@ def _attempt_connection(self):
ws = yield websocket_connect(uri, **WS_ARGS)
self._proxy_connection_status = (
Connection._PROXY_CONNECTION_CONNECTED)
yield self._transmit_through_websocket(ws)
yield self._transmit_through_websocket(ws, new_report_msg)
except IOError:
# Indicate that we failed to connect to the proxy so that the
# cleanup thread can now run.
LOGGER.info(f'Failed to connect to proxy at {uri}.')
self._proxy_connection_status = Connection._PROXY_CONNECTION_FAILED
raise ProxyConnectionError(uri)

@_assert_singleton
@gen.coroutine
def _launch_proxy(self):
"""Launch the proxy server."""
Expand All @@ -292,14 +177,11 @@ def _launch_proxy(self):
LOGGER.debug('Sleeping %f seconds while waiting Proxy to start', wait_for_proxy_secs)
yield gen.sleep(wait_for_proxy_secs)

@_assert_singleton
@gen.coroutine
def _transmit_through_websocket(self, ws):
def _transmit_through_websocket(self, ws, new_report_msg):
"""Send queue data across the websocket as it becomes available."""
# Send the header information across.
yield new_report_msg(
self._report_id, os.getcwd(), ['python'] + sys.argv,
self._source_file_path, ws)
yield write_proto(ws, new_report_msg)
LOGGER.debug('Just sent a new_report_msg with: ' + str(sys.argv))

# Send other information across.
Expand All @@ -317,7 +199,7 @@ def _transmit_through_websocket(self, ws):
yield ws.close()
LOGGER.debug('Closed the connection object.')

def _cleanup_on_exit(self, main_thread, original_excepthook):
def _cleanup_on_exit(self, main_thread, on_cleanup):
"""Perform final cleanup after main thread finishes.
This thread waits for the main thread to exit, then does some final
Expand All @@ -327,6 +209,8 @@ def _cleanup_on_exit(self, main_thread, original_excepthook):
LOGGER.debug('Cleanup thread waiting for main thread to end.')
main_thread.join()

on_cleanup()

# Then wait for a certain number of seconds to connect to the proxy
# to make sure that we can flush the connection queue.
start_time = time.time()
Expand Down Expand Up @@ -355,8 +239,7 @@ def _cleanup_on_exit(self, main_thread, original_excepthook):
'_proxy_connection_status illegal value: ' +
str(self._proxy_connection_status))
break
LOGGER.debug('Main thread ended. Restoring excepthook.')
sys.excepthook = original_excepthook

self._loop.add_callback(setattr, self, '_is_open', False)
LOGGER.debug('Submitted callback to stop the connection thread.')

Expand All @@ -375,38 +258,3 @@ def __init__(self, uri):
"""
msg = f'Unable to connect to proxy at {uri}.'
super(ProxyConnectionError, self).__init__(msg)


def _create_name(report_id):
"""Create a name for this report."""
name = ''
if len(sys.argv) >= 2 and sys.argv[0] == '-m':
name = sys.argv[1]
elif len(sys.argv) >= 1:
name = os.path.split(sys.argv[0])[1]
if name.endswith('.py'):
name = name[:-3]
if name == '__main__' and len(sys.argv) >= 2:
name = sys.argv[1]

if name == '':
name = str(_report_id)
return name


if __name__ == '__main__':
c = Connection().get_connection()
queue = Connection()._queue
_id = 0
LOGGER.debug(queue)

while True:
delta = protobuf.Delta()
delta.id = _id
delta.new_element.text.format = protobuf.Text.MARKDOWN
delta.new_element.text.body = '{}'.format(
time.time())
queue(delta)
_id += 1

time.sleep(5)

0 comments on commit 778e337

Please sign in to comment.