diff --git a/app/DAL/clients/rabbitmq/internal/receive.go b/app/DAL/clients/rabbitmq/internal/receive.go index e8b0fc8f9596..b0bbc591a060 100644 --- a/app/DAL/clients/rabbitmq/internal/receive.go +++ b/app/DAL/clients/rabbitmq/internal/receive.go @@ -1,6 +1,6 @@ //package rabbitmq -package main +package internal import ( "log" @@ -53,10 +53,10 @@ func main() { flu := models.FeedLineUnit{} json.Unmarshal(d.Body, &flu) log.Printf("Received a message: %s", flu.ID) - err := d.Ack(false) - if err != nil { - panic(err) - } + //err := d.Ack(false) + //if err != nil { + // panic(err) + //} } }() diff --git a/app/DAL/clients/rabbitmq/internal/send.go b/app/DAL/clients/rabbitmq/internal/send.go index 3650859d5f34..01c1e8b4dda8 100644 --- a/app/DAL/clients/rabbitmq/internal/send.go +++ b/app/DAL/clients/rabbitmq/internal/send.go @@ -1,4 +1,4 @@ -package rabbitmq +package internal // //package main diff --git a/app/DAL/clients/rabbitmq/rabbitmq.go b/app/DAL/clients/rabbitmq/rabbitmq.go index 890caa865092..788de4ee79bc 100644 --- a/app/DAL/clients/rabbitmq/rabbitmq.go +++ b/app/DAL/clients/rabbitmq/rabbitmq.go @@ -2,7 +2,26 @@ package rabbitmq import "github.com/streadway/amqp" -func noname() { +var rabbitmqConn *amqp.Connection - amqp.Queue{} +func init() { + rabbitmqConn = initRabbitMqClient() +} + +func initRabbitMqClient() *amqp.Connection { + + conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/") + if err != nil { + panic(err) + } + + return conn +} + +func GetNewChannel() *amqp.Channel { + ch, err := rabbitmqConn.Channel() + if err != nil { + panic(err) + } + return ch } diff --git a/app/DAL/feed_line/feed_line.go b/app/DAL/feed_line/feed_line.go new file mode 100644 index 000000000000..50caba5b0cdd --- /dev/null +++ b/app/DAL/feed_line/feed_line.go @@ -0,0 +1,122 @@ +package feed_line + +import ( + "encoding/json" + "errors" + "github.com/crowdflux/angel/app/DAL/clients/rabbitmq" + "github.com/crowdflux/angel/app/models" + "github.com/crowdflux/angel/app/plog" + "github.com/streadway/amqp" + "sync" +) + +// ShortHand for channel of FLUs i.e. FeedLine +type Fl struct { + amqpChan *amqp.Channel + queueName string + once sync.Once +} + +func New(name string) Fl { + + ch := rabbitmq.GetNewChannel() + + q, err := ch.QueueDeclare( + name, // name + true, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + + if err != nil { + plog.Error("Feedline", err, "error declaring queue, name: ", name) + panic(err) + } + + return Fl{ + amqpChan: ch, + queueName: q.Name, + } +} + +func (fl *Fl) Push(flu FLU) { + + // Send only the models.Feedline part of the flu in bytes + bty, _ := json.Marshal(flu.FeedLineUnit) + + // This is async + // TODO Think about a way to guarantee this operation also + err := fl.amqpChan.Publish( + "", // exchange + fl.queueName, // routing key + false, // mandatory + false, // immediate + amqp.Publishing{ + ContentType: "application/json", + Body: bty, + }) + if err != nil { + plog.Error("Feedline", err, "error publishing to channel", "flu_id: "+flu.ID.String()) + panic(err) + } + + // Just for safety: if someone forgets + // to ConfirmReceive the flu received from a queue + // then reconfirm it here as it will most + // probably be a bug + if flu.delivery.Acknowledger != nil { + flu.ConfirmReceive() + } +} + +func (fl *Fl) Receiver() <-chan FLU { + + println("Feedline, subscribe request: ", fl.queueName) + + var fluChan chan FLU + var flag bool = false + + fl.once.Do(func() { + + fluChan = make(chan FLU, 50000) + + deliveryChan, err := fl.amqpChan.Consume( + fl.queueName, // queue + "", // consumer + false, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + if err != nil { + plog.Error("Feedline", err, "error consuming queue, name:", fl.queueName) + panic(err) + } + + go func() { + + for msg := range deliveryChan { + + flu := models.FeedLineUnit{} + json.Unmarshal(msg.Body, &flu) + + fluChan <- FLU{ + FeedLineUnit: flu, + delivery: msg, + } + } + }() + + flag = true + }) + + if flag { + return (<-chan FLU)(fluChan) + } else { + panic(errors.New("Feedline already subscribed, name: " + fl.queueName)) + } + +} diff --git a/app/DAL/feed_line/feed_line_unit_test.go b/app/DAL/feed_line/feed_line_unit_test.go index 8b7f772c0950..9fa44b964fbb 100644 --- a/app/DAL/feed_line/feed_line_unit_test.go +++ b/app/DAL/feed_line/feed_line_unit_test.go @@ -20,7 +20,7 @@ func TestNew(t *testing.T) { }, }) - flu := <-fl.Out() + flu := <-fl.Receiver() flu.ConfirmReceive() diff --git a/app/api/builder.go b/app/api/builder.go index 2b9e09c52714..c1e012fef2b0 100644 --- a/app/api/builder.go +++ b/app/api/builder.go @@ -14,7 +14,7 @@ import ( "time" - "github.com/crowdflux/angel/app/services/work_flow_svc/step/crowdsourcing_step_svc/crowdsourcing_step_transport" + "github.com/crowdflux/angel/app/services/work_flow_svc/step/crowdsourcing_step_svc" "github.com/crowdflux/angel/app/services/work_flow_svc/step/manual_step_svc" "github.com/itsjamie/gin-cors" "github.com/newrelic/go-agent" @@ -69,7 +69,7 @@ func Build() { api.POST("/bulkdownloadimages", handlers.BulkDownloadImages) api.POST("/bulkdownloadimagesfromcsv", handlers.BulkDownloadedImagesFromCSV) - crowdsourcing_step_transport.AddHttpTransport(api) + crowdsourcing_step_svc.AddHttpTransport(api) manual_step_svc.AddHttpTransport(api) utils_api.AddHttpTransport(api) } diff --git a/app/models/step_type/step_type.go b/app/models/step_type/step_type.go index a9522ebc9c41..bafd3e6cc4af 100644 --- a/app/models/step_type/step_type.go +++ b/app/models/step_type/step_type.go @@ -40,3 +40,22 @@ func (s *StepType) Scan(src interface{}) error { *s = StepType(tmp) return nil } + +var stepTypeNames = map[StepType]string{ + CrowdSourcing: "CrowdSourcing", + InternalSourcing: "InternalSourcing", + Transformation: "Transformation", + Algorithm: "Algorithm", + Bifurcation: "Bifurcation", + Unification: "Unification", + Manual: "Manual", + Gateway: "Gateway", + Error: "Error", +} + +func (s *StepType) String() string { + if name, ok := stepTypeNames[*s]; ok { + return name + } + return "NoName" +} diff --git a/app/models/uuid/uuid.go b/app/models/uuid/uuid.go index 865023b7e66a..daf3a9c96b11 100644 --- a/app/models/uuid/uuid.go +++ b/app/models/uuid/uuid.go @@ -288,7 +288,7 @@ func (u *UUID) UnmarshalBinary(data []byte) (err error) { func (u UUID) Value() (driver.Value, error) { if u == Nil { defer func() { - plog.Info("Nil uuid Value() called") + //plog.Info("Nil uuid Value() called") }() return nil, nil } diff --git a/app/services/flu_logger_svc/logger.go b/app/services/flu_logger_svc/logger.go new file mode 100644 index 000000000000..cc08dd19bd98 --- /dev/null +++ b/app/services/flu_logger_svc/logger.go @@ -0,0 +1,43 @@ +package feed_line + +import ( + "database/sql" + "github.com/crowdflux/angel/app/models" + "github.com/crowdflux/angel/app/models/uuid" + "github.com/crowdflux/angel/app/services/work_flow_svc/feed_line" + "github.com/lib/pq" + "time" +) + +func LogStepEntry(flu feed_line.FLU, metaData ...models.JsonF) { + + log(flu, true, metaData) +} + +func LogStepExit(flu feed_line.FLU, metaData ...models.JsonF) { + + log(flu, false, metaData) +} + +func log(flu feed_line.FLU, stepEntry bool, metaData ...models.JsonF) { + + metaDataMerged := models.JsonF{} + + for _, m := range metaData { + metaDataMerged.Merge(m) + } + + metaDataMerged.Merge(models.JsonF{"build": flu.Build}) + + fluLog := models.FeedLineLog{ + FluId: flu.ID, + Message: sql.NullString{"asfas", true}, + MetaData: metaDataMerged, + StepType: sql.NullInt64{-1, false}, + StepEntry: sql.NullBool{stepEntry, true}, + StepExit: sql.NullBool{!stepEntry, true}, + StepId: flu.StepId, + WorkFlowId: uuid.Nil, + CreatedAt: pq.NullTime{time.Now(), true}, + } +} diff --git a/app/services/flu_logger_svc/logger_chan.go b/app/services/flu_logger_svc/logger_chan.go new file mode 100644 index 000000000000..3f645b821b2f --- /dev/null +++ b/app/services/flu_logger_svc/logger_chan.go @@ -0,0 +1,62 @@ +package feed_line + +import ( + "encoding/json" + "github.com/crowdflux/angel/app/DAL/clients/rabbitmq" + "github.com/crowdflux/angel/app/models" + "github.com/crowdflux/angel/app/plog" +) + +type feedLineLogChan struct { +} + +func newLoggerChan() feedLineLogChan { + + ch := rabbitmq.GetNewChannel() + + q, err := ch.QueueDeclare( + "FeedlineLogQueue", // name + true, // durable + false, // delete when unused + false, // exclusive + false, // no-wait + nil, // arguments + ) + + if err != nil { + plog.Error("Feedline Logger", err, "error declaring queue") + panic(err) + } + + outChan, err := ch.Consume( + q.Name, // queue + "", // consumer + false, // auto-ack + false, // exclusive + false, // no-local + false, // no-wait + nil, // args + ) + if err != nil { + plog.Error("Feedline", err, "error consuming queue") + } + + fl := Fl{make(chan FLU, 50000), outChan, ch, q.Name, false} + + go func() { + + for msg := range outChan { + + flu := models.FeedLineUnit{} + json.Unmarshal(msg.Body, &flu) + + fl.ch <- FLU{ + FeedLineUnit: flu, + delivery: msg, + } + } + + }() + + return fl +} diff --git a/app/services/flu_svc/export.go b/app/services/flu_svc/export.go index 27b82087f505..52820ae34537 100644 --- a/app/services/flu_svc/export.go +++ b/app/services/flu_svc/export.go @@ -7,7 +7,7 @@ import ( "github.com/crowdflux/angel/app/DAL/repositories/projects_repo" "github.com/crowdflux/angel/app/services/flu_svc/flu_validator" "github.com/crowdflux/angel/app/services/work_flow_svc" - "github.com/robfig/cron" + "time" ) func New() IFluService { @@ -37,16 +37,18 @@ func NewWithExposedValidators() IFluServiceExtended { func StartFeedLineSync() { - fSvc := New() - c := cron.New() + go func() { - syncFeedLine := func() { - err := fSvc.SyncInputFeedLine() - if err != nil { - fmt.Println(err) + fSvc := New() + + ticker := time.Tick(time.Duration(2) * time.Minute) + + for _ = range ticker { + err := fSvc.SyncInputFeedLine() + if err != nil { + fmt.Println(err) + } } - } + }() - c.AddFunc("0 */2 * * * *", syncFeedLine) - c.Start() } diff --git a/app/services/flu_svc/flu_output/flu_monitor.go b/app/services/flu_svc/flu_output/flu_monitor.go index a054c1053bbf..8707ccf762da 100644 --- a/app/services/flu_svc/flu_output/flu_monitor.go +++ b/app/services/flu_svc/flu_output/flu_monitor.go @@ -60,7 +60,7 @@ func (fm *FluMonitor) AddToOutputQueue(flu models.FeedLineUnit) error { func (fm *FluMonitor) AddManyToOutputQueue(fluBundle []models.FeedLineUnit) error { - plog.Info("FLu Monitor", fluBundle) + plog.Info("FLu Monitor, flubundle count:", len(fluBundle)) mutex.Lock() for _, flu := range fluBundle { diff --git a/app/services/flu_svc/flu_svc.go b/app/services/flu_svc/flu_svc.go index 5f1ce137b569..f13f32f88029 100644 --- a/app/services/flu_svc/flu_svc.go +++ b/app/services/flu_svc/flu_svc.go @@ -84,8 +84,8 @@ func (i *fluService) SyncInputFeedLine() error { } func (i *fluService) GetFeedLineUnit(fluId uuid.UUID) (models.FeedLineUnit, error) { - fin := feed_line_repo.NewInputQueue() - flu, err := fin.Get(fluId) + + flu, err := i.fluRepo.GetById(fluId) if err != nil && err == feed_line_repo.ErrFLUNotFoundInInputQueue { err = ErrFluNotFound } diff --git a/app/services/work_flow_svc/export.go b/app/services/work_flow_svc/export.go index 80af57e170c8..0d9c0ef66663 100644 --- a/app/services/work_flow_svc/export.go +++ b/app/services/work_flow_svc/export.go @@ -16,7 +16,7 @@ func newStd() IWorkFlowSvc { fOut := flu_output.New() completeHandler := func(flu models.FeedLineUnit) { - fmt.Println("on complete handler called", flu) + fmt.Println("on complete handler called", flu.ID) fOut.AddToOutputQueue(flu) } diff --git a/app/services/work_flow_svc/router_svc/starter.go b/app/services/work_flow_svc/router_svc/starter.go index ac87ad8e813e..f67570b62cba 100644 --- a/app/services/work_flow_svc/router_svc/starter.go +++ b/app/services/work_flow_svc/router_svc/starter.go @@ -13,7 +13,7 @@ func start(sr *stepRouter) { // in another goroutine & route it to its exact step go func() { - for flu := range sr.InQ.Out() { + for flu := range sr.InQ.Receiver() { plog.Info("Router in", flu.ID) // There is a question that adding to the // buffer should be inside or outside diff --git a/app/services/work_flow_svc/step/crowdsourcing_step_svc/crowdsourcing_step_test.go b/app/services/work_flow_svc/step/crowdsourcing_step_svc/crowdsourcing_step_test.go index ec0149d9a62c..d06ebc293a58 100644 --- a/app/services/work_flow_svc/step/crowdsourcing_step_svc/crowdsourcing_step_test.go +++ b/app/services/work_flow_svc/step/crowdsourcing_step_svc/crowdsourcing_step_test.go @@ -65,7 +65,7 @@ func Test(t *testing.T) { var fluNew feed_line.FLU select { - case fluNew = <-cs.OutQ.Out(): + case fluNew = <-cs.OutQ.Receiver(): assert.EqualValues(t, flu.ID, fluNew.ID) assert.EqualValues(t, flu.Build["new_prop"], 123) default: diff --git a/app/services/work_flow_svc/step/crowdsourcing_step_svc/endpoint.go b/app/services/work_flow_svc/step/crowdsourcing_step_svc/endpoint.go index 18c2323977dc..5a88db0750be 100644 --- a/app/services/work_flow_svc/step/crowdsourcing_step_svc/endpoint.go +++ b/app/services/work_flow_svc/step/crowdsourcing_step_svc/endpoint.go @@ -81,6 +81,8 @@ func FluUpdateHandlerCustom(updates []FluUpdate) error { return err } else { plog.Error("crowdy flu handler partially updated", err, "updated ids", updatedFlus) + // this wont return + // this will continue } } diff --git a/app/services/work_flow_svc/step/crowdsourcing_step_svc/crowdsourcing_step_transport/http_transport.go b/app/services/work_flow_svc/step/crowdsourcing_step_svc/transport.go similarity index 84% rename from app/services/work_flow_svc/step/crowdsourcing_step_svc/crowdsourcing_step_transport/http_transport.go rename to app/services/work_flow_svc/step/crowdsourcing_step_svc/transport.go index b14a4e650e5c..abe5b77e1471 100644 --- a/app/services/work_flow_svc/step/crowdsourcing_step_svc/crowdsourcing_step_transport/http_transport.go +++ b/app/services/work_flow_svc/step/crowdsourcing_step_svc/transport.go @@ -1,4 +1,4 @@ -package crowdsourcing_step_transport +package crowdsourcing_step_svc import "github.com/gin-gonic/gin" import ( @@ -6,7 +6,6 @@ import ( "github.com/crowdflux/angel/app/plog" "github.com/crowdflux/angel/app/services/plerrors" - "github.com/crowdflux/angel/app/services/work_flow_svc/step/crowdsourcing_step_svc" ) func AddHttpTransport(r *gin.RouterGroup) { @@ -15,7 +14,7 @@ func AddHttpTransport(r *gin.RouterGroup) { } type fluUpdateReq struct { - FluUpdates []crowdsourcing_step_svc.FluUpdate `json:"flu_updates"` + FluUpdates []FluUpdate `json:"flu_updates"` } func crowdSourcingPostHandler() gin.HandlerFunc { @@ -40,7 +39,7 @@ func crowdSourcingPostHandler() gin.HandlerFunc { return } - err := crowdsourcing_step_svc.FluUpdateHandlerCustom(fluUpdateReq.FluUpdates) + err := FluUpdateHandlerCustom(fluUpdateReq.FluUpdates) if err != nil { showErrorResponse(c, err) return diff --git a/app/services/work_flow_svc/step/manual_step_svc/manual_step.go b/app/services/work_flow_svc/step/manual_step_svc/manual_step.go index 26f491d27817..31e662039563 100644 --- a/app/services/work_flow_svc/step/manual_step_svc/manual_step.go +++ b/app/services/work_flow_svc/step/manual_step_svc/manual_step.go @@ -14,6 +14,7 @@ type manualStep struct { func (m *manualStep) processFlu(flu feed_line.FLU) { m.AddToBuffer(flu) + flu.ConfirmReceive() plog.Info("Manual Step flu reached", flu.ID) } diff --git a/app/services/work_flow_svc/step/manual_step_svc/transport.go b/app/services/work_flow_svc/step/manual_step_svc/transport.go new file mode 100644 index 000000000000..3d45c99c1947 --- /dev/null +++ b/app/services/work_flow_svc/step/manual_step_svc/transport.go @@ -0,0 +1,194 @@ +package manual_step_svc + +import ( + "bytes" + "errors" + "fmt" + "github.com/crowdflux/angel/app/models" + "github.com/crowdflux/angel/app/models/uuid" + "github.com/crowdflux/angel/app/plog" + "github.com/crowdflux/angel/utilities/constants" + "github.com/gin-gonic/gin" + "io" + "io/ioutil" + "mime/multipart" + "net/http" + "os" + "strconv" +) + +func AddHttpTransport(r *gin.RouterGroup) { + + r.POST(DOWNLOAD_ENDPOINT, fileDownloadHandler()) + r.POST(UPLOAD_ENDPOINT, fileUploadHandler()) +} + +func fileDownloadHandler() gin.HandlerFunc { + + return func(c *gin.Context) { + + manualStepId, err := uuid.FromString(c.PostForm(MANUAL_STEP_ID)) + plog.Info(c.PostForm(MANUAL_STEP_ID), manualStepId, err) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{ + SUCCESS: false, + ERROR: "please provide params", + }) + return + } + + fileUrl, err := DownloadCsv(manualStepId) + if err != nil { + c.JSON(http.StatusOK, gin.H{ + SUCCESS: false, + ERROR: err.Error(), + }) + return + } + + c.JSON(http.StatusOK, gin.H{ + SUCCESS: true, + //FILEPATH: "/downloadedfiles" + string(os.PathSeparator) + manualStepId.String() + ".csv", + FILEPATH: fileUrl, + }) + plog.Info(c.Request.RequestURI, fileUrl) + } +} +func fileUploadHandler() gin.HandlerFunc { + + return func(c *gin.Context) { + + file, header, err := c.Request.FormFile(UPLOAD) + if err != nil { + plog.Error("Err", errors.New("problem in uploaded file"), err) + showError(c, err) + return + } + + filename := header.Filename + + out, err := os.Create(TEMP_FOLDER + filename) + if err != nil { + plog.Error("Err", errors.New("Cannot create file"), err) + showError(c, err) + return + } + defer out.Close() + _, err = io.Copy(out, file) + if err != nil { + plog.Error("Err", errors.New("Cannot copy file"), err) + showError(c, err) + return + } + + plog.Info("Sent file for upload: ", filename) + + err = UploadCsv(filename) + if err != nil { + c.JSON(http.StatusBadRequest, gin.H{ + SUCCESS: false, + ERROR: err.Error(), + }) + return + } + + c.JSON(http.StatusOK, gin.H{ + SUCCESS: true, + }) + } +} + +func showError(c *gin.Context, err error) { + c.JSON(http.StatusOK, gin.H{ + SUCCESS: false, + ERROR: err.Error(), + }) +} + +//TODO send it to Utilities +func FlattenCSV(file string, url string, manualStepId uuid.UUID) (fileUrl string, err error) { + + // Prepare a form that you will submit to that URL. + var b bytes.Buffer + w := multipart.NewWriter(&b) + // Add your image file + f, err := os.Open(file) + if err != nil { + plog.Error("Error", err) + return constants.Empty, err + } + defer f.Close() + fw, err := w.CreateFormFile(PARAM_FILES, file) + if err != nil { + plog.Error("Error", err) + return constants.Empty, err + } + if _, err = io.Copy(fw, f); err != nil { + return + } + // Add the other fields + //TODO check these, are they needed + if fw, err = w.CreateFormField("key"); err != nil { + plog.Error("Error", err) + return + } + // TODO this one too. + if _, err = fw.Write([]byte("KEY")); err != nil { + plog.Error("Error", err) + return + } + // Don't forget to close the multipart writer. + // If you don't close it, your request will be missing the terminating boundary. + w.Close() + + // Now that you have a form, you can submit it to your handler. + req, err := http.NewRequest("POST", url, &b) + if err != nil { + plog.Error("Error", err) + return constants.Empty, err + } + // Don't forget to set the content type, this will contain the boundary. + req.Header.Set(CONTENT_TYPE, w.FormDataContentType()) + req.Header.Set(PARAM_PLAYMENT_ID, manualStepId.String()) + + // Submit the request + client := &http.Client{} + res, err := client.Do(req) + if err != nil { + plog.Error("Error", err) + return constants.Empty, err + } + + // Check the response + if res.StatusCode != http.StatusOK { + err = fmt.Errorf("bad status: %s", res.Status) + return + } + fmt.Println(res.StatusCode) + + response, err := ioutil.ReadAll(res.Body) + if err != nil { + plog.Error("Error", err) + return constants.Empty, err + } + + fileMap := models.JsonF{} + fileMap.Scan(string(response)) + plog.Info("FileMAP: ", fileMap) + + errTag := fileMap[ERROR] + if errTag != nil { + err = errors.New(strconv.FormatBool(errTag.(bool))) + return + } + + urlTag := fileMap[URL] + if urlTag == nil { + err = errors.New("No file url from Transformation.") + return + } + fileUrl = urlTag.(string) + plog.Info("FileURL: ", fileUrl) + + return +} diff --git a/app/services/work_flow_svc/step/step.go b/app/services/work_flow_svc/step/step.go index e0b54f291992..9e120a6486ef 100644 --- a/app/services/work_flow_svc/step/step.go +++ b/app/services/work_flow_svc/step/step.go @@ -3,8 +3,10 @@ package step import ( "errors" + "github.com/crowdflux/angel/app/DAL/feed_line" + "github.com/crowdflux/angel/app/models/step_type" "github.com/crowdflux/angel/app/models/uuid" - "github.com/crowdflux/angel/app/services/work_flow_svc/feed_line" + "sync" ) type Step struct { @@ -12,12 +14,17 @@ type Step struct { OutQ feed_line.Fl buffer feed_line.Bf + once sync.Once + + processFlu processFlu } -func New() Step { +type processFlu func(feed_line.FLU) + +func New(st step_type.StepType) Step { return Step{ - InQ: feed_line.New(), - OutQ: feed_line.New(), + InQ: feed_line.New(st.String() + "-in"), + OutQ: feed_line.New(st.String() + "-out"), buffer: feed_line.NewBuffer(), } } @@ -38,5 +45,41 @@ func (s *Step) RemoveFromBuffer(flu feed_line.FLU) error { return errors.New("not present") } return nil +} + +func (s *Step) Connect(routerIn *feed_line.Fl) *feed_line.Fl { + + // Send output of this step to the router's input + // for next rerouting + s.OutQ = *routerIn + + s.start() + + // Return the input channel of this step + // so that router can push flu to it + return &s.InQ +} + +// TODO shit code +func (s *Step) SetFluProcessor(p processFlu) { + s.processFlu = p +} + +func (s *Step) start() { + + if s.processFlu == nil { + panic(errors.New("processFlu nil for the step")) + } + + s.once.Do(func() { + + go func() { + + for flu := range s.InQ.Receiver() { + + s.processFlu(flu) + } + }() + }) } diff --git a/app/services/work_flow_svc/work_flow/workflow.go b/app/services/work_flow_svc/work_flow/workflow.go index 07c5757b2c66..8bb3445a2bcc 100644 --- a/app/services/work_flow_svc/work_flow/workflow.go +++ b/app/services/work_flow_svc/work_flow/workflow.go @@ -1,9 +1,9 @@ package work_flow import ( + "github.com/crowdflux/angel/app/DAL/feed_line" "github.com/crowdflux/angel/app/services/work_flow_svc/counter" - "github.com/crowdflux/angel/app/services/work_flow_svc/feed_line" - "github.com/crowdflux/angel/app/services/work_flow_svc/step_router" + "github.com/crowdflux/angel/app/services/work_flow_svc/router_svc" ) type WorkFlow struct { @@ -17,22 +17,26 @@ func newStdWorkFlow() WorkFlow { //create new instance w := WorkFlow{ - InQ: feed_line.New(), - OutQ: feed_line.New(), + InQ: feed_line.New("workflow-in"), + OutQ: feed_line.New("workflow-out"), } // Start Workflow Channel IO in another goroutine go func() { + + inputQueue := w.InQ.Receiver() + outputQueue := router_svc.StdStepRouter.ProcessedFluQ.Receiver() + for { select { - case flu := <-w.InQ: + case flu := <-inputQueue: counter.Print(flu, "workflow in") - step_router.StdStepRouter.InQ <- flu + router_svc.StdStepRouter.InQ.Push(flu) - case flu := <-step_router.StdStepRouter.ProcessedFluQ: + case flu := <-outputQueue: counter.Print(flu, "workflow out") - w.OutQ <- flu + w.OutQ.Push(flu) } } }() @@ -45,21 +49,19 @@ var StdWorkFlow = newStdWorkFlow() //var StdWorkFlow = newShortCircuit() -func newShortCircuit() WorkFlow { +func NewShortCircuit() WorkFlow { //create new instance w := WorkFlow{ - InQ: feed_line.New(), - OutQ: feed_line.New(), + InQ: feed_line.New("workflow-in2123"), + OutQ: feed_line.New("workflow-out2123"), } // Start Workflow Channel IO in another goroutine // and send back the input as output (short circuit) go func() { - for { - select { - case flu := <-w.InQ: - w.OutQ <- flu - } + for flu := range w.InQ.Receiver() { + counter.Print(flu, "shortcircuit workflow out") + w.OutQ.Push(flu) } }() diff --git a/app/services/work_flow_svc/work_flow_svc.go b/app/services/work_flow_svc/work_flow_svc.go index e0a7aa5439fa..a4d1eda95e34 100644 --- a/app/services/work_flow_svc/work_flow_svc.go +++ b/app/services/work_flow_svc/work_flow_svc.go @@ -4,9 +4,9 @@ import ( "fmt" "sync" + "github.com/crowdflux/angel/app/DAL/feed_line" "github.com/crowdflux/angel/app/models" "github.com/crowdflux/angel/app/services/work_flow_svc/counter" - "github.com/crowdflux/angel/app/services/work_flow_svc/feed_line" "github.com/crowdflux/angel/app/services/work_flow_svc/work_flow" ) @@ -18,7 +18,7 @@ type workFlowSvc struct { func (w *workFlowSvc) AddFLU(flu models.FeedLineUnit) { counter.Print(feed_line.FLU{FeedLineUnit: flu}, "workflowsvc") - w.InQ <- feed_line.FLU{FeedLineUnit: flu} + w.InQ.Push(feed_line.FLU{FeedLineUnit: flu}) } func (w *workFlowSvc) Start() { @@ -44,22 +44,18 @@ func (w *workFlowSvc) OnComplete(f OnCompleteHandler) { func startWorkflowSvc(w *workFlowSvc) { go func() { - for { - select { - case flu := <-w.OutQ: - w.complete(flu.FeedLineUnit) - } + for flu := range w.OutQ.Receiver() { + w.complete(flu.FeedLineUnit) + flu.ConfirmReceive() } }() } func startWorkflowSvcNLog(w *workFlowSvc) { go func() { - for { - select { - case flu := <-w.OutQ: - fmt.Println(flu.ID) - } + for flu := range w.OutQ.Receiver() { + fmt.Println(flu.ID) + flu.ConfirmReceive() } }() }