Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

s3, s3-multipart, tus: queue requests for soken token for remote files #3797

Merged
merged 7 commits into from
Jun 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
65 changes: 36 additions & 29 deletions packages/@uppy/aws-s3-multipart/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ function assertServerError (res) {
export default class AwsS3Multipart extends BasePlugin {
static VERSION = packageJson.version

#queueRequestSocketToken

#client

constructor (uppy, opts) {
Expand Down Expand Up @@ -50,6 +52,8 @@ export default class AwsS3Multipart extends BasePlugin {
this.uploaders = Object.create(null)
this.uploaderEvents = Object.create(null)
this.uploaderSockets = Object.create(null)

this.#queueRequestSocketToken = this.requests.wrapPromiseFunction(this.#requestSocketToken)
}

[Symbol.for('uppy test: getClient')] () { return this.#client }
Expand Down Expand Up @@ -289,41 +293,45 @@ export default class AwsS3Multipart extends BasePlugin {
})
}

uploadRemote (file) {
#requestSocketToken = async (file) => {
const Client = file.remote.providerOptions.provider ? Provider : RequestClient
const client = new Client(this.uppy, file.remote.providerOptions)
const opts = { ...this.opts }

if (file.tus) {
// Install file-specific upload overrides.
Object.assign(opts, file.tus)
}

const res = await client.post(file.remote.url, {
...file.remote.body,
protocol: 's3-multipart',
size: file.data.size,
metadata: file.meta,
})
return res.token
}

async uploadRemote (file) {
this.resetUploaderReferences(file.id)

// Don't double-emit upload-started for Golden Retriever-restored files that were already started
if (!file.progress.uploadStarted || !file.isRestored) {
this.uppy.emit('upload-started', file)
}

if (file.serverToken) {
return this.connectToServerSocket(file)
}

return new Promise((resolve, reject) => {
const Client = file.remote.providerOptions.provider ? Provider : RequestClient
const client = new Client(this.uppy, file.remote.providerOptions)
client.post(
file.remote.url,
{
...file.remote.body,
protocol: 's3-multipart',
size: file.data.size,
metadata: file.meta,
},
).then((res) => {
this.uppy.setFileState(file.id, { serverToken: res.token })
// eslint-disable-next-line no-param-reassign
file = this.uppy.getFile(file.id)
try {
if (file.serverToken) {
return this.connectToServerSocket(file)
}).then(() => {
resolve()
}).catch((err) => {
this.uppy.emit('upload-error', file, err)
reject(err)
})
})
}
const serverToken = await this.#queueRequestSocketToken(file)

this.uppy.setFileState(file.id, { serverToken })
return this.connectToServerSocket(this.uppy.getFile(file.id))
} catch (err) {
this.uppy.emit('upload-error', file, err)
throw err
}
}

connectToServerSocket (file) {
Expand All @@ -332,7 +340,7 @@ export default class AwsS3Multipart extends BasePlugin {

const token = file.serverToken
const host = getSocketHost(file.remote.companionUrl)
const socket = new Socket({ target: `${host}/api/${token}`, autoOpen: false })
const socket = new Socket({ target: `${host}/api/${token}` })
this.uploaderSockets[file.id] = socket
this.uploaderEvents[file.id] = new EventTracker(this.uppy)

Expand Down Expand Up @@ -422,7 +430,6 @@ export default class AwsS3Multipart extends BasePlugin {
})

queuedRequest = this.requests.run(() => {
socket.open()
if (file.isPaused) {
socket.send('pause', {})
}
Expand Down
52 changes: 39 additions & 13 deletions packages/@uppy/aws-s3/src/MiniXHRUpload.js
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ function createFormDataUpload (file, opts) {
const createBareUpload = file => file.data

export default class MiniXHRUpload {
#queueRequestSocketToken

constructor (uppy, opts) {
this.uppy = uppy
this.opts = {
Expand All @@ -65,6 +67,8 @@ export default class MiniXHRUpload {
this.requests = opts[internalRateLimitedQueue]
this.uploaderEvents = Object.create(null)
this.i18n = opts.i18n

this.#queueRequestSocketToken = this.requests.wrapPromiseFunction(this.#requestSocketToken)
}

#getOptions (file) {
Expand Down Expand Up @@ -243,19 +247,21 @@ export default class MiniXHRUpload {
})
}

#uploadRemoteFile (file) {
#requestSocketToken = async (file) => {
const opts = this.#getOptions(file)
// This is done in index.js in the S3 plugin.
// this.uppy.emit('upload-started', file)

const Client = file.remote.providerOptions.provider ? Provider : RequestClient
const client = new Client(this.uppy, file.remote.providerOptions)
const metaFields = Array.isArray(opts.metaFields)
? opts.metaFields
// Send along all fields by default.
// Send along all fields by default.
: Object.keys(file.meta)

const Client = file.remote.providerOptions.provider ? Provider : RequestClient
const client = new Client(this.uppy, file.remote.providerOptions)
return client.post(file.remote.url, {
if (file.tus) {
// Install file-specific upload overrides.
Object.assign(opts, file.tus)
}

const res = await client.post(file.remote.url, {
...file.remote.body,
endpoint: opts.endpoint,
size: file.data.size,
Expand All @@ -264,14 +270,34 @@ export default class MiniXHRUpload {
httpMethod: opts.method,
useFormData: opts.formData,
headers: opts.headers,
}).then(res => new Promise((resolve, reject) => {
const { token } = res
})
return res.token
}

async #uploadRemoteFile (file) {
try {
if (file.serverToken) {
return this.connectToServerSocket(file)
}
const serverToken = await this.#queueRequestSocketToken(file)

this.uppy.setFileState(file.id, { serverToken })
return this.connectToServerSocket(this.uppy.getFile(file.id))
} catch (err) {
this.uppy.emit('upload-error', file, err)
throw err
}
}

connectToServerSocket (file) {
return new Promise((resolve, reject) => {
const opts = this.#getOptions(file)
const token = file.serverToken
const host = getSocketHost(file.remote.companionUrl)
const socket = new Socket({ target: `${host}/api/${token}`, autoOpen: false })
const socket = new Socket({ target: `${host}/api/${token}` })
this.uploaderEvents[file.id] = new EventTracker(this.uppy)

const queuedRequest = this.requests.run(() => {
socket.open()
if (file.isPaused) {
socket.send('pause', {})
}
Expand Down Expand Up @@ -341,6 +367,6 @@ export default class MiniXHRUpload {
}).catch((err) => {
this.uppy.emit('upload-error', file, err)
return Promise.reject(err)
}))
})
}
}
65 changes: 36 additions & 29 deletions packages/@uppy/tus/src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,8 @@ export default class Tus extends BasePlugin {

#retryDelayIterator

#queueRequestSocketToken

/**
* @param {Uppy} uppy
* @param {TusOptions} opts
Expand Down Expand Up @@ -97,6 +99,7 @@ export default class Tus extends BasePlugin {

this.handleResetProgress = this.handleResetProgress.bind(this)
this.handleUpload = this.handleUpload.bind(this)
this.#queueRequestSocketToken = this.requests.wrapPromiseFunction(this.#requestSocketToken)
}

handleResetProgress () {
Expand Down Expand Up @@ -427,43 +430,48 @@ export default class Tus extends BasePlugin {
})
}

/**
* @param {UppyFile} file for use with upload
* @returns {Promise<void>}
*/
async uploadRemote (file) {
this.resetUploaderReferences(file.id)

#requestSocketToken = async (file) => {
const Client = file.remote.providerOptions.provider ? Provider : RequestClient
const client = new Client(this.uppy, file.remote.providerOptions)
const opts = { ...this.opts }

if (file.tus) {
// Install file-specific upload overrides.
Object.assign(opts, file.tus)
}

this.uppy.emit('upload-started', file)
this.uppy.log(file.remote.url)
const res = await client.post(file.remote.url, {
...file.remote.body,
endpoint: opts.endpoint,
uploadUrl: opts.uploadUrl,
protocol: 'tus',
size: file.data.size,
headers: opts.headers,
metadata: file.meta,
})
return res.token
}

if (file.serverToken) {
await this.connectToServerSocket(file)
return
}
Murderlon marked this conversation as resolved.
Show resolved Hide resolved
/**
* @param {UppyFile} file for use with upload
* @returns {Promise<void>}
*/
async uploadRemote (file) {
this.resetUploaderReferences(file.id)

const Client = file.remote.providerOptions.provider ? Provider : RequestClient
const client = new Client(this.uppy, file.remote.providerOptions)
// Don't double-emit upload-started for Golden Retriever-restored files that were already started
if (!file.progress.uploadStarted || !file.isRestored) {
this.uppy.emit('upload-started', file)
}

try {
// !! cancellation is NOT supported at this stage yet
const res = await client.post(file.remote.url, {
...file.remote.body,
endpoint: opts.endpoint,
uploadUrl: opts.uploadUrl,
protocol: 'tus',
size: file.data.size,
headers: opts.headers,
metadata: file.meta,
})
this.uppy.setFileState(file.id, { serverToken: res.token })
await this.connectToServerSocket(this.uppy.getFile(file.id))
if (file.serverToken) {
return this.connectToServerSocket(file)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

what's the purpose of calling this here? I thought this should only happen after await this.#queueRequestSocketToken(file) has returned?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we retry then this is called again but we don't need a new server token, we already have it. In some other instances it's also possible to already have it

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, a comment could be nice

}
const serverToken = await this.#queueRequestSocketToken(file)

this.uppy.setFileState(file.id, { serverToken })
return this.connectToServerSocket(this.uppy.getFile(file.id))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

couldn't we just do return this.connectToServerSocket(file) ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could the await make the file instance outdated? If the file was removed by the user while this code was waiting for the RateLimitedQueue to dispatch maybe? If so, it wouldn't be equivalent to return this.connectToServerSocket(file)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's indeed safer to do it this way. It's also cheap so it doesn't matter much

} catch (err) {
this.uppy.emit('upload-error', file, err)
throw err
Expand All @@ -482,7 +490,7 @@ export default class Tus extends BasePlugin {
return new Promise((resolve, reject) => {
const token = file.serverToken
const host = getSocketHost(file.remote.companionUrl)
const socket = new Socket({ target: `${host}/api/${token}`, autoOpen: false })
const socket = new Socket({ target: `${host}/api/${token}` })
this.uploaderSockets[file.id] = socket
this.uploaderEvents[file.id] = new EventTracker(this.uppy)

Expand Down Expand Up @@ -591,7 +599,6 @@ export default class Tus extends BasePlugin {
})

queuedRequest = this.requests.run(() => {
socket.open()
if (file.isPaused) {
socket.send('pause', {})
}
Expand Down