Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

s3, s3-multipart, tus: queue requests for soken token for remote files #3797

Merged
merged 7 commits into from
Jun 7, 2022
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
60 changes: 34 additions & 26 deletions packages/@uppy/aws-s3-multipart/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ function assertServerError (res) {
export default class AwsS3Multipart extends BasePlugin {
static VERSION = packageJson.version

#queueRequestSocketToken

constructor (uppy, opts) {
super(uppy, opts)
this.type = 'uploader'
Expand Down Expand Up @@ -47,6 +49,8 @@ export default class AwsS3Multipart extends BasePlugin {
this.uploaders = Object.create(null)
this.uploaderEvents = Object.create(null)
this.uploaderSockets = Object.create(null)

this.#queueRequestSocketToken = this.requests.wrapPromiseFunction(this.#requestSocketToken)
}

/**
Expand Down Expand Up @@ -279,7 +283,32 @@ export default class AwsS3Multipart extends BasePlugin {
})
}

uploadRemote (file) {
#requestSocketToken = async (file) => {
const Client = file.remote.providerOptions.provider ? Provider : RequestClient
const client = new Client(this.uppy, file.remote.providerOptions)
const opts = { ...this.opts }

if (file.tus) {
// Install file-specific upload overrides.
Object.assign(opts, file.tus)
}

try {
// !! cancellation is NOT supported at this stage yet
Murderlon marked this conversation as resolved.
Show resolved Hide resolved
const res = await client.post(file.remote.url, {
...file.remote.body,
protocol: 's3-multipart',
size: file.data.size,
metadata: file.meta,
})
return res.token
} catch (err) {
this.uppy.emit('upload-error', file, err)
throw err
}
Murderlon marked this conversation as resolved.
Show resolved Hide resolved
}

async uploadRemote (file) {
this.resetUploaderReferences(file.id)

// Don't double-emit upload-started for Golden Retriever-restored files that were already started
Expand All @@ -291,29 +320,9 @@ export default class AwsS3Multipart extends BasePlugin {
return this.connectToServerSocket(file)
}

return new Promise((resolve, reject) => {
const Client = file.remote.providerOptions.provider ? Provider : RequestClient
const client = new Client(this.uppy, file.remote.providerOptions)
client.post(
file.remote.url,
{
...file.remote.body,
protocol: 's3-multipart',
size: file.data.size,
metadata: file.meta,
},
).then((res) => {
this.uppy.setFileState(file.id, { serverToken: res.token })
// eslint-disable-next-line no-param-reassign
file = this.uppy.getFile(file.id)
return this.connectToServerSocket(file)
}).then(() => {
resolve()
}).catch((err) => {
this.uppy.emit('upload-error', file, err)
reject(err)
})
})
const serverToken = await this.#queueRequestSocketToken(file)
this.uppy.setFileState(file.id, { serverToken })
return this.connectToServerSocket(this.uppy.getFile(file.id))
}

connectToServerSocket (file) {
Expand All @@ -322,7 +331,7 @@ export default class AwsS3Multipart extends BasePlugin {

const token = file.serverToken
const host = getSocketHost(file.remote.companionUrl)
const socket = new Socket({ target: `${host}/api/${token}`, autoOpen: false })
const socket = new Socket({ target: `${host}/api/${token}` })
this.uploaderSockets[file.id] = socket
this.uploaderEvents[file.id] = new EventTracker(this.uppy)

Expand Down Expand Up @@ -412,7 +421,6 @@ export default class AwsS3Multipart extends BasePlugin {
})

queuedRequest = this.requests.run(() => {
socket.open()
if (file.isPaused) {
socket.send('pause', {})
}
Expand Down
69 changes: 48 additions & 21 deletions packages/@uppy/aws-s3/src/MiniXHRUpload.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ function createFormDataUpload (file, opts) {
const createBareUpload = file => file.data

export default class MiniXHRUpload {
#queueRequestSocketToken

constructor (uppy, opts) {
this.uppy = uppy
this.opts = {
Expand All @@ -65,6 +67,8 @@ export default class MiniXHRUpload {
this.requests = opts[internalRateLimitedQueue]
this.uploaderEvents = Object.create(null)
this.i18n = opts.i18n

this.#queueRequestSocketToken = this.requests.wrapPromiseFunction(this.#requestSocketToken)
}

#getOptions (file) {
Expand Down Expand Up @@ -243,35 +247,58 @@ export default class MiniXHRUpload {
})
}

#uploadRemoteFile (file) {
#requestSocketToken = async (file) => {
const opts = this.#getOptions(file)
// This is done in index.js in the S3 plugin.
// this.uppy.emit('upload-started', file)

const Client = file.remote.providerOptions.provider ? Provider : RequestClient
const client = new Client(this.uppy, file.remote.providerOptions)
const metaFields = Array.isArray(opts.metaFields)
? opts.metaFields
// Send along all fields by default.
// Send along all fields by default.
: Object.keys(file.meta)

const Client = file.remote.providerOptions.provider ? Provider : RequestClient
const client = new Client(this.uppy, file.remote.providerOptions)
return client.post(file.remote.url, {
...file.remote.body,
endpoint: opts.endpoint,
size: file.data.size,
fieldname: opts.fieldName,
metadata: Object.fromEntries(metaFields.map(name => [name, file.meta[name]])),
httpMethod: opts.method,
useFormData: opts.formData,
headers: opts.headers,
}).then(res => new Promise((resolve, reject) => {
const { token } = res
if (file.tus) {
// Install file-specific upload overrides.
Object.assign(opts, file.tus)
}

try {
// !! cancellation is NOT supported at this stage yet
const res = await client.post(file.remote.url, {
...file.remote.body,
endpoint: opts.endpoint,
size: file.data.size,
fieldname: opts.fieldName,
metadata: Object.fromEntries(metaFields.map(name => [name, file.meta[name]])),
httpMethod: opts.method,
useFormData: opts.formData,
headers: opts.headers,
})
return res.token
} catch (err) {
this.uppy.emit('upload-error', file, err)
throw err
}
}

async #uploadRemoteFile (file) {
if (file.serverToken) {
return this.connectToServerSocket(file)
}

const serverToken = await this.#queueRequestSocketToken(file)
this.uppy.setFileState(file.id, { serverToken })
return this.connectToServerSocket(this.uppy.getFile(file.id))
}

connectToServerSocket (file) {
return new Promise((resolve, reject) => {
const opts = this.#getOptions(file)
const token = file.serverToken
const host = getSocketHost(file.remote.companionUrl)
const socket = new Socket({ target: `${host}/api/${token}`, autoOpen: false })
const socket = new Socket({ target: `${host}/api/${token}` })
this.uploaderEvents[file.id] = new EventTracker(this.uppy)

const queuedRequest = this.requests.run(() => {
socket.open()
if (file.isPaused) {
socket.send('pause', {})
}
Expand Down Expand Up @@ -341,6 +368,6 @@ export default class MiniXHRUpload {
}).catch((err) => {
this.uppy.emit('upload-error', file, err)
return Promise.reject(err)
}))
})
}
}
52 changes: 30 additions & 22 deletions packages/@uppy/tus/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ export default class Tus extends BasePlugin {

#retryDelayIterator

#queueRequestSocketToken

/**
* @param {Uppy} uppy
* @param {TusOptions} opts
Expand Down Expand Up @@ -97,6 +99,7 @@ export default class Tus extends BasePlugin {

this.handleResetProgress = this.handleResetProgress.bind(this)
this.handleUpload = this.handleUpload.bind(this)
this.#queueRequestSocketToken = this.requests.wrapPromiseFunction(this.#requestSocketToken)
}

handleResetProgress () {
Expand Down Expand Up @@ -427,30 +430,16 @@ export default class Tus extends BasePlugin {
})
}

/**
* @param {UppyFile} file for use with upload
* @returns {Promise<void>}
*/
async uploadRemote (file) {
this.resetUploaderReferences(file.id)

#requestSocketToken = async (file) => {
const Client = file.remote.providerOptions.provider ? Provider : RequestClient
const client = new Client(this.uppy, file.remote.providerOptions)
const opts = { ...this.opts }

if (file.tus) {
// Install file-specific upload overrides.
Object.assign(opts, file.tus)
}

this.uppy.emit('upload-started', file)
this.uppy.log(file.remote.url)

if (file.serverToken) {
await this.connectToServerSocket(file)
return
}
Murderlon marked this conversation as resolved.
Show resolved Hide resolved

const Client = file.remote.providerOptions.provider ? Provider : RequestClient
const client = new Client(this.uppy, file.remote.providerOptions)

try {
// !! cancellation is NOT supported at this stage yet
const res = await client.post(file.remote.url, {
Expand All @@ -462,14 +451,34 @@ export default class Tus extends BasePlugin {
headers: opts.headers,
metadata: file.meta,
})
this.uppy.setFileState(file.id, { serverToken: res.token })
await this.connectToServerSocket(this.uppy.getFile(file.id))
return res.token
} catch (err) {
this.uppy.emit('upload-error', file, err)
throw err
}
}

/**
* @param {UppyFile} file for use with upload
* @returns {Promise<void>}
*/
async uploadRemote (file) {
this.resetUploaderReferences(file.id)

// Don't double-emit upload-started for Golden Retriever-restored files that were already started
if (!file.progress.uploadStarted || !file.isRestored) {
this.uppy.emit('upload-started', file)
}

if (file.serverToken) {
return this.connectToServerSocket(file)
}

const serverToken = await this.#queueRequestSocketToken(file)
this.uppy.setFileState(file.id, { serverToken })
return this.connectToServerSocket(this.uppy.getFile(file.id))
}

/**
* See the comment on the upload() method.
*
Expand All @@ -482,7 +491,7 @@ export default class Tus extends BasePlugin {
return new Promise((resolve, reject) => {
const token = file.serverToken
const host = getSocketHost(file.remote.companionUrl)
const socket = new Socket({ target: `${host}/api/${token}`, autoOpen: false })
const socket = new Socket({ target: `${host}/api/${token}` })
this.uploaderSockets[file.id] = socket
this.uploaderEvents[file.id] = new EventTracker(this.uppy)

Expand Down Expand Up @@ -591,7 +600,6 @@ export default class Tus extends BasePlugin {
})

queuedRequest = this.requests.run(() => {
socket.open()
if (file.isPaused) {
socket.send('pause', {})
}
Expand Down