Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Need better example #21

Open
0xd3e opened this issue Jan 20, 2017 · 3 comments
Open

Need better example #21

0xd3e opened this issue Jan 20, 2017 · 3 comments

Comments

@0xd3e
Copy link

0xd3e commented Jan 20, 2017

Hi,

can anyone explain how I can use this library?
A complete example would be nice. Your splitsentence example differs completely from the description. This is confusing.

Thanks!

@asaarashi
Copy link

As a newbie of Storm, also confused by the example

@wilriker
Copy link
Contributor

We just fought a whole working day through this and we came up with the following working example. It emits a timestamp encoded in a []byte from a Spout and calculates the latency inside the Bolt.

// spout.go
package main

import (
	"encoding/binary"
	"fmt"
	"log"
	"os"
	"strconv"
	"time"

	"github.com/jsgilmore/gostorm"
	_ "github.com/jsgilmore/gostorm/encodings"
	stormmsg "github.com/jsgilmore/gostorm/messages"
)

type timestampSpout struct {
	payloadSize int
	stream      string
	context     *stormmsg.Context
	collector   gostorm.SpoutOutputCollector
	idCounter   uint64
}

func (s *timestampSpout) NextTuple() {
	tb := make([]byte, s.payloadSize)
	binary.LittleEndian.PutUint64(tb, uint64(time.Now().UnixNano()))
	s.collector.Emit(strconv.FormatUint(s.idCounter, 10), s.stream, tb)
	s.idCounter++
}

func (s *timestampSpout) Acked(id string) {
	log.Println("Acked", id)
}

func (s *timestampSpout) Failed(id string) {
	log.Println("Failed", id)
}

func (s *timestampSpout) Exit() {
	log.Println("Exiting")
}

func (s *timestampSpout) Open(ctx *stormmsg.Context, collector gostorm.SpoutOutputCollector) {
	s.context = ctx
	s.collector = collector
}

func main() {

	// Logging is done to an output file, since stdout and stderr are captured
	fo, err := os.Create(fmt.Sprintf("timestampSpout-%d.txt", os.Getpid()))
	if err != nil {
		panic(err)
	}
	defer fo.Close()
	log.SetOutput(fo)

	encoding := "jsonEncoded"
	s := &timestampSpout{
		payloadSize: 90,
		stream:      "",
	}
	gostorm.RunSpout(s, encoding)
}
// bolt.go
package main

import (
	"encoding/binary"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/jsgilmore/gostorm"
	_ "github.com/jsgilmore/gostorm/encodings"
	stormmsg "github.com/jsgilmore/gostorm/messages"
)

type latencyBolt struct {
	context   *stormmsg.Context
	collector gostorm.OutputCollector
}

func (s *latencyBolt) Fields() []interface{} {
	ret := make([]interface{}, 1)
	payload := make([]byte, 90)
	ret[0] = &payload
	return ret
}

func (s *latencyBolt) Execute(meta stormmsg.BoltMsgMeta, fields ...interface{}) {
	if meta.GetStream() == "__heartbeat" {
		s.collector.SendAck(meta.Id)
		return
	}
	switch payload := fields[0].(type) {
	case *[]byte:
		now := time.Now()
		sent := int64(binary.LittleEndian.Uint64(*payload))
		nowNano := now.UnixNano()
		latency := nowNano - sent
		s.collector.Emit([]string{meta.Id}, "latency", latency)
		s.collector.SendAck(meta.Id)
		log.Println(meta.Id, latency)
	default:
		s.collector.SendFail(meta.Id)
	}
}

func (s *latencyBolt) Prepare(context *stormmsg.Context, collector gostorm.OutputCollector) {
	s.context = context
	s.collector = collector
}

func (s *latencyBolt) Cleanup() {
}

func main() {

	// Logging is done to an output file, since stdout and stderr are captured
	fo, err := os.Create(fmt.Sprintf("/tmp/latencyBolt-%d.txt", os.Getpid()))
	if err != nil {
		panic(err)
	}
	defer fo.Close()
	log.SetOutput(fo)

	go func() {
		if r := recover(); r != nil {
			log.Panicf("Recovered panic: %v", r)
		}
	}()

	encoding := "jsonEncoded"
	b := &latencyBolt{}
	gostorm.RunBolt(b, encoding)
}

Just for reference here's also our clojure definition for the Topology. It expects that the binaries are in the executing user's $HOME/bin dir.

; latency.clj
(ns storm-evaluation.latency
  (:require
    [org.apache.storm.clojure :as storm]
    [org.apache.storm.config :as config]))

(def home-bin-dir (str (System/getProperty "user.home") "/bin/"))

(defn mk-topology
  []
  (storm/topology
    {"1" (storm/shell-spout-spec (str home-bin-dir "spout") "" ["timestamps"] :p 1)}
    {"2" (storm/shell-bolt-spec {"1" :shuffle} (str home-bin-dir "bolt") "" ["latency"] :p 1)}))

@edrex
Copy link
Contributor

edrex commented Oct 6, 2017

I created a minimal example here:

https://github.com/sixgill/gostorm-runner

@dominiek

Not sure if this should be included in the gostorm repo or just linked in the docs.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants