Skip to content

LiangXianSen/mambed

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

2 Commits
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

Mambed

Mambed is a clickhouse Injector which implements Worker. On other hands Mambed consumes message from up-stream cyclic batch-insert to clickhouse. There is a important thing before Mambed works, you have to implements CKJob.

// Job is a CKjob Factory type which is NewMambed() requires.
type Job func() CKJob

// CKJob is a clickhouse job to tell Mambed how it works
// requires you to define a clickhouse stmt (clickhouse SQL)
// declares a struct represents the item you will insert
// fills fields waiting Mambed Commit().
type CKJob interface {
	// Stmt returns clickhouse SQL stmt.
	Stmt() string
	// FillData receive message, declares your struct then fill them.
	FillData(data interface{}) error
	// Execute fills fields the stmt needs.
	Execute(*sql.Stmt) error
	// String for output of the item.
	String() string
}

Firstly, implememts CKJob, then new a Mambed instance. Binds CKJob on Mambed, runs Mambed worker.

Declare a struct then implememts CKJob:

// Declare a Job type
func NewPerson() CKJob {
	return &Person{}
}

// Person implememts CKJob.
type Person struct {
	Name string
	Age  uint8
}

func (p *Person) String() string {
	return p.Name
}

func (p *Person) Stmt() string {
	return personStmt
}

func (p *Person) Execute(stmt *sql.Stmt) error {
	if _, err := stmt.Exec(
		p.Name,
		p.Age,
	); err != nil {
		return err
	}
	return nil
}

func validateResource(message interface{}) (*Message, error) {
	if req, ok := message.(*Message); ok {
		return req, nil
	}
	return nil, errors.New("incorrect message")
}

func (p *Person) FillData(message interface{}) error {
	req, err := validateResource(message)
	if err != nil {
		return err
	}
	p.Name = req.Name
	p.Age = uint8(req.Age)

	return nil
}

const personStmt = `INSERT INTO person (
	name, 
	age
) VALUES (?, ?)`

New Mambed instance requires, Config & Job type;

conf := &Config{
  MaxBatchLength: 1000,
  MaxBatchTime:   time.Second * 1,
  CKAccessURI:    "tcp://10.4.197.101:9001",
}

worker := NewMambed(conf, NewPerson)
go worker.Run()
worker.Close()
worker.Done()

You can define any CKJob , underlie on Mambed. Binds Mambed runs. The best choice is register in worker package.

Examples:

  • same up-stream distribute to multi-Jobs.
manager := worker.NewWorkerManager()
visitor := NewMambed(conf, NewPerson)
tracker := NewMambed(conf, NewTrace)

manager.Register(
  visitor,
  tracker,
)

go manager.RunOnDistribute()
for i := 0; i < 100; i++ {
  manager.Consume(i)
}
manager.Exit()
  • For efficiency, same CKJob consume up-stream together.
manager := worker.NewWorkerManager()
v1 := NewMambed(conf, NewPerson)
v2 := NewMambed(conf, NewPerson)

manager.Register(
  v1,
  v2,
)

go manager.RunOnCoWork()
for i := 0; i < 100; i++ {
  manager.Consume(i)
}
manager.Exit()