From 1b54c2810f419f8d59fc9368b799a0c4a5c5d97b Mon Sep 17 00:00:00 2001 From: takapi327 Date: Sun, 26 May 2024 23:56:45 +0900 Subject: [PATCH 1/8] Added executeLargeUpdate --- .../scala/ldbc/sql/PreparedStatement.scala | 22 ++++++ .../src/main/scala/ldbc/sql/Statement.scala | 70 +++++++++++++++++++ 2 files changed, 92 insertions(+) diff --git a/module/ldbc-sql/src/main/scala/ldbc/sql/PreparedStatement.scala b/module/ldbc-sql/src/main/scala/ldbc/sql/PreparedStatement.scala index fe351e548..b9b5851e0 100644 --- a/module/ldbc-sql/src/main/scala/ldbc/sql/PreparedStatement.scala +++ b/module/ldbc-sql/src/main/scala/ldbc/sql/PreparedStatement.scala @@ -33,6 +33,11 @@ trait PreparedStatement[F[_]] extends Statement[F]: "This method cannot be called on a PreparedStatement." ) + @deprecated("This method cannot be called on a PreparedStatement.", "0.3.0") + override def executeLargeUpdate(sql: String): F[Long] = throw new UnsupportedOperationException( + "This method cannot be called on a PreparedStatement." + ) + @deprecated("This method cannot be called on a PreparedStatement.", "0.3.0") override def execute(sql: String): F[Boolean] = throw new UnsupportedOperationException( "This method cannot be called on a PreparedStatement." @@ -258,3 +263,20 @@ trait PreparedStatement[F[_]] extends Statement[F]: * Adds a set of parameters to this PreparedStatement object's batch of commands. */ def addBatch(): F[Unit] + + /** + * Executes the SQL statement in this PreparedStatement object, + * which must be an SQL Data Manipulation Language (DML) statement, + * such as INSERT, UPDATE or + * DELETE; or an SQL statement that returns nothing, + * such as a DDL statement. + *

+ * This method should be used when the returned row count may exceed + * {@link Integer#MAX_VALUE}. + *

+ * The default implementation will throw {@code UnsupportedOperationException} + * + * @return either (1) the row count for SQL Data Manipulation Language + * (DML) statements or (2) 0 for SQL statements that return nothing + */ + def executeLargeUpdate(): F[Long] diff --git a/module/ldbc-sql/src/main/scala/ldbc/sql/Statement.scala b/module/ldbc-sql/src/main/scala/ldbc/sql/Statement.scala index b38d7246f..0664edfe3 100644 --- a/module/ldbc-sql/src/main/scala/ldbc/sql/Statement.scala +++ b/module/ldbc-sql/src/main/scala/ldbc/sql/Statement.scala @@ -268,6 +268,76 @@ trait Statement[F[_]]: */ def isClosed(): F[Boolean] + /** + * Retrieves the current result as an update count; if the result + * is a ResultSet object or there are no more results, -1 + * is returned. This method should be called only once per result. + *

+ * This method should be used when the returned row count may exceed + * {@link Integer#MAX_VALUE}. + *

+ * The default implementation will throw {@code UnsupportedOperationException} + * + * @return the current result as an update count; -1 if the current result + * is a ResultSet object or there are no more results + */ + def getLargeUpdateCount(): F[Long] + + /** + * Executes the given SQL statement, which may be an INSERT, + * UPDATE, or DELETE statement or an + * SQL statement that returns nothing, such as an SQL DDL statement. + *

+ * This method should be used when the returned row count may exceed + * {@link Integer#MAX_VALUE}. + *

+ * Note:This method cannot be called on a + * PreparedStatement or CallableStatement. + *

+ * The default implementation will throw {@code UnsupportedOperationException} + * + * @param sql an SQL Data Manipulation Language (DML) statement, + * such as INSERT, UPDATE or + * DELETE; or an SQL statement that returns nothing, + * such as a DDL statement. + * + * @return either (1) the row count for SQL Data Manipulation Language + * (DML) statements or (2) 0 for SQL statements that return nothing + */ + def executeLargeUpdate(sql: String): F[Long] + + /** + * Executes the given SQL statement and signals the driver with the + * given flag about whether the + * auto-generated keys produced by this Statement object + * should be made available for retrieval. The driver will ignore the + * flag if the SQL statement + * is not an INSERT statement, or an SQL statement able to return + * auto-generated keys (the list of such statements is vendor-specific). + *

+ * This method should be used when the returned row count may exceed + * {@link Integer#MAX_VALUE}. + *

+ * Note:This method cannot be called on a + * PreparedStatement or CallableStatement. + *

+ * The default implementation will throw {@code SQLFeatureNotSupportedException} + * + * @param sql an SQL Data Manipulation Language (DML) statement, + * such as INSERT, UPDATE or + * DELETE; or an SQL statement that returns nothing, + * such as a DDL statement. + * + * @param autoGeneratedKeys a flag indicating whether auto-generated keys + * should be made available for retrieval; + * one of the following constants: + * Statement.RETURN_GENERATED_KEYS + * Statement.NO_GENERATED_KEYS + * @return either (1) the row count for SQL Data Manipulation Language (DML) statements + * or (2) 0 for SQL statements that return nothing + */ + def executeLargeUpdate(sql: String, autoGeneratedKeys: Int): F[Long] + object Statement: /** From f019e35f7292aa67b3cda33135a60001ae244d5a Mon Sep 17 00:00:00 2001 From: takapi327 Date: Sun, 26 May 2024 23:57:19 +0900 Subject: [PATCH 2/8] Implement executeLargeUpdate for ldbc connector --- .../net/protocol/CallableStatementImpl.scala | 24 +++++++++---------- .../protocol/ClientPreparedStatement.scala | 8 +++---- .../protocol/ServerPreparedStatement.scala | 6 ++--- .../protocol/SharedPreparedStatement.scala | 2 ++ .../net/protocol/StatementImpl.scala | 13 +++++++--- 5 files changed, 31 insertions(+), 22 deletions(-) diff --git a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/CallableStatementImpl.scala b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/CallableStatementImpl.scala index 1528d77c3..7968a95ef 100644 --- a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/CallableStatementImpl.scala +++ b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/CallableStatementImpl.scala @@ -97,10 +97,10 @@ case class CallableStatementImpl[F[_]: Temporal: Exchange: Tracer]( } } <* params.set(ListMap.empty) - override def executeUpdate(): F[Int] = + override def executeLargeUpdate(): F[Long] = checkClosed() *> checkNullOrEmptyQuery(sql) *> - exchange[F, Int]("statement") { (span: Span[F]) => + exchange[F, Long]("statement") { (span: Span[F]) => if sql.toUpperCase.startsWith("CALL") then executeCallStatement(span).flatMap { resultSets => resultSets.headOption match @@ -108,15 +108,15 @@ case class CallableStatementImpl[F[_]: Temporal: Exchange: Tracer]( for lastColumnReadNullable <- Ref[F].of(true) resultSetCurrentCursor <- Ref[F].of(0) - resultSetCurrentRow <- Ref[F].of[Option[ResultSetRowPacket]](None) + resultSetCurrentRow <- Ref[F].of[Option[ResultSetRowPacket]](None) resultSet = ResultSetImpl.empty( - serverVariables, - protocol.initialPacket.serverVersion, - resultSetClosed, - lastColumnReadNullable, - resultSetCurrentCursor, - resultSetCurrentRow - ) + serverVariables, + protocol.initialPacket.serverVersion, + resultSetClosed, + lastColumnReadNullable, + resultSetCurrentCursor, + resultSetCurrentRow + ) _ <- currentResultSet.set(Some(resultSet)) yield resultSet case Some(resultSet) => @@ -131,9 +131,9 @@ case class CallableStatementImpl[F[_]: Temporal: Exchange: Tracer]( ))* ) *> sendQuery(buildQuery(sql, params)).flatMap { - case result: OKPacket => lastInsertId.set(result.lastInsertId) *> ev.pure(result.affectedRows.toInt) + case result: OKPacket => lastInsertId.set(result.lastInsertId) *> ev.pure(result.affectedRows) case error: ERRPacket => ev.raiseError(error.toException("Failed to execute query", sql)) - case _: EOFPacket => ev.raiseError(new SQLException("Unexpected EOF packet")) + case _: EOFPacket => ev.raiseError(new SQLException("Unexpected EOF packet")) } } } diff --git a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/ClientPreparedStatement.scala b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/ClientPreparedStatement.scala index 5c0735e49..78c615642 100644 --- a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/ClientPreparedStatement.scala +++ b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/ClientPreparedStatement.scala @@ -125,8 +125,8 @@ case class ClientPreparedStatement[F[_]: Temporal: Exchange: Tracer]( } <* params.set(ListMap.empty) } - override def executeUpdate(): F[Int] = - checkClosed() *> checkNullOrEmptyQuery(sql) *> exchange[F, Int]("statement") { (span: Span[F]) => + override def executeLargeUpdate(): F[Long] = + checkClosed() *> checkNullOrEmptyQuery(sql) *> exchange[F, Long]("statement") { (span: Span[F]) => params.get.flatMap { params => span.addAttributes( (attributes ++ List( @@ -139,9 +139,9 @@ case class ClientPreparedStatement[F[_]: Temporal: Exchange: Tracer]( ComQueryPacket(buildQuery(sql, params), protocol.initialPacket.capabilityFlags, ListMap.empty) ) *> protocol.receive(GenericResponsePackets.decoder(protocol.initialPacket.capabilityFlags)).flatMap { - case result: OKPacket => lastInsertId.set(result.lastInsertId) *> ev.pure(result.affectedRows.toInt) + case result: OKPacket => lastInsertId.set(result.lastInsertId) *> ev.pure(result.affectedRows) case error: ERRPacket => ev.raiseError(error.toException("Failed to execute query", sql)) - case _: EOFPacket => ev.raiseError(new SQLException("Unexpected EOF packet")) + case _: EOFPacket => ev.raiseError(new SQLException("Unexpected EOF packet")) } } <* params.set(ListMap.empty) } diff --git a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/ServerPreparedStatement.scala b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/ServerPreparedStatement.scala index c66754da7..1704cd239 100644 --- a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/ServerPreparedStatement.scala +++ b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/ServerPreparedStatement.scala @@ -112,8 +112,8 @@ case class ServerPreparedStatement[F[_]: Temporal: Exchange: Tracer]( yield resultSet } - override def executeUpdate(): F[Int] = - checkClosed() *> checkNullOrEmptyQuery(sql) *> exchange[F, Int]("statement") { (span: Span[F]) => + override def executeLargeUpdate(): F[Long] = + checkClosed() *> checkNullOrEmptyQuery(sql) *> exchange[F, Long]("statement") { (span: Span[F]) => params.get.flatMap { params => span.addAttributes( (attributes ++ List( @@ -124,7 +124,7 @@ case class ServerPreparedStatement[F[_]: Temporal: Exchange: Tracer]( protocol.resetSequenceId *> protocol.send(ComStmtExecutePacket(statementId, params)) *> protocol.receive(GenericResponsePackets.decoder(protocol.initialPacket.capabilityFlags)).flatMap { - case result: OKPacket => lastInsertId.set(result.lastInsertId) *> ev.pure(result.affectedRows.toInt) + case result: OKPacket => lastInsertId.set(result.lastInsertId) *> ev.pure(result.affectedRows) case error: ERRPacket => ev.raiseError(error.toException("Failed to execute query", sql)) case _: EOFPacket => ev.raiseError(new SQLException("Unexpected EOF packet")) } diff --git a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/SharedPreparedStatement.scala b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/SharedPreparedStatement.scala index fc190c56b..63354ef22 100644 --- a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/SharedPreparedStatement.scala +++ b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/SharedPreparedStatement.scala @@ -86,6 +86,8 @@ private[ldbc] trait SharedPreparedStatement[F[_]: Temporal] setTimestamp(parameterIndex, value.asInstanceOf[LocalDateTime]) case unknown => throw new SQLException(s"Unsupported object type ${ unknown.getClass.getName } for setObject") + override def executeUpdate(): F[Int] = executeLargeUpdate().map(_.toInt) + protected def buildQuery(original: String, params: ListMap[Int, Parameter]): String = val query = original.toCharArray params diff --git a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/StatementImpl.scala b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/StatementImpl.scala index 1e7e18213..865feb5ed 100644 --- a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/StatementImpl.scala +++ b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/StatementImpl.scala @@ -101,18 +101,21 @@ private[ldbc] case class StatementImpl[F[_]: Temporal: Exchange: Tracer]( } override def executeUpdate(sql: String): F[Int] = - checkClosed() *> checkNullOrEmptyQuery(sql) *> exchange[F, Int]("statement") { (span: Span[F]) => + executeLargeUpdate(sql).map(_.toInt) + + override def executeLargeUpdate(sql: String): F[Long] = + checkClosed() *> checkNullOrEmptyQuery(sql) *> exchange[F, Long]("statement") { (span: Span[F]) => span.addAttributes( (attributes ++ List(Attribute("execute", "update"), Attribute("sql", sql)))* ) *> protocol.resetSequenceId *> ( protocol.send(ComQueryPacket(sql, protocol.initialPacket.capabilityFlags, ListMap.empty)) *> protocol.receive(GenericResponsePackets.decoder(protocol.initialPacket.capabilityFlags)).flatMap { case result: OKPacket => - lastInsertId.set(result.lastInsertId) *> updateCount.updateAndGet(_ => result.affectedRows).map(_.toInt) + lastInsertId.set(result.lastInsertId) *> updateCount.updateAndGet(_ => result.affectedRows) case error: ERRPacket => ev.raiseError(error.toException("Failed to execute query", sql)) case _: EOFPacket => ev.raiseError(new SQLException("Unexpected EOF packet")) } - ) + ) } override def close(): F[Unit] = statementClosed.set(true) *> resultSetClosed.set(true) @@ -223,11 +226,15 @@ object StatementImpl: override def getResultSet(): F[Option[ResultSet[F]]] = checkClosed() *> currentResultSet.get override def getUpdateCount(): F[Int] = checkClosed() *> updateCount.get.map(_.toInt) + override def getLargeUpdateCount(): F[Long] = checkClosed() *> updateCount.get override def getMoreResults(): F[Boolean] = checkClosed() *> moreResults.get override def executeUpdate(sql: String, autoGeneratedKeys: Int): F[Int] = this.autoGeneratedKeys.set(autoGeneratedKeys) *> executeUpdate(sql) + override def executeLargeUpdate(sql: String, autoGeneratedKeys: Int): F[Long] = + this.autoGeneratedKeys.set(autoGeneratedKeys) *> executeLargeUpdate(sql) + override def execute(sql: String, autoGeneratedKeys: Int): F[Boolean] = this.autoGeneratedKeys.set(autoGeneratedKeys) *> execute(sql) From eb06b420a56bf54e1ab09c0e98dcc0b30116541b Mon Sep 17 00:00:00 2001 From: takapi327 Date: Mon, 27 May 2024 00:02:14 +0900 Subject: [PATCH 3/8] Implement executeLargeUpdate for jdbc connector --- .../connector/PreparedStatementImpl.scala | 24 ++++--------------- .../scala/jdbc/connector/StatementImpl.scala | 8 +++++++ .../scala/ldbc/sql/PreparedStatement.scala | 5 ++++ 3 files changed, 18 insertions(+), 19 deletions(-) diff --git a/module/jdbc-connector/src/main/scala/jdbc/connector/PreparedStatementImpl.scala b/module/jdbc-connector/src/main/scala/jdbc/connector/PreparedStatementImpl.scala index 4bc04b6a7..7a494e48a 100644 --- a/module/jdbc-connector/src/main/scala/jdbc/connector/PreparedStatementImpl.scala +++ b/module/jdbc-connector/src/main/scala/jdbc/connector/PreparedStatementImpl.scala @@ -17,25 +17,6 @@ import ldbc.sql.{ PreparedStatement, ResultSet } private[jdbc] open class PreparedStatementImpl[F[_]: Sync](statement: java.sql.PreparedStatement) extends PreparedStatement[F]: - @deprecated("This method cannot be called on a PreparedStatement.", "0.3.0") - override def executeQuery(sql: String): F[ResultSet[F]] = throw new UnsupportedOperationException( - "This method cannot be called on a PreparedStatement." - ) - - @deprecated("This method cannot be called on a PreparedStatement.", "0.3.0") - override def executeUpdate(sql: String): F[Int] = throw new UnsupportedOperationException( - "This method cannot be called on a PreparedStatement." - ) - - @deprecated("This method cannot be called on a PreparedStatement.", "0.3.0") - override def execute(sql: String): F[Boolean] = throw new UnsupportedOperationException( - "This method cannot be called on a PreparedStatement." - ) - - override def addBatch(sql: String): F[Unit] = throw new UnsupportedOperationException( - "This method cannot be called on a PreparedStatement." - ) - override def executeQuery(): F[ResultSet[F]] = Sync[F].blocking(statement.executeQuery()).map(ResultSetImpl.apply) override def executeUpdate(): F[Int] = Sync[F].blocking(statement.executeUpdate()) @@ -115,3 +96,8 @@ private[jdbc] open class PreparedStatementImpl[F[_]: Sync](statement: java.sql.P Sync[F].blocking(statement.execute(sql, autoGeneratedKeys)) override def isClosed(): F[Boolean] = Sync[F].blocking(statement.isClosed) + + override def getLargeUpdateCount(): F[Long] = Sync[F].blocking(statement.getLargeUpdateCount) + + override def executeLargeUpdate(): F[Long] = + Sync[F].blocking(statement.executeLargeUpdate()) diff --git a/module/jdbc-connector/src/main/scala/jdbc/connector/StatementImpl.scala b/module/jdbc-connector/src/main/scala/jdbc/connector/StatementImpl.scala index 9dac185c6..c057590dc 100644 --- a/module/jdbc-connector/src/main/scala/jdbc/connector/StatementImpl.scala +++ b/module/jdbc-connector/src/main/scala/jdbc/connector/StatementImpl.scala @@ -46,3 +46,11 @@ private[jdbc] case class StatementImpl[F[_]: Sync](statement: java.sql.Statement Sync[F].blocking(statement.execute(sql, autoGeneratedKeys)) override def isClosed(): F[Boolean] = Sync[F].blocking(statement.isClosed) + + override def getLargeUpdateCount(): F[Long] = Sync[F].blocking(statement.getLargeUpdateCount) + + override def executeLargeUpdate(sql: String): F[Long] = + Sync[F].blocking(statement.executeLargeUpdate(sql)) + + override def executeLargeUpdate(sql: String, autoGeneratedKeys: Int): F[Long] = + Sync[F].blocking(statement.executeLargeUpdate(sql, autoGeneratedKeys)) diff --git a/module/ldbc-sql/src/main/scala/ldbc/sql/PreparedStatement.scala b/module/ldbc-sql/src/main/scala/ldbc/sql/PreparedStatement.scala index b9b5851e0..b82014d25 100644 --- a/module/ldbc-sql/src/main/scala/ldbc/sql/PreparedStatement.scala +++ b/module/ldbc-sql/src/main/scala/ldbc/sql/PreparedStatement.scala @@ -38,6 +38,11 @@ trait PreparedStatement[F[_]] extends Statement[F]: "This method cannot be called on a PreparedStatement." ) + @deprecated("This method cannot be called on a PreparedStatement.", "0.3.0") + override def executeLargeUpdate(sql: String, autoGeneratedKeys: Int): F[Long] = throw new UnsupportedOperationException( + "This method cannot be called on a PreparedStatement." + ) + @deprecated("This method cannot be called on a PreparedStatement.", "0.3.0") override def execute(sql: String): F[Boolean] = throw new UnsupportedOperationException( "This method cannot be called on a PreparedStatement." From e4e4e63073dd7c2d5a97270c5fafd3c286afa149 Mon Sep 17 00:00:00 2001 From: takapi327 Date: Mon, 27 May 2024 00:02:44 +0900 Subject: [PATCH 4/8] Action sbt scalafmtAll --- .../scala/jdbc/connector/StatementImpl.scala | 4 ++-- .../net/protocol/CallableStatementImpl.scala | 18 +++++++++--------- .../net/protocol/ClientPreparedStatement.scala | 2 +- .../connector/net/protocol/StatementImpl.scala | 10 +++++----- .../scala/ldbc/sql/PreparedStatement.scala | 7 ++++--- 5 files changed, 21 insertions(+), 20 deletions(-) diff --git a/module/jdbc-connector/src/main/scala/jdbc/connector/StatementImpl.scala b/module/jdbc-connector/src/main/scala/jdbc/connector/StatementImpl.scala index c057590dc..201ec1365 100644 --- a/module/jdbc-connector/src/main/scala/jdbc/connector/StatementImpl.scala +++ b/module/jdbc-connector/src/main/scala/jdbc/connector/StatementImpl.scala @@ -46,9 +46,9 @@ private[jdbc] case class StatementImpl[F[_]: Sync](statement: java.sql.Statement Sync[F].blocking(statement.execute(sql, autoGeneratedKeys)) override def isClosed(): F[Boolean] = Sync[F].blocking(statement.isClosed) - + override def getLargeUpdateCount(): F[Long] = Sync[F].blocking(statement.getLargeUpdateCount) - + override def executeLargeUpdate(sql: String): F[Long] = Sync[F].blocking(statement.executeLargeUpdate(sql)) diff --git a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/CallableStatementImpl.scala b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/CallableStatementImpl.scala index 7968a95ef..510f603c9 100644 --- a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/CallableStatementImpl.scala +++ b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/CallableStatementImpl.scala @@ -108,15 +108,15 @@ case class CallableStatementImpl[F[_]: Temporal: Exchange: Tracer]( for lastColumnReadNullable <- Ref[F].of(true) resultSetCurrentCursor <- Ref[F].of(0) - resultSetCurrentRow <- Ref[F].of[Option[ResultSetRowPacket]](None) + resultSetCurrentRow <- Ref[F].of[Option[ResultSetRowPacket]](None) resultSet = ResultSetImpl.empty( - serverVariables, - protocol.initialPacket.serverVersion, - resultSetClosed, - lastColumnReadNullable, - resultSetCurrentCursor, - resultSetCurrentRow - ) + serverVariables, + protocol.initialPacket.serverVersion, + resultSetClosed, + lastColumnReadNullable, + resultSetCurrentCursor, + resultSetCurrentRow + ) _ <- currentResultSet.set(Some(resultSet)) yield resultSet case Some(resultSet) => @@ -133,7 +133,7 @@ case class CallableStatementImpl[F[_]: Temporal: Exchange: Tracer]( sendQuery(buildQuery(sql, params)).flatMap { case result: OKPacket => lastInsertId.set(result.lastInsertId) *> ev.pure(result.affectedRows) case error: ERRPacket => ev.raiseError(error.toException("Failed to execute query", sql)) - case _: EOFPacket => ev.raiseError(new SQLException("Unexpected EOF packet")) + case _: EOFPacket => ev.raiseError(new SQLException("Unexpected EOF packet")) } } } diff --git a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/ClientPreparedStatement.scala b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/ClientPreparedStatement.scala index 78c615642..3ec2743c5 100644 --- a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/ClientPreparedStatement.scala +++ b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/ClientPreparedStatement.scala @@ -141,7 +141,7 @@ case class ClientPreparedStatement[F[_]: Temporal: Exchange: Tracer]( protocol.receive(GenericResponsePackets.decoder(protocol.initialPacket.capabilityFlags)).flatMap { case result: OKPacket => lastInsertId.set(result.lastInsertId) *> ev.pure(result.affectedRows) case error: ERRPacket => ev.raiseError(error.toException("Failed to execute query", sql)) - case _: EOFPacket => ev.raiseError(new SQLException("Unexpected EOF packet")) + case _: EOFPacket => ev.raiseError(new SQLException("Unexpected EOF packet")) } } <* params.set(ListMap.empty) } diff --git a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/StatementImpl.scala b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/StatementImpl.scala index 865feb5ed..aeda7bda8 100644 --- a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/StatementImpl.scala +++ b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/StatementImpl.scala @@ -115,7 +115,7 @@ private[ldbc] case class StatementImpl[F[_]: Temporal: Exchange: Tracer]( case error: ERRPacket => ev.raiseError(error.toException("Failed to execute query", sql)) case _: EOFPacket => ev.raiseError(new SQLException("Unexpected EOF packet")) } - ) + ) } override def close(): F[Unit] = statementClosed.set(true) *> resultSetClosed.set(true) @@ -224,10 +224,10 @@ object StatementImpl: def autoGeneratedKeys: Ref[F, Int] def lastInsertId: Ref[F, Long] - override def getResultSet(): F[Option[ResultSet[F]]] = checkClosed() *> currentResultSet.get - override def getUpdateCount(): F[Int] = checkClosed() *> updateCount.get.map(_.toInt) - override def getLargeUpdateCount(): F[Long] = checkClosed() *> updateCount.get - override def getMoreResults(): F[Boolean] = checkClosed() *> moreResults.get + override def getResultSet(): F[Option[ResultSet[F]]] = checkClosed() *> currentResultSet.get + override def getUpdateCount(): F[Int] = checkClosed() *> updateCount.get.map(_.toInt) + override def getLargeUpdateCount(): F[Long] = checkClosed() *> updateCount.get + override def getMoreResults(): F[Boolean] = checkClosed() *> moreResults.get override def executeUpdate(sql: String, autoGeneratedKeys: Int): F[Int] = this.autoGeneratedKeys.set(autoGeneratedKeys) *> executeUpdate(sql) diff --git a/module/ldbc-sql/src/main/scala/ldbc/sql/PreparedStatement.scala b/module/ldbc-sql/src/main/scala/ldbc/sql/PreparedStatement.scala index b82014d25..595fe95d9 100644 --- a/module/ldbc-sql/src/main/scala/ldbc/sql/PreparedStatement.scala +++ b/module/ldbc-sql/src/main/scala/ldbc/sql/PreparedStatement.scala @@ -39,9 +39,10 @@ trait PreparedStatement[F[_]] extends Statement[F]: ) @deprecated("This method cannot be called on a PreparedStatement.", "0.3.0") - override def executeLargeUpdate(sql: String, autoGeneratedKeys: Int): F[Long] = throw new UnsupportedOperationException( - "This method cannot be called on a PreparedStatement." - ) + override def executeLargeUpdate(sql: String, autoGeneratedKeys: Int): F[Long] = + throw new UnsupportedOperationException( + "This method cannot be called on a PreparedStatement." + ) @deprecated("This method cannot be called on a PreparedStatement.", "0.3.0") override def execute(sql: String): F[Boolean] = throw new UnsupportedOperationException( From d8c4bde8cafd54aa23e6ee213478f474321594e7 Mon Sep 17 00:00:00 2001 From: takapi327 Date: Mon, 27 May 2024 00:17:10 +0900 Subject: [PATCH 5/8] Added executeLargeBatch --- .../src/main/scala/ldbc/sql/Statement.scala | 44 +++++++++++++++++++ 1 file changed, 44 insertions(+) diff --git a/module/ldbc-sql/src/main/scala/ldbc/sql/Statement.scala b/module/ldbc-sql/src/main/scala/ldbc/sql/Statement.scala index 0664edfe3..4dca1a454 100644 --- a/module/ldbc-sql/src/main/scala/ldbc/sql/Statement.scala +++ b/module/ldbc-sql/src/main/scala/ldbc/sql/Statement.scala @@ -338,6 +338,50 @@ trait Statement[F[_]]: */ def executeLargeUpdate(sql: String, autoGeneratedKeys: Int): F[Long] + /** + * Submits a batch of commands to the database for execution and + * if all commands execute successfully, returns an array of update counts. + * The long elements of the array that is returned are ordered + * to correspond to the commands in the batch, which are ordered + * according to the order in which they were added to the batch. + * The elements in the array returned by the method {@code executeLargeBatch} + * may be one of the following: + *

    + *
  1. A number greater than or equal to zero -- indicates that the + * command was processed successfully and is an update count giving the + * number of rows in the database that were affected by the command's + * execution + *
  2. A value of SUCCESS_NO_INFO -- indicates that the command was + * processed successfully but that the number of rows affected is + * unknown + *

    + * If one of the commands in a batch update fails to execute properly, + * this method throws a BatchUpdateException, and a JDBC + * driver may or may not continue to process the remaining commands in + * the batch. However, the driver's behavior must be consistent with a + * particular DBMS, either always continuing to process commands or never + * continuing to process commands. If the driver continues processing + * after a failure, the array returned by the method + * BatchUpdateException.getLargeUpdateCounts + * will contain as many elements as there are commands in the batch, and + * at least one of the elements will be the following: + * + *

  3. A value of EXECUTE_FAILED -- indicates that the command failed + * to execute successfully and occurs only if a driver continues to + * process commands after a command fails + *
+ *

+ * This method should be used when the returned row count may exceed + * [[Int.MaxValue]]. + *

+ * The default implementation will throw {@code UnsupportedOperationException} + * + * @return an array of update counts containing one element for each + * command in the batch. The elements of the array are ordered according + * to the order in which commands were added to the batch. + */ + def executeLargeBatch(): F[Array[Long]] + object Statement: /** From 37235b089a0c4cfa995e56f8074b854a652fb455 Mon Sep 17 00:00:00 2001 From: takapi327 Date: Mon, 27 May 2024 00:17:23 +0900 Subject: [PATCH 6/8] Implement executeLargeBatch for ldbc connector --- .../exception/BatchUpdateException.scala | 2 +- .../net/packet/response/ERRPacket.scala | 2 +- .../net/protocol/CallableStatementImpl.scala | 14 ++++++------ .../protocol/ClientPreparedStatement.scala | 22 +++++++++---------- .../protocol/ServerPreparedStatement.scala | 18 +++++++-------- .../protocol/SharedPreparedStatement.scala | 3 +++ .../net/protocol/StatementImpl.scala | 16 ++++++++------ 7 files changed, 41 insertions(+), 36 deletions(-) diff --git a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/exception/BatchUpdateException.scala b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/exception/BatchUpdateException.scala index 14ae753a7..dd64d5c26 100644 --- a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/exception/BatchUpdateException.scala +++ b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/exception/BatchUpdateException.scala @@ -20,7 +20,7 @@ import org.typelevel.otel4s.Attribute */ class BatchUpdateException( message: String, - updateCounts: List[Int], + updateCounts: List[Long], sqlState: Option[String] = None, vendorCode: Option[Int] = None, sql: Option[String] = None, diff --git a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/packet/response/ERRPacket.scala b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/packet/response/ERRPacket.scala index 11ed7a063..300ab8ecf 100644 --- a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/packet/response/ERRPacket.scala +++ b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/packet/response/ERRPacket.scala @@ -131,7 +131,7 @@ case class ERRPacket( def toException(message: String, sql: String): SQLException = toException(message, Some(sql)) - def toException(message: String, updateCounts: Vector[Int]): SQLException = BatchUpdateException( + def toException(message: String, updateCounts: Vector[Long]): SQLException = BatchUpdateException( message = message, updateCounts = updateCounts.toList, sqlState = sqlState, diff --git a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/CallableStatementImpl.scala b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/CallableStatementImpl.scala index 510f603c9..a3ae52bf0 100644 --- a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/CallableStatementImpl.scala +++ b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/CallableStatementImpl.scala @@ -193,10 +193,10 @@ case class CallableStatementImpl[F[_]: Temporal: Exchange: Tracer]( override def clearBatch(): F[Unit] = batchedArgs.set(Vector.empty) - override def executeBatch(): F[Array[Int]] = + override def executeLargeBatch(): F[Array[Long]] = checkClosed() *> checkNullOrEmptyQuery(sql) *> - exchange[F, Array[Int]]("statement") { (span: Span[F]) => + exchange[F, Array[Long]]("statement") { (span: Span[F]) => batchedArgs.get.flatMap { args => span.addAttributes( (attributes ++ List( @@ -211,9 +211,9 @@ case class CallableStatementImpl[F[_]: Temporal: Exchange: Tracer]( case q if q.startsWith("INSERT") => sendQuery(sql.split("VALUES").head + " VALUES" + args.mkString(",")) .flatMap { - case _: OKPacket => ev.pure(Array.fill(args.length)(Statement.SUCCESS_NO_INFO)) + case _: OKPacket => ev.pure(Array.fill(args.length)(Statement.SUCCESS_NO_INFO.toLong)) case error: ERRPacket => ev.raiseError(error.toException("Failed to execute query", sql)) - case _: EOFPacket => ev.raiseError(new SQLException("Unexpected EOF packet")) + case _: EOFPacket => ev.raiseError(new SQLException("Unexpected EOF packet")) } case q if q.startsWith("update") || q.startsWith("delete") || q.startsWith("CALL") => protocol.resetSequenceId *> @@ -227,7 +227,7 @@ case class CallableStatementImpl[F[_]: Temporal: Exchange: Tracer]( ) ) *> args - .foldLeft(ev.pure(Vector.empty[Int])) { ($acc, _) => + .foldLeft(ev.pure(Vector.empty[Long])) { ($acc, _) => for acc <- $acc result <- @@ -235,7 +235,7 @@ case class CallableStatementImpl[F[_]: Temporal: Exchange: Tracer]( .receive(GenericResponsePackets.decoder(protocol.initialPacket.capabilityFlags)) .flatMap { case result: OKPacket => - lastInsertId.set(result.lastInsertId) *> ev.pure(acc :+ result.affectedRows.toInt) + lastInsertId.set(result.lastInsertId) *> ev.pure(acc :+ result.affectedRows) case error: ERRPacket => ev.raiseError(error.toException("Failed to execute batch", acc)) case _: EOFPacket => ev.raiseError(new SQLException("Unexpected EOF packet")) @@ -249,7 +249,7 @@ case class CallableStatementImpl[F[_]: Temporal: Exchange: Tracer]( ev.raiseError( new SQLException("The batch query must be an INSERT, UPDATE, or DELETE, CALL statement.") ) - ) + ) } } <* params.set(ListMap.empty) <* batchedArgs.set(Vector.empty) diff --git a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/ClientPreparedStatement.scala b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/ClientPreparedStatement.scala index 3ec2743c5..3a6dc26c4 100644 --- a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/ClientPreparedStatement.scala +++ b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/ClientPreparedStatement.scala @@ -161,11 +161,11 @@ case class ClientPreparedStatement[F[_]: Temporal: Exchange: Tracer]( override def clearBatch(): F[Unit] = batchedArgs.set(Vector.empty) - override def executeBatch(): F[Array[Int]] = + override def executeLargeBatch(): F[Array[Long]] = checkClosed() *> checkNullOrEmptyQuery(sql) *> ( sql.trim.toLowerCase match case q if q.startsWith("insert") => - exchange[F, Array[Int]]("statement") { (span: Span[F]) => + exchange[F, Array[Long]]("statement") { (span: Span[F]) => protocol.resetSequenceId *> batchedArgs.get.flatMap { args => span.addAttributes( @@ -173,7 +173,7 @@ case class ClientPreparedStatement[F[_]: Temporal: Exchange: Tracer]( Attribute("execute", "batch"), Attribute("size", args.length.toLong), Attribute("sql", args.toArray.toSeq) - ))* + )) * ) *> ( if args.isEmpty then ev.pure(Array.empty) else @@ -188,17 +188,17 @@ case class ClientPreparedStatement[F[_]: Temporal: Exchange: Tracer]( protocol .receive(GenericResponsePackets.decoder(protocol.initialPacket.capabilityFlags)) .flatMap { - case _: OKPacket => ev.pure(Array.fill(args.length)(Statement.SUCCESS_NO_INFO)) + case _: OKPacket => ev.pure(Array.fill(args.length)(Statement.SUCCESS_NO_INFO.toLong)) case error: ERRPacket => ev.raiseError(error.toException("Failed to execute query", sql)) - case _: EOFPacket => ev.raiseError(new SQLException("Unexpected EOF packet")) + case _: EOFPacket => ev.raiseError(new SQLException("Unexpected EOF packet")) } - ) + ) } } <* params.set(ListMap.empty) <* batchedArgs.set(Vector.empty) case q if q.startsWith("update") || q.startsWith("delete") => protocol.resetSequenceId *> protocol.comSetOption(EnumMySQLSetOption.MYSQL_OPTION_MULTI_STATEMENTS_ON) *> - exchange[F, Array[Int]]("statement") { (span: Span[F]) => + exchange[F, Array[Long]]("statement") { (span: Span[F]) => protocol.resetSequenceId *> batchedArgs.get.flatMap { args => span.addAttributes( @@ -219,7 +219,7 @@ case class ClientPreparedStatement[F[_]: Temporal: Exchange: Tracer]( ) ) *> args - .foldLeft(ev.pure(Vector.empty[Int])) { ($acc, _) => + .foldLeft(ev.pure(Vector.empty[Long])) { ($acc, _) => for acc <- $acc result <- @@ -227,7 +227,7 @@ case class ClientPreparedStatement[F[_]: Temporal: Exchange: Tracer]( .receive(GenericResponsePackets.decoder(protocol.initialPacket.capabilityFlags)) .flatMap { case result: OKPacket => - lastInsertId.set(result.lastInsertId) *> ev.pure(acc :+ result.affectedRows.toInt) + lastInsertId.set(result.lastInsertId) *> ev.pure(acc :+ result.affectedRows) case error: ERRPacket => ev.raiseError(error.toException("Failed to execute batch", acc)) case _: EOFPacket => ev.raiseError(new SQLException("Unexpected EOF packet")) @@ -235,7 +235,7 @@ case class ClientPreparedStatement[F[_]: Temporal: Exchange: Tracer]( yield result } .map(_.toArray) - ) + ) } } <* protocol.resetSequenceId <* @@ -246,7 +246,7 @@ case class ClientPreparedStatement[F[_]: Temporal: Exchange: Tracer]( ev.raiseError( new IllegalArgumentException("The batch query must be an INSERT, UPDATE, or DELETE statement.") ) - ) + ) override def getGeneratedKeys(): F[ResultSet[F]] = autoGeneratedKeys.get.flatMap { diff --git a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/ServerPreparedStatement.scala b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/ServerPreparedStatement.scala index 1704cd239..fb447c9bf 100644 --- a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/ServerPreparedStatement.scala +++ b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/ServerPreparedStatement.scala @@ -146,11 +146,11 @@ case class ServerPreparedStatement[F[_]: Temporal: Exchange: Tracer]( override def clearBatch(): F[Unit] = batchedArgs.set(Vector.empty) - override def executeBatch(): F[Array[Int]] = + override def executeLargeBatch(): F[Array[Long]] = checkClosed() *> checkNullOrEmptyQuery(sql) *> ( sql.trim.toLowerCase match case q if q.startsWith("insert") => - exchange[F, Array[Int]]("statement") { (span: Span[F]) => + exchange[F, Array[Long]]("statement") { (span: Span[F]) => protocol.resetSequenceId *> batchedArgs.get.flatMap { args => span.addAttributes( @@ -173,17 +173,17 @@ case class ServerPreparedStatement[F[_]: Temporal: Exchange: Tracer]( protocol .receive(GenericResponsePackets.decoder(protocol.initialPacket.capabilityFlags)) .flatMap { - case _: OKPacket => ev.pure(Array.fill(args.length)(Statement.SUCCESS_NO_INFO)) + case _: OKPacket => ev.pure(Array.fill(args.length)(Statement.SUCCESS_NO_INFO.toLong)) case error: ERRPacket => ev.raiseError(error.toException("Failed to execute query", sql)) case _: EOFPacket => ev.raiseError(new SQLException("Unexpected EOF packet")) } - ) + ) } } <* params.set(ListMap.empty) <* batchedArgs.set(Vector.empty) case q if q.startsWith("update") || q.startsWith("delete") => protocol.resetSequenceId *> protocol.comSetOption(EnumMySQLSetOption.MYSQL_OPTION_MULTI_STATEMENTS_ON) *> - exchange[F, Array[Int]]("statement") { (span: Span[F]) => + exchange[F, Array[Long]]("statement") { (span: Span[F]) => protocol.resetSequenceId *> batchedArgs.get.flatMap { args => span.addAttributes( @@ -204,7 +204,7 @@ case class ServerPreparedStatement[F[_]: Temporal: Exchange: Tracer]( ) ) *> args - .foldLeft(ev.pure(Vector.empty[Int])) { ($acc, _) => + .foldLeft(ev.pure(Vector.empty[Long])) { ($acc, _) => for acc <- $acc result <- @@ -212,7 +212,7 @@ case class ServerPreparedStatement[F[_]: Temporal: Exchange: Tracer]( .receive(GenericResponsePackets.decoder(protocol.initialPacket.capabilityFlags)) .flatMap { case result: OKPacket => - lastInsertId.set(result.lastInsertId) *> ev.pure(acc :+ result.affectedRows.toInt) + lastInsertId.set(result.lastInsertId) *> ev.pure(acc :+ result.affectedRows) case error: ERRPacket => ev.raiseError(error.toException("Failed to execute batch", acc)) case _: EOFPacket => ev.raiseError(new SQLException("Unexpected EOF packet")) @@ -220,7 +220,7 @@ case class ServerPreparedStatement[F[_]: Temporal: Exchange: Tracer]( yield result } .map(_.toArray) - ) + ) } } <* protocol.resetSequenceId <* @@ -231,7 +231,7 @@ case class ServerPreparedStatement[F[_]: Temporal: Exchange: Tracer]( ev.raiseError( new IllegalArgumentException("The batch query must be an INSERT, UPDATE, or DELETE statement.") ) - ) + ) override def getGeneratedKeys(): F[ResultSet[F]] = autoGeneratedKeys.get.flatMap { diff --git a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/SharedPreparedStatement.scala b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/SharedPreparedStatement.scala index 63354ef22..79df6e94f 100644 --- a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/SharedPreparedStatement.scala +++ b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/SharedPreparedStatement.scala @@ -88,6 +88,9 @@ private[ldbc] trait SharedPreparedStatement[F[_]: Temporal] override def executeUpdate(): F[Int] = executeLargeUpdate().map(_.toInt) + override def executeBatch(): F[Array[Int]] = + executeLargeBatch().map(_.map(_.toInt)) + protected def buildQuery(original: String, params: ListMap[Int, Parameter]): String = val query = original.toCharArray params diff --git a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/StatementImpl.scala b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/StatementImpl.scala index aeda7bda8..95a622ddc 100644 --- a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/StatementImpl.scala +++ b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/StatementImpl.scala @@ -134,10 +134,10 @@ private[ldbc] case class StatementImpl[F[_]: Temporal: Exchange: Tracer]( override def clearBatch(): F[Unit] = batchedArgs.set(Vector.empty) - override def executeBatch(): F[Array[Int]] = + override def executeLargeBatch(): F[Array[Long]] = checkClosed() *> protocol.resetSequenceId *> protocol.comSetOption(EnumMySQLSetOption.MYSQL_OPTION_MULTI_STATEMENTS_ON) *> - exchange[F, Array[Int]]("statement") { (span: Span[F]) => + exchange[F, Array[Long]]("statement") { (span: Span[F]) => batchedArgs.get.flatMap { args => span.addAttributes( (attributes ++ List( @@ -153,7 +153,7 @@ private[ldbc] case class StatementImpl[F[_]: Temporal: Exchange: Tracer]( ComQueryPacket(args.mkString(";"), protocol.initialPacket.capabilityFlags, ListMap.empty) ) *> args - .foldLeft(ev.pure(Vector.empty[Int])) { ($acc, _) => + .foldLeft(ev.pure(Vector.empty[Long])) { ($acc, _) => for acc <- $acc result <- @@ -161,7 +161,7 @@ private[ldbc] case class StatementImpl[F[_]: Temporal: Exchange: Tracer]( .receive(GenericResponsePackets.decoder(protocol.initialPacket.capabilityFlags)) .flatMap { case result: OKPacket => - lastInsertId.set(result.lastInsertId) *> ev.pure(acc :+ result.affectedRows.toInt) + lastInsertId.set(result.lastInsertId) *> ev.pure(acc :+ result.affectedRows) case error: ERRPacket => ev.raiseError(error.toException("Failed to execute batch", acc)) case _: EOFPacket => ev.raiseError(new SQLException("Unexpected EOF packet")) @@ -169,11 +169,11 @@ private[ldbc] case class StatementImpl[F[_]: Temporal: Exchange: Tracer]( yield result } .map(_.toArray) - ) + ) } } <* protocol.resetSequenceId <* protocol.comSetOption( - EnumMySQLSetOption.MYSQL_OPTION_MULTI_STATEMENTS_OFF - ) <* clearBatch() + EnumMySQLSetOption.MYSQL_OPTION_MULTI_STATEMENTS_OFF + ) <* clearBatch() override def getGeneratedKeys(): F[ResultSet[F]] = autoGeneratedKeys.get.flatMap { @@ -244,6 +244,8 @@ object StatementImpl: stmtClosed <- statementClosed.get yield connClosed || stmtClosed + override def executeBatch(): F[Array[Int]] = executeLargeBatch().map(_.map(_.toInt)) + protected def checkClosed(): F[Unit] = isClosed().ifM( close() *> ev.raiseError(new SQLException("No operations allowed after statement closed.")), From 7944967ac45b3139d64e35e4bd3b00df33c5caef Mon Sep 17 00:00:00 2001 From: takapi327 Date: Mon, 27 May 2024 00:19:02 +0900 Subject: [PATCH 7/8] Implement executeLargeBatch for jdbc connector --- .../src/main/scala/jdbc/connector/PreparedStatementImpl.scala | 3 +++ .../src/main/scala/jdbc/connector/StatementImpl.scala | 3 +++ 2 files changed, 6 insertions(+) diff --git a/module/jdbc-connector/src/main/scala/jdbc/connector/PreparedStatementImpl.scala b/module/jdbc-connector/src/main/scala/jdbc/connector/PreparedStatementImpl.scala index 7a494e48a..f3185a1f9 100644 --- a/module/jdbc-connector/src/main/scala/jdbc/connector/PreparedStatementImpl.scala +++ b/module/jdbc-connector/src/main/scala/jdbc/connector/PreparedStatementImpl.scala @@ -101,3 +101,6 @@ private[jdbc] open class PreparedStatementImpl[F[_]: Sync](statement: java.sql.P override def executeLargeUpdate(): F[Long] = Sync[F].blocking(statement.executeLargeUpdate()) + + override def executeLargeBatch(): F[Array[Long]] = + Sync[F].blocking(statement.executeLargeBatch()) diff --git a/module/jdbc-connector/src/main/scala/jdbc/connector/StatementImpl.scala b/module/jdbc-connector/src/main/scala/jdbc/connector/StatementImpl.scala index 201ec1365..2e357f3e0 100644 --- a/module/jdbc-connector/src/main/scala/jdbc/connector/StatementImpl.scala +++ b/module/jdbc-connector/src/main/scala/jdbc/connector/StatementImpl.scala @@ -54,3 +54,6 @@ private[jdbc] case class StatementImpl[F[_]: Sync](statement: java.sql.Statement override def executeLargeUpdate(sql: String, autoGeneratedKeys: Int): F[Long] = Sync[F].blocking(statement.executeLargeUpdate(sql, autoGeneratedKeys)) + + override def executeLargeBatch(): F[Array[Long]] = + Sync[F].blocking(statement.executeLargeBatch()) From e3485aab44be600afd3d37d1f191ae52aa5885bd Mon Sep 17 00:00:00 2001 From: takapi327 Date: Mon, 27 May 2024 00:19:06 +0900 Subject: [PATCH 8/8] Action sbt scalafmtAll --- .../net/protocol/CallableStatementImpl.scala | 6 +++--- .../net/protocol/ClientPreparedStatement.scala | 12 ++++++------ .../net/protocol/ServerPreparedStatement.scala | 6 +++--- .../ldbc/connector/net/protocol/StatementImpl.scala | 6 +++--- 4 files changed, 15 insertions(+), 15 deletions(-) diff --git a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/CallableStatementImpl.scala b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/CallableStatementImpl.scala index a3ae52bf0..0d63e525e 100644 --- a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/CallableStatementImpl.scala +++ b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/CallableStatementImpl.scala @@ -211,9 +211,9 @@ case class CallableStatementImpl[F[_]: Temporal: Exchange: Tracer]( case q if q.startsWith("INSERT") => sendQuery(sql.split("VALUES").head + " VALUES" + args.mkString(",")) .flatMap { - case _: OKPacket => ev.pure(Array.fill(args.length)(Statement.SUCCESS_NO_INFO.toLong)) + case _: OKPacket => ev.pure(Array.fill(args.length)(Statement.SUCCESS_NO_INFO.toLong)) case error: ERRPacket => ev.raiseError(error.toException("Failed to execute query", sql)) - case _: EOFPacket => ev.raiseError(new SQLException("Unexpected EOF packet")) + case _: EOFPacket => ev.raiseError(new SQLException("Unexpected EOF packet")) } case q if q.startsWith("update") || q.startsWith("delete") || q.startsWith("CALL") => protocol.resetSequenceId *> @@ -249,7 +249,7 @@ case class CallableStatementImpl[F[_]: Temporal: Exchange: Tracer]( ev.raiseError( new SQLException("The batch query must be an INSERT, UPDATE, or DELETE, CALL statement.") ) - ) + ) } } <* params.set(ListMap.empty) <* batchedArgs.set(Vector.empty) diff --git a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/ClientPreparedStatement.scala b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/ClientPreparedStatement.scala index 3a6dc26c4..db7bec8c0 100644 --- a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/ClientPreparedStatement.scala +++ b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/ClientPreparedStatement.scala @@ -173,7 +173,7 @@ case class ClientPreparedStatement[F[_]: Temporal: Exchange: Tracer]( Attribute("execute", "batch"), Attribute("size", args.length.toLong), Attribute("sql", args.toArray.toSeq) - )) * + ))* ) *> ( if args.isEmpty then ev.pure(Array.empty) else @@ -188,11 +188,11 @@ case class ClientPreparedStatement[F[_]: Temporal: Exchange: Tracer]( protocol .receive(GenericResponsePackets.decoder(protocol.initialPacket.capabilityFlags)) .flatMap { - case _: OKPacket => ev.pure(Array.fill(args.length)(Statement.SUCCESS_NO_INFO.toLong)) + case _: OKPacket => ev.pure(Array.fill(args.length)(Statement.SUCCESS_NO_INFO.toLong)) case error: ERRPacket => ev.raiseError(error.toException("Failed to execute query", sql)) - case _: EOFPacket => ev.raiseError(new SQLException("Unexpected EOF packet")) + case _: EOFPacket => ev.raiseError(new SQLException("Unexpected EOF packet")) } - ) + ) } } <* params.set(ListMap.empty) <* batchedArgs.set(Vector.empty) case q if q.startsWith("update") || q.startsWith("delete") => @@ -235,7 +235,7 @@ case class ClientPreparedStatement[F[_]: Temporal: Exchange: Tracer]( yield result } .map(_.toArray) - ) + ) } } <* protocol.resetSequenceId <* @@ -246,7 +246,7 @@ case class ClientPreparedStatement[F[_]: Temporal: Exchange: Tracer]( ev.raiseError( new IllegalArgumentException("The batch query must be an INSERT, UPDATE, or DELETE statement.") ) - ) + ) override def getGeneratedKeys(): F[ResultSet[F]] = autoGeneratedKeys.get.flatMap { diff --git a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/ServerPreparedStatement.scala b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/ServerPreparedStatement.scala index fb447c9bf..8cf15aae1 100644 --- a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/ServerPreparedStatement.scala +++ b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/ServerPreparedStatement.scala @@ -177,7 +177,7 @@ case class ServerPreparedStatement[F[_]: Temporal: Exchange: Tracer]( case error: ERRPacket => ev.raiseError(error.toException("Failed to execute query", sql)) case _: EOFPacket => ev.raiseError(new SQLException("Unexpected EOF packet")) } - ) + ) } } <* params.set(ListMap.empty) <* batchedArgs.set(Vector.empty) case q if q.startsWith("update") || q.startsWith("delete") => @@ -220,7 +220,7 @@ case class ServerPreparedStatement[F[_]: Temporal: Exchange: Tracer]( yield result } .map(_.toArray) - ) + ) } } <* protocol.resetSequenceId <* @@ -231,7 +231,7 @@ case class ServerPreparedStatement[F[_]: Temporal: Exchange: Tracer]( ev.raiseError( new IllegalArgumentException("The batch query must be an INSERT, UPDATE, or DELETE statement.") ) - ) + ) override def getGeneratedKeys(): F[ResultSet[F]] = autoGeneratedKeys.get.flatMap { diff --git a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/StatementImpl.scala b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/StatementImpl.scala index 95a622ddc..e5223635e 100644 --- a/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/StatementImpl.scala +++ b/module/ldbc-connector/shared/src/main/scala/ldbc/connector/net/protocol/StatementImpl.scala @@ -169,11 +169,11 @@ private[ldbc] case class StatementImpl[F[_]: Temporal: Exchange: Tracer]( yield result } .map(_.toArray) - ) + ) } } <* protocol.resetSequenceId <* protocol.comSetOption( - EnumMySQLSetOption.MYSQL_OPTION_MULTI_STATEMENTS_OFF - ) <* clearBatch() + EnumMySQLSetOption.MYSQL_OPTION_MULTI_STATEMENTS_OFF + ) <* clearBatch() override def getGeneratedKeys(): F[ResultSet[F]] = autoGeneratedKeys.get.flatMap {