Skip to content

Commit

Permalink
[form-builder] Rewrite how we limit concurrent uploads (#245)
Browse files Browse the repository at this point in the history
  • Loading branch information
bjoerge committed Oct 12, 2017
1 parent 7adb9b7 commit cc503b1
Show file tree
Hide file tree
Showing 5 changed files with 76 additions and 65 deletions.
Original file line number Diff line number Diff line change
@@ -1,20 +1,23 @@
import client from 'part:@sanity/base/client'
import {observeWithPaths} from 'part:@sanity/base/preview'
import {withMaxConcurrency} from '../../utils/withMaxConcurrency'

function uploadAsset(assetType, file) {
const MAX_CONCURRENT_UPLOADS = 4

function uploadSanityAsset(assetType, file) {
return client.observable.assets.upload(assetType, file)
.map(event => {
if (event.type === 'response') {
// rewrite to a 'complete' event
return {
return event.type === 'response'
? { // rewrite to a 'complete' event
type: 'complete',
id: event.body.document._id,
asset: event.body.document
}
}
return event
} : event
})
}

const uploadAsset = withMaxConcurrency(uploadSanityAsset, MAX_CONCURRENT_UPLOADS)

export const uploadImageAsset = file => uploadAsset('image', file)
export const uploadFileAsset = file => uploadAsset('file', file)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import rotateImage from './image/rotateImage'
import {DEFAULT_ORIENTATION} from './image/orient'
import {set} from '../../utils/patches'
import type {UploadEvent} from './typedefs'

// The eslint import plugin doesn't work well with opaque types
// https://github.com/benmosher/eslint-plugin-import/issues/921
// https://github.com/gajus/eslint-plugin-flowtype/issues/260
Expand All @@ -13,16 +14,14 @@ import type {OrientationId} from './image/orient'
import type {ObservableI} from '../../typedefs/observable'
import {UPLOAD_STATUS_KEY} from './constants'
import {createUploadEvent, createInitialUploadEvent, CLEANUP_EVENT} from './utils'
import {createUploadId, getUploadEvents, scheduleUpload} from './uploadQueue'
import {uploadImageAsset} from '../inputs/client-adapters/assets'

type Exif = {
orientation: OrientationId
}

export default function uploadImage(file: File): ObservableI<UploadEvent> {
const uploadId = createUploadId(file)

const upload$ = getUploadEvents(uploadId)
const upload$ = uploadImageAsset(file)
.filter(event => event.stage !== 'download')
.map(event => ({
...event,
Expand All @@ -49,8 +48,6 @@ export default function uploadImage(file: File): ObservableI<UploadEvent> {
.filter(Boolean)
.map(imageUrl => createUploadEvent([set(imageUrl, [UPLOAD_STATUS_KEY, 'previewImage'])]))

scheduleUpload(uploadId, file)

return Observable.of(createInitialUploadEvent(file))
.concat(Observable.from(upload$).merge(setPreviewUrl$))
.concat(Observable.of(CLEANUP_EVENT))
Expand Down
51 changes: 0 additions & 51 deletions packages/@sanity/form-builder/src/sanity/uploads/uploadQueue.js

This file was deleted.

Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// @flow

// Takes a observable-returning function and returns a new function that limits on the number of
// concurrent observables.

import Multicast from '@sanity/observable/multicast'
import Observable from '@sanity/observable'
import type {ObservableI, Subscription} from '../../typedefs/observable'

const DEFAULT_CONCURRENCY = 4

function remove<T>(array: Array<T>, item: T): Array<T> {
const index = array.indexOf(item)
if (index > -1) {
array.splice(index, 1)
}
return array
}

export function withMaxConcurrency(func: (any) => ObservableI<*>, concurrency: number = DEFAULT_CONCURRENCY) {
const throttler = createThrottler(concurrency)
return (...args: Array<any>) => Observable.from(throttler(func(...args)))
}

export function createThrottler(concurrency: number = DEFAULT_CONCURRENCY) {
const currentSubscriptions: Array<Subscription> = []
const pendingObservables: Array<ObservableI<*>> = []
const ready$ = new Multicast()

return request

function request(observable: ObservableI<*>) {
return new Observable(observer => {
if (currentSubscriptions.length >= concurrency) {
return scheduleAndWait(observable)
.mergeMap(request)
.subscribe(observer)
}
const subscription = observable.subscribe(observer)
currentSubscriptions.push(subscription)
return () => {
remove(currentSubscriptions, subscription)
remove(pendingObservables, observable)
subscription.unsubscribe()
check()
}
})
}

function scheduleAndWait(observable) {
pendingObservables.push(observable)
return ready$.asObservable()
.first(obs => obs === observable)
}

function check() {
while (pendingObservables.length > 0 && currentSubscriptions.length < concurrency) {
ready$.next(pendingObservables.shift())
}
}
}
3 changes: 2 additions & 1 deletion packages/@sanity/form-builder/src/typedefs/observable.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,8 @@ export interface Subscription {
unsubscribe: () => void
}

type FunctionSubscriber<T> = T => void
type FunctionSubscriber<T> = T => any

type ObjectSubscriber<T> = {
next?: T => void,
error?: (error: Error) => void,
Expand Down

0 comments on commit cc503b1

Please sign in to comment.