Skip to content
Permalink
Browse files

Remove the intermidiate storage flag

Signed-off-by: Swarvanu Sengupta <swarvanusg@gmail.com>
  • Loading branch information...
s8sg committed Jun 12, 2019
1 parent 9f3f90a commit ec016b0b7c43a7cdc26d519dc234f93ebd30b78d
@@ -369,20 +369,11 @@ Once a DataStore is set it can be used by calling `Get()` and `Set()` from `cont
```
* **[MinioDataStore](https://github.com/s8sg/faas-flow-minio-datastore)** allows to store data in **amazon s3** or local **minio DB**

> **Default `requestEmbedDataStore`:**
> By default faas-flow template use `requestEmbedDataStore` which embed the state data along with the request for the next node. For bigger values it is recommended to pass it with custom `DataStore`.
Once `DataStore` is overridden, all call to `Set()`, `Get()` and `del()` will call the provided `DataStore`
Once `DataStore` is overridden, all call to `Set()`, `Get()` and `del()` will call the provided `DataStore` otherwise they are maintained in request context.

### Use **DataStore** to store intermediate result
By default **`partially`** completed data gets forwarded along with the async request. When using external `DataStore` it can be saved and retrived from the `DataStore` if the flag `intermediate_storage` is set. Default is `false`
```yaml
intermediate_storage: true
```
Due to **nats** `1mb` storage limitation, async call may fail. In such scenario using `intermediate_storage` is recommended

### Manage State of Pipeline in a DAG with `StateStore`
In a `faas-flow` DAG execution faas-flow state is not only depends on the execution position, as the DAG execution happens on a shared state, a 3rd party **Synchoronous KV store** can be used as a `StateStore`
Any DAG which has a branch needs external statestore which can be a 3rd party **Synchoronous KV store**. It can be provided by Implementing `StateStore` interface
`StateStore` provides the below interface:
```go
type StateStore interface {
@@ -413,8 +404,8 @@ func DefineStateStore() (faasflow.StateStore, error) {
}
```

* **[ConsulStateStore](https://github.com/s8sg/faas-flow-consul-statestore)** manage state in **consul** for dag execution.
* **[EtcdStateStore](https://github.com/s8sg/faas-flow-etcd-statestore)** manage state in **etcd** for dag execution.
* **[ConsulStateStore](https://github.com/s8sg/faas-flow-consul-statestore)** statestore implementation with **consul**
* **[EtcdStateStore](https://github.com/s8sg/faas-flow-etcd-statestore)** statewtore implementation with **etcd**


### Geting Http Query to Workflow:
@@ -131,15 +131,6 @@ func getWorkflowName() string {
return os.Getenv("workflow_name")
}

// useIntermediateStorage check if IntermidiateStorage is enabled
func useIntermediateStorage() bool {
storage := os.Getenv("intermediate_storage")
if strings.ToUpper(storage) == "TRUE" {
return true
}
return false
}

// getPipeline returns the underline flow.pipeline object
func (fhandler *flowHandler) getPipeline() *sdk.Pipeline {
return fhandler.flow.GetPipeline()
@@ -608,8 +599,8 @@ func handleDynamicNode(fhandler *flowHandler, context *faasflow.Context, result
fhandler.id, subdag.Id, dynamicKey, err)
}

// If intermediate storage is enabled store data to intermediate storage
if useIntermediateStorage() {
// If forwarder is not nil its not an execution flow
if currentNode.GetForwarder("dynamic") != nil {
// <option>--<currentnodeid>--<subnodeid>
key := fmt.Sprintf("%s--%s--%s", dynamicKey, currentNodeUniqueId, subNode.GetUniqueId())
serr := context.Set(key, intermediateData)
@@ -712,14 +703,8 @@ func handleResponse(fhandler *flowHandler, context *faasflow.Context, result []b
// Get forwarder for child node
forwarder := currentNode.GetForwarder(node.Id)
if forwarder != nil {
// call default or user defined forwarder
intermediateData = forwarder(result)
} else {
// in case NoneForward forwarder in nil
intermediateData = []byte{}
}

// If intermediate storage is enabled store data to intermediate storage
if forwarder != nil && useIntermediateStorage() {

key := ""
dagNode := currentDag.GetParentNode()
@@ -747,6 +732,9 @@ func handleResponse(fhandler *flowHandler, context *faasflow.Context, result []b

// intermediateData is set to blank once its stored in storage
intermediateData = []byte("")
} else {
// in case NoneForward forwarder in nil
intermediateData = []byte("")
}

// if indegree is > 1 then use statestore to get request state
@@ -848,7 +836,9 @@ func handleFailure(fhandler *flowHandler, context *faasflow.Context, err error)
}

// getDagIntermediateData gets the intermediate data from earlier vertex
func getDagIntermediateData(handler *flowHandler, context *faasflow.Context, data []byte) ([]byte, error) {
func getDagIntermediateData(handler *flowHandler, context *faasflow.Context) ([]byte, error) {
var data []byte

pipeline := handler.getPipeline()
currentNode, dag := pipeline.GetCurrentNodeDag()
dataMap := make(map[string][]byte)
@@ -881,7 +871,7 @@ func getDagIntermediateData(handler *flowHandler, context *faasflow.Context, dat
idata, serr := aggregator(subDataMap)
if serr != nil {
serr := fmt.Errorf("failed to aggregate dynamic node data, error %v", serr)
return data, serr
return nil, serr
}
delete(pipeline.AllDynamicOption, node.GetUniqueId())

@@ -1001,20 +991,15 @@ func handleWorkflow(data []byte) string {

// Pipeline Execution Steps
{
// BUILD: build the flow based on execution request
// BUILD: build the flow based on the request data
fhandler, data := buildWorkflow(data)

// INIT STORE: Get definition of StateStore and DataStore
// INIT STORE: Get definition of StateStore and DataStore from user
stateSDefined, dataSOverride, err := initializeStore(fhandler)
if err != nil {
panic(fmt.Sprintf("[Request `%s`] Failed to init flow, %v", fhandler.id, err))
}

// Check if the pipeline is active
if fhandler.getPipeline().PipelineType == sdk.TYPE_DAG && fhandler.partial && !isActive(fhandler) {
panic(fmt.Sprintf("flow has been terminated"))
}

// MAKE CONTEXT: make the request context from flow
context := createContext(fhandler)

@@ -1025,12 +1010,18 @@ func handleWorkflow(data []byte) string {
}

// VALIDATE: Validate Pipeline Definition
// Dag need to be valid
err = fhandler.getPipeline().Dag.Validate()
if err != nil {
panic(fmt.Sprintf("[Request `%s`] Invalid dag, %v", fhandler.id, err))
}

// For dag one of the branch can cause the pipeline to terminate
// hence we Check if the pipeline is active
if fhandler.getPipeline().Dag.HasBranch() &&
fhandler.partial && !isActive(fhandler) {
panic(fmt.Sprintf("flow has been terminated"))
}

// For dag which has branches
// StateStore need to be external
if fhandler.getPipeline().Dag.HasBranch() {
@@ -1039,7 +1030,7 @@ func handleWorkflow(data []byte) string {
}
}

// Id dags has more than one nodes
// If dags has more than one nodes
// and nodes forwards data, data store need to be external
if fhandler.getPipeline().Dag.HasEdge() &&
!fhandler.getPipeline().Dag.IsExecutionFlow() {
@@ -1064,16 +1055,17 @@ func handleWorkflow(data []byte) string {
// If not a partial request set the execution position to initial node
if !fhandler.partial {
// On the 0th depth set the initial node as the current execution position
fhandler.getPipeline().UpdatePipelineExecutionPosition(sdk.DEPTH_SAME, fhandler.getPipeline().GetInitialNodeId())
fhandler.getPipeline().UpdatePipelineExecutionPosition(sdk.DEPTH_SAME,
fhandler.getPipeline().GetInitialNodeId())
}

// GETDATA: Get intermediate data from data store if not using default
// GETDATA: Get intermediate data from data store
// For partially completed requests if IntermidiateStorage is enabled
// get the data from dataStoretore
if fhandler.partial && useIntermediateStorage() {

data, gerr = getDagIntermediateData(fhandler, context, data)
if fhandler.partial &&
!fhandler.getPipeline().Dag.IsExecutionFlow() {

data, gerr = getDagIntermediateData(fhandler, context)
if gerr != nil {
gerr := fmt.Errorf("failed to retrive intermediate result, error %v", gerr)
handleFailure(fhandler, context, gerr)
@@ -1128,8 +1120,6 @@ func handleDotGraph() string { // Get flow name
panic(fmt.Sprintf("failed to generate dot graph, error %v", err))
}

// VALIDATE: Validate Pipeline Definition
// Dag need to be valid
err = fhandler.getPipeline().Dag.Validate()
if err != nil {
panic(fmt.Sprintf("[Request `%s`] Invalid dag, %v", fhandler.id, err))

Some generated files are not rendered by default. Learn more.

Some generated files are not rendered by default. Learn more.

@@ -16,7 +16,8 @@ type Options struct {
}

type BranchOptions struct {
aggregator sdk.Aggregator
aggregator sdk.Aggregator
noforwarder bool
}

type Workflow struct {
@@ -35,7 +36,8 @@ var (
Sync = SyncCall()
// Execution specify a edge doesn't forwards a data
// but rather mention a execution direction
Execution = InvokeEdge()
Execution = InvokeEdge()
ExecutionBranch = InvokeEdgeDynamic()
// Denote if last node doesn't contain any function call
emptyNode = false
// the reference of lastnode when applied as chain
@@ -56,16 +58,23 @@ func (o *Options) reset() {
// reset reset the BranchOptions
func (o *BranchOptions) reset() {
o.aggregator = nil
o.noforwarder = false
}

// ForEach denotes the vertex will be executed in parralel for each value returned.
// aggregator aggregates all outputs into one
// Aggregator aggregates all outputs into one
func Aggregator(aggregator sdk.Aggregator) BranchOption {
return func(o *BranchOptions) {
o.aggregator = aggregator
}
}

// InvokeEdgeDynamic() denotes a dynamic node doesn't forwards a data
func InvokeEdgeDynamic() BranchOption {
return func(o *BranchOptions) {
o.noforwarder = true
}
}

// InvokeEdge denotes a edge doesn't forwards a data,
// but rather provides only an execution flow
func InvokeEdge() Option {
@@ -82,11 +82,14 @@ func (this *DagFlow) AddForEachBranch(vertex string, foreach sdk.ForEach, option

for _, option := range options {
o := &BranchOptions{}
o.reset()
option(o)
if o.aggregator != nil {
node.AddSubAggregator(o.aggregator)
}
break
if o.noforwarder == true {
node.AddForwarder("dynamic", nil)
}
}

dag = CreateDag()
@@ -119,11 +122,14 @@ func (this *DagFlow) AddConditionalBranch(vertex string, conditions []string,

for _, option := range options {
o := &BranchOptions{}
o.reset()
option(o)
if o.aggregator != nil {
node.AddSubAggregator(o.aggregator)
}
break
if o.noforwarder == true {
node.AddForwarder("dynamic", nil)
}
}
conditiondags = make(map[string]*DagFlow)
for _, conditionKey := range conditions {

0 comments on commit ec016b0

Please sign in to comment.
You can’t perform that action at this time.