Skip to content

Commit

Permalink
fix(post-processing): remove anchors from edge/xref relations (#3198)
Browse files Browse the repository at this point in the history
  • Loading branch information
schroederc committed Oct 30, 2018
1 parent 81a3c8b commit b81ef3a
Show file tree
Hide file tree
Showing 3 changed files with 22 additions and 19 deletions.
11 changes: 9 additions & 2 deletions kythe/go/serving/pipeline/beam.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ func init() {
beam.RegisterFunction(edgeTargets)
beam.RegisterFunction(edgeToCrossRefRelation)
beam.RegisterFunction(fileToDecorPiece)
beam.RegisterFunction(filterAnchorNodes)
beam.RegisterFunction(groupCrossRefs)
beam.RegisterFunction(groupEdges)
beam.RegisterFunction(keyByPath)
Expand Down Expand Up @@ -134,7 +135,6 @@ func (k *KytheBeam) SplitCrossReferences() beam.PCollection {
))

// TODO(schroederc): need to add definitions to related nodes
// TODO(schroederc): remove "anchor" edges
relations := beam.ParDo(s, edgeToCrossRefRelation, k.edgeRelations())

return beam.ParDo(s, encodeCrossRef, beam.Flatten(s,
Expand Down Expand Up @@ -621,7 +621,7 @@ func (k *KytheBeam) edgeRelations() beam.PCollection {
if !k.edges.IsValid() {
s := k.s.Scope("Relations")

nodeEdges := beam.ParDo(s, &nodes.Filter{IncludeFacts: []string{}}, k.nodes)
nodeEdges := beam.Seq(s, k.nodes, filterAnchorNodes, &nodes.Filter{IncludeFacts: []string{}})
sourceNodes := beam.ParDo(s, moveSourceToKey, k.nodes)

targetNodes := beam.ParDo(s, encodeEdgeTarget, beam.CoGroupByKey(s,
Expand All @@ -646,6 +646,13 @@ func (k *KytheBeam) SplitEdges() beam.PCollection {
return beam.ParDo(s, encodeEdgesEntry, beam.Flatten(s, idx, k.edgeRelations()))
}

// filterAnchorNodes forwards n to emit unless its Kythe kind is ANCHOR, in
// which case the node is dropped and nothing is emitted.
func filterAnchorNodes(n *scpb.Node, emit func(*scpb.Node)) {
	if n.GetKytheKind() != scpb.NodeKind_ANCHOR {
		emit(n)
	}
}

func edgeTargets(n *scpb.Node, emit func(*scpb.Edge)) {
for _, e := range n.Edge {
emit(&scpb.Edge{Source: n.Source, Target: e.Target})
Expand Down
15 changes: 2 additions & 13 deletions kythe/go/serving/pipeline/beam_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -578,11 +578,6 @@ func TestServingSimpleEdges(t *testing.T) {
TargetTicket: "kythe:#interface2",
}},
},
"%" + edges.Ref: {
Edge: []*gpb.EdgeSet_Group_Edge{{
TargetTicket: "kythe:?path=path#anchor1",
}},
},
"%" + edges.ChildOf: {
Edge: []*gpb.EdgeSet_Group_Edge{{
TargetTicket: "kythe:#child",
Expand All @@ -607,11 +602,6 @@ func TestServingSimpleEdges(t *testing.T) {
TargetTicket: "kythe:#interface2",
}},
},
"%" + edges.Ref: {
Edge: []*gpb.EdgeSet_Group_Edge{{
TargetTicket: "kythe:?path=path#anchor1",
}},
},
"%" + edges.ChildOf: {
Edge: []*gpb.EdgeSet_Group_Edge{{
TargetTicket: "kythe:#child",
Expand All @@ -621,9 +611,8 @@ func TestServingSimpleEdges(t *testing.T) {
},
},
Nodes: map[string]*cpb.NodeInfo{
ticket: &cpb.NodeInfo{Facts: map[string][]byte{facts.NodeKind: []byte(nodes.Record)}},
"kythe:?path=path#anchor1": &cpb.NodeInfo{Facts: map[string][]byte{facts.NodeKind: []byte(nodes.Anchor)}},
"kythe:#child": &cpb.NodeInfo{Facts: map[string][]byte{facts.NodeKind: []byte(nodes.Record)}},
ticket: &cpb.NodeInfo{Facts: map[string][]byte{facts.NodeKind: []byte(nodes.Record)}},
"kythe:#child": &cpb.NodeInfo{Facts: map[string][]byte{facts.NodeKind: []byte(nodes.Record)}},
},
}))
}
Expand Down
15 changes: 11 additions & 4 deletions kythe/go/serving/pipeline/beamio/leveldb.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,16 @@ func WriteLevelDB(s beam.Scope, path string, numShards int, tables ...beam.PColl
filesystem.ValidateScheme(path)
s = s.Scope("WriteLevelDB")

tableMetadata := writeShards(s, path, numShards, tables...)

// Write all SSTable metadata to the LevelDB's MANIFEST journal.
s = s.Scope("Manifest")
beam.ParDo(s, &writeManifest{Path: path}, beam.GroupByKey(s, beam.AddFixedKey(s, tableMetadata)))
}

func writeShards(s beam.Scope, path string, numShards int, tables ...beam.PCollection) beam.PCollection {
s = s.Scope("Shards")

// Encode each PCollection of KVs into ([]byte, []byte) key-values (*keyValue)
// and flatten all entries into a single PCollection.
var encodings []beam.PCollection
Expand All @@ -68,10 +78,7 @@ func WriteLevelDB(s beam.Scope, path string, numShards int, tables ...beam.PColl

// Write each shard to a separate SSTable. The resulting PCollection contains
// each SSTable's metadata (*tableMetadata).
tableMetadata := beam.ParDo(s, &writeTable{path}, shards)

// Write all SSTable metadata to the LevelDB's MANIFEST journal.
beam.ParDo(s, &writeManifest{Path: path}, beam.GroupByKey(s, beam.AddFixedKey(s, tableMetadata)))
return beam.ParDo(s, &writeTable{path}, shards)
}

type writeManifest struct{ Path string }
Expand Down

0 comments on commit b81ef3a

Please sign in to comment.