Skip to content

Commit

Permalink
Merge branch 'main' into dependabot/npm_and_yarn/auxiliary/socket.io-…
Browse files Browse the repository at this point in the history
…parser-3.3.3
  • Loading branch information
kevmo314 committed Dec 22, 2022
2 parents ffa9c0d + a9939ac commit 37f8718
Show file tree
Hide file tree
Showing 26 changed files with 464 additions and 1,357 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/agent.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,4 @@ jobs:
- name: Deploy to GCE
id: deploy
run: |
gcloud compute instance-groups managed rolling-action replace agent-small-us-central1 --region=us-central1
gcloud compute instance-groups managed rolling-action start-update agent-small-us-central1 --project=rtchat-47692 --type='proactive' --max-surge=1 --max-unavailable=1 --minimal-action='replace' --replacement-method='substitute' --version=template=https://www.googleapis.com/compute/beta/projects/rtchat-47692/global/instanceTemplates/agent-e2-small --zone=us-central1-c
286 changes: 68 additions & 218 deletions agent/cmd/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,207 +2,101 @@ package main

import (
"context"
"errors"
"fmt"
"io"
"net/http"
"os"
"os/signal"
"sync"
"sync/atomic"
"syscall"
"time"

"cloud.google.com/go/compute/metadata"
"cloud.google.com/go/firestore"
"github.com/google/uuid"
"github.com/muxable/rtchat/agent/internal/agent"
"github.com/muxable/rtchat/agent/internal/handler"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"
"golang.org/x/sync/errgroup"
)

// NewGCPLogger builds a zap logger that emits JSON entries at Info level and
// above, using stdout as the output path and stderr as the error output path.
// Entries are laid out according to the package-level encoderConfig, and stack
// traces are attached starting at DPanic level.
//
// NOTE(review): the name suggests the format targets Google Cloud Logging;
// nothing in this function enforces that beyond the encoder configuration.
func NewGCPLogger() (*zap.Logger, error) {
	cfg := zap.Config{
		Encoding:         "json",
		Level:            zap.NewAtomicLevelAt(zapcore.InfoLevel),
		EncoderConfig:    encoderConfig,
		ErrorOutputPaths: []string{"stderr"},
		OutputPaths:      []string{"stdout"},
	}

	stacktraceFrom := zap.AddStacktrace(zap.DPanicLevel)
	return cfg.Build(stacktraceFrom)
}


// encoderConfig controls how each log entry is keyed and formatted. It is
// consumed by NewGCPLogger.
var encoderConfig = zapcore.EncoderConfig{
	// Keys under which the parts of an entry are emitted.
	MessageKey:    "message",
	LevelKey:      "severity",
	TimeKey:       "time",
	NameKey:       "logger",
	CallerKey:     "caller",
	StacktraceKey: "stacktrace",

	// Encoders for the individual values.
	EncodeLevel:    encodeLevel(),
	EncodeTime:     zapcore.RFC3339TimeEncoder,
	EncodeDuration: zapcore.MillisDurationEncoder,
	EncodeCaller:   zapcore.ShortCallerEncoder,

	LineEnding: zapcore.DefaultLineEnding,
}

// encodeLevel returns a zapcore.LevelEncoder that writes an upper-case
// severity name for each known zap level (for example Warn becomes "WARNING"
// and DPanic becomes "CRITICAL"). Levels outside the known set append nothing.
//
// NOTE(review): the names appear to follow Google Cloud Logging severities;
// confirm against the log sink in use.
func encodeLevel() zapcore.LevelEncoder {
	return func(level zapcore.Level, out zapcore.PrimitiveArrayEncoder) {
		var severity string
		switch level {
		case zapcore.DebugLevel:
			severity = "DEBUG"
		case zapcore.InfoLevel:
			severity = "INFO"
		case zapcore.WarnLevel:
			severity = "WARNING"
		case zapcore.ErrorLevel:
			severity = "ERROR"
		case zapcore.DPanicLevel:
			severity = "CRITICAL"
		case zapcore.PanicLevel:
			severity = "ALERT"
		case zapcore.FatalLevel:
			severity = "EMERGENCY"
		default:
			// Unrecognized level: leave the encoder untouched.
			return
		}
		out.AppendString(severity)
	}
}

func quitContext() context.Context {
sigs := make(chan os.Signal, 1)
signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM)

preemption := make(chan struct{})
go func() {
req, err := http.NewRequest("GET", "http://metadata.google.internal/computeMetadata/v1/instance/preempted?wait_for_change=true", nil)
if err != nil {
zap.L().Warn("failed to create preemption request", zap.Error(err))
return
}
req.Header.Add("Metadata-Flavor", "Google")
for {
res, err := http.DefaultClient.Do(req)
if err != nil {
if os.IsTimeout(err) {
continue
}
zap.L().Warn("failed to send preemption request", zap.Error(err))
return
var loggerConfig = &zap.Config{
Level: zap.NewAtomicLevelAt(zapcore.InfoLevel),
Encoding: "json",
EncoderConfig: zapcore.EncoderConfig{
TimeKey: "time",
LevelKey: "severity",
NameKey: "logger",
CallerKey: "caller",
MessageKey: "message",
StacktraceKey: "stacktrace",
LineEnding: zapcore.DefaultLineEnding,
EncodeLevel: func(l zapcore.Level, enc zapcore.PrimitiveArrayEncoder) {
switch l {
case zapcore.DebugLevel:
enc.AppendString("DEBUG")
case zapcore.InfoLevel:
enc.AppendString("INFO")
case zapcore.WarnLevel:
enc.AppendString("WARNING")
case zapcore.ErrorLevel:
enc.AppendString("ERROR")
case zapcore.DPanicLevel:
enc.AppendString("CRITICAL")
case zapcore.PanicLevel:
enc.AppendString("ALERT")
case zapcore.FatalLevel:
enc.AppendString("EMERGENCY")
}
if res.StatusCode != http.StatusOK {
zap.L().Warn("preemption request failed", zap.Int("status", res.StatusCode))
continue
}
close(preemption)
}
}()

ctx, cancel := context.WithCancel(context.Background())
go func() {
select {
case <-preemption:
zap.L().Info("instance is being preempted")
cancel()
break
case <-sigs:
zap.L().Info("received termination signal")
cancel()
break
}
}()

http.HandleFunc("/quitquitquit", func(w http.ResponseWriter, r *http.Request) {
zap.L().Info("received quitquitquit request")
cancel()
w.Write([]byte("ok"))
})

return ctx
}

func fetchAgentID() (agent.AgentID, error) {
req, err := http.NewRequest("GET", "http://metadata.google.internal/computeMetadata/v1/instance/id", nil)
if err != nil {
return "", err
}
req.Header.Add("Metadata-Flavor", "Google")
res, err := http.DefaultClient.Do(req)
if err != nil {
return "", err
}
if res.StatusCode != http.StatusOK {
return "", fmt.Errorf("failed to get instance id: %d", res.StatusCode)
}
defer res.Body.Close()

body, err := io.ReadAll(res.Body)
if err != nil {
return "", err
}
return agent.AgentID(body), nil
},
EncodeTime: zapcore.ISO8601TimeEncoder,
EncodeDuration: zapcore.MillisDurationEncoder,
EncodeCaller: zapcore.ShortCallerEncoder,
},
OutputPaths: []string{"stdout"},
ErrorOutputPaths: []string{"stderr"},
}

func main() {
agentID, err := fetchAgentID()
if err != nil {
logger, err := zap.NewDevelopment()
if metadata.OnGCE() {
logger, err := loggerConfig.Build(zap.AddStacktrace(zap.ErrorLevel))
if err != nil {
panic(err)
}
defer logger.Sync()
agentID = agent.AgentID(fmt.Sprintf("pseudo:%s", uuid.New().String()))
undo := zap.ReplaceGlobals(logger.With(zap.String("agentId", string(agentID))))
undo := zap.ReplaceGlobals(logger)
defer undo()
zap.L().Warn("failed to fetch agent id", zap.Error(err))
} else {
logger, err := NewGCPLogger()
logger, err := zap.NewDevelopment()
if err != nil {
panic(err)
}
defer logger.Sync()
undo := zap.ReplaceGlobals(logger.With(zap.String("agentId", string(agentID))))
undo := zap.ReplaceGlobals(logger)
defer undo()
}

// don't use the application context because we will perform cleanup on SIGINT/SIGTERM
client, err := firestore.NewClient(context.Background(), "rtchat-47692")
if err != nil {
zap.L().Fatal("failed to create firestore client", zap.Error(err))
}

terminateCtx, terminate := context.WithCancel(quitContext())

go agent.RunWatchdog(terminateCtx, agentID, client)

// create a new RequestLock
lock := agent.NewRequestLock(terminateCtx, agentID, client)
lock := client.Collection("assignments").Snapshots(context.Background())

handler, err := handler.NewHandler(lock, client)
handler, err := handler.NewHandler(client)
if err != nil {
zap.L().Fatal("failed to create handler", zap.Error(err))
}

activeChannels := &sync.Map{}

go func() {
for {
done := false
select {
case <-terminateCtx.Done():
done = true
case <-time.After(15 * time.Second):
}
ticker := time.NewTicker(15 * time.Second)
defer ticker.Stop()
for range ticker.C {
chs := make([]string, 0)
activeChannels.Range(func(key, value interface{}) bool {
chs = append(chs, key.(string))
return true
})
zap.L().Info("active channels", zap.Strings("channels", chs))
if done {
break
}
}
}()

Expand All @@ -218,83 +112,39 @@ func main() {

go func() {
zap.L().Info("starting http server")
if err := http.ListenAndServe(":8082", nil); err != nil {
port := os.Getenv("PORT")
if port == "" {
port = "8080"
}
if err := http.ListenAndServe(":"+port, nil); err != nil {
zap.L().Fatal("failed to start http server", zap.Error(err))
}
}()

var errwg errgroup.Group

for {
isListening.Store(true)
req, err := lock.Next()
snapshot, err := lock.Next()
isListening.Store(false)
if err != nil {
if !errors.Is(err, context.Canceled) {
zap.L().Error("failed to get next request", zap.Error(err))
}
terminate()
break
}
if req.String() != "twitch:muxfd" {
continue
}
zap.L().Info("got request", zap.String("id", req.String()))
// check if it's in the active channels map
if _, loaded := activeChannels.LoadOrStore(req.String(), struct{}{}); loaded {
zap.L().Info("request already active", zap.String("id", req.String()))
continue
return
}

errwg.Go(func() error {
defer activeChannels.Delete(req.String())

joinCtx, err := handler.Join(req)
if err != nil {
zap.L().Error("failed to open request", zap.Error(err))
return nil
}

// lock the request
claim, err := agent.LockClaim(req)
if err != nil {
if errors.Is(err, context.Canceled) {
return nil
}
return fmt.Errorf("failed to lock request: %w", err)
}

go func() {
select {
case <-joinCtx.ReconnectCtx.Done():
break
case <-terminateCtx.Done():
break
}
if err := claim.Unlock(); err != nil {
zap.L().Error("failed to unlock request", zap.Error(err))
for _, change := range snapshot.Changes {
if change.Kind != firestore.DocumentRemoved {
req := (*agent.Request)(change.Doc.Ref)
zap.L().Info("got request", zap.String("id", req.String()))
// check if it's in the active channels map
if _, loaded := activeChannels.LoadOrStore(req.String(), struct{}{}); loaded {
zap.L().Info("request already active", zap.String("id", req.String()))
continue
}
}()

if err := claim.Wait(); err != nil {
zap.L().Error("failed to wait for request lock", zap.Error(err))
go func() {
if err := handler.Join(req); err != nil {
zap.L().Error("failed to open request", zap.Error(err))
return
}
}()
time.Sleep(200 * time.Millisecond)
}

// leave
if err := joinCtx.Close(); err != nil {
zap.L().Error("failed to leave request", zap.Error(err))
}

// call unlock again to ensure it's unlocked
if err := claim.Unlock(); err != nil {
return fmt.Errorf("failed to unlock request: %w", err)
}

return nil
})
}

if err := errwg.Wait(); err != nil {
zap.L().Error("failed to join request", zap.Error(err))
}
}
}

0 comments on commit 37f8718

Please sign in to comment.