-
Notifications
You must be signed in to change notification settings - Fork 1
/
de-parallel.Rmd
293 lines (223 loc) · 8.09 KB
/
de-parallel.Rmd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
---
title: "Disposition Effect in Parallel"
output: rmarkdown::html_vignette
vignette: >
%\VignetteIndexEntry{Disposition Effect in Parallel}
%\VignetteEngine{knitr::rmarkdown}
%\VignetteEncoding{UTF-8}
---
```{r, include = FALSE}
knitr::opts_chunk$set(
message = FALSE,
warning = FALSE,
collapse = TRUE,
comment = "#>",
out.width = "100%"
)
```
# Package loading
To load the package simply use the usual `library` function.
This tutorial requires also some well-known packages that allows
for parallel computations in R.
```{r, eval = TRUE}
library(dispositionEffect)
library(dplyr)
library(foreach)
library(parallel)
library(doParallel)
library(future)
library(furrr)
library(bench)
```
# Data
The disposition effect analysis is performed on two fundamental types
of data frames:
* portfolio transactions, that is all the financial transactions an
investor did during a specific period of time.
A single transaction is made up of 6 features: the investor id,
the asset id, the type of the transaction (it can be a buy or a sell),
the traded quantity, the traded price, and the datetime.
* market prices, that is the prices found on the stock markets for
each traded asset and each transaction datetimes.
# The Problem
Usually, investor's transactions datasets may be huge in size.
Moreover, the dataset of market prices may grow exponentially
since it has to contain the market prices for each traded asset
on each trading datetime.
This may cause the computation of gains and losses through the
main interface of the package (`portfolio_compute`) to take few
minutes in order to produce the output data frame.
This computational inefficiency is particularly due to the size
of the market price dataset, since for every transaction we need
to look for the closest available market price at that datetime
for each asset.
For this reason, we used few parallel methodologies to improve
the computation time on large datasets.
# The Solution: Parallel Computing
## 1) Parallel `porfolio_compute`
Usually, you may want to apply the computations on many different
investors. To do that one can simply build a function that loops
on a list containing the transactions portfolios. The first solution
to this problem is to develop a parallel version of
`portfolio_compute` that allows to compute results on many
investors in parallel, taking advantage of two very useful
R packages: `future` and `furrr`.
A very simple version of the function would look like this.
```{r, eval=TRUE}
portfolio_compute_parallel <- function(portfolio_transactions, market_prices, plan = NULL, ...) {
investors_id <- purrr::map_chr(portfolio_transactions, ~purrr::pluck(., "investor")[1])
portfolio_compute_safe <- purrr::safely(portfolio_compute)
if (is.null(plan)) {
ncores <- future::availableCores()
# if there are more than 2 cores than use parallel computing
# otherwise use sequential computing
# RULE: be polite, always leave at least 1 free core
if ((ncores - 1) > 1) {
new_plan <- "multisession" # since I am testing on Windows
} else {
new_plan <- "sequential"
}
} else {
new_plan <- plan
}
old_plan <- future::plan(strategy = new_plan)
res <- furrr::future_map(
portfolio_transactions,
portfolio_compute_safe,
market_prices,
...)
res <- purrr::transpose(res)$result
names(res) <- investors_id
future::plan(old_plan) # set back the old plan
return(res)
}
```
This way you simply need to store all the different investor's
portfolios of transactions in a list and pass it to
`portfolio_compute_parallel` as first argument.
Now let's try it on the `DEanalysis` dataset to see if there is
any computational improvement.
```{r, eval=TRUE}
trx <- DEanalysis$transactions %>%
dplyr::group_by(investor) %>%
dplyr::group_split()
mkt <- DEanalysis$marketprices
```
```{r, eval=FALSE}
res <- bench::mark(
"sequential" = portfolio_compute_parallel(trx, mkt, plan = "sequential"),
"parallel" = portfolio_compute_parallel(trx, mkt, plan = "multisession")
)
res$expression <- c("sequential", "parallel")
res[, 1:8]
```
```{r, echo=FALSE, fig.align='center'}
knitr::include_graphics("figures/de-parallel-bench1.PNG")
```
As you can see, the benefits of parallel code are clear even on this
small sample dataset.
## 2) Separate files
Although the first solution may be enough with many small
investors, when things get bigger it may be better to physically
separate transactions and market prices for each investor.
This way, a single `.RData` that contains both the transactions and
the market prices for a single investor is created.
Once this process is finished, one can simply perform the computations
with the usual `portfolio_compute` loading the needed file sequentially
or in parallel by means of the `foreach`package.
First let's store every investor's datasets into its own file. To do this
we can make use of the `dispositionEffect:::generate_data` internal function.
In this way, we are able also to optimize the market price dataset, since
for each investor we can save only the market prices of his traded assets
for that specific period of trading.
```{r, eval=FALSE}
cl <- parallel::makeCluster(parallel::detectCores())
doParallel::registerDoParallel(cl)
foreach::foreach(i = seq_along(trx), .errorhandling = "pass") %dopar% {
transactions <- dplyr::arrange(trx[[i]], datetime)
mrkt_prices <- dplyr::filter(
mkt,
asset %in% unique(transactions$asset) &
datetime <= max(transactions$datetime)
)
df <- dispositionEffect:::generate_data(transactions, mrkt_prices, subset = TRUE)
nm <- paste0("INV", i, ".RData")
save(df, file = nm)
}
parallel::stopCluster(cl)
```
Now, we can build the function to perform computations
loading each single file sequentially or in parallel.
```{r, eval=FALSE}
portfolio_compute_onfiles <- function(files, plan = "sequential") {
if (plan == "sequential") {
res_list <- vector(mode = "list", length = length(files))
for (i in seq_along(files)) {
load(files[i]) # load the file
tmp_res <- tryCatch(
dispositionEffect::portfolio_compute(
portfolio_transactions = df$transactions,
market_prices = df$marketprices
),
error = function(e) "Error"
)
res_list[[i]] <- tmp_res # save results
rm(df, tmp_res)
}
} else {
cl <- parallel::makeCluster(parallel::detectCores())
doParallel::registerDoParallel(cl)
res_list <-
foreach(i = seq_along(files), .errorhandling = "pass") %dopar% {
load(files[i]) # load the file
tmp_res <- tryCatch(
dispositionEffect::portfolio_compute(
portfolio_transactions = df$transactions,
market_prices = df$marketprices
),
error = function(e) "Error"
)
}
parallel::stopCluster(cl)
}
return(res_list)
}
```
```{r, eval=FALSE}
files <- list.files(pattern = ".RData") # list all single .RData
res <- bench::mark(
portfolio_compute_onfiles(files, plan = "sequential"),
portfolio_compute_onfiles(files, plan = "multisession")
)
res$expression <- c("sequential", "parallel")
res[, 1:8]
```
```{r, echo=FALSE, fig.align='center'}
knitr::include_graphics("figures/de-parallel-bench2.PNG")
```
# Conclusions
As expected, parallel computation coupled with load in memory
small portion of datasets is the optimal solution.
Moreover, sometimes a little gain in computation time can be
obtained by setting the `exact_market_prices` argument of
`portfolio_compute` equal to TRUE. This way, the closest
market price search is performed on the very same transaction
datetime, but it requires that the market price dataset contains
all the prices for each asset at each single transaction datetime.
---------------------------------------------------------------------------
*This tests have been performed on a 4 core, 8GB RAM, Windows machine*
```{r, eval=TRUE, echo=FALSE}
# system('systeminfo')
version
```
For more tutorials on disposition effect visit
[dispositionEffect](https://marcozanotti.github.io/dispositionEffect/).