# LRZ Linux Cluster: Getting Started with Pilot-Streaming 

First, import all required packages and modules. The parent directory (`..`) is appended to the Python path (`sys.path`) before `pilot.streaming` is imported.

In [None]:
# --- Setup: standard library, pandas, logging and Pilot-Streaming ---
import os
import sys

# Make packages located in the parent directory (relative to the kernel's
# working directory) importable; this must happen before the imports below.
sys.path.append("..")

import logging

import pandas as pd

# Install a root handler, then raise the root logger and the py4j logger
# (used by PySpark) to ERROR so only errors are reported.
logging.basicConfig(level=logging.DEBUG)
for logger_name in (None, "py4j"):
    logging.getLogger(logger_name).setLevel(logging.ERROR)

# Pilot-Streaming
import pilot.streaming

# Last expression: display the imported module object as the cell output.
sys.modules["pilot.streaming"]

The Pilot-Compute Description is a simple key/value description of the cluster environment to be started. Alternatively, the command-line tool shipped with this package can be used:

     pilot-streaming --resource=slurm://localhost --queue=normal --walltime=59 --number_cores=48 --framework spark 

# 1. Spark

In [None]:
# Spark settings that must be in place before pyspark is imported and the JVM
# is started belong here. Example (currently disabled):
#   os.environ["SPARK_LOCAL_IP"] = '129.114.58.2'
import pyspark

# Pilot-Compute Description for the Spark cluster: submitted through Slurm on
# the login node mpp2-login6 (slurm+ssh scheme), 56 cores, walltime 59
# (presumably minutes, as in the command-line example above).
pilot_compute_description = dict(
    resource="slurm+ssh://mpp2-login6",
    working_directory=os.path.join("/home/hpc/ug201/di57hah/project/", "work"),
    number_cores=56,
    walltime=59,
    type="spark",
)

Start the Spark cluster and wait until startup has completed.

In [None]:
%%time

# Submit the Spark pilot described above, then block until the cluster has
# started; %%time (which must stay on the first line) reports the elapsed time.
spark_pilot = pilot.streaming.PilotComputeService.create_pilot(pilot_compute_description)
spark_pilot.wait()

In [None]:
# Display the Spark pilot's details as the cell output.
# NOTE(review): the exact keys are not visible here; the Dask pilot's details
# expose 'master_url', presumably Spark's do too — confirm.
spark_pilot.get_details()

In [None]:
# Alternative (disabled): connect to an already running Spark master by URL
# instead of obtaining the context from the pilot in the next cell.
#sc = pyspark.SparkContext(master="spark://129.114.58.135:7077", appName="test")

In [None]:
# Obtain a Spark context for the pilot's cluster (presumably a
# pyspark.SparkContext; it is used via sc.parallelize below).
sc = spark_pilot.get_context()

In [None]:
# Smoke test: square a tiny distributed dataset on the cluster and bring the
# results back to the driver.
rdd = sc.parallelize([1, 2, 3])
squares = rdd.map(lambda value: value * value)
squares.collect()

In [None]:
# Cancel the Spark pilot when finished; `sc` is presumably unusable afterwards.
spark_pilot.cancel()

# 2. Kafka

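## HPC

Note: the description below uses a Wrangler work directory (`/work/01131/tg804093/wrangler/`) and the project `TG-MCB090174`; adapt `working_directory` and `project` to your own cluster account.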

In [None]:
# Kafka on an HPC machine through Slurm on localhost: 48 cores in the
# "normal" queue under project TG-MCB090174, walltime 59 (presumably minutes).
pilot_compute_description = dict(
    resource="slurm://localhost",
    working_directory=os.path.join("/work/01131/tg804093/wrangler/", "work"),
    number_cores=48,
    project="TG-MCB090174",
    queue="normal",
    walltime=59,
    type="kafka",
)

In [None]:
%%time
# Submit the Kafka pilot described above and wait for it to start;
# %%time (which must stay on the first line) reports the elapsed time.
kafka_pilot = pilot.streaming.PilotComputeService.create_pilot(pilot_compute_description)
kafka_pilot.wait()

In [None]:
# Display the Kafka pilot's details as the cell output.
kafka_pilot.get_details()

In [None]:
# Cancel the HPC Kafka pilot when finished.
kafka_pilot.cancel()

## Jetstream Cloud

In [None]:
import os
import shutil
import sys

# Diagnostic: which Python interpreter runs this kernel, and which `python`
# is first on PATH (what shell-launched commands would pick up)?
# Query both directly instead of os.system("type python"): os.system only
# returns the exit status to the cell, while the command's text goes to the
# kernel process's stdout, which depending on the ipykernel version may end
# up in the server log instead of this notebook.
python_locations = {
    "kernel": sys.executable,
    "PATH": shutil.which("python"),  # None if no `python` is on PATH
}
python_locations


In [None]:
# Jetstream setup: repeats the imports and logging configuration from the top
# of the notebook (presumably so this section can run from a fresh kernel).
# NOTE(review): unlike the first cell, ".." is not added to sys.path here.
import logging
import os
import sys

import pandas as pd

# Root handler installed, then the root and py4j loggers raised to ERROR.
logging.basicConfig(level=logging.DEBUG)
logging.getLogger().setLevel(logging.ERROR)
logging.getLogger("py4j").setLevel(logging.ERROR)

# Pilot-Streaming
import pilot.streaming

sys.modules["pilot.streaming"]  # not the last line, so nothing is displayed

# Dask and Kafka pilots both target the same Jetstream VM over SSH.
JETSTREAM_SSH_URL = "ssh://aluckow@js-129-114-17-61.jetstream-cloud.org"
RESOURCE_URL_DASK = JETSTREAM_SSH_URL
RESOURCE_URL_KAFKA = JETSTREAM_SSH_URL
WORKING_DIRECTORY = os.path.join(os.environ["HOME"], "work")

# One node with one core, using the "jetstream" configuration name.
pilot_compute_description = dict(
    resource=RESOURCE_URL_KAFKA,
    working_directory=WORKING_DIRECTORY,
    number_of_nodes=1,
    cores_per_node=1,
    config_name="jetstream",
    walltime=59,
    type="kafka",
)

# Launch the Kafka pilot and wait for it to start.
kafka_pilot = pilot.streaming.PilotComputeService.create_pilot(pilot_compute_description)
kafka_pilot.wait()


In [None]:
import os
# Display the kernel's current working directory; relative paths (such as the
# ".." appended to sys.path in the first setup cell) resolve against it.
os.getcwd()

# 3. Dask

In [None]:
import distributed  # Dask's distributed client library

# Dask cluster description: same Slurm resource, directory, cores, project,
# queue and walltime as the HPC Kafka description; only the type differs.
pilot_compute_description = dict(
    resource="slurm://localhost",
    working_directory=os.path.join("/work/01131/tg804093/wrangler/", "work"),
    number_cores=48,
    project="TG-MCB090174",
    queue="normal",
    walltime=59,
    type="dask",
)

In [None]:
%%time
dask_pilot = pilot.streaming.PilotComputeService.create_pilot(pilot_compute_description)
dask_pilot.wait()

In [None]:
# Display the Dask pilot's details; the 'master_url' entry is what the next
# cell uses to connect a Dask client.
dask_pilot.get_details()

In [None]:
import distributed

# Connect a Dask client to the scheduler address reported by the pilot, then
# show the scheduler's view of the cluster as the cell output.
dask_scheduler_url = dask_pilot.get_details()["master_url"]
dask_client = distributed.Client(dask_scheduler_url)
dask_client.scheduler_info()

In [None]:
# Smoke test: square 0..9 on the Dask cluster and gather the results locally.
square_futures = dask_client.map(lambda x: x * x, range(10))
dask_client.gather(square_futures)