Skip to content

Commit

Permalink
add geo URL and status JSON
Browse files Browse the repository at this point in the history
Signed-off-by: Colin Sullivan <colin@synadia.com>
  • Loading branch information
ColinSullivan1 committed Apr 7, 2022
1 parent 632b0f8 commit f6e4f8d
Showing 1 changed file with 39 additions and 20 deletions.
59 changes: 39 additions & 20 deletions examples/nats-echo/main.go
Expand Up @@ -47,14 +47,24 @@ func printMsg(m *nats.Msg, i int) {
log.Printf("[#%d] Echoing from [%s] to [%s]: %q", i, m.Subject, m.Reply, m.Data)
}

func printStatusMsg(m *nats.Msg, i int) {
log.Printf("[#%d] Sending status from [%s] to [%s]: %q", i, m.Subject, m.Reply, m.Data)
}

type serviceStatus struct {
Id string `json:"id"`
Geo string `json:"geo"`
}

func main() {
var urls = flag.String("s", nats.DefaultURL, "The nats server URLs (separated by comma)")
var userCreds = flag.String("creds", "", "User Credentials File")
var nkeyFile = flag.String("nkey", "", "NKey Seed File")
var serviceId = flag.String("id", "NATS Echo Service", "Identifier for this service")
var showTime = flag.Bool("t", false, "Display timestamps")
var showHelp = flag.Bool("h", false, "Show help message")
var geoloc = flag.Bool("geo", false, "Display geo location of echo service")
var geo string
var geo string = "unknown"

log.SetFlags(0)
flag.Usage = usage
Expand All @@ -74,7 +84,7 @@ func main() {
geo = lookupGeo()
}
// Connect Options.
opts := []nats.Option{nats.Name("NATS Echo Service")}
opts := []nats.Option{nats.Name(*serviceId)}
opts = setupConnOptions(opts)

if *userCreds != "" && *nkeyFile != "" {
Expand All @@ -101,37 +111,40 @@ func main() {
log.Fatal(err)
}

subj, i := args[0], 0
subj, iEcho, iStatus := args[0], 0, 0
statusSubj := subj + ".status"

handleMsg := func(msg *nats.Msg) {
i++
nc.QueueSubscribe(subj, "echo", func(msg *nats.Msg) {
iEcho++
printMsg(msg, iEcho)
if msg.Reply != "" {
printMsg(msg, i)
// Just echo back what they sent us.
if geo != "" {
m := fmt.Sprintf("[%s]: %q", geo, msg.Data)
nc.Publish(msg.Reply, []byte(m))
var payload []byte
if geo != "unknown" {
payload = []byte(fmt.Sprintf("[%s]: %q", geo, msg.Data))
} else {
nc.Publish(msg.Reply, msg.Data)
payload = msg.Data
}
nc.Publish(msg.Reply, payload)
}
}

nc.QueueSubscribe(subj, "echo", func(msg *nats.Msg) {
handleMsg(msg)
})
allSubj := subj + ".all"
nc.Subscribe(allSubj, func(msg *nats.Msg) {
handleMsg(msg)
nc.Subscribe(statusSubj, func(msg *nats.Msg) {
iStatus++
printStatusMsg(msg, iStatus)
if msg.Reply != "" {
payload, _ := json.Marshal(&serviceStatus{Id: *serviceId, Geo: geo})
nc.Publish(msg.Reply, payload)
}
})
nc.Flush()

if err := nc.LastError(); err != nil {
log.Fatal(err)
}

log.Printf("Echo Service ID: [%s]", *serviceId)
log.Printf("Echo Service listening on [%s]\n", subj)
log.Printf("Echo Service (All) listening on [%s]\n", allSubj)
log.Printf("Echo Service (Status) listening on [%s]\n", statusSubj)

// Now handle signal to terminate so we can drain on exit.
c := make(chan os.Signal, 1)
Expand Down Expand Up @@ -185,9 +198,15 @@ type geo struct {
// lookup our current region and country..
func lookupGeo() string {
c := &http.Client{Timeout: 2 * time.Second}
resp, err := c.Get("https://ipapi.co/json")

url := os.Getenv("ECHO_SVC_GEO_URL")
if len(url) == 0 {
url = "https://ipapi.co/json"
}

resp, err := c.Get(url)
if err != nil || resp == nil {
log.Fatalf("Could not retrive geo location data: %v", err)
log.Fatalf("Could not retrieve geo location data: %v", err)
}
defer resp.Body.Close()
body, _ := ioutil.ReadAll(resp.Body)
Expand Down

0 comments on commit f6e4f8d

Please sign in to comment.