Skip to content

Commit

Permalink
Serious exception work
Browse files Browse the repository at this point in the history
* Created new PostgresException for PostgreSQL error responses, distinct
  from NpgsqlException which now represents other server-related errors
  (e.g. network issues). Fixes #897.
* Moved network exception catch logic to the read/write buffers, which now
  have the responsibility to break the connector. Should resolve many
  cases where the connector wasn't been properly broken. Fixes #1068.
  • Loading branch information
roji committed May 5, 2016
1 parent 70fcf32 commit 610e319
Show file tree
Hide file tree
Showing 22 changed files with 305 additions and 167 deletions.
2 changes: 1 addition & 1 deletion src/Npgsql/FrontendMessages/PregeneratedMessage.cs
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public override string ToString()

static PregeneratedMessage()
{
_tempBuf = new WriteBuffer(new MemoryStream(), WriteBuffer.MinimumBufferSize, Encoding.ASCII);
_tempBuf = new WriteBuffer(null, new MemoryStream(), WriteBuffer.MinimumBufferSize, Encoding.ASCII);
_tempQuery = new QueryMessage();

BeginTransRepeatableRead = BuildQuery("BEGIN; SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;");
Expand Down
137 changes: 90 additions & 47 deletions src/Npgsql/GeneratedAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -147,8 +147,10 @@
using System.Diagnostics.Contracts;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Runtime.InteropServices;
using System.Text;
using JetBrains.Annotations;
using System.Threading;
using System.Threading.Tasks;
#pragma warning disable
Expand Down Expand Up @@ -185,6 +187,7 @@
using System.Net;
using System.Runtime.InteropServices;
using System.Text;
using JetBrains.Annotations;
using System.Threading;
using System.Threading.Tasks;
#pragma warning disable
Expand Down Expand Up @@ -260,7 +263,7 @@ async Task SendAsync(PopulateMethod populateMethod, CancellationToken cancellati
// through our buffer
if (directBuf.Buffer != null)
{
await _connector.Stream.WriteAsync(directBuf.Buffer, directBuf.Offset, directBuf.Size == 0 ? directBuf.Buffer.Length : directBuf.Size, cancellationToken);
await _connector.WriteBuffer.DirectWriteAsync(directBuf.Buffer, directBuf.Offset, directBuf.Size == 0 ? directBuf.Buffer.Length : directBuf.Size, cancellationToken);
directBuf.Buffer = null;
directBuf.Size = 0;
}
Expand Down Expand Up @@ -435,9 +438,9 @@ async Task RawOpenAsync(NpgsqlTimeout timeout, CancellationToken cancellationTok
await ConnectAsync(timeout, cancellationToken);
Contract.Assert(_socket != null);
_baseStream = new NetworkStream(_socket, true);
Stream = _baseStream;
ReadBuffer = new ReadBuffer(Stream, BufferSize, PGUtil.UTF8Encoding);
WriteBuffer = new WriteBuffer(Stream, BufferSize, PGUtil.UTF8Encoding);
_stream = _baseStream;
ReadBuffer = new ReadBuffer(this, _stream, BufferSize, PGUtil.UTF8Encoding);
WriteBuffer = new WriteBuffer(this, _stream, BufferSize, PGUtil.UTF8Encoding);
if (SslMode == SslMode.Require || SslMode == SslMode.Prefer)
{
Log.Trace("Attempting SSL negotiation");
Expand Down Expand Up @@ -476,26 +479,26 @@ async Task RawOpenAsync(NpgsqlTimeout timeout, CancellationToken cancellationTok

if (!UseSslStream)
{
var sslStream = new TlsClientStream.TlsClientStream(Stream);
var sslStream = new TlsClientStream.TlsClientStream(_stream);
sslStream.PerformInitialHandshake(Host, clientCertificates, certificateValidationCallback, false);
Stream = sslStream;
_stream = sslStream;
}
else
{
var sslStream = new SslStream(Stream, false, certificateValidationCallback);
var sslStream = new SslStream(_stream, false, certificateValidationCallback);
#if NETSTANDARD1_3
// CoreCLR removed sync methods from SslStream, see https://github.com/dotnet/corefx/pull/4868.
// Consider exactly what to do here.
sslStream.AuthenticateAsClientAsync(Host, clientCertificates, SslProtocols.Tls | SslProtocols.Tls11 | SslProtocols.Tls12, false).Wait();
#else
sslStream.AuthenticateAsClient(Host, clientCertificates, SslProtocols.Tls | SslProtocols.Tls11 | SslProtocols.Tls12, false);
#endif
Stream = sslStream;
_stream = sslStream;
}

timeout.Check();
ReadBuffer.Underlying = Stream;
WriteBuffer.Underlying = Stream;
ReadBuffer.Underlying = _stream;
WriteBuffer.Underlying = _stream;
IsSecure = true;
Log.Trace("SSL negotiation successful");
break;
Expand All @@ -506,18 +509,18 @@ async Task RawOpenAsync(NpgsqlTimeout timeout, CancellationToken cancellationTok
}
catch
{
if (Stream != null)
if (_stream != null)
{
try
{
Stream.Dispose();
_stream.Dispose();
}
catch
{
// ignored
}

Stream = null;
_stream = null;
}

if (_baseStream != null)
Expand Down Expand Up @@ -628,7 +631,7 @@ async Task<IBackendMessage> ReadMessageWithPrependedAsync(CancellationToken canc
}
}
}
catch
catch (PostgresException)
{
Break();
throw;
Expand All @@ -655,11 +658,6 @@ async Task<IBackendMessage> ReadMessageWithPrependedAsync(CancellationToken canc

throw;
}
catch
{
Break();
throw;
}
}

async Task<IBackendMessage> DoReadMessageAsync(CancellationToken cancellationToken, DataRowLoadingMode dataRowLoadingMode = DataRowLoadingMode.NonSequential, bool isPrependedMessage = false)
Expand Down Expand Up @@ -767,7 +765,7 @@ internal async Task<T> ReadExpectingAsync<T>(CancellationToken cancellationToken
internal async Task ReadAsyncMessageAsync(CancellationToken cancellationToken)
{
ReceiveTimeout = UserTimeout;
await ReadBuffer.EnsureAsync(5, cancellationToken);
await ReadBuffer.EnsureAsync(5, cancellationToken, true);
var messageCode = (BackendMessageCode)ReadBuffer.ReadByte();
Contract.Assume(Enum.IsDefined(typeof (BackendMessageCode), messageCode), "Unknown message code: " + messageCode);
var len = ReadBuffer.ReadInt32() - 4; // Transmitted length includes itself
Expand All @@ -783,10 +781,10 @@ internal async Task ReadAsyncMessageAsync(CancellationToken cancellationToken)
// We can get certain asynchronous errors if the remote process is terminated, etc.
// We assume this is fatal.
Break();
throw new NpgsqlException(buf);
throw new PostgresException(buf);
default:
Break();
throw new Exception($"Received unexpected message of type {msg} while waiting for an asynchronous message");
throw new NpgsqlException($"Received unexpected message {msg} while waiting for an asynchronous message");
}
}

Expand Down Expand Up @@ -1463,7 +1461,7 @@ internal async Task<NpgsqlConnector> AllocateAsync(NpgsqlConnection conn, Npgsql

internal partial class ReadBuffer
{
internal async Task EnsureAsync(int count, CancellationToken cancellationToken)
internal async Task EnsureAsync(int count, CancellationToken cancellationToken, bool dontBreakOnTimeouts = false)
{
Contract.Requires(count <= Size);
count -= ReadBytesLeft;
Expand All @@ -1483,17 +1481,28 @@ internal async Task EnsureAsync(int count, CancellationToken cancellationToken)
ReadPosition = 0;
}

while (count > 0)
try
{
var toRead = Size - _filledBytes;
var read = await (Underlying.ReadAsync(_buf, _filledBytes, toRead, cancellationToken));
if (read == 0)
while (count > 0)
{
throw new EndOfStreamException();
var toRead = Size - _filledBytes;
var read = await (Underlying.ReadAsync(_buf, _filledBytes, toRead, cancellationToken));
if (read == 0)
throw new EndOfStreamException();
count -= read;
_filledBytes += read;
}

count -= read;
_filledBytes += read;
}
// We have a special case when reading async notifications - a timeout may be normal
// shouldn't be fatal
catch (IOException e)when (dontBreakOnTimeouts && (e.InnerException as SocketException)?.SocketErrorCode == SocketError.TimedOut)
{
throw new TimeoutException("Timeout while reading from stream");
}
catch (Exception e)
{
Connector.Break();
throw new NpgsqlException("Exception while reading from stream", e);
}
}

Expand All @@ -1513,7 +1522,7 @@ internal async Task<ReadBuffer> EnsureOrAllocateTempAsync(int count, Cancellatio
// Worst case: our buffer isn't big enough. For now, allocate a new buffer
// and copy into it
// TODO: Optimize with a pool later?
var tempBuf = new ReadBuffer(Underlying, count, TextEncoding);
var tempBuf = new ReadBuffer(Connector, Underlying, count, TextEncoding);
CopyTo(tempBuf);
Clear();
await tempBuf.EnsureAsync(count, cancellationToken);
Expand Down Expand Up @@ -1553,21 +1562,23 @@ internal async Task<int> ReadAllBytesAsync(byte[] output, int outputOffset, int
var offset = outputOffset + ReadBytesLeft;
var totalRead = ReadBytesLeft;
Clear();
while (totalRead < len)
try
{
var read = await (Underlying.ReadAsync(output, offset, len - totalRead, cancellationToken));
if (read == 0)
while (totalRead < len)
{
throw new EndOfStreamException();
var read = await (Underlying.ReadAsync(output, offset, len - totalRead, cancellationToken));
if (read == 0)
throw new EndOfStreamException();
totalRead += read;
if (readOnce)
return totalRead;
offset += read;
}

totalRead += read;
if (readOnce)
{
return totalRead;
}

offset += read;
}
catch (Exception e)
{
Connector.Break();
throw new NpgsqlException("Exception while reading from stream", e);
}

return len;
Expand Down Expand Up @@ -1662,16 +1673,48 @@ static async Task<BackendTypes> LoadBackendTypesAsync(NpgsqlConnector connector,

internal partial class WriteBuffer
{
public async Task FlushAsync(CancellationToken cancellationToken)
internal async Task FlushAsync(CancellationToken cancellationToken)
{
if (_writePosition != 0)
{
await Underlying.WriteAsync(_buf, 0, _writePosition, cancellationToken);
await Underlying.FlushAsync(cancellationToken);
try
{
await Underlying.WriteAsync(_buf, 0, _writePosition, cancellationToken);
}
catch (Exception e)
{
Connector.Break();
throw new NpgsqlException("Exception while writing to stream", e);
}

try
{
await Underlying.FlushAsync(cancellationToken);
}
catch (Exception e)
{
Connector.Break();
throw new NpgsqlException("Exception while flushing stream", e);
}

TotalBytesFlushed += _writePosition;
_writePosition = 0;
}
}

internal async Task DirectWriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
{
Contract.Requires(WritePosition == 0);
try
{
await Underlying.WriteAsync(buffer, offset, count, cancellationToken);
}
catch (Exception e)
{
Connector.Break();
throw new NpgsqlException("Exception while writing to stream", e);
}
}
}
}

Expand Down
1 change: 1 addition & 0 deletions src/Npgsql/Npgsql.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@
<Compile Include="Logging\NpgsqlLogManager.cs" />
<Compile Include="Logging\NpgsqlLogLevel.cs" />
<Compile Include="NameTranslation\NpgsqlNullNameTranslator.cs" />
<Compile Include="NpgsqlException.cs" />
<Compile Include="NpgsqlLargeObjectManager.cs" />
<Compile Include="NpgsqlLargeObjectStream.cs" />
<Compile Include="NpgsqlBinaryExporter.cs" />
Expand Down
2 changes: 1 addition & 1 deletion src/Npgsql/NpgsqlBinaryExporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ void ReadHeader()
var headerLen = NpgsqlRawCopyStream.BinarySignature.Length + 4 + 4;
_buf.Ensure(headerLen);
if (NpgsqlRawCopyStream.BinarySignature.Any(t => _buf.ReadByte() != t)) {
throw new Exception("Invalid COPY binary signature at beginning!");
throw new NpgsqlException("Invalid COPY binary signature at beginning!");
}
var flags = _buf.ReadInt32();
if (flags != 0) {
Expand Down
4 changes: 2 additions & 2 deletions src/Npgsql/NpgsqlBinaryImporter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ void DoWrite<T>(TypeHandler handler, [CanBeNull] T value)
_buf.WriteInt32(len + 4);
_buf.Flush();
_writingDataMsg = false;
_buf.Underlying.Write(directBuf.Buffer, directBuf.Offset, len);
_buf.DirectWrite(directBuf.Buffer, directBuf.Offset, len);
directBuf.Buffer = null;
directBuf.Size = 0;
}
Expand Down Expand Up @@ -336,7 +336,7 @@ public void Cancel()
var msg = _connector.ReadMessage(DataRowLoadingMode.NonSequential);
// The CopyFail should immediately trigger an exception from the read above.
_connector.Break();
throw new Exception("Expected ErrorResponse when cancelling COPY but got: " + msg.Code);
throw new NpgsqlException("Expected ErrorResponse when cancelling COPY but got: " + msg.Code);
} catch (PostgresException e) {
if (e.SqlState == "57014") { return; }
throw;
Expand Down
4 changes: 2 additions & 2 deletions src/Npgsql/NpgsqlCommand.cs
Original file line number Diff line number Diff line change
Expand Up @@ -653,7 +653,7 @@ void Send(PopulateMethod populateMethod)
// through our buffer
if (directBuf.Buffer != null)
{
_connector.Stream.Write(directBuf.Buffer, directBuf.Offset, directBuf.Size == 0 ? directBuf.Buffer.Length : directBuf.Size);
_connector.WriteBuffer.DirectWrite(directBuf.Buffer, directBuf.Offset, directBuf.Size == 0 ? directBuf.Buffer.Length : directBuf.Size);
directBuf.Buffer = null;
directBuf.Size = 0;
}
Expand Down Expand Up @@ -694,7 +694,7 @@ async Task SendRemaining(PopulateMethod populateMethod, CancellationToken cancel
// through our buffer
if (directBuf.Buffer != null)
{
await _connector.Stream.WriteAsync(directBuf.Buffer, directBuf.Offset,
await _connector.WriteBuffer.DirectWriteAsync(directBuf.Buffer, directBuf.Offset,
directBuf.Size == 0 ? directBuf.Buffer.Length : directBuf.Size, cancellationToken);
directBuf.Buffer = null;
directBuf.Size = 0;
Expand Down
12 changes: 6 additions & 6 deletions src/Npgsql/NpgsqlConnection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1171,13 +1171,13 @@ public bool Wait(int timeout)
using (Connector.StartUserAction(ConnectorState.Waiting))
{
Connector.UserTimeout = timeout;
try {
try
{
Connector.ReadAsyncMessage();
} catch (IOException e) {
var socketException = e.InnerException as SocketException;
if (socketException?.SocketErrorCode == SocketError.TimedOut)
return false;
throw;
}
catch (TimeoutException)
{
return false;
}
}
return true;
Expand Down
Loading

4 comments on commit 610e319

@oguzyildizz
Copy link

@oguzyildizz oguzyildizz commented on 610e319 May 6, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd be really happy if you guys also used the protected DbException(string message, int errorCode); constructor in the new NpgSql class :)

Still, thanks for throwing DbException instead!

@roji
Copy link
Member Author

@roji roji commented on 610e319 May 6, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@oguzyildiz1991 the problem with that constructor is that it's impossible to pass both an inner exceptions and an error code.

When an inner exception is assigned (e.g. network errors) it seems like an error code is less important - you can simply check the inner exception to understand the error. But for the few other cases where an NpgsqlException is thrown without an inner exception, I can see there might be a need to programmatically identify it...

Do you have a specific exception/scenario that's bothering you?

@oguzyildizz
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, it's the SSL exception.. Normally we catch it by the error codes in other drivers, but this one threw an InvalidOperationException. So I had to check with the error message (even though being afraid of the localization possibility in the future) but then it returned the same error message in different casing: "Ssl connection requested. No Ssl enabled connection from this host is configured."
So it would be nice if it had an error code too, so we wouldn't have to watch out for localization :)

@roji
Copy link
Member Author

@roji roji commented on 610e319 May 8, 2016

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@oguzyildiz1991 I've opened #1072 to track your request. Unfortunately I don't think this will make it in time for 3.1...

Please sign in to comment.