-
Notifications
You must be signed in to change notification settings - Fork 910
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Fix race condition in query cancellation #578
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -6,6 +6,8 @@ import ( | |
"context" | ||
"database/sql/driver" | ||
"errors" | ||
"io" | ||
"io/ioutil" | ||
) | ||
|
||
// Implement the "QueryerContext" interface | ||
|
@@ -14,12 +16,12 @@ func (cn *conn) QueryContext(ctx context.Context, query string, args []driver.Na | |
for i, nv := range args { | ||
list[i] = nv.Value | ||
} | ||
closed := cn.watchCancel(ctx) | ||
finish := cn.watchCancel(ctx) | ||
r, err := cn.query(query, list) | ||
if err != nil { | ||
return nil, err | ||
} | ||
r.closed = closed | ||
r.finish = finish | ||
return r, nil | ||
} | ||
|
||
|
@@ -30,8 +32,8 @@ func (cn *conn) ExecContext(ctx context.Context, query string, args []driver.Nam | |
list[i] = nv.Value | ||
} | ||
|
||
if closed := cn.watchCancel(ctx); closed != nil { | ||
defer close(closed) | ||
if finish := cn.watchCancel(ctx); finish != nil { | ||
defer finish() | ||
} | ||
|
||
return cn.Exec(query, list) | ||
|
@@ -49,41 +51,57 @@ func (cn *conn) BeginTx(ctx context.Context, opts driver.TxOptions) (driver.Tx, | |
if err != nil { | ||
return nil, err | ||
} | ||
cn.txnClosed = cn.watchCancel(ctx) | ||
cn.txnFinish = cn.watchCancel(ctx) | ||
return tx, nil | ||
} | ||
|
||
func (cn *conn) watchCancel(ctx context.Context) chan<- struct{} { | ||
func (cn *conn) watchCancel(ctx context.Context) func() { | ||
if done := ctx.Done(); done != nil { | ||
closed := make(chan struct{}) | ||
finished := make(chan struct{}) | ||
go func() { | ||
select { | ||
case <-done: | ||
cn.cancel() | ||
case <-closed: | ||
_ = cn.cancel() | ||
finished <- struct{}{} | ||
case <-finished: | ||
} | ||
}() | ||
return closed | ||
return func() { | ||
select { | ||
case <-finished: | ||
case finished <- struct{}{}: | ||
} | ||
} | ||
} | ||
return nil | ||
} | ||
|
||
func (cn *conn) cancel() { | ||
var err error | ||
can := &conn{} | ||
can.c, err = dial(cn.dialer, cn.opts) | ||
func (cn *conn) cancel() error { | ||
c, err := dial(cn.dialer, cn.opts) | ||
if err != nil { | ||
return | ||
return err | ||
} | ||
can.ssl(cn.opts) | ||
defer c.Close() | ||
|
||
defer can.errRecover(&err) | ||
{ | ||
can := conn{ | ||
c: c, | ||
} | ||
can.ssl(cn.opts) | ||
|
||
w := can.writeBuf(0) | ||
w.int32(80877102) // cancel request code | ||
w.int32(cn.processID) | ||
w.int32(cn.secretKey) | ||
w := can.writeBuf(0) | ||
w.int32(80877102) // cancel request code | ||
w.int32(cn.processID) | ||
w.int32(cn.secretKey) | ||
|
||
can.sendStartupPacket(w) | ||
_ = can.c.Close() | ||
if err := can.sendStartupPacket(w); err != nil { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I don't see how the panics are caught here. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. sendStartupPacket no longer panics, though. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ah, I didn't see that change cuz no reviewable. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yeah, GH reviews are terrible. Pro tip though: you can just navigate to https://reviewable.io/reviews/lib/pq/578. |
||
return err | ||
} | ||
} | ||
|
||
// Read until EOF to ensure that the server received the cancel. | ||
{ | ||
_, err := io.Copy(ioutil.Discard, c) | ||
return err | ||
} | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why are we returning errors here at all? They are never used. In case we want them in the future and it's just good practice?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, that was my thinking.