From 6f010b07dd638ecc9b65e669c0c68f1dc4bbbd2d Mon Sep 17 00:00:00 2001 From: Norio Nomura Date: Thu, 16 Oct 2025 09:37:10 +0900 Subject: [PATCH 1/2] guestagent: Avoid restarting in the boot script if the executable and config have no changes. Signed-off-by: Norio Nomura --- cmd/lima-guestagent/install_systemd_linux.go | 43 +++++++++--- .../boot/25-guestagent-base.sh | 70 ++++++++++++------- 2 files changed, 79 insertions(+), 34 deletions(-) diff --git a/cmd/lima-guestagent/install_systemd_linux.go b/cmd/lima-guestagent/install_systemd_linux.go index 91ff53b19b5..0e3efdd9e9e 100644 --- a/cmd/lima-guestagent/install_systemd_linux.go +++ b/cmd/lima-guestagent/install_systemd_linux.go @@ -4,12 +4,14 @@ package main import ( + "bytes" _ "embed" "errors" "fmt" "os" "os/exec" "path/filepath" + "slices" "strings" "github.com/sirupsen/logrus" @@ -24,6 +26,7 @@ func newInstallSystemdCommand() *cobra.Command { Short: "Install a systemd unit (user)", RunE: installSystemdAction, } + installSystemdCommand.Flags().Bool("guestagent-updated", false, "Indicate that the guest agent has been updated") installSystemdCommand.Flags().Int("vsock-port", 0, "Use vsock server on specified port") installSystemdCommand.Flags().String("virtio-port", "", "Use virtio server instead a UNIX socket") return installSystemdCommand @@ -31,6 +34,10 @@ func newInstallSystemdCommand() *cobra.Command { func installSystemdAction(cmd *cobra.Command, _ []string) error { ctx := cmd.Context() + guestAgentUpdated, err := cmd.Flags().GetBool("guestagent-updated") + if err != nil { + return err + } vsockPort, err := cmd.Flags().GetInt("vsock-port") if err != nil { return err @@ -48,24 +55,42 @@ func installSystemdAction(cmd *cobra.Command, _ []string) error { return err } unitPath := "/etc/systemd/system/lima-guestagent.service" + unitFileChanged := true if _, err := os.Stat(unitPath); !errors.Is(err, os.ErrNotExist) { - logrus.Infof("File %q already exists, overwriting", unitPath) + if existingUnit, err := os.ReadFile(unitPath); err == nil && bytes.Equal(unit, existingUnit) { + logrus.Infof("File %q is up-to-date", unitPath) + unitFileChanged = false + } else { + logrus.Infof("File %q needs update", unitPath) + } } else { unitDir := filepath.Dir(unitPath) if err := os.MkdirAll(unitDir, 0o755); err != nil { return err } } - if err := os.WriteFile(unitPath, unit, 0o644); err != nil { - return err + if unitFileChanged { + if err := os.WriteFile(unitPath, unit, 0o644); err != nil { + return err + } + logrus.Infof("Written file %q", unitPath) + } else if !guestAgentUpdated { + logrus.Info("lima-guestagent.service already up-to-date") + return nil } - logrus.Infof("Written file %q", unitPath) - args := [][]string{ - {"daemon-reload"}, - {"enable", "lima-guestagent.service"}, - {"start", "lima-guestagent.service"}, - {"try-restart", "lima-guestagent.service"}, + // unitFileChanged || guestAgentUpdated + args := make([][]string, 0, 4) + if unitFileChanged { + args = append(args, []string{"daemon-reload"}) } + args = slices.Concat( + args, + [][]string{ + {"enable", "lima-guestagent.service"}, + {"try-restart", "lima-guestagent.service"}, // try-restart: restart if running, otherwise do nothing + {"start", "lima-guestagent.service"}, // start: start if not running, otherwise do nothing + }, + ) for _, args := range args { cmd := exec.CommandContext(ctx, "systemctl", append([]string{"--system"}, args...)...) cmd.Stdout = os.Stdout diff --git a/pkg/cidata/cidata.TEMPLATE.d/boot/25-guestagent-base.sh b/pkg/cidata/cidata.TEMPLATE.d/boot/25-guestagent-base.sh index 9a458deedf0..ed3fc08539a 100644 --- a/pkg/cidata/cidata.TEMPLATE.d/boot/25-guestagent-base.sh +++ b/pkg/cidata/cidata.TEMPLATE.d/boot/25-guestagent-base.sh @@ -1,4 +1,4 @@ -#!/bin/sh +#!/bin/bash # SPDX-FileCopyrightText: Copyright The Lima Authors # SPDX-License-Identifier: Apache-2.0 @@ -19,46 +19,66 @@ fi # Install or update the guestagent binary mkdir -p "${LIMA_CIDATA_GUEST_INSTALL_PREFIX}"/bin -install -m 755 "${LIMA_CIDATA_MNT}"/lima-guestagent "${LIMA_CIDATA_GUEST_INSTALL_PREFIX}"/bin/lima-guestagent +guestagent_updated=false +if diff -q "${LIMA_CIDATA_MNT}"/lima-guestagent "${LIMA_CIDATA_GUEST_INSTALL_PREFIX}"/bin/lima-guestagent 2>/dev/null; then + echo "${LIMA_CIDATA_GUEST_INSTALL_PREFIX}/bin/lima-guestagent is up-to-date" +else + install -m 755 "${LIMA_CIDATA_MNT}"/lima-guestagent "${LIMA_CIDATA_GUEST_INSTALL_PREFIX}"/bin/lima-guestagent + guestagent_updated=true +fi # Launch the guestagent service if [ -f /sbin/openrc-run ]; then - # Convert .env to conf.d by wrapping values in double quotes. - # Split the variable and value at the first "=" to handle cases where the value contains additional "=" characters. - sed -E 's/^([^=]+)=(.*)/\1="\2"/' "${LIMA_CIDATA_MNT}/lima.env" >"/etc/conf.d/lima-guestagent" - # Install the openrc lima-guestagent service script - cat >/etc/init.d/lima-guestagent <<'EOF' -#!/sbin/openrc-run -supervisor=supervise-daemon + print_config() { + # Convert .env to conf.d by wrapping values in double quotes. + # Split the variable and value at the first "=" to handle cases where the value contains additional "=" characters. + sed -E 's/^([^=]+)=(.*)/\1="\2"/' "${LIMA_CIDATA_MNT}/lima.env" + } + print_script() { + # the openrc lima-guestagent service script + cat <<-'EOF' + #!/sbin/openrc-run + supervisor=supervise-daemon -log_file="${log_file:-/var/log/${RC_SVCNAME}.log}" -err_file="${err_file:-${log_file}}" -log_mode="${log_mode:-0644}" -log_owner="${log_owner:-root:root}" + log_file="${log_file:-/var/log/${RC_SVCNAME}.log}" + err_file="${err_file:-${log_file}}" + log_mode="${log_mode:-0644}" + log_owner="${log_owner:-root:root}" -supervise_daemon_args="${supervise_daemon_opts:---stderr \"${err_file}\" --stdout \"${log_file}\"}" + supervise_daemon_args="${supervise_daemon_opts:---stderr \"${err_file}\" --stdout \"${log_file}\"}" -name="lima-guestagent" -description="Forward ports to the lima-hostagent" + name="lima-guestagent" + description="Forward ports to the lima-hostagent" + + command=${LIMA_CIDATA_GUEST_INSTALL_PREFIX}/bin/lima-guestagent + command_args="daemon --debug=${LIMA_CIDATA_DEBUG} --vsock-port \"${LIMA_CIDATA_VSOCK_PORT}\" --virtio-port \"${LIMA_CIDATA_VIRTIO_PORT}\"" + command_background=true + pidfile="/run/lima-guestagent.pid" + EOF + } + if [ "${guestagent_updated}" = "false" ] && + diff -q <(print_config) /etc/conf.d/lima-guestagent 2>/dev/null && + diff -q <(print_script) /etc/init.d/lima-guestagent 2>/dev/null; then + echo "lima-guestagent service already up-to-date" + exit 0 + fi -command=${LIMA_CIDATA_GUEST_INSTALL_PREFIX}/bin/lima-guestagent -command_args="daemon --debug=${LIMA_CIDATA_DEBUG} --vsock-port \"${LIMA_CIDATA_VSOCK_PORT}\" --virtio-port \"${LIMA_CIDATA_VIRTIO_PORT}\"" -command_background=true -pidfile="/run/lima-guestagent.pid" -EOF + print_config >/etc/conf.d/lima-guestagent + print_script >/etc/init.d/lima-guestagent chmod 755 /etc/init.d/lima-guestagent rc-update add lima-guestagent default - rc-service lima-guestagent start + rc-service --ifstarted lima-guestagent restart # restart if running, otherwise do nothing + rc-service --ifstopped lima-guestagent start # start if not running, otherwise do nothing else # Remove legacy systemd service rm -f "${LIMA_CIDATA_HOME}/.config/systemd/user/lima-guestagent.service" if [ "${LIMA_CIDATA_VSOCK_PORT}" != "0" ]; then - sudo "${LIMA_CIDATA_GUEST_INSTALL_PREFIX}"/bin/lima-guestagent install-systemd --debug="${LIMA_CIDATA_DEBUG}" --vsock-port "${LIMA_CIDATA_VSOCK_PORT}" + sudo "${LIMA_CIDATA_GUEST_INSTALL_PREFIX}"/bin/lima-guestagent install-systemd --debug="${LIMA_CIDATA_DEBUG}" --guestagent-updated="${guestagent_updated}" --vsock-port "${LIMA_CIDATA_VSOCK_PORT}" elif [ "${LIMA_CIDATA_VIRTIO_PORT}" != "" ]; then - sudo "${LIMA_CIDATA_GUEST_INSTALL_PREFIX}"/bin/lima-guestagent install-systemd --debug="${LIMA_CIDATA_DEBUG}" --virtio-port "${LIMA_CIDATA_VIRTIO_PORT}" + sudo "${LIMA_CIDATA_GUEST_INSTALL_PREFIX}"/bin/lima-guestagent install-systemd --debug="${LIMA_CIDATA_DEBUG}" --guestagent-updated="${guestagent_updated}" --virtio-port "${LIMA_CIDATA_VIRTIO_PORT}" else - sudo "${LIMA_CIDATA_GUEST_INSTALL_PREFIX}"/bin/lima-guestagent install-systemd --debug="${LIMA_CIDATA_DEBUG}" + sudo "${LIMA_CIDATA_GUEST_INSTALL_PREFIX}"/bin/lima-guestagent install-systemd --debug="${LIMA_CIDATA_DEBUG}" --guestagent-updated="${guestagent_updated}" fi fi From 72c7a8413d5b497be03e72dfe54a05464ca1ba15 Mon Sep 17 00:00:00 2001 From: Norio Nomura Date: Fri, 17 Oct 2025 09:51:59 +0900 Subject: [PATCH 2/2] `lima-guestagent`: Handle SIGTERM, support save/restore `eventState` on stop/start `eventState` is saved at `/run/lima-guestagent/event-state.json`. The saved `eventState` is expected to be removed on OS restart. Signed-off-by: Norio Nomura # Conflicts: # pkg/guestagent/api/server/server.go --- cmd/lima-guestagent/daemon_linux.go | 22 ++++- .../lima-guestagent.TEMPLATE.service | 2 +- cmd/lima-guestagent/main_linux.go | 2 + pkg/guestagent/api/server/server.go | 18 +++- pkg/guestagent/guestagent_linux.go | 84 +++++++++++++++++-- pkg/guestagent/ticker/compound.go | 7 +- pkg/guestagent/ticker/ebpf_linux.go | 23 +++-- pkg/guestagent/ticker/simple.go | 43 +++++++++- pkg/guestagent/ticker/ticker.go | 1 + 9 files changed, 176 insertions(+), 26 deletions(-) diff --git a/cmd/lima-guestagent/daemon_linux.go b/cmd/lima-guestagent/daemon_linux.go index 8c80d96d566..83509e9c538 100644 --- a/cmd/lima-guestagent/daemon_linux.go +++ b/cmd/lima-guestagent/daemon_linux.go @@ -7,6 +7,8 @@ import ( "errors" "net" "os" + "os/signal" + "syscall" "time" "github.com/mdlayher/vsock" @@ -26,6 +28,7 @@ func newDaemonCommand() *cobra.Command { Short: "Run the daemon", RunE: daemonAction, } + daemonCommand.Flags().String("runtime-dir", "/run/lima-guestagent", "Directory to store runtime state") daemonCommand.Flags().Duration("tick", 3*time.Second, "Tick for polling events") daemonCommand.Flags().Int("vsock-port", 0, "Use vsock server instead a UNIX socket") daemonCommand.Flags().String("virtio-port", "", "Use virtio server instead a UNIX socket") @@ -34,6 +37,13 @@ func newDaemonCommand() *cobra.Command { func daemonAction(cmd *cobra.Command, _ []string) error { ctx := cmd.Context() + runtimeDir, err := cmd.Flags().GetString("runtime-dir") + if err != nil { + return err + } + if err := os.MkdirAll(runtimeDir, 0o755); err != nil { + return err + } socket := "/run/lima-guestagent.sock" tick, err := cmd.Flags().GetDuration("tick") if err != nil { @@ -66,11 +76,16 @@ func daemonAction(cmd *cobra.Command, _ []string) error { tickerInst = ticker.NewCompoundTicker(simpleTicker, ebpfTicker) } - agent, err := guestagent.New(ctx, tickerInst) + ctx, stop := signal.NotifyContext(ctx, syscall.SIGTERM) + defer stop() + go func() { + <-ctx.Done() + logrus.Debug("Received SIGTERM, shutting down the guest agent") + }() + agent, err := guestagent.New(ctx, tickerInst, runtimeDir) if err != nil { return err } - defer agent.Close() err = os.RemoveAll(socket) if err != nil { @@ -104,5 +119,6 @@ func daemonAction(cmd *cobra.Command, _ []string) error { l = socketL logrus.Infof("serving the guest agent on %q", socket) } - return server.StartServer(l, &server.GuestServer{Agent: agent, TunnelS: portfwdserver.NewTunnelServer()}) + defer logrus.Debug("exiting lima-guestagent daemon") + return server.StartServer(ctx, l, &server.GuestServer{Agent: agent, TunnelS: portfwdserver.NewTunnelServer()}) } diff --git a/cmd/lima-guestagent/lima-guestagent.TEMPLATE.service b/cmd/lima-guestagent/lima-guestagent.TEMPLATE.service index 27741f0bfed..190613a2929 100644 --- a/cmd/lima-guestagent/lima-guestagent.TEMPLATE.service +++ b/cmd/lima-guestagent/lima-guestagent.TEMPLATE.service @@ -2,7 +2,7 @@ Description=lima-guestagent [Service] -ExecStart={{.Binary}} daemon {{.Args}} +ExecStart={{.Binary}} daemon {{.Args}} --runtime-dir="%t/%N" Type=simple Restart=on-failure OOMPolicy=continue diff --git a/cmd/lima-guestagent/main_linux.go b/cmd/lima-guestagent/main_linux.go index 1e203c4ce0b..2e001e9eecb 100644 --- a/cmd/lima-guestagent/main_linux.go +++ b/cmd/lima-guestagent/main_linux.go @@ -11,12 +11,14 @@ import ( "github.com/lima-vm/lima/v2/cmd/yq" "github.com/lima-vm/lima/v2/pkg/debugutil" + "github.com/lima-vm/lima/v2/pkg/osutil" "github.com/lima-vm/lima/v2/pkg/version" ) func main() { yq.MaybeRunYQ() if err := newApp().Execute(); err != nil { + osutil.HandleExitError(err) logrus.Fatal(err) } } diff --git a/pkg/guestagent/api/server/server.go b/pkg/guestagent/api/server/server.go index ad5124bbab8..021c79f2be6 100644 --- a/pkg/guestagent/api/server/server.go +++ b/pkg/guestagent/api/server/server.go @@ -7,6 +7,7 @@ import ( "context" "net" + "github.com/sirupsen/logrus" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" "google.golang.org/protobuf/types/known/emptypb" @@ -16,7 +17,7 @@ import ( "github.com/lima-vm/lima/v2/pkg/portfwdserver" ) -func StartServer(lis net.Listener, guest *GuestServer) error { +func StartServer(ctx context.Context, lis net.Listener, guest *GuestServer) error { server := grpc.NewServer( grpc.InitialWindowSize(512<<20), grpc.InitialConnWindowSize(512<<20), @@ -26,7 +27,19 @@ func StartServer(lis net.Listener, guest *GuestServer) error { grpc.KeepaliveParams(keepalive.ServerParameters{Time: 0, Timeout: 0, MaxConnectionIdle: 0}), ) api.RegisterGuestServiceServer(server, guest) - return server.Serve(lis) + go func() { + <-ctx.Done() + logrus.Debug("Stopping the gRPC server") + server.GracefulStop() + logrus.Debug("Closing the listener used by the gRPC server") + lis.Close() + }() + err := server.Serve(lis) + // grpc.Server.Serve() expects to return a non-nil error caused by lis.Accept() + if opErr, ok := err.(*net.OpError); ok && opErr.Err.Error() == "use of closed network connection" { + return nil + } + return err } type GuestServer struct { @@ -41,6 +54,7 @@ func (s *GuestServer) GetInfo(ctx context.Context, _ *emptypb.Empty) (*api.Info, func (s *GuestServer) GetEvents(_ *emptypb.Empty, stream api.GuestService_GetEventsServer) error { responses := make(chan *api.Event) + // expects Events() to close the channel when stream.Context() is done or ticker stops go s.Agent.Events(stream.Context(), responses) for response := range responses { err := stream.Send(response) diff --git a/pkg/guestagent/guestagent_linux.go b/pkg/guestagent/guestagent_linux.go index 362e7b9b83e..7be81cf6ca8 100644 --- a/pkg/guestagent/guestagent_linux.go +++ b/pkg/guestagent/guestagent_linux.go @@ -5,7 +5,10 @@ package guestagent import ( "context" + "encoding/json" + "errors" "os" + "path/filepath" "reflect" "time" @@ -19,7 +22,7 @@ import ( "github.com/lima-vm/lima/v2/pkg/guestagent/timesync" ) -func New(ctx context.Context, ticker ticker.Ticker) (Agent, error) { +func New(ctx context.Context, ticker ticker.Ticker, runtimeDir string) (Agent, error) { socketsLister, err := sockets.NewLister() if err != nil { return nil, err @@ -28,14 +31,25 @@ func New(ctx context.Context, ticker ticker.Ticker) (Agent, error) { ticker: ticker, socketLister: socketsLister, kubernetesServiceWatcher: kubernetesservice.NewServiceWatcher(), + runtimeDir: runtimeDir, } go a.kubernetesServiceWatcher.Start(ctx) - go a.fixSystemTimeSkew() + go a.fixSystemTimeSkew(ctx) + + go func() { + <-ctx.Done() + logrus.Debug("Closing the agent") + if err := a.Close(); err != nil { + logrus.Errorf("error on agent.Close(): %v", err) + } + }() return a, nil } +var _ Agent = (*agent)(nil) + type agent struct { // Ticker is like time.Ticker. // We can't use inotify for /proc/net/tcp, so we need this ticker to @@ -43,10 +57,11 @@ type agent struct { ticker ticker.Ticker socketLister *sockets.Lister kubernetesServiceWatcher *kubernetesservice.ServiceWatcher + runtimeDir string } type eventState struct { - ports []*api.IPPort + Ports []*api.IPPort `json:"ports,omitempty"` } func comparePorts(old, neww []*api.IPPort) (added, removed []*api.IPPort) { @@ -82,13 +97,13 @@ func (a *agent) collectEvent(ctx context.Context, st eventState) (*api.Event, ev err error ) newSt := st - newSt.ports, err = a.LocalPorts(ctx) + newSt.Ports, err = a.LocalPorts(ctx) if err != nil { ev.Errors = append(ev.Errors, err.Error()) ev.Time = timestamppb.Now() return ev, newSt } - ev.AddedLocalPorts, ev.RemovedLocalPorts = comparePorts(st.ports, newSt.ports) + ev.AddedLocalPorts, ev.RemovedLocalPorts = comparePorts(st.Ports, newSt.Ports) ev.Time = timestamppb.Now() return ev, newSt } @@ -102,8 +117,16 @@ func isEventEmpty(ev *api.Event) bool { func (a *agent) Events(ctx context.Context, ch chan *api.Event) { defer close(ch) tickerCh := a.ticker.Chan() - defer a.ticker.Stop() - var st eventState + + st, err := a.LoadEventState() + if err != nil { + logrus.Errorf("failed to load state: %v", err) + } + defer func() { + if err := a.SaveEventState(st); err != nil { + logrus.Errorf("failed to save state: %v", err) + } + }() for { var ev *api.Event ev, st = a.collectEvent(ctx, st) @@ -115,6 +138,7 @@ func (a *agent) Events(ctx context.Context, ch chan *api.Event) { return case _, ok := <-tickerCh: if !ok { + logrus.Debug("ticker channel closed") return } logrus.Debug("tick!") @@ -190,7 +214,7 @@ func (a *agent) Info(ctx context.Context) (*api.Info, error) { const deltaLimit = 2 * time.Second -func (a *agent) fixSystemTimeSkew() { +func (a *agent) fixSystemTimeSkew(ctx context.Context) { logrus.Info("fixSystemTimeSkew(): monitoring system time skew") for { ok, err := timesync.HasRTC() @@ -217,6 +241,13 @@ func (a *agent) fixSystemTimeSkew() { logrus.Infof("fixSystemTimeSkew: system time synchronized with rtc") break } + select { + case <-ctx.Done(): + logrus.Debug("fixSystemTimeSkew: context done, exiting") + ticker.Stop() + return + default: + } } ticker.Stop() } @@ -239,5 +270,42 @@ func (a *agent) Close() error { return err } } + a.ticker.Stop() return nil } + +const eventStateFileName = "event-state.json" + +// LoadEventState loads the event state from a file in JSON format. +// If the file does not exist, it returns an empty eventState with no error. +// The saved eventState is expected to be removed on OS restart. +func (a *agent) LoadEventState() (eventState, error) { + logrus.Debug("Loading event state") + path := filepath.Join(a.runtimeDir, eventStateFileName) + data, err := os.ReadFile(path) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return eventState{}, nil + } + return eventState{}, err + } + var st eventState + if err := json.Unmarshal(data, &st); err != nil { + return eventState{}, err + } + // We don't remove the file after loading for debugging purposes. + return st, nil +} + +// SaveEventState saves the event state to a file in JSON format. +// It overwrites the file if it already exists. +// The saved eventState is expected to be removed on OS restart. +func (a *agent) SaveEventState(st eventState) error { + logrus.Debug("Saving event state") + data, err := json.Marshal(st) + if err != nil { + return err + } + path := filepath.Join(a.runtimeDir, eventStateFileName) + return os.WriteFile(path, data, 0o644) +} diff --git a/pkg/guestagent/ticker/compound.go b/pkg/guestagent/ticker/compound.go index 5373a56216c..0a59cf74c37 100644 --- a/pkg/guestagent/ticker/compound.go +++ b/pkg/guestagent/ticker/compound.go @@ -5,6 +5,8 @@ package ticker import ( "time" + + "github.com/sirupsen/logrus" ) func NewCompoundTicker(t1, t2 Ticker) Ticker { @@ -15,10 +17,13 @@ type compoundTicker struct { t1, t2 Ticker } +var _ Ticker = (*compoundTicker)(nil) + func (ticker *compoundTicker) Chan() <-chan time.Time { ch := make(chan time.Time) go func() { - defer ticker.Stop() + defer close(ch) + defer logrus.Debug("compoundTicker: exiting") for { select { case v, ok := <-ticker.t1.Chan(): diff --git a/pkg/guestagent/ticker/ebpf_linux.go b/pkg/guestagent/ticker/ebpf_linux.go index b0942729ecd..8d72bb46cfb 100644 --- a/pkg/guestagent/ticker/ebpf_linux.go +++ b/pkg/guestagent/ticker/ebpf_linux.go @@ -4,6 +4,7 @@ package ticker import ( + "errors" "os" "strings" "time" @@ -20,19 +21,22 @@ func NewEbpfTicker(tracepoints []string) (Ticker, error) { ticker ebpfTicker err error ) + defer func() { + if err != nil { + ticker.Stop() + } + }() ticker.events, err = ebpf.NewMap(&ebpf.MapSpec{ Name: "lima_ticker_events", Type: ebpf.RingBuf, MaxEntries: 1 << 20, }) if err != nil { - ticker.Stop() return nil, err } ticker.prog, err = buildEbpfProg(ticker.events) if err != nil { - ticker.Stop() return nil, err } @@ -40,7 +44,6 @@ func NewEbpfTicker(tracepoints []string) (Ticker, error) { tpPair := strings.SplitN(tp, ":", 2) tpLink, err := link.Tracepoint(tpPair[0], tpPair[1], ticker.prog, nil) if err != nil { - ticker.Stop() return nil, err } ticker.links = append(ticker.links, tpLink) @@ -48,17 +51,19 @@ func NewEbpfTicker(tracepoints []string) (Ticker, error) { ticker.reader, err = ringbuf.NewReader(ticker.events) if err != nil { - ticker.Stop() return nil, err } ticker.ch = make(chan time.Time) go func() { + defer close(ticker.ch) for { _, rdErr := ticker.reader.Read() if rdErr != nil { - logrus.WithError(rdErr).Warn("ebpfTicker: failed to read ringbuf") - ticker.Stop() + if !errors.Is(rdErr, ringbuf.ErrClosed) { + logrus.WithError(rdErr).Warn("ebpfTicker: failed to read ringbuf") + } + logrus.Debug("ebpfTicker: exiting") return } ticker.ch <- time.Now() @@ -68,6 +73,8 @@ func NewEbpfTicker(tracepoints []string) (Ticker, error) { return &ticker, nil } +var _ Ticker = (*ebpfTicker)(nil) + type ebpfTicker struct { events *ebpf.Map prog *ebpf.Program @@ -93,9 +100,7 @@ func (ticker *ebpfTicker) Stop() { if ticker.reader != nil { _ = ticker.reader.Close() } - if ticker.ch != nil { - close(ticker.ch) - } + // ticker.ch will be closed in go routine in NewEbpfTicker() to avoid sending on closed channel } func buildEbpfProg(events *ebpf.Map) (*ebpf.Program, error) { diff --git a/pkg/guestagent/ticker/simple.go b/pkg/guestagent/ticker/simple.go index c9b3446e3f0..0f44fc99b07 100644 --- a/pkg/guestagent/ticker/simple.go +++ b/pkg/guestagent/ticker/simple.go @@ -4,17 +4,56 @@ package ticker import ( + "sync" "time" + + "github.com/sirupsen/logrus" ) func NewSimpleTicker(ticker *time.Ticker) Ticker { - return &simpleTicker{Ticker: ticker} + return &simpleTicker{ + Ticker: ticker, + closableCh: make(chan any), + exposeCh: make(chan time.Time), + } } +var _ Ticker = (*simpleTicker)(nil) + type simpleTicker struct { *time.Ticker + closableCh chan any + exposeCh chan time.Time + once sync.Once } func (ticker *simpleTicker) Chan() <-chan time.Time { - return ticker.Ticker.C + // We cannot directly expose ticker.Ticker.C because it won't be closed on Stop() + ticker.once.Do(func() { + go func() { + defer close(ticker.exposeCh) + for { + select { + case v, ok := <-ticker.Ticker.C: + if !ok { + // should not happen as time.Ticker.C is never closed as per docs + return + } + ticker.exposeCh <- v + case <-ticker.closableCh: + logrus.Debug("simpleTicker: exiting") + return + } + } + }() + logrus.Debug("simpleTicker: starting") + }) + return ticker.exposeCh +} + +func (ticker *simpleTicker) Stop() { + ticker.Ticker.Stop() + // Since ticker.Ticker.Stop() does not close ticker.Ticker.C, + // we need to close the goroutine created in Chan(). + close(ticker.closableCh) } diff --git a/pkg/guestagent/ticker/ticker.go b/pkg/guestagent/ticker/ticker.go index edfa5012473..fceb2751d26 100644 --- a/pkg/guestagent/ticker/ticker.go +++ b/pkg/guestagent/ticker/ticker.go @@ -8,6 +8,7 @@ import ( ) type Ticker interface { + // similar to time.Ticker.C, but must be closed when Stop() is called Chan() <-chan time.Time Stop() }