# Getting Started with Pilot-Streaming on Wrangler

First, import all required packages and modules. The parent directory is added to the Python path so that the local `pilot` package can be imported.

In [None]:
# System Libraries
import sys, os
sys.path.append("..")
import pandas as pd

## logging: suppress everything below ERROR, including py4j (Spark's Python/JVM bridge)
import logging
logging.basicConfig(level=logging.ERROR)
logging.getLogger("py4j").setLevel(logging.ERROR)


# Pilot-Streaming
import pilot.streaming
sys.modules['pilot.streaming']

The Pilot-Compute Description is a simple key/value description (a Python `dict`) of the cluster environment to be started. Alternatively, the command-line tool shipped with this package can be used:

     pilot-streaming --resource=slurm://localhost --queue=normal --walltime=59 --number_cores=48 --framework spark 
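Because the same description is repeated for every framework below, a small helper can build it. The function `make_pilot_description` is hypothetical (it is not part of the pilot-streaming API); its keys and defaults are copied from the descriptions used in this notebook, and the CLI flags above appear to map onto the same keys, with `--framework` corresponding to `type`.

```python
# Hypothetical helper, not part of pilot-streaming: builds the key/value
# Pilot-Compute Description used throughout this notebook.
def make_pilot_description(framework, working_directory, project,
                           number_cores=48, queue="normal", walltime=59,
                           resource="slurm://localhost"):
    return {
        "resource": resource,                    # batch system URL (SLURM here)
        "working_directory": working_directory,  # framework files are placed below this directory
        "number_cores": number_cores,
        "project": project,                      # allocation/project to charge
        "queue": queue,
        "walltime": walltime,
        "type": framework,                       # "spark", "kafka" or "dask" in this notebook
    }
```

For example, `make_pilot_description("spark", os.path.join('/work/01131/tg804093/wrangler/', "work"), "TG-MCB090174")` yields a dictionary equal to `spark_pilot1_description` below.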

# 1. Spark

## Start and Manage Pilots

In [3]:
### Required Spark configuration that needs to be provided before pyspark is imported and JVM started
os.environ["SPARK_LOCAL_IP"]='129.114.58.2' #must be done before pyspark is loaded
import pyspark

spark_pilot1_description = {
    "resource":"slurm://localhost",
    "working_directory": os.path.join('/work/01131/tg804093/wrangler/', "work"),
    "number_cores": 48,
    "project": "TG-MCB090174",
    "queue": "normal",
    "walltime": 59,
    "type":"spark"
}
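The cell above hard-codes `SPARK_LOCAL_IP` for one particular node and relies on it being set before `pyspark` is loaded. The hypothetical helper below (not part of pilot-streaming) enforces that ordering and, if no address is given, makes a best-effort guess from the hostname. That guess is an assumption: on some systems it resolves to a loopback address, in which case the IP should be passed explicitly.

```python
import os
import socket
import sys


def set_spark_local_ip(ip=None):
    """Set SPARK_LOCAL_IP, refusing to do so once pyspark has been imported."""
    # Follows the notebook's note that the variable must be set before pyspark is loaded
    if "pyspark" in sys.modules:
        raise RuntimeError("pyspark is already imported; set SPARK_LOCAL_IP earlier")
    if ip is None:
        # Best-effort guess; may return a loopback address on some hosts
        ip = socket.gethostbyname(socket.gethostname())
    os.environ["SPARK_LOCAL_IP"] = ip
    return ip
```

Usage: call `set_spark_local_ip('129.114.58.2')` in place of the `os.environ[...]` assignment, before `import pyspark`.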

Start the Spark cluster and wait for startup to complete.

In [4]:
%%time
spark_pilot1 = pilot.streaming.PilotComputeService.create_pilot(spark_pilot1_description)
spark_pilot1.wait()

**** Job: 59315 State : Pending
Create Spark Context for URL: spark://129.114.58.136:7077
Create Spark Context for URL: spark://129.114.58.136:7077
CPU times: user 180 ms, sys: 89.1 ms, total: 269 ms
Wall time: 23.3 s


In [5]:
spark_pilot1.get_details()

{'master_url': 'spark://129.114.58.136:7077',
 'spark_home': '/work/01131/tg804093/wrangler/work/spark-0de84fe6-f1b2-11e7-a551-549f35083c1c/spark-2.2.1-bin-hadoop2.7',
 'web_ui_url': 'http://129.114.58.136:8080'}

In [6]:
sc = spark_pilot1.get_context()

Create Spark Context for URL: spark://129.114.58.136:7077


In [7]:
rdd = sc.parallelize([1,2,3])
rdd.map(lambda a: a*a).collect()

[1, 4, 9]

## Extend Spark Cluster 1 with additional resources

To extend a Spark cluster, simply start another Pilot whose Pilot-Compute Description references the ID of the existing Pilot-Job in the `parent` field.

In [8]:
spark_pilot1_jobid = spark_pilot1.get_id()

spark_pilot_description2 = {
    "resource":"slurm://localhost",
    "working_directory": os.path.join('/work/01131/tg804093/wrangler/', "work"),
    "number_cores": 48,
    "project": "TG-MCB090174",
    "queue": "normal",
    "walltime": 59,
    "type":"spark",
    "parent": spark_pilot1_jobid
}
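The extension description differs from the original only by the `parent` key, so it can also be derived from the first one instead of being written out again. `extend_description` is a hypothetical helper, not a pilot-streaming function; it copies the base description so the parent's dictionary is left unchanged.

```python
import copy


def extend_description(base_description, parent_id):
    """Return a copy of base_description that attaches to an existing pilot."""
    # Deep copy so that adding "parent" does not modify the base description
    child = copy.deepcopy(base_description)
    child["parent"] = parent_id
    return child
```

With it, the cell above reduces to `spark_pilot_description2 = extend_description(spark_pilot1_description, spark_pilot1.get_id())`; the same applies to the Kafka and Dask extensions below.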

In [9]:
%%time
spark_pilot2 = pilot.streaming.PilotComputeService.create_pilot(spark_pilot_description2)
spark_pilot2.wait()

**** Job: 59316 State : Pending
Create Spark Context for URL: spark://129.114.58.136:7077
Create Spark Context for URL: spark://129.114.58.136:7077
CPU times: user 89.5 ms, sys: 51.6 ms, total: 141 ms
Wall time: 16.2 s


**Note:** The connection endpoints of the extended cluster (`master_url` and `web_ui_url`) are the same as those of the parent Pilot-Job; only `spark_home` differs.

In [10]:
spark_pilot2.get_details()

{'master_url': 'spark://129.114.58.136:7077',
 'spark_home': '/work/01131/tg804093/wrangler/work/spark-1ed095e8-f1b2-11e7-a551-549f35083c1c/spark-2.2.1-bin-hadoop2.7',
 'web_ui_url': 'http://129.114.58.136:8080'}
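The claim in the note can be checked programmatically. The sketch below uses the two `get_details()` outputs shown above as literal data; `same_cluster_endpoints` is a hypothetical helper, not part of pilot-streaming.

```python
def same_cluster_endpoints(details_a, details_b, keys=("master_url", "web_ui_url")):
    """True if both pilots expose the same connection endpoints."""
    return all(details_a.get(k) == details_b.get(k) for k in keys)


# Values copied from the get_details() outputs of spark_pilot1 and spark_pilot2
parent_details = {
    "master_url": "spark://129.114.58.136:7077",
    "spark_home": "/work/01131/tg804093/wrangler/work/spark-0de84fe6-f1b2-11e7-a551-549f35083c1c/spark-2.2.1-bin-hadoop2.7",
    "web_ui_url": "http://129.114.58.136:8080",
}
child_details = {
    "master_url": "spark://129.114.58.136:7077",
    "spark_home": "/work/01131/tg804093/wrangler/work/spark-1ed095e8-f1b2-11e7-a551-549f35083c1c/spark-2.2.1-bin-hadoop2.7",
    "web_ui_url": "http://129.114.58.136:8080",
}

print(same_cluster_endpoints(parent_details, child_details))     # True
print(parent_details["spark_home"] == child_details["spark_home"])  # False
```

In the live notebook the same check would be `same_cluster_endpoints(spark_pilot1.get_details(), spark_pilot2.get_details())`.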

Stop both Pilots

In [11]:
spark_pilot1.cancel()
spark_pilot2.cancel()
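Each Pilot is a batch job that presumably keeps its allocation until it is cancelled or its walltime expires, so it can be worth making sure `cancel()` runs even when a cell fails. The context manager below is a hypothetical wrapper, not part of pilot-streaming; it only assumes the `wait()` and `cancel()` methods used in this notebook.

```python
from contextlib import contextmanager


@contextmanager
def running_pilot(create_pilot, description):
    """Start a pilot, wait for it, and always cancel it on exit."""
    pilot = create_pilot(description)
    try:
        pilot.wait()
        yield pilot
    finally:
        # Runs even if wait() or the with-body raises
        pilot.cancel()
```

Usage: `with running_pilot(pilot.streaming.PilotComputeService.create_pilot, spark_pilot1_description) as p: sc = p.get_context()`.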

# 2. Kafka

In [2]:
kafka_pilot_description1 = {
    "resource":"slurm://localhost",
    "working_directory": os.path.join('/work/01131/tg804093/wrangler/', "work"),
    "number_cores": 48,
    "project": "TG-MCB090174",
    "queue": "normal",
    "walltime": 59,
    "type":"kafka"
}

In [3]:
%%time
kafka_pilot1 = pilot.streaming.PilotComputeService.create_pilot(kafka_pilot_description1)
kafka_pilot1.wait()

**** Job: 59351 State : Pending
look for configs in: /work/01131/tg804093/wrangler/work/kafka-1b9acffe-f1c3-11e7-9bc2-549f35083c1c/config
['broker-0']
Kafka Config: /work/01131/tg804093/wrangler/work/kafka-1b9acffe-f1c3-11e7-9bc2-549f35083c1c/config (Thu Jan  4 20:50:05 2018)
{'zookeeper.connection.timeout.ms': '6000', 'broker.id': '0', 'listeners': 'PLAINTEXT://c251-133:9092', 'zookeeper.connect': 'c251-133:2181'}
CPU times: user 194 ms, sys: 79.5 ms, total: 273 ms
Wall time: 25.8 s


In [4]:
kafka_pilot1.get_details()

{'details': {'broker.id': '0',
  'listeners': 'PLAINTEXT://c251-133:9092',
  'zookeeper.connect': 'c251-133:2181',
  'zookeeper.connection.timeout.ms': '6000'},
 'master_url': 'c251-133:2181'}
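For Kafka pilots, `master_url` is the ZooKeeper address, while the broker address itself appears in the `listeners` entry of `details`. A client that connects to brokers directly needs the latter. The sketch below extracts `host:port` pairs from the output shown above; `bootstrap_servers` is a hypothetical helper.

```python
def bootstrap_servers(details):
    """Turn a Kafka pilot's 'listeners' entry into a list of host:port strings."""
    listeners = details["details"]["listeners"]
    servers = []
    # Listener lists are comma-separated entries such as "PLAINTEXT://host:9092"
    for listener in listeners.split(","):
        _, _, hostport = listener.strip().partition("://")
        servers.append(hostport)
    return servers


# Copied from kafka_pilot1.get_details() above
kafka_details = {
    "details": {
        "broker.id": "0",
        "listeners": "PLAINTEXT://c251-133:9092",
        "zookeeper.connect": "c251-133:2181",
        "zookeeper.connection.timeout.ms": "6000",
    },
    "master_url": "c251-133:2181",
}

print(bootstrap_servers(kafka_details))  # ['c251-133:9092']
```

The resulting list could be passed to pykafka as `pykafka.KafkaClient(hosts=",".join(bootstrap_servers(kafka_pilot1.get_details())))` instead of going through ZooKeeper.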

## Extend Pilot

In [5]:
kafka_pilot1_jobid = kafka_pilot1.get_id()

kafka_pilot_description2 = {
    "resource":"slurm://localhost",
    "working_directory": os.path.join('/work/01131/tg804093/wrangler/', "work"),
    "number_cores": 48,
    "project": "TG-MCB090174",
    "queue": "normal",
    "walltime": 59,
    "type":"kafka",
    "parent": kafka_pilot1_jobid
}

In [6]:
%%time
kafka_pilot2 = pilot.streaming.PilotComputeService.create_pilot(kafka_pilot_description2)
kafka_pilot2.wait()

**** Job: 59352 State : Pending
look for configs in: /work/01131/tg804093/wrangler/work/kafka-2b010166-f1c3-11e7-9bc2-549f35083c1c/config
['broker-1']
Kafka Config: /work/01131/tg804093/wrangler/work/kafka-2b010166-f1c3-11e7-9bc2-549f35083c1c/config (Thu Jan  4 20:50:20 2018)
{'zookeeper.connection.timeout.ms': '6000', 'broker.id': '1', 'listeners': 'PLAINTEXT://c251-135:9092', 'zookeeper.connect': 'c251-133:2181'}
CPU times: user 88.3 ms, sys: 35.9 ms, total: 124 ms
Wall time: 18.8 s


In [7]:
kafka_pilot2.get_details()

{'details': {'broker.id': '1',
  'listeners': 'PLAINTEXT://c251-135:9092',
  'zookeeper.connect': 'c251-133:2181',
  'zookeeper.connection.timeout.ms': '6000'},
 'master_url': 'c251-133:2181'}

Check the brokers of the Kafka cluster.

In [8]:
import pykafka
zkhost=kafka_pilot2.get_details()['master_url']
client = pykafka.KafkaClient(zookeeper_hosts=zkhost)
client.brokers

{0: <pykafka.broker.Broker at 0x2b438f3a1c50 (host=c251-133, port=9092, id=0)>}
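In the output above only broker 0 is listed, although the second pilot reported `broker.id` 1. A small check like the following can flag such a discrepancy. `missing_broker_ids` is a hypothetical helper; the example data is the two pilots' `details`, trimmed to the `broker.id` field, and the broker ids from `client.brokers`.

```python
def missing_broker_ids(pilot_details, registered_ids):
    """Broker ids reported by the pilots but absent from the client's broker list."""
    expected = {int(d["details"]["broker.id"]) for d in pilot_details}
    return sorted(expected - set(registered_ids))


details1 = {"details": {"broker.id": "0"}}  # from kafka_pilot1.get_details()
details2 = {"details": {"broker.id": "1"}}  # from kafka_pilot2.get_details()

print(missing_broker_ids([details1, details2], [0]))  # [1]
```

In the notebook this would read `missing_broker_ids([kafka_pilot1.get_details(), kafka_pilot2.get_details()], client.brokers.keys())`. If a broker is missing, refreshing the client's metadata (for example with pykafka's `client.update_cluster()`) and checking again is one option.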

## Cancel Pilots

In [9]:
kafka_pilot1.cancel()
kafka_pilot2.cancel()

# 3. Dask

In [8]:
dask_pilot_description1 = {
    "resource":"slurm://localhost",
    "working_directory": os.path.join('/work/01131/tg804093/wrangler/', "work"),
    "number_cores": 48,
    "project": "TG-MCB090174",
    "queue": "normal",
    "walltime": 59,
    "type":"dask"
}

In [9]:
%%time
dask_pilot1 = pilot.streaming.PilotComputeService.create_pilot(dask_pilot_description1)
dask_pilot1.wait()

**** Job: 59348 State : Pending
CPU times: user 85.1 ms, sys: 40.1 ms, total: 125 ms
Wall time: 5.44 s


In [10]:
dask_pilot1.get_details()

{'master_url': 'tcp://c251-133:8786', 'web_ui_url': 'http://c251-133:8787'}
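All three frameworks expose a `master_url`, but in different forms: `spark://host:port` for Spark, a bare ZooKeeper `host:port` for Kafka, and `tcp://host:port` for Dask. The hypothetical helper below splits any of them into scheme, host and port. It is written for Python 3; on Python 2.7, which the traceback at the end of this notebook shows was used here, `urlsplit` lives in the `urlparse` module instead.

```python
from urllib.parse import urlsplit


def split_master_url(master_url):
    """Return (scheme, host, port) for the master_url formats used by the pilots."""
    # Kafka pilots report a bare "host:port"; prefixing "//" makes urlsplit
    # treat it as a network location instead of guessing a scheme.
    if "://" not in master_url:
        master_url = "//" + master_url
    parts = urlsplit(master_url)
    return parts.scheme, parts.hostname, parts.port


print(split_master_url("tcp://c251-133:8786"))          # ('tcp', 'c251-133', 8786)
print(split_master_url("c251-133:2181"))                # ('', 'c251-133', 2181)
print(split_master_url("spark://129.114.58.136:7077"))  # ('spark', '129.114.58.136', 7077)
```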

## Extension

In [11]:
dask_pilot1_jobid = dask_pilot1.get_id()
dask_pilot_description2 = {
    "resource":"slurm://localhost",
    "working_directory": os.path.join('/work/01131/tg804093/wrangler/', "work"),
    "number_cores": 48,
    "project": "TG-MCB090174",
    "queue": "normal",
    "walltime": 59,
    "type":"dask",
    "parent": dask_pilot1_jobid
}

In [12]:
dask_pilot2 = pilot.streaming.PilotComputeService.create_pilot(dask_pilot_description2)
dask_pilot2.wait()

**** Job: 59349 State : Pending


In [13]:
dask_pilot2.get_details()

{'master_url': 'tcp://c251-133:8786', 'web_ui_url': 'http://c251-133:8787'}

Test the new cluster.

In [15]:
import distributed
dask_client = distributed.Client(dask_pilot1.get_details()['master_url'])
dask_client.scheduler_info()

{'address': 'tcp://129.114.58.133:8786',
 'id': 'Scheduler-f0aee3d5-0b61-4404-bcee-18c364474eac',
 'services': {'bokeh': 8787},
 'type': 'Scheduler',
 'workers': {'tcp://129.114.58.132:50776': {'cpu': 10.0,
   'executing': 0,
   'host': '129.114.58.132',
   'in_flight': 0,
   'in_memory': 0,
   'last-seen': 1515120541.188664,
   'local_directory': '/home/01131/tg804093/dask-worker-space/worker-C4tWH_',
   'memory': 102072320,
   'memory_limit': 134778585088,
   'name': 'tcp://129.114.58.132:50776',
   'ncores': 48,
   'num_fds': 23,
   'pid': 123991,
   'read_bytes': 558.2869765094172,
   'ready': 0,
   'services': {'bokeh': 8789, 'nanny': 35130},
   'time': 1515120604.501052,
   'time-delay': -63.81193709373474,
   'write_bytes': 1200.6171537836929},
  'tcp://129.114.58.133:39759': {'cpu': 12.0,
   'executing': 0,
   'host': '129.114.58.133',
   'in_flight': 0,
   'in_memory': 0,
   'last-seen': 1515120541.299997,
   'local_directory': '/home/01131/tg804093/dask-worker-space/worker-EP
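The full `scheduler_info()` dictionary is long; a compact summary of worker count and cores is often enough to confirm that the extension pilot's workers joined. `summarize_workers` is a hypothetical helper. The `ncores` key is taken from the output above; newer `distributed` releases are assumed to report `nthreads` instead, so both are accepted.

```python
def summarize_workers(info):
    """Return (worker_count, total_cores) from distributed.Client.scheduler_info()."""
    workers = info.get("workers", {})
    # Older distributed releases report "ncores"; newer ones use "nthreads"
    cores = sum(w.get("ncores", w.get("nthreads", 0)) for w in workers.values())
    return len(workers), cores
```

Usage: `summarize_workers(dask_client.scheduler_info())`.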

In [16]:
dask_client.gather(dask_client.map(lambda a: a*a, range(10)))

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
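`Client.map` submits one task per element and returns futures immediately; `Client.gather` blocks until their results are available. The same submit-then-gather pattern can be tried locally with the standard library, without a Dask cluster. This is only an analogy for illustration, not how Dask executes tasks; `concurrent.futures` is Python 3 (a `futures` backport exists for 2.7).

```python
from concurrent.futures import ThreadPoolExecutor


def square(a):
    return a * a


with ThreadPoolExecutor(max_workers=4) as executor:
    # submit() returns futures right away, like dask_client.map
    futures = [executor.submit(square, a) for a in range(10)]
    # Calling result() on each future blocks, like dask_client.gather
    results = [f.result() for f in futures]

print(results)  # [0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
```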

Cancel Pilots
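The traceback below is raised from the Dask client's `_update_scheduler_info` after the pilots are cancelled, which suggests the still-connected client keeps polling a scheduler that no longer exists. Closing the client before cancelling the pilots may avoid it; this is an inference from the traceback, not documented pilot-streaming behavior. `shutdown` is a hypothetical helper.

```python
def shutdown(client, *pilots):
    """Close the Dask client first, then cancel the pilots."""
    # Closing the client presumably stops its periodic scheduler polling,
    # which would otherwise fail once the scheduler disappears.
    client.close()
    for pilot in pilots:
        pilot.cancel()
```

Usage: `shutdown(dask_client, dask_pilot1, dask_pilot2)`.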

In [None]:
dask_pilot1.cancel()
dask_pilot2.cancel()

tornado.application - ERROR - Exception in callback <functools.partial object at 0x2abcd579a158>
Traceback (most recent call last):
  File "/home/01131/tg804093/anaconda2/lib/python2.7/site-packages/tornado/ioloop.py", line 605, in _run_callback
    ret = callback()
  File "/home/01131/tg804093/anaconda2/lib/python2.7/site-packages/tornado/stack_context.py", line 277, in null_wrapper
    return fn(*args, **kwargs)
  File "/home/01131/tg804093/anaconda2/lib/python2.7/site-packages/tornado/ioloop.py", line 626, in _discard_future_result
    future.result()
  File "/home/01131/tg804093/anaconda2/lib/python2.7/site-packages/tornado/concurrent.py", line 238, in result
    raise_exc_info(self._exc_info)
  File "/home/01131/tg804093/anaconda2/lib/python2.7/site-packages/tornado/gen.py", line 1063, in run
    yielded = self.gen.throw(*exc_info)
  File "/home/01131/tg804093/anaconda2/lib/python2.7/site-packages/distributed/client.py", line 804, in _update_scheduler_info
    self._scheduler_iden