Skip to content

Commit

Permalink
fix(assets): group asset patches per document (#4)
Browse files Browse the repository at this point in the history
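Groups asset patches per document, so each document is patched once instead of once per asset.
This reduces lock contention when mutations are executed concurrently. For example, a document with two asset references now produces a single patch mutation: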

```
{
  "mutations": [
    {
      "patch": {
        "id": "a-doc-id",
        "set": {
          "pathA.asset": {
            "_ref": "image-foo-200x200-png",
            "_type": "reference"
          },
          "pathB.asset": {
            "_ref": "image-bar-200x200-png",
            "_type": "reference"
          }
        },
        "setIfMissing": {
          "pathA": {
            "_type": "image"
          },
          "pathB": {
            "_type": "image"
          }
        }
      }
    }
  ]
}
```
  • Loading branch information
mwain committed May 17, 2024
1 parent 79524ff commit 1787dd3
Show file tree
Hide file tree
Showing 5 changed files with 1,639 additions and 157 deletions.
86 changes: 57 additions & 29 deletions src/uploadAssets.js
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ const urlExists = require('./util/urlExists')
// Fallback concurrency for uploadAssets when options.assetConcurrency is falsy
const ASSET_UPLOAD_CONCURRENCY = 8
// NOTE(review): presumably how many patch batches are committed concurrently; its use is not visible in this excerpt, confirm
const ASSET_PATCH_CONCURRENCY = 30
// A patch batch is closed once it holds this many entries
const ASSET_PATCH_BATCH_SIZE = 50
// Upper bound used when deciding whether a document's asset tasks still fit into the current patch batch
const ASSET_PATCH_BATCH_TASK_SIZE = 1000

async function uploadAssets(assets, options) {
const concurrency = options.assetConcurrency || ASSET_UPLOAD_CONCURRENCY
Expand Down Expand Up @@ -214,28 +215,52 @@ function getUploadFailures(assetRefMap, assetIds) {
function setAssetReferences(assetRefMap, assetIds, options) {
const {client, tag} = options
const lookup = assetRefMap.values()
const patchTasks = assetIds.reduce((tasks, assetId) => {

// Collects patch tasks per document to avoid patching the same document multiple times
const patchTasksPerDoc = assetIds.reduce((tasks, assetId) => {
const documents = lookup.next().value
if (typeof assetId !== 'string') {
return tasks
}

return tasks.concat(
documents.map(({documentId, path}) => ({
documentId,
path,
assetId,
})),
documents.forEach(({documentId, path}) => {
tasks[documentId] = tasks[documentId] || []
tasks[documentId].push({path, assetId})
})
return tasks
}, {})

const patchTasks = Object.entries(patchTasksPerDoc).map(([documentId, tasks]) => ({
documentId,
tasks,
}))

// We now have an array of tasks per document, each containing:
// {documentId: string, tasks: [{path, assetId}]}
// Instead of doing a single mutation per document, let's batch them up
const batches = patchTasks.reduce((acc, task) => {
if (acc.length === 0) {
return [[task]]
}

const currentBatch = acc[acc.length - 1]
// Total number of asset tasks already queued in the current batch.
// The parentheses around the conditional are required: `prev + add.tasks ? a : b`
// parses as `(prev + add.tasks) ? a : b`, which would yield only the last
// entry's task count instead of the running sum.
const overallSize = currentBatch.reduce(
  (prev, add) => prev + (add.tasks ? add.tasks.length : 0),
  0,
)
}, [])

// We now have an array of simple tasks, each containing:
// {documentId, path, assetId}
// Instead of doing a single mutation per asset, let's batch them up
const batches = []
for (let i = 0; i < patchTasks.length; i += ASSET_PATCH_BATCH_SIZE) {
batches.push(patchTasks.slice(i, i + ASSET_PATCH_BATCH_SIZE))
}
if (
overallSize + task.tasks.length > ASSET_PATCH_BATCH_TASK_SIZE ||
currentBatch.length >= ASSET_PATCH_BATCH_SIZE
) {
// Create a new batch if the current one is full
acc.push([task])
return acc
}

currentBatch.push(task)
return acc
}, [])

if (batches.length === 0) {
return Promise.resolve([0])
Expand All @@ -260,27 +285,30 @@ function setAssetReferenceBatch(client, progress, tag, batch) {
.reduce(reducePatch, client.transaction())
.commit({visibility: 'async', tag: suffixTag(tag, 'asset.set-refs')})
.then(progress)
.then((res) => res.results.length),
.then(() => batch.reduce((prev, add) => prev + add.tasks.length, 0)),
)
}

/**
 * Derives the asset type from an asset ID by taking everything before its
 * first dash, e.g. `image-foo-200x200-png` yields `image`.
 *
 * NOTE: when the ID contains no dash, `indexOf` returns -1 and `slice(0, -1)`
 * drops the final character rather than returning the whole ID.
 *
 * @param {string} assetId - Asset ID to inspect
 * @returns {string} The part of `assetId` preceding the first `-`
 */
function getAssetType(assetId) {
  const firstDash = assetId.indexOf('-')
  return assetId.slice(0, firstDash)
}

function reducePatch(trx, task) {
return trx.patch(task.documentId, (patch) =>
patch
.setIfMissing({
[task.path]: {_type: getAssetType(task.assetId)},
})
.set({
[`${task.path}.asset`]: {
_type: 'reference',
_ref: task.assetId,
},
}),
)
/**
 * Reducer that adds one patch to the transaction for a single document,
 * covering every asset task collected for that document.
 *
 * For each task the patch first ensures the parent object exists at `path`
 * (with `_type` derived from the asset ID), then sets `<path>.asset` to a
 * reference pointing at the asset.
 *
 * @param {object} trx - Transaction to add the patch to
 * @param {{documentId: string, tasks: Array<{path: string, assetId: string}>}} documentTasks
 * @returns {*} Whatever `trx.patch()` returns
 */
function reducePatch(trx, documentTasks) {
  const {documentId, tasks} = documentTasks

  return trx.patch(documentId, (patch) => {
    for (const {path, assetId} of tasks) {
      const parent = {[path]: {_type: getAssetType(assetId)}}
      const reference = {
        [`${path}.asset`]: {
          _type: 'reference',
          _ref: assetId,
        },
      }

      // Order matters: the parent object must exist before its `asset` key is set
      patch.setIfMissing(parent).set(reference)
    }

    return patch
  })
}

module.exports = uploadAssets

0 comments on commit 1787dd3

Please sign in to comment.