Premature EOF on WARC files #41

Closed
nostrademons opened this issue May 2, 2016 · 6 comments

@nostrademons

When parsing part of the CommonCrawl corpus (which consists of ~1G WARC files where each record is individually compressed), flate2 will return EOF after the first chunk has been decompressed rather than continuing to read the rest of the file. Sample data:

s3://aws-publicdatasets/common-crawl/crawl-data/CC-MAIN-2016-07/segments/1454701166570.91/warc/CC-MAIN-20160205193926-00310-ip-10-236-182-209.ec2.internal.warc.gz

(Downloadable with 'aws s3 cp s3://aws-publicdatasets/common-crawl/crawl-data/CC-MAIN-2016-07/segments/1454701166570.91/warc/CC-MAIN-20160205193926-00310-ip-10-236-182-209.ec2.internal.warc.gz .' if you have the AWS CLI installed. 837M, no fees.)

Sample code:

extern crate flate2;

use std::env;
use std::fs::File;
use std::io::prelude::*;

use flate2::read::GzDecoder;

fn main() {
  let filename = env::args().nth(1).unwrap_or(
      "../CC-MAIN-20160205193926-00310-ip-10-236-182-209.ec2.internal.warc.gz"
      .to_string());
  let input_file = File::open(&filename).unwrap();
  let mut gz_decoder = GzDecoder::new(input_file).unwrap();
  // Read the decompressed stream in ~1 MB chunks until EOF.
  let mut buffer = vec![0u8; 1_000_000];
  loop {
    match gz_decoder.read(&mut buffer) {
        Ok(0) => {
          println!("EOF");
          break;
        },
        Ok(n) => {
          println!("Read {}", String::from_utf8_lossy(&buffer[..n]))
        },
        Err(err) => {
          println!("Error filling buffer: {}", err);
          break;
        }
    }
  }
}

Arguably the files shouldn't do this, but the WARC spec recommends record-at-a-time compression, and it's pretty common practice in the Hadoop world to operate on big files that are the concatenation of individually gzipped records so that Hadoop can split the input without reading it. gunzip/gzcat can read such files, and re-compressing one with plain gzip lets flate2 read it as well. Given that these files exist, maybe flate2 could avoid returning EOF until the underlying stream does, and instead return the stream of decompressed bytes from the next record?
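
(For reference, a file with this record-at-a-time shape is easy to reproduce with flate2 itself. The following is just an untested sketch and the output path is made up; each record becomes its own complete gzip member, and gunzip/zcat decode the result as one continuous stream.)

extern crate flate2;

use std::fs::File;
use std::io::prelude::*;

use flate2::Compression;
use flate2::write::GzEncoder;

fn main() {
    // Write two independently compressed gzip members back to back,
    // mimicking record-at-a-time compression.
    let mut out = File::create("records.warc.gz").unwrap();  // made-up path
    for record in &["WARC record one\n", "WARC record two\n"] {
        let mut enc = GzEncoder::new(&mut out, Compression::Best);
        enc.write_all(record.as_bytes()).unwrap();
        enc.finish().unwrap();  // flush this member completely
    }
}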

@alexcrichton
Member

Unfortunately this has also been reported before (#35 and #23), but due to other divergences from zlib (see #40), I'm somewhat inclined to just implement a zlib-compatible gz stream as perhaps a separate type...

@nostrademons
Author

Is there an easy workaround? I don't mind writing extra code on my end, but I don't know any other way to reliably check for EOF, since the GzDecoder takes ownership of its underlying reader and so I can't keep a borrowed reference to the File to check eof().

I suppose for my use case I could get rid of flate2 entirely and pipe the output of gunzip -c into stdin, but then I've got to wire everything up with shell scripts, and I'm limited to one input file.

@alexcrichton
Member

You may be able to do something like:

  • Use bufread::GzDecoder instead of read::GzDecoder.
  • Use &mut BufRead instead of a by-value T.
  • When one stream finishes decoding, check whether the buffer has any more bytes in it, and if it does, create a new GzDecoder to decode the next stream.

Although this path perhaps isn't well tested, the GzDecoder shouldn't consume any more bytes than it's supposed to, so if the streams are literally concatenated you should be able to detect the boundary and just pick up where the previous decoder left off.

I haven't tested this out yet, though, so it may not work :(
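
For concreteness, here is a rough, untested sketch of that loop using only flate2 (the input path is made up, and it assumes the current API where bufread::GzDecoder::new returns a Result, hence the unwrap):

extern crate flate2;

use std::fs::File;
use std::io::{self, BufRead, BufReader};

fn main() {
    let file = File::open("input.warc.gz").unwrap();  // made-up path
    let mut reader = BufReader::new(file);
    loop {
        // Stop once the underlying stream has no bytes left.
        if reader.fill_buf().unwrap().is_empty() {
            break;
        }
        // Decode exactly one gzip member, then loop around for the next one.
        let mut gz = flate2::bufread::GzDecoder::new(&mut reader).unwrap();
        io::copy(&mut gz, &mut io::stdout()).unwrap();
    }
}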

@alexcrichton
Copy link
Member

To clarify, this is what I would expect:

extern crate flate2;

use std::io::prelude::*;
use std::io;

fn main() {
    let mut v = Vec::new();
    flate2::write::GzEncoder::new(&mut v, flate2::Compression::Best)
                             .write_all(b"foo")
                             .unwrap();
    flate2::write::GzEncoder::new(&mut v, flate2::Compression::Best)
                             .write_all(b"bar")
                             .unwrap();

    let mut data = &v[..];

    io::copy(&mut flate2::bufread::GzDecoder::new(&mut data).unwrap(),
             &mut io::stdout()).unwrap();
    io::copy(&mut flate2::bufread::GzDecoder::new(&mut data).unwrap(),
             &mut io::stdout()).unwrap();
}

It's crucial that you use bufread::GzDecoder instead of read::GzDecoder (as the latter may buffer too much), but otherwise, while there's more data, you can just keep decoding with a brand new GzDecoder.

@veldsla
Contributor

veldsla commented May 13, 2016

I can confirm that this works. Concatenated gz members are quite common in bioinformatics as well. The BGZF format uses them in combination with an index to allow random access into files and concurrent processing. The members are quite small (at most 64 KiB of uncompressed data). Performance seems fine: time to process a file is comparable to zcat when I use the zlib feature; miniz is about 1.6 times slower, but that is also the case for single-member gz files.

Partial example that uses the fastq reader from rust-bio (the crate declarations, imports, and file setup below are assumed, added to make it self-contained):

// Assumed setup: crate declarations, imports, and the input path are illustrative.
extern crate bio;
extern crate flate2;

use std::fs::File;
use std::io::{BufRead, BufReader};

use bio::io::fastq;

fn main() {
    let file = File::open("reads.fastq.gz").unwrap();  // made-up path

    let mut reader = BufReader::new(file);
    let mut r = fastq::Record::new();

    loop {
        //loop over all possible gzip members
        match reader.fill_buf() {
            Ok(b) => if b.is_empty() { break },
            Err(e) => panic!("{}", e)
        }

        //decode the next member
        let gz = flate2::bufread::GzDecoder::new(&mut reader).unwrap();
        let mut fqreader = fastq::Reader::new(gz);

        //loop over all records in this member
        loop {
            match fqreader.read(&mut r) {
                Ok(()) => {
                    if r.is_empty() {
                        //current gz member finished, more to decode?
                        break;
                    }
                },
                Err(err) => panic!("{}", err)
            }
            //do stuff with the record here
        }
    }
}

@nostrademons
Author

Thanks for looking into this. I've been working on other parts of my system lately, but I'll implement this solution when I return to the Rust code.
