Skip to content

Commit

Permalink
Merge pull request cockroachdb#27 from crowdflux/himanshu-github
Browse files Browse the repository at this point in the history
persistent channel
  • Loading branch information
himanshu144141 committed Oct 6, 2016
2 parents c7cc109 + 1d7d987 commit b4a7c25
Show file tree
Hide file tree
Showing 23 changed files with 568 additions and 64 deletions.
10 changes: 5 additions & 5 deletions app/DAL/clients/rabbitmq/internal/receive.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
//package rabbitmq

package main
package internal

import (
"log"
Expand Down Expand Up @@ -53,10 +53,10 @@ func main() {
flu := models.FeedLineUnit{}
json.Unmarshal(d.Body, &flu)
log.Printf("Received a message: %s", flu.ID)
err := d.Ack(false)
if err != nil {
panic(err)
}
//err := d.Ack(false)
//if err != nil {
// panic(err)
//}
}
}()

Expand Down
2 changes: 1 addition & 1 deletion app/DAL/clients/rabbitmq/internal/send.go
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package rabbitmq
package internal

//
//package main
Expand Down
23 changes: 21 additions & 2 deletions app/DAL/clients/rabbitmq/rabbitmq.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,26 @@ package rabbitmq

import "github.com/streadway/amqp"

func noname() {
var rabbitmqConn *amqp.Connection

amqp.Queue{}
func init() {
rabbitmqConn = initRabbitMqClient()
}

func initRabbitMqClient() *amqp.Connection {

conn, err := amqp.Dial("amqp://guest:guest@localhost:5672/")
if err != nil {
panic(err)
}

return conn
}

func GetNewChannel() *amqp.Channel {
ch, err := rabbitmqConn.Channel()
if err != nil {
panic(err)
}
return ch
}
122 changes: 122 additions & 0 deletions app/DAL/feed_line/feed_line.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
package feed_line

import (
"encoding/json"
"errors"
"github.com/crowdflux/angel/app/DAL/clients/rabbitmq"
"github.com/crowdflux/angel/app/models"
"github.com/crowdflux/angel/app/plog"
"github.com/streadway/amqp"
"sync"
)

// ShortHand for channel of FLUs i.e. FeedLine
type Fl struct {
amqpChan *amqp.Channel
queueName string
once sync.Once
}

func New(name string) Fl {

ch := rabbitmq.GetNewChannel()

q, err := ch.QueueDeclare(
name, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)

if err != nil {
plog.Error("Feedline", err, "error declaring queue, name: ", name)
panic(err)
}

return Fl{
amqpChan: ch,
queueName: q.Name,
}
}

func (fl *Fl) Push(flu FLU) {

// Send only the models.Feedline part of the flu in bytes
bty, _ := json.Marshal(flu.FeedLineUnit)

// This is async
// TODO Think about a way to guarantee this operation also
err := fl.amqpChan.Publish(
"", // exchange
fl.queueName, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
Body: bty,
})
if err != nil {
plog.Error("Feedline", err, "error publishing to channel", "flu_id: "+flu.ID.String())
panic(err)
}

// Just for safety: if someone forgets
// to ConfirmReceive the flu received from a queue
// then reconfirm it here as it will most
// probably be a bug
if flu.delivery.Acknowledger != nil {
flu.ConfirmReceive()
}
}

func (fl *Fl) Receiver() <-chan FLU {

println("Feedline, subscribe request: ", fl.queueName)

var fluChan chan FLU
var flag bool = false

fl.once.Do(func() {

fluChan = make(chan FLU, 50000)

deliveryChan, err := fl.amqpChan.Consume(
fl.queueName, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
plog.Error("Feedline", err, "error consuming queue, name:", fl.queueName)
panic(err)
}

go func() {

for msg := range deliveryChan {

flu := models.FeedLineUnit{}
json.Unmarshal(msg.Body, &flu)

fluChan <- FLU{
FeedLineUnit: flu,
delivery: msg,
}
}
}()

flag = true
})

if flag {
return (<-chan FLU)(fluChan)
} else {
panic(errors.New("Feedline already subscribed, name: " + fl.queueName))
}

}
2 changes: 1 addition & 1 deletion app/DAL/feed_line/feed_line_unit_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func TestNew(t *testing.T) {
},
})

flu := <-fl.Out()
flu := <-fl.Receiver()

flu.ConfirmReceive()

Expand Down
4 changes: 2 additions & 2 deletions app/api/builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ import (

"time"

"github.com/crowdflux/angel/app/services/work_flow_svc/step/crowdsourcing_step_svc/crowdsourcing_step_transport"
"github.com/crowdflux/angel/app/services/work_flow_svc/step/crowdsourcing_step_svc"
"github.com/crowdflux/angel/app/services/work_flow_svc/step/manual_step_svc"
"github.com/itsjamie/gin-cors"
"github.com/newrelic/go-agent"
Expand Down Expand Up @@ -69,7 +69,7 @@ func Build() {
api.POST("/bulkdownloadimages", handlers.BulkDownloadImages)
api.POST("/bulkdownloadimagesfromcsv", handlers.BulkDownloadedImagesFromCSV)

crowdsourcing_step_transport.AddHttpTransport(api)
crowdsourcing_step_svc.AddHttpTransport(api)
manual_step_svc.AddHttpTransport(api)
utils_api.AddHttpTransport(api)
}
Expand Down
19 changes: 19 additions & 0 deletions app/models/step_type/step_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,3 +40,22 @@ func (s *StepType) Scan(src interface{}) error {
*s = StepType(tmp)
return nil
}

var stepTypeNames = map[StepType]string{
CrowdSourcing: "CrowdSourcing",
InternalSourcing: "InternalSourcing",
Transformation: "Transformation",
Algorithm: "Algorithm",
Bifurcation: "Bifurcation",
Unification: "Unification",
Manual: "Manual",
Gateway: "Gateway",
Error: "Error",
}

func (s *StepType) String() string {
if name, ok := stepTypeNames[*s]; ok {
return name
}
return "NoName"
}
2 changes: 1 addition & 1 deletion app/models/uuid/uuid.go
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ func (u *UUID) UnmarshalBinary(data []byte) (err error) {
func (u UUID) Value() (driver.Value, error) {
if u == Nil {
defer func() {
plog.Info("Nil uuid Value() called")
//plog.Info("Nil uuid Value() called")
}()
return nil, nil
}
Expand Down
43 changes: 43 additions & 0 deletions app/services/flu_logger_svc/logger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package feed_line

import (
"database/sql"
"github.com/crowdflux/angel/app/models"
"github.com/crowdflux/angel/app/models/uuid"
"github.com/crowdflux/angel/app/services/work_flow_svc/feed_line"
"github.com/lib/pq"
"time"
)

func LogStepEntry(flu feed_line.FLU, metaData ...models.JsonF) {

log(flu, true, metaData)
}

func LogStepExit(flu feed_line.FLU, metaData ...models.JsonF) {

log(flu, false, metaData)
}

func log(flu feed_line.FLU, stepEntry bool, metaData ...models.JsonF) {

metaDataMerged := models.JsonF{}

for _, m := range metaData {
metaDataMerged.Merge(m)
}

metaDataMerged.Merge(models.JsonF{"build": flu.Build})

fluLog := models.FeedLineLog{
FluId: flu.ID,
Message: sql.NullString{"asfas", true},
MetaData: metaDataMerged,
StepType: sql.NullInt64{-1, false},
StepEntry: sql.NullBool{stepEntry, true},
StepExit: sql.NullBool{!stepEntry, true},
StepId: flu.StepId,
WorkFlowId: uuid.Nil,
CreatedAt: pq.NullTime{time.Now(), true},
}
}
62 changes: 62 additions & 0 deletions app/services/flu_logger_svc/logger_chan.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
package feed_line

import (
"encoding/json"
"github.com/crowdflux/angel/app/DAL/clients/rabbitmq"
"github.com/crowdflux/angel/app/models"
"github.com/crowdflux/angel/app/plog"
)

type feedLineLogChan struct {
}

func newLoggerChan() feedLineLogChan {

ch := rabbitmq.GetNewChannel()

q, err := ch.QueueDeclare(
"FeedlineLogQueue", // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)

if err != nil {
plog.Error("Feedline Logger", err, "error declaring queue")
panic(err)
}

outChan, err := ch.Consume(
q.Name, // queue
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
plog.Error("Feedline", err, "error consuming queue")
}

fl := Fl{make(chan FLU, 50000), outChan, ch, q.Name, false}

go func() {

for msg := range outChan {

flu := models.FeedLineUnit{}
json.Unmarshal(msg.Body, &flu)

fl.ch <- FLU{
FeedLineUnit: flu,
delivery: msg,
}
}

}()

return fl
}
22 changes: 12 additions & 10 deletions app/services/flu_svc/export.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
"github.com/crowdflux/angel/app/DAL/repositories/projects_repo"
"github.com/crowdflux/angel/app/services/flu_svc/flu_validator"
"github.com/crowdflux/angel/app/services/work_flow_svc"
"github.com/robfig/cron"
"time"
)

func New() IFluService {
Expand Down Expand Up @@ -37,16 +37,18 @@ func NewWithExposedValidators() IFluServiceExtended {

func StartFeedLineSync() {

fSvc := New()
c := cron.New()
go func() {

syncFeedLine := func() {
err := fSvc.SyncInputFeedLine()
if err != nil {
fmt.Println(err)
fSvc := New()

ticker := time.Tick(time.Duration(2) * time.Minute)

for _ = range ticker {
err := fSvc.SyncInputFeedLine()
if err != nil {
fmt.Println(err)
}
}
}
}()

c.AddFunc("0 */2 * * * *", syncFeedLine)
c.Start()
}
2 changes: 1 addition & 1 deletion app/services/flu_svc/flu_output/flu_monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func (fm *FluMonitor) AddToOutputQueue(flu models.FeedLineUnit) error {

func (fm *FluMonitor) AddManyToOutputQueue(fluBundle []models.FeedLineUnit) error {

plog.Info("FLu Monitor", fluBundle)
plog.Info("FLu Monitor, flubundle count:", len(fluBundle))

mutex.Lock()
for _, flu := range fluBundle {
Expand Down
Loading

0 comments on commit b4a7c25

Please sign in to comment.