
implement future.apply in forecast as it is in model #268

Open
davidtedfordholt opened this issue Sep 23, 2020 · 30 comments

Comments

@davidtedfordholt
Contributor

Using fable at any kind of scale gets painful if computation of forecasts isn't also parallelized. I've got a possible implementation I'll PR tomorrow.

@mitchelloharawild
Member

A PR for this would be great, thanks!

@mitchelloharawild
Member

My suggestion for this would be to drop the use of mutate_at() and replace it with an internal util function that iterates over the mable with parallel support. There is a common pattern throughout the package where functions applied to a mable need to operate over each model, and a more general (and parallelisable) helper function for this would be great (for example, generate() should also be possible in parallel).

@davidtedfordholt
Contributor Author

I've got to pull a couple of errant commits out, but I'll send it along momentarily. I reversed the order of the pivot_longer and changed the mutate_at to mapply to mirror the future.apply implementation.

@davidtedfordholt
Contributor Author

davidtedfordholt commented Sep 23, 2020

I'm happy to make a util function that does the iteration piece. It would essentially start from the data at the point of having been pivot_longer'ed and iterate across it with either mapply or future_mapply, given that they take the same arguments (other than the future-specific ones). It would take everything that's passed in additionally and place it in MoreArgs, then select the method based on the availability of future.

@davidtedfordholt
Contributor Author

This is an 80-core machine trying to churn through forecasts for the models it fit between 5:45pm and 6pm. It's still at it, 4 hours later. This is a pretty strong motivator, as I'll be explaining the bill later. :)

[screenshot of the machine's CPU usage]

@mitchelloharawild
Member

I think it could be safer / more efficient if the internal util also handled the pivot_longer bit, to avoid possible variable name conflicts arising from the pivoting process.

@davidtedfordholt
Contributor Author

Makes sense, and shouldn't be any tougher. It won't pivot_wider to return to the previous structure, because forecast() specifically needs for it not to do so. I can't think of an instance where it would need to, but I can add in a param to return it to the original structure, if you'd like.

@mitchelloharawild
Member

An example of returning to the previous structure is when the operation returns a <mable>.

For example, refit(<mable>, <tsibble>) returns a <mable> of the same structure. The same applies for stream(<mable>, <tsibble>)

Adding/extending the apply-util for this can be done later.

@davidtedfordholt
Contributor Author

Makes sense. Should be straightforward enough.

@davidtedfordholt
Contributor Author

To make modeling and forecasting more memory-efficient for tsibbles with many keys (and this should probably apply to any fabletools process), it might make sense to shrink the environment of each worker in the future_mapply calls. That means we'll need to figure out which attached packages, such as fable.prophet, will need to be available on the workers. Obviously, if we can generally rely on the idea that the namespace will begin with fable, then it's pretty easy. I don't know if that's something you've thought about and feel strongly about, but it seems reasonable enough.

Otherwise, we might need to crawl across the functions within the call to find all of their namespaces and add them that way, which seems a bit more fraught with peril.
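A rough sketch of the namespace-prefix idea discussed above (the helper name is made up; `future.packages` is the future.apply argument for declaring packages that each worker must attach):

```r
# Collect the loaded fable-family namespaces to forward to workers.
fable_family_packages <- function() {
  grep("^fable", loadedNamespaces(), value = TRUE)
}

# Hypothetical use inside the util:
# future.apply::future_mapply(
#   FUN = .f, ...,
#   future.globals  = FALSE,
#   future.packages = fable_family_packages()
# )
```

This avoids copying the full calling environment while still making model-package specials resolvable on each worker.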

@snowman-87

Just following along as I use fable for millions of time series. If this is in the newest version of the Github release I'd be happy to help test, otherwise just saying I appreciate the work here!

@mitchelloharawild
Member

On further thought and a bug report, converting to longer before parallelising isn't appropriate because the column structure defines the way in which the forecasts are handled (such as reconciliation). I'll have a go at adding the conditionally parallel mapply function.

mitchelloharawild added a commit that referenced this issue Sep 28, 2020
@davidtedfordholt
Contributor Author

Look at what's in my branch right now. There was an issue with how new_data was working, but I have a version that I've been testing all afternoon and it's working. I had misunderstood how key_data was being used, and had messed up how it got pushed to forecast along with the new_data.

@davidtedfordholt
Contributor Author

Using what's currently in the parallel_forecasting branch, here's an example of parallel modeling and forecasting. I'm sorry it's so long, but I needed something with regressors, and I wanted to make sure I included fable.prophet in there to make sure the capacity and floor for logistic growth works properly.

devtools::install_github("davidtedfordholt/fabletools", ref = "parallel_forecasting")

library(tsibbledata)
library(tsibble)
library(dplyr)
library(tidyr)
library(fable)
library(fabletools)
library(fable.prophet)
library(future)
library(future.apply)
options(future.fork.enable = TRUE, future.rng.onMisuse = "ignore")

data <- gafa_stock %>%
    select(Volume, Open) %>%
    fill_gaps(Volume = 0, .full = TRUE) %>%
    group_by_key() %>%
    fill(Open) %>%
    ungroup()

train <- data %>%
    filter(Date < as.Date("2018-01-01")) %>%
    mutate(capacity = max(Open) * 4,
           floor = min(Open) * .25) %>%
    ungroup()

test <- train %>%
    new_data(n = 365) %>%
    left_join(select(data, -Open), by = c("Date", "Symbol")) %>%
    left_join(train %>%
                  as_tibble() %>%
                  group_by(Symbol) %>%
                  summarise(capacity = last(capacity),
                            floor = last(floor)),
              by = "Symbol")

# SEQUENTIAL --------------------------

plan(sequential)
start_sequential_modeling <- Sys.time()
models <-
    train %>%
    model(
        arima = ARIMA(Open),
        tslm = TSLM(Open ~ Volume + season()),
        naive = NAIVE(Open),
        prophet = prophet(
            Open ~ 
                Volume + 
                growth(type = "logistic", capacity = capacity, floor = floor)))
end_sequential_modeling <- Sys.time()
print(paste("Sequential modeling took", format(end_sequential_modeling - start_sequential_modeling)))
forecasts <- forecast(models, new_data = test)
print(paste("Sequential forecasting took", format(Sys.time() - end_sequential_modeling)))

# MULTICORE --------------------------

plan(multicore)
start_multicore_modeling <- Sys.time()
models_multicore <-
    train %>%
    model(
        arima = ARIMA(Open),
        tslm = TSLM(Open ~ Volume + season()),
        naive = NAIVE(Open),
        prophet = prophet(
            Open ~ 
                Volume + 
                growth(type = "logistic", capacity = capacity, floor = floor)))
end_multicore_modeling <- Sys.time()
print(paste("Multicore modeling took", format(end_multicore_modeling - start_multicore_modeling)))
forecasts_multicore <- forecast(models_multicore, new_data = test)
print(paste("Multicore forecasting took", format(Sys.time() - end_multicore_modeling)))

# MULTISESSION --------------------------

plan(multisession)
start_multisession_modeling <- Sys.time()
models_multisession <-
    train %>%
    model(
        arima = ARIMA(Open),
        tslm = TSLM(Open ~ Volume + season()),
        naive = NAIVE(Open),
        prophet = prophet(
            Open ~ 
                Volume + 
                growth(type = "logistic", capacity = capacity, floor = floor)))
end_multisession_modeling <- Sys.time()
print(paste("Multisession modeling took", format(end_multisession_modeling - start_multisession_modeling)))
forecasts_multisession <- forecast(models_multisession, new_data = test)
print(paste("Multisession forecasting took", format(Sys.time() - end_multisession_modeling)))

@davidtedfordholt
Contributor Author

Output for me was:

[1] "Sequential modeling took 6.105713 secs"
[1] "Sequential forecasting took 3.273358 secs"
[1] "Multicore modeling took 2.228881 secs"
[1] "Multicore forecasting took 1.384231 secs"
[1] "Multisession modeling took 14.54192 secs"
[1] "Multisession forecasting took 1.457515 secs"

Obviously, the multisession overhead for modeling is a bit painful, but that should drop on a larger dataset. I just wanted to make sure it works even when the workers aren't sharing the current session's environment.

@mitchelloharawild
Member

Thanks, looks promising. Do you know if the data transfer overhead for multicore is lower than multisession?
I think substantial gains will be had by reducing object sizes here.

I get similar speed-up:

[1] "Sequential modeling took 8.717845 secs"
[1] "Sequential forecasting took 3.696366 secs"
[1] "Multicore modeling took 3.74861 secs"
[1] "Multicore forecasting took 2.048967 secs"
[1] "Multisession modeling took 11.08729 secs"
[1] "Multisession forecasting took 2.403781 secs"

@davidtedfordholt
Contributor Author

So the underlying issue with the overhead is that multisession actually creates another R session, copies the environment into it, then launches the process, whereas multicore works within the current session. multicore is a bit unstable inside RStudio (hence my setting the option to allow forked processes), but works far better when you're running in a session from the command line.
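For reference, the two backends described above are selected like this (a minimal sketch; `availableCores()` is re-exported by {future}):

```r
library(future)

# Forked workers share the parent session's memory. Not available on
# Windows, and discouraged inside RStudio unless explicitly enabled:
options(future.fork.enable = TRUE)
plan(multicore, workers = availableCores())

# Separate background R sessions; globals and packages must be
# serialised and copied to each worker, hence the startup overhead:
plan(multisession, workers = availableCores())
```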

@davidtedfordholt
Contributor Author

You can reduce the overhead for multisession by scoping what has to be included, down to which packages have to be loaded. That's most likely the slowest part.

@davidtedfordholt
Contributor Author

Don't hate me for being super hacky, but throw this in after you build data.

data <- data %>%
    bind_rows(mutate(data, Symbol = paste(Symbol, "_1"))) %>%
    bind_rows(mutate(data, Symbol = paste(Symbol, "_2"))) %>%
    bind_rows(mutate(data, Symbol = paste(Symbol, "_3"))) %>%
    bind_rows(mutate(data, Symbol = paste(Symbol, "_4"))) %>%
    bind_rows(mutate(data, Symbol = paste(Symbol, "_5"))) %>%
    bind_rows(mutate(data, Symbol = paste(Symbol, "_6"))) %>%
    bind_rows(mutate(data, Symbol = paste(Symbol, "_7")))

My results were:

[1] "Sequential modeling took 46.61608 secs"
[1] "Sequential forecasting took 21.95314 secs"
[1] "Multicore modeling took 14.79503 secs"
[1] "Multicore forecasting took 26.5846 secs"
[1] "Multisession modeling took 1.087287 mins"
[1] "Multisession forecasting took 7.000597 secs"

It's an interesting question about the overhead. I need to run it on something much larger.

@davidtedfordholt
Contributor Author

davidtedfordholt commented Sep 29, 2020

I extended the data out to 60 keys. Also, I killed off Chrome and a bunch of other things to free up more cores.

[1] "Sequential modeling took 1.396766 mins"
[1] "Sequential forecasting took 37.79279 secs"
[1] "Multicore modeling took 26.57042 secs"
[1] "Multicore forecasting took 14.76845 secs"
[1] "Multisession modeling took 38.57623 secs"
[1] "Multisession forecasting took 11.76835 secs"

@davidtedfordholt
Contributor Author

I added a second key to them and it still worked just fine. Tomorrow morning, I'm going to test it on the process for work, which takes about 2 hours sequentially, but the machine has 80 cores that are used during modeling but not during forecasting. Yet. It's a bear.

@mitchelloharawild
Member

Sounds great. I think there is some low-hanging fruit for performance improvements, which I'll try to get to today.

@davidtedfordholt
Contributor Author

davidtedfordholt commented Sep 29, 2020

Here's the base of an mapply util function. I'm interested in extending it to use progressr, if available, to give a progress bar.

versatile_mapply <- function(...) {
  if (is_attached("package:future")) {
    require_package("future.apply")
    future.apply::future_mapply(..., future.globals = FALSE)
  } else {
    mapply(...)
  }
}

Anything written as an mapply should be able to have this swapped in.
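As a usage sketch of that swap (the model and data lists are hypothetical, not names from the package):

```r
# An existing sequential mapply over fitted models ...
fcasts <- mapply(forecast, object = model_list, new_data = new_data_list,
                 SIMPLIFY = FALSE)

# ... becomes parallel whenever {future} is attached and a plan() is set,
# with no other changes to the call:
fcasts <- versatile_mapply(forecast, object = model_list, new_data = new_data_list,
                           SIMPLIFY = FALSE)
```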

@mitchelloharawild
Member

I agree that adding progressr reporting here is appropriate.

This is what I'm working with at the moment.

mapply_maybe_parallel <- function (.f, ..., MoreArgs = list(), SIMPLIFY = FALSE) {
  if(is_attached("package:future")){
    require_package("future.apply")
    
    future.apply::future_mapply(
      FUN = .f,
      ...,
      MoreArgs = MoreArgs,
      SIMPLIFY = SIMPLIFY,
      future.globals = FALSE
    )
  }
  else{
    mapply(
      FUN = .f,
      ...,
      MoreArgs = MoreArgs,
      SIMPLIFY = SIMPLIFY
    )
  }
}

mable_apply <- function (.data, .f, ..., names_to = ".model") {
  mv <- mable_vars(.data)
  kv <- key_vars(.data)
  
  # Compute .f for all models in mable
  result <- lapply(
    as_tibble(.data)[mv],
    mapply_maybe_parallel,
    .f = .f,
    .data, 
    MoreArgs = dots_list(...)
  )
  num_rows <- lapply(result, vapply, nrow, integer(1L))
  # Assume same tsibble structure for all outputs
  first_result <- result[[1]][[1]]
  result <- vec_rbind(!!!lapply(result, function(x) vec_rbind(!!!x)))
  
  # Compute .model label
  model <- rep.int(mv, vapply(num_rows, sum, integer(1L)))
  
  # Repeat key structure as needed
  .data <- .data[rep.int(seq_along(num_rows[[1]]), num_rows[[1]]), kv]
  
  # Combine into single
  .data <- bind_cols(.data, !!names_to := model, result)
  
  if (is_tsibble(first_result)) {
    .data <- build_tsibble(
      .data, key = c(kv, names_to, key_vars(first_result)), 
      index = index_var(first_result), index2 = !!index2(first_result), 
      ordered = is_ordered(first_result),
      interval = interval(first_result))
  }
  
  return(.data)
}

@davidtedfordholt
Contributor Author

davidtedfordholt commented Sep 29, 2020

I like it. Here's something for the progress bar (because sleep is silly). Obviously, you're paying a lot more attention to the additional arguments getting passed through, which is great.

possibly_fast_mapply <- function(FUN, ...) {
  if(is_attached("package:future")) {
    require_package("future.apply")
    future.apply::future_mapply(
      FUN = FUN, ...,
      SIMPLIFY = FALSE,
      future.globals = FALSE)
  } else {
    mapply(
      FUN = FUN, ...,
      SIMPLIFY = FALSE)
  }
}

versatile_mapply <- function(FUN, ...) {
  if(is_attached("package:progressr")) {
    require_package("progressr")

    # check to find the longest iterant, to define the progress bar
    dots <- dots_list(...)
    iterants <- dots[which(!names(dots) %in% c("MoreArgs", "SIMPLIFY", "USE.NAMES"))]
    len <- 1:max(unlist(lapply(iterants, length)), na.rm = TRUE)
    
    with_progress({
      p <- progressor(along = len)
      out <- possibly_fast_mapply(
        FUN = function(...){
          p()
          FUN(...)
        }, ...)
    })
  } else {
    out <- possibly_fast_mapply(FUN = FUN, ...)
  }
  out
}

I was trying it out with nothing_in_particular <- versatile_mapply(FUN = sum, x = 1:10000, y = 2:10001). I really like progress bars. Steal and adapt, if you like.

@davidtedfordholt
Contributor Author

I definitely appreciate your refusal to play their FUN games, though. .f is so much more pleasant.

@davidtedfordholt
Contributor Author

As a note, the issue in mitchelloharawild/fable.prophet#17 continues. It doesn't happen with multicore, since the environment is shared, but does with multisession. This means that running a fable.prophet model in RStudio (without that option change) will likely fail to fit.

One way to solve this would be to attach the holidays to the tsibble being modeled. Another would be to codify the way holidays work in general, rather than specifically for fable.prophet, and build in a check for them in the environment, ensuring that they are passed through to the workers. A third is, as you suggest in the other issue, carefully parsing the formula for objects that might need to be added to the globals argument in the future_mapply() call.
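A hedged sketch of explicitly forwarding a known non-data object to the workers, using the fact that future.globals also accepts a named list (forecast_one, model_list, new_data and holidays are all hypothetical names here):

```r
future.apply::future_mapply(
  FUN = forecast_one,                 # hypothetical per-model worker
  model_list,
  MoreArgs = list(new_data = new_data),
  SIMPLIFY = FALSE,
  # Ship only this object, rather than scanning the whole environment:
  future.globals = list(holidays = holidays)
)
```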

It's an edge case and can be handled through a warning, but it seems like TSLM and fasster, as well as causal and econometric models, might benefit from a standardized holiday framework that is consistent across models.

@mitchelloharawild
Member

Long term I hope for a better holiday framework, likely powered by {almanac} and representing holidays as part of the data object.
However I do expect the need for non-data variables in model specifications, and so detecting and appropriately passing them through to the worker nodes is essential.

@davidtedfordholt
Contributor Author

I assume that integrating almanac would look like a function that takes in a recurrence bundle and creates columns on the tsibble corresponding to the bundle, perhaps dragging things forward and backward in time?

Perhaps, when parsing specials, object specifications that aren't variables in the tsibble need to be stored in a common place. That way they could be passed as a named list to the future.globals argument of future_mapply().

@davidtedfordholt
Contributor Author

I don't know if you're using it, @mitchelloharawild , but options(future.globals.onReference = "warning") is helpful.
