
Parallel=true, concurrent writes to a typed dictionary causes segmentation fault #8767

Closed
mdatres opened this issue Feb 17, 2023 · 14 comments
Labels: ParallelAccelerator · question · stale

Comments

@mdatres commented Feb 17, 2023

Dear all,

This is my first time using Numba, so I may be doing something wrong. I have tried to compile the following function with Numba:

```python
@jit(nopython=True, parallel=True, nogil=True, cache=True)
def numba_get_fisher(out_alld: Dict, gradients: Dict, len_layers):
    samples = Dict()
    for i, d in enumerate(out_alld):
        racc = Dict()
        for j in range(len_layers + 1):
            if j != 0:
                norm_for_x = numba_multivariate_normal_func(d[j], 0.0000001 * np.eye(d[j].shape[0]), 5)
                racc[j - 1] = norm_for_x

        samples[i] = racc
    fishers = Dict()
    for j, el in enumerate(gradients.values()):
        intermediate_fishers = List()
        for i, g in enumerate(el):
            int_fisher = np.zeros(shape=(g[0].shape[0], g[0].shape[0]))
            mean = out_alld[j][i + 1]
            for num in samples[j][i]:
                to_scal_prod = (1 / 0.0000001) * np.dot(num - mean, g)
                int_fisher += np.outer(to_scal_prod, np.transpose(to_scal_prod))
            intermediate_fishers.append(int_fisher / 5)

        fishers[j] = intermediate_fishers

    return fishers
```

I have noticed that when `gradients.values()` is small everything works fine, but as it grows the code crashes with a segmentation fault. I have no idea what is happening. Can you help me?
Furthermore, I was wondering whether there is a `penumerate` iterator to explicitly parallelize the for loop.

Thanks in advance,
Datres

@stuartarchibald (Contributor) commented:

Thanks for the report.

  1. Please could you include a full reproducer for the issue that can be run as python bug.py (i.e. some example input needs to be included)?
  2. Does your code work as expected with parallel=False?
  3. Does your code work as expected when the @jit decorator is removed?
  4. There's no `penumerate` I'm afraid; the nearest thing is `prange` combined with an explicit indexing operation inside the loop (see the sketch below).
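
For illustration, a minimal sketch of the prange-plus-indexing pattern (a made-up example, not code from this issue):

```python
import numpy as np
from numba import njit, prange

@njit(parallel=True)
def weighted_sum(values):
    total = 0.0
    for i in prange(len(values)):  # prange supplies the index; index the array explicitly instead of enumerate
        total += i * values[i]     # scalar reductions like this are safe inside prange
    return total

weighted_sum(np.arange(10.0))  # 285.0
```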

stuartarchibald added the “more info needed” label on Feb 17, 2023
@mdatres (Author) commented Feb 17, 2023

Thanks for the quick reply.

  1. Yes, you can find it in the attachment (bug.txt).
  2. Yes, it works correctly.
  3. Yes, it works.
  4. I have tried, but I received an unexpected `get_item` error and I don't know why. In the attachment, you can find the code to reproduce the error (bug_prange.txt).

I have attached the files as .txt because .py files are not supported; you just need to change the extension.

Thanks again for your help!

bug.txt
bug_prange.txt

@stuartarchibald (Contributor) commented:

> Thanks for the quick reply.
>
> 1. Yes, you can find it in the attachment (bug.txt).
> 2. Yes, it works correctly.
> 3. Yes, it works.
> 4. I have tried, but I received an unexpected `get_item` error and I don't know why. In the attachment, you can find the code to reproduce the error (bug_prange.txt).
>
> I have attached the files as .txt because .py files are not supported; you just need to change the extension.

Please could you use triple backticks ``` to create copy-pasteable code in a comment (it makes issues/problems more searchable in future)? Many thanks. Here's an example:

def foo():
    pass

which worked via the mark-up:

```python
def foo():
    pass
```

> Thanks again for your help!

No problem.

@mdatres (Author) commented Feb 17, 2023

Ok! These are the new versions.
bug_prange.txt
bug.txt

@stuartarchibald (Contributor) commented:

Apologies, I may not have explained the request about code samples well enough. The code sample needs to be included as part of the text written into these text boxes (i.e. inline as part of the comment, not as an attached file). The way to do that is with triple backticks as demonstrated here: #8767 (comment); an example of what this looks like from another user can be found in #8742 (comment). Hope this helps?

@mdatres (Author) commented Feb 17, 2023

OK, I'm sorry for the misunderstanding. In the following block I define the Numba-compiled functions:

```python
import math

import numpy as np
import numpy
import sympy
import torch
import torch.nn.functional as F
from torch import nn

from numba import jit, prange
from numba.typed import Dict, List


@jit(nopython=True, nogil=True)
def numba_trace(array):
    return np.trace(array)

@jit(nopython=True, nogil=True)
def numba_get_fhat(fishers, num_inputs):
    fisher_teta = List()
    trace = List()
    somma = fishers[0]
    lunghezza = len(fishers[0])
    for i in range(lunghezza):
        avg = np.empty((len(fishers.values()),fishers[0][i].shape[0],fishers[0][i].shape[0]))
        for l,f in enumerate(fishers.values()):
            avg[l] = f[i] 
        trace.append(np.trace(np.sum(avg, axis=0)/avg.shape[0]))
    
    for e in range(len(fishers)):
        
        if e%(num_inputs)<(num_inputs-1):
            somma_l = List()
            for i, f in enumerate(fishers[e]):
                somma_l.append(somma[i]+f)
            somma = somma_l

        else:
            ft = List()
            for j,s in enumerate(somma):
                ft.append(s*s.shape[0]/(num_inputs*trace[j]))
            fisher_teta.append(ft)
            somma = fishers[e]
    return fisher_teta
   

@jit(nopython=True, parallel=True,nogil=True)
def numba_eig(arr):
    return np.linalg.eig(arr)

@jit(nopython=True,parallel=True, nogil=True)
def numba_multivariate_normal_func(mean, cov, n):
    l = np.empty(shape=(n,mean.shape[0]), dtype=np.float64)
    for i in range(n):
        sample=np.empty(shape=mean.shape[0], dtype=np.float64)
        for j in range(len(mean)):
            sample[j] = np.random.normal(mean[j], cov[j,j])
        l[i, :] =sample
    return l

    
@jit(nopython=True, parallel=True, nogil=True, cache=True)
def numba_get_fisher(out_alld: Dict, gradients: Dict, len_layers):
    samples = Dict()
    for i,d in enumerate(out_alld):   
        racc = Dict()
        for j in range(len_layers+1):
            if j !=0:
                norm_for_x = numba_multivariate_normal_func(d[j], 0.0000001*np.eye(d[j].shape[0]),5)  
                racc[j-1] = norm_for_x
                

        samples[i] = racc
    fishers = Dict()
    for j,el in enumerate(gradients.values()):
        intermediate_fishers= List()
        for i, g in enumerate(el):
            int_fisher = np.zeros(shape = (g[0].shape[0], g[0].shape[0]))
            mean = out_alld[j][i+1]
            for num in samples[j][i]:
                to_scal_prod = (1/0.0000001)*np.dot(num-mean, g)
                int_fisher += np.outer(to_scal_prod, np.transpose(to_scal_prod))
            intermediate_fishers.append(int_fisher/5)


        fishers[j] = intermediate_fishers

    return fishers
```

Since I am working with PyTorch, I also attach the model's class:

```python
class LClassicalNeuralNetwork(nn.Module):
    def __init__(self, size):
        '''
        :param size: list, the size must contain [input_size, neurons in 1st hidden layer, ...,
        neurons in nth hidden layer, output_size]
        '''
        nn.Module.__init__(self)
        self.size = size
        self.inputsize = size[0]
        self.outputsize = size[-1]
        self.layers = nn.ModuleList(
            [nn.Linear(self.size[i - 1], self.size[i], bias=False) for i in range(1, len(self.size))]).double()
        self.d = sum(size[i] * size[i + 1] for i in range(len(size) - 1))
        self.selected_out = Dict()

    def forward(self, x):
        '''
        Computes the output of the neural network (leaky ReLU activations).
        :param x: ndarray, data inputs for the model (can be one or multiple)
        :param params: for now, random params are used, need to add functionality for using passed params
        :return: torch tensor, model output of size (len(x), output_size)
        '''
        tot = False
        
        if not torch.is_tensor(x):
            self.selected_out[0] = x
            x = torch.from_numpy(x)
        else:
            self.selected_out[0] = x.detach().numpy()
        for i in range(len(self.size) - 2):
            x = F.leaky_relu(self.layers[i](x))
            self.selected_out[i+1] = x.detach().numpy()
            tot = True
        if tot:
            x = self.layers[-1](x)
            self.selected_out[len(self.size) - 1] = x.detach().numpy()
        else:
            x = F.leaky_relu(self.layers[-1](x))
            self.selected_out[len(self.size) - 1] = x.detach().numpy()
        return x, self.selected_out


    def get_gradient(self, x, num_data):
        '''
        Computes the gradients of every parameter using each input x, wrt every output.
        :param x: ndarray, data inputs for the model (can be one or multiple)
        :param params: for now, random params are used, need to add functionality for using passed params
        :return: numpy array, gradients of size (len(x), output_size, d)
        '''
        if not torch.is_tensor(x):
            x = torch.from_numpy(x)
            x.requires_grad_(False)

        gradvectors = Dict()
        seed = 0
        dimensions =Dict()
        for i, layer in enumerate(self.layers):
            dimensions[i] = (layer.in_features, layer.out_features)
        out_alld= List()
        for m in range(len(x)):
            if m % num_data == 0:   
                seed += 1
            torch.manual_seed(seed)
            net = LClassicalNeuralNetwork(self.size)
            net._create_rand_params()  
            state_dict_for_theta = net.state_dict()
            _, outx = net(x[m])
            out_alld.append(outx)
            grad_k = List()
            for k in range(len(self.layers)):
                size_k = [dimensions[k][0], dimensions[k][1]]
                net_k = LClassicalNeuralNetwork(size = size_k)
                net_k.layers[0].weight = torch.nn.Parameter(state_dict_for_theta[list(state_dict_for_theta)[k]])
                out_k,_ = net_k(outx[k])
                grad = List()
                for i in range(net_k.outputsize):
                    net_k.zero_grad()
                    out_k[i].backward(retain_graph=True)
                    grads = List()
                    for param in net_k.parameters():
                        grads.append(param.grad.view(-1).detach().numpy())
                    gr = np.concatenate(grads)
                    grad.append(gr)
                    
                jacobian = np.concatenate(grad)
                
                jacobian = np.reshape(jacobian, (net_k.outputsize, net_k.d))
                grad_k.append(jacobian)
            
            gradvectors[m]= grad_k
        return gradvectors, out_alld


    def get_fisher(self, out_alld, gradients):
        fishers = numba_get_fisher(out_alld=out_alld, gradients=gradients, len_layers=len(self.layers))
        return fishers

        
    def _create_rand_params(self):
        for l in self.layers:
            if type(l) == nn.Linear:
                l.weight.data.uniform_(0,1)

def give_min_1(interest):
    ret = []
    for el in interest:
        row = el[:-1][:]
        ret.append(row)
    return ret
```

This is the main script that executes everything:

```python
def compute_lower_effdim(model, data, num_theta, eps):
    '''
    Compute the upper bound of the effective dimension for a model p_theta with L layers, which is:

                d^(inf)_1(2pilog(n)/n) = 1/|log(2pilog(n)/n)| log(int_theta_1 det(id + n/2pilog(n)F^1/2_1(theta_1)) dtheta_1) 
                .
                .
                d^(inf)_L(2pilog(n)/n) = d^(inf)_1(2pilog(n)/n) + ...+ d^(inf)_{L-1} (2pilog(n)/n) + 1/(|log(2pilog(n)/n) Phi(Thetahat_L)|) int_{thetahat_L} log(int_{theta_L} det(id + n/2pilog(n)F^1/2_L(theta_1, ...theta_L)) dtheta_L) Phi(dtheta_1, ..., dtheta_{L-1})
    
    where Thetahat_j = Theta_1 x ... x Theta_j and Phi_j(Thetahat_j)

    Args:
        model   :   the model for which we want to compute the upper effective dimension;
        data    :   the data for which we want to compute the effective dimension 
                    (numpy array where data[0] is the first input);
        num_theta:  the number of samples that we want to use to estimate the integral in theta;   

    Output:
        eff_dim of the model on the data.
    '''


    
    num_inputs = data.shape[0]
    num_layers = len(model.size)
    x = numpy.tile(data, (num_theta, 1))
    grads, out_all = model.get_gradient(x, num_inputs)
    fishers = model.get_fisher(out_alld = out_all, gradients = grads)

    fisher_teta = numba_get_fhat(fishers, num_inputs=num_inputs)
    def compute_det_eps(matrix, eps):
        ret = []
        eige,_ =numpy.linalg.eig(matrix) 
        for e in eps:
            sqrt_eige = numpy.sqrt(numpy.maximum(numpy.real(eige),0.0))*1/sympy.Float(e)
            det = sympy.prod(1+sqrt_eige)
            ret.append(det)
        return ret

    summ_all_teta = dict()
    for i, el in enumerate(fisher_teta):
        add = []
        for j, obj in enumerate(el):
            add.append(compute_det_eps(obj, eps))
        summ_all_teta[i] = add

    
    
    eff_j = []
    for i in range(num_layers-1):
        if i==0:   
            interest = [numpy.array(el[1][0]) for el in list(summ_all_teta.items())]
            num = sum(interest)/num_theta
            res = [sympy.log(n) for n in num]
            app = [res[i]/abs(sympy.log(eps[i])) for i in range(len(eps))]
            eff_j.append(app)
        else:
            
            interest = [numpy.array(el[1][:i+1]) for el in list(summ_all_teta.items())]
            interest_j = [numpy.array(el[1][i]) for el in list(summ_all_teta.items())]
            for el in interest_j:
                first_int = [sympy.log(n) for n in el]

            phi_m_theta = sum([numpy.prod(i,axis=0) for i in give_min_1(interest)])/num_theta
            
            num = sum(numpy.multiply(numpy.array(first_int), numpy.array([numpy.prod(i,axis=0) for i in give_min_1(interest)])))/num_theta
            res = num/phi_m_theta
            app = [res[i]/abs(math.log(eps[i])) for i in range(len(eps))]
            eff_j.append(app)
    return numpy.sum(numpy.array(eff_j), axis=0).tolist()


epsilon = np.linspace(start=1e-300, stop=1e-200, num = 10).tolist()
model_input = 10

sizes =[[model_input,5,1,1,2]]
num_data = 10
num_theta = 10
data = np.random.normal(0, 1, size=(num_data, model_input))

netl = LClassicalNeuralNetwork(size=[model_input,5,2])

l_eff_dim = []

l_eff_dim.append(compute_lower_effdim(model=netl, data=data, num_theta=num_theta, eps=epsilon))
```

This is the code that is the topic of the issue. To reproduce the error from point 4, it is sufficient to change:

```python
for j, el in enumerate(gradients.values()):
```

in the function `numba_get_fisher` with:

```python
for j in prange(len(gradients.values())):
    el = gradients.values()[j]
```

@stuartarchibald (Contributor) commented:

Many thanks for creating a reproducer in text; it makes the issue easier to find in future text-based searches.

On inspection of the above, I think the issue is that in the `numba_get_fisher` function, if the loop is made into a `prange` loop (as described), then there is concurrent write access to the dictionary held in the variable `fishers = Dict()`. The Numba container implementations are not thread-safe; from the docs:

> Unsupported Operations
> ======================
>
> This section contains a non-exhaustive list of commonly encountered but
> currently unsupported features:
>
> #. **Mutating a list is not threadsafe**
>
>    Concurrent write operations on container types (i.e. lists, sets and
>    dictionaries) in a ``prange`` parallel region are not threadsafe e.g.::
>
>        @njit(parallel=True)
>        def invalid():
>            z = []
>            for i in prange(10000):
>                z.append(i)
>            return z
>
>    It is highly likely that the above will result in corruption or an access
>    violation as containers require thread-safety under mutation but this feature
>    is not implemented.

I suspect this is the cause of the segmentation fault.
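
As a sketch of one possible workaround (illustrative only, not from this thread, and assuming the per-iteration results all share a fixed array shape; the function names below are made up, not part of the reproducer): do the parallel work into a pre-allocated NumPy array, where each `prange` iteration writes only to its own slot, and fill the typed Dict serially afterwards.

```python
import numpy as np
from numba import njit, prange, types
from numba.typed import Dict

@njit(parallel=True)
def compute_parallel(data):
    n, dim = data.shape
    out = np.empty((n, dim, dim))
    for j in prange(n):
        # each iteration writes only to its own pre-allocated slice,
        # so no shared container is mutated inside the parallel region
        out[j] = np.outer(data[j], data[j])
    return out

def collect_into_dict(data):
    out = compute_parallel(data)
    result = Dict.empty(key_type=types.int64, value_type=types.float64[:, :])
    for j in range(out.shape[0]):
        # mutate the typed Dict serially, outside of any parallel region
        result[j] = out[j]
    return result

fishers = collect_into_dict(np.random.normal(size=(8, 3)))
```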

@mdatres (Author) commented Feb 17, 2023

Thanks. So I cannot parallelize it, am I right?

@stuartarchibald (Contributor) commented:

I don't think it's possible to safely do this right now without relying on some internal implementation details present in Numba, which may obviously change.

stuartarchibald added the “ParallelAccelerator” and “question” labels and removed the “more info needed” label on Feb 17, 2023
stuartarchibald changed the title from “Parallel= true cause segmentation fault” to “Parallel=true, concurrent writes to a typed dictionary causes segmentation fault” on Feb 17, 2023
@stuartarchibald (Contributor) commented:

NOTE: I have updated the title to reflect the issue.

@mdatres (Author) commented Feb 19, 2023

Thanks for your help. There is still one point which is not clear to me. With `model_input = 10`, `num_data = 10` and `num_theta = 10` everything works, but when I increase these three variables to 100 the segmentation fault appears. Do you know the reason for this strange behaviour?

@stuartarchibald (Contributor) commented:

@mdatres no problem. I'd guess that for some small amount of data the dictionary might not need to be resized internally (or it manages to survive one small resize), and as a result mutating it is "safe" in the sense of not corrupting internal state, hence the segmentation fault is avoided; it won't necessarily be "safe" in terms of correctness, though.

@github-actions (bot) commented:

This issue is marked as stale as it has had no activity in the past 30 days. Please close this issue if no further response or action is needed. Otherwise, please respond with any updates and confirm that this issue still needs to be addressed.

github-actions bot added the “stale” label on Mar 26, 2023
@stuartarchibald (Contributor) commented:

Closing this question as it seems to be resolved. Thanks for the discussion!
