-
Notifications
You must be signed in to change notification settings - Fork 282
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feat: pintracker revamp #383
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
@lanzafame , this looks very nice and it's very clean! And yes I like the operationTracker
!
architecture.md
Outdated
@@ -80,7 +80,7 @@ This sections gives an overview of how some things work in Cluster. Doubts? Some | |||
|
|||
* Initialize the P2P host. | |||
* Initialize Consensus: start looking for a leader asap. | |||
* Setup RPC in all componenets: allow them to communicate with different parts of the cluster. | |||
* Setup RPC in all components: allow them to communicate with different parts of the cluster. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for all these
ipfsconn/ipfshttp/config_test.go
Outdated
"pin_method": "pin" | ||
"pin_method": "pin", | ||
"client_timeout": "10s", | ||
"client_post_timeout": "1m0s" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Please keep in mind to PR a documentation update (new website) when this is merged.
ipfsconn/ipfshttp/config.go
Outdated
ClientTimeout time.Duration | ||
|
||
// IPFS Daemon HTTP Client POST timeout | ||
ClientPostTimeout time.Duration |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hmm, does it make sense to have 2 when the IPFS HTTP Client always makes POST?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Ah if it always makes POSTs then, no it doesn't make sense 👍 I will fix this up
ipfsconn/ipfshttp/config.go
Outdated
@@ -23,6 +23,8 @@ const ( | |||
DefaultProxyWriteTimeout = 10 * time.Minute | |||
DefaultProxyIdleTimeout = 60 * time.Second | |||
DefaultPinMethod = "pin" | |||
DefaultClientTimeout = 10 * time.Second | |||
DefaultClientPostTimeout = 60 * time.Second |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Rather than general timeouts, I would like to have a standard timeout (say 5m
) and then specific for pin and another for unpin operations. Where the pinTimeout is like 24h
by default, and then unpin timeout something like 3h
.
ipfsconn/ipfshttp/ipfshttp.go
Outdated
|
||
res, err := client.Post(url, "", nil) | ||
ctx, cancel := context.WithCancel(ipfs.ctx) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
context.WithTimeout(ipfs.ctx, timeout)
?
Also, defer cancel()
so we cleanup as soon as we return.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yeah, there was a reason but I have forgotten it and also I have learnt a lot more about the context
pkg since I wrote this, so my guess is that my reason isn't necessarily valid anymore anyway
operationRecover | ||
) | ||
|
||
//go:generate stringer -type=phase |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
uooo i didn't know this, maybe we can use it in more places!
return | ||
} | ||
opc.cancel() | ||
logger.Infof("operation '%s' on cid '%s' has been cancelled", opc.op.String(), c.String()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Note to self: not sure if these deserve INFO importance.
operationUnknown operation = iota | ||
operationPin | ||
operationUnpin | ||
operationSync |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
do you plan to do something with sync
or recover
? otherwise comment or remove.
opc, ok := mpt.optracker.get(c.Cid) | ||
if !ok { | ||
logger.Debug("pin operation wasn't being tracked") | ||
ctx = mpt.ctx |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'm not quite seeing something here.
If the pin was queued, but the operation is cancelled (because an unpin request is received), this will still run when the pin comes out of the queue. Since the previous operation was cancelled, it was deleted and will show the warning that it wasn't tracked. Right?
Right now it would seem that it only cancels ongoing pins (by killing the context), but not queued pins. If an unpin requests comes and the pin is queued, the optracker should mark it as cancelled (phaseCancelled?), and this function should probably exit in that case.
This may force you to carry several operationCtx per Cid (one directly associated to each Track/Untrack) and have any new operation for that Cid affect the others when it runs (an unpin would cancel all pins). A new pin would probably not even enter the queue if there is a pin operationCtx for that Cid already
Probably, the operationCtx can only be removed from the tracker here, once it's no longer queued.
Please correct me if I missed something and things work differently.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for catching this, I had forgotten to perform the 'filtering' of pin/unpins in the pin/unpinWorkers. I have pushed that change up. Please let me know if that doesn't solve the issue you are raising?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think it does solve it. Looking forward to tests.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Oh, I almost forget, I would like to have TrackerStatusPinQueue and TrackerStatusUnpinQueue types to reflect that something is pinning (in ipfs) or just waiting to be pinned (in our queue).
@hsanjuan hmmm this is for use in the |
@lanzafame anything that returns api.PinInfo stuff (which carries a status) |
There are some errors in the tests (jenkins: Other than that, last commit lgtm. |
95a67cd
to
ff513a5
Compare
ipfsconn/ipfshttp/config.go
Outdated
@@ -69,6 +81,9 @@ type jsonConfig struct { | |||
ProxyWriteTimeout string `json:"proxy_write_timeout"` | |||
ProxyIdleTimeout string `json:"proxy_idle_timeout"` | |||
PinMethod string `json:"pin_method"` | |||
ClientPostTimeout string `json:"client_post_timeout"` |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ClientPost
seems a too internal name for a user. Better IPFSTimeout
or IPFSRequestTimeout
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
✅
ipfsconn/ipfshttp/config_test.go
Outdated
"pin_method": "pin", | ||
"client_post_timeout": "5m0s", | ||
"pin_timeout": "24h", | ||
"unpin_timeout": "3h" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
indentation is funny here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
✅
ipfsconn/ipfshttp/ipfshttp.go
Outdated
@@ -713,14 +725,25 @@ func (ipfs *Connector) PinLsCid(hash *cid.Cid) (api.IPFSPinStatus, error) { | |||
return api.IPFSPinStatusFromString(pinObj.Type), nil | |||
} | |||
|
|||
func doPost(client *http.Client, apiURL, path string) (*http.Response, error) { | |||
func (ipfs *Connector) doPost(client *http.Client, apiURL, path string) (*http.Response, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if do post is no longer used for anything, then just rename doPostCtx
to doPost
.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
✅
ipfsconn/ipfshttp/ipfshttp.go
Outdated
return ipfs.postDiscardBodyCtx(ipfs.ctx, path) | ||
} | ||
|
||
func (ipfs *Connector) postDiscardBodyCtx(ctx context.Context, path string) error { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as with doPost
. Less trivial unused code marginally better than more.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
✅
if !ok { | ||
t.Fatal() | ||
} | ||
if opc.phase == phaseInProgress { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Likely to fail if the item is pinned before you can check if it's in progress right? You'll need a RPC mock which, for example, locks the pinning operation for a few seconds to simulate a long one.
t.Fatal() | ||
} | ||
if opc.phase == phaseInProgress { | ||
err = mpt.Untrack(h1) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Once you untrack, it would be good to find a way to check that the operation was cancelled. I'm not sure about the best approach though. Probably the mock RPC can report whether the context in the operation was cancelled.
} | ||
} | ||
}() | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You need to wait for above goroutine to return before finishing the test.
go func() { | ||
for { | ||
opc, _ := mpt.optracker.get(c.Cid) | ||
if opc.phase == phaseQueued { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
same comment as above, you might just miss phaseQueued
Update config with new keys and sensible values here: https://github.com/ipfs/ipfs-cluster/blob/master/config_test.go Pin tracker pin_timeout and unpin_timeout are now redundant because the ipfsconnector includes timeouts. |
@hsanjuan the pin_timeout and unpin_timeout are still you used by the |
4f31d6d
to
1a11c71
Compare
@lanzafame I think |
1a11c71
to
be642f5
Compare
Btw. You can adjust a manual delay in TestTrackUntrackWithCancel hangs undefinitely for me some times (i guess because of the problem I pointed above). |
Also, one way to make sure we are not messing something with the locks is to switch to sync.Map and see if the errors still happen. |
f3b9b8a
to
348bd9f
Compare
@hsanjuan If you are happy with the code itself, then I am happy to hand it over to you to see if you can get those windows tests passing. |
1ca55bf
to
63835fb
Compare
9f4e392
to
b564fe6
Compare
License: MIT Signed-off-by: Adrian Lanzafame <adrianlanzafame92@gmail.com>
ipfshttp: cancel POST request when timeout reached ipfshttp/config: fix config test ipfshttp: use struct styling for multi-line func calls ipfshttp/config: add general ClientTimeout License: MIT Signed-off-by: Adrian Lanzafame <adrianlanzafame92@gmail.com>
The TrackerStatuses were starting to be used to convey the inflight status of an 'operation', instead of just the status of the Pin. I have separated out any thing related to 'operations' and an operation's 'phases'. License: MIT Signed-off-by: Adrian Lanzafame <adrianlanzafame92@gmail.com>
to the ipfscluster.IPFSConnector interface and then to the implementation of that interface in ipfsconn/ipfshttp. This allows calls from MapPinTracker to cancel requests made to the local IPFS node. License: MIT Signed-off-by: Adrian Lanzafame <adrianlanzafame92@gmail.com>
come off the pin/unpin channels. Also fix a race condition in the operationTracker. License: MIT Signed-off-by: Adrian Lanzafame <adrianlanzafame92@gmail.com>
and get the tests passing and add Pin/UnpinQueued tracker statuses back in. License: MIT Signed-off-by: Adrian Lanzafame <adrianlanzafame92@gmail.com>
This creates a race condition where the items may have been already pinned before the operation is registered in the tracker. This may result in operations being left in the tracker and potentially never completed. License: MIT Signed-off-by: Hector Sanjuan <code@hector.link>
Pinning/unpinning timeouts are controlled by the ipfs connector component. License: MIT Signed-off-by: Hector Sanjuan <code@hector.link>
License: MIT Signed-off-by: Hector Sanjuan <code@hector.link>
License: MIT Signed-off-by: Hector Sanjuan <code@hector.link>
License: MIT Signed-off-by: Hector Sanjuan <code@hector.link>
b564fe6
to
877e65a
Compare
Avoid writing tests which will hang indefinitely on failure conditions. Introduce TODOs. Rename some vars to more explicit names. License: MIT Signed-off-by: Hector Sanjuan <code@hector.link>
7506973
to
859cf75
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I have fixed a couple of bugs:
- race condition which caused that adding the operation to the tracker came after pinning, so pinning had no effect and things ended marked as queued.
- cancelling the operation contexts in every case before deleting them. Otherwise this would be leaking memory.
I still get occasional failures but the worst ones are fixed.
I have added some TODOs to the tests. We need to check that cancellations of ongoing operations actually happen in the right way. This can be achieved by letting the mock RPCServer keep a list with the times a method was called, and whether the passed context was cancelled before returning from it, and then checking that from the tests.
Other than that I haven't been able to find any more problems with the code..
License: MIT Signed-off-by: Adrian Lanzafame <adrianlanzafame92@gmail.com>
License: MIT Signed-off-by: Adrian Lanzafame <adrianlanzafame92@gmail.com>
Yai! @lanzafame let me know if you want to do anything here, otherwise I'll merge |
@hsanjuan Nothing else. All good to go 🎉 |
Still, need to add tests, but this PR desperately needs to see the light of day. @hsanjuan The main component, I would love feedback on is the
operationTracker
? Cheers#308 (edit, added correct issue)