add cp throttling#312
Conversation
Signed-off-by: Ayush Kamat <ayush@latch.bio>
Signed-off-by: Ayush Kamat <ayush@latch.bio>
Signed-off-by: Ayush Kamat <ayush@latch.bio>
AidanAbd
left a comment
There was a problem hiding this comment.
Do you want to throttle on complete_upload as well? Treat start_upload and complete_upload as basically the same in terms of DB load (I think this is true) and then use the same queue.
| @dataclass | ||
| class Throttle: | ||
| delay: float = 0 | ||
|
|
||
| def get_delay(self): | ||
| return self.delay | ||
|
|
||
| def set_delay(self, d: float): | ||
| self.delay = d |
There was a problem hiding this comment.
Why is there a class for a constant?
There was a problem hiding this comment.
needed for multiprocessing manager so that more than one process can interact with it
also i believe theres a built in Value in multiprocessing.SyncManager but i think this is more readable
| UploadInfoBySrcType: TypeAlias = DictProxy[Path, "StartUploadReturnType"] | ||
|
|
||
| logging.basicConfig() | ||
| multiprocessing.get_logger().setLevel(logging.DEBUG) |
There was a problem hiding this comment.
ah need to del this
| if config.progress != Progress.none: | ||
| click.secho(f"Uploading {src_path.name}", fg="blue") |
There was a problem hiding this comment.
It is strange to me that the progress bar setting is used to indicate the verbosity of our logger. I would leave this as is, or add a separate verbosity flag.
There was a problem hiding this comment.
there is a separate verbosity flag, this is based on the assumption that if a person picks --progress=none then they probably don't want verbose output either (i.e. passing --progress=none --verbose doesn't really make sense)
im not strongly opinionated here though
| click.clear() | ||
| click.echo( | ||
| f"""{click.style("Upload Complete", fg="green")} | ||
| if config.progress != Progress.none: |
There was a problem hiding this comment.
Same comment (idk if this should dictate verbosity...)
| if throttle is not None: | ||
| time.sleep(throttle.get_delay()) |
There was a problem hiding this comment.
From what I can see here, a delay is set and all the start uploads wait. Then when the delay is unset, they all wake up and spam together. I think some jitter on the wakeup would help a lot, probably based on what we deem to be a reasonable amount of time for a start_upload request to complete (50ms)? Then sample from distribution around 50ms and use this to modify the delay?
There was a problem hiding this comment.
this isn't what happens empirically, easier to talk about irl
| time.sleep(0.1 * random.random()) | ||
|
|
| parts_by_src: "PartsBySrcType", | ||
| upload_info_by_src: "UploadInfoBySrcType", | ||
| ): | ||
| def throttler(t: Throttle, q: "LatencyQueueType"): |
There was a problem hiding this comment.
This design is weird because it has a separate process that calculates based on the last posted latency.
- We want to use some mean (probably arithmetic) of the last n posted latencies within some timeframe.
- We can probably implement a function called
get_delayor something that takes the latency queue and returns a number or None - If we have been blocked a lot recently (like more than 10 seconds), we should probably try the request and fail with a Load error if that does not work.
- Then, we do not need a separate spinning process. The only concurrent object we need is a queue that evicts its oldest object when it overflows some set size.
| if latency_q is not None: | ||
| latency_q.put(end - start) |
There was a problem hiding this comment.
Idk why this is optional but idrc
There was a problem hiding this comment.
in the case where u upload a single file, none of the concurrency stuff is passed to the function
Signed-off-by: Ayush Kamat <ayush@latch.bio>
Signed-off-by: Ayush Kamat ayush@latch.bio