Vladimir Sibirov edited this page Oct 16, 2020 · 7 revisions

Components are basic building blocks in flow-based programming. A component is a single entity with a single purpose that processes incoming data and passes the results down the stream.

In GoFlow a component is a struct that contains at least one input port (bidirectional or receive-only channel):

type SimpleComponent struct {
	In   <-chan PacketType
}

Can a component have no input ports? In theory yes, but it's not very practical, because in this case a network has no way to signal the process that it has to stop.

Ports and Events

Inports and outports

Ports are a component's windows to the outside world. In GoFlow, ports are channel fields in the component's struct. There are two types of ports: input ports (inports) and output ports (outports). The following example demonstrates how they can be declared:

type PortExample struct {
	Foo <-chan int  // receive-only inport
	Bar chan<- int  // send-only outport
	Boo chan string // can be inport or outport
}

Ports must be exported fields (their names must start with a capital letter), otherwise GoFlow won't be able to detect them.

There are no specific limitations on the data types of port channels. This means that you can also pass channels through the ports. If you need to pass more than one data element to a component at the same time, it is recommended to pass tuples and structures instead of trying to pass them through multiple ports. Multiple inports should be used for information packets which arrive from different sources or at different times. The number of ports per component is not limited, but it is recommended to keep it small, otherwise the component's cohesion and logical consistency suffer.
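As a minimal sketch of the struct-per-packet advice above: the Order type, the Formatter component and the runFormatter helper are hypothetical names introduced only for illustration. The component is wired by hand with plain channels instead of a GoFlow network, and it closes its own outport, which is an assumption of this hand-wired setup.

```go
package main

import "fmt"

// Order is a hypothetical packet type that bundles values which belong together.
type Order struct {
	ID       int
	Customer string
	Total    float64
}

// Formatter is an illustrative component with a single struct-typed inport,
// instead of three separate inports for ID, Customer and Total.
type Formatter struct {
	In  <-chan Order
	Out chan<- string
}

// Process formats each incoming order and closes the outport when the input ends.
func (c *Formatter) Process() {
	defer close(c.Out)
	for o := range c.In {
		c.Out <- fmt.Sprintf("#%d %s %.2f", o.ID, o.Customer, o.Total)
	}
}

// runFormatter wires the component by hand (no GoFlow network involved)
// and returns everything it emits.
func runFormatter(orders []Order) []string {
	in := make(chan Order)
	out := make(chan string)
	c := &Formatter{In: in, Out: out}
	go c.Process()
	go func() {
		defer close(in)
		for _, o := range orders {
			in <- o
		}
	}()
	var lines []string
	for s := range out {
		lines = append(lines, s)
	}
	return lines
}

func main() {
	orders := []Order{
		{ID: 1, Customer: "alice", Total: 9.5},
		{ID: 2, Customer: "bob", Total: 20},
	}
	for _, s := range runFormatter(orders) {
		fmt.Println(s)
	}
}
```

Because all three values travel in one packet, the component never has to match an ID arriving on one port with a total arriving on another.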

Events

The behaviour of a component is driven by events. In GoFlow, an event is something that happens on one of the ports. Currently the following events are supported:

  • Incoming packet received: there is a new information packet (data element) received on an input port.
  • Connection closed: the channel associated with inport has been closed by sender or network.

Early implementations of GoFlow also supported another event on outports, "outgoing packet sent", and sent outgoing packets in non-blocking mode. This was stripped out because it isn't really necessary and it increases program complexity.

Events are independent of each other, whether they occur on the same port or on different ports. The only exception is that no packets can be received on a port that is already closed.
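In plain Go terms, both events can be observed with the two-value receive form. The sketch below is not GoFlow API; drain is a hypothetical helper that records, in order, what happens on a single inport.

```go
package main

import "fmt"

// drain reports the events observed on an inport, in order.
func drain(in <-chan int) []string {
	var events []string
	for {
		v, ok := <-in
		if !ok {
			// "Connection closed": the sender closed the channel.
			events = append(events, "closed")
			return events
		}
		// "Incoming packet received": a new data element arrived.
		events = append(events, fmt.Sprintf("packet %d", v))
	}
}

func main() {
	in := make(chan int, 2)
	in <- 1
	in <- 2
	close(in)
	fmt.Println(drain(in)) // prints [packet 1 packet 2 closed]
}
```

A `for v := range in` loop handles the same two events implicitly: the loop body runs for each packet and the loop ends when the connection is closed.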

Array ports

Array ports can attach multiple connections and make each connection independently accessible to the component via its numeric index. A typical use case for array ports is routing.

In GoFlow, array ports are defined as slices of chan type. Here is a very simple example of a Router component which takes a string from index k of its inport and sends it to index len(In) - 1 - k of its outport:

import "sync"

type Router struct {
	In  [](<-chan string) // Array input port, accessible via index
	Out [](chan<- string) // Array output port, accessible via index
}

func (c *Router) Process() {
	// Waitgroup is needed to shutdown gracefully when all inputs are closed
	wg := new(sync.WaitGroup)
	// Index of the last connection; Out is assumed to have as many connections as In
	last := len(c.In) - 1

	for k, ch := range c.In {
		wg.Add(1)

		go func(k int, ch <-chan string) {
			// Each goroutine listens on one channel in the array
			for s := range ch {
				// Mirror the index: 0 goes to last, 1 to last-1, and so on
				c.Out[last-k] <- s
			}

			close(c.Out[last-k])
			wg.Done()
		}(k, ch)
	}

	wg.Wait()
}

Of course, array ports can be used just for input, or just for output, not necessarily together.

On a network level, array ports are accessible via PortName[index] syntax:

err := n.Connect("sender", "Out", "router", "In[3]")

Map ports

Similar to array ports, map ports allow for an arbitrary number of inputs/outputs and for addressing an individual connection. Unlike array ports, map ports use string indexes.

Here is an example router component definition using map ports:

type Router struct {
	In  map[string]<-chan Message
	Out map[string]chan<- Message
}

The Process function of such a component can access individual input or output connections using a string index. On a network level, the syntax is similar to array ports, but the index values can be non-numeric. Example:

err := n.Connect("router", "Out[CustomerController]", "customerController", "Req")
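To illustrate how a Process function might address map port connections by string index, here is a minimal sketch. The Dispatcher component, the To and Body fields of Message, and the dispatch helper are all assumptions made for this example. The wiring is done by hand with buffered channels rather than through a GoFlow network.

```go
package main

import "fmt"

// Message is a hypothetical packet type; To names the target connection.
type Message struct {
	To   string
	Body string
}

// Dispatcher is an illustrative component with a map outport.
type Dispatcher struct {
	In  <-chan Message
	Out map[string]chan<- Message
}

// Process forwards each message to the connection named by its To field
// and drops messages addressed to an unknown key.
func (c *Dispatcher) Process() {
	for m := range c.In {
		if out, ok := c.Out[m.To]; ok {
			out <- m
		}
	}
	// Hand-wired setup: close every output once the input is exhausted.
	for _, out := range c.Out {
		close(out)
	}
}

// dispatch wires the component by hand and returns the bodies delivered per key.
func dispatch(msgs []Message, keys ...string) map[string][]string {
	in := make(chan Message, len(msgs))
	for _, m := range msgs {
		in <- m
	}
	close(in)

	c := &Dispatcher{In: in, Out: make(map[string]chan<- Message)}
	outs := make(map[string]chan Message)
	for _, k := range keys {
		// Buffered so that Process never blocks in this synchronous sketch.
		ch := make(chan Message, len(msgs))
		outs[k] = ch
		c.Out[k] = ch
	}
	c.Process()

	got := make(map[string][]string)
	for k, ch := range outs {
		for m := range ch {
			got[k] = append(got[k], m.Body)
		}
	}
	return got
}

func main() {
	msgs := []Message{
		{To: "CustomerController", Body: "create"},
		{To: "OrderController", Body: "list"},
	}
	fmt.Println(dispatch(msgs, "CustomerController", "OrderController"))
}
```

The comma-ok lookup on the map guards against sending to a key that has no connection attached, which would otherwise block forever on a nil channel.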

Process

Components are reusable classes that don't exist at run time. The actual job is done by their instances, called processes. There can be many processes of the same component, just as there can be many objects of the same class.

The term "process" also refers to how the component behaves.

Process function

Behaviour of a Component is defined by its Process function. Consider an example:

type Echo struct {
	In  <-chan int
	Out chan<- int
}

func (c *Echo) Process() {
	for i := range c.In {
		c.Out <- i
	}
}

In this example we have a simple Echo component. Its Process function reads incoming packets and sends them to the output port.

Process functions don't return any values. If they need to yield some results they should send them through outports instead.

The order of data processing depends on the logic that is implemented in the process function. By default it's a synchronous reader/writer, but you can use goroutines to implement parallel processing of packets within the same process.
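One possible way to process packets in parallel inside a single process is a small worker pool. In the sketch below, the Squarer component, its Workers field and the squares helper are hypothetical, and the component is wired by hand rather than by a GoFlow network. Note that the output order is no longer guaranteed to match the input order, which is why the helper sorts the results.

```go
package main

import (
	"fmt"
	"sort"
	"sync"
)

// Squarer is an illustrative component that squares integers in parallel.
type Squarer struct {
	In  <-chan int
	Out chan<- int

	Workers int // state field: number of goroutines reading from In
}

// Process starts the workers and returns once the inport is closed
// and every worker has finished sending its results.
func (c *Squarer) Process() {
	var wg sync.WaitGroup
	for i := 0; i < c.Workers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			// All workers compete for packets on the same inport.
			for n := range c.In {
				c.Out <- n * n
			}
		}()
	}
	wg.Wait()
	// Hand-wired setup: signal downstream that no more packets will come.
	close(c.Out)
}

// squares wires the component by hand and returns the sorted results.
func squares(nums []int, workers int) []int {
	in := make(chan int)
	out := make(chan int)
	c := &Squarer{In: in, Out: out, Workers: workers}
	go c.Process()
	go func() {
		defer close(in)
		for _, n := range nums {
			in <- n
		}
	}()
	var res []int
	for v := range out {
		res = append(res, v)
	}
	sort.Ints(res)
	return res
}

func main() {
	fmt.Println(squares([]int{3, 1, 2}, 4)) // prints [1 4 9]
}
```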

The fact that events on different ports of the same component are independent means that if you need 2 or more inports to be synchronized, you may need to provide synchronization yourself (for example, using queues).

Processing multiple inputs

The example below is a repeater component that reads two inputs: one tells which word to repeat, another one is the number of times the word needs to be repeated:

type Repeater struct {
	Word  <-chan string
	Times <-chan int

	Words chan<- string
}

func (c *Repeater) Process() {
	// Input guard is used to check if inputs on all required ports have arrived
	guard := goflow.NewInputGuard("word", "times")

	times := 0
	word := ""

	// The main select-loop
	for {
		// select listens on all available inports
		select {
		case t, ok := <-c.Times:
			if ok {
				// Received data
				times = t
				// Calling the function that contains the actual logic
				c.repeat(word, times)
			} else if guard.Complete("times") {
				// ok is false, meaning that the port is closed
				// guard.Complete() just flags this port as completed
				return
			}
		case w, ok := <-c.Word:
			if ok {
				word = w
				c.repeat(word, times)
			} else if guard.Complete("word") {
				return
			}
		}
	}
}

// repeat contains the actual logic that uses provided inputs
func (c *Repeater) repeat(word string, times int) {
	if word == "" || times <= 0 {
		return
	}

	for i := 0; i < times; i++ {
		c.Words <- word
	}
}

Constructor

Processes are usually created using Go's new() function:

proc := new(ComponentType)

If a component has fields that need more complex construction, e.g. pointers which must be allocated, you should provide a constructor function instead:

func NewCustomType() *CustomType {
	p := new(CustomType)
	p.ptr = new(SomePointer)
	p.flag = 123
	return p
}

Notice that processes are always referenced and passed by pointers. This is a requirement.

Initialization and cleanup

In the v0 branch of GoFlow there were special callbacks for lifecycle management. Starting with v1, all initialization and cleanup can be placed inside the Process function itself, just before or after the main loop that handles input/output.
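A minimal sketch of this pattern follows. The Summer component, its log field and the runSummer helper are hypothetical names used only to make the order of the lifecycle steps visible. The component is wired by hand with buffered channels, without a GoFlow network.

```go
package main

import "fmt"

// Summer is an illustrative component that emits the sum of its input
// once the inport is closed.
type Summer struct {
	In  <-chan int
	Out chan<- int

	log []string // hypothetical state field recording lifecycle steps
}

// Process keeps initialization, the main loop and cleanup in one place.
func (c *Summer) Process() {
	// Initialization: runs once, before any packet is read.
	c.log = append(c.log, "init")
	sum := 0

	// Cleanup: runs once, after the main loop has ended.
	defer func() {
		c.Out <- sum
		close(c.Out)
		c.log = append(c.log, "cleanup")
	}()

	// Main loop: ends when the inport is closed.
	for n := range c.In {
		sum += n
	}
}

// runSummer wires the component by hand and returns the sum and the recorded steps.
func runSummer(nums []int) (int, []string) {
	in := make(chan int, len(nums))
	for _, n := range nums {
		in <- n
	}
	close(in)

	out := make(chan int, 1) // buffered so the deferred send does not block
	c := &Summer{In: in, Out: out}
	c.Process()
	return <-out, c.log
}

func main() {
	total, steps := runSummer([]int{1, 2, 3})
	fmt.Println(total, steps) // prints 6 [init cleanup]
}
```

Using defer for the cleanup step means it also runs if the main loop exits early through a return.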

State

It is assumed that component fields other than ports describe its state. They can be of any type and can be either exported or private; GoFlow doesn't care about it. The following example demonstrates a component with several state fields:

type StateExample struct {
	// ports
	In1 <-chan Envelope // inport
	In2 <-chan Stamp    // inport
	Out chan<- Letter   // outport

	// state
	counter int       // private field
	buffer  [32]Stamp // private field
	Status  string    // public field
}

Thread-safety

If a process in the above example accesses the state fields concurrently (e.g. by spawning some goroutines), you need to protect that state data from race conditions.

A typical way to do that is by using a *sync.Mutex from the sync package of the standard library.
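The sketch below shows one way this could look. The Counter component and the countAll helper are hypothetical. The mutex is stored as a plain sync.Mutex value field, which is a common alternative to a pointer and works here because processes are always passed by pointer, so the mutex is never copied.

```go
package main

import (
	"fmt"
	"sync"
)

// Counter is an illustrative component whose state is updated concurrently.
type Counter struct {
	In <-chan int

	mu    sync.Mutex // protects total
	total int
}

// Process handles every packet in its own goroutine, so access to
// c.total must be guarded by the mutex.
func (c *Counter) Process() {
	var wg sync.WaitGroup
	for n := range c.In {
		wg.Add(1)
		go func(n int) {
			defer wg.Done()
			c.mu.Lock()
			c.total += n
			c.mu.Unlock()
		}(n)
	}
	// Wait for all goroutines before the process finishes.
	wg.Wait()
}

// countAll wires the component by hand and returns the final total.
func countAll(nums []int) int {
	in := make(chan int, len(nums))
	for _, n := range nums {
		in <- n
	}
	close(in)

	c := &Counter{In: in}
	c.Process()
	// Safe to read without locking: Process has returned and all goroutines are done.
	return c.total
}

func main() {
	nums := make([]int, 100)
	for i := range nums {
		nums[i] = i + 1
	}
	fmt.Println(countAll(nums)) // prints 5050
}
```

Running such code with Go's race detector (`go run -race`) is a quick way to check that no unprotected access to the state remains.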