Skip to content

Commit

Permalink
Move stderr and stdout channels to system namespace (#96)
Browse files Browse the repository at this point in the history
  • Loading branch information
aniezurawski committed May 10, 2019
1 parent 7438949 commit e874e10
Show file tree
Hide file tree
Showing 8 changed files with 139 additions and 25 deletions.
2 changes: 1 addition & 1 deletion Makefile
Expand Up @@ -5,7 +5,7 @@ prepare:
pip install -r requirements.txt -r test_requirements.txt

build:
python setup.py git_version sdist
python setup.py sdist

tests: checkstyle_tests unit_tests

Expand Down
36 changes: 36 additions & 0 deletions neptune/client.py
Expand Up @@ -378,6 +378,42 @@ def create_channel(self, experiment, name, channel_type):
except HTTPConflict:
raise ChannelAlreadyExists(channel_name=name, experiment_short_id=experiment.id)

@with_api_exceptions_handler
def create_system_channel(self, experiment, name, channel_type):
ChannelParams = self.backend_swagger_client.get_model('ChannelParams')

try:
params = ChannelParams(
name=name,
channelType=channel_type
)

channel = self.backend_swagger_client.api.createSystemChannel(
experimentId=experiment.internal_id,
channelToCreate=params
).response().result

return self._convert_channel_to_channel_with_last_value(channel)
except HTTPNotFound:
# pylint: disable=protected-access
raise ExperimentNotFound(
experiment_short_id=experiment.id, project_qualified_name=experiment._project_full_id)
except HTTPConflict:
raise ChannelAlreadyExists(channel_name=name, experiment_short_id=experiment.id)

@with_api_exceptions_handler
def get_system_channels(self, experiment):
try:
channels = self.backend_swagger_client.api.getSystemChannels(
experimentId=experiment.internal_id,
).response().result

return channels
except HTTPNotFound:
# pylint: disable=protected-access
raise ExperimentNotFound(
experiment_short_id=experiment.id, project_qualified_name=experiment._project_full_id)

@with_api_exceptions_handler
def send_channels_values(self, experiment, channels_with_values):
InputChannelValues = self.backend_swagger_client.get_model('InputChannelValues')
Expand Down
55 changes: 47 additions & 8 deletions neptune/experiments.py
Expand Up @@ -25,7 +25,7 @@

from neptune.api_exceptions import ExperimentAlreadyFinished
from neptune.exceptions import FileNotFound, InvalidChannelValue, NoChannelValue, NoExperimentContext
from neptune.internal.channels.channels import ChannelValue, ChannelType
from neptune.internal.channels.channels import ChannelValue, ChannelType, ChannelNamespace
from neptune.internal.channels.channels_values_sender import ChannelsValuesSender
from neptune.internal.execution.execution_context import ExecutionContext
from neptune.internal.storage.storage_utils import upload_to_storage
Expand Down Expand Up @@ -228,6 +228,35 @@ def get_channels(self):
channels[ch.name] = ch
return channels

def get_system_channels(self):
"""Retrieve all system channel names along with their representations for this experiment.
Returns:
dict: A dictionary mapping a channel name to channel.
Examples:
Instantiate a session.
>>> from neptune.sessions import Session
>>> session = Session()
Fetch a project and a list of experiments.
>>> project = session.get_projects('neptune-ml')['neptune-ml/Salt-Detection']
>>> experiments = project.get_experiments(state=['aborted'], owner=['neyo'], min_running_time=100000)
Get an experiment instance.
>>> experiment = experiments[0]
Get experiment channels.
>>> experiment.get_system_channels()
"""
channels = self._client.get_system_channels(self)
return dict((ch.name, ch) for ch in channels)

def upload_source_files(self, source_files):
"""
Raises:
Expand Down Expand Up @@ -600,17 +629,27 @@ def _get_channels(self, channels_names_with_types):
channels_by_name[channel.name] = channel
return channels_by_name

def _get_channel(self, channel_name, channel_type):
channel = self._find_channel(channel_name)
def _get_channel(self, channel_name, channel_type, channel_namespace=ChannelNamespace.USER):
channel = self._find_channel(channel_name, channel_namespace)
if channel is None:
channel = self._create_channel(channel_name, channel_type)
channel = self._create_channel(channel_name, channel_type, channel_namespace)
return channel

def _find_channel(self, channel_name):
return self.get_channels().get(channel_name, None)
def _find_channel(self, channel_name, channel_namespace):
if channel_namespace == ChannelNamespace.USER:
return self.get_channels().get(channel_name, None)
elif channel_namespace == ChannelNamespace.SYSTEM:
return self.get_system_channels().get(channel_name, None)
else:
raise RuntimeError("Unknown channel namesapce {}".format(channel_namespace))

def _create_channel(self, channel_name, channel_type):
return self._client.create_channel(self, channel_name, channel_type)
def _create_channel(self, channel_name, channel_type, channel_namespace=ChannelNamespace.USER):
if channel_namespace == ChannelNamespace.USER:
return self._client.create_channel(self, channel_name, channel_type)
elif channel_namespace == ChannelNamespace.SYSTEM:
return self._client.create_system_channel(self, channel_name, channel_type)
else:
raise RuntimeError("Unknown channel namesapce {}".format(channel_namespace))


_experiments_stack = []
Expand Down
10 changes: 9 additions & 1 deletion neptune/internal/channels/channels.py
Expand Up @@ -17,7 +17,10 @@
from collections import namedtuple
from enum import Enum

ChannelNameWithType = namedtuple("ChannelNameWithType", ['channel_name', 'channel_type'])
ChannelNameWithTypeAndNamespace = namedtuple(
"ChannelNameWithType",
['channel_name', 'channel_type', 'channel_namespace']
)
ChannelIdWithValues = namedtuple('ChannelIdWithValues', ['channel_id', 'channel_values'])


Expand All @@ -27,6 +30,11 @@ class ChannelType(Enum):
IMAGE = 'image'


class ChannelNamespace(Enum):
USER = 'user'
SYSTEM = 'system'


class ChannelValue(object):

def __init__(self, x, y, ts):
Expand Down
35 changes: 27 additions & 8 deletions neptune/internal/channels/channels_values_sender.py
Expand Up @@ -22,14 +22,18 @@
from future.moves import queue

from neptune.api_exceptions import NeptuneApiException
from neptune.internal.channels.channels import ChannelIdWithValues, ChannelNameWithType, ChannelValue, ChannelType
from neptune.internal.channels.channels import ChannelIdWithValues, ChannelNameWithTypeAndNamespace, ChannelValue,\
ChannelType, ChannelNamespace
from neptune.internal.threads.neptune_thread import NeptuneThread

_logger = logging.getLogger(__name__)


class ChannelsValuesSender(object):
_QUEUED_CHANNEL_VALUE = namedtuple("QueuedChannelValue", ['channel_name', 'channel_type', 'channel_value'])
_QUEUED_CHANNEL_VALUE = namedtuple(
"QueuedChannelValue",
['channel_name', 'channel_type', 'channel_value', 'channel_namespace']
)

__LOCK = threading.RLock()

Expand All @@ -38,15 +42,16 @@ def __init__(self, experiment):
self._values_queue = None
self._sending_thread = None

def send(self, channel_name, channel_type, channel_value):
def send(self, channel_name, channel_type, channel_value, channel_namespace=ChannelNamespace.USER):
with self.__LOCK:
if not self._is_running():
self._start()

self._values_queue.put(self._QUEUED_CHANNEL_VALUE(
channel_name=channel_name,
channel_type=channel_type,
channel_value=channel_value
channel_value=channel_value,
channel_namespace=channel_namespace
))

def join(self):
Expand Down Expand Up @@ -108,18 +113,32 @@ def _process_batch(self):
self._sleep_time = self._SLEEP_TIME - (time.time() - send_start)

def _send_values(self, queued_channels_values):
channel_key = lambda value: ChannelNameWithType(value.channel_name, value.channel_type)
def channel_key(value):
return ChannelNameWithTypeAndNamespace(
value.channel_name,
value.channel_type,
value.channel_namespace
)

queued_grouped_by_channel = {channel: list(values)
for channel, values
in groupby(sorted(queued_channels_values, key=channel_key),
channel_key)}
channels_with_values = []
# pylint: disable=protected-access
channels_by_name = self._experiment._get_channels(list(queued_grouped_by_channel.keys()))
user_channels_by_name = self._experiment._get_channels([
(ch.channel_name, ch.channel_type)
for ch in queued_grouped_by_channel.keys()
if ch.channel_namespace == ChannelNamespace.USER
])
system_channels_by_name = self._experiment.get_system_channels()

for channel_key in queued_grouped_by_channel:
channel = channels_by_name[channel_key.channel_name]
last_x = channel.x if channel.x else 0
if channel_key.channel_namespace == ChannelNamespace.USER:
channel = user_channels_by_name[channel_key.channel_name]
else:
channel = system_channels_by_name[channel_key.channel_name]
last_x = channel.x if hasattr(channel, 'x') and channel.x else 0
channel_values = []
for queued_value in queued_grouped_by_channel[channel_key]:
x = queued_value.channel_value.x if queued_value.channel_value.x is not None else last_x + 1
Expand Down
18 changes: 14 additions & 4 deletions neptune/internal/streams/channel_writer.py
Expand Up @@ -19,14 +19,17 @@
import re
import time

from neptune.internal.channels.channels import ChannelNamespace, ChannelValue, ChannelType


class ChannelWriter(object):
__SPLIT_PATTERN = re.compile(r'[\n\r]{1,2}')

def __init__(self, experiment, channel_name):
def __init__(self, experiment, channel_name, channel_namespace=ChannelNamespace.USER):
self.time_started_ms = time.time() * 1000
self._experiment = experiment
self._channel_name = channel_name
self._channel_namespace = channel_namespace
self._data = None

def write(self, data):
Expand All @@ -36,10 +39,17 @@ def write(self, data):
self._data += data
lines = self.__SPLIT_PATTERN.split(self._data)
for line in lines[:-1]:
self._experiment.send_text(
channel_name=self._channel_name,
value = ChannelValue(
x=time.time() * 1000 - self.time_started_ms,
y=str(line)
y=dict(text_value=str(line)),
ts=None
)
# pylint: disable=protected-access
self._experiment._channels_values_sender.send(
channel_name=self._channel_name,
channel_type=ChannelType.TEXT.value,
channel_value=value,
channel_namespace=self._channel_namespace
)

self._data = lines[-1]
5 changes: 3 additions & 2 deletions neptune/internal/streams/stdstream_uploader.py
Expand Up @@ -15,15 +15,16 @@
#
import sys

from neptune.internal.channels.channels import ChannelNamespace
from neptune.internal.streams.channel_writer import ChannelWriter


class StdStreamWithUpload(object):

def __init__(self, experiment, channel_name, stream):
# pylint:disable=protected-access
self._channel = experiment._get_channel(channel_name, 'text')
self._channel_writer = ChannelWriter(experiment, channel_name)
self._channel = experiment._get_channel(channel_name, 'text', ChannelNamespace.SYSTEM)
self._channel_writer = ChannelWriter(experiment, channel_name, ChannelNamespace.SYSTEM)
self._stream = stream

def write(self, data):
Expand Down
3 changes: 2 additions & 1 deletion tests/neptune/test_channel_writer.py
Expand Up @@ -31,7 +31,8 @@ def test_write_data_to_channel_writer(self):

writer.write('some\ndata')

experiment.send_text.assert_called_once()
# pylint: disable=protected-access
experiment._channels_values_sender.send.assert_called_once()


if __name__ == '__main__':
Expand Down

0 comments on commit e874e10

Please sign in to comment.