Skip to content

Commit

Permalink
feat(post-processing): add support for Riegeli input files (#3223)
Browse files Browse the repository at this point in the history
  • Loading branch information
schroederc committed Nov 6, 2018
1 parent e1fe01a commit 4035f93
Show file tree
Hide file tree
Showing 4 changed files with 57 additions and 6 deletions.
1 change: 1 addition & 0 deletions kythe/go/serving/pipeline/beamio/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ go_library(
],
deps = [
"//kythe/go/storage/stream",
"//kythe/go/util/riegeli",
"//kythe/proto:storage_go_proto",
"@com_github_apache_beam//sdks/go/pkg/beam:go_default_library",
"@com_github_apache_beam//sdks/go/pkg/beam/core/runtime/exec:go_default_library",
Expand Down
57 changes: 53 additions & 4 deletions kythe/go/serving/pipeline/beamio/entries.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,10 @@ package beamio

import (
"context"
"io"

"kythe.io/kythe/go/storage/stream"
"kythe.io/kythe/go/util/riegeli"

"github.com/apache/beam/sdks/go/pkg/beam"
"github.com/apache/beam/sdks/go/pkg/beam/io/filesystem"
Expand All @@ -28,16 +30,39 @@ import (
)

// init registers this file's Beam functions (the ones handed to beam.ParDo)
// with the Beam runtime via beam.RegisterFunction.
func init() {
	// Reader for the delimited-stream input path.
	beam.RegisterFunction(readStream)
	// Reader for the Riegeli input path.
	beam.RegisterFunction(readRiegeli)
}

// ReadEntries reads a delimited stream of *spb.Entry messages into a
// PCollection from the given file. The file can be part of any filesystem
// registered with the beam/io/filesystem package.
func ReadEntries(s beam.Scope, file string) beam.PCollection {
// ReadEntries builds a PCollection of *spb.Entry messages read from file. The
// file may live on any filesystem registered with the beam/io/filesystem
// package and may be either a Riegeli file or a delimited protobuf stream.
//
// The Riegeli format is probed first through tryReadRiegeli; when that probe
// does not yield a valid PCollection, the file is read as a delimited stream
// with readStream instead.
func ReadEntries(ctx context.Context, s beam.Scope, file string) beam.PCollection {
	riegeliEntries := tryReadRiegeli(ctx, s, file)
	if !riegeliEntries.IsValid() {
		// Probe failed: fall back to the delimited protobuf stream reader.
		return beam.ParDo(s, readStream, beam.Create(s, file))
	}
	return riegeliEntries
}

// tryReadRiegeli probes file by opening it and asking a Riegeli reader for its
// records metadata. If the filesystem cannot be created, the file cannot be
// opened, or the metadata cannot be read, the zero-value PCollection is
// returned (the error itself is intentionally discarded so that the caller can
// fall back to another format). Otherwise it returns a PCollection produced by
// running readRiegeli over the file name.
func tryReadRiegeli(ctx context.Context, s beam.Scope, file string) (coll beam.PCollection) {
	fsys, err := filesystem.New(ctx, file)
	if err != nil {
		return coll
	}
	defer fsys.Close()

	input, err := fsys.OpenRead(ctx, file)
	if err != nil {
		return coll
	}
	defer input.Close()

	// Only the success or failure of the metadata read matters here; the
	// metadata value is not used.
	if _, err := riegeli.NewReader(input).RecordsMetadata(); err != nil {
		return coll
	}
	return beam.ParDo(s, readRiegeli, beam.Create(s, file))
}

func readStream(ctx context.Context, filename string, emit func(*spb.Entry)) error {
fs, err := filesystem.New(ctx, filename)
if err != nil {
Expand All @@ -53,3 +78,27 @@ func readStream(ctx context.Context, filename string, emit func(*spb.Entry)) err
}
return fs.Close()
}

// readRiegeli opens filename through the beam filesystem package, wraps it in
// a Riegeli reader, and passes each decoded *spb.Entry record to emit. It
// returns nil once the reader reports io.EOF, and otherwise returns the first
// error encountered while opening or decoding.
func readRiegeli(ctx context.Context, filename string, emit func(*spb.Entry)) error {
	fsys, err := filesystem.New(ctx, filename)
	if err != nil {
		return err
	}
	defer fsys.Close()

	input, err := fsys.OpenRead(ctx, filename)
	if err != nil {
		return err
	}
	defer input.Close()

	reader := riegeli.NewReader(input)
	for {
		// A fresh Entry per record: the pointer handed to emit must not be
		// overwritten by the next iteration.
		entry := new(spb.Entry)
		switch err := reader.NextProto(entry); err {
		case nil:
			emit(entry)
		case io.EOF:
			return nil
		default:
			return err
		}
	}
}
3 changes: 2 additions & 1 deletion kythe/go/serving/pipeline/beamio/entries_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package beamio

import (
"context"
"io/ioutil"
"os"
"testing"
Expand Down Expand Up @@ -65,7 +66,7 @@ func TestReadEntries(t *testing.T) {

p, s := beam.NewPipelineWithRoot()

coll := ReadEntries(s, f.Name())
coll := ReadEntries(context.Background(), s, f.Name())

var found []*spb.Entry
beam.ParDo(s, func(e *spb.Entry, emit func(*spb.Entry)) { found = append(found, e) }, coll)
Expand Down
2 changes: 1 addition & 1 deletion kythe/go/serving/tools/write_tables/write_tables.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,7 +157,7 @@ func runExperimentalBeamPipeline(ctx context.Context) error {
}

p, s := beam.NewPipelineWithRoot()
entries := beamio.ReadEntries(s, *entriesFile)
entries := beamio.ReadEntries(ctx, s, *entriesFile)
k := pipeline.FromEntries(s, entries)
shards := 128 // TODO(schroederc): better determine number of shards
if *experimentalColumnarData {
Expand Down

0 comments on commit 4035f93

Please sign in to comment.