# Apache Mesos HTTP API

This notebook will show how to connect to a running Master/Slave and launch commands via a simple `CommandInfo` protocol buffer.

The main goal of this notebook is to show how to interact with the new [Mesos HTTP API](https://github.com/apache/mesos/blob/master/docs/scheduler_http_api.md) in Python.

## Prerequisites

- you have RTFM (link above);
- you know how to build and run Apache Mesos locally (see the [Getting Started guide](http://mesos.apache.org/gettingstarted/));
- you are familiar with the Python [Requests](http://www.python-requests.org/en/latest/) library.

## Starting Mesos

Nothing unusual here: start ZooKeeper (`zkServer.sh start`), then start the Master and the Slave:
```
cd /path/to/mesos/build
make -j 4 V=0

# Optional, but recommended:
make -j 4 V=0 check

# If all tests pass:
./bin/mesos-master.sh --zk=zk://localhost:2181/mesos/test --work_dir=/tmp/mesos-24 --quorum=1 --port=5051

# ... lots of logging here ...

# In another shell:
./bin/mesos-slave.sh --master=zk://localhost:2181/mesos/test --work_dir=/tmp/slave --port=5055
```

Then navigate to the [Mesos Web UI](http://localhost:5051) and make sure everything is working fine.

If the above doesn't work, it's unlikely that anything in the following ever will.

## Python Virtualenv

I always strongly recommend that folks use virtual environments when messing around with Python and installing libraries - feel free to skip this, but if you end up borking your system... **you have been warned**.

See the `requirements.txt` in this repo.

My actual `dev` virtualenv has a lot more stuff, but the above should be sufficient to get you going (you probably strictly need even less than that, YMMV); most of those dependencies are for IPython Notebooks.

```
mkvirtualenv dev
pip install -r requirements.txt
ipython notebook
```
Then load this file in your Notebook.

Happy hacking!


# Common Imports & Useful Globals

```python
# We need to use the magic function %px on every engine when we run in Cluster mode.
# See below the note about executing multi-threaded code.
from __future__ import print_function

import json
import pprint
import time
from threading import Thread
from time import sleep

import requests


SUBSCRIBE_BODY = {
    "type": "SUBSCRIBE",
    "subscribe": {
        "framework_info": {
            # Unix user the tasks will run as: change it to a user that exists on your Slave.
            "user": "marco",
            "name": "Example HTTP Framework"
        },
        "force": True
    }
}

TEARDOWN_BODY = {
    "framework_id": {
        "value": None
    },
    "type": "TEARDOWN"
}

# Adjust the ports according to how you launched Mesos:
# see --port in the commands in "Starting Mesos".
MASTER_URL = 'http://localhost:5051'
SLAVE_URL = 'http://localhost:5055'
API_V1 = '/api/v1/scheduler'
# API_V1 already starts with '/', so no extra separator is needed.
API_URL = '{}{}'.format(MASTER_URL, API_V1)
CONTENT = 'application/json'

headers = {
    "Content-Type": CONTENT,
    "Accept": CONTENT,
    "Connection": "close"
}

pretty = pprint.PrettyPrinter(indent=2)
```

These are the globals used to communicate with the background thread. They are currently **thread-unsafe** and may (or may not: the chances of a race are pretty slim here) need to be protected with an `RLock`.
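A minimal sketch of what that protection could look like: a hypothetical `SharedState` class (not part of the notebook) keeps the same three values, and every read or write goes through one `threading.RLock`, so the main thread never sees a half-updated set of values.

```python
import threading


class SharedState(object):
    """Hypothetical holder for the values shared with the background thread."""

    FIELDS = ("terminate", "offers", "framework_id")

    def __init__(self):
        # Re-entrant: a method holding the lock may call another one that takes it too.
        self._lock = threading.RLock()
        self.terminate = False
        self.offers = []
        self.framework_id = None

    def update(self, **fields):
        # Change one or more fields atomically with respect to snapshot().
        with self._lock:
            for name, value in fields.items():
                if name not in self.FIELDS:
                    raise AttributeError("Unknown field: {}".format(name))
                setattr(self, name, value)

    def snapshot(self):
        # Read all fields under the lock, so they are mutually consistent.
        with self._lock:
            return dict((name, getattr(self, name)) for name in self.FIELDS)


state = SharedState()
state.update(framework_id="hypothetical-framework-id", offers=[])
print(state.snapshot()["framework_id"])  # hypothetical-framework-id
```

The background thread would call `state.update(...)` where `post()` currently assigns globals, and the main thread would read `state.snapshot()` instead of the bare names.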

```python
# TODO: THIS IS THREAD-UNSAFE
terminate = False
offers = []
framework_id = None
```

## POST helper method

This uses the [Requests streaming API](http://www.python-requests.org/en/latest/user/advanced/#chunk-encoded-requests) to consume the chunk-encoded response that the Master keeps open after `SUBSCRIBE`.
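The Master frames the events on that stream in RecordIO format: each event is preceded by its length in bytes and a newline. The "munging" in `post()` below undoes that framing line by line; as a standalone sketch, here is a hypothetical `iter_recordio` generator that parses the same framing from arbitrary byte chunks (for instance, what `r.iter_content(chunk_size=None)` yields on a streaming response).

```python
import json


def iter_recordio(chunks):
    """Yield decoded JSON events from an iterable of byte chunks.

    Assumes RecordIO framing: b"<length>\\n<payload of exactly <length> bytes>",
    repeated. Chunk boundaries may fall anywhere, so bytes are buffered until a
    whole record has arrived.
    """
    buf = b""
    for chunk in chunks:
        buf += chunk
        while True:
            newline = buf.find(b"\n")
            if newline < 0:
                break  # Length prefix not complete yet.
            length = int(buf[:newline])
            start = newline + 1
            if len(buf) - start < length:
                break  # Payload not complete yet.
            payload = buf[start:start + length]
            buf = buf[start + length:]
            yield json.loads(payload.decode("utf-8"))


# A single HEARTBEAT record, split at arbitrary points to mimic network chunks.
heartbeat = json.dumps({"type": "HEARTBEAT"}).encode("utf-8")
stream = str(len(heartbeat)).encode("ascii") + b"\n" + heartbeat
print([event["type"] for event in iter_recordio([stream[:3], stream[3:10], stream[10:]])])
```

Buffering on byte counts, rather than relying on line boundaries, keeps working even if a chunk ends in the middle of a length prefix.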

```python
def post(url, body, **kwargs):
    # Both are assigned from the background thread, so declare them up front.
    global offers, framework_id

    print('Connecting to Master: ' + url)
    r = requests.post(url, headers=headers, data=json.dumps(body), **kwargs)

    if r.status_code not in [200, 202]:
        raise ValueError("Error sending request: {} - {}".format(r.status_code, r.text))
    if kwargs.get('stream'):
        # The stream is RecordIO: "<length>\n<JSON event>", repeated. Since
        # iter_lines() splits on newlines, the first line is the first length, and
        # every following line is one JSON event glued to the next event's length.
        first_line = True
        for line in r.iter_lines():
            if first_line:
                count_bytes = int(line)
                first_line = False
                continue
            event = json.loads(line[:count_bytes])
            count_bytes = int(line[count_bytes:])
            # Check on every event, HEARTBEATs included, so close_channel() can work.
            if terminate:
                return
            if event.get("type") == "HEARTBEAT":
                continue
            # When we get OFFERS we want to see them (and eventually, use them).
            if event.get("type") == "OFFERS":
                offers = event.get("offers")
            # We need to capture the framework_id to use in subsequent requests.
            if event.get("type") == "SUBSCRIBED":
                framework_id = event.get("subscribed").get("framework_id").get("value")
                if framework_id:
                    print("Framework {} registered with Master at ({})".format(framework_id, url))
    return r
```

## Warm up

The following code just checks that there is connectivity and that the settings are all correct: do not move forward until this runs successfully.

```python
r = requests.get("{}/state.json".format(MASTER_URL))
master_state = r.json()
print("Mesos version running at {}".format(master_state["version"]))

r = requests.get("{}/state.json".format(SLAVE_URL))
slave_state = r.json()

# If this is not true, you're in for a world of hurt:
assert master_state["version"] == slave_state["version"]


def get_framework(index=None, fid=None):
    """Return one of the Master's frameworks, by position or by ID (None if absent)."""
    # Compare against None: index=0 is a perfectly valid position.
    if index is not None and fid is not None:
        raise ValueError("Cannot specify both ID and Index")
    r = requests.get("{}/state.json".format(MASTER_URL))
    master_state = r.json()
    frameworks = master_state.get("frameworks")
    if frameworks and isinstance(frameworks, list):
        if index is not None and len(frameworks) > index:
            return frameworks[index]
        elif fid:
            for framework in frameworks:
                if framework.get("id") == fid:
                    return framework


# And right now there ought to be no frameworks:
assert get_framework(index=0) is None
```

```
Mesos version running at 0.25.0
```


# Registering a Framework

Using the HTTP API requires running at least two separate threads: one for the "incoming" Master messages **to** the Framework (the HTTP connection we opened with the initial `SUBSCRIBE` `POST`), and another **from** the Framework to the Master to actually convey our requests (e.g., accepting `OFFER`s).

We will be using the `threading` module, as this is I/O-bound and there is no CPU contention: we will run a background thread (`persistent_channel`) to receive messages from Mesos, and use the main thread to send `requests` to the Master.

The code in this Notebook **is not thread-safe**; in particular, we don't use any form of locking, as there is no real concern about races over shared data: in real production code, one should obviously protect shared data with suitable `locks` (see the [Python Multithreading documentation](https://docs.python.org/3/library/threading.html) for more details).
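One common way to avoid shared globals altogether is to have the background thread hand each event to the main thread through a thread-safe queue (`queue.Queue`, or `Queue.Queue` on Python 2), which does its own locking. A minimal sketch, in which `fake_event_source` is a hypothetical stand-in for the decoded `SUBSCRIBE` stream:

```python
import threading

try:
    import queue  # Python 3
except ImportError:
    import Queue as queue  # Python 2


def reader(events, out_queue):
    # Background side: forward every non-heartbeat event to the main thread.
    for event in events:
        if event.get("type") != "HEARTBEAT":
            out_queue.put(event)
    out_queue.put(None)  # Sentinel: the stream has ended.


# Hypothetical stand-in for the events the Master would stream to us.
fake_event_source = [
    {"type": "SUBSCRIBED", "subscribed": {"framework_id": {"value": "fw-1"}}},
    {"type": "HEARTBEAT"},
    {"type": "OFFERS", "offers": {"offers": []}},
]

inbox = queue.Queue()
channel = threading.Thread(target=reader, args=(fake_event_source, inbox))
channel.daemon = True
channel.start()

received = []
while True:
    # Main side: block until the next event (a real framework would also send calls).
    event = inbox.get(timeout=5)
    if event is None:
        break
    received.append(event["type"])
channel.join()
print(received)
```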

```python
try:
    kwargs = {'stream': True, 'timeout': 30}
    persistent_channel = Thread(target=post, args=(API_URL, SUBSCRIBE_BODY), kwargs=kwargs)
    # A daemon thread does not keep the kernel alive on exit.
    persistent_channel.daemon = True
    persistent_channel.start()
    print("The background channel was started to {}".format(API_URL))
except Exception as ex:
    print("An error occurred: {}".format(ex))
```

```
The background channel was started to http://localhost:5051//api/v1/scheduler
Connecting to Master: http://localhost:5051//api/v1/scheduler
```


# Terminating a Framework

The request above keeps running (but see [Terminating the Request](#terminating) below) until we tear down the framework we just started:

```python
import copy


def terminate_framework(fid=None):
    if not fid:
        framework = get_framework(0)
        if not framework:
            print("No frameworks to terminate")
            return
        fid = framework['id']
    # Work on a copy, so the shared TEARDOWN_BODY template is never modified.
    body = copy.deepcopy(TEARDOWN_BODY)
    body['framework_id']['value'] = fid
    post(API_URL, body)
```

```
Framework 20150821-130640-855746752-5051-32042-0003 registered with Master at (http://localhost:5051//api/v1/scheduler)
```
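The `TEARDOWN` call itself is tiny: just the call type and the framework ID. A sketch of building it without ever mutating the shared template; `make_teardown_body` and `TEARDOWN_TEMPLATE` are hypothetical names, the latter a copy of the notebook's `TEARDOWN_BODY` so the block runs on its own:

```python
import copy
import json

# Same shape as TEARDOWN_BODY above, repeated here to keep the sketch self-contained.
TEARDOWN_TEMPLATE = {
    "framework_id": {"value": None},
    "type": "TEARDOWN",
}


def make_teardown_body(framework_id):
    """Hypothetical helper: return a fresh TEARDOWN call for `framework_id`."""
    if not framework_id:
        raise ValueError("A framework ID is required for TEARDOWN")
    # Deep copy: a shallow copy would still share the nested "framework_id" dict.
    body = copy.deepcopy(TEARDOWN_TEMPLATE)
    body["framework_id"]["value"] = framework_id
    return body


print(json.dumps(make_teardown_body("hypothetical-framework-id"), sort_keys=True))
# {"framework_id": {"value": "hypothetical-framework-id"}, "type": "TEARDOWN"}
```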


## <a name="terminating"></a>Terminating the Request

The following is a "best effort" to close the background thread that keeps the connection with the Master alive: it only works if the Master keeps sending messages such as `HEARTBEAT`s, so that `terminate` gets checked on the next loop iteration.

In theory, the `timeout` passed at start should prevent the thread from becoming unresponsive if no more messages are processed, but this does not always seem to be the case.

If all else fails, restarting the IPython kernel seems to be the only (unsatisfactory) solution.
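A `threading.Event` is a thread-safe alternative to the plain `terminate` flag, and `Thread.join(timeout)` lets the caller wait a bounded time and then check `is_alive()`. A minimal sketch, where the hypothetical `fake_stream` emits a `HEARTBEAT` every 0.1 seconds in place of the Master:

```python
import threading
import time


def fake_stream(interval):
    # Hypothetical stand-in for the Master: a HEARTBEAT every `interval` seconds, forever.
    while True:
        time.sleep(interval)
        yield {"type": "HEARTBEAT"}


def consume(events, stop):
    # Check the stop flag on every event, heartbeats included.
    for _event in events:
        if stop.is_set():
            return
        # ...a real consumer would handle OFFERS, SUBSCRIBED, etc. here...


stop = threading.Event()
worker = threading.Thread(target=consume, args=(fake_stream(0.1), stop))
worker.daemon = True
worker.start()

# Equivalent of close_channel(): ask the thread to stop, then wait at most 2 seconds.
stop.set()
worker.join(timeout=2)
print("Channel closed: {}".format(not worker.is_alive()))  # Channel closed: True
```

The flag is only seen when an event arrives; if the stream goes completely silent, only the read `timeout` given to `requests` can (in principle) unblock the thread.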

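```python
def close_channel():
    # Without this declaration, the assignments below would only create locals.
    global terminate, framework_id, offers
    if persistent_channel.is_alive():
        terminate = True

    framework_id = None
    offers = None

    # Give the background thread up to 5 seconds to notice `terminate`.
    persistent_channel.join(timeout=5)
    print("Channel was closed: {}".format(not persistent_channel.is_alive()))
```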

```python
# To close the incoming channel use the following code;
# this will also terminate the framework (if still running).

# NOTE: Commented out to avoid accidental execution
# terminate_framework(fid=framework_id)
# close_channel()

pass
```

# Accepting Offers for Resources

We need a tiny amount of resources (0.05 CPU, 32 MB of RAM) to run a simple command on the Slave.
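Before accepting an offer, it is worth checking that it actually holds that much. A sketch with hypothetical helpers, assuming offer resources use the same JSON shape as the `TASK_RESOURCES` entries further down (`name`, `type`, `scalar.value`, `role`):

```python
def scalar_resource(name, value, role="*"):
    """Hypothetical helper: JSON form of a SCALAR Resource, as in TASK_RESOURCES."""
    return {"name": name, "role": role, "scalar": {"value": value}, "type": "SCALAR"}


def offered_scalar(offer, name):
    # Total amount of a SCALAR resource in an offer, summed over all roles.
    return sum(
        r["scalar"]["value"]
        for r in offer.get("resources", [])
        if r.get("name") == name and r.get("type") == "SCALAR"
    )


def offer_fits(offer, wanted):
    # True if the offer holds at least as much of every resource in `wanted`.
    return all(offered_scalar(offer, r["name"]) >= r["scalar"]["value"] for r in wanted)


wanted = [scalar_resource("cpus", 0.05), scalar_resource("mem", 32)]
# A made-up offer, trimmed to the only field these helpers read.
sample_offer = {"resources": [scalar_resource("cpus", 4.0), scalar_resource("mem", 15360)]}
print(offer_fits(sample_offer, wanted))  # True
```

Summing over all roles is a simplification: a task can only use resources from roles the framework is allowed to consume.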

## Wait for Offers

We need to wait first for the framework to register, then to get resource offers:

```python
# This code is safe to execute any number of times: it only waits for the
# background thread to register the framework and receive offers, and never
# opens a new connection itself.

count = 0
while not framework_id and count < 10:
    sleep(3)
    print('.', end="")
    count += 1
print()

if not framework_id:
    print("Failed to register, terminating Framework")
    close_channel()
else:
    print("Registered a Framework with ID: {}".format(framework_id))

    print("Waiting for offers...")

    count = 0
    while not offers and count < 10:
        print('.', end="")
        sleep(3)
        count += 1
    print()

    if not offers:
        print("Failed to obtain resources, terminating Framework")
        terminate_framework(framework_id)
        close_channel()
    else:
        print("Got offers:")
        pretty.pprint(offers)
```
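The two polling loops above share the same shape. A hypothetical `wait_for` helper could factor them out; this is a sketch, not something the notebook defines:

```python
import time


def wait_for(predicate, attempts=10, interval=3.0):
    """Hypothetical helper: poll `predicate` until it returns something truthy.

    Sleeps `interval` seconds between checks, retrying at most `attempts` times,
    and returns the last value seen (falsy if we gave up).
    """
    result = predicate()
    for _ in range(attempts):
        if result:
            break
        time.sleep(interval)
        result = predicate()
    return result


# In the notebook this could read the globals set by the background thread, e.g.
#   fid = wait_for(lambda: framework_id)
#   current_offers = wait_for(lambda: offers)
calls = []


def ready_on_third_call():
    calls.append(1)
    return len(calls) >= 3


print(wait_for(ready_on_third_call, attempts=5, interval=0.01))  # True
```

Because each `lambda` looks the global up on every call, it sees the values the background thread assigns later.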

## Launch a Task using the given offers

We will use a `CommandInfo` protobuf, embedded in a `TaskInfo` inside the `Launch` operation (you can find them in [`mesos.proto`](https://github.com/apache/mesos/blob/master/include/mesos/v1/mesos.proto#L260)), while the full request body will be an [Accept](https://github.com/apache/mesos/blob/master/include/mesos/v1/scheduler/scheduler.proto#L228) message.
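To make that nesting explicit (`CommandInfo` inside `TaskInfo`, inside the `Launch` operation, inside the `Accept` call), here is a sketch of a hypothetical `build_accept` helper. It assumes the offer JSON carries its IDs under `id` and `agent_id`, the same key the task uses, and it swaps in a random UUID for the task ID instead of a timestamp:

```python
import copy
import json
import uuid


def build_accept(framework_id, offer, command, resources, name="Example task"):
    """Hypothetical helper: an ACCEPT call that launches one shell command.

    Nesting: CommandInfo -> TaskInfo -> Launch operation -> Accept call.
    Assumes the offer JSON carries its IDs under "id" and "agent_id".
    """
    task_info = {
        # A random UUID avoids clashes if two tasks start within the same second.
        "task_id": {"value": str(uuid.uuid4())},
        # The task must run on the agent that made the offer.
        "agent_id": copy.deepcopy(offer["agent_id"]),
        "name": name,
        "command": {"shell": True, "value": command},  # CommandInfo
        "resources": copy.deepcopy(resources),
    }
    return {
        "framework_id": {"value": framework_id},
        "type": "ACCEPT",
        "accept": {
            "offer_ids": [copy.deepcopy(offer["id"])],
            "operations": [{"type": "LAUNCH", "launch": {"task_infos": [task_info]}}],
        },
    }


# Made-up IDs, only to show the resulting JSON.
sample_offer = {"id": {"value": "offer-1"}, "agent_id": {"value": "agent-1"}}
print(json.dumps(build_accept("framework-1", sample_offer, "ls -l", []), indent=2, sort_keys=True))
```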

The following is a simplified version of the `Accept` JSON:

```python
import time

TASK_RESOURCES = [
    {
        'name': 'cpus',
        'role': '*',
        'scalar': {'value': 0.05},
        'type': 'SCALAR'
    },
    {
        'name': 'mem',
        'role': '*',
        'scalar': {'value': 32},
        'type': 'SCALAR'
    }
]

# Use the first offer received: its ID goes in `offer_ids`, and the task must be
# launched on the agent that made the offer (assumes the offer's key is 'agent_id').
offer = offers.get('offers')[0]

LAUNCH_BODY = {
    "framework_id": {
        "value": framework_id
    },
    "type": "ACCEPT",
    "accept": {
        "offer_ids": [
            offer['id']
        ],
        "operations": [
            {
                "type": "LAUNCH",
                "launch": {
                    "task_infos": [
                        {
                            # Seconds since the epoch: unique enough for a demo.
                            "task_id": {"value": str(int(time.time()))},
                            "agent_id": offer['agent_id'],
                            "name": "List files",
                            "command": {
                                "shell": True,
                                "value": "/usr/bin/notify-send -u critical -i WARN "
                                         "\"`whoami`, You did it!\" 'Message courtesy of Apache Mesos'"
                            },
                            "resources": TASK_RESOURCES
                        }
                    ]
                }
            }
        ]
    }
}

try:
    r = post(API_URL, LAUNCH_BODY)
    print("Result: {}".format(r.status_code))
    if r.text:
        print(r.text)
except ValueError as err:
    print("Request failed: {}".format(err))
```

```
Connecting to Master: http://localhost:5051//api/v1/scheduler
Result: 202
```
