/
write_disk.frame.r
151 lines (139 loc) · 5.18 KB
/
write_disk.frame.r
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
#' Write disk.frame to disk
#' @description
#' Write a data.frame/disk.frame to a disk.frame location. If df is a data.frame
#' then using the as.disk.frame function is recommended for most cases
#' @param diskf a disk.frame
#' @param outdir output directory for the disk.frame
#' @param nchunks number of chunks
#' @param overwrite overwrite output directory
#' @param shardby the columns to shard by
#' @param partitionby the columns to (folder) partition by
#' @param compress compression ratio for fst files
#' @param ... passed to cmap.disk.frame
#' @export
#' @import fst fs
#' @importFrom dplyr group_map
#' @importFrom glue glue
#' @examples
#' cars.df = as.disk.frame(cars)
#'
#' # write out a lazy disk.frame to disk
#' cars2.df = write_disk.frame(cmap(cars.df, ~.x[1,]), overwrite = TRUE)
#' collect(cars2.df)
#'
#' # clean up cars.df
#' delete(cars.df)
#' delete(cars2.df)
write_disk.frame <- function(
diskf,
outdir = tempfile(fileext = ".df"),
nchunks = ifelse(
"disk.frame"%in% class(diskf),
nchunks.disk.frame(diskf),
recommend_nchunks(diskf)),
overwrite = FALSE,
shardby=NULL,
partitionby=NULL,
compress = 50, ...) {
force(nchunks)
overwrite_check(outdir, overwrite)
if(is.null(outdir)) {
stop("write_disk.frame error: outdir must not be NULL")
}
if(is_disk.frame(diskf)) {
if(!is.null(partitionby)) {
# for each chunk group by the partionby and then write out a partitioned disk.frame for each chunk
list_of_paths = diskf %>%
cimap(~{
tmp_dir_to_write = tempfile(as.character(.y))
tmp = .x %>%
group_by(!!!syms(partitionby)) %>%
dplyr::group_map(~{
# convert group keys to path
tmp_path = lapply(names(.y), function(n) {
sprintf("%s=%s", n, .y[, n])
}) %>%
do.call(file.path, .)
final_tmp_path = file.path(tmp_dir_to_write, tmp_path)
as.disk.frame(.x, final_tmp_path, overwrite = FALSE)
})
return(tmp_dir_to_write)
}, lazy=FALSE)
# for each of the chunks, do a soft row-append
partitioned_files = lapply(list_of_paths, function(path) {
# each path is a partitioned disk.frame
files = list.files(path, full.names = TRUE, recursive=TRUE)
tmp = data.frame(partition_path = files %>%
dirname %>%
sapply(tools::file_path_as_absolute) %>%
stringr::str_sub(nchar(path)+2))
tmp = tmp %>% mutate(path=path, files=files)
tmp
}) %>% rbindlist
partitioned_files %>%
group_by(partition_path) %>%
dplyr::group_map(function(df, grp) {
mapply(function(file, i) {
outfile = file.path(outdir, grp$partition_path, paste0(i, ".fst"))
if(!dir.exists(file.path(outdir, grp$partition_path))) {
fs::dir_create(file.path(outdir, grp$partition_path))
}
fs::file_move(file, outfile)
}, df$files, seq_along(df$files))
})
return(disk.frame(outdir))
} else if(is.null(shardby)) {
path = attr(diskf, "path")
files_shortname <- list.files(path)
cids = get_chunk_ids(diskf, full.names = T, strip_extension = F)
future.apply::future_lapply(1:length(cids), function(ii, ...) {
chunk = get_chunk(diskf, cids[ii], full.names = TRUE)
if(nrow(chunk) == 0) {
warning(sprintf("The output chunk has 0 row, therefore chunk %d NOT written", ii))
} else {
out_chunk_name = file.path(outdir, files_shortname[ii])
fst::write_fst(chunk, out_chunk_name, compress)
return(files_shortname)
}
return(NULL)
}, ..., future.seed = TRUE)
return(disk.frame(outdir))
} else {
# TODO really inefficient
shard(diskf,
outdir = outdir,
nchunks = nchunks,
overwrite = TRUE,
shardby = shardby,
compress = compress,
...
)
}
} else if ("data.frame" %in% class(diskf)) {
if(".out.disk.frame.id" %in% names(diskf)) {
diskf[,{
if (base::nrow(.SD) > 0) {
list_columns = purrr::map_lgl(.SD, is.list)
if(any(list_columns)){
stop(glue::glue("The data frame contains these list-columns: '{paste0(names(.SD)[list_columns], collapse='\', \'')}'. List-columns are not yet supported by disk.frame. Remove these columns to create a disk.frame"))
} else {
fst::write_fst(.SD, file.path(outdir, paste0(.BY, ".fst")), compress = compress)
NULL
}
}
NULL
}, .out.disk.frame.id]
res = disk.frame(outdir)
add_meta(res, shardkey = shardby, shardchunks = nchunks, compress = compress)
} else {
as.disk.frame(diskf, outdir = outdir, nchunks = nchunks, overwrite = TRUE, shardby = shardby, compress = compress, ...)
}
} else {
stop("write_disk.frame error: diskf must be a disk.frame or data.frame")
}
}
#' @rdname write_disk.frame
output_disk.frame <- function(...) {
warning("output_disk.frame is DEPRECATED. Use write_disk.frame istead")
write_disk.frame(...)
}