Skip to content

Commit

Permalink
WIP: add nats datasource (#136)
Browse files Browse the repository at this point in the history
* initial impl for nats data source

* fix null data value

* fix escaping problem with multiline comments in the sdl

* add rootField to BaseDataSource

* add addr and topic literals

* update nats datasource to use directive definitions

* add logging to nats data source

* fix dependency injection

* add missing copy value

* add cleanup and additional logging

* fix err logging
  • Loading branch information
jensneuse committed Dec 6, 2019
1 parent 8d622f1 commit 42d7dc4
Show file tree
Hide file tree
Showing 13 changed files with 760 additions and 36 deletions.
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/gobwas/pool v0.2.0 // indirect
github.com/gobwas/ws v1.0.2
github.com/jensneuse/diffview v1.0.0
github.com/nats-io/nats.go v1.9.1
github.com/pkg/errors v0.8.1 // indirect
github.com/sebdah/goldie v0.0.0-20180424091453-8784dd1ab561
github.com/stretchr/testify v1.4.0 // indirect
Expand Down
10 changes: 10 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG
github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0=
github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE=
github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y=
github.com/nats-io/jwt v0.3.0 h1:xdnzwFETV++jNc4W1mw//qFyJGb2ABOombmZJQS4+Qo=
github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg=
github.com/nats-io/nats.go v1.9.1 h1:ik3HbLhZ0YABLto7iX80pZLPw/6dx3T+++MZJwLnMrQ=
github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w=
github.com/nats-io/nkeys v0.1.0 h1:qMd4+pRHgdr1nAClu+2h/2a5F2TmKcCzjCDazVgRoX4=
github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w=
github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw=
github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c=
github.com/pelletier/go-toml v1.2.0 h1:T5zMGML61Wp+FlcbWjRDT7yAxhJNAiPPLOFECq181zc=
github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/94hg7ilaic=
github.com/pkg/errors v0.8.1 h1:iURUrRGxPUNPdy5/HRSm+Yj6okJ6UtLINN0Q9M4+h3I=
Expand Down Expand Up @@ -110,6 +118,8 @@ go.uber.org/zap v1.11.0/go.mod h1:vwi/ZaCAaUcBkycHslxD9B2zi4UTXhF60s6SWpuDF0Q=
golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20190621222207-cc06ce4a13d4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4 h1:HuIa8hRrWRSrqYzx1qI49NNxhdi2PrY7gxVSq1JjLDc=
golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/net v0.0.0-20190311183353-d8887717615a/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
Expand Down
1 change: 1 addition & 0 deletions pkg/execution/a_execution-packr.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

18 changes: 18 additions & 0 deletions pkg/execution/datasource.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,4 +75,22 @@ type BaseDataSourcePlanner struct {
definition, operation *ast.Document // nolint
args []Argument // nolint
graphqlDefinitions *packr.Box // nolint
rootField rootField // nolint
}

type rootField struct {
isDefined bool
ref int
}

func (r *rootField) setIfNotDefined(ref int){
if r.isDefined {
return
}
r.isDefined = true
r.ref = ref
}

func (r *rootField) isDefinedAndEquals(ref int) bool {
return r.isDefined && r.ref == ref
}
178 changes: 178 additions & 0 deletions pkg/execution/datasource_nats.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
package execution

import (
"github.com/jensneuse/graphql-go-tools/pkg/ast"
"github.com/jensneuse/graphql-go-tools/pkg/astvisitor"
"github.com/jensneuse/graphql-go-tools/pkg/lexer/literal"
"github.com/nats-io/nats.go"
"go.uber.org/zap"
"io"
"sync"
"time"
)

type NatsDataSourcePlanner struct {
BaseDataSourcePlanner
}

func NewNatsDataSourcePlanner(baseDataSourcePlanner BaseDataSourcePlanner) *NatsDataSourcePlanner {
return &NatsDataSourcePlanner{
BaseDataSourcePlanner: baseDataSourcePlanner,
}
}

func (n *NatsDataSourcePlanner) DirectiveName() []byte {
return []byte("NatsDataSource")
}

func (n *NatsDataSourcePlanner) DirectiveDefinition() []byte {
data, _ := n.graphqlDefinitions.Find("directives/nats_datasource.graphql")
return data
}

func (n *NatsDataSourcePlanner) Plan() (DataSource, []Argument) {
return &NatsDataSource{
log: n.log,
}, n.args
}

func (n *NatsDataSourcePlanner) Initialize(walker *astvisitor.Walker, operation, definition *ast.Document, args []Argument, resolverParameters []ResolverParameter) {
n.walker, n.operation, n.definition, n.args = walker, operation, definition, args
}

func (n *NatsDataSourcePlanner) EnterInlineFragment(ref int) {

}

func (n *NatsDataSourcePlanner) LeaveInlineFragment(ref int) {

}

func (n *NatsDataSourcePlanner) EnterSelectionSet(ref int) {

}

func (n *NatsDataSourcePlanner) LeaveSelectionSet(ref int) {

}

func (n *NatsDataSourcePlanner) EnterField(ref int) {
n.rootField.setIfNotDefined(ref)
}

func (n *NatsDataSourcePlanner) LeaveField(ref int) {
if !n.rootField.isDefinedAndEquals(ref) {
return
}
definition, exists := n.walker.FieldDefinition(ref)
if !exists {
return
}
directive, exists := n.definition.FieldDefinitionDirectiveByName(definition, n.DirectiveName())
if !exists {
return
}
value, exists := n.definition.DirectiveArgumentValueByName(directive, literal.ADDR)
if !exists {
return
}
variableValue := n.definition.StringValueContentBytes(value.Ref)
arg := &StaticVariableArgument{
Name: literal.ADDR,
Value: make([]byte, len(variableValue)),
}
copy(arg.Value, variableValue)
n.args = append(n.args, arg)

value, exists = n.definition.DirectiveArgumentValueByName(directive, literal.TOPIC)
if !exists {
return
}
variableValue = n.definition.StringValueContentBytes(value.Ref)
arg = &StaticVariableArgument{
Name: literal.TOPIC,
Value: make([]byte, len(variableValue)),
}
copy(arg.Value, variableValue)
n.args = append(n.args, arg)
}

type NatsDataSource struct {
log *zap.Logger
conn *nats.Conn
sub *nats.Subscription
once sync.Once
}

func (n *NatsDataSource) Resolve(ctx Context, args ResolvedArgs, out io.Writer) Instruction {
var err error
n.once.Do(func() {

addrArg := args.ByKey(literal.ADDR)
topicArg := args.ByKey(literal.TOPIC)

addr := nats.DefaultURL
topic := string(topicArg)

if len(addrArg) != 0 {
addr = string(addrArg)
}

go func() {
<-ctx.Done()
if n.sub != nil {
n.log.Debug("NatsDataSource.unsubscribing",
zap.String("addr", addr),
zap.String("topic", topic),
)
err := n.sub.Unsubscribe()
if err != nil {
n.log.Error("Unsubscribe", zap.Error(err))
}
}
if n.conn != nil {
n.log.Debug("NatsDataSource.closing",
zap.String("addr", addr),
zap.String("topic", topic),
)
n.conn.Close()
}
}()

n.log.Debug("NatsDataSource.connecting",
zap.String("addr", addr),
zap.String("topic", topic),
)

n.conn, err = nats.Connect(addr)
if err != nil {
panic(err)
}

n.log.Debug("NatsDataSource.subscribing",
zap.String("addr", addr),
zap.String("topic", topic),
)

n.sub, err = n.conn.SubscribeSync(topic)
if err != nil {
panic(err)
}
})

if err != nil {
return CloseConnection
}

message, err := n.sub.NextMsg(time.Minute)
if err != nil {
return CloseConnection
}

_, err = out.Write(message.Data)
if err != nil {
return CloseConnection
}

return KeepStreamAlive
}
31 changes: 24 additions & 7 deletions pkg/execution/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,15 +158,14 @@ func (e *Executor) resolveNode(node Node, data []byte, path string, prefetch *sy
data = unsafebytes.StringToBytes(result.Raw)
}

if e.err == jsonparser.KeyPathNotFoundError {
e.err = nil
if len(data) == 0 || bytes.Equal(data, literal.NULL) {
e.write(literal.NULL)
return
}
if node.QuoteValue {
e.write(literal.QUOTE)
}
e.write(data)
e.write(e.escape(data))
if node.QuoteValue {
e.write(literal.QUOTE)
}
Expand All @@ -183,10 +182,6 @@ func (e *Executor) resolveNode(node Node, data []byte, path string, prefetch *sy
}
}

/*_, e.err = jsonparser.ArrayEach(data, func(value []byte, dataType jsonparser.ValueType, offset int, err error) {
listItems = append(listItems, value)
}, node.Path...)*/

var result []gjson.Result
if node.PathSelector.Path == "" {
result = gjson.ParseBytes(data).Array()
Expand Down Expand Up @@ -240,6 +235,28 @@ func (e *Executor) resolveNode(node Node, data []byte, path string, prefetch *sy
}
}

func (e *Executor) escape(data []byte) []byte {
if !e.dataNeedsEscaping(data) {
return data
}
out, err := json.Marshal(unsafebytes.BytesToString(data))
if err != nil {
panic(err)
}
out = out[1 : len(out)-1]
return out
}

func (e *Executor) dataNeedsEscaping(data []byte) bool {
for i := range data {
switch data[i] {
case runes.BACKSLASH, runes.TAB, runes.SPACE, runes.LINETERMINATOR:
return true
}
}
return false
}

func (e *Executor) ResolveArgs(args []Argument, data []byte) ResolvedArgs {

args = append(args, e.context.ExtraArguments...)
Expand Down
Loading

0 comments on commit 42d7dc4

Please sign in to comment.