# A Prototype U.S. CMS analysis facility

by 

Mat Adamec, Ken Bloom, **Oksana Shadura**, 
*University of Nebraska, Lincoln*

Garhan Attebury, Carl Lundstedt, Derek Weitzel,
*University of Nebraska Holland Computing Center*

Mátyás Selmeci,
*University of Wisconsin, Madison*

Brian Bockelman,
*Morgridge Institute*



## Coffea - Columnar Object Framework For Effective Analysis

[Coffea Team](https://github.com/CoffeaTeam) && [Coffea Framework](https://github.com/CoffeaTeam/coffea)

* Leveraging large data and data analysis tools from Python to provide an array-based syntax for manipulating HEP event data
* Stark contrast to well established event loop techniques
* "+" Tremendous potential to fundamentally change the time-to-science in HEP
* "+" **Scales well horizontally, with multiple executors available for efficient and flexible computations**
* "-" Cannot easily utilize current analysis facilities (T2s): the analysis is not grid-friendly because it is meant to be quasi-interactive
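To make the contrast with the event loop concrete, here is a minimal sketch that applies the same cut in both styles. It uses plain NumPy rather than Coffea itself, and the toy data (`muon_pt`, one value per event) and the 25 GeV threshold are assumptions made for illustration only.

```python
import numpy as np

# Hypothetical toy data: one muon transverse momentum (GeV) per event.
rng = np.random.default_rng(seed=42)
muon_pt = rng.exponential(scale=30.0, size=100_000)


def count_event_loop(pt_values, threshold):
    """Event-loop style: inspect one event at a time."""
    n_pass = 0
    for pt in pt_values:
        if pt > threshold:
            n_pass += 1
    return n_pass


def count_columnar(pt_values, threshold):
    """Columnar style: apply the cut to the whole array at once."""
    mask = pt_values > threshold
    return int(np.count_nonzero(mask))


loop_result = count_event_loop(muon_pt, 25.0)
columnar_result = count_columnar(muon_pt, 25.0)
```

Both functions return the same count; the columnar version expresses the selection as a single array operation. Real events hold a variable number of objects per event, which is why Coffea works with jagged arrays (Awkward Array) rather than flat NumPy arrays.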


<img src="https://coffeateam.github.io/coffea/_images/columnar.png" width="400" height="400">


## Facility design: Coffea Local Executors

Two local executors: *iterative_executor* and *futures_executor*:

* The *iterative* executor simply processes each chunk of an input dataset in turn, using the current python thread.

* The *futures* executor employs python multiprocessing to spawn multiple python processes that process chunks in parallel on the machine. **Processes are used rather than threads to avoid performance limitations due to the CPython global interpreter lock (GIL).**
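The difference between the two strategies can be sketched with the standard library alone. This is not Coffea's implementation: `process_chunk`, `run_iterative`, and `run_futures` are hypothetical names, and the per-chunk work (counting even and odd numbers) stands in for a real analysis.

```python
from collections import Counter
from concurrent.futures import ProcessPoolExecutor


def process_chunk(chunk):
    """Hypothetical per-chunk work: count even and odd values."""
    return Counter("even" if value % 2 == 0 else "odd" for value in chunk)


def run_iterative(chunks):
    """Process the chunks one after another in the current process."""
    total = Counter()
    for chunk in chunks:
        total += process_chunk(chunk)
    return total


def run_futures(chunks, workers=2):
    """Process the chunks in separate worker processes, then merge the results."""
    total = Counter()
    with ProcessPoolExecutor(max_workers=workers) as pool:
        for partial in pool.map(process_chunk, chunks):
            total += partial
    return total
```

With `chunks = [range(0, 10), range(10, 20), range(20, 25)]`, both functions should produce the same merged counts. When calling `run_futures` from a script, keep the call under an `if __name__ == "__main__":` guard, since multiprocessing may re-import the main module in the worker processes.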

## Facility design: Coffea Distributed Executors


Four types of distributed executors:

 * the **Parsl** distributed executor, accessed via *parsl_executor*, 

 * the **Dask** distributed executor, accessed via *dask_executor*,

 * the **Apache Spark** distributed executor, accessed via *run_spark_job*,

 * and the **Work Queue** distributed executor, accessed via *work_queue_executor*.

## Dask: scalable analytics in Python

* Dask provides a flexible library for parallel computing in Python
* Think of Dask as a run-time parallel + cluster plugin for Python
* Easily installed via Conda as the module “distributed”
* NOT really designed with multi-user environments in mind out-of-the-box
* Integrates with HPC clusters running a variety of schedulers, including SLURM & HTCondor, via “dask-jobqueue”

<img src="https://docs.dask.org/en/latest/_images/dask-overview.svg" width="600" height="600">


## Why Dask?

* <span style="color:green"> Dask Scales out to Clusters </span>: Dask figures out how to break up large computations and route parts of them efficiently onto distributed hardware.

* <span style="color:blue"> Dask Scales Down to Single Computers </span>: Dask can enable efficient parallel computations on single machines by leveraging their multi-core CPUs and streaming data efficiently from disk.

* <span style="color:orange"> Dask Integrates Natively with Python Code </span>: Python includes computational libraries like Numpy, Pandas, and Scikit-Learn, and many others for data access, plotting, statistics, image and signal processing, and more. These libraries work together seamlessly to produce a cohesive ecosystem of packages that co-evolve to meet the needs of analysts in most domains today.

* <span style="color:red"> Dask Supports Complex Applications </span>: Dask exposes low-level APIs to its internal task scheduler, which is capable of executing very advanced computations (e.g. an institution can build its own parallel computing system using the same engine that powers Dask’s arrays, DataFrames, and machine learning algorithms, but with its own custom logic). **[similar to our use case]**

* <span style="color:purple"> Dask Delivers Responsive Feedback </span>: monitoring via real-time and responsive dashboard, installed profiler, embedded IPython kernel


## Requirements for Analysis Facility @ T2

* Easy to use for users
* Scalable (dynamically/automatically)
* Responsive/Interactive
* **Utilize currently deployed hardware/middleware**
* **Minimally intrusive for site administrators**
* In addition it is important to get work (‘effort’ & CPU) accounted for by CMS




## Why Kubernetes (k8s)?

Stack Overflow blog: "*Kubernetes is about six years old, and over the last two years, it has risen in popularity to consistently be one of the most loved platforms. This year, it comes in as the number three most loved platform.*"

**Kubernetes is a platform for running and orchestrating container workloads.**

![k8s](https://d33wubrfki0l68.cloudfront.net/69e55f968a6f44613384615c6a78b881bfe28bd6/42cd3/_common-resources/images/flower.svg)

## Why Kubernetes (k8s)?

* <span style="color:orange">Kubernetes is very extensible, and developers love that.</span>
There are a set of existing resources like Pods, Deployments, StatefulSets, Secrets, ConfigMaps, etc. However, users and developers can add more resources in the form of Custom Resource Definitions.
* <span style="color:green"> Infrastructure as YAML. </span>
All the resources in Kubernetes can simply be expressed in a YAML file.
* <span style="color:blue">Scalability. </span>
Software can be deployed for the first time in a scale-out manner across Pods, and deployments can be scaled in or out at any time.
* <span style="color:red">Time savings. </span>
Pause a deployment at any time and resume it later.
* <span style="color:purple">Version control. </span>
Update deployed Pods using newer versions of application images and roll back to an earlier deployment.


## Why Kubernetes (k8s)?
 
* **Horizontal autoscaling.** 
Kubernetes autoscalers automatically size a deployment’s number of Pods based on the usage of specified resources (within defined limits).

* **Rolling updates.**
Updates to a Kubernetes deployment are orchestrated in “rolling fashion,” across the deployment’s Pods. These rolling updates are orchestrated while working with optional predefined limits on the number of Pods that can be unavailable and the number of spare Pods that may exist temporarily.

* **Canary deployments.**
A useful pattern when deploying a new version of a deployment is to first test the new deployment in production, in parallel with the previous version, and scale up the new deployment while simultaneously scaling down the previous deployment.

* **Security and Controls.**
YAML is a great way to validate what and how things get deployed in Kubernetes. For example, one of the significant concerns when it comes to security is whether your workloads are running as a non-root user.

*  <span style="color:purple">**Another big aspect of Kubernetes popularity is its strong community.**</span>
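The horizontal autoscaling rule mentioned above can be written down in a few lines. The sketch below follows the formula given in the Kubernetes documentation, `desiredReplicas = ceil(currentReplicas * currentMetricValue / desiredMetricValue)`, and then clamps the result to the configured limits. The function name and the default limits are assumptions for illustration, and the real autoscaler additionally applies a tolerance and stabilization windows that are left out here.

```python
import math


def desired_replicas(current_replicas, current_metric, target_metric,
                     min_replicas=1, max_replicas=10):
    """Scale the replica count by the metric ratio, then clamp to the limits."""
    if target_metric <= 0:
        raise ValueError("target_metric must be positive")
    wanted = math.ceil(current_replicas * current_metric / target_metric)
    return max(min_replicas, min(max_replicas, wanted))
```

For example, four Pods running at 90% CPU against a 60% target would be scaled to six Pods, while the same four Pods at 20% CPU would be scaled down to two.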


## Proposed Analysis Facility @ T2 Nebraska

<img src="pics/coffea-casa.png" width="700" height="700" />

## Proposed Analysis Facility @ T2 Nebraska: challenges

<img src="pics/coffea-casa-challenge.png" width="700" height="700" />

## Proposed Analysis Facility @ T2 Nebraska: configuration
<img src="pics/coffea-casa1.png" width="700" height="700" />

## Proposed Analysis Facility @ T2 Nebraska: configuration
<img src="pics/coffea-casa2.png" width="700" height="700" />

## Proposed Analysis Facility @ T2 Nebraska: configuration
<img src="pics/coffea-casa3.png" width="700" height="700" />

## Coffea-casa: why would you love it?

* <span style="color:blue">No x509 </span>: authentication via CILogon and authorization via **WLCG Bearer Tokens** (macaroons)
* <span style="color:purple">Security</span>: TLS enabled over TCP sockets
* <span style="color:green"> No need to think about xrootd </span>: **XCache with new XRootD authorization plugin**
* <span style="color:red"> Access to "grid-style" analysis but from Python notebook!</span>



## XCache


* *XCache is a Squid-like cache*, but it primarily uses the “xroot” protocol, with HTTP support being added on.
* *XCache provides a multi-threaded file caching application that can asynchronously fetch and cache file segments or whole files*.
* Its primary design use case is caching static scientific data files of any format, large or small.
* *XCache is built upon XRootD* and can be customized for many usage scenarios via configuration or plugins.
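The idea of caching file segments rather than whole files can be illustrated with a toy read-through cache. This is not how XCache is implemented (XCache is multi-threaded and fetches asynchronously; this sketch is synchronous and in-memory), and `SegmentCache`, `fetch_from_remote`, the `REMOTE` dictionary, and the file path are all hypothetical.

```python
class SegmentCache:
    """Toy read-through cache keyed by (path, block index)."""

    def __init__(self, fetch_block, block_size=4):
        self.fetch_block = fetch_block  # callable(path, offset, length) -> bytes
        self.block_size = block_size
        self.blocks = {}
        self.remote_fetches = 0

    def _block(self, path, index):
        """Return one block, fetching it from the origin only on a cache miss."""
        key = (path, index)
        if key not in self.blocks:
            self.remote_fetches += 1
            offset = index * self.block_size
            self.blocks[key] = self.fetch_block(path, offset, self.block_size)
        return self.blocks[key]

    def read(self, path, offset, length):
        """Return bytes [offset, offset + length), fetching only missing blocks."""
        if length <= 0:
            return b""
        first = offset // self.block_size
        last = (offset + length - 1) // self.block_size
        data = b"".join(self._block(path, i) for i in range(first, last + 1))
        start = offset - first * self.block_size
        return data[start:start + length]


# Hypothetical "remote" storage standing in for an origin server.
REMOTE = {"/store/example.root": b"abcdefghijklmnopqrstuvwxyz"}


def fetch_from_remote(path, offset, length):
    return REMOTE[path][offset:offset + length]


cache = SegmentCache(fetch_from_remote, block_size=4)
first_read = cache.read("/store/example.root", 2, 6)   # touches blocks 0 and 1
second_read = cache.read("/store/example.root", 4, 4)  # block 1 is already cached
```

The second read is served entirely from the cache, so `cache.remote_fetches` stays at two after both calls.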


## XRootD authorization plugin

* Code: https://github.com/bbockelm/xrdcl-authz-plugin

```console
$ xrdcp -f root://xcache//store/data/Run2017B/SingleElectron/MINIAOD/31Mar2018-v1/60000/9E0F8458-EA37-E811-93F1-008CFAC919F0.root /dev/null                                      
Looking for token in file /home/cse496/bbockelm/projects/xrdcl-authz-plugin/xcache_token
[3.65GB/3.65GB][100%][==================================================][934.5MB/s]
```


## Coffea-casa secrets

All secrets are available in the directory */etc/cmsaf-secrets* at container startup.

* */etc/cmsaf-secrets/condor_token* is a condor IDTOKEN useful for submitting to T3.
* */etc/cmsaf-secrets/ca.key* is a CA private key useful for Dask
* */etc/cmsaf-secrets/ca.pem* is a CA public key useful for Dask
* */etc/cmsaf-secrets/hostcert.pem* is a host certificate and private key useful for the Dask scheduler.
* */etc/cmsaf-secrets/usercert.pem* is a user certificate and private key useful for the Dask workers.
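As a sketch of how these files could be wired into Dask, the helper below builds keyword arguments matching the parameter names of `distributed.security.Security` (`tls_ca_file`, `tls_scheduler_cert`, `tls_worker_cert`, `require_encryption`). The helper name and the mapping of files to roles are assumptions based on the list above, not the actual coffea-casa configuration. The separate key arguments are omitted because each certificate file is described as also containing its private key.

```python
from pathlib import Path

SECRETS_DIR = Path("/etc/cmsaf-secrets")


def dask_tls_kwargs(role, secrets_dir=SECRETS_DIR):
    """Build keyword arguments for distributed.security.Security.

    role is "scheduler" or "worker" (hypothetical mapping, see lead-in).
    """
    cert_for_role = {"scheduler": "hostcert.pem", "worker": "usercert.pem"}
    if role not in cert_for_role:
        raise ValueError(f"unknown role: {role!r}")
    secrets_dir = Path(secrets_dir)
    return {
        "tls_ca_file": str(secrets_dir / "ca.pem"),
        # The certificate file also holds the private key, so no *_key entry.
        f"tls_{role}_cert": str(secrets_dir / cert_for_role[role]),
        "require_encryption": True,
    }
```

With `distributed` installed, the result could be passed on as `Security(**dask_tls_kwargs("worker"))`.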

## Coffea-casa building blocks

* JupyterHub setup (Helm K8s charts): https://github.com/CoffeaTeam/jhub (except for specific secrets)
* Docker images for Dask Scheduler and Worker: https://github.com/CoffeaTeam/coffea-casa
  * https://hub.docker.com/r/coffeateam/coffea-casa
  * https://hub.docker.com/r/coffeateam/coffea-casa-analysis
* Docker image for JupyterHub (to get authentication macaroons in the launch environment)
https://github.com/clundst/jhubDocker

## When will you be able to use it?

* Expected to be available soon for preliminary tests by the CMS community.
* Fully available in September 2021.

## Demo time

... let's try to see how it works!


  1. [Simple loop over events](adl1_tls.ipynb)
  2. [More complex benchmark](adl8.ipynb)
  3. [Using XCache](coffea_xcache.ipynb)
