Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions python/lsst/ctrl/execute/allocationConfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,11 @@ class AllocatedPlatformConfig(pexConfig.Config):
dtype=str,
default=None,
)
collector = pexConfig.Field(
doc="host where HTCondor collector service is running",
dtype=str,
default=None,
)
loginHostName = pexConfig.Field(doc="the host to login and copy files to", dtype=str, default=None)
utilityPath = pexConfig.Field(
doc="the directory containing the scheduler commands", dtype=str, default=None
Expand Down
43 changes: 39 additions & 4 deletions python/lsst/ctrl/execute/allocator.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,29 @@ def __init__(
self.defaults["USER_SCRATCH"] = user_scratch
self.commandLineDefaults = {}
self.commandLineDefaults["NODE_COUNT"] = self.opts.nodeCount
self.commandLineDefaults["COLLECTOR"] = self.opts.collector
if self.configuration.platform.collector:
self.commandLineDefaults["COLLECTOR"] = self.configuration.platform.collector
if self.opts.collector:
self.commandLineDefaults["COLLECTOR"] = self.opts.collector
self.commandLineDefaults["CPORT"] = self.opts.collectorport
if self.configuration.platform.peakcpus:
self.commandLineDefaults["PEAKCPUS"] = self.configuration.platform.peakcpus
else:
self.commandLineDefaults["PEAKCPUS"] = 256
if self.configuration.platform.peakmemory:
self.commandLineDefaults["PEAKMEMORY"] = self.configuration.platform.peakmemory
else:
self.commandLineDefaults["PEAKMEMORY"] = 1000000
if self.opts.exclusive:
self.commandLineDefaults["CPUS"] = self.configuration.platform.peakcpus
else:
self.commandLineDefaults["CPUS"] = self.opts.cpus
if self.opts.cpus < self.configuration.platform.peakcpus:
self.commandLineDefaults["CPUS"] = self.opts.cpus
else:
self.commandLineDefaults["CPUS"] = self.configuration.platform.peakcpus
self.commandLineDefaults["WALL_CLOCK"] = self.opts.maximumWallClock
self.commandLineDefaults["ACCOUNT"] = self.opts.account
self.commandLineDefaults["MEMPERCORE"] = 4096
self.commandLineDefaults["MEMPERCORE"] = self.opts.mempercore
self.commandLineDefaults["ALLOWEDAUTO"] = 500
self.commandLineDefaults["AUTOCPUS"] = 16
self.commandLineDefaults["MINAUTOCPUS"] = 15
Expand Down Expand Up @@ -223,7 +237,7 @@ def createSubmitFile(self, inputFile):
if not os.path.exists(self.configDir):
os.makedirs(self.configDir)
outfile = self.createFile(inputFile, self.submitFileName)
_LOG.debug("Wrote new Slurm submit file to %s", outfile)
_LOG.debug("Wrote new submit file to %s", outfile)
return outfile

def createCondorConfigFile(self, input):
Expand Down Expand Up @@ -350,6 +364,21 @@ def getCPUs(self):
"""
return self.getParameter("CPUS")

def getPeakcpus(self):
"""Accessor for PEAKCPUS
@return the value of PEAKCPUS

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The LSST Developer Guide describes the format for docstrings (mostly Numpydoc style).

"""
return self.getParameter("PEAKCPUS")

def getPeakmemory(self):
"""Accessor for PEAKMEMORY
@return the value of PEAKMEMORY
"""
peakmemory = self.getParameter("PEAKMEMORY")

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does getParameter guarantee a value? (The default in the pex config is None.)

if self.opts.queue == "torino":

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This conversion seems like it should be in the platform-specific package instead of the generic allocator package. Is there a way to configure peak information per queue?

peakmemory = int(3 * peakmemory / 2)
return peakmemory

def getAutoCPUs(self):
"""Size of standard glideins for allocateNodes auto
@return the value of autoCPUs
Expand All @@ -366,6 +395,12 @@ def getMinAutoCPUs(self):
"""
return self.getParameter("MINAUTOCPUS")

def getCollector(self):
"""Accessor for COLLECTOR.

Looks up the ``"COLLECTOR"`` key through ``getParameter`` and hands the
result back unchanged.

Returns
-------
collector
Whatever ``getParameter("COLLECTOR")`` yields. Presumably the host
name of the HTCondor collector (`str`), or `None` when neither the
platform configuration nor the command line supplied one -- TODO
confirm against ``getParameter``.
"""
return self.getParameter("COLLECTOR")

def getWallClock(self):
"""Accessor for WALL_CLOCK
@return the value of WALL_CLOCK
Expand Down
11 changes: 10 additions & 1 deletion python/lsst/ctrl/execute/allocatorParser.py
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,15 @@ def parseArgs(self, basename) -> argparse.Namespace:
type=int,
required=False,
)
parser.add_argument(
"--mempercore",
action="store",
default=4096,
dest="mempercore",
help="Memory per core in MB to be scheduled by default",
type=int,
required=False,
)
parser.add_argument(
"-s",
"--qos",
Expand All @@ -147,7 +156,7 @@ def parseArgs(self, basename) -> argparse.Namespace:
"--queue",
action="store",
dest="queue",
default="roma,milano",
default="milano",
help="queue / partition name",
)
parser.add_argument(
Expand Down
2 changes: 2 additions & 0 deletions python/lsst/ctrl/execute/condorConfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class PlatformConfig(pexConfig.Config):
nodeSetRequired = pexConfig.Field(doc="is the nodeset required", dtype=bool, default=False)
scheduler = pexConfig.Field(doc="scheduler type", dtype=str, default=None)
peakcpus = pexConfig.Field(doc="peakcpus", dtype=int, default=None)
peakmemory = pexConfig.Field(doc="peakmemory", dtype=int, default=None)
collector = pexConfig.Field(doc="collector", dtype=str, default=None)
manager = pexConfig.Field(doc="workflow manager", dtype=str, default=None)
setup_using = pexConfig.Field(doc="environment setup type", dtype=str, default=None)
manager_software_home = pexConfig.Field(
Expand Down
13 changes: 13 additions & 0 deletions python/lsst/ctrl/execute/slurmPlugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,10 @@ def submit(self):
cpus = self.getCPUs()
memoryPerCore = self.getMemoryPerCore()
totalMemory = cpus * memoryPerCore
peakMemory = self.getPeakmemory()
if totalMemory > peakMemory:
totalMemory = peakMemory
_LOG.debug("Direct: Setting job memory to peak memory on platform.")

# run the sbatch command
template = Template(self.getLocalScratchDirectory())
Expand Down Expand Up @@ -324,6 +328,11 @@ def glideinsFromJobPressure(self):
autoCPUs = cpus
memoryPerCore = self.getMemoryPerCore()
memoryLimit = autoCPUs * memoryPerCore
peakMemory = self.getPeakmemory()
if memoryLimit > peakMemory:
memoryLimit = peakMemory
_LOG.debug("Auto: Setting job memory to peak memory on platform.")

auser = self.getUserName()
anodeset = self.getNodeset()

Expand Down Expand Up @@ -400,6 +409,10 @@ def glideinsFromJobPressure(self):
_LOG.debug("\n%d.%d", ajob["ClusterId"], ajob["ProcId"])
_LOG.debug("%s", ajob)
thisMemory = ajob["RequestMemoryEval"]
peakMemory = self.getPeakmemory()
if thisMemory > peakMemory:
thisMemory = peakMemory
_LOG.debug("Auto large: Setting job memory to peak memory on platform.")
useCores = ajob["RequestCpus"]
clusterid = ajob["ClusterId"]
procid = ajob["ProcId"]
Expand Down
6 changes: 6 additions & 0 deletions tests/test_allocatorParser.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ def test1(self):
"sdfmilan003",
"--nodelist",
"sdfmilan004",
"--mempercore",
"6144",
"--collector",
"sdfiana039",
"-q",
"normal",
"-O",
Expand All @@ -63,6 +67,8 @@ def test1(self):
self.assertEqual(args.maximumWallClock, "00:30:00")
self.assertEqual(args.exclude, "sdfmilan003")
self.assertEqual(args.nodelist, "sdfmilan004")
self.assertEqual(args.mempercore, 6144)
self.assertEqual(args.collector, "sdfiana039")
self.assertEqual(args.queue, "normal")
self.assertEqual(args.outputLog, "outlog")
self.assertEqual(args.errorLog, "errlog")
Expand Down
2 changes: 2 additions & 0 deletions tests/test_condorConfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ def test5(self):
self.assertEqual(self.config.platform.scheduler, "slurm")
self.assertEqual(self.config.platform.setup_using, "getenv")
self.assertEqual(self.config.platform.manager, "dagman")
self.assertEqual(self.config.platform.peakcpus, 120)
self.assertEqual(self.config.platform.peakmemory, 491520)

def test6(self):
path = os.path.join("tests", "testfiles", "config_pegasus.py")
Expand Down
4 changes: 4 additions & 0 deletions tests/test_slurmPlugin.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,12 +88,16 @@ def test1(self):
scheduler: Allocator = schedulerClass(platform, args, configuration, condor_info_file)
self.assertTrue(scheduler)

peakcpus = scheduler.getPeakcpus()
peakmemory = scheduler.getPeakmemory()
autocpus = scheduler.getAutoCPUs()
minautocpus = scheduler.getMinAutoCPUs()
cpus = scheduler.getCPUs()
nodes = scheduler.getNodes()
nodeset = scheduler.getNodeset()
wallclock = scheduler.getWallClock()
self.assertEqual(peakcpus, 120)
self.assertEqual(peakmemory, 737280)
self.assertEqual(autocpus, 16)
self.assertEqual(minautocpus, 15)
self.assertEqual(cpus, 12)
Expand Down
2 changes: 2 additions & 0 deletions tests/testfiles/config_condor_slurm.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# flake8: noqa
config.platform.peakcpus = 120
config.platform.peakmemory = 491520
config.platform.defaultRoot = "/usr"
config.platform.localScratch = "./tests/condor_scratch_slurm"
config.platform.dataDirectory = "/tmp/data_slurm"
Expand Down
1 change: 1 addition & 0 deletions tests/testfiles/config_execconfig.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,3 +4,4 @@
config.platform.fileSystemDomain = "slac.stanford.edu"
config.platform.scheduler = "slurm"
config.platform.peakcpus = 120
config.platform.peakmemory = 737280
Loading