In [41]:
import boto3, subprocess
import datetime
import time
import pandas as pd
import numpy as np
import fabric
from fabric.api import run, hide

In [42]:
class benchmark:
    def __init__(self, experiments, instance_type, image_id="ami-a3b3d4b5", key_name='dpcld_test1'):
        self.experiments = experiments
        self.instance_type = instance_type
        self.image_id = image_id
        self.key_name = key_name
        self.run_date = datetime.date.strftime(datetime.date.today(), '%Y%m%d')
        self.client = boto3.client('ec2')
        self.ec2 = boto3.resource('ec2')
        self.p_ids = []      
        #self.instances = self.setUpInstances()
        
        #self.configureInstances()
        
    def setUpInstances(self):
        num_inst = len(experiments)
        
        response = self.client.run_instances(ImageId=self.image_id,
                                InstanceType=self.instance_type,
                                MinCount=num_inst,
                                MaxCount=num_inst,
                                KeyName=self.key_name)

        assert response.get('ResponseMetadata').get('HTTPStatusCode') == 200, "Request ended with an error (HTTPStatusCode != 200)"
        assert len(response.get('Instances')) == num_inst, "Number of instances launched is equal to specified"
        
        instance_ids = []
        instance_ids.extend([_.get('InstanceId') for _ in response.get('Instances')])
              
        waiter = self.client.get_waiter('instance_status_ok')
        waiter.wait(InstanceIds=instance_ids)
        
        print('Set up instances: {}'.format(' '.join(instance_ids)))
        
        self.instances = [self.ec2.Instance(_) for _ in instance_ids]
        
        #return [self.ec2.Instance(_) for _ in instance_ids]
    
    def configureInstances(self):
        [self.configureInstance(i, e) for i, e in zip(self.instances, self.experiments)]
    
    def configureInstance(self, instance, experiment):
        instance.create_tags(Tags=[{'Key':'Name', 'Value':'{}_{} experiment'.format(*experiment)}])
        fabric.api.env.host_string = 'ec2-user@{}'.format(instance.public_dns_name)
        fabric.api.env.key_filename = '~/.ssh/{}.pem'.format(self.key_name)
        fabric.api.env.disable_known_hosts
        fabric.api.hide('output')
        fabric.operations.put('~/Studia/mgr/deepcloud/tests/test_{}_{}.py'.format(*experiment), '.')
        run('sudo pip3 install git+git://github.com/fchollet/keras.git --upgrade')
        run('sudo pip3 install pydot')
        run('sudo pip3 install graphviz')
        
    def runExperiments(self):
        [self.runExperiment(i, e) for i, e in zip(self.instances, self.experiments)]
        
    def runExperiment(self, instance, experiment):
        fabric.api.env.host_string = 'ec2-user@{}'.format(instance.public_dns_name)
        run('nohup python3 -u test_{0}_{1}.py \
        --dataset {0} --architecture {1} --run_date {2} \
        >{2}_{0}_{1}.log 2>{2}_{0}_{1}.err < /dev/null &'.format(*experiment, self.run_date), pty=False)
        self.p_ids.append((instance.id, run('pgrep -f "python3 -u test"')))

    def getExperimentLogs(self):
        running_instances = self.instances
        
        while running_instances:
            for instance in self.instances:
                fabric.api.env.host_string = 'ec2-user@{}'.format(instance.public_dns_name)
                
                status = benchmark_td1.client.describe_instance_status(InstanceIds=[instance.id])\
                .get('InstanceStatuses')[0]\
                .get('InstanceState')\
                .get('Name')

                if status != 'running':
                    print('Instance {} not running'.format(instance.id))
                    running_instances.remove(instance)
                    continue
                    
                self.rSync(instance.public_dns_name)
                
                try:
                    run('pgrep -f "python3 -u test"')
                except:
                    print('Process on instance {} not found'.format(instance.id))
                    running_instances.remove(instance)
                    continue
                    
                time.sleep(1)
                
            print('{}: synchronized logs from all instances'.format(datetime.datetime.now().isoformat()))
        else:
            print('All instances are down or finished their tasks')

    def rSync(self, public_dns):
        fabric.operations.local('rsync -aL -e "ssh -i ~/.ssh/{}.pem -o StrictHostKeyChecking=no" \
        --include="/home/ec2-user/*" --include="*.log" --include="*.err" \
        --include="*.out" --include="*.png" --exclude="*" \
        ec2-user@{}:/home/ec2-user/ \
        ~/Studia/mgr/deepcloud/tests/logs/'\
                                .format(self.key_name, public_dns))
        


In [43]:
experiments = [("mnist","kerasdef")]

In [44]:
benchmark_td1 = benchmark(experiments=experiments,instance_type='t2.micro')

In [45]:
benchmark_td1.setUpInstances()

Set up instances: i-07b2550df46709938


In [46]:
benchmark_td1.configureInstances()

[ec2-user@ec2-54-173-26-62.compute-1.amazonaws.com] put: /home/roni/Studia/mgr/deepcloud/tests/test_mnist_kerasdef.py -> ./test_mnist_kerasdef.py
[ec2-user@ec2-54-173-26-62.compute-1.amazonaws.com] run: sudo pip3 install git+git://github.com/fchollet/keras.git --upgrade
[ec2-user@ec2-54-173-26-62.compute-1.amazonaws.com] out: Collecting git+git://github.com/fchollet/keras.git
[ec2-user@ec2-54-173-26-62.compute-1.amazonaws.com] out:   Cloning git://github.com/fchollet/keras.git to /tmp/pip-yeadlr6g-build
[ec2-user@ec2-54-173-26-62.compute-1.amazonaws.com] out: Collecting theano (from Keras==2.0.4)
[ec2-user@ec2-54-173-26-62.compute-1.amazonaws.com] out:   Downloading Theano-0.9.0.tar.gz (3.1MB)
[ec2-user@ec2-54-173-26-62.compute-1.amazonaws.com] out: [?25l
[ec2-user@ec2-54-173-26-62.compute-1.amazonaws.com] out: [K    0% |                                | 10kB 19.1MB/s eta 0:00:01
[ec2-user@ec2-54-173-26-62.compute-1.amazonaws.com] out: [K    0% |▏                               | 20k

In [47]:
benchmark_td1.runExperiments()

[ec2-user@ec2-54-173-26-62.compute-1.amazonaws.com] run: nohup python3 -u test_mnist_kerasdef.py         --dataset mnist --architecture kerasdef --run_date 20170530         >20170530_mnist_kerasdef.log 2>20170530_mnist_kerasdef.err < /dev/null &
[ec2-user@ec2-54-173-26-62.compute-1.amazonaws.com] run: pgrep -f "python3 -u test"
[ec2-user@ec2-54-173-26-62.compute-1.amazonaws.com] out: 2907
[ec2-user@ec2-54-173-26-62.compute-1.amazonaws.com] out: 



In [49]:
def rSync(public_dns):
    fabric.operations.local('rsync -aL -e "ssh -i ~/.ssh/{}.pem -o StrictHostKeyChecking=no" \
    --include="/home/ec2-user/*" --include="*.log" --include="*.err" \
    --include="*.out" --include="*.png" --exclude="*" \
    ec2-user@{}:/home/ec2-user/ \
    ~/Studia/mgr/deepcloud/tests/logs/'\
                            .format('dpcld_test1',public_dns))

In [None]:
benchmark_td1.getExperimentLogs()

[localhost] local: rsync -aL -e "ssh -i ~/.ssh/dpcld_test1.pem -o StrictHostKeyChecking=no"     --include="/home/ec2-user/*" --include="*.log" --include="*.err"     --include="*.out" --include="*.png" --exclude="*"     ec2-user@ec2-54-173-26-62.compute-1.amazonaws.com:/home/ec2-user/     ~/Studia/mgr/deepcloud/tests/logs/
[ec2-user@ec2-54-173-26-62.compute-1.amazonaws.com] run: pgrep -f "python3 -u test"
[ec2-user@ec2-54-173-26-62.compute-1.amazonaws.com] out: 2907
[ec2-user@ec2-54-173-26-62.compute-1.amazonaws.com] out: 

2017-05-30T14:35:59.890961: synchronized logs from all instances
[localhost] local: rsync -aL -e "ssh -i ~/.ssh/dpcld_test1.pem -o StrictHostKeyChecking=no"     --include="/home/ec2-user/*" --include="*.log" --include="*.err"     --include="*.out" --include="*.png" --exclude="*"     ec2-user@ec2-54-173-26-62.compute-1.amazonaws.com:/home/ec2-user/     ~/Studia/mgr/deepcloud/tests/logs/
[ec2-user@ec2-54-173-26-62.compute-1.amazonaws.com] run: pgrep -f "python3 -u test

In [40]:
term_responses = [_.terminate() for _ in benchmark_td1.instances]

In [None]:
for i in instance_types:
    print(i)

In [None]:
image_id="ami-a3b3d4b5"
key_name='dpcld_test1'

In [None]:
client = boto3.client('ec2')
ec2 = boto3.resource('ec2')

In [None]:
num_inst = len(experiments)

In [None]:
# rynek spot
'''
spotprices = client.describe_spot_price_history(InstanceTypes=['c4.2xlarge','p2.xlarge'],
                                                StartTime=datetime.datetime.today()-datetime.timedelta(0.5),
                                                EndTime=datetime.datetime.today()
                                               )['SpotPriceHistory']
prices = pd.DataFrame.from_records([(x.get('Timestamp'),x.get('InstanceType'),x.get('SpotPrice')) for x in spotprices])
prices[2] = prices[2].astype(float)
prices[4] = prices[0].apply(lambda x: x.date())
prices.pivot_table(index=4, columns=1, values=2, aggfunc='median')
%matplotlib inline
prices.pivot_table(index=0, columns=1, values=2, aggfunc='mean').plot()
spot_specs = {"ImageID":"ami-4191b524",
             "InstanceType":"t2.micro",
             "Placement":{"AvailabilityZone":"us-east-2"}}
#spot
response = client.request_spot_instances(SpotPrice = ,
                                       LaunchSpecification = )
'''

In [None]:
len(response.get('Instances'))

In [None]:
response.get('HTTPStatusCode')

In [None]:
#key pair
response = client.run_instances(ImageId=image_id,
                                InstanceType=instance_type,
                                MinCount=num_inst,
                                MaxCount=num_inst,
                                KeyName=key_name
                               )
#_instanceId=response.get('Instances')[0].get('InstanceId')

In [None]:
instance_ids = []
instance_ids.extend([_.get('InstanceId') for _ in response.get('Instances')])

In [None]:
instances = [ec2.Instance(_) for _ in instance_ids]

In [None]:
waiter = client.get_waiter('instance_status_ok')
waiter.wait(InstanceIds=instance_ids)

In [None]:
p_ids = []
def configureInstance(instance, p_ids=p_ids):
    env.host_string = 'ec2-user@{}'.format(instance.public_dns_name)
    env.key_filename = '~/.ssh/dpcld_test1.pem'.format(key_name)
    env.disable_known_hosts
    fabric.operations.put('~/Studia/mgr/deepcloud/tests/test_mnist.py', '.')
    run('sudo pip3 install git+git://github.com/fchollet/keras.git --upgrade')
    run('sudo pip3 install pydot')
    run('sudo pip3 install graphviz')
    run('nohup python3 -u test_mnist.py >test_mnist.log 2>test_mnist.err < /dev/null &', pty=False)
    p_ids.append({instance.id:run('pgrep -f "python3 -u test"')})

In [None]:
configureInstance(instances[0])

In [None]:
term_responses = [_.terminate() for _ in instances]

In [None]:
term_responses