In [1]:
from coffea import util
from coffea.nanoevents import BaseSchema
import numpy as np
import yaml
import os
import subprocess
from processor_mHrecoil import mHrecoil
from coffea.dataset_tools import apply_to_fileset,max_chunks,preprocess
import dask
import copy
from dask.diagnostics import ProgressBar
pgb = ProgressBar()
pgb.register()

In [2]:
import hist
from coffea.analysis_tools import Cutflow
def lazy_summary(d,ntabs=1):
    tab = '\t'*ntabs
    print_string ='{\n'
    for key,value in d.items():
        print_string += f"{tab}{key} : "
        if isinstance(value,dict):
            print_string += lazy_summary(value, ntabs=ntabs+1)
        elif isinstance(value, hist.hist.Hist):
            print_string += f"{type(value)}\tIntegral:{value.sum()}\n"
        elif isinstance(value, Cutflow):
            print_string += f"{type(value)}\tInitial events:{value.result().nevcutflow[0]}\n"
        else :
            print_string += f"{type(value)}\n"
    print_string += tab+'}\n'
    return print_string

In [3]:
myfileset = {
    'p8_ee_ZH_ecm240': {
        'files': {
            '../data/p8_ee_ZH_ecm240/events_082532938.root':'events',
            '../data/p8_ee_ZH_ecm240/events_082532938a.root':'events',
        },
        'metadata': {
            'dataset':'p8_ee_ZH_ecm240',
            'generation':'Spring2021'
        }
    },
    'p8_ee_ZZ_ecm240': {
        'files': {
            '../data/p8_ee_ZH_ecm240/events_082532938b.root':'events',
            '../data/p8_ee_ZH_ecm240/events_082532938c.root':'events',
        },
        'metadata': {
            'dataset':'p8_ee_ZZ_ecm240',
            'generation':'Spring2021'
        }
    }
}

In [4]:
process = {
    'collider':'FCCee',
    'campaign':'spring2021',
    'detector':'IDEA',
    'samples':['p8_ee_ZZ_ecm240','p8_ee_WW_ecm240','p8_ee_ZH_ecm240']
}
fraction = {
    'p8_ee_ZZ_ecm240':0.005,
    'p8_ee_WW_ecm240':0.5,
    'p8_ee_ZH_ecm240':0.2
}

In [5]:
executor='condor'
maxchunks=10

In [6]:
def break_into_many(input_fileset,n):
    '''
    Split a given fileset into n almost even filesets
    '''
    
    # Create an indexed fileset
    fileset = copy.deepcopy(input_fileset)
    index = 0
    for dataset in input_fileset.keys():
        for filename,treename in input_fileset[dataset]['files'].items():
            fileset[dataset]['files'][filename] = {'treename': treename, 'index': index}
            index += 1

    # Split the array as required
    nfiles = sum([len(fileset[dataset]['files']) for dataset in fileset.keys()])
    if n == 0 :
        return [input_fileset]
    elif n > 0 and n <= index:
        index_split = np.array_split(np.arange(nfiles),n)
    else :
        raise ValueError(f'Allowed values of n between 0 and {index}')

    # Choose the required indices for each split
    raw = [copy.deepcopy(input_fileset) for i in range(n)]
    for f in range(n):
        for dataset in fileset.keys():
            for event in fileset[dataset]['files'].keys():
                if not fileset[dataset]['files'][event]['index'] in index_split[f]:
                    del raw[f][dataset]['files'][event]

    #remove empty fields
    out = copy.deepcopy(raw)
    for f in range(n):
        for dataset in raw[f].keys():
            if len(raw[f][dataset]['files']) == 0 :
                del out[f][dataset]

    return out
nparallel = 4
# [print(lazy_summary(i)) for i in break_into_many(input_fileset=myfileset,n=nparallel)]
fileset = break_into_many(input_fileset=myfileset,n=nparallel)

In [7]:
def create_job_python_file(dataset_runnable, maxchunks,filename, output_file, path, ):
    s = f'''
from coffea import util
from coffea.nanoevents import BaseSchema
import os
from processor_mHrecoil import mHrecoil
from coffea.dataset_tools import apply_to_fileset,max_chunks
import dask

dataset_runnable = {dataset_runnable}
maxchunks = {maxchunks}

to_compute = apply_to_fileset(
            mHrecoil(),
            max_chunks(dataset_runnable, maxchunks),
            schemaclass=BaseSchema,
)
computed = dask.compute(to_compute)
(Output,) = computed

print("Saving the output to : " , "{output_file}")
if not os.path.exists("{path}"):
    os.makedirs("{path}")
util.save(output= Output, filename="{path}"+"{output_file}")
print("File {output_file} saved at {path}")
print("Execution completed.")
    
    '''
    with open(filename,'w') as f:
        f.write(s)

In [8]:
def create_job_shell_file(filename, python_job_file):
    s = f'''
#!/usr/bin/env bash

export COFFEA_IMAGE=coffeateam/coffea-dask-almalinux8:2024.5.0-py3.11

echo "Coffea Image: ${{COFFEA_IMAGE}}"

EXTERNAL_BIND=${{PWD}}


singularity exec -B ${{PWD}}:/srv -B /etc/condor -B /eos -B /afs -B /cvmfs --pwd /srv \
/cvmfs/unpacked.cern.ch/registry.hub.docker.com/${{COFFEA_IMAGE}} \
/usr/local/bin/python3 {python_job_file} -e dask
    
    '''
    with open(filename,'w') as f:
        f.write(s)

In [9]:
def create_submit_file(filename, executable, output):
    s = f'''
universe = vanilla
executable = {executable}

should_transfer_files = IF_NEEDED
when_to_transfer_output = ON_EXIT
transfer_input_files= batch_runner_mHrecoil.py,processor_mHrecoil.py
transfer_output_files= {output}

output = out-{executable.strip('job_').strip('.sh')}.$(ClusterId).$(ProcId)
error = err-{executable.strip('job_').strip('.sh')}.$(ClusterId).$(ProcId)
log = log-{executable.strip('job_').strip('.sh')}.$(ClusterId).$(ProcId)

queue 1
    
    '''
    with open(filename,'w') as f:
        f.write(s)

In [10]:
output_file = "mHrecoil_mumu.coffea"
path = 'outputs/FCCee/higgs/mH-recoil/mumu/'
print('Preparing fileset before run...')
if not os.path.exists(path):
    os.makedirs(path)


dataset_runnable, dataset_updated = zip(*[preprocess(
    fl,
    align_clusters=False,
    step_size=50_000,
    files_per_batch=1,
    skip_bad_files=True,
    save_form=False,
) for fl in fileset ]
                                       )

# print(dataset_runnable) # is a tuple
#For local dask execution
if executor == "dask" :
    Output = []
    print("Executing locally with dask ...")
    for i in range(len(dataset_runnable)):
        print('Chunk : ',i)
        to_compute = apply_to_fileset(
                    mHrecoil(),
                    max_chunks(dataset_runnable[i], maxchunks),
                    schemaclass=BaseSchema,
        )
        computed = dask.compute(to_compute)
        (Out,) = computed
        Output.append(Out)
        if nparallel > 1:
            output_filename = output_file.strip('.coffea')+f'-chunk{i}'+'.coffea'
        else:
            output_filename = output_file
        print("Saving the output to : " , output_filename)
        util.save(output= Out, filename=path+output_filename)
        print(f"File {output_filename} saved at {path}")
    print("Execution completed.")

#For condor execution
elif executor == "condor" :
    print("Executing with condor ...")
    if not os.path.exists('Batch'):
        os.makedirs('Batch')
    os.chdir('Batch')
    for i in range(len(dataset_runnable)):
        if nparallel > 1:
            output_filename = output_file.strip('.coffea')+f'-chunk{i}'+'.coffea'
        else:
            output_filename = output_file
        create_job_python_file(
            dataset_runnable[i],
            maxchunks,
            f'job_{i}.py',
            output_filename,
            path
        )
        print(f'\tjob_{i}.py created')
        create_job_shell_file(
            f'job_{i}.sh',
            f'job_{i}.py'
        )
        print(f'\tjob_{i}.sh created')
        create_submit_file(
            filename=f'submit_{i}.sh',
            executable=f'job_{i}.sh',
            output=path
        )
        print(f'\tsubmit_{i}.sh created')
        p = subprocess.run(["cat",f"submit_{i}.sh"], capture_output=True).stdout.decode("utf-8")
        print(p)
os.chdir('../')

Preparing fileset before run...
[########################################] | 100% Completed | 101.51 ms
[########################################] | 100% Completed | 224.51 ms
[########################################] | 100% Completed | 102.62 ms
[########################################] | 100% Completed | 286.52 ms
[########################################] | 100% Completed | 101.69 ms
[########################################] | 100% Completed | 224.37 ms
[########################################] | 100% Completed | 101.56 ms
[########################################] | 100% Completed | 229.39 ms
Executing with condor ...
	job_0.py created
	job_0.sh created
	submit_0.sh created

universe = vanilla
executable = job_0.sh

should_transfer_files = IF_NEEDED
when_to_transfer_output = ON_EXIT
transfer_input_files= batch_runner_mHrecoil.py,processor_mHrecoil.py
transfer_output_files= outputs

output = out-0.$(ClusterId).$(ProcId)
error = err-0.$(ClusterId).$(ProcId)
log = log-0.$(ClusterI

In [11]:
print(subprocess.run(["lhj", "-l", "./"], capture_output=True).stdout.decode("utf-8"))

FileNotFoundError: [Errno 2] No such file or directory: 'lhj'

In [None]:
print(subprocess.run(['cat','submit_0.sh'], capture_output=True).stdout.decode("utf-8"))