Skip to content

Commit

Permalink
resource allocation (#3306)
Browse files Browse the repository at this point in the history
* resource allocation

* reset the original allocation for the jobs

* adding a couple of missing commands in ProcessingJob.shape
  • Loading branch information
antgonza committed Jul 13, 2023
1 parent 027613f commit a0ed54d
Show file tree
Hide file tree
Showing 2 changed files with 141 additions and 23 deletions.
96 changes: 78 additions & 18 deletions qiita_db/processing_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,10 @@
import networkx as nx
import qiita_db as qdb
import pandas as pd
from numpy import log as nlog # noqa

from collections import defaultdict, Iterable
from datetime import datetime
from datetime import datetime, timedelta
from itertools import chain
from json import dumps, loads
from multiprocessing import Process, Queue, Event
Expand Down Expand Up @@ -460,7 +461,21 @@ def get_resource_allocation_info(self):
parts = []
error_msg = ('Obvious incorrect allocation. Please '
'contact qiita.help@gmail.com')
for part in allocation.split(' '):
for part in allocation.split('--'):
param = ''
if part.startswith('time '):
param = 'time '
elif part.startswith('mem '):
param = 'mem '
else:
# if parts is empty, this is the first part so no --
if parts:
parts.append(f'--{part.strip()}')
else:
parts.append(part.strip())
continue

part = part[len(param):]
if ('{samples}' in part or '{columns}' in part or
'{input_size}' in part):
# to make sure that the formula is correct and avoid
Expand All @@ -479,19 +494,29 @@ def get_resource_allocation_info(self):
try:
# if eval has something that can't be processed
# it will raise a NameError
mem = eval(part.format(
value = eval(part.format(
samples=samples, columns=columns,
input_size=input_size))
except NameError:
self._set_error(error_msg)
return 'Not valid'
else:
if mem <= 0:
if value <= 0:
self._set_error(error_msg)
return 'Not valid'
part = naturalsize(mem, gnu=True, format='%.0f')

parts.append(part)
if param == 'time ':
td = timedelta(seconds=value)
if td.days > 0:
days = td.days
td = td - timedelta(days=days)
part = f'{days}-{str(td)}'
else:
part = str(td)
else:
part = naturalsize(
value, gnu=True, format='%.0f')
parts.append(f'--{param}{part}'.strip())

allocation = ' '.join(parts)

Expand Down Expand Up @@ -1837,34 +1862,41 @@ def shape(self):
"""
samples = None
columns = None
prep_info = None
study_id = None
analysis_id = None
artifact = None
input_size = None

parameters = self.parameters.values
QUIDError = qdb.exceptions.QiitaDBUnknownIDError

if self.command.name == 'Validate':
# Validate only has two options to calculate its size: template (a
# job that has a preparation linked) or analysis (is from an
# analysis). However, 'template' can be present and be None
if 'template' in parameters and parameters['template'] is not None:
try:
pt = qdb.metadata_template.prep_template.PrepTemplate(
parameters['template'])
except qdb.exceptions.QiitaDBUnknownIDError:
PT = qdb.metadata_template.prep_template.PrepTemplate
prep_info = PT(parameters['template'])
except QUIDError:
pass
else:
study_id = pt.study_id
study_id = prep_info.study_id
elif 'analysis' in parameters:
analysis_id = parameters['analysis']
elif self.command.name == 'build_analysis_files':
# build analysis is a special case because the analysis doesn't
# exist yet
sanalysis = qdb.analysis.Analysis(parameters['analysis']).samples
samples = sum([len(sams) for sams in sanalysis.values()])
# only count the biom files
input_size = sum([fp['fp_size'] for aid in sanalysis
for fp in qdb.artifact.Artifact(aid).filepaths])
for fp in qdb.artifact.Artifact(aid).filepaths
if fp['fp_type'] == 'biom'])
columns = self.parameters.values['categories']
if columns is not None:
columns = len(columns)
elif self.command.software.name == 'Qiita':
if 'study' in parameters:
study_id = parameters['study']
Expand All @@ -1877,29 +1909,57 @@ def shape(self):
elif 'artifact' in parameters:
try:
artifact = qdb.artifact.Artifact(parameters['artifact'])
except qdb.exceptions.QiitaDBUnknownIDError:
except QUIDError:
pass
elif self.command.name == 'delete_sample_or_column':
v = self.parameters.values
try:
MT = qdb.metadata_template
if v['obj_class'] == 'SampleTemplate':
obj = MT.sample_template.SampleTemplate(v['obj_id'])
else:
obj = MT.prep_template.PrepTemplate(v['obj_id'])
except QUIDError:
pass
samples = len(obj)
elif self.command.name == 'Sequence Processing Pipeline':
body = self.parameters.values['sample_sheet']['body']
samples = body.count('\r')
stemp = body.count('\n')
if stemp > samples:
samples = stemp
elif self.input_artifacts:
artifact = self.input_artifacts[0]
input_size = sum([fp['fp_size'] for a in self.input_artifacts
for fp in a.filepaths])
if artifact.artifact_type == 'BIOM':
input_size = sum([fp['fp_size'] for a in self.input_artifacts
for fp in a.filepaths
if fp['fp_type'] == 'biom'])
else:
input_size = sum([fp['fp_size'] for a in self.input_artifacts
for fp in a.filepaths])

# if there is an artifact, then we need to get the study_id/analysis_id
if artifact is not None:
if artifact.study is not None:
study_id = artifact.study.id
# only count samples in the prep template
prep_info = artifact.prep_templates[0]
study_id = prep_info.study_id
elif artifact.analysis is not None:
analysis_id = artifact.analysis.id

# now retrieve the sample/columns based on study_id/analysis_id
if study_id is not None:
try:
st = qdb.study.Study(study_id).sample_template
except qdb.exceptions.QiitaDBUnknownIDError:
except QUIDError:
pass
else:
samples = len(st)
columns = len(st.categories)
if prep_info is not None:
samples = len(prep_info)
columns = len(prep_info.categories) + len(st.categories)
else:
samples = len(st)
columns = len(st.categories)
elif analysis_id is not None:
try:
analysis = qdb.analysis.Analysis(analysis_id)
Expand Down
68 changes: 63 additions & 5 deletions qiita_db/test/test_processing_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -556,7 +556,7 @@ def test_complete_success(self):
self.assertEqual(
validator.get_resource_allocation_info(),
'-p qiita -N 1 -n 1 --mem 90gb --time 150:00:00')
self.assertEqual(validator.shape, (27, 31, None))
self.assertEqual(validator.shape, (27, 53, None))
# Test the output artifact is going to be named based on the
# input parameters
self.assertEqual(
Expand Down Expand Up @@ -774,18 +774,69 @@ def test_hide(self):
def test_shape(self):
jids = {
# Split libraries FASTQ
'6d368e16-2242-4cf8-87b4-a5dc40bb890b': (27, 31, 116),
'6d368e16-2242-4cf8-87b4-a5dc40bb890b': (27, 53, 116),
# Pick closed-reference OTUs
'80bf25f3-5f1d-4e10-9369-315e4244f6d5': (27, 31, 0),
'80bf25f3-5f1d-4e10-9369-315e4244f6d5': (27, 53, 0),
# Single Rarefaction / Analysis
'8a7a8461-e8a1-4b4e-a428-1bc2f4d3ebd0': (5, 56, 3770436),
# Split libraries
'bcc7ebcd-39c1-43e4-af2d-822e3589f14d': (27, 31, 116)}
'bcc7ebcd-39c1-43e4-af2d-822e3589f14d': (27, 53, 116)}

for jid, shape in jids.items():
job = qdb.processing_job.ProcessingJob(jid)
self.assertEqual(job.shape, shape)

def test_shape_special_cases(self):
    """Check templated resource allocations against their rendered form.

    The allocation row of the job's command is overwritten with each
    template in ``tests`` and the string returned by
    ``get_resource_allocation_info`` is compared with the expected value.
    The allocation read at the start of the test is always written back,
    even when one of the assertions fails, so other tests do not see a
    modified allocation.
    """
    # get any given job/command/allocation and make sure nothing changed
    pj = qdb.processing_job.ProcessingJob(
        '6d368e16-2242-4cf8-87b4-a5dc40bb890b')
    command = pj.command
    current_allocation = pj.get_resource_allocation_info()
    self.assertEqual(current_allocation,
                     '-p qiita -N 1 -n 1 --mem 120gb --time 80:00:00')

    def _set_allocation(allocation):
        # single place that builds the UPDATE; note the explicit space
        # between the quoted allocation value and the WHERE clause
        sql = ("UPDATE qiita.processing_job_resource_allocation "
               f"SET allocation = '{allocation}' "
               f"WHERE name = '{command.name}'")
        qdb.sql_connection.perform_as_transaction(sql)

    # now, let's update that job allocation and make sure that things
    # work as expected
    tests = [
        # (resource allocation, specific allocation)
        # 1. tests that nlog works
        ('-p qiita -N 1 -n 1 --mem nlog({samples})*100 --time {columns}',
         '-p qiita -N 1 -n 1 --mem 329B --time 0:00:53'),
        # 2. days in time works fine
        ('-p qiita -N 1 -n 1 --mem 10g --time {columns}*10000',
         '-p qiita -N 1 -n 1 --mem 10g --time 6-3:13:20'),
        ('-p qiita -N 1 -n 1 --mem 20g --time {columns}*1631',
         '-p qiita -N 1 -n 1 --mem 20g --time 1-0:00:43'),
        # 3. conditionals work
        ('-p qiita -N 1 -n 1 --mem 10g --time {columns}*1631 '
         'if {columns}*1631 < 86400 else 86400',
         '-p qiita -N 1 -n 1 --mem 10g --time 1-0:00:00'),
        ('-p qiita -N 1 -n 1 --mem 10g --time {columns}*1631 '
         'if {columns}*1631 > 86400 else 86400',
         '-p qiita -N 1 -n 1 --mem 10g --time 1-0:00:43'),
        # 4. extra options such as --qos=qiita_prio are passed through
        ('-p qiita -N 1 -n 1 --mem 10g --time 1:00:00 --qos=qiita_prio',
         '-p qiita -N 1 -n 1 --mem 10g --time 1:00:00 --qos=qiita_prio'),
        # 5. all the combinations
        ('-p qiita -N 1 -n 1 --mem nlog({samples})*100000 --time '
         '{columns}*1631 if {columns}*1631 > 86400 else 86400 '
         '--qos=qiita_prio',
         '-p qiita -N 1 -n 1 --mem 322K --time 1-0:00:43 '
         '--qos=qiita_prio'),
    ]
    try:
        for ra, sra in tests:
            _set_allocation(ra)
            self.assertEqual(sra, pj.get_resource_allocation_info())
    finally:
        # return allocation, also when an assertion above failed
        _set_allocation(current_allocation)

def test_get_resource_allocation_info(self):
jids = {
# Split libraries FASTQ
Expand Down Expand Up @@ -835,7 +886,7 @@ def _set_allocation(memory):
job_not_changed.get_resource_allocation_info(),
'-p qiita -N 1 -n 5 --mem 120gb --time 130:00:00')
self.assertEqual(job_changed.get_resource_allocation_info(),
'-p qiita --mem 59M')
'-p qiita --mem 80M')

# now something real input_size+(2*1e+9)
# 116 +(2*1e+9) ~ 2000000116
Expand All @@ -846,6 +897,13 @@ def _set_allocation(memory):
self.assertEqual(job_changed.get_resource_allocation_info(),
'-p qiita --mem 2G')

# restore allocation
sql = ("UPDATE qiita.processing_job_resource_allocation "
"SET allocation = '-p qiita -N 1 -n 1 --mem 120gb "
"--time 80:00:00' "
"WHERE name = 'Split libraries FASTQ'")
qdb.sql_connection.perform_as_transaction(sql)

def test_notification_mail_generation(self):
# Almost all processing-jobs in testing are owned by test@foo.bar
# and are of type 'Split libraries FASTQ'.
Expand Down

0 comments on commit a0ed54d

Please sign in to comment.