Add bulk first load feature to sink-postgres #27
Conversation
This is a first pass; tomorrow I'll revisit again, concentrating more on FileStateStore and the BulkSinker.
flags.Uint64("bundle-size", 1000, "Size of output bundle, in blocks")
flags.String("start-block", "", "Start processing at this block instead of the substreams initial block")
flags.String("working-dir", "./workdir", "Path to local folder used as working directory")
flags.String("on-module-hash-mistmatch", "error", FlagDescription(`
Will need to share this flag definition across commands. Note to myself; nothing needs to be done for this PR.
`),
)
}),
OnCommandErrorLogAndExit(zlog),
Another note to myself: this should be needed only on the root command; investigate whether there is a problem without it on each command.
Here's another round; I'll do another one this afternoon. What I have left to review thoroughly is the interaction between FileStateStore, GenerateCSVSinker and the shutdown flow.
s.stats.LogNow()
s.logger.Info("csv sinker terminating", zap.Stringer("last_block_written", s.stats.lastBlock))
s.stats.Close()
s.stateStore.Shutdown(err)
There is no need for stateStore to know that it's closing due to an error. I would remove Shutdown from the Store interface to keep only Close, and I would call Close() here instead of Shutdown.
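A minimal sketch of the suggested shape, with Store and FileStateStore reduced to stand-ins for the real types:

```go
// Sketch of the suggestion: the Store interface keeps only Close, and
// the termination path calls Close() instead of Shutdown(err).
// FileStateStore is a minimal stand-in for the real implementation.
package main

import "fmt"

// Store no longer carries Shutdown(err); the store does not need to
// know why it is closing.
type Store interface {
	Close() error
}

type FileStateStore struct{ closed bool }

func (s *FileStateStore) Close() error {
	s.closed = true
	return nil
}

func main() {
	var stateStore Store = &FileStateStore{}
	// In the sinker's shutdown flow, this replaces stateStore.Shutdown(err).
	if err := stateStore.Close(); err != nil {
		fmt.Println("close error:", err)
		return
	}
	fmt.Println("store closed")
}
```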
It was using Close(), but I started getting errors about the input reader channel being closed, because in file.go, when the uploadQueue is terminating, it calls shutdown and that call closes again. Calling shutdown made it easier to handle the termination process of everything.
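One common way to make the double-close described above harmless is to guard Close with sync.Once. This is only a sketch under that assumption; uploadQueue here is a minimal model, not the actual type in file.go:

```go
// Sketch: making Close idempotent with sync.Once so that both the
// sinker's termination path and the uploadQueue's own shutdown can
// call it without closing the input channel twice.
// uploadQueue is a simplified stand-in for the real type in file.go.
package main

import (
	"fmt"
	"sync"
)

type uploadQueue struct {
	in        chan string
	closeOnce sync.Once
}

func (q *uploadQueue) Close() {
	// closeOnce guarantees the input channel is closed exactly once,
	// no matter how many termination paths reach here.
	q.closeOnce.Do(func() { close(q.in) })
}

func main() {
	q := &uploadQueue{in: make(chan string)}
	q.Close()
	q.Close() // second call is a no-op instead of a panic
	fmt.Println("closed safely")
}
```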
Ok let's leave it like that and I can check later after we merge it.
sinker/generate_csv_sinker.go (Outdated)
uploadContext := context.Background()
for _, fb := range s.fileBundlers {
	fb.Launch(uploadContext)
}
I don't see a good reason to have a context that is context.Background; it will never be done nor change, so it's rather useless. Can't we use the received ctx? Was it causing a problem for you somehow?
It wasn't causing any problems. This is the way it's used in graph-load; I don't know if they have any issues related to context.
That will be my last review; after it we are good to merge. Can you just add a CHANGELOG entry that talks about this?
Before a release I'll:
- Extract the bundler/specialized I/O writer into a re-usable place (probably substreams-sink for now)
- Make some refactorings I noted as notes to myself while reviewing the PR
Would you like me to create a prerelease version before that?
I just tested without the pool min and max values and they are not needed: I could run without those options, so I removed them.
@@ -117,6 +117,7 @@ func (l *Loader) InsertCursor(ctx context.Context, moduleHash string, c *sink.Cu

// UpdateCursor updates the active cursor. If no cursor is active and no update occurred, returns
// ErrCursorNotFound. If the update was not successful on the database, returns an error.
// You can use tx=nil to run the query outside of a transaction.
Thanks :)!
Just 1 last thing remaining for me:
Tomorrow I'm going to do another release of
@flametuner Can you update your branch on top of
Co-authored-by: Matthieu Vachon <matthieu.o.vachon@gmail.com>
Merged, thanks!
No description provided.