Skip to content

Commit

Permalink
Add basic transaction semantics
Browse files Browse the repository at this point in the history
Fixes apache#23.
  • Loading branch information
lidavidm committed Jun 22, 2022
1 parent c3607c5 commit 66be449
Show file tree
Hide file tree
Showing 5 changed files with 114 additions and 36 deletions.
80 changes: 52 additions & 28 deletions adbc.h
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,6 @@ struct ArrowArray {

#endif // ARROW_C_DATA_INTERFACE

// EXPERIMENTAL: C stream interface

#ifndef ARROW_C_STREAM_INTERFACE
#define ARROW_C_STREAM_INTERFACE

Expand Down Expand Up @@ -252,6 +250,11 @@ struct ADBC_EXPORT AdbcError {

/// }@

/// \brief Canonical option value for enabling an option.
#define ADBC_OPTION_VALUE_ENABLED "true"
/// \brief Canonical option value for disabling an option.
#define ADBC_OPTION_VALUE_DISABLED "false"

/// \defgroup adbc-database Database initialization.
/// Clients first initialize a database, then connect to the database
/// (below). For client-server databases, one of these steps may be a
Expand Down Expand Up @@ -340,32 +343,6 @@ ADBC_EXPORT
AdbcStatusCode AdbcConnectionRelease(struct AdbcConnection* connection,
struct AdbcError* error);

/// \defgroup adbc-connection-partition Partitioned Results
/// Some databases may internally partition the results. These
/// partitions are exposed to clients who may wish to integrate them
/// with a threaded or distributed execution model, where partitions
/// can be divided among threads or machines for processing.
///
/// Drivers are not required to support partitioning.
///
/// Partitions are not ordered. If the result set is sorted,
/// implementations should return a single partition.
///
/// @{

/// \brief Construct a statement for a partition of a query. The
/// statement can then be read independently.
///
/// A partition can be retrieved from AdbcStatementGetPartitionDesc.
ADBC_EXPORT
AdbcStatusCode AdbcConnectionDeserializePartitionDesc(struct AdbcConnection* connection,
const uint8_t* serialized_partition,
size_t serialized_length,
struct AdbcStatement* statement,
struct AdbcError* error);

/// }@

/// \defgroup adbc-connection-metadata Metadata
/// Functions for retrieving metadata about the database.
///
Expand Down Expand Up @@ -653,6 +630,52 @@ AdbcStatusCode AdbcConnectionGetColumns(struct AdbcConnection* connection,
struct AdbcError* error);
/// }@

/// \defgroup adbc-connection-partition Partitioned Results
/// Some databases may internally partition the results. These
/// partitions are exposed to clients who may wish to integrate them
/// with a threaded or distributed execution model, where partitions
/// can be divided among threads or machines for processing.
///
/// Drivers are not required to support partitioning.
///
/// Partitions are not ordered. If the result set is sorted,
/// implementations should return a single partition.
///
/// @{

/// \brief Construct a statement for a partition of a query. The
/// statement can then be read independently.
///
/// A partition can be retrieved from AdbcStatementGetPartitionDesc.
ADBC_EXPORT
AdbcStatusCode AdbcConnectionDeserializePartitionDesc(struct AdbcConnection* connection,
const uint8_t* serialized_partition,
size_t serialized_length,
struct AdbcStatement* statement,
struct AdbcError* error);

/// }@

/// \defgroup adbc-connection-transaction Transaction Semantics
///
/// Connections start out in auto-commit mode by default (if
/// applicable for the given vendor). Use
/// ADBC_CONNECTION_OPTION_AUTO_COMMIT to change this.
///
/// @{

/// \brief The name of the canonical option for whether autocommit is
/// enabled.
#define ADBC_CONNECTION_OPTION_AUTOCOMMIT "adbc.connection.autocommit"

/// \brief Commit any pending transactions. Only used if autocommit is
/// disabled.
ADBC_EXPORT
AdbcStatusCode AdbcConnectionCommit(struct AdbcConnection* connection,
struct AdbcError* error);

/// }@

/// }@

/// \defgroup adbc-statement Managing statements.
Expand Down Expand Up @@ -912,6 +935,7 @@ struct ADBC_EXPORT AdbcDriver {
struct AdbcStatement*,
struct AdbcError*);

AdbcStatusCode (*ConnectionCommit)(struct AdbcConnection*, struct AdbcError*);
AdbcStatusCode (*ConnectionGetCatalogs)(struct AdbcConnection*, struct AdbcStatement*,
struct AdbcError*);
AdbcStatusCode (*ConnectionGetColumns)(struct AdbcConnection*, const char*, const char*,
Expand Down
36 changes: 29 additions & 7 deletions adbc_driver_manager/adbc_driver_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,11 @@ void SetError(struct AdbcError* error, const std::string& message) {
}

// Default stubs

AdbcStatusCode ConnectionCommit(struct AdbcConnection*, struct AdbcError* error) {
return ADBC_STATUS_NOT_IMPLEMENTED;
}

AdbcStatusCode ConnectionGetCatalogs(struct AdbcConnection*, struct AdbcStatement*,
struct AdbcError* error) {
return ADBC_STATUS_NOT_IMPLEMENTED;
Expand Down Expand Up @@ -290,15 +295,12 @@ AdbcStatusCode AdbcDatabaseRelease(struct AdbcDatabase* database,
return status;
}

AdbcStatusCode AdbcConnectionNew(struct AdbcDatabase* database,
struct AdbcConnection* connection,
struct AdbcError* error) {
if (!database->private_driver) {
AdbcStatusCode AdbcConnectionCommit(struct AdbcConnection* connection,
struct AdbcError* error) {
if (!connection->private_driver) {
return ADBC_STATUS_INVALID_STATE;
}
auto status = database->private_driver->ConnectionNew(database, connection, error);
connection->private_driver = database->private_driver;
return status;
return connection->private_driver->ConnectionCommit(connection, error);
}

AdbcStatusCode AdbcConnectionInit(struct AdbcConnection* connection,
Expand All @@ -309,6 +311,17 @@ AdbcStatusCode AdbcConnectionInit(struct AdbcConnection* connection,
return connection->private_driver->ConnectionInit(connection, error);
}

AdbcStatusCode AdbcConnectionNew(struct AdbcDatabase* database,
struct AdbcConnection* connection,
struct AdbcError* error) {
if (!database->private_driver) {
return ADBC_STATUS_INVALID_STATE;
}
auto status = database->private_driver->ConnectionNew(database, connection, error);
connection->private_driver = database->private_driver;
return status;
}

AdbcStatusCode AdbcConnectionRelease(struct AdbcConnection* connection,
struct AdbcError* error) {
if (!connection->private_driver) {
Expand All @@ -319,6 +332,14 @@ AdbcStatusCode AdbcConnectionRelease(struct AdbcConnection* connection,
return status;
}

AdbcStatusCode AdbcConnectionSetOption(struct AdbcConnection* connection, const char* key,
const char* value, struct AdbcError* error) {
if (!connection->private_driver) {
return ADBC_STATUS_INVALID_STATE;
}
return connection->private_driver->ConnectionSetOption(connection, key, value, error);
}

AdbcStatusCode AdbcStatementBind(struct AdbcStatement* statement,
struct ArrowArray* values, struct ArrowSchema* schema,
struct AdbcError* error) {
Expand Down Expand Up @@ -559,6 +580,7 @@ AdbcStatusCode AdbcLoadDriver(const char* driver_name, const char* entrypoint,
CHECK_REQUIRED(driver, DatabaseInit);
CHECK_REQUIRED(driver, DatabaseRelease);

FILL_DEFAULT(driver, ConnectionCommit);
FILL_DEFAULT(driver, ConnectionGetCatalogs);
FILL_DEFAULT(driver, ConnectionGetColumns);
FILL_DEFAULT(driver, ConnectionGetDbSchemas);
Expand Down
8 changes: 8 additions & 0 deletions adbc_driver_manager/adbc_driver_manager_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -217,4 +217,12 @@ TEST_F(DriverManager, BulkIngestStream) {
}
}

TEST_F(DriverManager, Transactions) {
// SQLite driver doesn't currently implement these
ASSERT_EQ(ADBC_STATUS_NOT_IMPLEMENTED,
AdbcConnectionSetOption(&connection, ADBC_CONNECTION_OPTION_AUTOCOMMIT,
ADBC_OPTION_VALUE_DISABLED, &error));
ASSERT_EQ(ADBC_STATUS_NOT_IMPLEMENTED, AdbcConnectionCommit(&connection, &error));
}

} // namespace adbc
18 changes: 17 additions & 1 deletion drivers/sqlite/sqlite.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1364,6 +1364,11 @@ AdbcStatusCode SqliteDatabaseRelease(struct AdbcDatabase* database,
return status;
}

AdbcStatusCode SqliteConnectionCommit(struct AdbcConnection* connection,
struct AdbcError* error) {
return ADBC_STATUS_NOT_IMPLEMENTED;
}

AdbcStatusCode SqliteConnectionGetColumns(struct AdbcConnection* connection,
const char* catalog, const char* db_schema,
const char* table_name, const char* column_name,
Expand Down Expand Up @@ -1467,7 +1472,12 @@ AdbcStatusCode SqliteConnectionRelease(struct AdbcConnection* connection,
AdbcStatusCode SqliteConnectionSetOption(struct AdbcConnection* connection,
const char* key, const char* value,
struct AdbcError* error) {
return ADBC_STATUS_OK;
if (std::strcmp(key, ADBC_CONNECTION_OPTION_AUTOCOMMIT) == 0) {
SetError(error, "Cannot disable autocommit");
} else {
SetError(error, "Unknown option");
}
return ADBC_STATUS_NOT_IMPLEMENTED;
}

AdbcStatusCode SqliteStatementBind(struct AdbcStatement* statement,
Expand Down Expand Up @@ -1581,6 +1591,11 @@ AdbcStatusCode AdbcDatabaseRelease(struct AdbcDatabase* database,
return SqliteDatabaseRelease(database, error);
}

AdbcStatusCode AdbcConnectionCommit(struct AdbcConnection* connection,
struct AdbcError* error) {
return SqliteConnectionCommit(connection, error);
}

AdbcStatusCode AdbcConnectionGetColumns(struct AdbcConnection* connection,
const char* catalog, const char* db_schema,
const char* table_name, const char* column_name,
Expand Down Expand Up @@ -1730,6 +1745,7 @@ AdbcStatusCode AdbcSqliteDriverInit(size_t count, struct AdbcDriver* driver,
driver->DatabaseRelease = SqliteDatabaseRelease;
driver->DatabaseSetOption = SqliteDatabaseSetOption;

driver->ConnectionCommit = SqliteConnectionCommit;
driver->ConnectionGetCatalogs = SqliteConnectionGetCatalogs;
driver->ConnectionGetColumns = SqliteConnectionGetColumns;
driver->ConnectionGetDbSchemas = SqliteConnectionGetDbSchemas;
Expand Down
8 changes: 8 additions & 0 deletions drivers/sqlite/sqlite_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -935,4 +935,12 @@ TEST_F(Sqlite, MetadataGetTableSchema) {
ASSERT_SCHEMA_EQ(*schema, *bulk_schema);
}

TEST_F(Sqlite, Transactions) {
// Driver doesn't currently implement these
ASSERT_EQ(ADBC_STATUS_NOT_IMPLEMENTED,
AdbcConnectionSetOption(&connection, ADBC_CONNECTION_OPTION_AUTOCOMMIT,
ADBC_OPTION_VALUE_DISABLED, &error));
ASSERT_EQ(ADBC_STATUS_NOT_IMPLEMENTED, AdbcConnectionCommit(&connection, &error));
}

} // namespace adbc

0 comments on commit 66be449

Please sign in to comment.