Submit a shell command as a batch job.
See https://github.com/sciserver/SciScript-Python/blob/Feature_Jobs/py3/SciServer/Jobs.py

In [82]:
from SciServer import Jobs,Authentication,Config
import json
import requests
import ipywidgets as widgets
from os import listdir
from os.path import isfile, join
import time
from datetime import datetime

## Parameters to submit the job

In [83]:
def pathForUserVolume(uv):
    """Return '<rootVolumeName>/<owner>/<name>' for one user-volume record.

    uv is a mapping providing the keys 'rootVolumeName', 'owner' and 'name'.
    The resulting string is what gets compared against the USERVOLUMES entries.
    """
    parts = (uv['rootVolumeName'], uv['owner'], uv['name'])
    return '/'.join(format(part) for part in parts)

In [84]:
# Your SciServer username and password (left commented out here)
#USERNAME='howsercw'
#PASSWORD='------'

# Required job environment: compute domain and docker image, both matched by name
DOMAIN = 'Small Jobs Domain'  # change with name of new compute domain
IMAGE = 'SciServer Essentials'

# Volumes that should be mounted in the job.
# User volumes are written as '<rootVolumeName>/<owner>/<name>' (see pathForUserVolume);
# data volumes are matched by their plain name. Add the ones your script needs.
USERVOLUMES = ['Storage/howsecw1/persistent', 'Temporary/howsecw1/scratch']
DATAVOLUMES = ['COVID-19']

RESULTSFOLDERPATH = "/home/idies/workspace/Temporary/howsecw1/scratch/guppy-fast"
JOBALIAS = "guppy-fast-cwh"

domains = Jobs.getDockerComputeDomains()
image = None
volumes = []
userVolumes = []
dataVolumes = []

# Only the first compute domain whose name equals DOMAIN is used;
# domain stays None when no domain matches.
domain = next((candidate for candidate in domains if candidate['name'] == DOMAIN), None)
if domain is not None:
    # No early exit: if several images carry the requested name, the last one is kept.
    for im in domain['images']:
        if im['name'] == IMAGE:
            image = im

    # Requested data volumes are mounted without write access.
    dataVolumes = [
        {"name": vol['name'], 'needsWriteAccess': False}
        for vol in domain['volumes']
        if vol['name'] in DATAVOLUMES
    ]

    # Requested user volumes are mounted with write access.
    userVolumes = [
        {
            'name': user_vol['name'],
            'rootVolumeName': user_vol['rootVolumeName'],
            'owner': user_vol['owner'],
            'needsWriteAccess': True,
        }
        for user_vol in domain['userVolumes']
        if pathForUserVolume(user_vol) in USERVOLUMES
    ]

## Parameters for the analysis 

Insert any command-line arguments needed for the script.  If more than one argument is needed, you will have to modify the `command` variable (built in the loop below) to include the extra arguments.  My example provides a path where you want the analysis results stored.

Insert your bash script

In [85]:
# Command prefix shared by every job: runs the batch script through `bash -x`.
# The job id, the two tuning parameters and the input file names are appended to it later.
# NOTE(review): this path uses the folder 'howsercw' while USERVOLUMES mounts
# 'Temporary/howsecw1/scratch' -- confirm the script is reachable inside the job.
script='bash -x /home/idies/workspace/Temporary/howsercw/scratch/guppy-fast_small-batch.sh'

Other parameters...

In [86]:
# Passed to the bash script as its second positional argument (right after the job id).
num_callers=1

In [87]:
# Passed to the bash script as its third positional argument (after num_callers).
cpus_per_caller=8

## Files for the analysis

In [88]:
# Directory that is scanned for input files. Only the bare file names found here
# (not the full paths) end up on the job command line.
fast5_path='/home/idies/workspace/covid19/sequencing_runs/20200405_0422_GA30000_FAN30842_00fb1614/fast5_pass'

In [89]:
# Names of the regular files directly inside fast5_path (sub-directories are skipped).
# NOTE: os.listdir returns entries in arbitrary order, so this list is not sorted.
files = list(filter(lambda entry: isfile(join(fast5_path, entry)), listdir(fast5_path)))

In [90]:
# Display how many input files were found.
len(files)

3607

Subset file list into sublists of size n....

In [91]:
def chunks(files, n):
    """Split a sequence into consecutive sublists of at most n items.

    Parameters
    ----------
    files : sequence
        Items to split (must support len() and slicing), e.g. a list of file names.
    n : int
        Maximum number of items per sublist; must be at least 1.

    Returns
    -------
    list of list
        The items in their original order. Every sublist has exactly n items
        except possibly the last one. An empty input gives an empty list, so
        no empty sublist (and therefore no job without input files) is produced.

    Raises
    ------
    ValueError
        If n is smaller than 1.
    """
    if n < 1:
        raise ValueError("chunk size n must be a positive integer")
    # list(...) keeps the sublists as lists even when `files` is a tuple or similar.
    return [list(files[start:start + n]) for start in range(0, len(files), n)]

In [92]:
# Group the input files five at a time; each group later becomes one job command.
file_sublists=chunks(files,5)

Converting sublist to string...

In [93]:
def listToString(s):
    """Concatenate the strings in s, putting one space after every element.

    The result therefore ends with a trailing space whenever s is non-empty,
    and is the empty string when s is empty.
    """
    return "".join(ele + " " for ele in s)

Create command: merge bash script with parameters and input files for analysis...

In [94]:
# Build one shell command per group of input files:
#   <script> <job id> <num_callers> <cpus_per_caller> <file names...>
# The job id is simply the position of the group within file_sublists.
commands = []
for job_id, sublist in enumerate(file_sublists):
    command = " ".join([
        script,
        str(job_id),
        str(num_callers),
        str(cpus_per_caller),
        listToString(sublist),
    ])
    commands.append(command)


In [95]:
# Display one generated command line as a sanity check.
commands[3]

'bash -x /home/idies/workspace/Temporary/howsercw/scratch/guppy-fast_small-batch.sh 3 1 8 FAN30842_d6d3150416e0daf48d962bf781d6861ef13c9666_1003.fast5 FAN30842_d6d3150416e0daf48d962bf781d6861ef13c9666_1026.fast5 FAN30842_d6d3150416e0daf48d962bf781d6861ef13c9666_1004.fast5 FAN30842_d6d3150416e0daf48d962bf781d6861ef13c9666_1027.fast5 FAN30842_d6d3150416e0daf48d962bf781d6861ef13c9666_1005.fast5 '

In [96]:
# Display the total number of job commands that were generated.
len(commands)

722

## Run the analysis

### submit All Batches

In [97]:
def chunk_commands(lst, n):
    """Lazily yield consecutive slices of lst, each holding at most n items.

    This is a generator: nothing is sliced until the caller asks for the
    next chunk, and the final chunk may be shorter than n.
    """
    yield from (lst[offset:offset + n] for offset in range(0, len(lst), n))

In [98]:
def submit_batch(commands, pause_seconds=30,
                 results_folder="/home/idies/workspace/Temporary/howsercw/scratch/jobs"):
    """Submit every shell command as its own job and return the submission results.

    The jobs use the notebook-level settings `domain`, `IMAGE`, `userVolumes`,
    `dataVolumes` and `JOBALIAS`.

    Parameters
    ----------
    commands : iterable of str
        Shell commands to submit, one job per command.
    pause_seconds : int or float, optional
        Time to sleep after each submission (default 30, the previously
        hard-coded value).
    results_folder : str, optional
        Value passed as `resultsFolderPath` (defaults to the previously
        hard-coded folder).

    Returns
    -------
    list
        The values returned by Jobs.submitShellCommandJob, in submission order
        (the notebook prints these as job ids). Empty when `commands` is empty.
    """
    jobs = []
    for command in commands:
        job = Jobs.submitShellCommandJob(
            shellCommand=command,
            dockerComputeDomain=domain,
            dockerImageName=IMAGE,
            userVolumes=userVolumes,
            dataVolumes=dataVolumes,
            resultsFolderPath=results_folder,
            jobAlias=JOBALIAS,
        )
        jobs.append(job)
        # Space the submissions out instead of sending them back-to-back.
        time.sleep(pause_seconds)
    return jobs
    

In [None]:
# Submit the remaining commands (resuming at index 177) in batches of 4.
# A new batch is only submitted once every job of the current batch has finished.
chunk_generator = chunk_commands(commands[177:], 4)

batch_no = 0  # stays 0 when there is nothing left to submit
for batch_no, next_chunk in enumerate(chunk_generator, start=1):
    # run this batch of jobs, report to STDOUT
    jobs = submit_batch(next_chunk)
    print(str(datetime.now()) + " Now Running Batch #" + str(batch_no) + ", Jobs: ", end="\t")
    print(*jobs, sep = ", ")

    # Poll once a minute until every job in the batch reports a status >= 32.
    # NOTE(review): status values >= 32 are treated as "finished" here -- confirm
    # against the SciServer Jobs.getJobStatus status codes.
    while True:
        batchStatus = [Jobs.getJobStatus(job)['status'] for job in jobs]
        if all(status >= 32 for status in batchStatus):
            break
        time.sleep(60)
    print(str(datetime.now()) + " Jobs done!", end="\n")

# Iterating the generator (instead of calling next() by hand) means an empty
# command list simply falls through to here rather than raising StopIteration.
doneFlag = True
print(str(datetime.now()) + " All Batches Complete! ", end="\n")

2020-04-24 20:55:55.786721 Now Running Batch #1, Jobs: 	63743, 63744, 63745, 63746


### submit Single

In [242]:
# Submit one hand-picked command (index 14) as a single job, using the same
# environment settings as the batch submissions, then show what was returned.
command = commands[14]
job = Jobs.submitShellCommandJob(
    shellCommand=command,
    dockerComputeDomain=domain,
    dockerImageName=IMAGE,
    userVolumes=userVolumes,
    dataVolumes=dataVolumes,
    resultsFolderPath="/home/idies/workspace/Temporary/howsercw/scratch/jobs",
    jobAlias=JOBALIAS,
)
print(job)

63469


In [186]:
# Cancel a single job by its id.
# NOTE(review): 63415 is a hard-coded id, presumably from an earlier submission --
# replace it with the id of the job you actually want to cancel.
Jobs.cancelJob(63415)

### submit 1 Batch

In [32]:
#submit batch jobs
jobs=[]
for command in commands[41:46]:
    job=Jobs.submitShellCommandJob(shellCommand=command
                                , dockerComputeDomain = domain
                                , dockerImageName = IMAGE
                                , userVolumes = userVolumes, dataVolumes=dataVolumes
                                , resultsFolderPath = "/home/idies/workspace/Temporary/howsercw/scratch/jobs"
                                , jobAlias = JOBALIAS)
    jobs.append(job)
    time.sleep(30)
    #print(job)

In [217]:
#cancel batch jobs
#for job in jobs:
#    Jobs.cancelJob(job)