
Bug when importing 30GB csv file #141

Closed
b-rodrigues opened this issue Sep 3, 2019 · 16 comments

@b-rodrigues

Hi,

I'm trying to reproduce a blog post I wrote where I used Spark to read in a 30GB file: https://www.brodrigues.co/blog/2018-02-16-importing_30gb_of_data/

You can download the data here: https://packages.revolutionanalytics.com/datasets/AirOnTime87to12/ (it's the zip file).

This zip file contains a folder with a lot of smaller CSV files. I could import these, but I wanted to try the scenario where I get a single 30GB file and import it in one go (just as described in the post).

So first I combine the files into one big CSV file:

head -1 airOT198710.csv > combined.csv
for file in $(ls airOT*); do cat $file | sed "1 d" >> combined.csv; done

and then I try out {disk.frame}:

library(tidyverse)
library(disk.frame)
setup_disk.frame(workers = 6)
options(future.globals.maxSize = Inf)
path_to_data <- "/run/media/cbrunos/0a8a239e-7f3e-4756-9ddd-129c23fad79b/laragreen/Downloads/AirOnTimeCSV/"

flights.df <- csv_to_disk.frame(
  paste0(path_to_data, "combined.csv"), 
  outdir = paste0(path_to_data, "combined.df"),
  in_chunk_size = 1000)

But when I try this, I get the following error message:

System errno 22 unmapping file: Invalid argument
Error in data.table::fread(infile, skip = skiprows, nrows = in_chunk_size,  : 
  Opened 29.56GB (31737196240 bytes) file ok but could not memory map it. This is a 64bit process. There is probably not enough contiguous virtual memory available.

The hard drive has around 600GB of space left, and the machine has 16GB of RAM.
I am running the latest {disk.frame} from GitHub as well.

Here is some info about my session:

> R.Version()
$platform
[1] "x86_64-conda_cos6-linux-gnu"

$arch
[1] "x86_64"

$os
[1] "linux-gnu"

$system
[1] "x86_64, linux-gnu"

$status
[1] ""

$major
[1] "3"

$minor
[1] "6.1"

$year
[1] "2019"

$month
[1] "07"

$day
[1] "05"

$`svn rev`
[1] "76782"

$language
[1] "R"

$version.string
[1] "R version 3.6.1 (2019-07-05)"

$nickname
[1] "Action of the Toes"
[1] "path_to_data"
> sessionInfo()
R version 3.6.1 (2019-07-05)
Platform: x86_64-conda_cos6-linux-gnu (64-bit)
Running under: openSUSE Tumbleweed

Matrix products: default
BLAS/LAPACK: /home/cbrunos/miniconda3/envs/r_env/lib/R/lib/libRblas.so

locale:
 [1] LC_CTYPE=en_US.utf8       LC_NUMERIC=C             
 [3] LC_TIME=en_US.utf8        LC_COLLATE=en_US.utf8    
 [5] LC_MONETARY=en_US.utf8    LC_MESSAGES=en_US.utf8   
 [7] LC_PAPER=en_US.utf8       LC_NAME=C                
 [9] LC_ADDRESS=C              LC_TELEPHONE=C           
[11] LC_MEASUREMENT=en_US.utf8 LC_IDENTIFICATION=C      

attached base packages:
[1] stats     graphics  grDevices utils     datasets  methods   base     

other attached packages:
 [1] disk.frame_0.1.0  forcats_0.4.0     stringr_1.4.0     dplyr_0.8.3.9000 
 [5] purrr_0.3.2       readr_1.3.1       tidyr_0.8.99.9000 tibble_2.1.3     
 [9] ggplot2_3.1.1     tidyverse_1.2.1  

loaded via a namespace (and not attached):
 [1] benchmarkmeData_1.0.2 tidyselect_0.2.5      listenv_0.7.0        
 [4] haven_2.1.0           lattice_0.20-38       colorspace_1.4-1     
 [7] vctrs_0.2.0           generics_0.0.2        rlang_0.4.0          
[10] pillar_1.4.2          glue_1.3.1            withr_2.1.2          
[13] pryr_0.1.4            modelr_0.1.4          readxl_1.3.1         
[16] foreach_1.4.7         lifecycle_0.1.0       plyr_1.8.4           
[19] benchmarkme_1.0.2     munsell_0.5.0         gtable_0.3.0         
[22] cellranger_1.1.0      rvest_0.3.3           future_1.14.0        
[25] codetools_0.2-16      fst_0.9.0             doParallel_1.0.15    
[28] parallel_3.6.1        furrr_0.1.0           broom_0.5.2          
[31] Rcpp_1.0.2            scales_1.0.0          backports_1.1.4      
[34] jsonlite_1.6          fs_1.3.1              digest_0.6.20        
[37] hms_0.4.2             stringi_1.4.3         grid_3.6.1           
[40] cli_1.1.0             tools_3.6.1           magrittr_1.5         
[43] lazyeval_0.2.2        future.apply_1.3.0    crayon_1.3.4         
[46] pkgconfig_2.0.2       zeallot_0.1.0         Matrix_1.2-17        
[49] data.table_1.12.2     xml2_1.2.0            lubridate_1.7.4      
[52] assertthat_0.2.1      httr_1.4.1            rstudioapi_0.10      
[55] iterators_1.0.12      globals_0.12.4        R6_2.4.0             
[58] nlme_3.1-139          compiler_3.6.1  
@xiaodaigh
Collaborator

Thanks for the detailed bug report.

It will be another 17 hours before I can download the file as I don't have unlimited internet where I am. So please be patient. I will prioritise this.

@xiaodaigh
Collaborator

The great thing about disk.frame is that you don't need to combine the CSVs before you read them. In fact, it's recommended that you don't, so you can take advantage of parallel processing where each worker reads one file at a time. But I will do the combination and then test. It's a great test for disk.frame!
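
For reference, reading the un-combined CSVs looks roughly like this (a sketch only; the path is illustrative, and a full worked example appears further down the thread):

library(disk.frame)
setup_disk.frame()  # spins up background workers; each worker handles one file/chunk at a time

# illustrative path to the folder of un-combined CSVs
path_to_data <- "/path/to/AirOnTimeCSV/"

# passing a vector of files lets each worker read a different file in parallel
flights.df <- csv_to_disk.frame(
  list.files(path_to_data, pattern = "\\.csv$", full.names = TRUE),
  outdir = file.path(path_to_data, "airontime.df")
)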

@b-rodrigues
Author

b-rodrigues commented Sep 3, 2019 via email

@xiaodaigh
Collaborator

xiaodaigh commented Sep 3, 2019

I have implemented a backend using readr; please note the argument backend = "readr".

Also, please increase in_chunk_size. The value of 1000 is too small; I would try 1e6 or 1e7, or maybe even 2e7 or 4e7 depending on the available RAM. The more rows you load per chunk, the more efficiently disk.frame can work.

devtools::install_github("xiaodaigh/disk.frame")
library(tidyverse)
library(disk.frame)
setup_disk.frame(workers = 6)
options(future.globals.maxSize = Inf)
path_to_data <- "/run/media/cbrunos/0a8a239e-7f3e-4756-9ddd-129c23fad79b/laragreen/Downloads/AirOnTimeCSV/"

flights.df <- csv_to_disk.frame(
  paste0(path_to_data, "combined.csv"), 
  outdir = paste0(path_to_data, "combined.df"),
  in_chunk_size = 1e6,
  backend = "readr")

@b-rodrigues
Author

b-rodrigues commented Sep 3, 2019

Hi, this runs but maxes out my RAM, and R gets killed by Linux:

> flights.df <- csv_to_disk.frame(
+   paste0(path_to_data, "combined.csv"), 
+   outdir = paste0(path_to_data, "combined.df"),
+   in_chunk_size = 1e5,
+   backend = "readr")

Process R killed at Tue Sep  3 20:42:47 2019

@xiaodaigh
Collaborator

xiaodaigh commented Sep 4, 2019

@b-rodrigues Ok, I got this with #147. The problem is data.table trying to mmap the whole file even though you only want to read part of it. The only real solution is a chunked fread, but there doesn't seem to be much appetite for developing one.

There are at least two ways to use csv_to_disk.frame to read it now. I will design some smarts into csv_to_disk.frame so it auto-selects the method, but for now this is the best I can do. The fastest way is the LaF backend, but I am not sure about data correctness for all columns when using LaF.

Don't forget to install the latest version:

remotes::install_github("xiaodaigh/disk.frame")

Using LaF backend

library(disk.frame)
setup_disk.frame()

path_to_data <- "c:/data/"

# pick a chunk size so the file splits into the recommended number of chunks
rows = 148619656
recommended_nchunks = recommend_nchunks(file.size(file.path(path_to_data, "combined.csv")))
in_chunk_size = ceiling(rows / recommended_nchunks)

system.time(flights.df <- csv_to_disk.frame(
   paste0(path_to_data, "combined.csv"),
   outdir = paste0(path_to_data, "combined.laf.df"),
   in_chunk_size = in_chunk_size,
   backend = "LaF"
))

Using the readr and readLines chunk_readers, but still using the data.table backend (the default)
There are two pesky columns that were detected as integers but later turn out to contain strings. I am designing a system to handle this better, but currently it gives head-scratching errors!

Both of the examples below work fine:

library(disk.frame)
setup_disk.frame()
rows = 148619656
recommended_nchunks = recommend_nchunks(file.size(file.path(path_to_data, "combined.csv")))
in_chunk_size = ceiling(rows/ recommended_nchunks)

system.time(a <- csv_to_disk.frame(
   file.path(path_to_data, "combined.csv"),
   outdir = file.path(path_to_data, "combined.readr.df"),
   in_chunk_size = in_chunk_size,
   colClasses = list(character = c("WHEELS_OFF","WHEELS_ON")),
   chunk_reader = "readr"
))

or

library(disk.frame)
setup_disk.frame()
rows = 148619656
recommended_nchunks = recommend_nchunks(file.size(file.path(path_to_data, "combined.csv")))
in_chunk_size = ceiling(rows/ recommended_nchunks)

system.time(a <- csv_to_disk.frame(
   file.path(path_to_data, "combined.csv"),
   outdir = file.path(path_to_data, "combined.readr.df"),
   in_chunk_size = in_chunk_size,
   colClasses = list(character = c("WHEELS_OFF","WHEELS_ON")),
   chunk_reader = "readLines"
))

The best way is to not combine first!
If I don't combine the CSVs into one big file, I can make use of parallelism, and it reads them in 3 minutes on my computer!

path_to_data = "c:/data/AirOnTimeCSV/"
system.time(a <- csv_to_disk.frame(
   list.files(path_to_data, pattern = ".csv$", full.names = TRUE),
   outdir = file.path(path_to_data, "airontimecsv.df"),
   colClasses = list(character = c("WHEELS_OFF", "WHEELS_ON"))
))

Repeating your analysis using disk.frame
Note the annoying "two-stage" aggregation. But it's much faster than the ~5 minutes with Spark.

library(disk.frame)

setup_disk.frame()

path_to_data = "c:/data/Air"

a = disk.frame(file.path(path_to_data, "airontimecsv.df"))
system.time(r_mean_del_delay <- a %>% 
  group_by(YEAR, MONTH, DAY_OF_MONTH) %>% 
  summarise(sum_delay = sum(DEP_DELAY, na.rm = TRUE), n = n()) %>% 
  collect %>% 
  group_by(YEAR, MONTH, DAY_OF_MONTH) %>% 
  summarise(mean_delay = sum(sum_delay)/sum(n)))

library(lubridate)

dep_delay =  r_mean_del_delay %>%
  arrange(YEAR, MONTH, DAY_OF_MONTH) %>%
  mutate(date = ymd(paste(YEAR, MONTH, DAY_OF_MONTH, sep = "-")))

library(ggplot2)
ggplot(dep_delay, aes(date, mean_delay)) + geom_smooth()

@xiaodaigh
Collaborator

xiaodaigh commented Sep 4, 2019

Another very fast way to do it is to split up the file first using bigreadr's split_file function.

pt= proc.time()
a = bigreadr::split_file(file.path(path_to_data, "combined.csv"), every_nlines = in_chunk_size, repeat_header = TRUE)
f = bigreadr::get_split_files(a)
csv_to_disk.frame(
   f, 
   outdir = "c:/data/split30g.df"
)
data.table::timetaken(pt)

Update 20190910
csv_to_disk.frame now splits the file into chunks using bigreadr by default if in_chunk_size is set.
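
For example, after that update a call along these lines (a sketch; paths as in the examples above) splits the file with bigreadr before reading:

csv_to_disk.frame(
   file.path(path_to_data, "combined.csv"),
   outdir = file.path(path_to_data, "combined.df"),
   in_chunk_size = 1e7  # setting in_chunk_size now triggers the bigreadr-based splitting by default
)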

@xiaodaigh
Collaborator

xiaodaigh commented Sep 4, 2019

Just added a bigreadr chunk_reader (#150) which splits the file. But it breaks colClasses, so we need to specify it using column numbers, which is a shame. The overarching CSV reading framework will solve this in the near future.

pt= proc.time()
csv_to_disk.frame(
   file.path(path_to_data, "combined.csv"), 
   outdir = "c:/data/split30g.df",
   in_chunk_size = in_chunk_size,
   chunk_reader = "bigreadr",
   colClasses = list(character = c(22,23))
   #colClasses = list(character = c("WHEELS_OFF","WHEELS_ON"))
)
data.table::timetaken(pt) # 3min total.

@b-rodrigues
Author

Thank you very much for your help and responsiveness! I was able to run the code on my machine without any issues. I have written a blog post about this magnificent package:

https://www.brodrigues.co/blog/2019-09-03-disk_frame/

@xiaodaigh
Collaborator

Closing as the issue seems resolved.

@jangorecki

I am hitting the same error (System errno 22 unmapping file: Invalid argument) when using fread on a large file under R -d valgrind; normal R works fine (although it segfaults from time to time). Any hints on the reasons behind that error?

@xiaodaigh
Collaborator

@jangorecki Perhaps it's this issue: Rdatatable/data.table#3526

Try csv_to_disk.frame(input_file, in_chunk_size = 1e6) and see if the error goes away. You may want to increase it to in_chunk_size = 1e7 etc. if your computer has lots of RAM.
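
For example (a sketch; input_file stands in for the path to your CSV):

library(disk.frame)
setup_disk.frame()

# start with 1e6 rows per chunk; raise towards 1e7 if plenty of RAM is available
df <- csv_to_disk.frame(input_file, in_chunk_size = 1e6)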

It would be great if the data could be shared. I'm always looking for large datasets.

@jangorecki

I need to debug fread's segfault, so disk.frame won't address my needs. Thanks anyway.

@lucazav

lucazav commented May 1, 2021

Hi all,
I created a disk.frame from the multiple CSV files as follows:

library(disk.frame)

n_cores <- future::availableCores() - 1

setup_disk.frame(workers = n_cores)
options(future.globals.maxSize = Inf)

main_path <- 'E:/data/AirOnTime'
air_files <- list.files( paste0(main_path, '/AirOnTimeCSV'), full.names=TRUE )

system.time(
    dkf <- csv_to_disk.frame(
        infile = air_files,
        outdir = paste0(main_path, '/AirOnTime.df'),
        colClasses = list(character = c("WHEELS_OFF", "WHEELS_ON")),
        overwrite = TRUE
    )
)

Then, since one-stage group-by has been implemented, I tried to calculate the average delay by day and origin in the following way:

system.time(
    mean_dep_delay_df <- dkf %>%
        group_by(YEAR, MONTH, DAY_OF_MONTH, ORIGIN) %>%
        summarise(avg_delay = mean(DEP_DELAY)) %>%
        collect()
)

Unfortunately this operation makes my machine crash, sometimes causing a blue screen (I'm on Windows).
If, instead of using the mean() function in one step, I just calculate the sum with sum() and the number of occurrences with n(), the computation completes successfully in 6 minutes (sketched below).
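
Roughly, the two-step version that does complete looks like this (just a sketch; the na.rm = TRUE is my addition, and the in-memory second step mirrors the two-stage aggregation shown earlier in this thread):

system.time(
    dep_delay_sums_df <- dkf %>%
        group_by(YEAR, MONTH, DAY_OF_MONTH, ORIGIN) %>%
        summarise(sum_delay = sum(DEP_DELAY, na.rm = TRUE), n = n()) %>%
        collect()
)

# the average is then computed in memory from the collected sums and counts
mean_dep_delay_df <- dep_delay_sums_df %>%
    group_by(YEAR, MONTH, DAY_OF_MONTH, ORIGIN) %>%
    summarise(avg_delay = sum(sum_delay) / sum(n))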

I'm using a 32 GB hexa-core machine (12 threads, so I used 11 workers for the above computations), Microsoft R Open 4.0.2 and disk.frame 0.3.7.

@xiaodaigh
Collaborator

Try this, which should load only the 5 columns needed for this operation.

srckeep only loads the columns specified into memory.

In the future, we might be able to detect the columns needed from the code so the user doesn't have to use srckeep.

system.time(
    mean_dep_delay_df <- dkf %>%
        srckeep(c("YEAR", "MONTH", "DAY_OF_MONTH", "ORIGIN", "DEP_DELAY")) %>%
        group_by(YEAR, MONTH, DAY_OF_MONTH, ORIGIN) %>%
        summarise(avg_delay = mean(DEP_DELAY)) %>%
        collect()
)

@lucazav

lucazav commented May 2, 2021

Thanks @xiaodaigh, now it works well. I noticed two things:

  1. After the processing, the R worker sessions keep living. I have to kill them using future:::ClusterRegistry("stop").
  2. It happens that if I run the summarise operation just after dkf has been written to disk, the computation doesn't use all 11 workers, even if I don't kill the cluster. I have to run setup_disk.frame again to make it work. Here is the full code I'm using:
library(dplyr)
library(disk.frame)

n_cores <- future::availableCores() - 1

setup_disk.frame(workers = n_cores)
options(future.globals.maxSize = Inf)

main_path <- 'E:/data/AirOnTime'

air_files <- list.files( paste0(main_path, '/AirOnTimeCSV'), full.names=TRUE )

dkf <- csv_to_disk.frame(
	infile = air_files,
	outdir = paste0(main_path, '/AirOnTime.df'),
	colClasses = list(character = c("WHEELS_OFF", "WHEELS_ON")),
	overwrite = TRUE
)

mean_dep_delay_df <- dkf %>%
	srckeep(c("YEAR", "MONTH", "DAY_OF_MONTH", "ORIGIN", "DEP_DELAY")) %>%
	group_by(YEAR, MONTH, DAY_OF_MONTH, ORIGIN) %>%
	summarise(avg_delay = mean(DEP_DELAY)) %>%
	collect()

future:::ClusterRegistry("stop")

Does disk.frame creation force the cluster to reduce its nodes (sessions) in some way?
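
(As a quick diagnostic sketch, not part of the disk.frame API, I compare the size of the active future plan before and after re-running setup_disk.frame:)

future::nbrOfWorkers()              # check right after csv_to_disk.frame() returns
setup_disk.frame(workers = n_cores) # re-create the cluster
future::nbrOfWorkers()              # should report n_cores again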
