In [1]:
import json
import os
import random
import socket
import sys
import time

import numpy as np
from coffea import hist, processor
from dask.distributed import performance_report
from dask_jobqueue import SLURMCluster
from distributed import Client
from distributed.utils import tmpfile


In [2]:
import awkward as ak
import vector
vector.register_awkward()

class Simple_Process(processor.ProcessorABC):
    """Coffea processor that histograms the number of selected muons per event.

    A muon is kept when it has ``pt > 10``, ``|eta| <= 2.4`` and
    ``mediumId == 1``. The per-event count of kept muons fills the
    ``"N_Muons"`` histogram (10 bins covering 0 to 10).

    Parameters
    ----------
    isMC : int
        Stored as ``self.isMC``; not otherwise used by this class.
    era : int
        Stored as ``self.era``; not otherwise used by this class.
    sample : str
        Stored as ``self.sample``; not otherwise used by this class.
    """

    def __init__(self, isMC: int, era: int, sample: str) -> None:
        self.gensumweight = 1.0
        self.era = era
        self.isMC = isMC
        self.sample = sample

        # Template accumulator; process() works on a fresh identity() copy of it.
        self._accumulator = processor.dict_accumulator({
            "N_Muons": hist.Hist(
                "Events",
                hist.Bin("NMuons", "Num Muons", 10, 0, 10)
            ),
        })

    @property
    def accumulator(self):
        """Return the template accumulator built in ``__init__``."""
        return self._accumulator

    @staticmethod
    def _muon_columns(events):
        """Return the muon pt/eta/phi/mass/mediumId columns of ``events``.

        Supports both layouts the chunk may arrive in: a nested ``Muon``
        collection (as produced by a NanoAOD-style schema) or flat
        ``Muon_<name>`` branches.
        """
        names = ("pt", "eta", "phi", "mass", "mediumId")
        if "Muon" in ak.fields(events):
            muons = events["Muon"]
            return {name: muons[name] for name in names}
        return {name: events["Muon_" + name] for name in names}

    def process(self, events):
        """Select muons in one chunk of events and fill the multiplicity histogram.

        Parameters
        ----------
        events
            Chunk of events handed over by the coffea executor.

        Returns
        -------
        The filled accumulator for this chunk.
        """
        output = self.accumulator.identity()

        # The original code read from an undefined name `arrays`; the data
        # lives in `events`.
        cols = self._muon_columns(events)

        Muons = ak.zip({
            "pt": cols["pt"],
            "eta": cols["eta"],
            "phi": cols["phi"],
            "mass": cols["mass"],
        }, with_name="Momentum4D")

        cut = (
            (cols["pt"] > 10)
            & (abs(cols["eta"]) <= 2.4)
            & (cols["mediumId"] == 1)
        )
        Muons = Muons[cut]

        # One entry per event: how many muons survived the selection.
        output["N_Muons"].fill(
            NMuons=ak.num(Muons, axis=1),
        )
        return output

    def postprocess(self, accumulator):
        """Return the merged accumulator unchanged."""
        return accumulator

In [3]:
def check_port(port):
    """Report whether a TCP port can currently be bound on this host.

    Parameters
    ----------
    port : int
        Port number to probe on all IPv4 interfaces (``0.0.0.0``).

    Returns
    -------
    bool
        ``True`` if the bind succeeded, ``False`` if the operating system
        refused it (for example because the port is already in use).

    Notes
    -----
    Only ``OSError`` is treated as "port unavailable". Other exceptions
    (an invalid argument, ``KeyboardInterrupt``) propagate instead of being
    silently reported as an occupied port.
    """
    # The context manager closes the probe socket on every exit path.
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        try:
            sock.bind(("0.0.0.0", port))
        except OSError:
            return False
    return True

In [4]:
# Shell lines handed to SLURMCluster as `env_extra` when the cluster is built.
slurm_env = [
    'export XRD_RUNFORKHANDLER=1',
    'export XRD_STREAMTIMEOUT=10',
    'source ' + os.environ["HOME"] + '/.bashrc',
    'conda activate dask',
    'export X509_USER_PROXY=' + os.environ["HOME"] + '/x509up_u206148',
]

# Suffix of the submit machine used for both the partition and the cluster.
# It was previously drawn at random from
# ["00", "01", "04", "05", "06", "07", "08"]; it is pinned to "00" here.
submachine = "00"

# Extra sbatch options handed to SLURMCluster as `job_extra`.
extra_args = [
    "--output=dask_out/dask_job_output_%j.out",
    "--error=dask_out/dask_job_output_%j.err",
    "--partition=submit" + submachine,
    "--clusters=submit" + submachine,
]

In [5]:
# Tunable run settings used by the cluster and job cells.
n_port = 6820      # scheduler port; probed with check_port() before start-up
w_port = 9765      # port passed to workers via --worker-port
cores = 1          # `cores` argument of SLURMCluster
processes = 1      # `processes` argument of SLURMCluster
memory = "5 GB"    # `memory` argument of SLURMCluster
chunksize = 15000  # `chunksize` argument of run_uproot_job
maxchunks = None   # `maxchunks` argument of run_uproot_job

In [6]:
# Build the processor that every chunk is run through.
processor_instance = Simple_Process(isMC=1, era='2018', sample='test')

# Read the dataset -> entries mapping from disk.
with open("samples.json") as f:
    sample_dict = json.load(f)

# Copy each dataset's entries with a full slice.
# NOTE(review): presumably a hook for limiting entries per dataset
# (e.g. [:5]) — confirm.
sample_dict = {name: entries[:] for name, entries in sample_dict.items()}

In [7]:
import socket

# Fail early when the scheduler port is already taken on this node.
if not check_port(n_port):
    raise RuntimeError("Port '{}' is occupied on this node. Try another one.".format(n_port))

# Scheduler runs on this host; workers are submitted as SLURM jobs.
cluster = SLURMCluster(
    queue='all',
    project="SUEP_Slurm",
    cores=cores,
    processes=processes,
    memory=memory,
    walltime='00:30:00',
    scheduler_options=dict(
        port=n_port,
        dashboard_address=8000,
        host=socket.gethostname(),
    ),
    extra=['--worker-port {}'.format(w_port)],
    job_extra=extra_args,
    env_extra=slurm_env,
)

Perhaps you already have a cluster running?
Hosting the HTTP server on port 7045 instead
Clear task state
  Scheduler at:     tcp://18.12.2.18:6820
  dashboard at:           18.12.2.18:7045


In [8]:
# Let the cluster scale between 1 and 250 workers.
cluster.adapt(
    minimum=1,
    maximum=250,
)

# Connect a client to the cluster's scheduler and show its summary line.
client = Client(cluster)
print(client)

Receive client connection: Client-533f1ae9-7f8b-11ec-b0a6-000af7bd3c78


<Client: 'tcp://18.12.2.18:6820' processes=0 threads=0, memory=0 B>


In [9]:
# Run the processor over all samples on the dask cluster; the dask
# performance report for the run is written to dask-report.html.
with performance_report(filename="dask-report.html"):
    output = processor.run_uproot_job(
        sample_dict,
        treename='Events',
        processor_instance=processor_instance,
        executor=processor.dask_executor,
        # 'skipbadfiles' is intentionally not set here.
        executor_args=dict(
            client=client,
            schema=processor.NanoAODSchema,
            xrootdtimeout=10,
            retries=3,
        ),
        chunksize=chunksize,
        maxchunks=maxchunks,
    )

Run out-of-band function 'lambda'


[                                        ] | 0% Completed |  2min 53.6s

KeyboardInterrupt: 