Skip to content

Conversation

@odeke-em
Copy link
Member

Given:

Server

package main

import (
	"context"
	"encoding/json"
	"io"
	"io/ioutil"
	"log"
	"net/http"

	"github.com/go-redis/redis"
	"github.com/orijtech/otils"

	"go.opencensus.io/exporter/stackdriver"
	"go.opencensus.io/plugin/ochttp"
	"go.opencensus.io/stats/view"
	"go.opencensus.io/trace"
)

func main() {
	trace.ApplyConfig(trace.Config{DefaultSampler: trace.AlwaysSample()})
	se, err := stackdriver.NewExporter(stackdriver.Options{
		ProjectID:    "census-demos",
		MetricPrefix: "cmstore",
	})
	if err != nil {
		log.Fatalf("Failed to create Stackdriver exporter: %v", err)
	}
	trace.RegisterExporter(se)
	view.RegisterExporter(se)
	if err := view.Register(ochttp.DefaultServerViews...); err != nil {
		log.Fatalf("Failed to register server views: %v", err)
	}
	if err := view.Register(ochttp.DefaultClientViews...); err != nil {
		log.Fatalf("Failed to register client views: %v", err)
	}
	if err := view.Register(redis.ObservabilityMetricViews...); err != nil {
		log.Fatalf("Failed to register redis observability metric views: %v", err)
	}

	mux := http.NewServeMux()
	mux.HandleFunc("/hdel", hdel)
	mux.HandleFunc("/hget", hget)
	mux.HandleFunc("/hset", hset)
	h := &ochttp.Handler{Handler: mux}
	if err := http.ListenAndServe(":9889", h); err != nil {
		log.Fatalf("Failed to serve: %v", err)
	}
}

type params struct {
	Table string `json:"table"`
	Key   string `json:"key"`
	Value string `json:"value"`
}

func hget(w http.ResponseWriter, r *http.Request) {
	ctx, span := trace.StartSpan(r.Context(), "hget")
	defer span.End()

	client := newRedisClient(ctx)
	defer client.Close()

	pm := new(params)
	if err := parseJSON(r.Body, pm); err != nil {
		http.Error(w, err.Error(), http.StatusUnprocessableEntity)
		return
	}
	val, err := client.HGet(pm.Table, pm.Key).Result()
	errStr := ""
	if err != nil {
		errStr = err.Error()
	}
	outBlob, err := json.MarshalIndent(map[string]string{
		"value": val,
		"err":   errStr,
	}, "", "  ")
	if err != nil {
		http.Error(w, err.Error(), http.StatusUnprocessableEntity)
		return
	}
	w.Write(outBlob)
}

func newRedisClient(ctx context.Context) *redis.Client {
	return redis.NewClient(&redis.Options{
		Addr:    otils.EnvOrAlternates("REDIS_SERVER_ADDR", ":6379"),
		Context: ctx,
	})
}

func hdel(w http.ResponseWriter, r *http.Request) {
	ctx, span := trace.StartSpan(r.Context(), "hdel")
	defer span.End()

	client := newRedisClient(ctx)
	defer client.Close()

	pm := new(params)
	if err := parseJSON(r.Body, pm); err != nil {
		http.Error(w, err.Error(), http.StatusUnprocessableEntity)
		return
	}
	val, err := client.HDel(pm.Table, pm.Key).Result()
	errStr := ""
	if err != nil {
		errStr = err.Error()
	}
	outBlob, err := json.MarshalIndent(map[string]interface{}{
		"value": val,
		"err":   errStr,
	}, "", "  ")
	if err != nil {
		http.Error(w, err.Error(), http.StatusUnprocessableEntity)
		return
	}
	w.Write(outBlob)
}

func parseJSON(rc io.ReadCloser, save interface{}) error {
	blob, err := ioutil.ReadAll(rc)
	_ = rc.Close()
	if err != nil {
		return err
	}
	return json.Unmarshal(blob, save)
}

func hset(w http.ResponseWriter, r *http.Request) {
	ctx, span := trace.StartSpan(r.Context(), "hset")
	defer span.End()

	client := newRedisClient(ctx)
	defer client.Close()

	pm := new(params)
	if err := parseJSON(r.Body, pm); err != nil {
		span.SetStatus(trace.Status{Code: int32(trace.StatusCodeInternal), Message: err.Error()})
		http.Error(w, err.Error(), http.StatusUnprocessableEntity)
		return
	}

	ok, err := client.HSet(pm.Table, pm.Key, pm.Value).Result()
	errStr := ""
	if err != nil {
		errStr = err.Error()
	}
	outBlob, err := json.MarshalIndent(map[string]interface{}{
		"ok":  ok,
		"err": errStr,
	}, "", "  ")
	if err != nil {
		http.Error(w, err.Error(), http.StatusUnprocessableEntity)
		return
	}
	w.Write(outBlob)

}

Client

package main

import (
	"bytes"
	"fmt"
	"io/ioutil"
	"math/rand"
	"net/http"
	"time"
)

func main() {
	bodies := []string{
		`{"table": "stories", "key":"foo", "value":"ghost"}`,
		`{"table": "protocols", "key":"version", "value":"ipv4"}`,
		`{"table": "devices", "key":"Kindle", "value":"v1"}`,
		`{"table": "customers", "key":"Barnes & Noble", "value":"ox"}`,
	}
	routes := []string{
		"/hset",
		"/hdel",
		"/hget",
	}

	rand.Seed(time.Now().Unix())
	for i := uint64(0); ; i++ {
		// Pick a random route
		routePerm := rand.Perm(len(routes))
		for _, ithRoute := range routePerm {
			bodiesPerm := rand.Perm(len(bodies))
			for _, ithBody := range bodiesPerm {
				body := []byte(bodies[ithBody])
				route := routes[ithRoute]
				req, _ := http.NewRequest("POST", "http://localhost:9889"+route, bytes.NewReader(body))
				res, _ := http.DefaultClient.Do(req)
				resBlob, _ := ioutil.ReadAll(res.Body)
				_ = res.Body.Close()
				fmt.Printf("#%d (%s) ==> %s\n\n", i, route, resBlob)
			}
		}
		sleepDuration := time.Millisecond * time.Duration(rand.Float64()*2e3)
		fmt.Printf("Sleeping for %s\n", sleepDuration)
		time.Sleep(sleepDuration)
	}
}

Tracing results

screen shot 2018-05-27 at 9 02 19 pm

Monitoring results

screen shot 2018-05-27 at 9 02 41 pm
screen shot 2018-05-27 at 9 03 57 pm
screen shot 2018-05-27 at 9 03 40 pm
screen shot 2018-05-27 at 9 03 20 pm
screen shot 2018-05-27 at 9 03 06 pm

odeke-em added 2 commits May 27, 2018 20:54
Just for purposes of following common convention, use
milliseconds instead of seconds.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants