Skip to content

Commit

Permalink
feat: Add group by support for commits (#887)
Browse files Browse the repository at this point in the history
* Simplify walkAndFindPlanType

* Make commit field mapping deterministic

The non-deterministic mapping complicates the group-by code when adding support for commits as children and parent mappings will not always match.

* Set commit node docMapper

Was previously declared and unset and unused

* Return consistent type from dagscan

Was sometimes string, sometimes cid and sometimes *cid.  Now it is always *cid

* Add group by support for commits
  • Loading branch information
AndrewSisley committed Oct 12, 2022
1 parent bdd9b03 commit 8f57a7f
Show file tree
Hide file tree
Showing 10 changed files with 270 additions and 43 deletions.
20 changes: 12 additions & 8 deletions query/graphql/mapper/mapper.go
Expand Up @@ -613,17 +613,21 @@ func getTopLevelInfo(
}

if parsed.Name == parserTypes.LinksFieldName {
mapping.SetTypeName(parserTypes.LinksFieldName)

for f := range parserTypes.LinksFields {
mapping.Add(mapping.GetNextIndex(), f)
for i, f := range parserTypes.LinksFields {
mapping.Add(i, f)
}
} else {
mapping.SetTypeName(parserTypes.CommitTypeName)

for f := range parserTypes.VersionFields {
mapping.Add(mapping.GetNextIndex(), f)
// Setting the type name must be done after adding the fields, as
// the typeName index is dynamic, but the field indexes are not
mapping.SetTypeName(parserTypes.LinksFieldName)
} else {
for i, f := range parserTypes.VersionFields {
mapping.Add(i, f)
}

// Setting the type name must be done after adding the fields, as
// the typeName index is dynamic, but the field indexes are not
mapping.SetTypeName(parserTypes.CommitTypeName)
}

return mapping, &client.CollectionDescription{}, nil
Expand Down
12 changes: 12 additions & 0 deletions query/graphql/parser/commit.go
Expand Up @@ -49,6 +49,7 @@ type CommitSelect struct {

Limit *parserTypes.Limit
OrderBy *parserTypes.OrderBy
GroupBy *parserTypes.GroupBy

Fields []Selection
}
Expand All @@ -62,6 +63,7 @@ func (c CommitSelect) ToSelect() *Select {
Alias: c.Alias,
Limit: c.Limit,
OrderBy: c.OrderBy,
GroupBy: c.GroupBy,
Fields: c.Fields,
Root: parserTypes.CommitSelection,
}
Expand Down Expand Up @@ -127,6 +129,16 @@ func parseCommitSelect(field *ast.Field) (*CommitSelect, error) {
return nil, err
}
commit.Depth = client.Some(depth)
} else if prop == parserTypes.GroupByClause {
obj := argument.Value.(*ast.ListValue)
fields := []string{}
for _, v := range obj.Values {
fields = append(fields, v.GetValue().(string))
}

commit.GroupBy = &parserTypes.GroupBy{
Fields: fields,
}
}
}

Expand Down
14 changes: 7 additions & 7 deletions query/graphql/parser/types/types.go
Expand Up @@ -130,14 +130,14 @@ var (
AverageFieldName: {},
}

VersionFields = map[string]struct{}{
HeightFieldName: {},
CidFieldName: {},
DeltaFieldName: {},
VersionFields = []string{
HeightFieldName,
CidFieldName,
DeltaFieldName,
}

LinksFields = map[string]struct{}{
LinksNameFieldName: {},
LinksCidFieldName: {},
LinksFields = []string{
LinksNameFieldName,
LinksCidFieldName,
}
)
16 changes: 12 additions & 4 deletions query/graphql/planner/commit.go
Expand Up @@ -79,6 +79,12 @@ func (n *commitSelectNode) Next() (bool, error) {
}

n.currentValue = n.source.Value()
cid, hasCid := n.docMapper.DocumentMap().FirstOfName(n.currentValue, "cid").(*cid.Cid)
if hasCid {
// dagScanNode yields cids, but we want to yield strings
n.docMapper.DocumentMap().SetFirstOfName(&n.currentValue, "cid", cid.String())
}

return true, nil
}

Expand Down Expand Up @@ -143,8 +149,9 @@ func (p *Planner) commitSelectLatest(parsed *mapper.CommitSelect) (*commitSelect
}

commit := &commitSelectNode{
p: p,
source: dag,
p: p,
source: dag,
docMapper: docMapper{&parsed.DocumentMapping},
}

return commit, nil
Expand Down Expand Up @@ -191,8 +198,9 @@ func (p *Planner) commitSelectAll(parsed *mapper.CommitSelect) (*commitSelectNod
}
// dag.key = &key
commit := &commitSelectNode{
p: p,
source: dag,
p: p,
source: dag,
docMapper: docMapper{&parsed.DocumentMapping},
}

return commit, nil
Expand Down
9 changes: 5 additions & 4 deletions query/graphql/planner/dagscan.go
Expand Up @@ -111,7 +111,7 @@ func (h *headsetScanNode) Next() (bool, error) {
}

h.currentValue = h.parsed.DocumentMapping.NewDoc()
h.parsed.DocumentMapping.SetFirstOfName(&h.currentValue, "cid", *h.cid)
h.parsed.DocumentMapping.SetFirstOfName(&h.currentValue, "cid", h.cid)

return true, nil
}
Expand Down Expand Up @@ -263,11 +263,11 @@ func (n *dagScanNode) Next() (bool, error) {
}

val := n.headset.Value()
cid, ok := n.parsed.DocumentMapping.FirstOfName(val, "cid").(cid.Cid)
cid, ok := n.parsed.DocumentMapping.FirstOfName(val, "cid").(*cid.Cid)
if !ok {
return false, errors.New("Headset scan node returned an invalid cid")
}
n.currentCid = &cid
n.currentCid = cid
// Reset the depthVisited for each head yielded by headset
n.depthVisited = 0
} else if n.cid != nil {
Expand Down Expand Up @@ -367,7 +367,8 @@ All the dagScanNode endpoints use similar structures

func (n *dagScanNode) dagBlockToNodeDoc(block blocks.Block) (core.Doc, []*ipld.Link, error) {
commit := n.parsed.DocumentMapping.NewDoc()
n.parsed.DocumentMapping.SetFirstOfName(&commit, "cid", block.Cid().String())
cid := block.Cid()
n.parsed.DocumentMapping.SetFirstOfName(&commit, "cid", &cid)

// decode the delta, get the priority and payload
nd, err := dag.DecodeProtobuf(block.RawData())
Expand Down
2 changes: 1 addition & 1 deletion query/graphql/planner/multi.go
Expand Up @@ -397,7 +397,7 @@ func (s *selectNode) addSubPlan(fieldIndex int, plan planNode) error {

// source is a mergeNode, like a TypeJoin
case mergeNode:
origScan := s.p.walkAndFindPlanType(plan, &scanNode{}).(*scanNode)
origScan, _ := walkAndFindPlanType[*scanNode](plan)
if origScan == nil {
return errors.New("Failed to find original scan node in plan graph")
}
Expand Down
40 changes: 25 additions & 15 deletions query/graphql/planner/planner.go
Expand Up @@ -13,7 +13,6 @@ package planner
import (
"context"
"fmt"
"reflect"

"github.com/sourcenetwork/defradb/client"
"github.com/sourcenetwork/defradb/core"
Expand Down Expand Up @@ -321,15 +320,26 @@ func (p *Planner) expandTypeIndexJoinPlan(plan *typeIndexJoin, parentPlan *selec
}

func (p *Planner) expandGroupNodePlan(plan *selectTopNode) error {
// Find the first scan node in the plan, we assume that it will be for the correct collection
scanNode := p.walkAndFindPlanType(plan.plan, &scanNode{}).(*scanNode)
var sourceNode planNode
var hasScanNode bool
// Find the first scan node in the plan, we assume that it will be for the correct collection.
// This may be a commit node.
sourceNode, hasScanNode = walkAndFindPlanType[*scanNode](plan.plan)
if !hasScanNode {
commitNode, hasCommitNode := walkAndFindPlanType[*commitSelectNode](plan.plan)
if !hasCommitNode {
return errors.New("Failed to identify group source")
}
sourceNode = commitNode
}

// Check for any existing pipe nodes in the plan, we should use it if there is one
pipe, hasPipe := p.walkAndFindPlanType(plan.plan, &pipeNode{}).(*pipeNode)
pipe, hasPipe := walkAndFindPlanType[*pipeNode](plan.plan)

if !hasPipe {
newPipeNode := newPipeNode(scanNode.DocumentMap())
newPipeNode := newPipeNode(sourceNode.DocumentMap())
pipe = &newPipeNode
pipe.source = scanNode
pipe.source = sourceNode
}

if len(plan.group.childSelects) == 0 {
Expand All @@ -355,7 +365,7 @@ func (p *Planner) expandGroupNodePlan(plan *selectTopNode) error {
dataSource.pipeNode = pipe
}

if err := p.walkAndReplacePlan(plan.group, scanNode, pipe); err != nil {
if err := p.walkAndReplacePlan(plan.group, sourceNode, pipe); err != nil {
return err
}

Expand Down Expand Up @@ -419,20 +429,20 @@ func (p *Planner) walkAndReplacePlan(plan, target, replace planNode) error {
}

// walkAndFindPlanType walks through the plan graph, and returns the first
// instance of a plan, that matches the same type as the target plan
func (p *Planner) walkAndFindPlanType(plan, target planNode) planNode {
// instance of a plan, that matches the given type.
func walkAndFindPlanType[T planNode](plan planNode) (T, bool) {
src := plan
if src == nil {
return nil
var defaultT T
return defaultT, false
}

srcType := reflect.TypeOf(src)
targetType := reflect.TypeOf(target)
if srcType != targetType {
return p.walkAndFindPlanType(plan.Source(), target)
targetType, isTargetType := src.(T)
if !isTargetType {
return walkAndFindPlanType[T](plan.Source())
}

return src
return targetType, true
}

// explainRequest walks through the plan graph, and outputs the concrete planNodes that should
Expand Down
9 changes: 9 additions & 0 deletions query/graphql/schema/generate.go
Expand Up @@ -185,6 +185,8 @@ func (g *Generator) fromAST(ctx context.Context, document *ast.Document) ([]*gql
}
}

appendCommitChildGroupField()

// resolve types
if err := g.manager.ResolveTypes(); err != nil {
return nil, err
Expand Down Expand Up @@ -932,6 +934,13 @@ func (g *Generator) genNumericAggregateBaseArgInputs(obj *gql.Object) *gql.Input
})
}

func appendCommitChildGroupField() {
schemaTypes.CommitObject.Fields()[parserTypes.GroupFieldName] = &gql.FieldDefinition{
Name: parserTypes.GroupFieldName,
Type: gql.NewList(schemaTypes.CommitObject),
}
}

// Given a parsed ast.Node object, lookup the type in the TypeMap and return if its there
// otherwise return an error
// ast.Node, can either be a ast.Named type, a ast.List, or a ast.NonNull.
Expand Down
25 changes: 21 additions & 4 deletions query/graphql/schema/types/commits.go
Expand Up @@ -103,14 +103,31 @@ var (
},
)

commitFields = gql.NewEnum(
gql.EnumConfig{
Name: "commitFields",
Values: gql.EnumValueConfigMap{
"height": &gql.EnumValueConfig{Value: "height"},
"cid": &gql.EnumValueConfig{Value: "cid"},
},
},
)

QueryCommits = &gql.Field{
Name: "commits",
Type: gql.NewList(CommitObject),
Args: gql.FieldConfigArgument{
"dockey": NewArgConfig(gql.ID),
"field": NewArgConfig(gql.String),
"order": NewArgConfig(CommitsOrderArg),
"cid": NewArgConfig(gql.ID),
"dockey": NewArgConfig(gql.ID),
"field": NewArgConfig(gql.String),
"order": NewArgConfig(CommitsOrderArg),
"cid": NewArgConfig(gql.ID),
"groupBy": NewArgConfig(
gql.NewList(
gql.NewNonNull(
commitFields,
),
),
),
parserTypes.LimitClause: NewArgConfig(gql.Int),
parserTypes.OffsetClause: NewArgConfig(gql.Int),
parserTypes.DepthClause: NewArgConfig(gql.Int),
Expand Down

0 comments on commit 8f57a7f

Please sign in to comment.