Skip to content
This repository has been archived by the owner on Oct 17, 2018. It is now read-only.

Commit

Permalink
Add transformation APIs and make pipeline operations private (#141)
Browse files Browse the repository at this point in the history
  • Loading branch information
xichen2020 committed Jun 7, 2018
1 parent a6f9255 commit e324aa1
Show file tree
Hide file tree
Showing 12 changed files with 814 additions and 240 deletions.
36 changes: 15 additions & 21 deletions metadata/metadata_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,14 +117,12 @@ func TestStagedMetadatasIsDefault(t *testing.T) {
Metadata: Metadata{
Pipelines: []PipelineMetadata{
{
Pipeline: applied.Pipeline{
Operations: []applied.Union{
{
Type: op.TransformationType,
Transformation: op.Transformation{Type: transformation.Absolute},
},
Pipeline: applied.NewPipeline([]applied.Union{
{
Type: op.TransformationType,
Transformation: op.Transformation{Type: transformation.Absolute},
},
},
}),
},
},
},
Expand All @@ -138,14 +136,12 @@ func TestStagedMetadatasIsDefault(t *testing.T) {
Metadata: Metadata{
Pipelines: []PipelineMetadata{
{
Pipeline: applied.Pipeline{
Operations: []applied.Union{
{
Type: op.RollupType,
Rollup: applied.Rollup{ID: []byte("foo")},
},
Pipeline: applied.NewPipeline([]applied.Union{
{
Type: op.RollupType,
Rollup: applied.Rollup{ID: []byte("foo")},
},
},
}),
},
},
},
Expand All @@ -159,14 +155,12 @@ func TestStagedMetadatasIsDefault(t *testing.T) {
Metadata: Metadata{
Pipelines: []PipelineMetadata{
{
Pipeline: applied.Pipeline{
Operations: []applied.Union{
{
Type: op.RollupType,
Rollup: applied.Rollup{AggregationID: aggregation.MustCompressTypes(aggregation.Sum)},
},
Pipeline: applied.NewPipeline([]applied.Union{
{
Type: op.RollupType,
Rollup: applied.Rollup{AggregationID: aggregation.MustCompressTypes(aggregation.Sum)},
},
},
}),
},
},
},
Expand Down
39 changes: 27 additions & 12 deletions op/applied/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,21 +108,30 @@ func (u Union) String() string {
// Pipeline is a pipeline of operations.
type Pipeline struct {
// a list of pipeline operations.
Operations []Union
operations []Union
}

// IsEmpty determines whether a pipeline is empty.
func (p Pipeline) IsEmpty() bool {
return len(p.Operations) == 0
// NewPipeline creates a new pipeline.
func NewPipeline(ops []Union) Pipeline {
return Pipeline{operations: ops}
}

// NumSteps returns the number of steps in a pipeline.
func (p Pipeline) NumSteps() int { return len(p.operations) }

// IsEmpty determines whether a pipeline is empty.
func (p Pipeline) IsEmpty() bool { return p.NumSteps() == 0 }

// At returns the operation at a given step.
func (p Pipeline) At(i int) Union { return p.operations[i] }

// Equal determines whether two pipelines are equal.
func (p Pipeline) Equal(other Pipeline) bool {
if len(p.Operations) != len(other.Operations) {
if len(p.operations) != len(other.operations) {
return false
}
for i := 0; i < len(p.Operations); i++ {
if !p.Operations[i].Equal(other.Operations[i]) {
for i := 0; i < len(p.operations); i++ {
if !p.operations[i].Equal(other.operations[i]) {
return false
}
}
Expand All @@ -131,19 +140,25 @@ func (p Pipeline) Equal(other Pipeline) bool {

// Clone clones the pipeline.
func (p Pipeline) Clone() Pipeline {
clone := make([]Union, len(p.Operations))
for i, op := range p.Operations {
clone := make([]Union, len(p.operations))
for i, op := range p.operations {
clone[i] = op.Clone()
}
return Pipeline{Operations: clone}
return Pipeline{operations: clone}
}

// SubPipeline returns a sub-pipeline starting from step `idx`
// of the current pipeline.
func (p Pipeline) SubPipeline(idx int) Pipeline {
return Pipeline{operations: p.operations[idx:]}
}

func (p Pipeline) String() string {
var b bytes.Buffer
b.WriteString("{operations: [")
for i, op := range p.Operations {
for i, op := range p.operations {
b.WriteString(op.String())
if i < len(p.Operations)-1 {
if i < len(p.operations)-1 {
b.WriteString(", ")
}
}
Expand Down
Loading

0 comments on commit e324aa1

Please sign in to comment.