Skip to content

Commit

Permalink
Merge branch 'master' of github.com:codalab/codalab-worksheets into staging
Browse files Browse the repository at this point in the history
  • Loading branch information
epicfaace committed Apr 15, 2020
2 parents adaf6cf + 106e0f9 commit 6a9a49c
Show file tree
Hide file tree
Showing 35 changed files with 6,073 additions and 3,415 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
"""Add tag_exclusive workers
Revision ID: 9d0f1ffb18e9
Revises: 75d4288ae265
Create Date: 2020-03-29 04:57:59.567422
"""

# revision identifiers, used by Alembic.
revision = '9d0f1ffb18e9'
down_revision = '75d4288ae265'

from alembic import op
import sqlalchemy as sa


def upgrade():
    """Add the `tag_exclusive` boolean column to the `worker` table.

    The column is NOT NULL; `server_default=sa.false()` backfills existing
    worker rows with false so the migration succeeds on a populated table.
    """
    tag_exclusive_column = sa.Column(
        'tag_exclusive', sa.Boolean(), nullable=False, server_default=sa.false()
    )
    op.add_column('worker', tag_exclusive_column)


def downgrade():
    """Revert the migration: drop the `tag_exclusive` column from the `worker` table."""
    op.drop_column('worker', 'tag_exclusive')
2 changes: 1 addition & 1 deletion codalab/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@

# Increment this on master when ready to cut a release.
# http://semver.org/
CODALAB_VERSION = '0.5.10'
CODALAB_VERSION = '0.5.11'
BINARY_PLACEHOLDER = '<binary>'


Expand Down
30 changes: 26 additions & 4 deletions codalab/lib/bundle_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
from collections import defaultdict
from contextlib import closing
from io import BytesIO
from shlex import quote

import argcomplete
from argcomplete.completers import FilesCompleter, ChoicesCompleter
Expand All @@ -36,7 +37,14 @@
from codalab.bundles.make_bundle import MakeBundle
from codalab.bundles.uploaded_bundle import UploadedBundle
from codalab.bundles.run_bundle import RunBundle
from codalab.common import CODALAB_VERSION, NotFoundError, PermissionError, precondition, UsageError
from codalab.common import (
CODALAB_VERSION,
NotFoundError,
PermissionError,
precondition,
UsageError,
ensure_str,
)
from codalab.lib import (
bundle_util,
file_util,
Expand Down Expand Up @@ -313,7 +321,8 @@ def render_args(arguments):
if markdown:
name = HEADING_LEVEL_3 + name
return '%s%s:\n%s\n%s' % (
' ' * indent,
# This is to make GitHub Markdown format compatible with the Read the Docs theme.
' ' * indent if not markdown else '',
name,
'\n'.join((' ' * (indent * 2)) + line for line in command_obj.help),
'\n'.join(render_args(command_obj.arguments)),
Expand Down Expand Up @@ -795,7 +804,9 @@ def collapse_bare_command(argv):
"""
try:
i = argv.index('---')
argv = argv[0:i] + [' '.join(argv[i + 1 :])] # TODO: quote command properly
# Convert the command after '---' to a shell-escaped version of the string.
shell_escaped_command = [quote(x) for x in argv[i + 1 :]]
argv = argv[0:i] + [' '.join(shell_escaped_command)]
except:
pass

Expand Down Expand Up @@ -1052,6 +1063,7 @@ def do_workers_command(self, args):
'tag',
'runs',
'shared_file_system',
'tag_exclusive',
]

data = []
Expand All @@ -1070,6 +1082,7 @@ def do_workers_command(self, args):
'tag': worker['tag'],
'runs': ",".join([uuid[0:8] for uuid in worker['run_uuids']]),
'shared_file_system': worker['shared_file_system'],
'tag_exclusive': worker['tag_exclusive'],
}
)

Expand Down Expand Up @@ -1876,6 +1889,9 @@ def do_rm_command(self, args):
' search .limit=<limit> : Limit the number of results to the top <limit> (e.g., 50).',
' search .offset=<offset> : Return results starting at <offset>.',
'',
' search .before=<datetime> : Returns bundles created before (inclusive) given ISO 8601 timestamp (e.g., .before=2042-3-14).',
' search .after=<datetime> : Returns bundles created after (inclusive) given ISO 8601 timestamp (e.g., .after=2120-10-15T00:00:00-08).',
'',
' search size=.sort : Sort by a particular field (where `size` can be any metadata field).',
' search size=.sort- : Sort by a particular field in reverse (e.g., `size`).',
' search .last : Sort in reverse chronological order (equivalent to id=.sort-).',
Expand Down Expand Up @@ -2344,7 +2360,13 @@ def print_target_info(self, client, target, head=None, tail=None):

contents = client.fetch_contents_blob(info['resolved_target'], **kwargs)
with closing(contents):
shutil.copyfileobj(contents, self.stdout.buffer)
try:
shutil.copyfileobj(contents, self.stdout.buffer)
except AttributeError:
# self.stdout will have buffer attribute when it's an io.TextIOWrapper object. However, when
# self.stdout gets reassigned to an io.StringIO object, self.stdout.buffer won't exist.
# Therefore, we try to directly write file content as a String object to self.stdout.
self.stdout.write(ensure_str(contents.read()))

if self.headless:
print(
Expand Down
22 changes: 22 additions & 0 deletions codalab/model/bundle_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import time
import logging

from dateutil import parser

from uuid import uuid4

from sqlalchemy import and_, or_, not_, select, union, desc, func
Expand Down Expand Up @@ -522,6 +524,26 @@ def make_condition(key, field, value):
clause = cl_bundle.c.uuid.in_(
alias(select([cl_worksheet_item.c.bundle_uuid]).where(condition))
)
elif key in ('.before', '.after'):
target_datetime = parser.isoparse(value)

subclause = None
if key == '.before':
subclause = cl_bundle_metadata.c.metadata_value <= int(
target_datetime.timestamp()
)
if key == '.after':
subclause = cl_bundle_metadata.c.metadata_value >= int(
target_datetime.timestamp()
)

clause = cl_bundle.c.uuid.in_(
alias(
select([cl_bundle_metadata.c.bundle_uuid]).where(
and_(cl_bundle_metadata.c.metadata_key == 'created', subclause)
)
)
)
elif key == 'uuid_name': # Search uuid and name by default
clause = []
clause.append(cl_bundle.c.uuid.like('%' + value + '%'))
Expand Down
3 changes: 3 additions & 0 deletions codalab/model/tables.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,6 +316,9 @@
Column(
'shared_file_system', Boolean, nullable=False
), # Whether the worker and the server have a shared filesystem.
Column(
'tag_exclusive', Boolean, nullable=False
), # Whether worker runs bundles if and only if they match tags.
)

# Store information about all sockets currently allocated to each worker.
Expand Down
3 changes: 3 additions & 0 deletions codalab/model/worker_model.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ def worker_checkin(
free_disk_bytes,
dependencies,
shared_file_system,
tag_exclusive,
):
"""
Adds the worker to the database, if not yet there. Returns the socket ID
Expand All @@ -61,6 +62,7 @@ def worker_checkin(
'free_disk_bytes': free_disk_bytes,
'checkin_time': datetime.datetime.utcnow(),
'shared_file_system': shared_file_system,
'tag_exclusive': tag_exclusive,
}
existing_row = conn.execute(
cl_worker.select().where(
Expand Down Expand Up @@ -185,6 +187,7 @@ def get_workers(self):
'dependencies': row.dependencies
and self._deserialize_dependencies(row.dependencies),
'shared_file_system': row.shared_file_system,
'tag_exclusive': row.tag_exclusive,
}
for row in worker_rows
}
Expand Down
1 change: 1 addition & 0 deletions codalab/rest/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ def checkin(worker_id):
request.json.get("free_disk_bytes"),
request.json["dependencies"],
request.json.get("shared_file_system", False),
request.json.get("tag_exclusive", False),
)

for run in request.json["runs"]:
Expand Down
69 changes: 55 additions & 14 deletions codalab/server/bundle_manager.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import copy
from collections import defaultdict
import datetime
import logging
import os
Expand Down Expand Up @@ -314,21 +315,50 @@ def _bring_offline_stuck_running_bundles(self, workers):
def _schedule_run_bundles_on_workers(self, workers, staged_bundles_to_run, user_info_cache):
"""
Schedule STAGED bundles to run on available workers based on the following logic:
1. If the bundle requests to run on a specific worker, tries to schedule the bundle
to run on a worker that has a tag exactly match with request_queue.
2. If the bundle doesn't request to run on a specific worker,
1. For a given user, schedule the highest-priority bundles first, followed by bundles
that request to run on a specific worker.
2. If the bundle requests to run on a specific worker, schedule the bundle
to run on a worker that has a tag that exactly matches the bundle's request_queue.
3. If the bundle doesn't request to run on a specific worker,
(1) try to schedule the bundle to run on a worker that belongs to the bundle's owner
(2) if there is no such qualified private worker, uses CodaLab-owned workers, which have user ID root_user_id.
:param workers: a WorkerInfoAccessor object containing worker related information e.g. running uuid.
:param staged_bundles_to_run: a list of tuples each contains a valid bundle and its bundle resources.
:param user_info_cache: a dictionary mapping user id to user information.
"""
# Reorder the stage_bundles so that bundles which were requested to run on a specific worker
# will be scheduled to run first
staged_bundles_to_run.sort(
key=lambda b: (b[0].metadata.request_queue is not None, b[0].metadata.request_queue),
reverse=True,
)
# Build a dictionary which maps from user id to positions in the queue of the
# user's staged bundles. We use this to sort bundles within each user. For example,
# Suppose we have 4 staged bundles with the following attributes from 2 users:
# Users: [A, B, A, B, A]
# Bundle Priorities: [1, 2, 3, 1, 1]
# Bundle specified request_queue: [False, False, False, False, True]
# Original Bundle Order: [B1, B2, B3, B4, B5]
# Sorted bundle order: [B3, B2, B5, B4, B1]
user_queue_positions = defaultdict(list)
for queue_position, staged_bundle in enumerate(staged_bundles_to_run):
user_queue_positions[staged_bundle[0].owner_id].append(queue_position)

for user, queue_positions in user_queue_positions.items():
assert queue_positions == sorted(queue_positions)
# Get this user's staged bundles
user_staged_bundles = [
staged_bundles_to_run[queue_position] for queue_position in queue_positions
]
# Sort the staged bundles for this user, according to
# (1) their priority (larger values indicate higher priority) and
# (2) whether it requested to run on a specific worker (bundles with a specified
# worker have higher priority).
sorted_user_staged_bundles = sorted(
user_staged_bundles,
key=lambda b: (
b[0].metadata.request_priority is not None,
b[0].metadata.request_priority,
b[0].metadata.request_queue is not None,
),
reverse=True,
)
for queue_position, bundle in zip(queue_positions, sorted_user_staged_bundles):
staged_bundles_to_run[queue_position] = bundle

# Build a dictionary which maps from uuid to running bundle and bundle_resources
running_bundles_info = self._get_running_bundles_info(workers, staged_bundles_to_run)
Expand Down Expand Up @@ -388,6 +418,15 @@ def _filter_and_sort_workers(self, workers_list, bundle, bundle_resources):
# Filter by tag.
if bundle.metadata.request_queue:
workers_list = self._get_matched_workers(bundle.metadata.request_queue, workers_list)
else:
# The bundle is untagged, so we want to keep workers that are not
# tag-exclusive or don't have a tag defined.
# (removing workers that are tag_exclusive and have tags defined).
workers_list = [
worker
for worker in workers_list
if not worker['tag_exclusive'] or not worker['tag']
]

# Filter by CPUs.
workers_list = [
Expand Down Expand Up @@ -428,12 +467,14 @@ def get_sort_key(worker):
num_available_deps = len(needed_deps & deps)

# Subject to the worker meeting the resource requirements of the bundle, we also want to:
# 1. prioritize workers with fewer GPUs (including zero).
# 2. prioritize workers that have more bundle dependencies.
# 3. prioritize workers with fewer CPUs.
# 4. prioritize workers with fewer running jobs.
# 5. break ties randomly by a random seed.
# 1. prioritize workers that are tag-exclusive.
# 2. prioritize workers with fewer GPUs (including zero).
# 3. prioritize workers that have more bundle dependencies.
# 4. prioritize workers with fewer CPUs.
# 5. prioritize workers with fewer running jobs.
# 6. break ties randomly by a random seed.
return (
not worker['tag_exclusive'],
worker['gpus'],
-num_available_deps,
worker['cpus'],
Expand Down
9 changes: 5 additions & 4 deletions codalab/worker/bundle_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -76,8 +76,8 @@ def __init__(
self.is_anonymous = is_anonymous
self.metadata = metadata
self.args = args
self.dependencies = {
DependencyKey(dep["parent_uuid"], dep["parent_path"]): Dependency(
self.dependencies = [
Dependency(
parent_name=dep["parent_name"],
parent_path=dep["parent_path"],
parent_uuid=dep["parent_uuid"],
Expand All @@ -86,13 +86,14 @@ def __init__(
location=dep.get("location", None),
)
for dep in dependencies
} # type: Dict[DependencyKey, Dependency]
] # type: List[Dependency]

self.location = location # set if local filesystem

@property
def as_dict(self):
dct = generic_to_dict(self)
dct['dependencies'] = [v for k, v in dct['dependencies'].items()]
dct['dependencies'] = [generic_to_dict(v) for v in dct['dependencies']]
return dct

def __str__(self):
Expand Down
6 changes: 6 additions & 0 deletions codalab/worker/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,11 @@ def parse_args():
action='store_true',
help='To be used when the server and the worker share the bundle store on their filesystems.',
)
parser.add_argument(
'--tag-exclusive',
action='store_true',
help='To be used when the worker should only run bundles that match the worker\'s tag.',
)
return parser.parse_args()


Expand Down Expand Up @@ -206,6 +211,7 @@ def main():
args.idle_seconds,
bundle_service,
args.shared_file_system,
args.tag_exclusive,
docker_runtime=docker_runtime,
docker_network_prefix=args.network_prefix,
)
Expand Down
4 changes: 2 additions & 2 deletions codalab/worker/reader.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def get_target_info(self, run_state, path, args, reply_fn):
Return target_info of path in bundle as a message on the reply_fn
"""
target_info = None
dep_paths = set([dep.child_path for dep in run_state.bundle.dependencies.values()])
dep_paths = set([dep.child_path for dep in run_state.bundle.dependencies])

# if path is a dependency raise an error
if path and os.path.normpath(path) in dep_paths:
Expand Down Expand Up @@ -93,7 +93,7 @@ def stream_directory(self, run_state, path, args, reply_fn):
"""
Stream the directory at path using a separate thread
"""
dep_paths = set([dep.child_path for dep in run_state.bundle.dependencies.values()])
dep_paths = set([dep.child_path for dep in run_state.bundle.dependencies])
exclude_names = [] if path else dep_paths

def stream_thread(final_path):
Expand Down
3 changes: 3 additions & 0 deletions codalab/worker/worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,7 @@ def __init__(
idle_seconds, # type: int
bundle_service, # type: BundleServiceClient
shared_file_system, # type: bool
tag_exclusive, # type: bool
docker_runtime=docker_utils.DEFAULT_RUNTIME, # type: str
docker_network_prefix='codalab_worker_network', # type: str
):
Expand All @@ -73,6 +74,7 @@ def __init__(

self.id = worker_id
self.tag = tag
self.tag_exclusive = tag_exclusive

self.work_dir = work_dir
self.local_bundles_dir = local_bundles_dir
Expand Down Expand Up @@ -239,6 +241,7 @@ def checkin(self):
'hostname': socket.gethostname(),
'runs': [run.as_dict for run in self.all_runs],
'shared_file_system': self.shared_file_system,
'tag_exclusive': self.tag_exclusive,
}
try:
response = self.bundle_service.checkin(self.id, request)
Expand Down
Loading

0 comments on commit 6a9a49c

Please sign in to comment.