

From http://www.stanford.edu/~boyd/papers/pdf/admm_distr_stats.pdf.
Specifically, on p. 70 $\S$ 8.1.3, making the substitutions
$x_i=\beta_i, z_i=\mu_i$.


$$
   \newcommand{\argmin}{\mathop{argmin}}
   \begin{aligned}
   \beta_i^{k+1} &= \argmin_{\beta_i} \left(\frac{\rho}{2} \|X_i\beta_i-X_i\beta_i^k - \bar{\mu}^k + \bar{X\beta}^k + u^k\|^2_2 + \lambda\|\beta_i\|_1 \right) \\\
   \bar{\mu}^{k+1} &= \frac{1}{N+\rho} \left(y + \rho \bar{X\beta}^k + \rho u^k\right) \\\
   u^{k+1} &= u^k + \bar{X\beta}^k - \bar{\mu}^{k+1}
   \end{aligned}
$$

In [1]:
%load_ext parallelmagic
from IPython.parallel import Client
rc = Client()
dview = rc[:]
dview.activate()
dview.block = True

            Controller appears to be listening on localhost, but not on this machine.
            If this is true, you should specify Client(...,sshserver='you@127.0.1.1')
            or instruct your controller to listen on an external IP.


ValueError: Connection file is invalid (missing 'key'), possibly from an old version of IPython.
If you are reusing connection files, remove them and start ipcontroller again.

In [26]:
%%px
import os.path

import regreg.api as R; reload(R)
import regreg.paths as RP; reload(RP)
import numpy as np

import scipy.io, scipy.sparse

# np.random.seed(1)  # for debugging

class loss_factory(RP.squared_error_factory):
    
    def __init__(self, response, rho=1.):
        RP.squared_error_factory.__init__(self, response)
        self.rho = rho
   
    def __call__(self, X):
        n = self.response.shape[0]
        return R.squared_error(X, -self.response, coef=self.rho/n)

# subclass specific to this dataset
class newsgroup_factory(loss_factory):
    
    def __init__(self, newsgroup_path, rows, columns, rho=1.):
        # can we read the newsgroup.mat on all nodes at once?
        D = scipy.io.loadmat(os.path.join(newsgroup_path, 'newsgroup.mat'))
        X = D['X'][rows].tocsc(); Y = D['Y'].reshape(-1)[rows] 
        self.rows = rows
        self.X = X[:,columns]
        self.columns = columns
        self.selector = R.selector(columns, X.shape[1])
        loss_factory.__init__(self, Y, rho)
        
    def fitted(self, solution):
        solution = solution.tocsr()
        intercept = np.array(solution[0].todense()).reshape(-1)
        return np.array((self.X * solution[1:]).todense()) + intercept[np.newaxis, :]
    
    def beta(self, solution):
        return self.selector.adjoint_map(solution)

Parallel execution on engine(s): [0, 1]


Get ready to instantiate each node.


In [27]:
import numpy as np, scipy.io, os.path, regreg.api as R
newsgroup_path = '/home/fperez/research/code/regreg'
D = scipy.io.loadmat(os.path.join(newsgroup_path, 'newsgroup.mat'))
n, p = D['X'].shape;
columns = np.arange(p)
np.random.shuffle(columns)
rows = np.arange(n)
np.random.shuffle(rows)
rows_subsampled = rows[:1000]
columns_subsampled = columns[:1000]
dview.scatter('columns', columns_subsampled)
dview.push({'rows':rows_subsampled})

@dview.remote(block=True)
def init_nodes(lagrange_sequence, newsgroup_path, rows):
    global lasso, factory, fitted
    factory = newsgroup_factory(newsgroup_path, rows, columns)
    lasso = R.lasso(factory, factory.X)
    lasso.lagrange_sequence = lagrange_sequence
    fitted = np.zeros((factory.rows.shape[0], lasso.lagrange_sequence.shape[0]))
    
X = D['X'][rows_subsampled].tocsc(); Y = D['Y'].reshape(-1)[rows_subsampled] 
master_lasso = R.lasso.squared_error(X, Y)

objective = master_lasso.problem.objective

    

In [28]:
init_nodes([master_lasso.lagrange_sequence[50]], newsgroup_path, rows_subsampled)


[None, None]

Each node will have a copy of its own fitted values $X_i\beta_i^k$ stored as `fitted`. The
response for the next update is 
$$
X_i\beta_i^k + \bar{\mu}^k - \bar{X\beta}^k - u^k
$$

The variable `pseudo_response` is sent from the master node and is equal to 
$$
\bar{\mu}^k - \bar{X\beta}^k - u^k.
$$

The function `update_global_variables` takes care of the updates
$$
   \newcommand{\argmin}{\mathop{argmin}}
   \begin{aligned}
   \bar{\mu}^{k+1} &= \frac{1}{N+\rho} \left(y + \rho \bar{X\beta}^k + \rho u^k\right) \\\
   u^{k+1} &= u^k + \bar{X\beta}^k - \bar{\mu}^{k+1}
   \end{aligned}
$$

In [29]:
def update_global_variables(lasso_fits, y, u, rho=1.):
    # this is a reduction operation
    Xbeta_bar = np.mean(lasso_fits, 0)

    N = len(lasso_fits)
    mu_bar = (y[:,np.newaxis] + rho * (Xbeta_bar + u)) / (N + rho)
    u = u + Xbeta_bar - mu_bar
    return Xbeta_bar, mu_bar, u

The master must pass the `pseudo_response` to the nodes, which gets added to the local
fitted values and becomes the new response for the node's `lasso` instance.

In [32]:
@dview.remote()
def update_lasso_nodes(pseudo_response):
    global fitted, results
    results = lasso.main()
    #raise ValueError(`fitted.shape`)
    fitted[:] = factory.fitted(results['beta'])
    factory.response = fitted + pseudo_response
    return fitted

@dview.remote()
def get_beta():
    global factory, results
    return factory.beta(results['beta'])


pseudo_response = np.zeros((1000, 10))
u = np.zeros(pseudo_response.shape)
from IPython.parallel.error import CompositeError
try:
    lasso_fits = update_lasso_nodes(pseudo_response)
except CompositeError, e:
    e.raise_exception()
    
Xbeta_bar, mu_bar, u = update_global_variables(lasso_fits, Y, u)


#a = get_beta()
#print [aa.shape for aa in a]

RemoteError: ValueError(operands could not be broadcast together with shapes (1000) (1000,10) )
Traceback (most recent call last):
  File "/home/fperez/usr/lib/python2.7/site-packages/IPython/zmq/ipkernel.py", line 561, in apply_request
    exec code in self.shell.user_global_ns, self.shell.user_ns
  File "<string>", line 1, in <module>
  File "<ipython-input-32-f89333c8c28b>", line 4, in update_lasso_nodes
  File "/home/fperez/usr/local/lib/python2.7/site-packages/regreg/paths.py", line 272, in main
    coef_stop=coef_stop)
  File "/home/fperez/usr/local/lib/python2.7/site-packages/regreg/paths.py", line 224, in solve_subproblem
    sub_soln = subproblem.solve(**solve_args)
  File "/home/fperez/usr/local/lib/python2.7/site-packages/regreg/simple.py", line 69, in solve
    solver.fit(**fit_args)
  File "/home/fperez/usr/local/lib/python2.7/site-packages/regreg/algorithms.py", line 105, in fit
    current_f = self.composite.smooth_objective(r,mode='func')
  File "/home/fperez/usr/local/lib/python2.7/site-packages/regreg/simple.py", line 28, in smooth_objective
    vs = self.smooth_atom.smooth_objective(x, mode, check_feasibility)
  File "/home/fperez/usr/local/lib/python2.7/site-packages/regreg/smooth.py", line 152, in smooth_objective
    v = self.sm_atom.smooth_objective(eta, mode='func')
  File "/home/fperez/usr/local/lib/python2.7/site-packages/regreg/quadratic.py", line 50, in smooth_objective
    x = self.apply_offset(x)
  File "/home/fperez/usr/local/lib/python2.7/site-packages/regreg/composite.py", line 105, in apply_offset
    return x + self.offset
ValueError: operands could not be broadcast together with shapes (1000) (1000,10) 


In [33]:
%connect_info

{
  "stdin_port": 41621, 
  "ip": "127.0.0.1", 
  "hb_port": 42021, 
  "key": "7fa4437e-c791-4d99-af2d-d905d9eb06dc", 
  "shell_port": 59622, 
  "iopub_port": 37528
}

Paste the above JSON into a file, and connect with:
    $> ipython <app> --existing <file>
or, if you are local, you can connect with just:
    $> ipython <app> --existing kernel-0353447a-a8e0-4bde-9de7-2b2771f136dd.json 
or even just:
    $> ipython <app> --existing 
if this is the most recent IPython session you have started.
