Skip to content

Commit

Permalink
Rosetta wrapper submits and waits properly. Now need to check for exp…
Browse files Browse the repository at this point in the history
…ected number of outputs
  • Loading branch information
mcianfrocco committed Apr 20, 2017
1 parent a5dc1f6 commit 3864cf2
Show file tree
Hide file tree
Showing 3 changed files with 160 additions and 57 deletions.
2 changes: 1 addition & 1 deletion rosetta/rosetta_prepare_input_files.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ def setupParserOptions():
help="EM map in .mrc format")
parser.add_option("--fasta",dest="fasta",type="string",metavar="FILE",
help=".fasta file for the structure")
parser.add_option("--num",dest="num",type="int",metavar="INTEGER",default=5,
parser.add_option("--num",dest="num",type="int",metavar="INTEGER",default=1,
help="number of structures per CPU (Default = 1)")
parser.add_option("-r", action="store_true",dest="relax",default=False,
help="run rosetta relax instead of CM")
Expand Down
75 changes: 19 additions & 56 deletions rosetta/rosetta_refinement_on_aws.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#!/usr/bin/env python

import pickle
import datetime
import shutil
import optparse
Expand Down Expand Up @@ -123,6 +123,8 @@ def checkConflicts(params,outdir):
#================================================
if __name__ == "__main__":

print '\nStarting Rosetta model refinement in the cloud ...\n'

##Hard coded values
sizeneeded=100
instance='m4.xlarge'
Expand All @@ -135,7 +137,6 @@ def checkConflicts(params,outdir):
numToRequest=numthreads*((numToRequest % numthreads)+1)

numInstances=(numToRequest/numthreads)
print numInstances
if len(params['outdir']) == 0:
startTime=datetime.datetime.utcnow()
params['outdir']=startTime.strftime('%Y-%m-%d-%H%M%S')
Expand Down Expand Up @@ -183,7 +184,6 @@ def checkConflicts(params,outdir):
subprocess.Popen(cmd,shell=True).wait()
###Get volID from logfile
volID=linecache.getline('%s/awsebs_%i.log'%(params['outdir'],counter),5).split('ID: ')[-1].split()[0]
print volID
volIDlist.append(volID)
time.sleep(10)
counter=counter+1
Expand Down Expand Up @@ -253,62 +253,25 @@ def checkConflicts(params,outdir):
subprocess.Popen(cmd,shell=True).wait()

#Run job
cmd='ssh -o "StrictHostKeyChecking no" -q -n -f -i %s ubuntu@%s "export PATH=/usr/bin/$PATH && export PATH=/home/Rosetta/2017_08/main/source/:$PATH && ./run_final.sh > /home/ubuntu/rosetta.out 2> /home/ubuntu/rosetta.err < /dev/null &"' %(keypair,instanceIPlist[counter])
cmd='ssh -o "StrictHostKeyChecking no" -q -n -f -i %s ubuntu@%s "export PATH=/usr/bin/$PATH && export PATH=/home/Rosetta/2017_08/main/source/:$PATH && /usr/local/bin/parallel -j%i ./run_final.sh {} ::: {1..%i}> /home/ubuntu/rosetta.out 2> /home/ubuntu/rosetta.err < /dev/null &"' %(keypair,instanceIPlist[counter],numthreads,numthreads)
subprocess.Popen(cmd,shell=True)

counter=counter+1

#Start waiting script: Should be in teh background so users can log out
print 'Rosetta job submitted on AWS! Monitor output file: %s/rosetta_output.out to check status of job\n' %(params['outdir'])

sys.exit()
#Start monitoring status
startTime=datetime.datetime.utcnow()
time.sleep(60)

isdone=0
while isdone == 0:
#If cloudwatch has load < 5% BUT there aren't finished PDBs yet, job crashed. Terminate.
currentTime=datetime.datetime.utcnow()
if os.path.exists('%s/cloudwatchtmp.log'%(params['outdir'])):
os.remove('%s/cloudwatchtmp.log'%(params['outdir']))
cmd='aws cloudwatch get-metric-statistics --metric-name CPUUtilization --start-time %s --period 300 --namespace AWS/EC2 --statistics Average --dimensions Name=InstanceId,Value=%s --end-time %s > %s/cloudwatchtmp.log' %(startTime.strftime('%Y-%m-%dT%H:%M:00'),instanceID,currentTime.strftime('%Y-%m-%dT%H:%M:00'),params['outdir'])
subprocess.Popen(cmd,shell=True).wait()
o1=open('%s/cloudwatchtmp.log'%(params['outdir']),'r')
for line in o1:
if len(line.split())>0:
if line.split()[0] == '"Average":':
load=float(line.split()[1][:-1])
if load < loadMin:
print 'finished'
#Download all pdb files, take note if you didn't download enough files consdiering number of models requested
print 'Rosetta job submitted on AWS! Monitor output file: %s/rosetta.out to check status of job\n' %(params['outdir'])

cmd='touch %s/rosetta.out' %(params['outdir'])
subprocess.Popen(cmd,shell=True).wait()

isdone=1
o1.close()
os.remove('%s/cloudwatchtmp.log'%(params['outdir']))
time.sleep(60)

print 'Rosetta refinement finished. Shutting down instance.'

cmd='aws ec2 terminate-instances --instance-ids %s > %s/tmp4949585940.txt' %(instanceID,params['outdir'])
subprocess.Popen(cmd,shell=True).wait()

isdone=0
while isdone == 0:
status=subprocess.Popen('aws ec2 describe-instances --instance-ids %s --query "Reservations[*].Instances[*].{State:State}" | grep Name'%(instanceID),shell=True, stdout=subprocess.PIPE).stdout.read().strip().split()[-1].split('"')[1]
if status == 'terminated':
isdone=1
time.sleep(10)

cmd='aws ec2 delete-volume --volume-id %s > %s/tmp4949585940.txt' %(volID,params['outdir'])
subprocess.Popen(cmd,shell=True).wait()

now=datetime.datetime.now()
finday=now.day
finhr=now.hour
finmin=now.minute
if finday != startday:
finhr=finhr+24
deltaHr=finhr-starthr
if finmin > startmin:
deltaHr=deltaHr+1
#Write instance, volume, and IP lists
with open('%s/instanceIPlist.txt' %(params['outdir']),'wb') as fp:
pickle.dump(instanceIPlist,fp)
with open('%s/instanceIDlist.txt' %(params['outdir']),'wb') as fp:
pickle.dump(instanceIDlist,fp)
with open('%s/volIDlist.txt' %(params['outdir']),'wb') as fp:
pickle.dump(volIDlist,fp)

cmd='%s/rosetta_waiting.py --instanceIPlist=%s/instanceIPlist.txt --instanceIDlist=%s/instanceIDlist.txt --volIDlist=%s/volIDlist.txt --numModels=%i --numPerInstance=%i --outdir=%s&' %(rosettadir,params['outdir'],params['outdir'],params['outdir'],numToRequest,numthreads,params['outdir'])
subprocess.Popen(cmd,shell=True)

140 changes: 140 additions & 0 deletions rosetta/rosetta_waiting.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,140 @@
#!/usr/bin/env python

import pickle
import datetime
import shutil
import optparse
from sys import *
import os,sys,re
from optparse import OptionParser
import glob
import subprocess
from os import system
import linecache
import time
import string

#=========================
def setupParserOptions():
parser = optparse.OptionParser()
parser.set_usage("This program will wait for a rosetta refinement to finish on AWS.")
parser.add_option("--instanceIDlist",dest="instanceID",type="string",metavar="FILE",
help="Instance ID list (pickle dump)")
parser.add_option("--instanceIPlist",dest="instanceIP",type="string",metavar="FILE",
help="Instance IP list (pickle dump)")
parser.add_option("--volIDlist",dest="volID",type="string",metavar="FILE",
help="Volume ID list (pickle dump)")
parser.add_option("--numModels",dest="numModels",type="int",metavar="INT",
help="Total number of models in rosetta run")
parser.add_option("--numPerInstance",dest="numPerInstance",type="int",metavar="Number",
help="Number of models per instance requested")
parser.add_option("--outdir",dest='outdir',type="string",metavar='FILE',
help='Output directory')
options,args = parser.parse_args()

if len(args) > 0:
parser.error("Unknown commandline options: " +str(args))
if len(sys.argv) <= 2:
parser.print_help()
sys.exit()
params={}
for i in parser.option_list:
if isinstance(i.dest,str):
params[i.dest] = getattr(options,i.dest)
return params

#================================================
if __name__ == "__main__":

params=setupParserOptions()
startTime=datetime.datetime.utcnow()
startday=now.day
starthr=now.hour
startmin=now.minute

keypair=subprocess.Popen('echo $KEYPAIR_PATH',shell=True, stdout=subprocess.PIPE).stdout.read().strip()
time.sleep(60)
loadMin=5
#Read in pickle files
with open (params['instanceIP'], 'rb') as fp:
instanceIPlist = pickle.load(fp)
with open (params['volID'], 'rb') as fp:
volIDlist = pickle.load(fp)
with open (params['instanceID'], 'rb') as fp:
instanceIDlist = pickle.load(fp)

l='%s/rosetta.out' %(params['outdir'])

cmd="echo 'Starting to check status of Rosetta refinement at %sUTC' >> %s" %(startTime.strftime('%Y-%m-%dT%H:%M:00'),l)
subprocess.Popen(cmd,shell=True).wait()

os.makedirs('%s/output' %(params['outdir']))

counter=0
instanceCounter=1
while counter < len(instanceIPlist):
isdone=0
while isdone == 0:
#If cloudwatch has load < 5% BUT there aren't finished PDBs yet, job crashed. Terminate.
currentTime=datetime.datetime.utcnow()
if os.path.exists('%s/cloudwatchtmp.log'%(params['outdir'])):
os.remove('%s/cloudwatchtmp.log'%(params['outdir']))
cmd='aws cloudwatch get-metric-statistics --metric-name CPUUtilization --start-time %s --period 300 --namespace AWS/EC2 --statistics Average --dimensions Name=InstanceId,Value=%s --end-time %s > %s/cloudwatchtmp.log' %(startTime.strftime('%Y-%m-%dT%H:%M:00'),instanceIDlist[counter],currentTime.strftime('%Y-%m-%dT%H:%M:00'),params['outdir'])
subprocess.Popen(cmd,shell=True).wait()
o1=open('%s/cloudwatchtmp.log'%(params['outdir']),'r')
for line in o1:
if len(line.split())>0:
if line.split()[0] == '"Average":':
load=float(line.split()[1][:-1])
cmd="echo 'Current load on instance %i is %f at %sUTC' >> %s" %(counter+1,load,currentTime.strftime('%Y-%m-%dT%H:%M:00'),l)
subprocess.Popen(cmd,shell=True).wait()
if load < loadMin:
#Download all pdb files, take note if you didn't download enough files consdiering number of models requested
isdone=1
cmd='echo "--------> Instance %i finished at %sUTC" >> %s' %(counter+1,currentTime.strftime('%Y-%m-%dT%H:%M:00'),l)
subprocess.Popen(cmd,shell=True).wait()
currCounter=1
while currCounter <= params['numPerInstance']:
cmd='scp -q -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -i %s ubuntu@%s:~/S_%i_0001.pdb %s/output/S_%i_0001.pdb' %(keypair,instanceIPlist[counter],currCounter,params['outdir'],instanceCounter)
subprocess.Popen(cmd,shell=True).wait()

cmd='scp -q -o UserKnownHostsFile=/dev/null -o StrictHostKeyChecking=no -i %s ubuntu@%s:~/score_%i.sc %s/output/S_%i_0001_score.sc' %(keypair,instanceIPlist[counter],currCounter,params['outdir'],instanceCounter)
subprocess.Popen(cmd,shell=True).wait()
instanceCounter=instanceCounter+1
currCounter=currCounter+1

cmd='aws ec2 terminate-instances --instance-ids %s > %s/tmp4949585940.txt' %(instanceIDlist[counter],params['outdir'])
subprocess.Popen(cmd,shell=True).wait()

o1.close()
os.remove('%s/cloudwatchtmp.log'%(params['outdir']))
time.sleep(60)
counter=counter+1

cmd='echo "Rosetta refinement finished. Shutting down instances %s" >> %s' %(currentTime.strftime('%Y-%m-%dT%H:%M:00'),l)
subprocess.Popen(cmd,shell=True).wait()

counter=0
while counter < len(instanceIPlist):
isdone=0
while isdone == 0:
status=subprocess.Popen('aws ec2 describe-instances --instance-ids %s --query "Reservations[*].Instances[*].{State:State}" | grep Name'%(instanceIDlist[counter]),shell=True, stdout=subprocess.PIPE).stdout.read().strip().split()[-1].split('"')[1]
if status == 'terminated':
isdone=1
time.sleep(10)

cmd='aws ec2 delete-volume --volume-id %s > %s/tmp4949585940.txt' %(volIDlist[counter],params['outdir'])
subprocess.Popen(cmd,shell=True).wait()
counter=counter+1

#now=datetime.datetime.now()
#finday=now.day
#finhr=now.hour
#finmin=now.minute
#if finday != startday:
# finhr=finhr+24
#deltaHr=finhr-starthr
#if finmin > startmin:
# deltaHr=deltaHr+1


0 comments on commit 3864cf2

Please sign in to comment.