Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

s2c: Add Snappy/S2 stream recompression #611

Merged
merged 1 commit into from
Jun 1, 2022
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 57 additions & 2 deletions s2/cmd/s2c/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ var (
faster = flag.Bool("faster", false, "Compress faster, but with a minor compression loss")
slower = flag.Bool("slower", false, "Compress more, but a lot slower")
snappy = flag.Bool("snappy", false, "Generate Snappy compatible output stream")
recomp = flag.Bool("recomp", false, "Recompress Snappy or S2 input")
cpu = flag.Int("cpu", runtime.GOMAXPROCS(0), "Compress using this amount of threads")
blockSize = flag.String("blocksize", "4M", "Max block size. Examples: 64K, 256K, 1M, 4M. Must be power of two and <= 4MB")
block = flag.Bool("block", false, "Compress as a single block. Will load content into memory.")
Expand Down Expand Up @@ -342,6 +343,9 @@ Options:`)
}
for _, filename := range files {
if *block {
if *recomp {
exitErr(errors.New("cannot recompress blocks (yet)"))
}
func() {
var closeOnce sync.Once
dstFilename := cleanFileName(fmt.Sprintf("%s%s", filename, ext))
Expand Down Expand Up @@ -429,20 +433,55 @@ Options:`)
}
func() {
var closeOnce sync.Once
dstFilename := cleanFileName(fmt.Sprintf("%s%s", filename, ext))
outFileBase := filename
if *recomp {
switch {
case strings.HasSuffix(outFileBase, s2Ext):
outFileBase = strings.TrimSuffix(outFileBase, s2Ext)
case strings.HasSuffix(outFileBase, snappyExt):
outFileBase = strings.TrimSuffix(outFileBase, snappyExt)
case strings.HasSuffix(outFileBase, ".snappy"):
outFileBase = strings.TrimSuffix(outFileBase, ".snappy")
}
}
dstFilename := cleanFileName(fmt.Sprintf("%s%s", outFileBase, ext))
if *out != "" {
dstFilename = *out
}
if !*quiet {
fmt.Print("Compressing ", filename, " -> ", dstFilename)
}

if dstFilename == filename && !*stdout {
if *remove {
exitErr(errors.New("cannot remove when input = output"))
}
renameDst := dstFilename
dstFilename = fmt.Sprintf(".tmp-%s%s", time.Now().Format("2006-01-02T15-04-05Z07"), ext)
defer func() {
exitErr(os.Rename(dstFilename, renameDst))
}()
}

// Input file.
file, _, mode := openFile(filename)
exitErr(err)
defer closeOnce.Do(func() { file.Close() })
src, err := readahead.NewReaderSize(file, *cpu+1, 1<<20)
exitErr(err)
defer src.Close()
var rc = &rCounter{
in: src,
}
if !*quiet {
// We only need to count for printing
src = rc
}
if *recomp {
dec := s2.NewReader(src)
src = ioutil.NopCloser(dec)
}

var out io.Writer
switch {
case *stdout:
Expand All @@ -466,11 +505,12 @@ Options:`)
wr.Reset(&wc)
defer wr.Close()
start := time.Now()
input, err := wr.ReadFrom(src)
_, err = wr.ReadFrom(src)
exitErr(err)
err = wr.Close()
exitErr(err)
if !*quiet {
input := rc.n
elapsed := time.Since(start)
mbpersec := (float64(input) / (1024 * 1024)) / (float64(elapsed) / (float64(time.Second)))
pct := float64(wc.n) * 100 / float64(input)
Expand Down Expand Up @@ -608,3 +648,18 @@ func (w *wCounter) Write(p []byte) (n int, err error) {
return n, err

}

// rCounter is a pass-through reader that keeps a running total of the
// bytes that have been read through it.
type rCounter struct {
	// n is the number of bytes returned by Read so far.
	n int64
	// in is the reader that every Read call is forwarded to.
	in io.Reader
}

// Compile-time check that *rCounter can stand in for an io.ReadCloser.
var _ io.ReadCloser = (*rCounter)(nil)

// Read forwards the request to the wrapped reader, adds the number of
// bytes it delivered to the running total, and hands the wrapped reader's
// result back unchanged.
func (r *rCounter) Read(p []byte) (int, error) {
	got, err := r.in.Read(p)
	r.n += int64(got)
	return got, err
}

// Close does nothing and always reports success; the wrapped reader is
// not closed by this call.
func (r *rCounter) Close() error {
	return nil
}