@@ -36,6 +36,7 @@ var Client = function (config) {
3636 this . _connecting = false
3737 this . _connected = false
3838 this . _connectionError = false
39+ this . _queryable = true
3940
4041 this . connection = c . connection || new Connection ( {
4142 stream : c . stream ,
@@ -52,16 +53,31 @@ var Client = function (config) {
5253
5354util . inherits ( Client , EventEmitter )
5455
55- Client . prototype . connect = function ( callback ) {
56+ Client . prototype . _errorAllQueries = function ( err ) {
57+ const enqueueError = ( query ) => {
58+ process . nextTick ( ( ) => {
59+ query . handleError ( err , this . connection )
60+ } )
61+ }
62+
63+ if ( this . activeQuery ) {
64+ enqueueError ( this . activeQuery )
65+ this . activeQuery = null
66+ }
67+
68+ this . queryQueue . forEach ( enqueueError )
69+ this . queryQueue . length = 0
70+ }
71+
72+ Client . prototype . _connect = function ( callback ) {
5673 var self = this
5774 var con = this . connection
5875 if ( this . _connecting || this . _connected ) {
5976 const err = new Error ( 'Client has already been connected. You cannot reuse a client.' )
60- if ( callback ) {
77+ process . nextTick ( ( ) => {
6178 callback ( err )
62- return undefined
63- }
64- return Promise . reject ( err )
79+ } )
80+ return
6581 }
6682 this . _connecting = true
6783
@@ -126,15 +142,25 @@ Client.prototype.connect = function (callback) {
126142 }
127143
128144 const connectedErrorHandler = ( err ) => {
129- if ( this . activeQuery ) {
130- var activeQuery = self . activeQuery
131- this . activeQuery = null
132- return activeQuery . handleError ( err , con )
133- }
145+ this . _queryable = false
146+ this . _errorAllQueries ( err )
134147 this . emit ( 'error' , err )
135148 }
136149
150+ const connectedErrorMessageHandler = ( msg ) => {
151+ const activeQuery = this . activeQuery
152+
153+ if ( ! activeQuery ) {
154+ connectedErrorHandler ( msg )
155+ return
156+ }
157+
158+ this . activeQuery = null
159+ activeQuery . handleError ( msg , con )
160+ }
161+
137162 con . on ( 'error' , connectingErrorHandler )
163+ con . on ( 'errorMessage' , connectingErrorHandler )
138164
139165 // hook up query handling events to connection
140166 // after the connection initially becomes ready for queries
@@ -143,7 +169,9 @@ Client.prototype.connect = function (callback) {
143169 self . _connected = true
144170 self . _attachListeners ( con )
145171 con . removeListener ( 'error' , connectingErrorHandler )
172+ con . removeListener ( 'errorMessage' , connectingErrorHandler )
146173 con . on ( 'error' , connectedErrorHandler )
174+ con . on ( 'errorMessage' , connectedErrorMessageHandler )
147175
148176 // process possible callback argument to Client#connect
149177 if ( callback ) {
@@ -166,43 +194,53 @@ Client.prototype.connect = function (callback) {
166194 } )
167195
168196 con . once ( 'end' , ( ) => {
169- if ( this . activeQuery ) {
170- var disconnectError = new Error ( 'Connection terminated' )
171- this . activeQuery . handleError ( disconnectError , con )
172- this . activeQuery = null
173- }
197+ const error = this . _ending
198+ ? new Error ( 'Connection terminated' )
199+ : new Error ( 'Connection terminated unexpectedly' )
200+
201+ this . _errorAllQueries ( error )
202+
174203 if ( ! this . _ending ) {
175204 // if the connection is ended without us calling .end()
176205 // on this client then we have an unexpected disconnection
177206 // treat this as an error unless we've already emitted an error
178207 // during connection.
179- const error = new Error ( 'Connection terminated unexpectedly' )
180208 if ( this . _connecting && ! this . _connectionError ) {
181209 if ( callback ) {
182210 callback ( error )
183211 } else {
184- this . emit ( 'error' , error )
212+ connectedErrorHandler ( error )
185213 }
186214 } else if ( ! this . _connectionError ) {
187- this . emit ( 'error' , error )
215+ connectedErrorHandler ( error )
188216 }
189217 }
190- this . emit ( 'end' )
218+
219+ process . nextTick ( ( ) => {
220+ this . emit ( 'end' )
221+ } )
191222 } )
192223
193224 con . on ( 'notice' , function ( msg ) {
194225 self . emit ( 'notice' , msg )
195226 } )
227+ }
196228
197- if ( ! callback ) {
198- return new global . Promise ( ( resolve , reject ) => {
199- this . once ( 'error' , reject )
200- this . once ( 'connect' , ( ) => {
201- this . removeListener ( 'error' , reject )
229+ Client . prototype . connect = function ( callback ) {
230+ if ( callback ) {
231+ this . _connect ( callback )
232+ return
233+ }
234+
235+ return new Promise ( ( resolve , reject ) => {
236+ this . _connect ( ( error ) => {
237+ if ( error ) {
238+ reject ( error )
239+ } else {
202240 resolve ( )
203- } )
241+ }
204242 } )
205- }
243+ } )
206244}
207245
208246Client . prototype . _attachListeners = function ( con ) {
@@ -340,7 +378,15 @@ Client.prototype._pulseQueryQueue = function () {
340378 if ( this . activeQuery ) {
341379 this . readyForQuery = false
342380 this . hasExecuted = true
343- this . activeQuery . submit ( this . connection )
381+
382+ const queryError = this . activeQuery . submit ( this . connection )
383+ if ( queryError ) {
384+ process . nextTick ( ( ) => {
385+ this . activeQuery . handleError ( queryError , this . connection )
386+ this . readyForQuery = true
387+ this . _pulseQueryQueue ( )
388+ } )
389+ }
344390 } else if ( this . hasExecuted ) {
345391 this . activeQuery = null
346392 this . emit ( 'drain' )
@@ -379,25 +425,40 @@ Client.prototype.query = function (config, values, callback) {
379425 query . _result . _getTypeParser = this . _types . getTypeParser . bind ( this . _types )
380426 }
381427
428+ if ( ! this . _queryable ) {
429+ process . nextTick ( ( ) => {
430+ query . handleError ( new Error ( 'Client has encountered a connection error and is not queryable' ) , this . connection )
431+ } )
432+ return result
433+ }
434+
435+ if ( this . _ending ) {
436+ process . nextTick ( ( ) => {
437+ query . handleError ( new Error ( 'Client was closed and is not queryable' ) , this . connection )
438+ } )
439+ return result
440+ }
441+
382442 this . queryQueue . push ( query )
383443 this . _pulseQueryQueue ( )
384444 return result
385445}
386446
387447Client . prototype . end = function ( cb ) {
388448 this . _ending = true
449+
389450 if ( this . activeQuery ) {
390451 // if we have an active query we need to force a disconnect
391452 // on the socket - otherwise a hung query could block end forever
392- this . connection . stream . destroy ( new Error ( 'Connection terminated by user' ) )
393- return cb ? cb ( ) : Promise . resolve ( )
453+ this . connection . stream . destroy ( )
454+ } else {
455+ this . connection . end ( )
394456 }
457+
395458 if ( cb ) {
396- this . connection . end ( )
397459 this . connection . once ( 'end' , cb )
398460 } else {
399- return new global . Promise ( ( resolve , reject ) => {
400- this . connection . end ( )
461+ return new Promise ( ( resolve ) => {
401462 this . connection . once ( 'end' , resolve )
402463 } )
403464 }
0 commit comments