Skip to content

Commit

Permalink
Transaction support (#2465)
Browse files Browse the repository at this point in the history
* Add support for transactions

Co-authored-by: klinson <klinson@163.com>
Co-authored-by: levon80999 <levonb@ucraft.com>

* Start single-member replica set in CI

Co-authored-by: levon80999 <levonb@ucraft.com>

* Add connection options for faster failures in tests

The faster connection and server selection timeouts ensure we don't spend too much time waiting for the inevitable as we're expecting fast connections on CI systems

Co-authored-by: levon80999 <levonb@ucraft.com>

* Apply readme code review suggestions

* Simplify replica set creation in CI

* Apply feedback from code review

* Update naming of database env variable in tests

* Use default argument for server selection (which defaults to primary)

* Revert "Simplify replica set creation in CI"

This partially reverts commit 203160e. The simplified call unfortunately breaks tests.

* Pass connection instance to transactional closure

This is consistent with the behaviour of the original ManagesTransactions concern.

* Correctly re-throw exception when callback attempts have been exceeded.

* Limit transaction lifetime to 5 seconds

This ensures that hung transactions don't block any subsequent operations for an unnecessary period of time.

* Add build step to print MongoDB server status

* Update src/Concerns/ManagesTransactions.php

Co-authored-by: Jeremy Mikola <jmikola@gmail.com>

Co-authored-by: klinson <klinson@163.com>
Co-authored-by: levon80999 <levonb@ucraft.com>
Co-authored-by: Jeremy Mikola <jmikola@gmail.com>
  • Loading branch information
4 people committed Nov 27, 2022
1 parent 0606fc0 commit e5e9193
Show file tree
Hide file tree
Showing 9 changed files with 672 additions and 18 deletions.
16 changes: 11 additions & 5 deletions .github/workflows/build-ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,6 @@ jobs:
- '8.0'
- '8.1'
services:
mongo:
image: mongo:${{ matrix.mongodb }}
ports:
- 27017:27017
mysql:
image: mysql:5.7
ports:
Expand All @@ -59,6 +55,16 @@ jobs:

steps:
- uses: actions/checkout@v2
- name: Create MongoDB Replica Set
run: |
docker run --name mongodb -p 27017:27017 -e MONGO_INITDB_DATABASE=unittest --detach mongo:${{ matrix.mongodb }} mongod --replSet rs --setParameter transactionLifetimeLimitSeconds=5
until docker exec --tty mongodb mongo 127.0.0.1:27017 --eval "db.runCommand({ ping: 1 })"; do
sleep 1
done
sudo docker exec --tty mongodb mongo 127.0.0.1:27017 --eval "rs.initiate({\"_id\":\"rs\",\"members\":[{\"_id\":0,\"host\":\"127.0.0.1:27017\" }]})"
- name: Show MongoDB server status
run: |
docker exec --tty mongodb mongo 127.0.0.1:27017 --eval "db.runCommand({ serverStatus: 1 })"
- name: "Installing php"
uses: shivammathur/setup-php@v2
with:
Expand Down Expand Up @@ -88,7 +94,7 @@ jobs:
run: |
./vendor/bin/phpunit --coverage-clover coverage.xml
env:
MONGODB_URI: 'mongodb://127.0.0.1/'
MONGODB_URI: 'mongodb://127.0.0.1/?replicaSet=rs'
MYSQL_HOST: 0.0.0.0
MYSQL_PORT: 3307
- uses: codecov/codecov-action@v1
Expand Down
47 changes: 47 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ This package adds functionalities to the Eloquent model and Query builder for Mo
- [Query Builder](#query-builder)
- [Basic Usage](#basic-usage-2)
- [Available operations](#available-operations)
- [Transactions](#transactions)
- [Schema](#schema)
- [Basic Usage](#basic-usage-3)
- [Geospatial indexes](#geospatial-indexes)
Expand Down Expand Up @@ -968,6 +969,52 @@ If you are familiar with [Eloquent Queries](http://laravel.com/docs/queries), th
### Available operations
To see the available operations, check the [Eloquent](#eloquent) section.

Transactions
------------
Transactions require MongoDB version ^4.0 as well as deployment of replica set or sharded clusters. You can find more information [in the MongoDB docs](https://docs.mongodb.com/manual/core/transactions/)

### Basic Usage

```php
DB::transaction(function () {
User::create(['name' => 'john', 'age' => 19, 'title' => 'admin', 'email' => 'john@example.com']);
DB::collection('users')->where('name', 'john')->update(['age' => 20]);
DB::collection('users')->where('name', 'john')->delete();
});
```

```php
// begin a transaction
DB::beginTransaction();
User::create(['name' => 'john', 'age' => 19, 'title' => 'admin', 'email' => 'john@example.com']);
DB::collection('users')->where('name', 'john')->update(['age' => 20]);
DB::collection('users')->where('name', 'john')->delete();

// commit changes
DB::commit();
```

To abort a transaction, call the `rollBack` method at any point during the transaction:
```php
DB::beginTransaction();
User::create(['name' => 'john', 'age' => 19, 'title' => 'admin', 'email' => 'john@example.com']);

// Abort the transaction, discarding any data created as part of it
DB::rollBack();
```

**NOTE:** Transactions in MongoDB cannot be nested. DB::beginTransaction() function will start new transactions in a new created or existing session and will raise the RuntimeException when transactions already exist. See more in MongoDB official docs [Transactions and Sessions](https://www.mongodb.com/docs/manual/core/transactions/#transactions-and-sessions)
```php
DB::beginTransaction();
User::create(['name' => 'john', 'age' => 20, 'title' => 'admin']);

// This call to start a nested transaction will raise a RuntimeException
DB::beginTransaction();
DB::collection('users')->where('name', 'john')->update(['age' => 20]);
DB::commit();
DB::rollBack();
```

Schema
------
The database driver also has (limited) schema builder support. You can easily manipulate collections and set indexes.
Expand Down
5 changes: 4 additions & 1 deletion phpunit.xml.dist
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,9 @@
<file>tests/QueryBuilderTest.php</file>
<file>tests/QueryTest.php</file>
</testsuite>
<testsuite name="transaction">
<file>tests/TransactionTest.php</file>
</testsuite>
<testsuite name="model">
<file>tests/ModelTest.php</file>
<file>tests/RelationsTest.php</file>
Expand All @@ -36,7 +39,7 @@
</testsuites>
<php>
<env name="MONGODB_URI" value="mongodb://127.0.0.1/" />
<env name="MONGO_DATABASE" value="unittest"/>
<env name="MONGODB_DATABASE" value="unittest"/>
<env name="MYSQL_HOST" value="mysql"/>
<env name="MYSQL_PORT" value="3306"/>
<env name="MYSQL_DATABASE" value="unittest"/>
Expand Down
116 changes: 116 additions & 0 deletions src/Concerns/ManagesTransactions.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,116 @@
<?php

namespace Jenssegers\Mongodb\Concerns;

use Closure;
use MongoDB\Client;
use MongoDB\Driver\Exception\RuntimeException;
use MongoDB\Driver\Session;
use function MongoDB\with_transaction;
use Throwable;

/**
* @see https://docs.mongodb.com/manual/core/transactions/
*/
trait ManagesTransactions
{
protected ?Session $session = null;

protected $transactions = 0;

/**
* @return Client
*/
abstract public function getMongoClient();

public function getSession(): ?Session
{
return $this->session;
}

private function getSessionOrCreate(): Session
{
if ($this->session === null) {
$this->session = $this->getMongoClient()->startSession();
}

return $this->session;
}

private function getSessionOrThrow(): Session
{
$session = $this->getSession();

if ($session === null) {
throw new RuntimeException('There is no active session.');
}

return $session;
}

/**
* Starts a transaction on the active session. An active session will be created if none exists.
*/
public function beginTransaction(array $options = []): void
{
$this->getSessionOrCreate()->startTransaction($options);
$this->transactions = 1;
}

/**
* Commit transaction in this session.
*/
public function commit(): void
{
$this->getSessionOrThrow()->commitTransaction();
$this->transactions = 0;
}

/**
* Abort transaction in this session.
*/
public function rollBack($toLevel = null): void
{
$this->getSessionOrThrow()->abortTransaction();
$this->transactions = 0;
}

/**
* Static transaction function realize the with_transaction functionality provided by MongoDB.
*
* @param int $attempts
*/
public function transaction(Closure $callback, $attempts = 1, array $options = []): mixed
{
$attemptsLeft = $attempts;
$callbackResult = null;
$throwable = null;

$callbackFunction = function (Session $session) use ($callback, &$attemptsLeft, &$callbackResult, &$throwable) {
$attemptsLeft--;

if ($attemptsLeft < 0) {
$session->abortTransaction();

return;
}

// Catch, store, and re-throw any exception thrown during execution
// of the callable. The last exception is re-thrown if the transaction
// was aborted because the number of callback attempts has been exceeded.
try {
$callbackResult = $callback($this);
} catch (Throwable $throwable) {
throw $throwable;
}
};

with_transaction($this->getSessionOrCreate(), $callbackFunction, $options);

if ($attemptsLeft < 0 && $throwable) {
throw $throwable;
}

return $callbackResult;
}
}
3 changes: 3 additions & 0 deletions src/Connection.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,14 @@
use Illuminate\Database\Connection as BaseConnection;
use Illuminate\Support\Arr;
use InvalidArgumentException;
use Jenssegers\Mongodb\Concerns\ManagesTransactions;
use MongoDB\Client;
use MongoDB\Database;

class Connection extends BaseConnection
{
use ManagesTransactions;

/**
* The MongoDB database handler.
*
Expand Down
47 changes: 37 additions & 10 deletions src/Query/Builder.php
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,8 @@ public function getFresh($columns = [], $returnLazy = false)
$options = array_merge($options, $this->options);
}

$options = $this->inheritConnectionOptions($options);

// Execute aggregation
$results = iterator_to_array($this->collection->aggregate($pipeline, $options));

Expand All @@ -356,12 +358,10 @@ public function getFresh($columns = [], $returnLazy = false)
// Return distinct results directly
$column = isset($this->columns[0]) ? $this->columns[0] : '_id';

$options = $this->inheritConnectionOptions();

// Execute distinct
if ($wheres) {
$result = $this->collection->distinct($column, $wheres);
} else {
$result = $this->collection->distinct($column);
}
$result = $this->collection->distinct($column, $wheres ?: [], $options);

return new Collection($result);
} // Normal query
Expand Down Expand Up @@ -407,6 +407,8 @@ public function getFresh($columns = [], $returnLazy = false)
$options = array_merge($options, $this->options);
}

$options = $this->inheritConnectionOptions($options);

// Execute query and get MongoCursor
$cursor = $this->collection->find($wheres, $options);

Expand Down Expand Up @@ -581,8 +583,9 @@ public function insert(array $values)
$values = [$values];
}

// Batch insert
$result = $this->collection->insertMany($values);
$options = $this->inheritConnectionOptions();

$result = $this->collection->insertMany($values, $options);

return 1 == (int) $result->isAcknowledged();
}
Expand All @@ -592,7 +595,9 @@ public function insert(array $values)
*/
public function insertGetId(array $values, $sequence = null)
{
$result = $this->collection->insertOne($values);
$options = $this->inheritConnectionOptions();

$result = $this->collection->insertOne($values, $options);

if (1 == (int) $result->isAcknowledged()) {
if ($sequence === null) {
Expand All @@ -614,6 +619,8 @@ public function update(array $values, array $options = [])
$values = ['$set' => $values];
}

$options = $this->inheritConnectionOptions($options);

return $this->performUpdate($values, $options);
}

Expand All @@ -635,6 +642,8 @@ public function increment($column, $amount = 1, array $extra = [], array $option
$query->orWhereNotNull($column);
});

$options = $this->inheritConnectionOptions($options);

return $this->performUpdate($query, $options);
}

Expand Down Expand Up @@ -696,7 +705,10 @@ public function delete($id = null)
}

$wheres = $this->compileWheres();
$result = $this->collection->DeleteMany($wheres);
$options = $this->inheritConnectionOptions();

$result = $this->collection->deleteMany($wheres, $options);

if (1 == (int) $result->isAcknowledged()) {
return $result->getDeletedCount();
}
Expand All @@ -721,7 +733,8 @@ public function from($collection, $as = null)
*/
public function truncate(): bool
{
$result = $this->collection->deleteMany([]);
$options = $this->inheritConnectionOptions();
$result = $this->collection->deleteMany([], $options);

return 1 === (int) $result->isAcknowledged();
}
Expand Down Expand Up @@ -855,6 +868,8 @@ protected function performUpdate($query, array $options = [])
$options['multiple'] = true;
}

$options = $this->inheritConnectionOptions($options);

$wheres = $this->compileWheres();
$result = $this->collection->UpdateMany($wheres, $query, $options);
if (1 == (int) $result->isAcknowledged()) {
Expand Down Expand Up @@ -1249,6 +1264,18 @@ public function options(array $options)
return $this;
}

/**
* Apply the connection's session to options if it's not already specified.
*/
private function inheritConnectionOptions(array $options = []): array
{
if (! isset($options['session']) && ($session = $this->connection->getSession())) {
$options['session'] = $session;
}

return $options;
}

/**
* @inheritdoc
*/
Expand Down

0 comments on commit e5e9193

Please sign in to comment.