Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,12 @@ r.Table("test").Insert(doc, r.InsertOpts{

As shown above in the Between example optional arguments are passed to the function as a struct. Each function that has optional arguments as a related struct. This structs are named in the format FunctionNameOpts, for example BetweenOpts is the related struct for Between.

#### Cancelling queries

For query cancellation use `Context` argument at `RunOpts`. If `Context` is `nil` and `ReadTimeout` or `WriteTimeout` is not 0 from `ConnectionOpts`, `Context` will be formed by summation of these timeouts.

For unlimited timeouts for `Changes()` pass `context.Background()`.

## Results

Different result types are returned depending on what function is used to execute the query.
Expand Down
13 changes: 7 additions & 6 deletions cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/Sirupsen/logrus"
"github.com/cenkalti/backoff"
"github.com/hailocab/go-hostpool"
"golang.org/x/net/context"
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would recommend using the built-in context package.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Most of the libraries use x/net/context, such as grpc-go, I'm not sure about old version of go, It can be difficult to pass x/net/context to context argument.

)

// A Cluster represents a connection to a RethinkDB cluster, a cluster is created
Expand Down Expand Up @@ -57,7 +58,7 @@ func NewCluster(hosts []Host, opts *ConnectOpts) (*Cluster, error) {
}

// Query executes a ReQL query using the cluster to connect to the database
func (c *Cluster) Query(q Query) (cursor *Cursor, err error) {
func (c *Cluster) Query(ctx context.Context, q Query) (cursor *Cursor, err error) {
for i := 0; i < c.numRetries(); i++ {
var node *Node
var hpr hostpool.HostPoolResponse
Expand All @@ -67,7 +68,7 @@ func (c *Cluster) Query(q Query) (cursor *Cursor, err error) {
return nil, err
}

cursor, err = node.Query(q)
cursor, err = node.Query(ctx, q)
hpr.Mark(err)

if !shouldRetryQuery(q, err) {
Expand All @@ -79,7 +80,7 @@ func (c *Cluster) Query(q Query) (cursor *Cursor, err error) {
}

// Exec executes a ReQL query using the cluster to connect to the database
func (c *Cluster) Exec(q Query) (err error) {
func (c *Cluster) Exec(ctx context.Context, q Query) (err error) {
for i := 0; i < c.numRetries(); i++ {
var node *Node
var hpr hostpool.HostPoolResponse
Expand All @@ -89,7 +90,7 @@ func (c *Cluster) Exec(q Query) (err error) {
return err
}

err = node.Exec(q)
err = node.Exec(ctx, q)
hpr.Mark(err)

if !shouldRetryQuery(q, err) {
Expand Down Expand Up @@ -204,7 +205,7 @@ func (c *Cluster) listenForNodeChanges() error {
return fmt.Errorf("Error building query: %s", err)
}

cursor, err := node.Query(q)
cursor, err := node.Query(context.Background(), q) // no need for timeout due to Changes()
if err != nil {
hpr.Mark(err)
return err
Expand Down Expand Up @@ -279,7 +280,7 @@ func (c *Cluster) connectNodes(hosts []Host) error {
continue
}

_, cursor, err := conn.Query(q)
_, cursor, err := conn.Query(nil, q) // nil = connection opts' timeout
if err != nil {
attemptErr = err
Log.Warnf("Error fetching cluster status: %s", err)
Expand Down
88 changes: 57 additions & 31 deletions connection.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"sync/atomic"
"time"

"golang.org/x/net/context"
p "gopkg.in/gorethink/gorethink.v3/ql2"
)

Expand Down Expand Up @@ -104,7 +105,11 @@ func (c *Connection) Close() error {
// Cursor which should be used to view the query's response.
//
// This function is used internally by Run which should be used for most queries.
func (c *Connection) Query(q Query) (*Response, *Cursor, error) {
func (c *Connection) Query(ctx context.Context, q Query) (*Response, *Cursor, error) {
if ctx == nil {
ctx = c.contextFromConnectionOpts()
}

if c == nil {
return nil, nil, ErrConnectionClosed
}
Expand All @@ -131,30 +136,51 @@ func (c *Connection) Query(q Query) (*Response, *Cursor, error) {
}
c.mu.Unlock()

err := c.sendQuery(q)
if err != nil {
return nil, nil, err
}
var response *Response
var cursor *Cursor
var errchan chan error = make(chan error, 1)
go func() {
err := c.sendQuery(q)
if err != nil {
errchan <- err
return
}

if noreply, ok := q.Opts["noreply"]; ok && noreply.(bool) {
return nil, nil, nil
}
if noreply, ok := q.Opts["noreply"]; ok && noreply.(bool) {
errchan <- nil
return
}

for {
response, err := c.readResponse()
if err != nil {
return nil, nil, err
for {
response, err := c.readResponse()
if err != nil {
errchan <- err
return
}

if response.Token == q.Token {
// If this was the requested response process and return
response, cursor, err = c.processResponse(ctx, q, response)
errchan <- err
return
} else if _, ok := c.cursors[response.Token]; ok {
// If the token is in the cursor cache then process the response
c.processResponse(ctx, q, response)
} else {
putResponse(response)
}
}
}()

if response.Token == q.Token {
// If this was the requested response process and return
return c.processResponse(q, response)
} else if _, ok := c.cursors[response.Token]; ok {
// If the token is in the cursor cache then process the response
c.processResponse(q, response)
} else {
putResponse(response)
select {
case err := <-errchan:
return response, cursor, err
case <-ctx.Done():
if q.Type != p.Query_STOP {
stopQuery := newStopQuery(q.Token)
c.Query(c.contextFromConnectionOpts(), stopQuery)
}
return nil, nil, ErrQueryTimeout
}
}

Expand All @@ -167,7 +193,7 @@ type ServerResponse struct {
func (c *Connection) Server() (ServerResponse, error) {
var response ServerResponse

_, cur, err := c.Query(Query{
_, cur, err := c.Query(c.contextFromConnectionOpts(), Query{
Type: p.Query_SERVER_INFO,
})
if err != nil {
Expand Down Expand Up @@ -255,7 +281,7 @@ func (c *Connection) readResponse() (*Response, error) {
return response, nil
}

func (c *Connection) processResponse(q Query, response *Response) (*Response, *Cursor, error) {
func (c *Connection) processResponse(ctx context.Context, q Query, response *Response) (*Response, *Cursor, error) {
switch response.Type {
case p.Response_CLIENT_ERROR:
return c.processErrorResponse(q, response, RQLClientError{rqlServerError{response, q.Term}})
Expand All @@ -264,11 +290,11 @@ func (c *Connection) processResponse(q Query, response *Response) (*Response, *C
case p.Response_RUNTIME_ERROR:
return c.processErrorResponse(q, response, createRuntimeError(response.ErrorType, response, q.Term))
case p.Response_SUCCESS_ATOM, p.Response_SERVER_INFO:
return c.processAtomResponse(q, response)
return c.processAtomResponse(ctx, q, response)
case p.Response_SUCCESS_PARTIAL:
return c.processPartialResponse(q, response)
return c.processPartialResponse(ctx, q, response)
case p.Response_SUCCESS_SEQUENCE:
return c.processSequenceResponse(q, response)
return c.processSequenceResponse(ctx, q, response)
case p.Response_WAIT_COMPLETE:
return c.processWaitResponse(q, response)
default:
Expand All @@ -287,17 +313,17 @@ func (c *Connection) processErrorResponse(q Query, response *Response, err error
return response, cursor, err
}

func (c *Connection) processAtomResponse(q Query, response *Response) (*Response, *Cursor, error) {
func (c *Connection) processAtomResponse(ctx context.Context, q Query, response *Response) (*Response, *Cursor, error) {
// Create cursor
cursor := newCursor(c, "Cursor", response.Token, q.Term, q.Opts)
cursor := newCursor(ctx, c, "Cursor", response.Token, q.Term, q.Opts)
cursor.profile = response.Profile

cursor.extend(response)

return response, cursor, nil
}

func (c *Connection) processPartialResponse(q Query, response *Response) (*Response, *Cursor, error) {
func (c *Connection) processPartialResponse(ctx context.Context, q Query, response *Response) (*Response, *Cursor, error) {
cursorType := "Cursor"
if len(response.Notes) > 0 {
switch response.Notes[0] {
Expand All @@ -318,7 +344,7 @@ func (c *Connection) processPartialResponse(q Query, response *Response) (*Respo
cursor, ok := c.cursors[response.Token]
if !ok {
// Create a new cursor if needed
cursor = newCursor(c, cursorType, response.Token, q.Term, q.Opts)
cursor = newCursor(ctx, c, cursorType, response.Token, q.Term, q.Opts)
cursor.profile = response.Profile

c.cursors[response.Token] = cursor
Expand All @@ -330,12 +356,12 @@ func (c *Connection) processPartialResponse(q Query, response *Response) (*Respo
return response, cursor, nil
}

func (c *Connection) processSequenceResponse(q Query, response *Response) (*Response, *Cursor, error) {
func (c *Connection) processSequenceResponse(ctx context.Context, q Query, response *Response) (*Response, *Cursor, error) {
c.mu.Lock()
cursor, ok := c.cursors[response.Token]
if !ok {
// Create a new cursor if needed
cursor = newCursor(c, "Cursor", response.Token, q.Term, q.Opts)
cursor = newCursor(ctx, c, "Cursor", response.Token, q.Term, q.Opts)
cursor.profile = response.Profile
}

Expand Down
14 changes: 13 additions & 1 deletion connection_helper.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,9 @@
package gorethink

import "encoding/binary"
import (
"encoding/binary"
"golang.org/x/net/context"
)

// Write 'data' to conn
func (c *Connection) writeData(data []byte) error {
Expand Down Expand Up @@ -39,3 +42,12 @@ func (c *Connection) writeQuery(token int64, q []byte) error {

return c.writeData(data)
}

func (c *Connection) contextFromConnectionOpts() context.Context {
sum := c.opts.ReadTimeout + c.opts.WriteTimeout
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am a little worried that this behaviour might be a little unexpected, I think it might make more sense to keep the previous behaviour if no context is passed to a query (I am just trying to think about existing code that might be affected)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Previous behaviour is a quite useless. If server has continuous queries flow, socket's deadline updates every time so that even first query will not timeout never due to this reason. For me it seems like a bug.
And this is unexpected if you don't know that timeouts from config are just passed to socket.
The new behaviour is that every query ends not more than read+write timeout.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok fair enough, I am still a little worried about changing the existing behaviour but hopefully the new behaviour should not cause too many issues.

if sum == 0 {
return context.Background()
}
ctx, _ := context.WithTimeout(context.Background(), sum)
return ctx
}
17 changes: 6 additions & 11 deletions cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"reflect"
"sync"

"golang.org/x/net/context"
"gopkg.in/gorethink/gorethink.v3/encoding"
p "gopkg.in/gorethink/gorethink.v3/ql2"
)
Expand All @@ -16,7 +17,7 @@ var (
errCursorClosed = errors.New("connection closed, cannot read cursor")
)

func newCursor(conn *Connection, cursorType string, token int64, term *Term, opts map[string]interface{}) *Cursor {
func newCursor(ctx context.Context, conn *Connection, cursorType string, token int64, term *Term, opts map[string]interface{}) *Cursor {
if cursorType == "" {
cursorType = "Cursor"
}
Expand All @@ -35,6 +36,7 @@ func newCursor(conn *Connection, cursorType string, token int64, term *Term, opt
opts: opts,
buffer: make([]interface{}, 0),
responses: make([]json.RawMessage, 0),
ctx: ctx,
}

return cursor
Expand Down Expand Up @@ -64,6 +66,7 @@ type Cursor struct {
cursorType string
term *Term
opts map[string]interface{}
ctx context.Context

mu sync.RWMutex
lastErr error
Expand Down Expand Up @@ -145,15 +148,7 @@ func (c *Cursor) Close() error {

// Stop any unfinished queries
if !c.finished {
q := Query{
Type: p.Query_STOP,
Token: c.token,
Opts: map[string]interface{}{
"noreply": true,
},
}

_, _, err = conn.Query(q)
_, _, err = conn.Query(c.ctx, newStopQuery(c.token))
}

if c.releaseConn != nil {
Expand Down Expand Up @@ -552,7 +547,7 @@ func (c *Cursor) fetchMore() error {
}

c.mu.Unlock()
_, _, err = c.conn.Query(q)
_, _, err = c.conn.Query(c.ctx, q)
c.mu.Lock()
}

Expand Down
2 changes: 2 additions & 0 deletions errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ var (
// ErrConnectionClosed is returned when trying to send a query with a closed
// connection.
ErrConnectionClosed = errors.New("gorethink: the connection is closed")
// ErrQueryTimeout is returned when query context deadline exceeded.
ErrQueryTimeout = errors.New("gorethink: query timeout")
)

func printCarrots(t Term, frames []*p.Frame) string {
Expand Down
9 changes: 5 additions & 4 deletions mock.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"sync"
"time"

"golang.org/x/net/context"
p "gopkg.in/gorethink/gorethink.v3/ql2"
)

Expand Down Expand Up @@ -290,7 +291,7 @@ func (m *Mock) IsConnected() bool {
return true
}

func (m *Mock) Query(q Query) (*Cursor, error) {
func (m *Mock) Query(ctx context.Context, q Query) (*Cursor, error) {
found, query := m.findExpectedQuery(q)

if found < 0 {
Expand Down Expand Up @@ -328,7 +329,7 @@ func (m *Mock) Query(q Query) (*Cursor, error) {
}

// Build cursor and return
c := newCursor(nil, "", query.Query.Token, query.Query.Term, query.Query.Opts)
c := newCursor(ctx, nil, "", query.Query.Token, query.Query.Term, query.Query.Opts)
c.finished = true
c.fetching = false
c.isAtom = true
Expand All @@ -345,8 +346,8 @@ func (m *Mock) Query(q Query) (*Cursor, error) {
return c, nil
}

func (m *Mock) Exec(q Query) error {
_, err := m.Query(q)
func (m *Mock) Exec(ctx context.Context, q Query) error {
_, err := m.Query(ctx, q)

return err
}
Expand Down
Loading