Skip to content

Commit

Permalink
Execute prepared commands comprising a single statement.
Browse files Browse the repository at this point in the history
Signed-off-by: Bradley Grainger <bgrainger@gmail.com>
  • Loading branch information
bgrainger committed Jul 24, 2018
1 parent 8f84081 commit a52beb8
Show file tree
Hide file tree
Showing 8 changed files with 575 additions and 15 deletions.
5 changes: 5 additions & 0 deletions docs/content/tutorials/migrating-from-connector-net.md
Expand Up @@ -128,3 +128,8 @@ The following bugs in Connector/NET are fixed by switching to MySqlConnector.
* [#89335](https://bugs.mysql.com/bug.php?id=89335): `MySqlCommandBuilder.DeriveParameters` fails for `JSON` type
* [#91123](https://bugs.mysql.com/bug.php?id=91123): Database names are case-sensitive when calling a stored procedure
* [#91199](https://bugs.mysql.com/bug.php?id=91199): Can't insert `MySqlDateTime` values
* [#91751](https://bugs.mysql.com/bug.php?id=91751): `YEAR` column retrieved incorrectly with prepared command
* [#91752](https://bugs.mysql.com/bug.php?id=91752): `00:00:00` is converted to `NULL` with prepared command
* [#91753](https://bugs.mysql.com/bug.php?id=91753): Unnamed parameter not supported by `MySqlCommand.Prepare`
* [#91754](https://bugs.mysql.com/bug.php?id=91754): Inserting 16MiB `BLOB` shifts it by four bytes when prepared
* [#91770](https://bugs.mysql.com/bug.php?id=91770): `TIME(n)` column loses microseconds with prepared command
23 changes: 23 additions & 0 deletions src/MySqlConnector/Core/PreparedStatement.cs
@@ -0,0 +1,23 @@
using MySqlConnector.Protocol.Payloads;

namespace MySqlConnector.Core
{
/// <summary>
/// <see cref="PreparedStatement"/> is a statement that has been prepared on the MySQL Server.
/// </summary>
internal sealed class PreparedStatement
{
public PreparedStatement(int statementId, ParsedStatement statement, ColumnDefinitionPayload[] columns, ColumnDefinitionPayload[] parameters)
{
StatementId = statementId;
Statement = statement;
Columns = columns;
Parameters = parameters;
}

public int StatementId { get; }
public ParsedStatement Statement { get; }
public ColumnDefinitionPayload[] Columns { get; }
public ColumnDefinitionPayload[] Parameters { get; }
}
}
109 changes: 109 additions & 0 deletions src/MySqlConnector/Core/PreparedStatementCommandExecutor.cs
@@ -0,0 +1,109 @@
using System;
using System.Data;
using System.Data.Common;
using System.IO;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using MySql.Data.MySqlClient;
using MySqlConnector.Logging;
using MySqlConnector.Protocol;
using MySqlConnector.Protocol.Serialization;
using MySqlConnector.Utilities;

namespace MySqlConnector.Core
{
internal sealed class PreparedStatementCommandExecutor : ICommandExecutor
{
public PreparedStatementCommandExecutor(MySqlCommand command)
{
m_command = command;
}

public async Task<DbDataReader> ExecuteReaderAsync(string commandText, MySqlParameterCollection parameterCollection, CommandBehavior behavior, IOBehavior ioBehavior, CancellationToken cancellationToken)
{
cancellationToken.ThrowIfCancellationRequested();
if (Log.IsDebugEnabled())
Log.Debug("Session{0} ExecuteBehavior {1} CommandText: {2}", m_command.Connection.Session.Id, ioBehavior, commandText);
using (var payload = CreateQueryPayload(m_command.PreparedStatements[0], parameterCollection, m_command.Connection.GuidFormat))
using (m_command.RegisterCancel(cancellationToken))
{
m_command.Connection.Session.StartQuerying(m_command);
m_command.LastInsertedId = -1;
try
{
await m_command.Connection.Session.SendAsync(payload, ioBehavior, CancellationToken.None).ConfigureAwait(false);
return await MySqlDataReader.CreateAsync(m_command, behavior, ResultSetProtocol.Binary, ioBehavior).ConfigureAwait(false);
}
catch (MySqlException ex) when (ex.Number == (int) MySqlErrorCode.QueryInterrupted && cancellationToken.IsCancellationRequested)
{
Log.Warn("Session{0} query was interrupted", m_command.Connection.Session.Id);
throw new OperationCanceledException(cancellationToken);
}
catch (Exception ex) when (payload.ArraySegment.Count > 4_194_304 && (ex is SocketException || ex is IOException || ex is MySqlProtocolException))
{
// the default MySQL Server value for max_allowed_packet (in MySQL 5.7) is 4MiB: https://dev.mysql.com/doc/refman/5.7/en/server-system-variables.html#sysvar_max_allowed_packet
// use "decimal megabytes" (to round up) when creating the exception message
int megabytes = payload.ArraySegment.Count / 1_000_000;
throw new MySqlException("Error submitting {0}MB packet; ensure 'max_allowed_packet' is greater than {0}MB.".FormatInvariant(megabytes), ex);
}
}
}

private PayloadData CreateQueryPayload(PreparedStatement preparedStatement, MySqlParameterCollection parameterCollection, MySqlGuidFormat guidFormat)
{
var writer = new ByteBufferWriter();
writer.Write((byte) CommandKind.StatementExecute);
writer.Write(preparedStatement.StatementId);
writer.Write((byte) 0);
writer.Write(1);
if (preparedStatement.Parameters?.Length > 0)
{
// TODO: How to handle incorrect number of parameters?

// build subset of parameters for this statement
var parameters = new MySqlParameter[preparedStatement.Statement.ParameterNames.Count];
for (var i = 0; i < preparedStatement.Statement.ParameterNames.Count; i++)
{
var parameterName = preparedStatement.Statement.ParameterNames[i];
var parameterIndex = parameterName != null ? parameterCollection.NormalizedIndexOf(parameterName) : preparedStatement.Statement.ParameterIndexes[i];
parameters[i] = parameterCollection[parameterIndex];
}

// write null bitmap
byte nullBitmap = 0;
for (var i = 0; i < parameters.Length; i++)
{
var parameter = parameters[i];
if (parameter.Value == null || parameter.Value == DBNull.Value)
{
if (i > 0 && i % 8 == 0)
{
writer.Write(nullBitmap);
nullBitmap = 0;
}

nullBitmap |= (byte) (1 << (i % 8));
}
}
writer.Write(nullBitmap);

// write "new parameters bound" flag
writer.Write((byte) 1);

foreach (var parameter in parameters)
writer.Write(TypeMapper.ConvertToColumnTypeAndFlags(parameter.MySqlDbType, guidFormat));

var options = m_command.CreateStatementPreparerOptions();
foreach (var parameter in parameters)
parameter.AppendBinary(writer, options);
}

return writer.ToPayloadData();
}

static IMySqlConnectorLogger Log { get; } = MySqlConnectorLogManager.CreateLogger(nameof(PreparedStatementCommandExecutor));

readonly MySqlCommand m_command;
}
}
112 changes: 97 additions & 15 deletions src/MySqlConnector/MySql.Data.MySqlClient/MySqlCommand.cs
@@ -1,9 +1,12 @@
using System;
using System.Collections.Generic;
using System.Data;
using System.Data.Common;
using System.Threading;
using System.Threading.Tasks;
using MySqlConnector.Core;
using MySqlConnector.Protocol;
using MySqlConnector.Protocol.Payloads;
using MySqlConnector.Protocol.Serialization;
using MySqlConnector.Utilities;

Expand Down Expand Up @@ -62,14 +65,85 @@ public new MySqlParameterCollection Parameters

public new MySqlDataReader ExecuteReader(CommandBehavior commandBehavior) => (MySqlDataReader) base.ExecuteReader(commandBehavior);

public override void Prepare()
public override void Prepare() => PrepareAsync(IOBehavior.Synchronous, default).GetAwaiter().GetResult();
public Task PrepareAsync() => PrepareAsync(AsyncIOBehavior, default);
public Task PrepareAsync(CancellationToken cancellationToken) => PrepareAsync(AsyncIOBehavior, cancellationToken);

private async Task PrepareAsync(IOBehavior ioBehavior, CancellationToken cancellationToken)
{
if (Connection == null)
throw new InvalidOperationException("Connection property must be non-null.");
if (Connection.State != ConnectionState.Open)
throw new InvalidOperationException("Connection must be Open; current state is {0}".FormatInvariant(Connection.State));
if (string.IsNullOrWhiteSpace(CommandText))
throw new InvalidOperationException("CommandText must be specified");
if (m_connection?.HasActiveReader ?? false)
throw new InvalidOperationException("Cannot call Prepare when there is an open DataReader for this command; it must be closed first.");
if (Connection.IgnorePrepare)
return;

if (CommandType != CommandType.Text)
throw new NotSupportedException("Only CommandType.Text is currently supported by MySqlCommand.Prepare");

var statementPreparer = new StatementPreparer(CommandText, Parameters, CreateStatementPreparerOptions());
var parsedStatements = statementPreparer.SplitStatements();

if (parsedStatements.Statements.Count > 1)
throw new NotSupportedException("Multiple semicolon-delimited SQL statements are not supported by MySqlCommand.Prepare");

var columnsAndParameters = new ResizableArray<byte>();
var columnsAndParametersSize = 0;

var preparedStatements = new List<PreparedStatement>(parsedStatements.Statements.Count);
foreach (var statement in parsedStatements.Statements)
{
await Connection.Session.SendAsync(new PayloadData(statement.StatementBytes), ioBehavior, cancellationToken).ConfigureAwait(false);
var payload = await Connection.Session.ReceiveReplyAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
var response = StatementPrepareResponsePayload.Create(payload);

ColumnDefinitionPayload[] parameters = null;
if (response.ParameterCount > 0)
{
parameters = new ColumnDefinitionPayload[response.ParameterCount];
for (var i = 0; i < response.ParameterCount; i++)
{
payload = await Connection.Session.ReceiveReplyAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
Utility.Resize(ref columnsAndParameters, columnsAndParametersSize + payload.ArraySegment.Count);
Buffer.BlockCopy(payload.ArraySegment.Array, payload.ArraySegment.Offset, columnsAndParameters.Array, columnsAndParametersSize, payload.ArraySegment.Count);
parameters[i] = ColumnDefinitionPayload.Create(new ResizableArraySegment<byte>(columnsAndParameters, columnsAndParametersSize, payload.ArraySegment.Count));
columnsAndParametersSize += payload.ArraySegment.Count;
}
if (!Connection.Session.SupportsDeprecateEof)
{
payload = await Connection.Session.ReceiveReplyAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
EofPayload.Create(payload);
}
}

ColumnDefinitionPayload[] columns = null;
if (response.ColumnCount > 0)
{
columns = new ColumnDefinitionPayload[response.ColumnCount];
for (var i = 0; i < response.ColumnCount; i++)
{
payload = await Connection.Session.ReceiveReplyAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
Utility.Resize(ref columnsAndParameters, columnsAndParametersSize + payload.ArraySegment.Count);
Buffer.BlockCopy(payload.ArraySegment.Array, payload.ArraySegment.Offset, columnsAndParameters.Array, columnsAndParametersSize, payload.ArraySegment.Count);
columns[i] = ColumnDefinitionPayload.Create(new ResizableArraySegment<byte>(columnsAndParameters, columnsAndParametersSize, payload.ArraySegment.Count));
columnsAndParametersSize += payload.ArraySegment.Count;
}
if (!Connection.Session.SupportsDeprecateEof)
{
payload = await Connection.Session.ReceiveReplyAsync(ioBehavior, cancellationToken).ConfigureAwait(false);
EofPayload.Create(payload);
}
}

preparedStatements.Add(new PreparedStatement(response.StatementId, statement, columns, parameters));
}

m_parsedStatements = parsedStatements;
m_statements = preparedStatements;
}

public override string CommandText
Expand All @@ -80,6 +154,7 @@ public override string CommandText
if (m_connection?.HasActiveReader ?? false)
throw new InvalidOperationException("Cannot set MySqlCommand.CommandText when there is an open DataReader for this command; it must be closed first.");
m_commandText = value;
m_statements = null;
}
}

Expand All @@ -104,22 +179,12 @@ public override int CommandTimeout

public override CommandType CommandType
{
get
{
return m_commandType;
}
get => m_commandType;
set
{
if (value != CommandType.Text && value != CommandType.StoredProcedure)
throw new ArgumentException("CommandType must be Text or StoredProcedure.", nameof(value));
if (value == m_commandType)
return;

m_commandType = value;
if (value == CommandType.Text)
m_commandExecutor = new TextCommandExecutor(this);
else if (value == CommandType.StoredProcedure)
m_commandExecutor = new StoredProcedureCommandExecutor(this);
}
}

Expand Down Expand Up @@ -203,9 +268,20 @@ protected override Task<DbDataReader> ExecuteDbDataReaderAsync(CommandBehavior b
return ExecuteReaderAsync(behavior, AsyncIOBehavior, cancellationToken);
}

internal Task<DbDataReader> ExecuteReaderAsync(CommandBehavior behavior, IOBehavior ioBehavior, CancellationToken cancellationToken) =>
!IsValid(out var exception) ? Utility.TaskFromException<DbDataReader>(exception) :
m_commandExecutor.ExecuteReaderAsync(CommandText, m_parameterCollection, behavior, ioBehavior, cancellationToken);
internal Task<DbDataReader> ExecuteReaderAsync(CommandBehavior behavior, IOBehavior ioBehavior, CancellationToken cancellationToken)
{
if (!IsValid(out var exception))
return Utility.TaskFromException<DbDataReader>(exception);

if (m_statements != null)
m_commandExecutor = new PreparedStatementCommandExecutor(this);
else if (m_commandType == CommandType.Text)
m_commandExecutor = new TextCommandExecutor(this);
else if (m_commandType == CommandType.StoredProcedure)
m_commandExecutor = new StoredProcedureCommandExecutor(this);

return m_commandExecutor.ExecuteReaderAsync(CommandText, m_parameterCollection, behavior, ioBehavior, cancellationToken);
}

protected override void Dispose(bool disposing)
{
Expand All @@ -214,6 +290,8 @@ protected override void Dispose(bool disposing)
if (disposing)
{
m_parameterCollection = null;
m_parsedStatements?.Dispose();
m_parsedStatements = null;
}
}
finally
Expand Down Expand Up @@ -243,6 +321,8 @@ internal IDisposable RegisterCancel(CancellationToken token)

internal int CancelAttemptCount { get; set; }

internal IReadOnlyList<PreparedStatement> PreparedStatements => m_statements;

/// <summary>
/// Causes the effective command timeout to be reset back to the value specified by <see cref="CommandTimeout"/>.
/// </summary>
Expand Down Expand Up @@ -324,6 +404,8 @@ private bool IsValid(out Exception exception)
MySqlConnection m_connection;
string m_commandText;
MySqlParameterCollection m_parameterCollection;
ParsedStatements m_parsedStatements;
IReadOnlyList<PreparedStatement> m_statements;
int? m_commandTimeout;
CommandType m_commandType;
ICommandExecutor m_commandExecutor;
Expand Down

0 comments on commit a52beb8

Please sign in to comment.