Skip to content

Commit

Permalink
[#59] refactoring example
Browse files Browse the repository at this point in the history
  • Loading branch information
dwkang committed Dec 13, 2022
1 parent 582c8de commit bb0473d
Showing 1 changed file with 36 additions and 57 deletions.
93 changes: 36 additions & 57 deletions plugin/goelastic/example/goelastic.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ package main

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
Expand All @@ -18,53 +20,40 @@ import (
"github.com/pinpoint-apm/pinpoint-go-agent/plugin/http"
)

//The example code below is from https://github.com/elastic/go-elasticsearch/blob/master/_examples/main.go.
// The example below is referred from
// https://github.com/elastic/go-elasticsearch/blob/master/_examples/main.go.

func goelastic(w http.ResponseWriter, req *http.Request) {
func elasticTest(w http.ResponseWriter, req *http.Request) {
ctx := req.Context()
log.SetFlags(0)

var (
r map[string]interface{}
wg sync.WaitGroup
)

addr := []string{"http://localhost:9200"}
es, err := elasticsearch.NewClient(
elasticsearch.Config{
Transport: ppgoelastic.NewTransport(nil),
Addresses: addr,
Transport: ppgoelastic.NewTransport(nil, addr),
})
//es, err := elasticsearch.NewDefaultClient()
if err != nil {
log.Fatalf("Error creating the client: %s", err)
}

// 1. Get cluster info
//
res, err := es.Info()
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
defer res.Body.Close()
// Check response status
if res.IsError() {
log.Fatalf("Error: %s", res.String())
}
// Deserialize the response into a map.
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
log.Fatalf("Error parsing the response body: %s", err)
}
// Print client and server version numbers.
log.Printf("Client: %s", elasticsearch.Version)
log.Printf("Server: %s", r["version"].(map[string]interface{})["number"])
log.Println(strings.Repeat("~", 37))

// 2. Index documents concurrently
//
for i, title := range []string{"Test One" /*, "Test Two"*/} {
indexDocument(ctx, es)
buf := searchDocument(ctx, es)

io.WriteString(w, buf.String())
}

func indexDocument(ctx context.Context, es *elasticsearch.Client) {
var wg sync.WaitGroup
tracer := pinpoint.FromContext(ctx)

for i, title := range []string{"Test One", "Test Two"} {
wg.Add(1)

go func(i int, title string) {
go func(i int, title string, asyncTracer pinpoint.Tracer) {
defer wg.Done()
defer asyncTracer.EndSpan() //!!must be called
defer asyncTracer.NewSpanEvent("indexRequest").EndSpanEvent()

// Build the request body.
var b strings.Builder
Expand All @@ -81,7 +70,7 @@ func goelastic(w http.ResponseWriter, req *http.Request) {
}

// Perform the request with the client.
res, err := req.Do(ctx, es)
res, err := req.Do(pinpoint.NewContext(context.Background(), asyncTracer), es)
if err != nil {
log.Fatalf("Error getting response: %s", err)
}
Expand All @@ -99,15 +88,12 @@ func goelastic(w http.ResponseWriter, req *http.Request) {
log.Printf("[%s] %s; version=%d", res.Status(), r["result"], int(r["_version"].(float64)))
}
}
}(i, title)
}(i, title, tracer.NewGoroutineTracer())
}
wg.Wait()
}

log.Println(strings.Repeat("-", 37))

// 3. Search for the indexed documents
//
// Build the request body.
func searchDocument(ctx context.Context, es *elasticsearch.Client) bytes.Buffer {
var buf bytes.Buffer
query := map[string]interface{}{
"query": map[string]interface{}{
Expand All @@ -121,7 +107,7 @@ func goelastic(w http.ResponseWriter, req *http.Request) {
}

// Perform the search request.
res, err = es.Search(
res, err := es.Search(
es.Search.WithContext(ctx),
es.Search.WithIndex("test"),
es.Search.WithBody(&buf),
Expand All @@ -147,30 +133,24 @@ func goelastic(w http.ResponseWriter, req *http.Request) {
}
}

var r map[string]interface{}
if err := json.NewDecoder(res.Body).Decode(&r); err != nil {
log.Fatalf("Error parsing the response body: %s", err)
}
// Print the response status, number of results, and request duration.
log.Printf(
"[%s] %d hits; took: %dms",
res.Status(),
int(r["hits"].(map[string]interface{})["total"].(map[string]interface{})["value"].(float64)),
int(r["took"].(float64)),
)
// Print the ID and document source for each hit.
js, _ := json.MarshalIndent(r, "", " ")

buf.Reset()
fmt.Fprintf(&buf, "[%s]\n %s\n\n", res.Status(), string(js))
for _, hit := range r["hits"].(map[string]interface{})["hits"].([]interface{}) {
log.Printf(" * ID=%s, %s", hit.(map[string]interface{})["_id"], hit.(map[string]interface{})["_source"])
fmt.Fprintf(&buf, "ID=%s, %s\n", hit.(map[string]interface{})["_id"], hit.(map[string]interface{})["_source"])
}

log.Println(strings.Repeat("=", 37))

io.WriteString(w, "hello goelastic!!")
return buf
}

func main() {
opts := []pinpoint.ConfigOption{
pinpoint.WithAppName("GoElasticTest"),
pinpoint.WithAgentId("GoElasticTestAgent"),
//pinpoint.WithCollectorHost("localhost"),
pinpoint.WithConfigFile(os.Getenv("HOME") + "/tmp/pinpoint-config.yaml"),
}
cfg, _ := pinpoint.NewConfig(opts...)
Expand All @@ -180,7 +160,6 @@ func main() {
}
defer agent.Shutdown()

http.HandleFunc("/goelastic", pphttp.WrapHandlerFunc(goelastic))

http.HandleFunc("/elastic", pphttp.WrapHandlerFunc(elasticTest))
http.ListenAndServe(":9000", nil)
}

0 comments on commit bb0473d

Please sign in to comment.