Skip to content

Commit

Permalink
tune upload perf, better cleanup logic
Browse files Browse the repository at this point in the history
  • Loading branch information
nl0 committed Nov 15, 2021
1 parent 6889d4b commit c7a4447
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 57 deletions.
20 changes: 13 additions & 7 deletions catalog/app/containers/Bucket/PackageDialog/FilesInput.tsx
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import cx from 'classnames'
import pLimit from 'p-limit'
import * as R from 'ramda'
import * as React from 'react'
import { useDropzone, FileWithPath } from 'react-dropzone'
Expand Down Expand Up @@ -28,31 +29,36 @@ const COLORS = {

interface FileWithHash extends File {
hash: {
value: string | undefined
ready: boolean
promise: Promise<string>
value?: string
error?: Error
promise: Promise<string | undefined>
}
}

const hasHash = (f: File): f is FileWithHash => !!f && !!(f as FileWithHash).hash

// XXX: it might make sense to limit concurrency, though the tests show perf is ok, since hashing is async anyway
const hashLimit = pLimit(2)

function computeHash(f: File) {
if (hasHash(f)) return f
const promise = PD.hashFile(f)
const hashP = hashLimit(PD.hashFile, f)
const fh = f as FileWithHash
fh.hash = { value: undefined, ready: false, promise }
promise
fh.hash = { ready: false } as any
fh.hash.promise = hashP
.catch((e) => {
// eslint-disable-next-line no-console
console.log('Error hashing file:')
console.log(`Error hashing file "${fh.name}":`)
// eslint-disable-next-line no-console
console.error(e)
fh.hash.error = e
fh.hash.ready = true
return undefined
})
.then((hash) => {
fh.hash.value = hash
fh.hash.ready = true
return hash
})
return fh
}
Expand Down
4 changes: 2 additions & 2 deletions catalog/app/containers/Bucket/PackageDialog/PackageDialog.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,9 @@ export const getNormalizedPath = (f: { path?: string; name: string }) => {
}

export async function hashFile(file: File) {
if (!window.crypto || !window.crypto.subtle || !window.crypto.subtle.digest)
throw new Error('Crypto API unavailable')
if (!window.crypto?.subtle?.digest) throw new Error('Crypto API unavailable')
const buf = await file.arrayBuffer()
// XXX: consider using hashwasm for stream-based hashing to support larger files
const hashBuf = await window.crypto.subtle.digest('SHA-256', buf)
return Array.from(new Uint8Array(hashBuf))
.map((b) => b.toString(16).padStart(2, '0'))
Expand Down
107 changes: 59 additions & 48 deletions catalog/app/containers/Bucket/PackageDialog/Uploads.tsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import type { S3 } from 'aws-sdk'
import * as FP from 'fp-ts'
import invariant from 'invariant'
import pLimit from 'p-limit'
import * as R from 'ramda'
import * as React from 'react'
Expand All @@ -12,7 +11,7 @@ import useMemoEq from 'utils/useMemoEq'

import type { LocalFile, ExistingFile } from './FilesInput'

export interface UploadResult extends S3.ManagedUpload.SendData {
interface UploadResult extends S3.ManagedUpload.SendData {
VersionId: string
}

Expand All @@ -25,8 +24,7 @@ export interface UploadTotalProgress {
export interface UploadsState {
[path: string]: {
file: File
upload: S3.ManagedUpload
promise: Promise<UploadResult>
promise: Promise<S3.ManagedUpload.SendData>
progress?: { total: number; loaded: number }
}
}
Expand Down Expand Up @@ -78,41 +76,52 @@ export function useUploads() {
}) => {
const limit = pLimit(2)
let rejected = false
const uploadStates = files.map(({ path, file }) => {
// reuse state if file hasn't changed
const entry = uploads[path]
if (entry && entry.file === file) return { ...entry, path }
const pendingUploads: Record<string, S3.ManagedUpload> = {}

const uploadFile = async (path: string, file: LocalFile) => {
if (rejected) {
remove(path)
return undefined as never
}

const upload: S3.ManagedUpload = s3.upload({
Bucket: bucket,
Key: `${prefix}/${path}`,
Body: file,
})

const upload: S3.ManagedUpload = s3.upload(
{
Bucket: bucket,
Key: `${prefix}/${path}`,
Body: file,
},
{
queueSize: 2,
},
)
upload.on('httpUploadProgress', ({ loaded }) => {
if (rejected) return
setUploads(R.assocPath([path, 'progress', 'loaded'], loaded))
})
const promise = limit(async () => {
if (rejected) {
remove(path)
return undefined
}
try {
const uploadP = upload.promise()
await file.hash.promise
return await uploadP
} catch (e) {

pendingUploads[path] = upload

try {
const uploadP = upload.promise()
await file.hash.promise
return await uploadP
} catch (e) {
if ((e as any).code !== 'RequestAbortedError') {
// eslint-disable-next-line no-console
console.log(`Error uploading file "${file.name}"`)
rejected = true
remove(path)
throw e
Object.values(pendingUploads).forEach((u) => u.abort())
}
}) as Promise<UploadResult>
return { path, file, upload, promise, progress: { total: file.size, loaded: 0 } }
remove(path)
throw e
} finally {
delete pendingUploads[path]
}
}

const uploadStates = files.map(({ path, file }) => {
// reuse state if file hasn't changed
const entry = uploads[path]
if (entry && entry.file === file) return { ...entry, path }

const promise = limit(uploadFile, path, file)
return { path, file, promise, progress: { total: file.size, loaded: 0 } }
})

FP.function.pipe(
Expand All @@ -125,22 +134,24 @@ export function useUploads() {
const uploaded = await Promise.all(uploadStates.map((x) => x.promise))

return FP.function.pipe(
FP.array.zipWith(files, uploaded, (f, r) => {
invariant(f.file.hash.value, 'File must have a hash')
return [
f.path,
{
physicalKey: s3paths.handleToS3Url({
bucket,
key: r.Key,
version: r.VersionId,
}),
size: f.file.size,
hash: f.file.hash.value,
meta: getMeta?.(f.path),
},
] as R.KeyValuePair<string, ExistingFile>
}),
FP.array.zipWith(
files,
uploaded,
(f, r) =>
[
f.path,
{
physicalKey: s3paths.handleToS3Url({
bucket,
key: r.Key,
version: (r as UploadResult).VersionId,
}),
size: f.file.size,
hash: f.file.hash.value,
meta: getMeta?.(f.path),
},
] as R.KeyValuePair<string, ExistingFile>,
),
R.fromPairs,
)
},
Expand Down
2 changes: 2 additions & 0 deletions catalog/app/utils/ResourceCache.js
Original file line number Diff line number Diff line change
Expand Up @@ -148,6 +148,8 @@ function* handleInit({ resource, input, resolver }) {
}

function* cleanup() {
// TODO: refactor cleanup logic, so that the cleanup action is only dispatched
// when there's anything to clean up (to avoid re-renders every 5 sec)
while (true) {
yield effects.delay(RELEASE_TIME)
yield effects.put(Action.CleanUp({ time: new Date() }))
Expand Down

0 comments on commit c7a4447

Please sign in to comment.