Skip to content

Commit

Permalink
ack messages in the nats consumer, display them on the client
Browse files Browse the repository at this point in the history
  • Loading branch information
ryanpbrewster committed Dec 25, 2021
1 parent 8106011 commit 4291106
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 9 deletions.
2 changes: 1 addition & 1 deletion api/Dockerfile
Expand Up @@ -9,4 +9,4 @@ RUN go build ./cmd/api_server/api_server.go
FROM debian:buster AS runner
RUN mkdir /app
COPY --from=builder /workspace/api_server /app/api_server
ENTRYPOINT ["/app/api_server"]
ENTRYPOINT ["/app/api_server", "--nats=nats://address-chat-nats.internal:4222"]
19 changes: 13 additions & 6 deletions api/cmd/api_server/api_server.go
Expand Up @@ -2,6 +2,7 @@ package main

import (
"encoding/json"
"flag"
"fmt"
"log"
"net/http"
Expand Down Expand Up @@ -62,10 +63,7 @@ func wsDriver(nc *nats.Conn, conn *websocket.Conn) {
if err != nil {
log.Fatalf("could not marshall msg: %s", err)
}
if err := nc.Publish(fmt.Sprintf("MESSAGES.%s", msg.From), data); err != nil {
log.Fatalf("could not publish to nats: %s", err)
}
for _, addr := range msg.To {
for addr := range msg.Participants() {
if err := nc.Publish(fmt.Sprintf("MESSAGES.%s", addr), data); err != nil {
log.Fatalf("could not publish to nats: %s", err)
}
Expand Down Expand Up @@ -99,6 +97,7 @@ func wsDriver(nc *nats.Conn, conn *websocket.Conn) {
if err != nil {
log.Fatalf("could not subscribe to MESSAGES: %s", err)
}
log.Println("subscribed to:", subj)
for {
message, err := sub.NextMsg(1 * time.Second)
if err == nats.ErrTimeout {
Expand All @@ -107,6 +106,11 @@ func wsDriver(nc *nats.Conn, conn *websocket.Conn) {
if err != nil {
log.Fatalf("unexpected error reading from nats: %s", err)
}
meta, err := message.Metadata()
if err != nil {
log.Fatalf("unexpected metadata error: %s", err)
}
log.Println("received nats message:", meta)
var msg protocol.Message
if err := json.Unmarshal(message.Data, &msg); err != nil {
log.Fatalf("could not decode message: %s", err)
Expand All @@ -115,6 +119,9 @@ func wsDriver(nc *nats.Conn, conn *websocket.Conn) {
log.Println("write to websocket:", err)
return
}
if err := message.AckSync(); err != nil {
log.Println("ack:", err)
}
}
}()
}
Expand Down Expand Up @@ -163,8 +170,8 @@ func healthCheckHandler(w http.ResponseWriter, r *http.Request) {
}

func main() {
nc, err := nats.Connect("nats://address-chat-nats.internal:4222")
// nc, err := nats.Connect(nats.DefaultURL)
natsUrl := flag.String("nats", nats.DefaultURL, "the url for the NATS cluster")
nc, err := nats.Connect(*natsUrl)
if err != nil {
log.Fatalf("could not connect to nats: %s", err)
}
Expand Down
9 changes: 9 additions & 0 deletions api/protocol/protocol.go
Expand Up @@ -22,3 +22,12 @@ type Message struct {
To []string `json:"to"`
Content string `json:"content"`
}

func (m Message) Participants() map[string]bool {
ps := make(map[string]bool)
ps[m.From] = true
for _, addr := range m.To {
ps[addr] = true
}
return ps
}
17 changes: 15 additions & 2 deletions frontend/src/SignedIn.svelte
Expand Up @@ -6,6 +6,11 @@
readonly address: string;
readonly name?: string;
}
interface Message {
readonly from: string;
readonly to: readonly string[];
readonly content: string;
}
import { ethers } from "ethers";
import Mailbox from "./Mailbox.svelte";
Expand All @@ -18,9 +23,10 @@
author = { ...author, name };
});
const ws = new WebSocket("wss://address-chat-api.fly.dev/ws");
// const ws = new WebSocket("ws://localhost:8080/ws");
// const ws = new WebSocket("wss://address-chat-api.fly.dev/ws");
const ws = new WebSocket("ws://localhost:8080/ws");
let authenticatedUntil: number | null = null;
let messages: readonly Message[] = [];
ws.onopen = (evt) => {
console.log("[OPEN]", evt);
ws.send(token);
Expand All @@ -31,6 +37,8 @@
const msg = JSON.parse(evt.data);
if (typeof msg.authenticatedUntil === "number") {
authenticatedUntil = msg.authenticatedUntil || null;
} else {
messages = [...messages, msg];
}
} catch (e) {
console.error(e);
Expand Down Expand Up @@ -80,6 +88,11 @@
}
</script>

<div>
{#each messages as m}
<p>{m.from} -> {m.to}: {m.content}</p>
{/each}
</div>
<div class="center">
<table>
<tbody>
Expand Down

0 comments on commit 4291106

Please sign in to comment.