# Acknowledgement 
This is based on [this wonderful post](https://freakonometrics.hypotheses.org/53269), but adapted to a "tidy" approach for coding in R. Therefore we start by ...

In [1]:
require(tidyverse)

Loading required package: tidyverse
── Attaching packages ─────────────────────────────────────── tidyverse 1.2.1 ──
✔ ggplot2 3.0.0     ✔ purrr   0.2.5
✔ tibble  1.4.2     ✔ dplyr   0.7.6
✔ tidyr   0.8.1     ✔ stringr 1.3.1
✔ readr   1.1.1     ✔ forcats 0.3.0
── Conflicts ────────────────────────────────────────── tidyverse_conflicts() ──
✖ dplyr::filter() masks stats::filter()
✖ dplyr::lag()    masks stats::lag()


# Background
TBD (you can find the details in the original [post](https://freakonometrics.hypotheses.org/53269))

# R's built-in function
For reference, this is the result of the built-in R function `lm`

In [2]:
lm(dist~speed,data=cars)$coefficients

# Matrix multiplication
This is what's behind the function 

In [3]:
X = data_frame(intercept = 1, speed = cars$speed) %>% as.matrix()
y = cars$dist

And we can see that matrix multiplication $\left( X^{T}X \right)^{-1} X^{T} y$ gives the same results

In [4]:
solve(t(X) %*% X) %*% t(X) %*% y %>% t()

intercept,speed
-17.57909,3.932409


# QR decomposition

Using the [QR decomposition](https://en.wikipedia.org/wiki/QR_decomposition) $X = QR$ (with $Q^{T} Q = \mathbf{I}$) and $R$ an upper-triangular matrix) we can re-write the calculation as:

$$ \hat{\beta} = \left( X^{T} X \right)^{-1} X^{T} y = \left( R^{T}Q^{T} QR \right)^{-1} R^{T}Q^{T} y = \left( R^{T} R \right)^{-1} R^{T}Q^{T} y = R^{-1} \left( R^{T} \right)^{-1} R^{T} Q^{T} y  = R^{-1} Q^{T} y$$ 

* Note: The trivial looking operations of inversion and transposing multiplicated matrices requires some assumptionson the properties of $Q$ and $R$ mentioned above.

Putting the new formula into action we get the same results as before:

In [5]:
(X %>% qr %>% qr.R %>% solve()) %*% 
  (X %>% qr %>% qr.Q %>% t()) %*% 
  y %>% 
  t()

intercept,speed
-17.57909,3.932409


# Map reduce

As our data matrix increases in size (either number rows as observations or columns as attributes), the inversion part of the calculation `solve` and/or QR-factorization `qr` become more memory and time intensive for a single thread calculation.

## Map

Let's say we have `m = 5` cores at our disposal and we'd like to distribute the calculation.

In [6]:
m <- 5

We define the process in a "tidy" way, so all `map` operations here are [embarrassingly parallel](https://en.wikipedia.org/wiki/Embarrassingly_parallel). For simplicity we split the rows between the blocks using modulo operation, but this can be done sequentially or based on existing partitions.

In [7]:
mats <- X %>%
  as_data_frame() %>%
  mutate(
    id = row_number() %% m,
    y = y
  ) %>% 
  ## this is where partitioning happens
  nest(-id) %>% 
  mutate(
    X = map(data, ~ .x %>% select(-y) %>% as.matrix()),
    y = map(data, ~ .x %>% pull(y))
  ) %>% 
  ## We calculate QR decomposition for each partition independently
  mutate(
    Q2 = map(X, ~ .x %>% qr() %>% qr.Q()),
    R1 = map(X, ~ .x %>% qr() %>% qr.R())
  )

## Collect

Having some of the heavy lifting in a distributed way (in the case the QR decomposition) we can collect the results 

In [8]:
df_collect <- mats$R1 %>% do.call(what = 'rbind', args = .)

And continue our calculations with a matrix of lower dimensions (number of partitions X number of columns)

In [9]:
data.frame(dimension = c('rows', 'columns'), cbind(X %>% dim(), df_collect %>% dim()))

dimension,X1,X2
rows,50,10
columns,2,2


## Reduce

Final calculations require only the small matrix we've created.


In [10]:
## Number of groups for nesting can be automatically inferred
m2 <-  dim(mats$R1[[1]])[2]

## The map-stage QR-decomposition
Q1 = df_collect %>% qr %>% qr.Q
R2 = df_collect %>% qr %>% qr.R

## For some reason this did not work with a `mutate` command...
mats$Q1 = Q1 %>% 
  as_data_frame() %>% 
  mutate(id = ceiling(row_number() / m2)) %>% 
  nest(-id) %>% 
  mutate(data = map(data, ~ as.matrix(.x))) %>% 
  pull(data)

v_sum = mats %>% 
  mutate(Q3_t = map2(.x = Q2, .y = Q1, .f = ~ t(.x %*% .y))) %>%
  mutate(V = map2(.x = Q3_t, .y = y, .f = ~ .x %*% .y)) %>% 
  pull(V) %>% 
  reduce(`+`)

t(solve(R2) %*% v_sum)

intercept,speed
-17.57909,3.932409


Voila! :)

# A note the number of nodes vs. efficiency 

As the number of nodes available to us increases, we encounter a little paradox - at some point the calculation time will actually start getting longer! This is because we run QR-factorization (QRF) both on the map *and* the reduce stage. 

Think of 2 extreme cases: 

1. $m=1$: We run QRF the entire matrix on a single node and end up with 2X2 matrix for the second QRF (this is `dim()`).

2. $m=50$: The QRF on each node is trivial and returns the original row $X_i$ that was sent to the node, so the second QRF is in-fact again a full scale QRF of the original function (with `df_collect = X`)

But - sending data to the nodes and collecting it has overhead, so in fact our expected run-time with 50 nodes is longer than with 1 node! 

We can demonstrate the trade-off with a simple formula, but first let's make some assumptions:

* From digging around the web (_citation needed_), we know that the QR algorithm has a complexity of $O(n k^2)$, where $n$ is the number of rows and $k$ is the number of columns. 

* This means that run-time for `qr` should be roughly proportional to the number of rows, so the unit we are interested in is the incremental time it takes to process an additional row of the matrix $X$. Let's call this unit of time $t_{row}$, but instead of looking at a specific duration (which really depends on the platform, CPU, etc) we just mark it as a single unit of time (so $t_{row} = 1$ arbitrarily) .

* We also denote the overhead time to set-up a node as $\delta$ units of $t_{row}$. Again, this does not stand for a specific duration but for the ratio between the overhead time and the time increment per-row.

* We neglect the communication time between the mapper and the nodes (and if it's proportional to $n$ it remains fixed anyway)

* And a final simplification is that we only look at values of $m$ that are factors of $n$ ($n/m \in \mathbb{N}$) and ignore the trivial case where $m = n$ (it just makes the math more simple...)

With this in mind we can calculate the total run-time $T$ as:
1. The time it takes to set-up the nodes (proportional to the number of nodes) + 
2. The time it takes to run QRF on each node (which is the number of nodes X number of rows per nodes X time per row divided by number of nodes, as they run in parallel) +
3. The time to run QRF on the reduced matrix (which is the number of rows of the reduced matrix X time per row)

Or as:

$$ \frac{T}{t_{row}} = m \delta + m \frac{n}{m} \frac{1}{m} + m k = m(\delta + k) + \frac{n}{m} $$ 

We can calculate for our example ($k = 2, n = 50, t_{row} = 1$) and see how sensitive is the optimal choice to the overhead time ratio $\delta$ 

In [11]:
data.frame(m = c(1,2,5,10,25)) %>% 
  mutate(
    time_d1 = m * (1 + 2 ) + 50 / m, 
    time_d2 = m * (2 + 2 ) + 50 / m,
    time_d5 = m * (5 + 2 ) + 50 / m,
    time_d20 = m * (10 + 2 ) + 50 / m,
    time_d100 = m * (100 + 2 ) + 50 / m
  )

m,time_d1,time_d2,time_d5,time_d20,time_d100
1,53,54,57,62,152
2,31,33,39,49,229
5,25,30,45,70,520
10,35,45,75,125,1025
25,77,102,177,302,2552


And as expected - as the overhead time increases the time saved by parallelization decreases (or even disappears...)

# Simulation

In [12]:
require(future)
require(future.apply)

Loading required package: future
Loading required package: future.apply

Attaching package: ‘future.apply’

The following object is masked from ‘package:future’:

    future_lapply



In order to really test the value of parallelization we make 2 changes:
1. Inflate `X` to a 10M size matrix
2. Allocate all the data manipulation activities to the map function

In [13]:
sample_rows <- sample(x = 1:nrow(X), size = 10000000L, replace = TRUE)
X2 <- X[sample_rows,]
y2 <- y[sample_rows]

In [14]:
getQR <- function(X) {X %>% select(-y) %>% as.matrix() %>% qr()}

In [15]:
sim1 <- function(m) {
    tic <- Sys.time()

    plan(multiprocess(workers = {m}))
    
    ## Map
    mats <- X2 %>%
      as_data_frame() %>%
      mutate(
        id = row_number() %% m,
        y = y2
      ) %>% 
      ## this is where partitioning happens
      nest(-id) %>% 
      mutate(
        QR = future_lapply(data, getQR),
        y = map(data, ~ .x %>% pull(y))
      ) %>% 
      ## We calculate QR decomposition for each partition independently
      mutate(
        Q2 = map(QR, qr.Q),
        R1 = map(QR, qr.R)
      )

    ## Collect
    df_collect <- mats$R1 %>% do.call(what = 'rbind', args = .)
    
    ## Number of groups for nesting can be automatically inferred
    m2 <-  dim(mats$R1[[1]])[2]
    
    ## The map-stage QR-decomposition
    Q1 = df_collect %>% qr %>% qr.Q
    R2 = df_collect %>% qr %>% qr.R

    ## For some reason this did not work with a `mutate` command...
    mats$Q1 = Q1 %>% 
      as_data_frame() %>% 
      mutate(id = ceiling(row_number() / m2)) %>% 
      nest(-id) %>% 
      mutate(data = map(data, ~ as.matrix(.x))) %>% 
      pull(data)

    v_sum = mats %>% 
      mutate(Q3_t = map2(.x = Q2, .y = Q1, .f = ~ t(.x %*% .y))) %>%
      mutate(V = map2(.x = Q3_t, .y = y, .f = ~ .x %*% .y)) %>% 
      pull(V) %>% 
      reduce(`+`)

    return(list(values = t(solve(R2) %*% v_sum), time = Sys.time() - tic))
}

Running the simulation can also be parallelized, but to avoid conflicts with the `plan` function we run sequentially:

In [19]:
sim_results <- data_frame(m = c(1,2,5,10,25,50)) %>% 
    mutate(
        y = map(m, sim1), 
        time = map_dbl(y, ~.x$time),
        intercept = map_dbl(y, ~.x$values[1]),
        beta = map_dbl(y, ~.x$values[2])
    ) %>% 
    select(-y)

In [None]:
sim_results %>% 
    mutate(m = as.factor(m)) %>% 
    ggplot(aes(x = m, y = time)) + geom_point() 