Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

exp/support/pipeline: Generic Pipeline #1414

Merged
merged 5 commits into from
Jun 24, 2019
Merged

exp/support/pipeline: Generic Pipeline #1414

merged 5 commits into from
Jun 24, 2019

Conversation

bartekn
Copy link
Contributor

@bartekn bartekn commented Jun 14, 2019

PR Checklist

PR Structure

  • This PR has reasonably narrow scope (if not, break it down into smaller PRs).
  • This PR avoids mixing refactoring changes with feature changes (split into two PRs
    otherwise).
  • This PR's title starts with name of package that is most changed in the PR, ex.
    services/friendbot

Thoroughness

  • This PR adds tests for the most critical parts of the new functionality or fixes.
  • I've updated any docs (developer docs, .md
    files, etc... affected by this change). Take a look in the docs folder for a given service,
    like this one.

Release planning

  • I've updated the relevant CHANGELOG (here for Horizon) if
    needed with deprecations, added features, breaking changes, and DB schema changes.
  • I've decided if this PR requires a new major/minor version according to
    semver, or if it's mainly a patch change. The PR is targeted at the next
    release branch if it's not a patch change.

Summary

Creates a new exp/support/pipeline package that allows creating pipelines processing arbitrary objects to support two pipeline types in the ingestion system.

Close #1388.

Goal and scope

Ingestion system will operate on two pipelines in most cases: state updates pipeline (processing history archives data) and transactions pipeline (processing ledger transactions provided by ledger backend). To support this, the existing pipeline code had to be changed to be more generic to support different reader and writer types and to prevent code duplication.

Summary of changes

This commit extracts and updates the existing pipeline code to a separate exp/support/pipeline package and changes it to be generic. This not only allows us to create two pipeline types in exp/ingest but also enables developers to use it in their apps (or in other SDF apps). Additionally, a number of wrappers have been added exp/ingest/pipline to allow easier development (without type checking from interface{}).

Known limitations & issues

Docs of exp/support/pipeline should be improved with examples how to use it and how it works.

What shouldn't be reviewed

All should be reviewed.

Copy link
Member

@ire-and-curses ire-and-curses left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good, minor comments. A number of public methods are missing docstrings, worth a quick pass over for those.


var _ supportPipeline.ReadCloser = &ledgerReadCloserWrapper{}

// readCloserWrapperState wraps pipelinne.ReadCloser to implement StateReadCloser interface.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo pipelinne


var _ io.StateReadCloser = &readCloserWrapperState{}

// readCloserWrapperLedger wraps pipelinne.ReadCloser to implement LedgerReadCloser interface.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo pipelinne


var _ io.LedgerReadCloser = &readCloserWrapperLedger{}

// writeCloserWrapperState wraps pipelinne.WriteCloser to implement StateWriteCloser interface.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo pipelinne


var _ io.StateWriteCloser = &writeCloserWrapperState{}

// writeCloserWrapperLedger wraps pipelinne.WriteCloser to implement LedgerWriteCloser interface.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

typo pipelinne

"io"
)

const bufferSize = 50000
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

worth a comment

b.readEntries++
b.readEntriesMutex.Unlock()
return entry, nil
} else {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

could drop the else here

// `ctx.Done()` channel and exit when it returns a value. This can happen when
// pipeline execution is interrupted, ex. due to an error.
//
// Given all information above `ProcessLedger` should always look like this:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This large standard pattern makes me wonder whether we could provide an interface for a custom processing object which would plug in in the right places in this pattern. Then we just execute the generic pattern with the custom user-supplied code.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call but I'm not sure how we could change the code to support it. This comment (and all the *Wrapper structs) are for ease of use for developers as Golang does not have a support for generics. We could use generic pipeline processor but I really want to prevent having developers to work with interface{} type.

Let me think about it and create another PR when we have a solution.

@tamirms
Copy link
Contributor

tamirms commented Jun 19, 2019

in tools/accounts-for-signers/main.go should the "github.com/stellar/go/exp/ingest/adapters" import be aliased to ingestadapters?

@bartekn
Copy link
Contributor Author

bartekn commented Jun 24, 2019

in tools/accounts-for-signers/main.go should the "github.com/stellar/go/exp/ingest/adapters" import be aliased to ingestadapters?

We can but it's ingest package demo so I don't think it's necessary. Also, possible that we'll remove adapters package (#1405).

@bartekn bartekn merged commit ff38810 into master Jun 24, 2019
@bartekn bartekn deleted the generic-pipeline branch June 24, 2019 10:37
oryband pushed a commit to kinecosystem/go that referenced this pull request Jul 22, 2019
This commit introduces a new `exp/support/pipeline` package that allows
creating pipelines processing arbitrary objects to support two pipeline
types in the ingestion system.

Ingestion system will operate on two pipelines in most cases: state
updates pipeline (processing history archives data) and transactions
pipeline (processing ledger transactions provided by ledger backend). To
support this, the existing pipeline code had to be changed to be more
generic to support different reader and writer types and to prevent code
duplication.

We extract and update the existing pipeline code to a separate
`exp/support/pipeline` package and change it to be generic. This not
only allows us to create two pipeline types in `exp/ingest` but also
enables developers to use it in their apps (or in other SDF apps).
Additionally, a number of wrappers have been added to
`exp/ingest/pipeline` to allow easier development (without type checking
from `interface{}`).
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
ingest New ingestion system
Projects
None yet
Development

Successfully merging this pull request may close these issues.

Make pipeline code generic
3 participants