Skip to content

Commit

Permalink
final commit for persistent flu
Browse files Browse the repository at this point in the history
  • Loading branch information
himanshu144141 committed Oct 6, 2016
1 parent a220db5 commit 1d7d987
Show file tree
Hide file tree
Showing 6 changed files with 18 additions and 12 deletions.
2 changes: 1 addition & 1 deletion app/models/uuid/uuid.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func (u *UUID) UnmarshalBinary(data []byte) (err error) {
func (u UUID) Value() (driver.Value, error) {
if u == Nil {
defer func() {
plog.Info("Nil uuid Value() called")
//plog.Info("Nil uuid Value() called")
}()
return nil, nil
}
Expand Down
22 changes: 12 additions & 10 deletions app/services/flu_svc/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/crowdflux/angel/app/DAL/repositories/projects_repo"
"github.com/crowdflux/angel/app/services/flu_svc/flu_validator"
"github.com/crowdflux/angel/app/services/work_flow_svc"
"github.com/robfig/cron"
"time"
)

func New() IFluService {
Expand Down Expand Up @@ -37,16 +37,18 @@ func NewWithExposedValidators() IFluServiceExtended {

func StartFeedLineSync() {

fSvc := New()
c := cron.New()
go func() {

syncFeedLine := func() {
err := fSvc.SyncInputFeedLine()
if err != nil {
fmt.Println(err)
fSvc := New()

ticker := time.Tick(time.Duration(2) * time.Minute)

for _ = range ticker {
err := fSvc.SyncInputFeedLine()
if err != nil {
fmt.Println(err)
}
}
}
}()

c.AddFunc("0 */2 * * * *", syncFeedLine)
c.Start()
}
2 changes: 1 addition & 1 deletion app/services/flu_svc/flu_output/flu_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (fm *FluMonitor) AddToOutputQueue(flu models.FeedLineUnit) error {

func (fm *FluMonitor) AddManyToOutputQueue(fluBundle []models.FeedLineUnit) error {

plog.Info("FLu Monitor", fluBundle)
plog.Info("FLu Monitor, flubundle count:", len(fluBundle))

mutex.Lock()
for _, flu := range fluBundle {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ type manualStep struct {

func (m *manualStep) processFlu(flu feed_line.FLU) {
m.AddToBuffer(flu)
flu.ConfirmReceive()
plog.Info("Manual Step flu reached", flu.ID)
}

Expand Down
1 change: 1 addition & 0 deletions app/services/work_flow_svc/work_flow/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ func NewShortCircuit() WorkFlow {
// and send back the input as output (short circuit)
go func() {
for flu := range w.InQ.Receiver() {
counter.Print(flu, "shortcircuit workflow out")
w.OutQ.Push(flu)
}
}()
Expand Down
2 changes: 2 additions & 0 deletions app/services/work_flow_svc/work_flow_svc.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ func startWorkflowSvc(w *workFlowSvc) {
go func() {
for flu := range w.OutQ.Receiver() {
w.complete(flu.FeedLineUnit)
flu.ConfirmReceive()
}
}()
}
Expand All @@ -54,6 +55,7 @@ func startWorkflowSvcNLog(w *workFlowSvc) {
go func() {
for flu := range w.OutQ.Receiver() {
fmt.Println(flu.ID)
flu.ConfirmReceive()
}
}()
}

0 comments on commit 1d7d987

Please sign in to comment.