-
Notifications
You must be signed in to change notification settings - Fork 1
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: write to carpark #139
Conversation
License: MIT Signed-off-by: Oli Evans <oli@protocol.ai>
License: MIT Signed-off-by: Oli Evans <oli@protocol.ai>
View stack outputs
|
License: MIT Signed-off-by: Oli Evans <oli@protocol.ai>
# Indexer base url | ||
# 0 -> all request to Indexer | ||
# 100 -> All request to Pickup | ||
# values in between the balancer is applied (eg. 15 -> 15% of the request to Pickup, 85% to Indexer) | ||
BALANCER_RATE=10 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Removed as we no longer split requests between pickup and cluster, and the balancer code has gone.
@@ -50,5 +50,5 @@ export async function checkCar (car) { | |||
} | |||
})) | |||
const carCid = createCarCid(sha256) | |||
return { carCid, carSize, report } | |||
return { carCid: carCid.toString(), carSize, report } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
we're working with cid strings everywhere else in this code, as we're handed it from and sqs message. We log the carCid to loki via pino, and it was serialising the cid object instead of calling toString on it, so we normalise it here.
async updatePinStatus ({ cid, status = 'pinned' }) { | ||
const res = await this.dynamo.send(new UpdateCommand({ | ||
TableName: this.table, | ||
Key: { cid }, | ||
ExpressionAttributeNames: { | ||
'#status': 'status' | ||
}, | ||
ExpressionAttributeValues: { | ||
':s': status | ||
}, | ||
UpdateExpression: 'set #status = :s', | ||
ReturnValues: 'ALL_NEW' | ||
})) | ||
return res.Attributes | ||
} |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pickup can now update the pin status once it's done
const fetchTimer = debounce(() => abort(FETCH_TOO_SLOW), fetchTimeoutMs) | ||
const chunkTimer = debounce(() => abort(CHUNK_TOO_SLOW), fetchChunkTimeoutMs) | ||
const clearTimers = () => { | ||
fetchTimer.clear() | ||
chunkTimer.clear() | ||
} | ||
function abort (reason) { | ||
clearTimers() | ||
if (!abortCtl.signal.aborted) { | ||
abortCtl.abort(reason) | ||
} | ||
} | ||
|
||
// start the clock! | ||
fetchTimer() | ||
chunkTimer() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
start the timers as soon as fetch is called, not as some point later when the stream composer decides to call the streamWatcher fn.
|
||
await s3Upload.done() | ||
|
||
export async function verify ({ client, validationBucket, destinationBucket, key, cid }) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
move the uploading to the upload function and have verify just do verification. simples!
}) | ||
signal.addEventListener('abort', () => s3Upload.abort()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
important line! can't pass an abort signal to the s3Uploader oh no. but you can call abort in it so we deal with that here, to ensure the stream is destroyed when the abort signal is fired by our fetch an chunk timeouts.
@@ -78,7 +78,7 @@ export async function verifyMessage ({ msg, cars, t, bucket, s3 }) { | |||
try { | |||
const message = msg.body | |||
const index = Number(message.requestid) | |||
if (cars[index].expectedResult === 'success') { | |||
if (cars[index].expectedResult === 'pinned') { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
align test expectations with database status values
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM
const updatePinQueue = new Queue(stack, 'UpdatePinQueue', { | ||
consumer: { | ||
function: { | ||
handler: 'basic/update-pin.sqsEventHandler', |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
update-pin.js
can be removed?
Co-authored-by: Alan Shaw <alan.shaw@protocol.ai>
License: MIT Signed-off-by: Oli Evans <oli@protocol.ai>
Write to the existing carpark bucket rather than creating a dedicated one for pickup.
Updater the pin status on successful write to s3 from the pickup worker. Previously we'd listen for object_created events and trigger a lambda to update the pin status, but we're moving to a shared bucket, so handle it in the worker.
Key changes
Part of #136