Skip to content

Commit

Permalink
Optimize connector reset
Browse files Browse the repository at this point in the history
When persistent prepared statement exist, instead of sending DISCARD ALL
to reset the connector we send a series of specific reset commands. These
were previously done as multiple extended protocol messages, switching to
a single multi-statement simple Query message yields a substantial
performance increase.

Relates to #483
  • Loading branch information
roji committed Sep 27, 2016
1 parent dabc881 commit 4b6f0cd
Show file tree
Hide file tree
Showing 4 changed files with 74 additions and 56 deletions.
65 changes: 21 additions & 44 deletions src/Npgsql/FrontendMessages/PregeneratedMessage.cs
Expand Up @@ -65,48 +65,33 @@ internal override void WriteFully(WriteBuffer buf)

public override string ToString() => _description ?? "[?]";

static readonly WriteBuffer _tempBuf;
static readonly QueryMessage _tempQuery;

static PregeneratedMessage()
{
_tempBuf = new WriteBuffer(null, new MemoryStream(), WriteBuffer.MinimumBufferSize, Encoding.ASCII);
_tempQuery = new QueryMessage(PGUtil.UTF8Encoding);

BeginTrans = BuildQuery("BEGIN");
SetTransRepeatableRead = BuildQuery("SET TRANSACTION ISOLATION LEVEL REPEATABLE READ");
SetTransSerializable = BuildQuery("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");
SetTransReadCommitted = BuildQuery("SET TRANSACTION ISOLATION LEVEL READ COMMITTED");
SetTransReadUncommitted = BuildQuery("SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED");
CommitTransaction = BuildQuery("COMMIT");
RollbackTransaction = BuildQuery("ROLLBACK");
KeepAlive = BuildQuery("SELECT NULL");

DiscardAll = BuildQuery("DISCARD ALL");

ResetSessionAuthorization = BuildQuery("SET SESSION AUTHORIZATION DEFAULT");
ResetAll = BuildQuery("RESET ALL");
CloseAll = BuildQuery("CLOSE ALL");
UnlistenAll = BuildQuery("UNLISTEN *");
AdvisoryUnlockAll = BuildQuery("SELECT pg_advisory_unlock_all()", 3);
DiscardTemp = BuildQuery("DISCARD TEMP");
DiscardSequences = BuildQuery("DISCARD SEQUENCES");

_tempBuf = null;
_tempQuery = null;
var buf = new WriteBuffer(null, new MemoryStream(), WriteBuffer.MinimumBufferSize, Encoding.ASCII);
var message = new QueryMessage(PGUtil.UTF8Encoding);

BeginTrans = Generate(buf, message, "BEGIN");
SetTransRepeatableRead = Generate(buf, message, "SET TRANSACTION ISOLATION LEVEL REPEATABLE READ");
SetTransSerializable = Generate(buf, message, "SET TRANSACTION ISOLATION LEVEL SERIALIZABLE");
SetTransReadCommitted = Generate(buf, message, "SET TRANSACTION ISOLATION LEVEL READ COMMITTED");
SetTransReadUncommitted = Generate(buf, message, "SET TRANSACTION ISOLATION LEVEL READ UNCOMMITTED");
CommitTransaction = Generate(buf, message, "COMMIT");
RollbackTransaction = Generate(buf, message, "ROLLBACK");
KeepAlive = Generate(buf, message, "SELECT NULL");

DiscardAll = Generate(buf, message, "DISCARD ALL");
}

static PregeneratedMessage BuildQuery(string query, int responseMessageCount=2)
internal static PregeneratedMessage Generate(WriteBuffer buf, QueryMessage queryMessage, string query, int responseMessageCount=2)
{
Debug.Assert(buf.WritePosition == 0);
Debug.Assert(query != null && query.All(c => c < 128));

var totalLen = 5 + query.Length;
var ms = new MemoryStream(totalLen);
_tempBuf.Underlying = ms;
_tempQuery.Populate(query);
_tempQuery.Write(_tempBuf);
_tempBuf.Flush();
return new PregeneratedMessage(ms.ToArray(), _tempQuery.ToString(), responseMessageCount);
queryMessage.Populate(query);
var description = queryMessage.ToString();
queryMessage.Write(buf);
var bytes = buf.GetContents();
buf.Clear();
return new PregeneratedMessage(bytes, description, responseMessageCount);
}

internal static readonly PregeneratedMessage BeginTrans;
Expand All @@ -119,13 +104,5 @@ static PregeneratedMessage BuildQuery(string query, int responseMessageCount=2)
internal static readonly PregeneratedMessage KeepAlive;

internal static readonly PregeneratedMessage DiscardAll;

internal static readonly PregeneratedMessage ResetSessionAuthorization;
internal static readonly PregeneratedMessage ResetAll;
internal static readonly PregeneratedMessage CloseAll;
internal static readonly PregeneratedMessage UnlistenAll;
internal static readonly PregeneratedMessage AdvisoryUnlockAll;
internal static readonly PregeneratedMessage DiscardTemp;
internal static readonly PregeneratedMessage DiscardSequences;
}
}
1 change: 1 addition & 0 deletions src/Npgsql/GeneratedAsync.cs
Expand Up @@ -446,6 +446,7 @@ internal async Task OpenAsync(NpgsqlTimeout timeout, CancellationToken cancellat
await WriteBuffer.FlushAsync(cancellationToken);
timeout.Check();
await HandleAuthenticationAsync(username, timeout, cancellationToken);
GenerateResetMessage();
await TypeHandlerRegistry.SetupAsync(this, timeout, cancellationToken);
Counters.HardConnectsPerSecond.Increment();
Log.Debug($"Opened connection to {Host}:{Port}", Id);
Expand Down
53 changes: 41 additions & 12 deletions src/Npgsql/NpgsqlConnector.cs
Expand Up @@ -246,6 +246,9 @@ int ReceiveTimeout
// ParseMessage and QueryMessage depend on the encoding, which isn't known until open-time
internal ParseMessage ParseMessage;
internal QueryMessage QueryMessage;
// The reset message depends on the server version, which isn't known until open-time
[CanBeNull]
PregeneratedMessage _resetWithoutDeallocateMessage;

// Backend
readonly CommandCompleteMessage _commandCompleteMessage = new CommandCompleteMessage();
Expand All @@ -254,6 +257,7 @@ int ReceiveTimeout
readonly DataRowSequentialMessage _dataRowSequentialMessage = new DataRowSequentialMessage();
readonly DataRowNonSequentialMessage _dataRowNonSequentialMessage = new DataRowNonSequentialMessage();


// Since COPY is rarely used, allocate these lazily
CopyInResponseMessage _copyInResponseMessage;
CopyOutResponseMessage _copyOutResponseMessage;
Expand Down Expand Up @@ -407,6 +411,7 @@ internal void Open(NpgsqlTimeout timeout)
timeout.Check();

HandleAuthentication(username, timeout);
GenerateResetMessage();
TypeHandlerRegistry.Setup(this, timeout);
Counters.HardConnectsPerSecond.Increment();
Log.Debug($"Opened connection to {Host}:{Port}", Id);
Expand Down Expand Up @@ -1457,6 +1462,41 @@ void Cleanup()
}
}

void GenerateResetMessage()
{
var sb = new StringBuilder("SET SESSION AUTHORIZATION DEFAULT;RESET ALL;");
var responseMessages = 2;
if (SupportsCloseAll)
{
sb.Append("CLOSE ALL;");
responseMessages++;
}
if (SupportsUnlisten)
{
sb.Append("UNLISTEN *;");
responseMessages++;
}
if (SupportsAdvisoryLocks)
{
sb.Append("SELECT pg_advisory_unlock_all();");
responseMessages += 2;
}
if (SupportsDiscardSequences)
{
sb.Append("DISCARD SEQUENCES;");
responseMessages++;
}
if (SupportsDiscardTemp)
{
sb.Append("DISCARD TEMP");
responseMessages++;
}

responseMessages++; // One ReadyForQuery at the end

_resetWithoutDeallocateMessage = PregeneratedMessage.Generate(WriteBuffer, QueryMessage, sb.ToString(), responseMessages);
}

/// <summary>
/// Called when a pooled connection is closed, and its connector is returned to the pool.
/// Resets the connector back to its initial state, releasing server-side sources
Expand Down Expand Up @@ -1518,18 +1558,7 @@ internal void Reset()
{
// We have persistent prepared statements, so we can't reset the connection state with DISCARD ALL
// Note: the send buffer has been cleared above, and we assume all this will fit in it.
PrependInternalMessage(PregeneratedMessage.ResetSessionAuthorization);
PrependInternalMessage(PregeneratedMessage.ResetAll);
if (SupportsCloseAll)
PrependInternalMessage(PregeneratedMessage.CloseAll);
if (SupportsUnlisten)
PrependInternalMessage(PregeneratedMessage.UnlistenAll);
if (SupportsAdvisoryLocks)
PrependInternalMessage(PregeneratedMessage.AdvisoryUnlockAll);
if (SupportsDiscardSequences)
PrependInternalMessage(PregeneratedMessage.DiscardSequences);
if (SupportsDiscardTemp)
PrependInternalMessage(PregeneratedMessage.DiscardTemp);
PrependInternalMessage(_resetWithoutDeallocateMessage);

// This needs to come last, because it can produce an arbitrary number of messages, possibly sending
// them now if they are larger than the buffer.
Expand Down
11 changes: 11 additions & 0 deletions src/Npgsql/WriteBuffer.cs
Expand Up @@ -342,6 +342,17 @@ internal void Clear()
WritePosition = 0;
}

/// <summary>
/// Returns all contents currently written to the buffer (but not flushed).
/// Useful for pregenerating messages.
/// </summary>
internal byte[] GetContents()
{
var buf = new byte[_writePosition];
Array.Copy(_buf, buf, _writePosition);
return buf;
}

internal void ResetTotalBytesFlushed()
{
TotalBytesFlushed = 0;
Expand Down

0 comments on commit 4b6f0cd

Please sign in to comment.