# Getting Started with Pilot-Streaming on Stampede

In the first step, we import all required packages and modules and add the parent directory to the Python path.

Pilot-Streaming utilizes [SAGA-Python](http://saga-python.readthedocs.io/en/latest/tutorial/part3.html) to manage the Spark cluster environment. All attributes of the SAGA Job map 1-to-1 to the Pilot Compute Description. 

`resource`: URL of the local resource manager. All SAGA adaptors are supported. Examples:

* `slurm://localhost`: Submit to the local SLURM resource manager, e.g. from the master node of Wrangler or Stampede
* `slurm+ssh://login1.wrangler.tacc.utexas.edu`: Submit to SLURM on the Wrangler master node via SSH (e.g. from a node that is already running a job)

`type`: The `type` attribute specifies the cluster environment. It can be `Spark`, `Dask`, or `Kafka`; the cells in this notebook pass the value in lowercase.
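The two resource URL forms listed above differ only in their scheme, which names the adaptor and, optionally, the transport. As a minimal sketch, a URL can be taken apart with Python's standard `urllib.parse`. The helper `split_resource_url` is hypothetical and is not part of Pilot-Streaming or SAGA; it only illustrates how the URL is structured.

```python
from urllib.parse import urlparse


def split_resource_url(url):
    """Split a resource URL into (adaptor, transport, host).

    Hypothetical helper for illustration only.
    """
    parsed = urlparse(url)
    # "slurm+ssh" -> adaptor "slurm", transport "ssh"; "slurm" -> no transport
    adaptor, _, transport = parsed.scheme.partition("+")
    return adaptor, transport or None, parsed.hostname


print(split_resource_url("slurm://localhost"))
print(split_resource_url("slurm+ssh://login1.wrangler.tacc.utexas.edu"))
```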


Depending on the resource, additional configuration may be necessary. For example, to ensure that the correct subnet is used, the Spark driver can be configured through environment variables such as `os.environ["SPARK_LOCAL_IP"]='129.114.58.2'`.

Note: This is no longer required on Stampede2.
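A minimal sketch of setting this variable from Python follows. The helper `set_spark_local_ip` is hypothetical. It assumes, as the Spark cell later in this notebook notes, that the variable must be set before pyspark is imported, so it reports whether that is still the case.

```python
import os
import sys


def set_spark_local_ip(ip):
    """Set SPARK_LOCAL_IP and report whether pyspark is not yet imported.

    Hypothetical helper: returns False if pyspark was imported before
    the variable was set, in which case the setting may come too late.
    """
    os.environ["SPARK_LOCAL_IP"] = ip
    return "pyspark" not in sys.modules


# The address is the example value from the text; replace it with an
# address on the subnet that your nodes actually use.
if not set_spark_local_ip("129.114.58.2"):
    print("pyspark is already imported; the setting may not take effect")
```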

In [1]:
# System Libraries
import sys, os
sys.path.append("..")
import pandas as pd

## logging
import logging
logging.basicConfig(level=logging.DEBUG)
logging.getLogger().setLevel(logging.ERROR)
logging.getLogger("py4j").setLevel(logging.ERROR)

import datetime

In [2]:
import confluent_kafka
from confluent_kafka import TopicPartition
import pykafka
import pyspark
import time
import redis
import uuid
import pickle
import math
# Dask
import dask.array as da
import dask.bag as db
from dask.delayed import delayed
import distributed
from distributed import Client
# Pilot-Streaming
import pilot.streaming
sys.modules['pilot.streaming']

<module 'pilot.streaming' from '/home1/06548/tg858476/anaconda3/lib/python3.7/site-packages/Pilot_Streaming-0.31.2-py3.7.egg/pilot/streaming.py'>

In [3]:
RESOURCE_URL="slurm+ssh://login4.stampede2.tacc.utexas.edu"
WORKING_DIRECTORY=os.path.join(os.environ["HOME"], "work")

# 1. Kafka

In [4]:
pilot_compute_description = {
    "resource":RESOURCE_URL,
    "working_directory": WORKING_DIRECTORY,
    "number_of_nodes": 1,
    "cores_per_node": 48,
    "project": "TG-MCB090174",
    "queue": "normal",
    "config_name": "stampede",
    "walltime": 59,
    "type":"kafka"
}
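The pilot compute descriptions in this notebook share most of their fields and differ mainly in `type`, `walltime`, and a few framework-specific keys. The following sketch factors the shared part into a hypothetical helper, `make_description`; neither the helper nor the `SUPPORTED_TYPES` check is part of Pilot-Streaming, and the lowercase type names are an assumption based on the values used in these cells.

```python
import os

# Assumption: lowercase names, as used in the cells of this notebook.
SUPPORTED_TYPES = {"kafka", "dask", "spark"}


def make_description(cluster_type, walltime, **overrides):
    """Build a pilot compute description dict (hypothetical helper)."""
    if cluster_type not in SUPPORTED_TYPES:
        raise ValueError(f"unsupported type: {cluster_type!r}")
    description = {
        "resource": "slurm+ssh://login4.stampede2.tacc.utexas.edu",
        "working_directory": os.path.join(os.path.expanduser("~"), "work"),
        "number_of_nodes": 1,
        "cores_per_node": 48,
        "project": "TG-MCB090174",
        "queue": "normal",
        "walltime": walltime,
        "type": cluster_type,
    }
    # Framework-specific keys such as config_name or dask_cores
    description.update(overrides)
    return description


kafka_description = make_description("kafka", walltime=59, config_name="stampede")
dask_description = make_description("dask", walltime=359, dask_cores=24)
print(kafka_description["type"], dask_description["type"])
```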

In [5]:
%%time
kafka_pilot = pilot.streaming.PilotComputeService.create_pilot(pilot_compute_description)
kafka_pilot.wait()

DEBUG:pilot-streaming:Pilot-Streaming SLURM: Parsing job description: {'executable': 'python', 'arguments': ['-m ', 'pilot.plugins.kafka.bootstrap_kafka', ' -n ', 'stampede'], 'working_directory': '/home1/06548/tg858476/work/kafka-dbd88714-e6d8-11e9-8599-15f796122f42', 'output': 'kafka_job_kafka-dbd88714-e6d8-11e9-8599-15f796122f42.stdout', 'error': 'kafka_job_kafka-dbd88714-e6d8-11e9-8599-15f796122f42.stderr', 'number_of_nodes': 1, 'cores_per_node': 48, 'project': 'TG-MCB090174', 'reservation': None, 'queue': 'normal', 'walltime': 59}
DEBUG:pilot-streaming:Submit pilot job to: slurm+ssh://login4.stampede2.tacc.utexas.edu
DEBUG:pilot-streaming:Type Job IDps-dbdb0


Working Directory: /home1/06548/tg858476/work
/tmp/tmpurwqgtud
Submission of Job Command: ssh login4.stampede2.tacc.utexas.edu sbatch  tmpurwqgtud
Cleanup: ssh login4.stampede2.tacc.utexas.edu rm tmpurwqgtud


DEBUG:pilot-streaming:Pilot-Streaming SLURM: SSH run job finished
DEBUG:pilot-streaming:Output - 
To access the system:

1) If not using ssh-keys, please enter your TACC password at the password prompt
2) At the TACC Token prompt, enter your 6-digit code followed by <return>.  


-----------------------------------------------------------------
          Welcome to the Stampede2 Supercomputer                 
-----------------------------------------------------------------

No reservation for this job
--> Verifying valid submit host (login4)...OK
--> Verifying valid jobname...OK
--> Enforcing max jobs per user...OK
--> Verifying availability of your home dir (/home1/06548/tg858476)...OK
--> Verifying availability of your work dir (/work/06548/tg858476/stampede2)...OK
--> Verifying availability of your scratch dir (/scratch/06548/tg858476)...OK
--> Verifying valid ssh keys...OK
--> Verifying access to desired queue (normal)...OK
--> Verifying job request is within current queue limits.

**** Job: 4448178 State : Running
look for configs in: /home1/06548/tg858476/work/kafka-dbd88714-e6d8-11e9-8599-15f796122f42/config
['broker-0']
Kafka Config: /home1/06548/tg858476/work/kafka-dbd88714-e6d8-11e9-8599-15f796122f42/config (Fri Oct  4 13:58:30 2019)
{'broker.id': '0', 'listeners': 'PLAINTEXT://c401-013:9092', 'zookeeper.connect': 'c401-013:2181', 'zookeeper.connection.timeout.ms': '6000'}
CPU times: user 82.8 ms, sys: 200 ms, total: 282 ms
Wall time: 1min 12s


In [6]:
kafka_pilot.get_details()

look for configs in: /home1/06548/tg858476/work/kafka-dbd88714-e6d8-11e9-8599-15f796122f42/config
['broker-0']
Kafka Config: /home1/06548/tg858476/work/kafka-dbd88714-e6d8-11e9-8599-15f796122f42/config (Fri Oct  4 13:58:30 2019)
{'broker.id': '0', 'listeners': 'PLAINTEXT://c401-013:9092', 'zookeeper.connect': 'c401-013:2181', 'zookeeper.connection.timeout.ms': '6000'}


{'master_url': 'c401-013:2181',
 'details': {'broker.id': '0',
  'listeners': 'PLAINTEXT://c401-013:9092',
  'zookeeper.connect': 'c401-013:2181',
  'zookeeper.connection.timeout.ms': '6000'}}
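Kafka clients connect to the broker address rather than to ZooKeeper, so the `listeners` entry is usually the value of interest. The sketch below extracts `host:port` from a dictionary shaped like the output above. `bootstrap_servers` is a hypothetical helper, and the structure of `details` with more than one broker is an assumption.

```python
from urllib.parse import urlparse

# Sample copied from the get_details() output above.
details = {
    "master_url": "c401-013:2181",
    "details": {
        "broker.id": "0",
        "listeners": "PLAINTEXT://c401-013:9092",
        "zookeeper.connect": "c401-013:2181",
        "zookeeper.connection.timeout.ms": "6000",
    },
}


def bootstrap_servers(pilot_details):
    """Return comma-separated 'host:port' entries from `listeners`."""
    listeners = pilot_details["details"]["listeners"].split(",")
    # Drop the protocol prefix (e.g. PLAINTEXT://) from each listener
    return ",".join(urlparse(item.strip()).netloc for item in listeners)


print(bootstrap_servers(details))  # c401-013:9092
```

The resulting string has the form that Kafka clients expect for `bootstrap.servers`, for example in the configuration dictionary passed to `confluent_kafka.Producer`.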

In [None]:
kafka_pilot.cancel()

# 2. Dask

In [7]:
import distributed

pilot_compute_description = {
    "resource":RESOURCE_URL,
    "working_directory": WORKING_DIRECTORY,
    "number_of_nodes": 1,
    "cores_per_node": 48,
    "dask_cores" : 24,
    "project": "TG-MCB090174",
    "queue": "normal",
    "walltime": 359,
    "type":"dask"
}

In [8]:
%%time
dask_pilot = pilot.streaming.PilotComputeService.create_pilot(pilot_compute_description)
dask_pilot.wait()

DEBUG:pilot-streaming:Pilot-Streaming SLURM: Parsing job description: {'executable': 'python', 'arguments': ['-m', 'pilot.plugins.dask.bootstrap_dask', ' -p ', '24'], 'working_directory': '/home1/06548/tg858476/work/dask-718a0378-e6d9-11e9-8599-15f796122f42', 'output': 'dask_job_dask-718a0378-e6d9-11e9-8599-15f796122f42.stdout', 'error': 'dask_job_dask-718a0378-e6d9-11e9-8599-15f796122f42.stderr', 'number_of_nodes': 1, 'cores_per_node': 48, 'project': 'TG-MCB090174', 'reservation': None, 'queue': 'normal', 'walltime': 359, 'pilot_compute_description': {'resource': 'slurm+ssh://login4.stampede2.tacc.utexas.edu', 'working_directory': '/home1/06548/tg858476/work', 'number_of_nodes': 1, 'cores_per_node': 48, 'dask_cores': 24, 'project': 'TG-MCB090174', 'queue': 'normal', 'walltime': 359, 'type': 'dask'}}
DEBUG:pilot-streaming:Submit pilot job to: slurm+ssh://login4.stampede2.tacc.utexas.edu
DEBUG:pilot-streaming:Type Job IDps-718ac


Working Directory: /home1/06548/tg858476/work
dask-718a0378-e6d9-11e9-8599-15f796122f42/home1/06548/tg858476/work
/tmp/tmpaypyn7_o
Submission of Job Command: ssh login4.stampede2.tacc.utexas.edu sbatch  tmpaypyn7_o
Cleanup: ssh login4.stampede2.tacc.utexas.edu rm tmpaypyn7_o


DEBUG:pilot-streaming:Pilot-Streaming SLURM: SSH run job finished
DEBUG:pilot-streaming:Output - 
To access the system:

1) If not using ssh-keys, please enter your TACC password at the password prompt
2) At the TACC Token prompt, enter your 6-digit code followed by <return>.  


-----------------------------------------------------------------
          Welcome to the Stampede2 Supercomputer                 
-----------------------------------------------------------------

No reservation for this job
--> Verifying valid submit host (login4)...OK
--> Verifying valid jobname...OK
--> Enforcing max jobs per user...OK
--> Verifying availability of your home dir (/home1/06548/tg858476)...OK
--> Verifying availability of your work dir (/work/06548/tg858476/stampede2)...OK
--> Verifying availability of your scratch dir (/scratch/06548/tg858476)...OK
--> Verifying valid ssh keys...OK
--> Verifying access to desired queue (normal)...OK
--> Verifying job request is within current queue limits.

**** Job: 4448193 State : Running
init distributed client
Connect to Dask: tcp://c402-031:8786
Dask Client Connect Attempt 0 failed
init distributed client
Connect to Dask: tcp://c402-031:8786
Dask Client Connect Attempt 1 failed
init distributed client
Connect to Dask: tcp://c402-031:8786
Dask Client Connect Attempt 2 failed
init distributed client
Connect to Dask: tcp://c402-031:8786
{'type': 'Scheduler', 'id': 'Scheduler-b6c658a6-cae4-4da6-bed9-561e955275ea', 'address': 'tcp://206.76.194.69:8786', 'services': {'dashboard': 8787}, 'workers': {'tcp://206.76.194.69:39148': {'type': 'Worker', 'id': 'tcp://206.76.194.69:39148', 'host': '206.76.194.69', 'resources': {}, 'local_directory': '/home1/06548/tg858476/worker-bcamhi7t', 'name': 'tcp://206.76.194.69:39148', 'nthreads': 24, 'memory_limit': 92000000000, 'last_seen': 1570215811.4124334, 'services': {'dashboard': 42111}, 'metrics': {'cpu': 0.0, 'memory': 83361792, 'time': 1570215811.2157397, 'read_bytes': 0.0, 'write_bytes': 0.0, 'num
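The log above shows several failed connection attempts before the scheduler becomes reachable. When connecting to a freshly started cluster from your own code, a similar retry loop can help. This is a generic sketch, not Pilot-Streaming's implementation; `connect_with_retry` and the stand-in `flaky_connect` are hypothetical.

```python
import time


def connect_with_retry(connect, attempts=5, delay=1.0):
    """Call connect() until it succeeds or the attempts run out."""
    last_error = None
    for attempt in range(attempts):
        try:
            return connect()
        except Exception as error:  # broad on purpose for this sketch
            last_error = error
            print(f"Connect attempt {attempt} failed")
            time.sleep(delay)
    raise ConnectionError(f"gave up after {attempts} attempts") from last_error


# Stand-in for a scheduler that needs a few seconds to come up.
calls = {"n": 0}


def flaky_connect():
    calls["n"] += 1
    if calls["n"] < 4:
        raise OSError("scheduler not ready")
    return "connected"


print(connect_with_retry(flaky_connect, delay=0.0))
```

With a real cluster, the callable would wrap the client constructor, for example `lambda: distributed.Client("tcp://c402-031:8786")`.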

In [9]:
dask_pilot.get_details()

{'master_url': 'tcp://c402-031:8786', 'web_ui_url': 'http://c402-031:8787'}

In [10]:
import distributed
dask_client  = distributed.Client(dask_pilot.get_details()['master_url'])
dask_client.scheduler_info()

{'type': 'Scheduler',
 'id': 'Scheduler-b6c658a6-cae4-4da6-bed9-561e955275ea',
 'address': 'tcp://206.76.194.69:8786',
 'services': {'dashboard': 8787},
 'workers': {'tcp://206.76.194.69:39148': {'type': 'Worker',
   'id': 'tcp://206.76.194.69:39148',
   'host': '206.76.194.69',
   'resources': {},
   'local_directory': '/home1/06548/tg858476/worker-bcamhi7t',
   'name': 'tcp://206.76.194.69:39148',
   'nthreads': 24,
   'memory_limit': 92000000000,
   'last_seen': 1570216310.4432776,
   'services': {'dashboard': 42111},
   'metrics': {'cpu': 10.0,
    'memory': 107163648,
    'time': 1570216309.9269888,
    'read_bytes': 136285.44632290705,
    'write_bytes': 135804.07783893382,
    'num_fds': 25,
    'executing': 0,
    'in_memory': 0,
    'ready': 0,
    'in_flight': 0,
    'bandwidth': 100000000},
   'nanny': 'tcp://206.76.194.69:39441'}}}

In [11]:
dask_client.gather(dask_client.map(lambda a: a*a, range(10)))

[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]

# 3. Spark

In [12]:
# Spark configuration that must be set before pyspark is imported and the JVM is started
#os.environ["SPARK_LOCAL_IP"]='129.114.58.101'
import os
import pyspark

pilot_compute_description = {
    "resource":RESOURCE_URL,
    "working_directory": WORKING_DIRECTORY,
    "number_of_nodes": 1,
    "cores_per_node": 48,
    "project": "TG-MCB090174",
    "queue": "normal",
    "walltime": 359,
    "type":"spark"
}

Start the Spark cluster and wait for startup to complete.

In [13]:
%%time

spark_pilot = pilot.streaming.PilotComputeService.create_pilot(pilot_compute_description)
spark_pilot.wait()

DEBUG:pilot-streaming:Pilot-Streaming SLURM: Parsing job description: {'executable': 'python', 'arguments': ['-m', 'pilot.plugins.spark.bootstrap_spark'], 'working_directory': '/home1/06548/tg858476/work/spark-e33511ce-e6da-11e9-8599-15f796122f42', 'output': 'spark_job_spark-e33511ce-e6da-11e9-8599-15f796122f42.stdout', 'error': 'spark_job_spark-e33511ce-e6da-11e9-8599-15f796122f42.stderr', 'number_of_nodes': 1, 'cores_per_node': 48, 'project': 'TG-MCB090174', 'reservation': None, 'queue': 'normal', 'walltime': 359}
DEBUG:pilot-streaming:Submit pilot job to: slurm+ssh://login4.stampede2.tacc.utexas.edu
DEBUG:pilot-streaming:Type Job IDps-e3359


Working Directory: /home1/06548/tg858476/work
/tmp/tmpy1kpepq3
Submission of Job Command: ssh login4.stampede2.tacc.utexas.edu sbatch  tmpy1kpepq3
Cleanup: ssh login4.stampede2.tacc.utexas.edu rm tmpy1kpepq3


DEBUG:pilot-streaming:Pilot-Streaming SLURM: SSH run job finished
DEBUG:pilot-streaming:Output - 
To access the system:

1) If not using ssh-keys, please enter your TACC password at the password prompt
2) At the TACC Token prompt, enter your 6-digit code followed by <return>.  


-----------------------------------------------------------------
          Welcome to the Stampede2 Supercomputer                 
-----------------------------------------------------------------

No reservation for this job
--> Verifying valid submit host (login4)...OK
--> Verifying valid jobname...OK
--> Enforcing max jobs per user...OK
--> Verifying availability of your home dir (/home1/06548/tg858476)...OK
--> Verifying availability of your work dir (/work/06548/tg858476/stampede2)...OK
--> Verifying availability of your scratch dir (/scratch/06548/tg858476)...OK
--> Verifying valid ssh keys...OK
--> Verifying access to desired queue (normal)...OK
--> Verifying job request is within current queue limits.

**** Job: 4448239 State : Running
Create Spark Context for URL: spark://206.76.194.222:7077
Create Spark Context for URL: spark://206.76.194.222:7077
CPU times: user 320 ms, sys: 208 ms, total: 528 ms
Wall time: 1min 16s


In [14]:
spark_pilot.get_details()

Create Spark Context for URL: spark://206.76.194.222:7077


{'spark_home': '/home1/06548/tg858476/work/spark-e33511ce-e6da-11e9-8599-15f796122f42/spark-2.4.4-bin-hadoop2.7',
 'master_url': 'spark://206.76.194.222:7077',
 'web_ui_url': 'http://206.76.194.222:8080'}

In [None]:
#conf=pyspark.SparkConf()
#conf.set("spark.driver.bindAddress", "129.114.58.101")
#sc = pyspark.SparkContext(master="spark://129.114.58.102:7077", appName="dfas")

In [15]:
#os.environ["SPARK_LOCAL_IP"]="129.114.58.101"
sc = spark_pilot.get_context()

Create Spark Context for URL: spark://206.76.194.222:7077


In [16]:
rdd = sc.parallelize([1,2,3])
rdd.map(lambda a: a*a).collect()

[1, 4, 9]

In [None]:
spark_pilot.cancel()
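Each pilot holds a batch allocation until `cancel()` is called, so it is easy to leave a job running if a cell fails in between. One option is to wrap the pilot in a context manager. The sketch below relies only on the `wait()` and `cancel()` methods used in this notebook; `running_pilot` and `FakePilot` are hypothetical names, and `FakePilot` merely stands in for a real pilot so that the example runs without a cluster.

```python
from contextlib import contextmanager


@contextmanager
def running_pilot(pilot):
    """Wait for a pilot to start, yield it, and always cancel it on exit."""
    try:
        pilot.wait()
        yield pilot
    finally:
        pilot.cancel()


class FakePilot:
    """Stand-in that records calls instead of submitting a job."""

    def __init__(self):
        self.events = []

    def wait(self):
        self.events.append("wait")

    def cancel(self):
        self.events.append("cancel")


fake = FakePilot()
with running_pilot(fake) as p:
    p.events.append("work")
print(fake.events)  # ['wait', 'work', 'cancel']
```

With a real pilot, the argument would be the object returned by `pilot.streaming.PilotComputeService.create_pilot(pilot_compute_description)`.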