In [1]:
# Disable warnings
import warnings
warnings.filterwarnings('ignore')

# Time profiling
import cProfile
from   timeit import default_timer

# Usual library
import numpy             as np
import pandas            as pd
import matplotlib        as mpl
import matplotlib.pyplot as plt
%matplotlib inline

# Plot settings
mpl.rcParams['legend.frameon' ] = False
mpl.rcParams['legend.fontsize'] = 'xx-large'
mpl.rcParams['xtick.labelsize'] = 16
mpl.rcParams['ytick.labelsize'] = 16
mpl.rcParams['axes.titlesize' ] = 18
mpl.rcParams['axes.labelsize' ] = 18
mpl.rcParams['lines.linewidth'] = 2.5

# root_numpy (http://scikit-hep.org/root_numpy)
from root_numpy import root2array

# Temporary to import my functions
%load_ext autoreload
%autoreload

import np_utils as npu

Welcome to JupyROOT 6.10/02


In [2]:
ar=root2array('410000.root','nominal_Loose',
               branches=['jet_pt','jet_eta','jet_phi','jet_mv2c10','jet_isbtagged_77',
                'el_pt','el_eta','el_phi',
                'mu_pt','mu_eta','mu_phi',
                'mu']
             ).view(np.recarray)
data=pd.DataFrame(ar)
data.head(10)
len(data)

1622880

In [3]:
large_data = pd.concat([data for _ in np.arange(1)])
len(large_data)

1622880

In [45]:
def get_length_array(a):
    import numpy as np
    return np.array([len(i) for i in a])

def square_array(a):
    lens = get_length_array(a)   
    mask = np.arange(lens.max()) < lens[:,None]    
    out = np.zeros(mask.shape, dtype='float32')
    out.fill(np.nan)    
    out[mask] = np.concatenate(a)   
    return out

In [46]:
jets=square_array(large_data['jet_pt'].values)

In [6]:
print(jets.shape)

(1622880, 13)


In [7]:
import ipyparallel as ipp
rc=ipp.Client()
view=rc[:]
print(view,rc.ids)
view.block=True
view['a'] = 5
view['b'] = 10
view.apply(lambda x: a+b+x, 27)

(<DirectView [0, 1, 2, 3,...]>, [0, 1, 2, 3, 4, 5])


[42, 42, 42, 42, 42, 42]

In [8]:
%timeit get_length_array(large_data['jet_pt'].values)

1 loop, best of 3: 304 ms per loop


In [9]:
def par_get_length_array(a):
    split_arrays=np.split(a,6)
    return view.apply(get_length_array,a)
    #return view.map(get_length_array,split_arrays)

jet_pt=large_data['jet_pt'].values[0:60]
print(jet_pt[0:2])
print(get_length_array(jet_pt[0:2]))

[array([169695.5 , 122250.03], dtype=float32)
 array([92278.93 , 70800.66 , 69653.164, 27776.486], dtype=float32)]
[2 4]


In [10]:
%timeit get_length_array(jet_pt)
%timeit par_get_length_array(jet_pt)

The slowest run took 4.09 times longer than the fastest. This could mean that an intermediate result is being cached.
100000 loops, best of 3: 11.7 µs per loop
100 loops, best of 3: 17.5 ms per loop


In [11]:
res=par_get_length_array(jet_pt)
for x in res:
    print(x.shape,x)

((60,), array([2, 4, 5, 2, 6, 2, 0, 3, 3, 2, 2, 2, 5, 2, 2, 3, 3, 2, 2, 2, 4, 0,
       3, 2, 2, 3, 5, 3, 2, 3, 5, 4, 3, 4, 2, 3, 3, 3, 1, 2, 7, 3, 3, 3,
       0, 3, 3, 2, 2, 2, 2, 2, 2, 1, 3, 3, 2, 2, 3, 2]))
((60,), array([2, 4, 5, 2, 6, 2, 0, 3, 3, 2, 2, 2, 5, 2, 2, 3, 3, 2, 2, 2, 4, 0,
       3, 2, 2, 3, 5, 3, 2, 3, 5, 4, 3, 4, 2, 3, 3, 3, 1, 2, 7, 3, 3, 3,
       0, 3, 3, 2, 2, 2, 2, 2, 2, 1, 3, 3, 2, 2, 3, 2]))
((60,), array([2, 4, 5, 2, 6, 2, 0, 3, 3, 2, 2, 2, 5, 2, 2, 3, 3, 2, 2, 2, 4, 0,
       3, 2, 2, 3, 5, 3, 2, 3, 5, 4, 3, 4, 2, 3, 3, 3, 1, 2, 7, 3, 3, 3,
       0, 3, 3, 2, 2, 2, 2, 2, 2, 1, 3, 3, 2, 2, 3, 2]))
((60,), array([2, 4, 5, 2, 6, 2, 0, 3, 3, 2, 2, 2, 5, 2, 2, 3, 3, 2, 2, 2, 4, 0,
       3, 2, 2, 3, 5, 3, 2, 3, 5, 4, 3, 4, 2, 3, 3, 3, 1, 2, 7, 3, 3, 3,
       0, 3, 3, 2, 2, 2, 2, 2, 2, 1, 3, 3, 2, 2, 3, 2]))
((60,), array([2, 4, 5, 2, 6, 2, 0, 3, 3, 2, 2, 2, 5, 2, 2, 3, 3, 2, 2, 2, 4, 0,
       3, 2, 2, 3, 5, 3, 2, 3, 5, 4, 3, 4, 2, 3, 3, 3, 1, 2, 7, 3, 3, 3,
  

In [48]:
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor

def multithreading(func, args, workers):
    executor = ThreadPoolExecutor(max_workers=workers)
    exec_res = executor.map(func,args)
    return list(exec_res)

def multiprocessing(func, args, workers):
    with ProcessPoolExecutor(max_workers=workers) as executor:
        exec_res = executor.map(func,args)
    return list(exec_res)

In [49]:
Nwokers=8
split_arrays=np.split(large_data['jet_pt'].values,Nwokers)
%timeit res=multithreading (func=square_array,args=split_arrays,workers=Nwokers)
%timeit res=multiprocessing(func=square_array,args=split_arrays,workers=Nwokers)
%timeit square_array(large_data['jet_pt'].values)

1 loop, best of 3: 1.71 s per loop
1 loop, best of 3: 10.1 s per loop
1 loop, best of 3: 931 ms per loop


In [40]:
def all_pairs(a,b=None,Nmax=None,axis=1,timing=False):

    # Is it the same collection
    same_arrays = b is None
        
    # Sanity check
    if not same_arrays:
        good_shape=np.array_equal(np.delete(a.shape,axis),np.delete(b.shape,axis))
        if not good_shape:
            err  = 'The shape along all dimensions but the one of axis={}'.format(axis)
            err += ' should be equal, while here:\n'
            err += '  -> shape of a is {} \n'.format(a.shape)
            err += '  -> shape of b is {} \n'.format(b.shape)
            raise NameError(err)
    
    # Reduce the number of objects to Nmax
    if Nmax:
        sl=[slice(None)]*a.ndim
        sl[axis]=slice(0,Nmax)
        if same_arrays: a,b=a[sl],None
        else          : a,b=a[sl],b[sl]
          
    # Individual indices
    if same_arrays:
        ia,jb=np.arange(a.shape[axis]),[]
    else:
        ia,jb=np.arange(a.shape[axis]),np.arange(b.shape[axis])
          
    # Pairs of indicies
    dt=np.dtype([('', np.intp)]*2)
    if same_arrays: ij=np.fromiter(itertools.combinations(ia,2),dtype=dt)     
    else          : ij=np.fromiter(itertools.product(ia,jb),dtype=dt)
    ij=ij.view(np.intp).reshape(-1, 2)

          
    # Array of all pairs
    if same_arrays: out=np.take(a,ij,axis=axis)
    else          : out=np.stack([a.take(ij[:,0],axis=axis),b.take(ij[:,1],axis=axis)],axis=axis+1)
    
    return out

In [43]:
Nwokers=6
split_arrays=np.split(large_data['jet_pt'].values,Nwokers)
args=[(a,None,1,False) for a in split_arrays]
%timeit res=multithreading(func=all_pairs,args=args,workers=Nwokers)
%timeit all_pairs(large_data['jet_pt'].values)

AttributeError: 'tuple' object has no attribute 'shape'