# 并行化

In [6]:
library(tidyverse)
library(mlr3verse)

-- [1mAttaching core tidyverse packages[22m ------------------------ tidyverse 2.0.0 --
[32mv[39m [34mdplyr    [39m 1.1.4     [32mv[39m [34mreadr    [39m 2.1.5
[32mv[39m [34mforcats  [39m 1.0.0     [32mv[39m [34mstringr  [39m 1.5.1
[32mv[39m [34mggplot2  [39m 3.5.2     [32mv[39m [34mtibble   [39m 3.2.1
[32mv[39m [34mlubridate[39m 1.9.4     [32mv[39m [34mtidyr    [39m 1.3.1
[32mv[39m [34mpurrr    [39m 1.0.4     
-- [1mConflicts[22m ------------------------------------------ tidyverse_conflicts() --
[31mx[39m [34mdplyr[39m::[32mfilter()[39m masks [34mstats[39m::filter()
[31mx[39m [34mdplyr[39m::[32mlag()[39m    masks [34mstats[39m::lag()
[36mi[39m Use the conflicted package ([3m[34m<http://conflicted.r-lib.org/>[39m[23m) to force all conflicts to become errors
Loading required package: mlr3



In [None]:
# set up a socket cluster with 4 workers on the local machine
library(parallel)
cores <- 4
cl <- makeCluster(cores)

# vector to operate on
x <- 1:10000

# fast function to parallelize
f <- function(y) sqrt(y + 1)

# unchunked approach: 1000 jobs
system.time({parSapply(cl, x, f, chunk.size = 1)})

# user  system elapsed 
#   0.327   0.084   0.532

   user  system elapsed 
  0.327   0.084   0.532 

In [None]:
system.time({parSapply(cl, x, f, chunk.size = 2500)})

# user  system elapsed 
#   0.003   0.001   0.033

   user  system elapsed 
  0.003   0.001   0.033 

## 学习器的并行化

In [None]:
lrn_ranger <- lrn("classif.ranger")

# 显示所有带有 "threads" 标签的超参数
lrn_ranger$param_set$ids(tags = "threads")

# [1] "num.threads"

In [None]:
# The number of threads is initialized to 1
lrn_ranger$param_set$values$num.threads

# [1] 1

In [None]:
# use four CPUs
set_threads(lrn_ranger, n = 4)

# <LearnerClassifRanger:classif.ranger>: Random Forest
# * Model: -
# * Parameters: num.threads=4
# * Packages: mlr3, mlr3learners, ranger
# * Predict Types:  [response], prob
# * Feature Types: logical, integer, numeric, character, factor, ordered
# * Properties: hotstart_backward, importance, missings, multiclass,
#   oob_error, selected_features, twoclass, weights

<LearnerClassifRanger:classif.ranger>: Random Forest
* Model: -
* Parameters: num.threads=4
* Packages: mlr3, mlr3learners, ranger
* Predict Types:  [response], prob
* Feature Types: logical, integer, numeric, character, factor, ordered
* Properties: hotstart_backward, importance, missings, multiclass,
  oob_error, selected_features, twoclass, weights

In [None]:
# auto-detect cores on the local machine
set_threads(lrn_ranger)

# <LearnerClassifRanger:classif.ranger>: Random Forest
# * Model: -
# * Parameters: num.threads=8
# * Packages: mlr3, mlr3learners, ranger
# * Predict Types:  [response], prob
# * Feature Types: logical, integer, numeric, character, factor, ordered
# * Properties: hotstart_backward, importance, missings, multiclass,
#   oob_error, selected_features, twoclass, weights

<LearnerClassifRanger:classif.ranger>: Random Forest
* Model: -
* Parameters: num.threads=8
* Packages: mlr3, mlr3learners, ranger
* Predict Types:  [response], prob
* Feature Types: logical, integer, numeric, character, factor, ordered
* Properties: hotstart_backward, importance, missings, multiclass,
  oob_error, selected_features, twoclass, weights

下面做一个有趣的试验，我们探索一下并行化能否节省时间：

In [19]:
tsk_mtcars <- tsk("mtcars")

In [21]:
mtcars %>% glimpse()

Rows: 32
Columns: 11
$ mpg  [3m[90m<dbl>[39m[23m 21.0, 21.0, 22.8, 21.4, 18.7, 18.1, 14.3, 24.4, 22.8, 19.2, 17.8,~
$ cyl  [3m[90m<dbl>[39m[23m 6, 6, 4, 6, 8, 6, 8, 4, 4, 6, 6, 8, 8, 8, 8, 8, 8, 4, 4, 4, 4, 8,~
$ disp [3m[90m<dbl>[39m[23m 160.0, 160.0, 108.0, 258.0, 360.0, 225.0, 360.0, 146.7, 140.8, 16~
$ hp   [3m[90m<dbl>[39m[23m 110, 110, 93, 110, 175, 105, 245, 62, 95, 123, 123, 180, 180, 180~
$ drat [3m[90m<dbl>[39m[23m 3.90, 3.90, 3.85, 3.08, 3.15, 2.76, 3.21, 3.69, 3.92, 3.92, 3.92,~
$ wt   [3m[90m<dbl>[39m[23m 2.620, 2.875, 2.320, 3.215, 3.440, 3.460, 3.570, 3.190, 3.150, 3.~
$ qsec [3m[90m<dbl>[39m[23m 16.46, 17.02, 18.61, 19.44, 17.02, 20.22, 15.84, 20.00, 22.90, 18~
$ vs   [3m[90m<dbl>[39m[23m 0, 0, 1, 1, 0, 1, 0, 1, 1, 1, 1, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 0,~
$ am   [3m[90m<dbl>[39m[23m 1, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 0, 0,~
$ gear [3m[90m<dbl>[39m[23m 4, 4, 4, 3, 3, 3, 3, 4, 4, 4, 4, 3, 3, 3, 3, 3, 3, 4, 4,

In [None]:
lrn_ranger <- lrn("regr.ranger")

lrn_ranger_c4 <- lrn("regr.ranger", num.threads = 4)

# 比较两个模型在mtcars数据集上的训练时间
system.time({
  lrn_ranger$train(tsk_mtcars)
})
system.time({
  lrn_ranger_c4$train(tsk_mtcars)
})

# user  system elapsed 
#   0.272   0.015   1.410
# user  system elapsed 
#   0.011   0.004   0.008 

   user  system elapsed 
  0.272   0.015   1.410 

   user  system elapsed 
  0.011   0.004   0.008 

In [None]:
# 基于benchmark比较训练时间
tsk_mtcars <- tsk("mtcars")
lrn_ranger <- lrn("regr.ranger")
lrn_ranger$id <- "ranger"
lrn_ranger_c4 <- lrn("regr.ranger", num.threads = 4)
lrn_ranger_c4$id <- "ranger_c4"
bmr <- benchmark(benchmark_grid(
  tasks = tsk_mtcars,
  learners = list(lrn_ranger, lrn_ranger_c4),
  resamplings = rsmp("cv", folds = 3)
))
bmr$aggregate(msrs(c("time_train", "regr.mse")))

# nr resample_result            task_id learner_id resampling_id iters
# 1 1  <environment: 0x15019e9e0> mtcars  ranger     cv            3    
# 2 2  <environment: 0x1501cbe28> mtcars  ranger_c4  cv            3    
#   time_train  regr.mse
# 1 0.025000000 7.968369
# 2 0.007333333 8.010708

INFO  [09:02:30.945] [mlr3] Running benchmark with 6 resampling iterations
INFO  [09:02:30.988] [mlr3] Applying learner 'ranger' on task 'mtcars' (iter 1/3)
INFO  [09:02:31.037] [mlr3] Applying learner 'ranger' on task 'mtcars' (iter 2/3)
INFO  [09:02:31.109] [mlr3] Applying learner 'ranger' on task 'mtcars' (iter 3/3)
INFO  [09:02:31.149] [mlr3] Applying learner 'ranger_c4' on task 'mtcars' (iter 1/3)
INFO  [09:02:31.182] [mlr3] Applying learner 'ranger_c4' on task 'mtcars' (iter 2/3)
INFO  [09:02:31.219] [mlr3] Applying learner 'ranger_c4' on task 'mtcars' (iter 3/3)
INFO  [09:02:31.246] [mlr3] Finished benchmark


nr,resample_result,task_id,learner_id,resampling_id,iters,time_train,regr.mse
<int>,<list>,<chr>,<chr>,<chr>,<int>,<dbl>,<dbl>
1,<environment: 0x15019e9e0>,mtcars,ranger,cv,3,0.025,7.968369
2,<environment: 0x1501cbe28>,mtcars,ranger_c4,cv,3,0.007333333,8.010708


In [42]:
library(future)

plan(multisession, workers = 4)

tsk_sonar <- tsk("sonar")
lrn_raprt <- lrn("classif.ranger")
rsmp_cv3 <- rsmp("cv", folds = 3)
system.time({
  resample(
    task = tsk_sonar,
    learner = lrn_raprt,
    resampling = rsmp_cv3,
  )
})

INFO  [09:08:06.016] [mlr3] Applying learner 'classif.ranger' on task 'sonar' (iter 1/3)
INFO  [09:08:06.160] [mlr3] Applying learner 'classif.ranger' on task 'sonar' (iter 2/3)
INFO  [09:08:06.304] [mlr3] Applying learner 'classif.ranger' on task 'sonar' (iter 3/3)


   user  system elapsed 
  0.070   0.003   0.916 

In [44]:
plan(sequential)

tsk_sonar <- tsk("sonar")
lrn_raprt <- lrn("classif.ranger")
rsmp_cv3 <- rsmp("cv", folds = 3)

system.time({
  resample(
    task = tsk_sonar,
    learner = lrn_raprt,
    resampling = rsmp_cv3
  )
})

INFO  [09:11:41.600] [mlr3] Applying learner 'classif.ranger' on task 'sonar' (iter 1/3)
INFO  [09:11:41.654] [mlr3] Applying learner 'classif.ranger' on task 'sonar' (iter 2/3)
INFO  [09:11:41.703] [mlr3] Applying learner 'classif.ranger' on task 'sonar' (iter 3/3)


   user  system elapsed 
  0.155   0.012   0.161 

## 重采样与基准测试的并行化

In [None]:
library(future)

# Select the multisession backend
plan(multisession)

# Run the experiment
tsk_sonar <- tsk("sonar")
lrn_rpart <- lrn("classif.rpart")
rsmp_cv3 <- rsmp("cv", folds = 3)
system.time({
  resample(tsk_sonar, lrn_rpart, rsmp_cv3)
})

# user  system elapsed 
#   0.111   0.003   0.470

INFO  [08:07:23.568] [mlr3] Applying learner 'classif.rpart' on task 'sonar' (iter 1/3)
INFO  [08:07:23.736] [mlr3] Applying learner 'classif.rpart' on task 'sonar' (iter 2/3)
INFO  [08:07:23.867] [mlr3] Applying learner 'classif.rpart' on task 'sonar' (iter 3/3)


   user  system elapsed 
  0.111   0.003   0.470 

In [12]:
# simple benchmark design
design <- benchmark_grid(
  tsks(c("sonar", "penguins")),
  lrns(c("classif.featureless", "classif.rpart")),
  rsmp_cv3
)

# enable parallelization
future::plan("multisession")

# run benchmark in parallel
bmr <- benchmark(design)

INFO  [08:23:20.711] [mlr3] Running benchmark with 12 resampling iterations
INFO  [08:23:20.851] [mlr3] Applying learner 'classif.featureless' on task 'sonar' (iter 1/3)
INFO  [08:23:21.031] [mlr3] Applying learner 'classif.featureless' on task 'sonar' (iter 2/3)
INFO  [08:23:21.166] [mlr3] Applying learner 'classif.featureless' on task 'sonar' (iter 3/3)
INFO  [08:23:21.296] [mlr3] Applying learner 'classif.rpart' on task 'sonar' (iter 1/3)
INFO  [08:23:21.427] [mlr3] Applying learner 'classif.rpart' on task 'sonar' (iter 2/3)
INFO  [08:23:21.562] [mlr3] Applying learner 'classif.rpart' on task 'sonar' (iter 3/3)
INFO  [08:23:21.714] [mlr3] Applying learner 'classif.featureless' on task 'penguins' (iter 1/3)
INFO  [08:23:21.844] [mlr3] Applying learner 'classif.featureless' on task 'penguins' (iter 2/3)
INFO  [08:23:21.873] [mlr3] Applying learner 'classif.featureless' on task 'penguins' (iter 3/3)
INFO  [08:23:21.936] [mlr3] Applying learner 'classif.rpart' on task 'penguins' (iter 1

In [45]:
# simple benchmark design
design <- benchmark_grid(
  tsks(c("sonar", "penguins")),
  lrns(c("classif.featureless", "classif.rpart")),
  rsmp_cv3
)

plan("multisession")

system.time({
  bmr <- benchmark(design)
})

INFO  [09:12:38.620] [mlr3] Running benchmark with 12 resampling iterations
INFO  [09:12:38.745] [mlr3] Applying learner 'classif.featureless' on task 'sonar' (iter 1/3)
INFO  [09:12:38.919] [mlr3] Applying learner 'classif.featureless' on task 'sonar' (iter 2/3)
INFO  [09:12:39.050] [mlr3] Applying learner 'classif.featureless' on task 'sonar' (iter 3/3)
INFO  [09:12:39.191] [mlr3] Applying learner 'classif.rpart' on task 'sonar' (iter 1/3)
INFO  [09:12:39.321] [mlr3] Applying learner 'classif.rpart' on task 'sonar' (iter 2/3)
INFO  [09:12:39.450] [mlr3] Applying learner 'classif.rpart' on task 'sonar' (iter 3/3)
INFO  [09:12:39.584] [mlr3] Applying learner 'classif.featureless' on task 'penguins' (iter 1/3)
INFO  [09:12:39.713] [mlr3] Applying learner 'classif.featureless' on task 'penguins' (iter 2/3)
INFO  [09:12:39.743] [mlr3] Applying learner 'classif.featureless' on task 'penguins' (iter 3/3)
INFO  [09:12:39.790] [mlr3] Applying learner 'classif.rpart' on task 'penguins' (iter 1

   user  system elapsed 
  0.396   0.010   1.294 

In [47]:
plan("sequential")

system.time({
  bmr <- benchmark(design)
})

INFO  [09:13:08.683] [mlr3] Running benchmark with 12 resampling iterations
INFO  [09:13:08.685] [mlr3] Applying learner 'classif.featureless' on task 'sonar' (iter 1/3)
INFO  [09:13:08.690] [mlr3] Applying learner 'classif.featureless' on task 'sonar' (iter 2/3)
INFO  [09:13:08.695] [mlr3] Applying learner 'classif.featureless' on task 'sonar' (iter 3/3)
INFO  [09:13:08.699] [mlr3] Applying learner 'classif.rpart' on task 'sonar' (iter 1/3)
INFO  [09:13:08.708] [mlr3] Applying learner 'classif.rpart' on task 'sonar' (iter 2/3)
INFO  [09:13:08.717] [mlr3] Applying learner 'classif.rpart' on task 'sonar' (iter 3/3)
INFO  [09:13:08.726] [mlr3] Applying learner 'classif.featureless' on task 'penguins' (iter 1/3)
INFO  [09:13:08.730] [mlr3] Applying learner 'classif.featureless' on task 'penguins' (iter 2/3)
INFO  [09:13:08.734] [mlr3] Applying learner 'classif.featureless' on task 'penguins' (iter 3/3)
INFO  [09:13:08.738] [mlr3] Applying learner 'classif.rpart' on task 'penguins' (iter 1

   user  system elapsed 
  0.081   0.002   0.084 

## 调优并行化

In [None]:
future::plan("multisession", workers = 4)

instance <- tune(
  tnr("random_search", batch_size = 12),
  tsk("penguins"),
  lrn("classif.rpart", minsplit = to_tune(2, 128)),
  rsmp("cv", folds = 3),
  term_evals = 20
)

instance$archive$n_evals

# [1] 24

INFO  [08:26:56.547] [bbotk] Starting to optimize 1 parameter(s) with '<OptimizerBatchRandomSearch>' and '<TerminatorEvals> [n_evals=20, k=0]'
INFO  [08:26:56.554] [bbotk] Evaluating 12 configuration(s)
INFO  [08:26:56.558] [mlr3] Running benchmark with 36 resampling iterations
INFO  [08:26:56.680] [mlr3] Applying learner 'classif.rpart' on task 'penguins' (iter 1/3)
INFO  [08:26:56.837] [mlr3] Applying learner 'classif.rpart' on task 'penguins' (iter 2/3)
INFO  [08:26:56.950] [mlr3] Applying learner 'classif.rpart' on task 'penguins' (iter 3/3)
INFO  [08:26:57.061] [mlr3] Applying learner 'classif.rpart' on task 'penguins' (iter 1/3)
INFO  [08:26:57.076] [mlr3] Applying learner 'classif.rpart' on task 'penguins' (iter 2/3)
INFO  [08:26:57.105] [mlr3] Applying learner 'classif.rpart' on task 'penguins' (iter 3/3)
INFO  [08:26:57.142] [mlr3] Applying learner 'classif.rpart' on task 'penguins' (iter 1/3)
INFO  [08:26:57.176] [mlr3] Applying learner 'classif.rpart' on task 'penguins' (ite

In [49]:
plan("multisession", workers = 4)

system.time({
  instance <- tune(
    tnr("random_search", batch_size = 12),
    tsk("penguins"),
    lrn("classif.rpart", minsplit = to_tune(2, 128)),
    rsmp("cv", folds = 3),
    term_evals = 20
  )
})

INFO  [09:15:45.430] [bbotk] Starting to optimize 1 parameter(s) with '<OptimizerBatchRandomSearch>' and '<TerminatorEvals> [n_evals=20, k=0]'
INFO  [09:15:45.435] [bbotk] Evaluating 12 configuration(s)
INFO  [09:15:45.438] [mlr3] Running benchmark with 36 resampling iterations
INFO  [09:15:45.550] [mlr3] Applying learner 'classif.rpart' on task 'penguins' (iter 1/3)
INFO  [09:15:45.720] [mlr3] Applying learner 'classif.rpart' on task 'penguins' (iter 2/3)
INFO  [09:15:45.837] [mlr3] Applying learner 'classif.rpart' on task 'penguins' (iter 3/3)
INFO  [09:15:45.954] [mlr3] Applying learner 'classif.rpart' on task 'penguins' (iter 1/3)
INFO  [09:15:45.970] [mlr3] Applying learner 'classif.rpart' on task 'penguins' (iter 2/3)
INFO  [09:15:45.999] [mlr3] Applying learner 'classif.rpart' on task 'penguins' (iter 3/3)
INFO  [09:15:46.028] [mlr3] Applying learner 'classif.rpart' on task 'penguins' (iter 1/3)
INFO  [09:15:46.060] [mlr3] Applying learner 'classif.rpart' on task 'penguins' (ite

   user  system elapsed 
  1.389   0.041   2.670 

In [50]:
plan("sequential")

system.time({
  instance <- tune(
    tnr("random_search", batch_size = 12),
    tsk("penguins"),
    lrn("classif.rpart", minsplit = to_tune(2, 128)),
    rsmp("cv", folds = 3),
    term_evals = 20
  )
})

INFO  [09:16:10.799] [bbotk] Starting to optimize 1 parameter(s) with '<OptimizerBatchRandomSearch>' and '<TerminatorEvals> [n_evals=20, k=0]'
INFO  [09:16:10.805] [bbotk] Evaluating 12 configuration(s)
INFO  [09:16:10.808] [mlr3] Running benchmark with 36 resampling iterations
INFO  [09:16:10.811] [mlr3] Applying learner 'classif.rpart' on task 'penguins' (iter 1/3)
INFO  [09:16:10.818] [mlr3] Applying learner 'classif.rpart' on task 'penguins' (iter 2/3)
INFO  [09:16:10.824] [mlr3] Applying learner 'classif.rpart' on task 'penguins' (iter 3/3)
INFO  [09:16:10.831] [mlr3] Applying learner 'classif.rpart' on task 'penguins' (iter 1/3)
INFO  [09:16:10.837] [mlr3] Applying learner 'classif.rpart' on task 'penguins' (iter 2/3)
INFO  [09:16:10.844] [mlr3] Applying learner 'classif.rpart' on task 'penguins' (iter 3/3)
INFO  [09:16:10.850] [mlr3] Applying learner 'classif.rpart' on task 'penguins' (iter 1/3)
INFO  [09:16:10.857] [mlr3] Applying learner 'classif.rpart' on task 'penguins' (ite

   user  system elapsed 
  0.656   0.018   0.674 

## 嵌套重采样并行化

In [None]:
library(mlr3tuning)
# reset to default sequential plan
future::plan("sequential")

lrn_rpart <- lrn(
  "classif.rpart",
  minsplit = to_tune(2, 128)
)

lrn_rpart_tuned <- auto_tuner(
  tnr("random_search", batch_size = 2),
  lrn_rpart,
  rsmp("holdout"),
  msr("classif.ce"),
  2
)

rr <- resample(
  tsk("penguins"),
  lrn_rpart_tuned,
  rsmp("cv", folds = 5)
)

Loading required package: paradox



INFO  [08:30:09.104] [mlr3] Applying learner 'classif.rpart.tuned' on task 'penguins' (iter 1/5)
INFO  [08:30:09.161] [bbotk] Starting to optimize 1 parameter(s) with '<OptimizerBatchRandomSearch>' and '<TerminatorEvals> [n_evals=2, k=0]'
INFO  [08:30:09.171] [bbotk] Evaluating 2 configuration(s)
INFO  [08:30:09.173] [mlr3] Running benchmark with 2 resampling iterations
INFO  [08:30:09.176] [mlr3] Applying learner 'classif.rpart' on task 'penguins' (iter 1/1)
INFO  [08:30:09.188] [mlr3] Applying learner 'classif.rpart' on task 'penguins' (iter 1/1)
INFO  [08:30:09.193] [mlr3] Finished benchmark
INFO  [08:30:09.209] [bbotk] Result of batch 1:
INFO  [08:30:09.211] [bbotk]        47 0.08695652        0      0            0.006
INFO  [08:30:09.211] [bbotk]       108 0.09782609        0      0            0.002
INFO  [08:30:09.211] [bbotk]                                 uhash
INFO  [08:30:09.211] [bbotk]  71f634ff-51ec-4713-828b-d17cc8bab171
INFO  [08:30:09.211] [bbotk]  b3da1728-8eef-4cf1-8

In [15]:
# Parallelize outer loop
future::plan(list("multisession", "sequential"))

# Alternative: skip specification of 2nd level, since future
# sets all levels after the first to "sequential" by default
future::plan("multisession")

In [16]:
# Parallelize inner loop
future::plan(list("sequential", "multisession"))

In [17]:
# Runs both loops in parallel
future::plan(list(
  tweak("multisession", workers = 2),
  tweak("multisession", workers = 2)
))

In [53]:
library(mlr3tuning)
# reset to default sequential plan
future::plan("sequential")

lrn_rpart <- lrn(
  "classif.rpart",
  minsplit = to_tune(2, 128)
)

lrn_rpart_tuned <- auto_tuner(
  tnr("random_search", batch_size = 2),
  lrn_rpart,
  rsmp("holdout"),
  msr("classif.ce"),
  2
)

system.time({
  rr <- resample(
    tsk("penguins"),
    lrn_rpart_tuned,
    rsmp("cv", folds = 5)
  )
})

INFO  [09:21:17.008] [mlr3] Applying learner 'classif.rpart.tuned' on task 'penguins' (iter 1/5)
INFO  [09:21:17.050] [bbotk] Starting to optimize 1 parameter(s) with '<OptimizerBatchRandomSearch>' and '<TerminatorEvals> [n_evals=2, k=0]'
INFO  [09:21:17.055] [bbotk] Evaluating 2 configuration(s)
INFO  [09:21:17.058] [mlr3] Running benchmark with 2 resampling iterations
INFO  [09:21:17.061] [mlr3] Applying learner 'classif.rpart' on task 'penguins' (iter 1/1)
INFO  [09:21:17.068] [mlr3] Applying learner 'classif.rpart' on task 'penguins' (iter 1/1)
INFO  [09:21:17.073] [mlr3] Finished benchmark
INFO  [09:21:17.089] [bbotk] Result of batch 1:
INFO  [09:21:17.091] [bbotk]        36 0.07608696        0      0            0.003
INFO  [09:21:17.091] [bbotk]        90 0.07608696        0      0            0.003
INFO  [09:21:17.091] [bbotk]                                 uhash
INFO  [09:21:17.091] [bbotk]  0411e574-6ce8-4eaf-b6f9-d0a89228fc3e
INFO  [09:21:17.091] [bbotk]  3ee6e379-1d79-4fc3-9

   user  system elapsed 
  0.473   0.012   0.487 

In [54]:
plan("multisession", workers = 4)

system.time({
  rr <- resample(
    tsk("penguins"),
    lrn_rpart_tuned,
    rsmp("cv", folds = 5)
  )
})

INFO  [09:21:32.416] [mlr3] Applying learner 'classif.rpart.tuned' on task 'penguins' (iter 1/5)
INFO  [09:21:32.536] [bbotk] Starting to optimize 1 parameter(s) with '<OptimizerBatchRandomSearch>' and '<TerminatorEvals> [n_evals=2, k=0]'
INFO  [09:21:32.560] [bbotk] Evaluating 2 configuration(s)
INFO  [09:21:32.564] [mlr3] Running benchmark with 2 resampling iterations
INFO  [09:21:32.604] [mlr3] Applying learner 'classif.rpart' on task 'penguins' (iter 1/1)
INFO  [09:21:32.617] [mlr3] Applying learner 'classif.rpart' on task 'penguins' (iter 1/1)
INFO  [09:21:32.623] [mlr3] Finished benchmark
INFO  [09:21:32.646] [bbotk] Result of batch 1:
INFO  [09:21:32.648] [bbotk]        35 0.04347826        0      0            0.008
INFO  [09:21:32.648] [bbotk]        69 0.04347826        0      0            0.003
INFO  [09:21:32.648] [bbotk]                                 uhash
INFO  [09:21:32.648] [bbotk]  9091fb22-77e0-47fc-ab3e-c7705818ea6b
INFO  [09:21:32.648] [bbotk]  c60700c8-4baa-4a3f-8

   user  system elapsed 
  0.106   0.005   0.858 

In [56]:
plan(list("multisession", "sequential"))
system.time({
  rr <- resample(
    tsk("penguins"),
    lrn_rpart_tuned,
    rsmp("cv", folds = 5)
  )
})

INFO  [09:22:04.822] [mlr3] Applying learner 'classif.rpart.tuned' on task 'penguins' (iter 1/5)
INFO  [09:22:04.951] [bbotk] Starting to optimize 1 parameter(s) with '<OptimizerBatchRandomSearch>' and '<TerminatorEvals> [n_evals=2, k=0]'
INFO  [09:22:04.977] [bbotk] Evaluating 2 configuration(s)
INFO  [09:22:04.981] [mlr3] Running benchmark with 2 resampling iterations
INFO  [09:22:05.024] [mlr3] Applying learner 'classif.rpart' on task 'penguins' (iter 1/1)
INFO  [09:22:05.036] [mlr3] Applying learner 'classif.rpart' on task 'penguins' (iter 1/1)
INFO  [09:22:05.042] [mlr3] Finished benchmark
INFO  [09:22:05.063] [bbotk] Result of batch 1:
INFO  [09:22:05.065] [bbotk]        47 0.06521739        0      0            0.006
INFO  [09:22:05.065] [bbotk]        71 0.06521739        0      0            0.002
INFO  [09:22:05.065] [bbotk]                                 uhash
INFO  [09:22:05.065] [bbotk]  0ded7b30-2e60-4a5a-8a6c-4f8abda9e5a1
INFO  [09:22:05.065] [bbotk]  e3c9e939-7d17-4220-b

   user  system elapsed 
  0.186   0.008   1.157 

In [57]:
plan(list("sequential", "multisession"))
system.time({
  rr <- resample(
    tsk("penguins"),
    lrn_rpart_tuned,
    rsmp("cv", folds = 5)
  )
})

INFO  [09:22:27.540] [mlr3] Applying learner 'classif.rpart.tuned' on task 'penguins' (iter 1/5)
INFO  [09:22:27.578] [bbotk] Starting to optimize 1 parameter(s) with '<OptimizerBatchRandomSearch>' and '<TerminatorEvals> [n_evals=2, k=0]'
INFO  [09:22:27.583] [bbotk] Evaluating 2 configuration(s)
INFO  [09:22:27.586] [mlr3] Running benchmark with 2 resampling iterations
INFO  [09:22:28.011] [mlr3] Applying learner 'classif.rpart' on task 'penguins' (iter 1/1)
INFO  [09:22:28.137] [mlr3] Applying learner 'classif.rpart' on task 'penguins' (iter 1/1)
INFO  [09:22:28.156] [mlr3] Finished benchmark
INFO  [09:22:28.172] [bbotk] Result of batch 1:
INFO  [09:22:28.174] [bbotk]       110 0.08695652        0      0             0.01
INFO  [09:22:28.174] [bbotk]        84 0.09782609        0      0             0.01
INFO  [09:22:28.174] [bbotk]                                 uhash
INFO  [09:22:28.174] [bbotk]  d8159aac-98e0-4196-9110-210750a228b9
INFO  [09:22:28.174] [bbotk]  85a2306b-59ba-4594-8

   user  system elapsed 
  0.752   0.032   1.368 

## 预测并行化

In [None]:
# 训练随机森林（决策树）在 sonar 任务上
tsk_sonar <- tsk("sonar")
lrn_rpart <- lrn("classif.rpart")
lrn_rpart$train(tsk_sonar)

# 设置并行预测，4 个 worker
future::plan("multisession", workers = 4)

# 预测
prediction <- lrn_rpart$predict(tsk_sonar)

# 错误处理

In [None]:
tsk_penguins <- tsk("penguins")
lrn_debug <- lrn("classif.debug")
lrn_debug

# <LearnerClassifDebug:classif.debug>: Debug Learner for Classification
# * Model: -
# * Parameters: list()
# * Validate: NULL
# * Packages: mlr3
# * Predict Types:  [response], prob
# * Feature Types: logical, integer, numeric, character, factor, ordered
# * Properties: hotstart_forward, internal_tuning, marshal, missings,
#   multiclass, twoclass, validation

<LearnerClassifDebug:classif.debug>: Debug Learner for Classification
* Model: -
* Parameters: list()
* Validate: NULL
* Packages: mlr3
* Predict Types:  [response], prob
* Feature Types: logical, integer, numeric, character, factor, ordered
* Properties: hotstart_forward, internal_tuning, marshal, missings,
  multiclass, twoclass, validation

In [None]:
# set probability to signal an error to `1`
lrn_debug$param_set$values$error_train <- 1
lrn_debug$train(tsk_penguins)

# Error in .__LearnerClassifDebug__.train(self = self, private = private, : Error from classif.debug->train()
# Traceback:

ERROR: Error in .__LearnerClassifDebug__.train(self = self, private = private, : Error from classif.debug->train()


## 封装

In [None]:
# trigger warning and error in training
lrn_debug <- lrn("classif.debug", warning_train = 1, error_train = 1)

# enable encapsulation for train() and predict()
lrn_debug$encapsulate("evaluate", fallback = lrn("classif.featureless"))
lrn_debug$train(tsk_penguins)

[38;5;167mERROR[39m [09:26:51.942] [mlr3] train: Error from classif.debug->train()
INFO  [09:26:51.956] [mlr3] Calling train method of fallback 'classif.featureless' on task 'penguins' with 344 observations [90m{[39m[38;5;169mlearner[39m[90m: [39m<LearnerClassifFeatureless/LearnerClassif/Learner/R6>[90m}[39m


In [None]:
lrn_debug$log

# stage class   msg                                
# 1 train warning Warning from classif.debug->train()
# 2 train error   Error from classif.debug->train()

stage,class,msg
<fct>,<ord>,<chr>
train,warning,Warning from classif.debug->train()
train,error,Error from classif.debug->train()


In [None]:
lrn_debug$warnings

# [1] "Warning from classif.debug->train()"

In [None]:
lrn_debug$errors

# [1] "Error from classif.debug->train()"

In [None]:
# near instant timeout for training, no timeout for predict
lrn_debug$timeout <- c(train = 1e-5, predict = Inf)
lrn_debug$train(task = tsk_penguins)$errors

# [1] "Error from classif.debug->train()"

[38;5;167mERROR[39m [09:30:28.915] [mlr3] train: Error from classif.debug->train()
INFO  [09:30:28.921] [mlr3] Calling train method of fallback 'classif.featureless' on task 'penguins' with 344 observations [90m{[39m[38;5;169mlearner[39m[90m: [39m<LearnerClassifFeatureless/LearnerClassif/Learner/R6>[90m}[39m


In [None]:
# no saved model as there was an error during training
lrn("classif.debug", error_train = 1)$train(tsk_penguins)$model

# Error in .__LearnerClassifDebug__.train(self = self, private = private, : Error from classif.debug->train()
# Traceback:

ERROR: Error in .__LearnerClassifDebug__.train(self = self, private = private, : Error from classif.debug->train()


In [None]:
# saved model
lrn_debug <- lrn("classif.debug", error_predict = 1)$train(tsk_penguins)
lrn_debug$model

# $response
# [1] "Chinstrap"

# $pid
# [1] 23467

# $id
# [1] "ff99dfd0-2c08-4b97-acb0-f1c45a5cdb0c"

# $random_number
# [1] 30325

# $iter
# [1] 1

# attr(,"class")
# [1] "classif.debug_model"

$response
[1] "Chinstrap"

$pid
[1] 23467

$id
[1] "ff99dfd0-2c08-4b97-acb0-f1c45a5cdb0c"

$random_number
[1] 30325

$iter
[1] 1

attr(,"class")
[1] "classif.debug_model"

## 回退学习器

In [67]:
lrn_debug <- lrn("classif.debug", error_train = 1)
lrn_debug$encapsulate("evaluate", fallback = lrn("classif.featureless"))

lrn_debug$train(tsk_penguins)

[38;5;167mERROR[39m [09:39:59.345] [mlr3] train: Error from classif.debug->train()
INFO  [09:39:59.351] [mlr3] Learner 'classif.debug' on task 'penguins' failed to train a model [90m{[39m[38;5;169mlearner[39m[90m: [39m<LearnerClassifDebug/LearnerClassif/Learner/R6>[90m, [39m[38;5;169mmessages[39m[90m: [39m`Error from classif.debug->train()`[90m}[39m
INFO  [09:39:59.353] [mlr3] Calling train method of fallback 'classif.featureless' on task 'penguins' with 344 observations [90m{[39m[38;5;169mlearner[39m[90m: [39m<LearnerClassifFeatureless/LearnerClassif/Learner/R6>[90m}[39m


In [None]:
lrn_debug

# <LearnerClassifDebug:classif.debug>: Debug Learner for Classification
# * Model: -
# * Parameters: error_train=1
# * Validate: NULL
# * Packages: mlr3
# * Predict Types:  [response], prob
# * Feature Types: logical, integer, numeric, character, factor, ordered
# * Properties: hotstart_forward, internal_tuning, marshal, missings,
#   multiclass, twoclass, validation
# * Errors: Error from classif.debug->train()

<LearnerClassifDebug:classif.debug>: Debug Learner for Classification
* Model: -
* Parameters: error_train=1
* Validate: NULL
* Packages: mlr3
* Predict Types:  [response], prob
* Feature Types: logical, integer, numeric, character, factor, ordered
* Properties: hotstart_forward, internal_tuning, marshal, missings,
  multiclass, twoclass, validation
* Errors: Error from classif.debug->train()

In [None]:
lrn_debug$log

# stage class msg                              
# 1 train error Error from classif.debug->train()

stage,class,msg
<fct>,<ord>,<chr>
train,error,Error from classif.debug->train()


In [None]:
lrn_debug$model

# NULL

NULL

In [None]:
prediction <- lrn_debug$predict(tsk_penguins)
prediction$score()

# classif.ce 
#  0.5581395

In [72]:
lrn_debug <- lrn(
  "classif.debug",
  error_train = 0.5
)
lrn_debug$encapsulate(
  "evaluate",
  fallback = lrn("classif.featureless")
)

aggr <- benchmark(
  benchmark_grid(
    tsk_penguins,
    list(
      lrn_debug,
      lrn("classif.rpart")
    ),
    rsmp("cv", folds = 20)
  )
)$aggregate(conditions = TRUE)

INFO  [09:44:07.717] [mlr3] Running benchmark with 40 resampling iterations
INFO  [09:44:07.727] [mlr3] Applying learner 'classif.debug' on task 'penguins' (iter 1/20)
INFO  [09:44:07.736] [mlr3] Calling train method of fallback 'classif.featureless' on task 'penguins' with 326 observations [90m{[39m[38;5;169mlearner[39m[90m: [39m<LearnerClassifFeatureless/LearnerClassif/Learner/R6>[90m}[39m
INFO  [09:44:07.749] [mlr3] Applying learner 'classif.debug' on task 'penguins' (iter 2/20)
INFO  [09:44:07.756] [mlr3] Calling train method of fallback 'classif.featureless' on task 'penguins' with 326 observations [90m{[39m[38;5;169mlearner[39m[90m: [39m<LearnerClassifFeatureless/LearnerClassif/Learner/R6>[90m}[39m
INFO  [09:44:07.768] [mlr3] Applying learner 'classif.debug' on task 'penguins' (iter 3/20)
INFO  [09:44:07.774] [mlr3] Calling train method of fallback 'classif.featureless' on task 'penguins' with 326 observations [90m{[39m[38;5;169mlearner[39m[90m: [39m<Learner

In [None]:
aggr[, .(learner_id, warnings, errors, classif.ce)]

# learner_id    warnings errors classif.ce
# 1 classif.debug 0        6      0.64428105
# 2 classif.rpart 0        0      0.05833333

learner_id,warnings,errors,classif.ce
<chr>,<int>,<int>,<dbl>
classif.debug,0,6,0.64428105
classif.rpart,0,0,0.05833333


In [None]:
rr <- aggr[learner_id == "classif.debug"]$resample_result[[1L]]
rr$errors[1:2]

# iteration msg                              
# 1 5         Error from classif.debug->train()
# 2 9         Error from classif.debug->train()

iteration,msg
<int>,<chr>
5,Error from classif.debug->train()
9,Error from classif.debug->train()


# 日志记录

In [75]:
lgr::get_logger("mlr3")$set_threshold("debug")

In [76]:
lgr::get_logger("mlr3")$set_threshold("warn")

In [77]:
lgr::get_logger("mlr3")$set_threshold("warn")
lgr::get_logger("bbotk")$set_threshold("info")

In [None]:
tf <- tempfile("mlr3log_", fileext = ".json")

logger <- lgr::get_logger("mlr3")
logger$add_appender(lgr::AppenderJson$new(tf), name = "json")
logger$warn("this is a warning from mlr3")

# [38;5;221mWARN [39m [09:54:07.161] [mlr3] this is a warning from mlr3



In [None]:
# print the contents of the file (splitting over two lines)
x <- readLines(tf)
cat(paste0(substr(x, 1, 71), "\n", substr(x, 72, nchar(x))))

# {"level":300,"timestamp":"2025-05-24 09:54:07","logger":"mlr3","caller"
# :"eval","msg":"[mlr3] this is a warning from mlr3"}

{"level":300,"timestamp":"2025-05-24 09:54:07","logger":"mlr3","caller"

In [80]:
# remove the appender again
logger$remove_appender("json")

# 数据后端

## 使用 DataBackendDplyr 的数据库

In [86]:
# load data
requireNamespace("DBI")
requireNamespace("RSQLite")
requireNamespace("nycflights13")
data("flights", package = "nycflights13")
dim(flights)

In [87]:
# 添加唯一行ID列
flights$row_id <- seq_len(nrow(flights))

# 创建临时sqlite数据库文件
db_path <- tempfile("flights", fileext = ".sqlite")
con <- DBI::dbConnect(RSQLite::SQLite(), db_path)
DBI::dbWriteTable(con, "flights", as.data.frame(flights))
DBI::dbDisconnect(con)

# 移除内存中的数据
rm(flights)

In [None]:
# establish connection
con = DBI::dbConnect(RSQLite::SQLite(), path)

# select the "flights" table
library(dplyr)
library(dbplyr)
tbl <- tbl(con, "flights")


Attaching package: 'dbplyr'


The following objects are masked from 'package:dplyr':

    ident, sql




In [None]:
# 1. subset columns
keep <- c(
  "row_id", "year", "month", "day", "hour", "minute", "dep_time",
  "arr_time", "carrier", "flight", "air_time", "distance", "arr_delay"
)
tbl <- select(tbl, all_of(keep))

# 2. filter by missing
tbl <- filter(tbl, !is.na(arr_delay))

# 3. select every other row
tbl <- filter(tbl, row_id %% 2 == 0)

# 4. merge infrequent carriers
infrequent <- c("OO", "HA", "YV", "F9", "AS", "FL", "VX", "WN")
tbl <- mutate(
  tbl,
  carrier = case_when(
    carrier %in% infrequent ~ "other",
    TRUE                   ~ carrier
  )
)

In [91]:
library(mlr3db)
backend_flights <- as_data_backend(tbl, primary_key = "row_id")
c(nrow = backend_flights$nrow, ncol = backend_flights$ncol)

In [None]:
backend_flights$head()

# row_id year month day hour minute dep_time arr_time carrier flight air_time
# 1  2     2013 1     1   5    29     533       850     UA      1714   227     
# 2  4     2013 1     1   5    45     544      1004     B6       725   183     
# 3  6     2013 1     1   5    58     554       740     UA      1696   150     
# 4  8     2013 1     1   6     0     557       709     EV      5708    53     
# 5 10     2013 1     1   6     0     558       753     AA       301   138     
# 6 12     2013 1     1   6     0     558       853     B6        71   158     
#   distance arr_delay
# 1 1416      20      
# 2 1576     -18      
# 3  719      12      
# 4  229     -14      
# 5  733       8      
# 6 1005      -3

row_id,year,month,day,hour,minute,dep_time,arr_time,carrier,flight,air_time,distance,arr_delay
<int>,<int>,<int>,<int>,<dbl>,<dbl>,<int>,<int>,<fct>,<int>,<dbl>,<dbl>,<dbl>
2,2013,1,1,5,29,533,850,UA,1714,227,1416,20
4,2013,1,1,5,45,544,1004,B6,725,183,1576,-18
6,2013,1,1,5,58,554,740,UA,1696,150,719,12
8,2013,1,1,6,0,557,709,EV,5708,53,229,-14
10,2013,1,1,6,0,558,753,AA,301,138,733,8
12,2013,1,1,6,0,558,853,B6,71,158,1005,-3


In [None]:
tsk_flights <- as_task_regr(
  backend_flights,
  id = "flights_sqlite",
  target = "arr_delay"
)
rsmp_sub002 <- rsmp("subsampling", ratio = 0.02, repeats = 3)

In [94]:
rr <- resample(tsk_flights, lrn("regr.rpart"), rsmp_sub002)

"DataBackend$data_formats is deprecated and will be removed in the future."


In [None]:
measures <- msrs(c("regr.rmse", "time_train", "time_predict"))
rr$aggregate(measures)

# regr.rmse   time_train time_predict 
#    34.886158     0.185000     5.959333

In [96]:
rm(tbl)
DBI::dbDisconnect(con)

In [None]:
path <- system.file(
  file.path("extdata", "spam.parquet"),
  package = "mlr3db"
)
backend <- as_duckdb_backend(path)
as_task_classif(backend, target = "type")

# <TaskClassif:backend> (4601 x 58)
# * Target: type
# * Properties: twoclass
# * Features (57):
#   - dbl (57): address, addresses, all, business, capitalAve,
#     capitalLong, capitalTotal, charDollar, charExclamation, charHash,
#     charRoundbracket, charSemicolon, charSquarebracket, conference,
#     credit, cs, data, direct, edu, email, font, free, george, hp, hpl,
#     internet, lab, labs, mail, make, meeting, money, num000, num1999,
#     num3d, num415, num650, num85, num857, order, original, our, over,
#     parts, people, pm, project, re, receive, remove, report, table,
#     technology, telnet, will, you, your

<TaskClassif:backend> (4601 x 58)
* Target: type
* Properties: twoclass
* Features (57):
  - dbl (57): address, addresses, all, business, capitalAve,
    capitalLong, capitalTotal, charDollar, charExclamation, charHash,
    charRoundbracket, charSemicolon, charSquarebracket, conference,
    credit, cs, data, direct, edu, email, font, free, george, hp, hpl,
    internet, lab, labs, mail, make, meeting, money, num000, num1999,
    num3d, num415, num650, num85, num857, order, original, our, over,
    parts, people, pm, project, re, receive, remove, report, table,
    technology, telnet, will, you, your