-
Notifications
You must be signed in to change notification settings - Fork 8
/
devobserver.go
150 lines (120 loc) · 4.22 KB
/
devobserver.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
// Copyright 2022 Namespace Labs Inc; All rights reserved.
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
package hotreload
import (
"context"
"fmt"
"io"
"net"
"strings"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
"namespacelabs.dev/foundation/framework/rpcerrors/multierr"
"namespacelabs.dev/foundation/internal/cli/fncobra/name"
"namespacelabs.dev/foundation/internal/compute"
"namespacelabs.dev/foundation/internal/console"
"namespacelabs.dev/foundation/internal/fnerrors"
"namespacelabs.dev/foundation/internal/fnfs/workspace/wsremote"
hrconstants "namespacelabs.dev/foundation/internal/hotreload/constants"
"namespacelabs.dev/foundation/internal/integrations"
"namespacelabs.dev/foundation/internal/planning"
"namespacelabs.dev/foundation/internal/runtime"
"namespacelabs.dev/foundation/internal/wscontents"
"namespacelabs.dev/foundation/schema"
"namespacelabs.dev/foundation/std/tasks"
)
type FileSyncDevObserver struct {
log io.Writer
server runtime.Deployable
cluster runtime.ClusterNamespace
fileSyncPort int32
mu sync.Mutex
conn *grpc.ClientConn
}
func ConfigureFileSyncDevObserver(ctx context.Context, cluster runtime.ClusterNamespace, srv planning.Server) (context.Context, integrations.DevObserver, error) {
if wsremote.Ctx(ctx) != nil {
return nil, nil, fnerrors.NewWithLocation(srv.Location, "`%s dev` on multiple web/nodejs servers not supported", name.CmdName)
}
devObserver := newFileSyncDevObserver(ctx, cluster, srv, hrconstants.FileSyncPort)
newCtx, _ := wsremote.BufferAndSinkTo(ctx, devObserver.Deposit)
return newCtx, devObserver, nil
}
func newFileSyncDevObserver(ctx context.Context, cluster runtime.ClusterNamespace, srv planning.Server, fileSyncPort int32) *FileSyncDevObserver {
return &FileSyncDevObserver{
log: console.TypedOutput(ctx, "hot reload", console.CatOutputUs),
server: srv.Proto(),
cluster: cluster,
fileSyncPort: fileSyncPort,
}
}
func (do *FileSyncDevObserver) Close() error {
do.mu.Lock()
defer do.mu.Unlock()
return do.cleanup()
}
func (do *FileSyncDevObserver) cleanup() error {
var errs []error
if do.conn != nil {
if err := do.conn.Close(); err != nil {
errs = append(errs, err)
}
do.conn = nil
}
return multierr.New(errs...)
}
func (do *FileSyncDevObserver) OnDeployment(ctx context.Context) {
do.mu.Lock()
defer do.mu.Unlock()
err := do.cleanup()
if err != nil {
fmt.Fprintln(do.log, "failed to port forwarding cleanup", err)
}
orch := compute.On(ctx)
sink := tasks.SinkFrom(ctx)
// A background context is used here as the connection we create will be
// long-lived. The parent orchestrator and sink are then patched in when an
// actual connection attempt is made.
ctxWithTimeout, done := context.WithTimeout(context.Background(), 15*time.Second)
defer done()
t := time.Now()
conn, err := grpc.DialContext(ctxWithTimeout, "filesync-"+do.server.GetName(),
grpc.WithBlock(),
grpc.WithTransportCredentials(insecure.NewCredentials()),
grpc.WithContextDialer(func(ctx context.Context, _ string) (net.Conn, error) {
patchedContext := compute.AttachOrch(tasks.WithSink(ctx, sink), orch)
return do.cluster.DialServer(patchedContext, do.server, &schema.Endpoint_Port{
ContainerPort: do.fileSyncPort,
})
}),
)
if err != nil {
fmt.Fprintln(do.log, "failed to connect to filesync", err)
return
}
do.conn = conn
fmt.Fprintf(do.log, " Connected to FileSync (for hot reload), took %v.\n\n", time.Since(t))
}
func (do *FileSyncDevObserver) Deposit(ctx context.Context, s *wsremote.Signature, events []*wscontents.FileEvent) (bool, error) {
do.mu.Lock()
defer do.mu.Unlock()
if do.conn == nil {
return false, nil
}
var labels []string
for _, ev := range events {
labels = append(labels, fmt.Sprintf("%s %s", ev.Event, ev.Path))
}
fmt.Fprintf(do.log, "FileSync event: %s\n", strings.Join(labels, ", "))
newCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
defer cancel()
if _, err := wsremote.NewFileSyncServiceClient(do.conn).Push(newCtx, &wsremote.PushRequest{
Signature: s,
FileEvent: events,
}); err != nil {
return false, err
}
return true, nil
}