# full_like issue reproducer
This notebook reproduces an issue with ```full_like``` in dask_awkward. I observe that when it is used, the dask_awkward job hangs. The Dask dashboard indicates that the work has been completed, but the processes don't seem to clear and the job never finishes.

After some trial and error I narrowed this down to the ```full_like``` method. If an array is built with this method the job hangs; if it is not used then it completes within a minute or so, as usual. The ```useFullLike``` flag below switches it on or off.

This only happens with dask_awkward; without dask the job completes as usual.
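
For reference, the call at the centre of this report returns an array with the same structure as its input, with every value replaced by a constant. The sketch below mimics that behaviour for plain (possibly nested) Python lists. `full_like_sketch` is a hypothetical helper written only for illustration, and the sample values are made up; it is not how Awkward Array or dask_awkward implement the operation.

```python
def full_like_sketch(template, fill_value):
    """Copy the nesting of `template`, replacing every leaf with `fill_value`."""
    if isinstance(template, list):
        # Recurse so that jagged (variable-length) sublists keep their lengths
        return [full_like_sketch(item, fill_value) for item in template]
    # Cast to the leaf's own type, loosely imitating dtype preservation
    return type(template)(fill_value)


scalef = [0.5, 1.2, 0.9]            # made-up stand-in for a per-event branch
is_signal = full_like_sketch(scalef, 0)
print(is_signal)                    # [0.0, 0.0, 0.0]
```

In the notebook the corresponding call is `ak.full_like(arraysForHDF5["scalef"], 0)`, where the result only serves as a constant per-event label.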

The notebook is a minimal reproducer extracted from a much longer workbook that is used for teaching at the University of Oslo. Some of the inline comments and variable names may reflect this, as may some unused data structures/variables. The relevant input file (a plain ROOT n-tuple) can be found at

```
/afs/cern.ch/work/j/jcatmore/public/dask_awkward
```

In [None]:
# Use or don't use full_like
useFullLike = True

In [None]:
# Set whether to use Dask or not
doDask = True
n_workers = 64 # Only relevant if Dask is used

# Imports
import time
import numpy as np
import os

# Import uproot
import uproot

# Import Awkward Array or Dask-awkward, depending on the setting above
if doDask:
    import dask_awkward as ak
else:
    import awkward as ak

# Import Pandas
import pandas as pd

## Define some functions
We need to define some helpful python functions for running with Dask and creating histograms.

In [None]:
# Set up a dask cluster 
# We set the number of workers to 64 and have a single execution thread per worker
# Thus each core independently works on its own batch of data
# The memory per process has to be chosen carefully: too small and the job will be slowed down by overly
# frequent reads from disk; too large and the machine will run out of memory and start to swap data back
# and forth from disk
if doDask:
    import dask
    from dask.distributed import Client,LocalCluster
    cluster = LocalCluster(n_workers=n_workers,processes=True,threads_per_worker=1, memory_limit="100GiB")
    client = Client(cluster)
    client

This function generates a convenient record from an Awkward Array handle and a dictionary of names and keys (it will be clear how it works later on).

In [None]:
# Function to build a record from the list of variable names
def buildRecord(awkwardEvents,variableDict):
    theDict = {}
    for key in variableDict.keys():
        if doDask: 
            theDict[key] = awkwardEvents[variableDict[key]]
        else:
            theDict[key] = awkwardEvents[variableDict[key]].array()
    return ak.zip(theDict)
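
To illustrate what `buildRecord` produces, the sketch below uses plain Python lists in place of Awkward arrays. The helper `build_record_sketch` and the sample values are hypothetical; they only imitate how the mapping selects branches by name and how `ak.zip` combines equal-length columns into one record per event.

```python
def build_record_sketch(columns, variable_dict):
    """Combine named columns into one record (dict) per event."""
    # Map each record field name to the column read from the input branch
    selected = {field: columns[branch] for field, branch in variable_dict.items()}
    # Zip the columns row by row, one dict per event
    fields = list(selected)
    return [dict(zip(fields, row)) for row in zip(*selected.values())]


columns = {"n_electron": [1, 0, 2], "n_muon": [0, 2, 0]}   # made-up values
variable_dict = {"n_electron": "n_electron", "n_muon": "n_muon"}
records = build_record_sketch(columns, variable_dict)
print(records[0])   # {'n_electron': 1, 'n_muon': 0}
```

With the real arrays the fields are then read by attribute, as in `eventLevel.n_electron` further down.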

## Getting and preparing the input 

In this part we list the variables needed and point the software at the relevant input files.

First we list all of the variables that we'll use in the analysis.

In [None]:
event_variables = {'scalef': 'scalef',
                   'n_electron': 'n_electron',
                   'n_muon': 'n_muon',
                   'n_tau': 'n_tau',
                   'n_photon': 'n_photon',
                   'n_heavyjet': 'n_heavyjet',
                   'n_lightjet': 'n_lightjet',
                   'category': 'category'
                  }

all_variables = []
for dictionary in [event_variables]:
    all_variables += list(dictionary.values())

In [None]:
# Set which processes to run... each process sits in its own file. All that is necessary is to list the
# process names in an array, which will then be looped over during the analysis.
# If not using Dask, just select one of them.

# Open the files and create the data structures. If you are running Dask, nothing will happen at this 
# stage (lazy execution)
path = "/storage/shared/data/PHYSLITEforML/forCompSciFeb04/"
process =  "singletop_nom_2L"
inputString = path+process+".root:CollectionTree"
if doDask:        
    events = uproot.dask(inputString,
                         library="ak",
                         filter_name=all_variables,
                         open_files=False)
else:
    events = uproot.open(inputString,filter_name=all_variables)
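
The lazy execution mentioned in the comments above can be pictured with a toy example. This is a minimal sketch in plain Python, not Dask's implementation: `LazyValue` and `load_branch` are hypothetical names, and the "branch" contents are made up. The point is only that building the object does no work until `compute()` is called.

```python
class LazyValue:
    """Hold a computation without running it until compute() is called."""

    def __init__(self, func, *args):
        self.func = func
        self.args = args

    def compute(self):
        # Evaluate any lazy arguments first, then this node
        resolved = [a.compute() if isinstance(a, LazyValue) else a for a in self.args]
        return self.func(*resolved)


calls = []


def load_branch(name):
    calls.append(name)              # record that the "read" really happened
    return [1, 0, 2]


branch = LazyValue(load_branch, "n_electron")
doubled = LazyValue(lambda xs: [2 * x for x in xs], branch)
print(calls)                        # [] (nothing has been read yet)
print(doubled.compute())            # [2, 0, 4]
print(calls)                        # ['n_electron']
```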

In [None]:
# Set up easy-to-use data structures
eventLevel = buildRecord(events,event_variables)

In [None]:
# Arrays to be written to HDF5
arraysForHDF5 = {}

In [None]:
# Build the arrays independent of e,m,t kinematics. 
arraysForHDF5["n_electron"] = eventLevel.n_electron
arraysForHDF5["n_muon"] = eventLevel.n_muon
arraysForHDF5["n_tau"] = eventLevel.n_tau
arraysForHDF5["n_photon"] = eventLevel.n_photon
arraysForHDF5["n_lightjet"] = eventLevel.n_lightjet
arraysForHDF5["n_heavyjet"] = eventLevel.n_heavyjet
arraysForHDF5["scalef"] = eventLevel.scalef
if useFullLike:
    if ("Higgs_" in process):
        arraysForHDF5["isSignal"] = ak.full_like(arraysForHDF5["scalef"],1)
    else:
        arraysForHDF5["isSignal"] = ak.full_like(arraysForHDF5["scalef"],0)

In [None]:
# If Dask is used, compute the arrays (without Dask they will already have been computed)

start = time.time()

if doDask:

    arraysForEvaluation = []

    for result in arraysForHDF5.keys():
        arraysForEvaluation.append(arraysForHDF5[result])
            
    print("Number of arrays to evaluate = ",len(arraysForEvaluation))

    # Evaluate the arrays
    evaluated = dask.compute(arraysForEvaluation)
            
end = time.time()

print("Time taken to execute = ",end-start)
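
Because the compute step above never returns when the problem occurs, it can be convenient to bound the wait while reproducing it. The following is a minimal sketch using only the standard library; `run_with_timeout` is a hypothetical helper and the two lambdas are stand-ins for a call that returns promptly and one that stalls. It has not been checked against the hanging job itself, and it does not stop the stalled call: it only stops waiting for it.

```python
import threading
import time


def run_with_timeout(func, timeout_s):
    """Run func() in a daemon thread; return (finished, result)."""
    box = {}

    def target():
        box["result"] = func()

    worker = threading.Thread(target=target, daemon=True)
    worker.start()
    worker.join(timeout_s)          # wait at most timeout_s seconds
    finished = not worker.is_alive()
    return finished, box.get("result")


# Stand-ins for a call that returns promptly and one that stalls
print(run_with_timeout(lambda: 42, timeout_s=1.0))                # (True, 42)
print(run_with_timeout(lambda: time.sleep(0.5), timeout_s=0.05))  # (False, None)
```

In the notebook the wrapped call would be something like `lambda: dask.compute(arraysForEvaluation)`, under the assumption that calling it from a secondary thread does not change the behaviour being investigated.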

# Monitoring of Dask jobs
To monitor your Dask jobs, start a new terminal and open a tunnel to port 8787, which is the default port of the Dask dashboard. You can also configure the address using the *dashboard_address* parameter (see [LocalCluster](https://docs.dask.org/en/latest/deploying-python.html#distributed.deploy.local.LocalCluster)). Set up the SSH tunnel by running the following in the terminal:
```
ssh -L 8888:localhost:8787 username@hepp03.hpc.uio.no
```

Then open [http://localhost:8888/status](http://localhost:8888/status), replacing 8888 with the local port you chose in the ssh command above.

More information about the Dask dashboard can be found [here](https://docs.dask.org/en/latest/dashboard.html).
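
The relationship between the two port numbers can be made explicit with a small helper. This is a sketch for illustration only: `tunnel_info` is a hypothetical function that merely formats the ssh command and the matching local URL. It assumes the dashboard listens on the default port 8787 on the remote side unless told otherwise.

```python
def tunnel_info(local_port, user, host, remote_port=8787):
    """Return the ssh command and the local dashboard URL for a port forward."""
    # local_port is the port on your own machine; remote_port is the dashboard's
    command = f"ssh -L {local_port}:localhost:{remote_port} {user}@{host}"
    url = f"http://localhost:{local_port}/status"
    return command, url


command, url = tunnel_info(8888, "username", "hepp03.hpc.uio.no")
print(command)   # ssh -L 8888:localhost:8787 username@hepp03.hpc.uio.no
print(url)       # http://localhost:8888/status
```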