Skip to content

Commit

Permalink
Schema upgrade
Browse files Browse the repository at this point in the history
  • Loading branch information
kofalt committed May 16, 2016
1 parent b3cf66d commit 7a2d935
Show file tree
Hide file tree
Showing 5 changed files with 74 additions and 27 deletions.
22 changes: 11 additions & 11 deletions api/dao/containerutil.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,32 +36,32 @@ def add_id_to_subject(subject, pid):


# A FileReference tuple holds all the details of a scitran file needed to uniquely identify it.
FileReference = namedtuple('FileReference', ['type', 'id', 'name'])

def create_filereference_from_dictionary(d):
    """
    Build a FileReference from a dict with 'type', 'id' and 'name' keys.

    The container type must be singular ('session', not 'sessions');
    a plural type is rejected so malformed references fail fast.
    """
    if d['type'].endswith('s'):
        raise Exception('Container type cannot be plural :|')

    return FileReference(
        type=d['type'],
        id=d['id'],
        name=d['name'],
    )

# A ContainerReference tuple holds all the details of a scitran container needed to uniquely identify it.
ContainerReference = namedtuple('ContainerReference', ['type', 'id'])

def create_containerreference_from_dictionary(d):
    """
    Build a ContainerReference from a dict with 'type' and 'id' keys.

    The container type must be singular ('session', not 'sessions');
    a plural type is rejected so malformed references fail fast.
    """
    if d['type'].endswith('s'):
        raise Exception('Container type cannot be plural :|')

    return ContainerReference(
        type=d['type'],
        id=d['id'],
    )

def create_containerreference_from_filereference(fr):
    """Build a ContainerReference pointing at the container that holds file reference fr."""
    return ContainerReference(
        type=fr.type,
        id=fr.id,
    )
18 changes: 9 additions & 9 deletions api/jobs/jobs.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@


class Job(object):
def __init__(self, algorithm_id, inputs, destination=None, tags=None, attempt=1, previous_job_id=None, created=None, modified=None, state='pending', request=None, _id=None):
def __init__(self, name, inputs, destination=None, tags=None, attempt=1, previous_job_id=None, created=None, modified=None, state='pending', request=None, _id=None):
"""
Creates a job.
Parameters
----------
algorithm_id: string
name: string
Unique name of the algorithm
inputs: string -> FileReference map
The inputs to be used by this job
Expand Down Expand Up @@ -60,12 +60,12 @@ def __init__(self, algorithm_id, inputs, destination=None, tags=None, attempt=1,
destination = create_containerreference_from_filereference(inputs[key])

# A job is always tagged with the name of the gear
tags.append(algorithm_id)
tags.append(name)

# Trim tags array to unique members...
tags = list(set(tags))

self.algorithm_id = algorithm_id
self.name = name
self.inputs = inputs
self.destination = destination
self.tags = tags
Expand All @@ -89,7 +89,7 @@ def load(cls, d):

d['_id'] = str(d['_id'])

return cls(d['algorithm_id'], d['inputs'], destination=d['destination'], tags=d['tags'], attempt=d['attempt'], previous_job_id=d.get('previous_job_id', None), created=d['created'], modified=d['modified'], state=d['state'], request=d.get('request', None), _id=d['_id'])
return cls(d['name'], d['inputs'], destination=d['destination'], tags=d['tags'], attempt=d['attempt'], previous_job_id=d.get('previous_job_id', None), created=d['created'], modified=d['modified'], state=d['state'], request=d.get('request', None), _id=d['_id'])

@classmethod
def get(cls, _id):
Expand Down Expand Up @@ -137,7 +137,7 @@ def generate_request(self, gear=None):
Parameters
----------
gear: map (optional)
A gear_list map from the singletons.gears table. Will be loaded by the job's algorithm_id otherwise.
A gear_list map from the singletons.gears table. Will be loaded by the job's name otherwise.
"""

r = {
Expand All @@ -159,21 +159,21 @@ def generate_request(self, gear=None):
}

if gear is None:
gear = gears.get_gear_by_name(self.algorithm_id)
gear = gears.get_gear_by_name(self.name)

# Add the gear
r['inputs'].append(gear['input'])

# Map destination to upload URI
r['outputs'][0]['uri'] = '/engine?level=' + self.destination.container_type + '&id=' + self.destination.container_id
r['outputs'][0]['uri'] = '/engine?level=' + self.destination.type + '&id=' + self.destination.id

# Add the files
for input_name in self.inputs.keys():
i = self.inputs[input_name]

r['inputs'].append({
'type': 'scitran',
'uri': '/' + i.container_type + 's/' + i.container_id + '/files/' + i.filename,
'uri': '/' + i.type + 's/' + i.id + '/files/' + i.name,
'location': '/flywheel/v0/input/' + input_name,
})

Expand Down
6 changes: 3 additions & 3 deletions api/jobs/queue.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,11 +138,11 @@ def search(container, states=None, tags=None):

filter = """
for (var key in this['inputs']) {
var ct = this['inputs'][key]['container_type']
var ci = this['inputs'][key]['container_id']
var ct = this['inputs'][key]['type']
var ci = this['inputs'][key]['id']
if (ct === '$cT$' && ci == '$cI$') { return true }
}
""".replace('$cT$', container.container_type).replace('$cI$', container.container_id)
""".replace('$cT$', container.type).replace('$cI$', container.id)

query = { "$where": filter }

Expand Down
2 changes: 1 addition & 1 deletion api/jobs/rules.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ def create_jobs(db, container, container_type, file_):
for rule in rules:
if eval_rule(rule, file_, container):
alg_name = rule['alg']
input = FileReference(container_type=container_type, container_id=str(container['_id']), filename=file_['name'])
input = FileReference(type=container_type, id=str(container['_id']), name=file_['name'])

queue_job_legacy(db, alg_name, input)
job_list.append(alg_name)
Expand Down
53 changes: 50 additions & 3 deletions bin/database.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

from api import config

CURRENT_DATABASE_VERSION = 8 # An int that is bumped when a new schema change is made
CURRENT_DATABASE_VERSION = 9 # An int that is bumped when a new schema change is made

def get_db_version():

Expand Down Expand Up @@ -204,12 +204,12 @@ def upgrade_to_7():
gear_name = job['algorithm_id']
input_name = input_name_for_gear[gear_name]

# # Move single input to named input map
# Move single input to named input map
input_ = job['input']
input_.pop('filehash', None)
inputs = { input_name: input_ }

# # Destination is required, and (for these jobs) is always the same container as the input
# Destination is required, and (for these jobs) is always the same container as the input
destination = copy.deepcopy(input_)
destination.pop('filename', None)

Expand Down Expand Up @@ -247,6 +247,51 @@ def upgrade_to_8():
config.db.drop_collection('version')
config.db.drop_collection('config')

def upgrade_to_9():
    """
    scitran/core issue #301

    Makes the following key renames, all in the jobs table.
    FR is a FileReference, CR is a ContainerReference:

    job.algorithm_id --> job.name

    FR.container_type --> type
    FR.container_id   --> id
    FR.filename       --> name

    CR.container_type --> type
    CR.container_id   --> id
    """

    def switch_keys(doc, x, y):
        # Rename key x to y in place. A no-op when x is absent, so the
        # migration can be re-run safely over partially-upgraded documents
        # (the original unguarded doc[x] read would raise KeyError).
        if x in doc:
            doc[y] = doc.pop(x)

    # Only jobs still using the old key layout need rewriting.
    jobs = config.db.jobs.find({'destination.container_type': {'$exists': True}})

    for job in jobs:
        switch_keys(job, 'algorithm_id', 'name')

        # NOTE(review): assumes every matched job has an 'inputs' map,
        # as produced by upgrade_to_7 — confirm against older job docs.
        for inp in job['inputs'].values():
            switch_keys(inp, 'container_type', 'type')
            switch_keys(inp, 'container_id', 'id')
            switch_keys(inp, 'filename', 'name')

        dest = job['destination']
        switch_keys(dest, 'container_type', 'type')
        switch_keys(dest, 'container_id', 'id')

        # Full-document replace of the renamed job.
        config.db.jobs.update(
            {'_id': job['_id']},
            job
        )


def upgrade_schema():
"""
Upgrades db to the current schema version
Expand All @@ -272,6 +317,8 @@ def upgrade_schema():
upgrade_to_7()
if db_version < 8:
upgrade_to_8()
if db_version < 9:
upgrade_to_9()
except Exception as e:
logging.exception('Incremental upgrade of db failed')
sys.exit(1)
Expand Down

0 comments on commit 7a2d935

Please sign in to comment.