Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/2024 05 extension sql #212

Closed
wants to merge 13 commits into from
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import cats.syntax.all.*
import cats.effect.Temporal

import ldbc.sql.*
import ldbc.sql.logging.*

/**
* A model with a query string and parameters to be bound to the query string that is executed by PreparedStatement,
Expand All @@ -27,59 +26,49 @@ import ldbc.sql.logging.*
* @tparam F
* The effect type
*/
case class Mysql[F[_]: Temporal](statement: String, params: Seq[ParameterBinder[F]]) extends SQL[F]:
case class Mysql[F[_]: Temporal](statement: String, params: List[ParameterBinder[F]]) extends SQL[F]:

@targetName("combine")
override def ++(sql: SQL[F]): SQL[F] =
Mysql[F](statement ++ " " ++ sql.statement, params ++ sql.params)

override def update(using logHandler: LogHandler[F]): Kleisli[F, Connection[F], Int] = Kleisli { connection =>
(for
override def update: Query[F, Int] = QueryImpl[F, Int](statement, params.map(_.parameter)) { connection =>
for
statement <- connection.prepareStatement(statement)
result <- params.zipWithIndex.traverse {
case (param, index) => param.bind(statement, index + 1)
} >> statement.executeUpdate() <* statement.close()
yield result)
.onError(ex => logHandler.run(LogEvent.ExecFailure(statement, params.map(_.parameter).toList, ex)))
<* logHandler.run(LogEvent.Success(statement, params.map(_.parameter).toList))
yield result
}

override def returning[T <: String | Int | Long](using
reader: ResultSetReader[F, T],
logHandler: LogHandler[F]
): Kleisli[F, Connection[F], T] = Kleisli { connection =>
reader: ResultSetReader[F, T]
): Query[F, T] = QueryImpl[F, T](statement, params.map(_.parameter)) { connection =>
given Kleisli[F, ResultSet[F], T] = Kleisli { resultSet =>
reader.read(resultSet, 1)
}

(for
for
statement <- connection.prepareStatement(statement, Statement.RETURN_GENERATED_KEYS)
resultSet <- params.zipWithIndex.traverse {
case (param, index) => param.bind(statement, index + 1)
} >> statement.executeUpdate() >> statement.getGeneratedKeys()
result <- summon[ResultSetConsumer[F, T]].consume(resultSet) <* statement.close()
yield result)
.onError(ex => logHandler.run(LogEvent.ExecFailure(statement, params.map(_.parameter).toList, ex)))
<* logHandler.run(LogEvent.Success(statement, params.map(_.parameter).toList))
yield result
}

private[ldbc] override def connection[T](
statement: String,
params: Seq[ParameterBinder[F]],
consumer: ResultSetConsumer[F, T]
)(using logHandler: LogHandler[F]): Kleisli[F, Connection[F], T] =
Kleisli { connection =>
): Query[F, T] =
QueryImpl[F, T](statement, params.map(_.parameter).toList) { connection =>
for
prepareStatement <- connection.prepareStatement(statement)
resultSet <- params.zipWithIndex.traverse {
case (param, index) => param.bind(prepareStatement, index + 1)
} >> prepareStatement
.executeQuery()
result <-
consumer
.consume(resultSet)
.onError(ex => logHandler.run(LogEvent.ProcessingFailure(statement, params.map(_.parameter).toList, ex)))
<* prepareStatement.close()
<* logHandler.run(LogEvent.Success(statement, params.map(_.parameter).toList))
result <- consumer.consume(resultSet) <* prepareStatement.close()
yield result
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
/**
* Copyright (c) 2023-2024 by Takahiko Tominaga
* This software is licensed under the MIT License (MIT).
* For more information see LICENSE or https://opensource.org/licenses/MIT
*/

package ldbc.connector

import cats.syntax.all.*
import cats.effect.{ Temporal, Resource }
import cats.effect.kernel.Resource.ExitCase

import ldbc.sql.*
import ldbc.sql.logging.*

private[ldbc] case class QueryImpl[F[_]: Temporal, T](statement: String, params: List[Any])(func: Connection[F] => F[T])
extends Query[F, T]:

override def run: Connection[F] => F[T] = func

override def readOnly(connection: Connection[F])(using logHandler: LogHandler[F]): F[T] =
(for
_ <- connection.setReadOnly(true)
result <- run(connection)
yield result)
.onError(ex => logHandler.run(LogEvent.ExecFailure(statement, params, ex)))
<* logHandler.run(LogEvent.Success(statement, params))

override def autoCommit(connection: Connection[F])(using logHandler: LogHandler[F]): F[T] =
(for
_ <- connection.setReadOnly(false)
_ <- connection.setAutoCommit(true)
result <- run(connection)
yield result)
.onError(ex => logHandler.run(LogEvent.ExecFailure(statement, params, ex)))
<* logHandler.run(LogEvent.Success(statement, params))

override def transaction(connection: Connection[F])(using logHandler: LogHandler[F]): F[T] =
val acquire = connection.setReadOnly(false) >> connection.setAutoCommit(false) >> Temporal[F].pure(connection)

val release = (connection: Connection[F], exitCase: ExitCase) =>
exitCase match {
case ExitCase.Errored(ex) => connection.rollback() *> Temporal[F].raiseError(ex)
case _ => connection.commit()
}

Resource
.makeCase(acquire)(release)
.use(run)
.onError(ex => logHandler.run(LogEvent.ExecFailure(statement, params, ex)))
<* logHandler.run(LogEvent.Success(statement, params))

override def rollback(connection: Connection[F])(using logHandler: LogHandler[F]): F[T] =
(for
_ <- connection.setReadOnly(false)
_ <- connection.setAutoCommit(false)
result <- run(connection)
_ <- connection.rollback()
yield result)
.onError(ex => logHandler.run(LogEvent.ExecFailure(statement, params, ex)))
<* logHandler.run(LogEvent.Success(statement, params))
Loading
Loading