[PPS] CreateDatum hierarchical input types #9928
Conversation
Codecov Report
Attention: Patch coverage is
Additional details and impacted files

@@ Coverage Diff @@
##           master    #9928      +/-   ##
==========================================
+ Coverage   58.00%   58.09%   +0.08%
==========================================
  Files         608      608
  Lines       73924    74189     +265
==========================================
+ Hits        42883    43099     +216
- Misses      30485    30539      +54
+ Partials      556      551       -5

View full report in Codecov by Sentry.
Approving, but we should look into why these formatter changes are happening spuriously.
This is looking good. I think you want to figure out how to test each of the stream producers outside of pachd and write some unit tests that cover the actual calculations; are the joins correct, are the crosses correct, etc. That is what is currently missing.
// Consumes file set id shards from each child input as they arrive. Calls cb with the index of
// the child input. Function returns when all children channels are closed or the context is done.
func consumeChildrenFsidChans(ctx context.Context, childrenFsidChans []chan string, cb func(int, string) error) error {
	cases := make([]reflect.SelectCase, len(childrenFsidChans)+1)
I don't think we should use reflection for this. Just use a single channel, and if you need to retain the index information, send struct{ id int; fsid string } instead of just the fsid.
The reason I use one channel per child input is so the child input can easily indicate to the parent it's done sending all its fsids by closing its channel. With the single channel approach, we can mimic that behavior by returning an empty string for the fsid (for example) to indicate a child is done sending its fsids. Does that sound okay?
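A minimal sketch of the single-channel alternative, assuming a hypothetical childResult struct and consumeChildren helper (not the names in this PR): a sync.WaitGroup closes the merged channel once every child finishes, so no sentinel value or per-child close is needed.

```go
package main

import (
	"fmt"
	"sync"
)

// childResult pairs a file set ID with the index of the child input that
// produced it, so one merged channel can replace one channel per child.
type childResult struct {
	id   int    // index of the child input
	fsid string // file set ID produced by that child
}

// consumeChildren fans results from every child into a single channel. A
// WaitGroup closes the merged channel once all children are done, which
// replaces the per-child close signal without a sentinel value.
func consumeChildren(children [][]string, cb func(int, string)) {
	results := make(chan childResult)
	var wg sync.WaitGroup
	for i, fsids := range children {
		wg.Add(1)
		go func(i int, fsids []string) {
			defer wg.Done()
			for _, fsid := range fsids {
				results <- childResult{id: i, fsid: fsid}
			}
		}(i, fsids)
	}
	go func() {
		wg.Wait()
		close(results) // all children finished; end the range loop below
	}()
	for r := range results {
		cb(r.id, r.fsid)
	}
}

func main() {
	counts := make(map[int]int)
	consumeChildren([][]string{{"a", "b"}, {"c"}}, func(id int, fsid string) {
		counts[id]++
	})
	fmt.Println(counts[0], counts[1]) // prints 2 1
}
```

The close-on-WaitGroup pattern keeps the "this child is done" signal structural rather than encoding it in the data.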
src/internal/task/util.go
Outdated
@@ -61,17 +61,17 @@ func DoOne(ctx context.Context, doer Doer, input *anypb.Any) (*anypb.Any, error)

 // DoBatch executes a batch of tasks.
 func DoBatch(ctx context.Context, doer Doer, inputs []*anypb.Any, cb CollectFunc) error {
-	var eg errgroup.Group
+	eg, egCtx := errgroup.WithContext(ctx)
I don't think it's a big problem to call this variable ctx. It prevents someone from accidentally using the parent context when they intend to use this one.
src/server/pachyderm_test.go
Outdated
	}
	listDatumTimeToFirstDatum = time.Since(start).Seconds()
	return nil
testTimeToFirstDatum := func(input *pps.Input) {
Do these tests need to use Pachyderm running in k8s? RealEnv has been replaced with pachd.NewTestPachd and can probably run tasks.
if index == len(input) {
	temp := make([]string, len(result))
	copy(temp, result)
	*output = append(*output, temp)
Why do we need a copy of result here?
We modify the underlying array in result to generate permutations. Once a permutation is complete, a copy needs to be created, because subsequent iterations modify result's underlying array again to produce the other permutations.
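The aliasing issue can be shown in isolation. This permute is a simplified stand-in for the generator in the PR, but it demonstrates why each completed permutation must be copied out of the shared result slice before it is appended.

```go
package main

import "fmt"

// permute appends every permutation of in to out. result and used are reused
// across all recursive calls, so a completed permutation must be snapshotted:
// without the copy, every stored row would alias result's backing array and
// end up holding whatever the final permutation wrote there.
func permute(in []string, result []string, used []bool, index int, out *[][]string) {
	if index == len(in) {
		temp := make([]string, len(result))
		copy(temp, result) // snapshot before result is mutated again
		*out = append(*out, temp)
		return
	}
	for i, s := range in {
		if used[i] {
			continue
		}
		used[i] = true
		result[index] = s
		permute(in, result, used, index+1, out)
		used[i] = false
	}
}

func main() {
	var out [][]string
	in := []string{"a", "b", "c"}
	permute(in, make([]string, len(in)), make([]bool, len(in)), 0, &out)
	fmt.Println(len(out), out[0], out[len(out)-1]) // prints 6 [a b c] [c b a]
}
```

Replacing the copy with `*out = append(*out, result)` makes every row of out identical, which is exactly the bug the review question is probing.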
fsidChan := make(chan string)
go streamingCreate(egCtx, c, taskDoer, input, fsidChan, errChan, requestDatumsChan)
for fsid := range fsidChan {
	if err := renewer.Add(ctx, fsid); err != nil {
I think this should use the error group context. If the error group is done but the parent isn't, then this goroutine will live too long.
Good catch
		return errors.Wrap(err, "consumeChildrenFsidChans")
	}
} else {
	finished[i] = true
I think you need to remove the i-th selector from cases; nothing guarantees not hitting this 'matched because the channel is closed' case on the next Select call. It could happen forever.
Is the concern that we'll never exit from the loop or that we'll unnecessarily do extra iterations by matching on the closed channel repeatedly?
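One way to drop the i-th selector without reslicing cases is to zero its Chan: per the reflect package documentation, a SelectCase whose Chan is the zero Value is ignored by reflect.Select. A sketch of that fix (consumeAll is a simplified stand-in for the loop under review):

```go
package main

import (
	"fmt"
	"reflect"
)

// consumeAll selects across per-child channels. When a receive reports
// !ok (channel closed), the case's Chan is zeroed so reflect.Select ignores
// it from then on. Without this, Select can keep matching the closed channel
// repeatedly, starving the live channels and burning CPU.
func consumeAll(chans []chan string, cb func(int, string)) {
	cases := make([]reflect.SelectCase, len(chans))
	for i, ch := range chans {
		cases[i] = reflect.SelectCase{Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch)}
	}
	remaining := len(chans)
	for remaining > 0 {
		i, v, ok := reflect.Select(cases)
		if !ok {
			cases[i].Chan = reflect.Value{} // zero Chan: case is ignored from now on
			remaining--
			continue
		}
		cb(i, v.String())
	}
}

func main() {
	a, b := make(chan string), make(chan string)
	go func() { a <- "x"; close(a) }()
	go func() { b <- "y"; b <- "z"; close(b) }()
	got := map[string]int{}
	consumeAll([]chan string{a, b}, func(i int, s string) { got[s] = i })
	fmt.Println(got["x"], got["y"], got["z"]) // prints 0 1 1
}
```

So the answer to the question above is both: a boolean `finished` flag keeps the loop from exiting early, but without disabling the case the closed channel can still win every subsequent Select.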
src/server/worker/datum/util_test.go
Outdated
	{"c", "2", "x"},
	{"c", "2", "y"},
}
for _, e := range expected {
You should rework this test to use require.NoDiff. I think slicesEqualUnordered and sliceExistsInSlices only exist to get around having to sort the expected and actual slices; you should just sort those (with cmpopts.SortSlices for []string and [][]string). The output will be much easier to read and you don't need to maintain these helpers.
I like that better. Will do.
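The idea can be sketched with the standard library alone: sort both sides into a canonical order, then compare directly. Here sort.Slice stands in for go-cmp's cmpopts.SortSlices option and reflect.DeepEqual for require.NoDiff; the sortRows helper is hypothetical.

```go
package main

import (
	"fmt"
	"reflect"
	"sort"
)

// sortRows orders the outer [][]string deterministically so that an
// order-insensitive comparison reduces to plain equality. With go-cmp the
// equivalent is passing cmpopts.SortSlices to cmp.Diff/require.NoDiff, which
// also produces a readable diff on failure.
func sortRows(rows [][]string) {
	sort.Slice(rows, func(i, j int) bool {
		return fmt.Sprint(rows[i]) < fmt.Sprint(rows[j])
	})
}

func main() {
	expected := [][]string{{"c", "2", "x"}, {"c", "2", "y"}, {"a", "1", "x"}}
	actual := [][]string{{"a", "1", "x"}, {"c", "2", "y"}, {"c", "2", "x"}}
	sortRows(expected)
	sortRows(actual)
	fmt.Println(reflect.DeepEqual(expected, actual)) // prints true
}
```

The win over bespoke helpers is that a failing comparison reports exactly which row differs instead of a bare boolean.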
src/server/pachyderm_test.go
Outdated
	client.NewPFSInputOpts(repo, pfs.DefaultProjectName, repo, "master", "/file-?*(??)0", "$1", "", false, false, nil),
	client.NewPFSInputOpts(repo, pfs.DefaultProjectName, repo, "master", "/file-?0(??)0", "$1", "", false, false, nil),
)
testTimeToFirstDatum(input)
})
This should be a table-driven test:
testData := []struct {
	name  string
	input *pps.Input
}{
	{"PFSInput", client.NewPFSInput(...)},
	{"UnionInput", client.NewUnionInput(...)},
	...
}
for _, test := range testData {
	t.Run(test.name, func(t *testing.T) {
		t.Parallel()
		var eg errgroup.Group
		var listDatumTimeToFirstDatum, createDatumTimeToFirstDatum float64
		...
	})
}
Also, I am not sure how reliable it is to collect one timing data point each time the test is run. Nothing guarantees that each goroutine is given the same scheduling latency or the same amount of time on the CPU, so even if CreateDatum is faster than ListDatum, you aren't guaranteed to measure that every time. It's just a recipe for a flaky test. This could be a benchmark (func BenchmarkCreateDatum(b *testing.B)) if you want to make sure that you collect enough timing samples to be confident in the result.
I changed it to a benchmark to avoid flakiness. Running the benchmark a few times confirmed the improved latency in getting the first datum.
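A rough shape of such a benchmark, runnable standalone via testing.Benchmark; timeToFirstDatum is a hypothetical stand-in for opening the stream and blocking until the first datum arrives, not the PR's actual client code.

```go
package main

import (
	"fmt"
	"testing"
	"time"
)

// timeToFirstDatum is a hypothetical stand-in for opening a CreateDatum (or
// ListDatum) stream and waiting for the first datum.
func timeToFirstDatum() {
	time.Sleep(time.Microsecond) // models waiting on the stream
}

func main() {
	// testing.Benchmark grows b.N until the measurement stabilizes, so a
	// single noisy sample cannot decide the outcome the way one timed test
	// run can.
	res := testing.Benchmark(func(b *testing.B) {
		for i := 0; i < b.N; i++ {
			timeToFirstDatum()
		}
	})
	fmt.Println(res.N > 0, res.NsPerOp() > 0) // prints true true
}
```

In a real benchmark file the same loop body lives in func BenchmarkCreateDatum(b *testing.B) and `go test -bench` handles the sampling.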
require.NoError(t, datumClient.Send(&pps.CreateDatumRequest{Body: &pps.CreateDatumRequest_Start{Start: &pps.StartCreateDatumRequest{Input: input}}}))
n, err := grpcutil.Read[*pps.DatumInfo](datumClient, make([]*pps.DatumInfo, 11))
require.True(t, stream.IsEOS(err))
require.Equal(t, 10, n)
Somewhere, there needs to be a test that actually checks that CreateDatum produces correct result. You run all the code in these tests, which is good, but you aren't checking that the results are correct. Anything that returns 10 copies of DatumInfo (perhaps nil 10 times) will pass the test, which is not good enough for someone to be able to work on this code confidently.
I tested correctness for the inputs manually. I'll add some of those to the test suite.
This PR adds support for Union, Cross, Join, and Group inputs to the CreateDatum RPC. It builds upon the changes made in #9712.

One can visualize the Input as a tree. Inputs deeper in the spec are represented by deeper levels in the tree. When more datums need to be created, a datum request is propagated down the tree level by level via channels until it reaches the PFS inputs (the leaves). The PFS nodes then send shards up one level for processing. The processed result shards are propagated further upwards until they reach the root. The processed results at the root level are sent to the iterator, which iterates over each shard (file set) and sends the datums back to the client.
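A toy sketch of the tree-shaped propagation described above, with recursion standing in for the channel plumbing (the node type and its methods are illustrative only, not the pachd implementation): requests travel down to the PFS leaves, and combined shards flow back up to the root.

```go
package main

import "fmt"

// node models one level of the input tree. PFS leaves produce shards;
// interior nodes (union/cross/join/group) combine their children's shards.
type node struct {
	children []*node
	produce  func() string         // set on leaves only
	combine  func([]string) string // set on interior nodes only
}

// request propagates a datum request down the tree and returns the processed
// shard that flows back up toward the root.
func (n *node) request() string {
	if n.produce != nil {
		return n.produce() // leaf: emit a shard
	}
	shards := make([]string, len(n.children))
	for i, c := range n.children {
		shards[i] = c.request() // push the request one level down
	}
	return n.combine(shards) // send the combined result one level up
}

func main() {
	leaf := func(s string) *node { return &node{produce: func() string { return s }} }
	root := &node{
		children: []*node{leaf("repoA"), leaf("repoB")},
		combine: func(s []string) string {
			return "cross(" + s[0] + "," + s[1] + ")"
		},
	}
	fmt.Println(root.request()) // prints cross(repoA,repoB)
}
```

The real implementation replaces the synchronous recursion with per-level channels so shards stream up as they become available rather than all at once.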
Logic by input:
shardPermute()
contain an example ListDatum implementation

Jira: INT-1204