
Getting Started with MongoDB BulkWrite in PHP

This tutorial demonstrates how to use the new MongoDB Client BulkWrite API introduced in MongoDB PHP Library 2.x. The new API offers significant improvements over the legacy Collection::bulkWrite() method, including cross-database operations and cursor-based results.

Table of Contents

  • Prerequisites
  • Installation
  • Connecting to MongoDB
  • Understanding Client BulkWrite
  • BulkWrite Operations
  • Switching Collections and Databases
  • Handling BulkWrite Results
  • Real-World Example: CSV Import
  • Error Handling
  • Best Practices
  • Tutorial Files
  • Running the Examples
  • Using Docker
  • Next Steps
  • License

Prerequisites

  • PHP 8.1 or later (required by MongoDB PHP Library 2.x)
  • Composer, plus PIE for installing the extension
  • A MongoDB deployment running server 8.0 or later (the client-level bulkWrite command was introduced in MongoDB 8.0), such as an Atlas cluster or a local instance

Installation

  1. Install the MongoDB PHP extension via PIE:

pie install mongodb/mongodb-extension

  2. Install the MongoDB PHP Library via Composer:

composer require mongodb/mongodb:^2.1

  3. Create a .env file with your MongoDB connection string:

MONGODB_URI=mongodb+srv://username:password@cluster.mongodb.net/
MONGODB_DB=bulkwritedb

Connecting to MongoDB

Before performing bulk write operations, verify your connection to MongoDB. The examples load the connection string from .env using the vlucas/phpdotenv package (composer require vlucas/phpdotenv):

<?php
require 'vendor/autoload.php';

use MongoDB\Client;
use Dotenv\Dotenv;

$dotenv = Dotenv::createImmutable(__DIR__);
$dotenv->load();

$uri = $_ENV['MONGODB_URI'];

$client = new Client($uri);

// Test connection
$databases = $client->listDatabases();
echo "Successfully connected to MongoDB!\n";

Run the connection test:

php bulk_write_connect.php

Understanding Client BulkWrite

The new Client::bulkWrite() API allows you to combine multiple write operations into a single batch request that can be executed across multiple collections and multiple databases in the same cluster.

Key Advantages Over Legacy API

| Feature | Legacy Collection::bulkWrite() | New Client::bulkWrite() |
| --- | --- | --- |
| Scope | Single collection only | Multiple collections/databases |
| Results | Single BSON document (16MB limit) | Cursor-based (no size limit) |
| Max operations | Limited by response size | Virtually unlimited |
| Batch size | Typically 500-1,000 | 5,000-10,000+ |

Creating a ClientBulkWrite Instance

use MongoDB\ClientBulkWrite;

// Create bulk write starting with a collection
$bulkWrite = ClientBulkWrite::createWithCollection($collection, [
    'ordered' => true,        // Execute operations in order (default: true)
    'verboseResults' => true, // Get per-operation details
]);
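
For orientation, here is a minimal end-to-end flow. This is a sketch that assumes $client is a connected MongoDB\Client; the bulkwritedb.users namespace is just an example:

use MongoDB\ClientBulkWrite;

// Select an example collection via magic property access
$collection = $client->bulkwritedb->users;

// Build the batch
$bulkWrite = ClientBulkWrite::createWithCollection($collection);
$bulkWrite->insertOne(['name' => 'Test User', 'status' => 'active']);

// Execute it and inspect the summary
$result = $client->bulkWrite($bulkWrite);
echo "Inserted: " . $result->getInsertedCount() . "\n";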

BulkWrite Operations

insertOne()

Insert a single document into the collection:

// Insert with captured ID
$bulkWrite->insertOne(
    ['name' => 'Alice Johnson', 'email' => 'alice@example.com', 'status' => 'active'],
    $insertedId  // Optional: captures the generated _id
);
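
Because the ID is captured by reference, you can collect IDs for several queued inserts and use them after the batch executes. A short sketch, assuming $bulkWrite and $client are set up as above and $users is an array of documents:

$ids = [];
foreach ($users as $i => $user) {
    // Each call captures the generated _id for that document by reference
    $bulkWrite->insertOne($user, $ids[$i]);
}

$client->bulkWrite($bulkWrite);
echo "First inserted _id: " . $ids[0] . "\n";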

updateOne()

Update a single document matching the filter:

$bulkWrite->updateOne(
    ['email' => 'alice@example.com'],                    // Filter
    ['$set' => ['status' => 'premium', 'updated_at' => new MongoDB\BSON\UTCDateTime()]]  // Update
);

// With upsert option
$bulkWrite->updateOne(
    ['email' => 'new@example.com'],
    ['$set' => ['name' => 'New User', 'email' => 'new@example.com']],
    ['upsert' => true]
);
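
With 'upsert' => true, MongoDB inserts a new document built from the filter and update when no existing document matches, so the same call works for both first-time and repeat writes.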

updateMany()

Update all documents matching the filter:

$bulkWrite->updateMany(
    ['status' => 'active'],           // Filter
    ['$set' => ['newsletter' => true]] // Update
);

replaceOne()

Replace an entire document:

$bulkWrite->replaceOne(
    ['email' => 'old@example.com'],   // Filter
    [                                  // Replacement document
        'name' => 'Updated Name',
        'email' => 'new@example.com',
        'migrated' => true
    ]
);

// With upsert
$bulkWrite->replaceOne(
    ['guest_name' => 'VIP Guest'],
    ['guest_name' => 'VIP Guest', 'party_size' => 8, 'status' => 'confirmed'],
    ['upsert' => true]
);
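
Unlike updateOne(), the replacement document must not contain update operators such as $set; it replaces every field of the matched document except _id.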

deleteOne()

Delete a single document:

$bulkWrite->deleteOne(['email' => 'delete@example.com']);

deleteMany()

Delete all documents matching the filter:

$bulkWrite->deleteMany(['status' => 'inactive']);

Switching Collections and Databases

One of the most powerful features of the new API is the ability to perform operations across multiple collections and databases in a single batch.

// Start with e-commerce database
$ecommerceDb = $client->ecommerce;
$customersCollection = $ecommerceDb->customers;
$ordersCollection = $ecommerceDb->orders;

// Restaurant database
$restaurantDb = $client->restaurant;
$menusCollection = $restaurantDb->menus;

// Create bulk write starting with customers
$bulkWrite = ClientBulkWrite::createWithCollection($customersCollection);

// Add customer operations
$bulkWrite->insertOne(['name' => 'Alice', 'email' => 'alice@example.com']);
$bulkWrite->updateOne(['name' => 'Alice'], ['$set' => ['status' => 'premium']]);

// Switch to orders collection (same database)
$bulkWrite = $bulkWrite->withCollection($ordersCollection);
$bulkWrite->insertOne(['customer' => 'Alice', 'total' => 99.99]);

// Switch to menus collection (different database!)
$bulkWrite = $bulkWrite->withCollection($menusCollection);
$bulkWrite->insertOne(['name' => 'Pizza', 'price' => 12.99]);
$bulkWrite->deleteMany(['available' => false]);

// Execute ALL operations in a single request
$result = $client->bulkWrite($bulkWrite);

Handling BulkWrite Results

Summary Counts

$result = $client->bulkWrite($bulkWrite);

echo "Inserted:  " . $result->getInsertedCount() . "\n";
echo "Matched:   " . $result->getMatchedCount() . "\n";
echo "Modified:  " . $result->getModifiedCount() . "\n";
echo "Upserted:  " . $result->getUpsertedCount() . "\n";
echo "Deleted:   " . $result->getDeletedCount() . "\n";
echo "Acknowledged: " . ($result->isAcknowledged() ? 'Yes' : 'No') . "\n";

Verbose Results (Cursor-Based)

Enable verboseResults to get per-operation details returned via a cursor:

$bulkWrite = ClientBulkWrite::createWithCollection($collection, [
    'verboseResults' => true
]);

// ... add operations ...

$result = $client->bulkWrite($bulkWrite);

// Insert results - streamed via cursor
foreach ($result->getInsertResults() as $index => $insertResult) {
    echo "Insert #$index - ID: " . $insertResult->insertedId . "\n";
}

// Update results
foreach ($result->getUpdateResults() as $index => $updateResult) {
    echo "Update #$index - Matched: " . $updateResult->matchedCount . "\n";
    echo "             Modified: " . $updateResult->modifiedCount . "\n";
    if (isset($updateResult->upsertedId)) {
        echo "             Upserted ID: " . $updateResult->upsertedId . "\n";
    }
}

// Delete results
foreach ($result->getDeleteResults() as $index => $deleteResult) {
    echo "Delete #$index - Deleted: " . $deleteResult->deletedCount . "\n";
}

Why Cursor-Based Results Matter

The legacy Collection::bulkWrite() API returned results in a single BSON document. If you performed thousands of operations, the response could exceed MongoDB's 16MB BSON document size limit, causing the operation to fail.

The new Client::bulkWrite() API returns verbose results via a cursor, which means:

  • ✓ Results are streamed incrementally
  • ✓ No 16MB response size limit
  • ✓ Memory efficient for large bulk writes
  • ✓ Handle millions of operations without failure

Real-World Example: CSV Import

Import data from multiple CSV files into different collections:

use MongoDB\BSON\UTCDateTime;
use MongoDB\ClientBulkWrite;

// Configuration
$batchSize = 5000; // Much larger than the legacy API's typical 500-1,000; tune up or down based on memory and network

// Open CSV files
$customersHandle = fopen('customers.csv', 'r');
$orgsHandle = fopen('organizations.csv', 'r');

// Skip headers
fgetcsv($customersHandle);
fgetcsv($orgsHandle);

$bulkWrite = null;
$operationCount = 0;

while (!feof($customersHandle) || !feof($orgsHandle)) {
    // Read and add customer
    if (!feof($customersHandle)) {
        $row = fgetcsv($customersHandle);
        if ($row) {
            $bulkWrite = $bulkWrite ?? ClientBulkWrite::createWithCollection($customersCollection, [
                'ordered' => false,
                'verboseResults' => false
            ]);
            $bulkWrite = $bulkWrite->withCollection($customersCollection);
            $bulkWrite->updateOne(
                ['customer_id' => $row[0]],
                ['$set' => ['name' => $row[1], 'email' => $row[2], 'imported_at' => new UTCDateTime()]],
                ['upsert' => true]
            );
            $operationCount++;
        }
    }

    // Read and add organization
    if (!feof($orgsHandle)) {
        $row = fgetcsv($orgsHandle);
        if ($row) {
            // Create the builder if it was reset after a flush (otherwise withCollection() would be called on null)
            $bulkWrite = $bulkWrite ?? ClientBulkWrite::createWithCollection($organizationsCollection, [
                'ordered' => false,
                'verboseResults' => false
            ]);
            $bulkWrite = $bulkWrite->withCollection($organizationsCollection);
            $bulkWrite->updateOne(
                ['org_id' => $row[0]],
                ['$set' => ['name' => $row[1], 'industry' => $row[2], 'imported_at' => new UTCDateTime()]],
                ['upsert' => true]
            );
            $operationCount++;
        }
    }

    // Execute batch when threshold reached
    if ($operationCount >= $batchSize) {
        $client->bulkWrite($bulkWrite);
        $bulkWrite = null;
        $operationCount = 0;
    }
}

// Execute remaining operations
if ($bulkWrite && $operationCount > 0) {
    $client->bulkWrite($bulkWrite);
}

fclose($customersHandle);
fclose($orgsHandle);

Error Handling

try {
    $result = $client->bulkWrite($bulkWrite);
    
} catch (MongoDB\Driver\Exception\BulkWriteCommandException $e) {
    echo "Bulk Write Error: " . $e->getMessage() . "\n";
    
    // Get partial results (operations that succeeded)
    $partialResult = $e->getPartialResult();
    if ($partialResult) {
        echo "Partial Results:\n";
        echo "  Inserted: " . $partialResult->getInsertedCount() . "\n";
        echo "  Upserted: " . $partialResult->getUpsertedCount() . "\n";
        echo "  Modified: " . $partialResult->getModifiedCount() . "\n";
    }
    
    // Get specific write errors
    $writeErrors = $e->getWriteErrors();
    foreach ($writeErrors as $index => $error) {
        echo "Operation #$index failed: " . $error->getMessage() . "\n";
    }
    
    // Get write concern errors
    $writeConcernErrors = $e->getWriteConcernErrors();
    foreach ($writeConcernErrors as $wcError) {
        echo "Write Concern Error: " . $wcError->getMessage() . "\n";
    }
}
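
Whether you get partial results depends on the ordered option: with 'ordered' => true, execution stops at the first error, while 'ordered' => false continues with the remaining operations and reports every error at the end.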

Best Practices

1. Batching Strategy

While the new API can handle very large operations, batching is still recommended:

$batchSize = 5000; // 5-10x larger than legacy API

Why batch? (A condensed sketch of the pattern follows this list.)

  • Memory efficiency (avoid holding the entire dataset in memory as queued operations)
  • Network reliability (smaller payloads to retry when a batch fails)
  • Progress tracking and resumability
  • Server resource management
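
As a condensed version of the CSV example above, the general flush-and-reset pattern looks like this. It is a sketch assuming $client, $collection, and an iterable $documents already exist:

use MongoDB\ClientBulkWrite;

$batchSize = 5000;
$bulkWrite = null;
$count = 0;

foreach ($documents as $doc) {
    // Create the builder lazily so it can be reset after each flush
    $bulkWrite = $bulkWrite ?? ClientBulkWrite::createWithCollection($collection, ['ordered' => false]);
    $bulkWrite->insertOne($doc);

    if (++$count >= $batchSize) {
        $client->bulkWrite($bulkWrite); // Flush the full batch
        $bulkWrite = null;              // Reset for the next batch
        $count = 0;
    }
}

// Flush any remaining operations
if ($bulkWrite !== null && $count > 0) {
    $client->bulkWrite($bulkWrite);
}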

2. Ordered vs Unordered

// Ordered (default) - stops on first error
$bulkWrite = ClientBulkWrite::createWithCollection($collection, [
    'ordered' => true
]);

// Unordered - continues after errors, better performance
$bulkWrite = ClientBulkWrite::createWithCollection($collection, [
    'ordered' => false
]);

3. Verbose Results

// Enable for debugging/auditing
$bulkWrite = ClientBulkWrite::createWithCollection($collection, [
    'verboseResults' => true
]);

// Disable for large imports (memory efficiency)
$bulkWrite = ClientBulkWrite::createWithCollection($collection, [
    'verboseResults' => false
]);

4. Write Concern

use MongoDB\Driver\WriteConcern;

$writeConcern = new WriteConcern(WriteConcern::MAJORITY, 1000);

$bulkWrite = ClientBulkWrite::createWithCollection($collection, [
    'writeConcern' => $writeConcern
]);
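
WriteConcern::MAJORITY waits for acknowledgment from a majority of replica-set members; the second constructor argument is a wtimeout in milliseconds (1,000 ms here), after which the server reports a write concern error.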

5. Transactions for "All or Nothing"

When you need atomic guarantees—where either all operations succeed or none are applied—wrap your bulk write in a transaction. The MongoDB\with_transaction() helper simplifies this by handling retries for transient errors automatically:

use MongoDB\Client;
use MongoDB\ClientBulkWrite;
use MongoDB\Driver\Session;

use function MongoDB\with_transaction;

$client = new Client($uri);

// Start a session for the transaction
$session = $client->startSession();

$customersCollection = $client->getCollection('shop', 'customers');
$ordersCollection = $client->getCollection('shop', 'orders');

// Use MongoDB\with_transaction() helper for automatic retry handling
with_transaction($session, function (Session $session) use ($client, $customersCollection, $ordersCollection) {
    // Create bulk write with the session
    $bulkWrite = ClientBulkWrite::createWithCollection($customersCollection, [
        'session' => $session,
        'ordered' => true
    ]);

    // Add operations that must all succeed together
    $bulkWrite->insertOne(['name' => 'Alice', 'email' => 'alice@example.com', 'balance' => 1000]);
    $bulkWrite->updateOne(
        ['name' => 'Bob'],
        ['$inc' => ['balance' => -500]]
    );

    // Switch to orders collection (same transaction)
    $bulkWrite = $bulkWrite->withCollection($ordersCollection);
    $bulkWrite->insertOne([
        'customer' => 'Bob',
        'recipient' => 'Alice',
        'amount' => 500,
        'type' => 'transfer'
    ]);

    // Execute all operations atomically
    $client->bulkWrite($bulkWrite);
});

echo "Transaction committed successfully!\n";

Why use transactions with bulk writes?

  • Atomicity: All operations commit together or roll back on failure
  • Automatic retries: with_transaction() retries on transient errors (e.g., network issues)
  • Cross-collection consistency: Maintain data integrity across multiple collections

When to use transactions:

  • Financial operations (transfers, payments)
  • Related data that must stay consistent (e.g., inventory + orders)
  • Multi-collection updates that depend on each other
  • Any scenario where partial writes would leave data in an invalid state

Note: Transactions require a replica set or sharded cluster. They are not available on standalone MongoDB instances.


Tutorial Files

| File | Description |
| --- | --- |
| bulk_write_connect.php | Connection test script |
| bulk_write_demo.php | Complete demo of all operations across multiple databases |
| bulk_write_results.php | Handling verbose results and cursor-based responses |
| bulk_write_csv_import.php | Real-world CSV import example |

Running the Examples

# Test connection
php bulk_write_connect.php

# Run operations demo
php bulk_write_demo.php

# Explore results handling
php bulk_write_results.php

# Import CSV data
php bulk_write_csv_import.php

Using Docker

If you prefer using Docker, we have included a Dockerfile that sets up the PHP environment with the MongoDB extension and necessary dependencies. We also provide a docker-compose.yml that runs a local MongoDB container alongside the PHP environment.

Update MONGODB_URI in the .env to point to your MongoDB Atlas cluster or the local MongoDB container, e.g. MONGODB_URI=mongodb://bulkwriteuser:secret@mongodb:27017/bulkwritedb?authSource=bulkwritedb

To build and run the Docker container, use the following commands:

 docker compose up -d --build

Then, you can run any script directly, e.g., to run the connect example:

 docker compose exec php php bulk_write_connect.php

You can enter the container shell for interactive use:

 docker compose exec php sh

Stop the containers when done:

 docker compose down

Next Steps

Explore the following resources to deepen your understanding of MongoDB and PHP:


License

MIT License - Feel free to use this tutorial code in your projects.
