Skip to content

Commit

Permalink
API Change: Param{In,Out}Port -> {In,Out}ParamPort
Browse files Browse the repository at this point in the history
  • Loading branch information
samuell committed Jun 13, 2018
1 parent 75512dd commit 0424377
Show file tree
Hide file tree
Showing 15 changed files with 159 additions and 147 deletions.
60 changes: 30 additions & 30 deletions baseprocess.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@ type BaseProcess struct {
workflow *Workflow
inPorts map[string]*InPort
outPorts map[string]*OutPort
paramInPorts map[string]*ParamInPort
paramOutPorts map[string]*ParamOutPort
paramInPorts map[string]*InParamPort
paramOutPorts map[string]*OutParamPort
}

// NewBaseProcess returns a new BaseProcess, connected to the provided workflow,
Expand All @@ -19,8 +19,8 @@ func NewBaseProcess(wf *Workflow, name string) BaseProcess {
name: name,
inPorts: make(map[string]*InPort),
outPorts: make(map[string]*OutPort),
paramInPorts: make(map[string]*ParamInPort),
paramOutPorts: make(map[string]*ParamOutPort),
paramInPorts: make(map[string]*InParamPort),
paramOutPorts: make(map[string]*OutParamPort),
}
}

Expand Down Expand Up @@ -110,31 +110,31 @@ func (p *BaseProcess) DeleteOutPort(portName string) {
// Param-in-port stuff
// ------------------------------------------------

// InitParamInPort adds the parameter port paramPort with name portName
func (p *BaseProcess) InitParamInPort(proc WorkflowProcess, portName string) {
// InitInParamPort adds the parameter port paramPort with name portName
func (p *BaseProcess) InitInParamPort(proc WorkflowProcess, portName string) {
if _, ok := p.paramInPorts[portName]; ok {
Failf("Such a param-in-port ('%s') already exists for process '%s'. Please check your workflow code!\n", portName, p.name)
}
pip := NewParamInPort(portName)
pip := NewInParamPort(portName)
pip.process = proc
p.paramInPorts[portName] = pip
}

// ParamInPort returns the parameter port with name portName
func (p *BaseProcess) ParamInPort(portName string) *ParamInPort {
// InParamPort returns the parameter port with name portName
func (p *BaseProcess) InParamPort(portName string) *InParamPort {
if _, ok := p.paramInPorts[portName]; !ok {
Failf("No such param-in-port ('%s') for process '%s'. Please check your workflow code!\n", portName, p.name)
}
return p.paramInPorts[portName]
}

// ParamInPorts returns all parameter in-ports of the process
func (p *BaseProcess) ParamInPorts() map[string]*ParamInPort {
// InParamPorts returns all parameter in-ports of the process
func (p *BaseProcess) InParamPorts() map[string]*InParamPort {
return p.paramInPorts
}

// DeleteParamInPort deletes a ParamInPort object from the process
func (p *BaseProcess) DeleteParamInPort(portName string) {
// DeleteInParamPort deletes a InParamPort object from the process
func (p *BaseProcess) DeleteInParamPort(portName string) {
if _, ok := p.paramInPorts[portName]; !ok {
Failf("No such param-in-port ('%s') for process '%s'. Please check your workflow code!\n", portName, p.name)
}
Expand All @@ -145,34 +145,34 @@ func (p *BaseProcess) DeleteParamInPort(portName string) {
// Param-out-port stuff
// ------------------------------------------------

// InitParamOutPort initializes the parameter port paramPort with name portName
// InitOutParamPort initializes the parameter port paramPort with name portName
// to the process We need to supply the concrete process used here as well,
// since this method might be used as part of an embedded struct, meaning that
// the process in the receiver is just the *BaseProcess, which doesn't suffice.
func (p *BaseProcess) InitParamOutPort(proc WorkflowProcess, portName string) {
func (p *BaseProcess) InitOutParamPort(proc WorkflowProcess, portName string) {
if _, ok := p.paramOutPorts[portName]; ok {
Failf("Such a param-out-port ('%s') already exists for process '%s'. Please check your workflow code!\n", portName, p.name)
}
pop := NewParamOutPort(portName)
pop := NewOutParamPort(portName)
pop.process = proc
p.paramOutPorts[portName] = pop
}

// ParamOutPort returns the parameter port with name portName
func (p *BaseProcess) ParamOutPort(portName string) *ParamOutPort {
// OutParamPort returns the parameter port with name portName
func (p *BaseProcess) OutParamPort(portName string) *OutParamPort {
if _, ok := p.paramOutPorts[portName]; !ok {
Failf("No such param-out-port ('%s') for process '%s'. Please check your workflow code!\n", portName, p.name)
}
return p.paramOutPorts[portName]
}

// ParamOutPorts returns all parameter out-ports of the process
func (p *BaseProcess) ParamOutPorts() map[string]*ParamOutPort {
// OutParamPorts returns all parameter out-ports of the process
func (p *BaseProcess) OutParamPorts() map[string]*OutParamPort {
return p.paramOutPorts
}

// DeleteParamOutPort deletes a ParamOutPort object from the process
func (p *BaseProcess) DeleteParamOutPort(portName string) {
// DeleteOutParamPort deletes a OutParamPort object from the process
func (p *BaseProcess) DeleteOutParamPort(portName string) {
if _, ok := p.paramOutPorts[portName]; !ok {
Failf("No such param-out-port ('%s') for process '%s'. Please check your workflow code!\n", portName, p.name)
}
Expand Down Expand Up @@ -200,13 +200,13 @@ func (p *BaseProcess) Ready() (isReady bool) {
}
for portName, port := range p.paramInPorts {
if !port.Ready() {
Error.Printf("ParamInPort %s of process %s is not connected - check your workflow code!\n", portName, p.name)
Error.Printf("InParamPort %s of process %s is not connected - check your workflow code!\n", portName, p.name)
isReady = false
}
}
for portName, port := range p.paramOutPorts {
if !port.Ready() {
Error.Printf("ParamOutPort %s of process %s is not connected - check your workflow code!\n", portName, p.name)
Error.Printf("OutParamPort %s of process %s is not connected - check your workflow code!\n", portName, p.name)
isReady = false
}
}
Expand All @@ -220,17 +220,17 @@ func (p *BaseProcess) CloseOutPorts() {
}
}

// CloseParamOutPorts closes all parameter out-ports
func (p *BaseProcess) CloseParamOutPorts() {
for _, op := range p.ParamOutPorts() {
// CloseOutParamPorts closes all parameter out-ports
func (p *BaseProcess) CloseOutParamPorts() {
for _, op := range p.OutParamPorts() {
op.Close()
}
}

// CloseAllOutPorts closes all normal-, and parameter out ports
func (p *BaseProcess) CloseAllOutPorts() {
p.CloseOutPorts()
p.CloseParamOutPorts()
p.CloseOutParamPorts()
}

func (p *BaseProcess) receiveOnInPorts() (ips map[string]*FileIP, inPortsOpen bool) {
Expand All @@ -250,11 +250,11 @@ func (p *BaseProcess) receiveOnInPorts() (ips map[string]*FileIP, inPortsOpen bo
return
}

func (p *BaseProcess) receiveOnParamInPorts() (params map[string]string, paramPortsOpen bool) {
func (p *BaseProcess) receiveOnInParamPorts() (params map[string]string, paramPortsOpen bool) {
paramPortsOpen = true
params = make(map[string]string)
// Read input IPs on in-ports and set up path mappings
for pname, pport := range p.ParamInPorts() {
for pname, pport := range p.InParamPorts() {
pval, open := <-pport.Chan
if !open {
paramPortsOpen = false
Expand Down
4 changes: 2 additions & 2 deletions components/concatenator.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,15 +37,15 @@ func (p *Concatenator) Run() {
for ft := range p.In().Chan {

fr := NewFileReader(p.Workflow(), p.Name()+"_filereader_"+getRandString(7))
pop := scipipe.NewParamOutPort("temp_filepath_feeder")
pop := scipipe.NewOutParamPort("temp_filepath_feeder")
pop.SetProcess(p)
fr.InFilePath().From(pop)
go func() {
defer pop.Close()
pop.Send(ft.Path())
}()

pip := scipipe.NewParamInPort(p.Name() + "temp_line_reader")
pip := scipipe.NewInParamPort(p.Name() + "temp_line_reader")
pip.SetProcess(p)
pip.From(fr.OutLine())

Expand Down
8 changes: 4 additions & 4 deletions components/file_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,17 @@ func NewFileReader(wf *scipipe.Workflow, name string) *FileReader {
p := &FileReader{
BaseProcess: scipipe.NewBaseProcess(wf, name),
}
p.InitParamInPort(p, "filepath")
p.InitParamOutPort(p, "line")
p.InitInParamPort(p, "filepath")
p.InitOutParamPort(p, "line")
wf.AddProc(p)
return p
}

// InFilePath returns the parameter in-port on which a file name is read
func (p *FileReader) InFilePath() *scipipe.ParamInPort { return p.ParamInPort("filepath") }
func (p *FileReader) InFilePath() *scipipe.InParamPort { return p.InParamPort("filepath") }

// OutLine returns an parameter out-port with lines of the files being read
func (p *FileReader) OutLine() *scipipe.ParamOutPort { return p.ParamOutPort("line") }
func (p *FileReader) OutLine() *scipipe.OutParamPort { return p.OutParamPort("line") }

// Run the FileReader
func (p *FileReader) Run() {
Expand Down
4 changes: 2 additions & 2 deletions components/file_splitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func (p *FileSplitter) Run() {
defer p.CloseAllOutPorts()

fileReader := NewFileReader(p.Workflow(), p.Name()+"_filereader_"+getRandString(7))
pop := scipipe.NewParamOutPort(p.Name() + "_temp_filepath_feeder")
pop := scipipe.NewOutParamPort(p.Name() + "_temp_filepath_feeder")
pop.SetProcess(p)
fileReader.InFilePath().From(pop)

Expand All @@ -51,7 +51,7 @@ func (p *FileSplitter) Run() {
pop.Send(ft.Path())
}()

pip := scipipe.NewParamInPort(p.Name() + "temp_line_reader")
pip := scipipe.NewInParamPort(p.Name() + "temp_line_reader")
pip.SetProcess(p)
pip.From(fileReader.OutLine())

Expand Down
4 changes: 2 additions & 2 deletions components/param_source.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ func NewParamSource(wf *scipipe.Workflow, name string, params ...string) *ParamS
BaseProcess: scipipe.NewBaseProcess(wf, name),
params: params,
}
p.InitParamOutPort(p, "out")
p.InitOutParamPort(p, "out")
wf.AddProc(p)
return p
}

// Out returns the out-port, on which parameters the process was initialized
// with, will be retrieved.
func (p *ParamSource) Out() *scipipe.ParamOutPort { return p.ParamOutPort("out") }
func (p *ParamSource) Out() *scipipe.OutParamPort { return p.OutParamPort("out") }

// Run runs the process
func (p *ParamSource) Run() {
Expand Down
8 changes: 4 additions & 4 deletions docs/basic_concepts.md
Original file line number Diff line number Diff line change
Expand Up @@ -67,15 +67,15 @@ method (for in-ports. Out-ports have a corresponding `To()` method), which
takes another port, and connects to it, by stitching a channel between the
ports.

On `Process` objects, there is also a third port type, `ParamInPort` (and the
accompanying `ParamOutPort`), which is used when it is needed to send a
On `Process` objects, there is also a third port type, `InParamPort` (and the
accompanying `OutParamPort`), which is used when it is needed to send a
stream of parameter values (in string format) to be supplied to as arguments
to shell commands.

* See [GoDoc for the InPort struct type](https://godoc.org/github.com/scipipe/scipipe#InPort)
* See [GoDoc for the OutPort struct type](https://godoc.org/github.com/scipipe/scipipe#OutPort)
* See [GoDoc for the ParamInPort struct type](https://godoc.org/github.com/scipipe/scipipe#ParamInPort)
* See [GoDoc for the ParamOutPort struct type](https://godoc.org/github.com/scipipe/scipipe#ParamOutPort)
* See [GoDoc for the InParamPort struct type](https://godoc.org/github.com/scipipe/scipipe#InParamPort)
* See [GoDoc for the OutParamPort struct type](https://godoc.org/github.com/scipipe/scipipe#OutParamPort)

## Channels

Expand Down
18 changes: 9 additions & 9 deletions examples/param_channels/params.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,9 @@ func main() {
prt.Spawn = false

// Connection info
abc.ParamInPort("a").From(cmb.A())
abc.ParamInPort("b").From(cmb.B())
abc.ParamInPort("c").From(cmb.C())
abc.InParamPort("a").From(cmb.A())
abc.InParamPort("b").From(cmb.B())
abc.InParamPort("c").From(cmb.C())
prt.In("in").From(abc.Out("out"))

wf.Run()
Expand All @@ -46,16 +46,16 @@ func NewCombinatoricsGen(wf *sci.Workflow, name string) *CombinatoricsGen {
p := &CombinatoricsGen{
BaseProcess: sci.NewBaseProcess(wf, name),
}
p.InitParamOutPort(p, "a")
p.InitParamOutPort(p, "b")
p.InitParamOutPort(p, "c")
p.InitOutParamPort(p, "a")
p.InitOutParamPort(p, "b")
p.InitOutParamPort(p, "c")
wf.AddProc(p)
return p
}

func (p *CombinatoricsGen) A() *sci.ParamOutPort { return p.ParamOutPort("a") }
func (p *CombinatoricsGen) B() *sci.ParamOutPort { return p.ParamOutPort("b") }
func (p *CombinatoricsGen) C() *sci.ParamOutPort { return p.ParamOutPort("c") }
func (p *CombinatoricsGen) A() *sci.OutParamPort { return p.OutParamPort("a") }
func (p *CombinatoricsGen) B() *sci.OutParamPort { return p.OutParamPort("b") }
func (p *CombinatoricsGen) C() *sci.OutParamPort { return p.OutParamPort("c") }

func (p *CombinatoricsGen) Run() {
defer p.CloseAllOutPorts()
Expand Down
2 changes: 1 addition & 1 deletion examples/resequencing/resequencing.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func main() {
bwaMerge.SetPathCustom("merged", func(t *Task) string {
return t.Param("indv") + ".merged.sam"
})
bwaMerge.ParamInPort("indv").ConnectStr(indv)
bwaMerge.InParamPort("indv").ConnectStr(indv)
bwaMerge.In("ref").From(ungzipRef.Out("out"))
bwaMerge.In("refdone").From(indexRef.Out("done"))
bwaMerge.In("sai1").From(outPorts[indv]["1"]["sai"])
Expand Down
Loading

0 comments on commit 0424377

Please sign in to comment.