Skip to content

Commit

Permalink
refactor: replace klog with slog (#37)
Browse files Browse the repository at this point in the history
* refactor: eliminate klog usage in server

Signed-off-by: Jian Zeng <anonymousknight96@gmail.com>

* feat: add slog util

Signed-off-by: Jian Zeng <anonymousknight96@gmail.com>

* refactor: replace klog with slog

Signed-off-by: Jian Zeng <anonymousknight96@gmail.com>

* feat: specify verbosity

Signed-off-by: Jian Zeng <anonymousknight96@gmail.com>

* chore: update config files

Signed-off-by: Jian Zeng <anonymousknight96@gmail.com>

* chore: minor change

Signed-off-by: Jian Zeng <anonymousknight96@gmail.com>

---------

Signed-off-by: Jian Zeng <anonymousknight96@gmail.com>
  • Loading branch information
knight42 committed Apr 2, 2024
1 parent c7e621c commit cad54d6
Show file tree
Hide file tree
Showing 14 changed files with 173 additions and 142 deletions.
7 changes: 1 addition & 6 deletions .golangci.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -3,9 +3,8 @@ run:
modules-download-mode: readonly
linters:
enable:
- exhaustive
- exportloopref
- dupl
- exportloopref
- gochecknoinits
- goconst
- gocritic
Expand All @@ -22,10 +21,6 @@ linters:
- usestdlibvars
- whitespace
linters-settings:
exhaustive:
# presence of "default" case in switch statements satisfies exhaustiveness,
# even if all enum members are not listed
default-signifies-exhaustive: true
issues:
exclude-rules:
- path: _test.go
Expand Down
59 changes: 27 additions & 32 deletions cmd/client/forwarder.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,16 @@ package main

import (
"fmt"
"log/slog"
"net"
"strconv"

"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/klog/v2"

"github.com/knight42/krelay/pkg/constants"
"github.com/knight42/krelay/pkg/ports"
"github.com/knight42/krelay/pkg/remoteaddr"
slogutil "github.com/knight42/krelay/pkg/slog"
"github.com/knight42/krelay/pkg/xnet"
)

Expand Down Expand Up @@ -53,14 +54,16 @@ func (p *portForwarder) listen(localIP string) error {
func (p *portForwarder) run(streamConn httpstream.Connection) {
switch {
case p.tcpListener != nil:
l := p.tcpListener
defer l.Close()

localAddr := l.Addr().String()
klog.InfoS("Forwarding",
constants.LogFieldProtocol, p.ports.Protocol,
constants.LogFieldLocalAddr, localAddr,
constants.LogFieldRemotePort, p.ports.RemotePort,
lis := p.tcpListener
defer lis.Close()

localAddr := lis.Addr().String()
l := slog.With(
slog.String(constants.LogFieldProtocol, p.ports.Protocol),
slog.String(constants.LogFieldLocalAddr, localAddr),
)
l.Info("Forwarding",
slogutil.Uint16(constants.LogFieldRemotePort, p.ports.RemotePort),
)

for {
Expand All @@ -70,21 +73,15 @@ func (p *portForwarder) run(streamConn httpstream.Connection) {
default:
}

c, err := l.Accept()
c, err := lis.Accept()
if err != nil {
klog.ErrorS(err, "Fail to accept tcp connection",
constants.LogFieldProtocol, p.ports.Protocol,
constants.LogFieldLocalAddr, localAddr,
)
l.Error("Fail to accept tcp connection", slogutil.Error(err))
return
}

remoteAddr, err := p.addrGetter.Get()
if err != nil {
klog.ErrorS(err, "Fail to get remote address",
constants.LogFieldProtocol, p.ports.Protocol,
constants.LogFieldLocalAddr, localAddr,
)
l.Error("Fail to get remote address", slogutil.Error(err))
continue
}
go handleTCPConn(c, streamConn, remoteAddr, p.ports.RemotePort)
Expand All @@ -96,21 +93,21 @@ func (p *portForwarder) run(streamConn httpstream.Connection) {

udpConn := &xnet.UDPConn{UDPConn: pc.(*net.UDPConn)}
localAddr := pc.LocalAddr().String()
klog.InfoS("Forwarding",
constants.LogFieldProtocol, p.ports.Protocol,
constants.LogFieldLocalAddr, localAddr,
constants.LogFieldRemotePort, p.ports.RemotePort,
l := slog.With(
slog.String(constants.LogFieldProtocol, p.ports.Protocol),
slog.String(constants.LogFieldLocalAddr, localAddr),
)
l.Info("Forwarding",
slogutil.Uint16(constants.LogFieldRemotePort, p.ports.RemotePort),
)
track := newConnTrack()
finish := make(chan string)

go func() {
for key := range finish {
track.Delete(key)
klog.V(4).InfoS("Remove udp conn from conntrack table",
"key", key,
constants.LogFieldProtocol, p.ports.Protocol,
constants.LogFieldLocalAddr, localAddr,
l.Debug("Remove udp conn from conntrack table",
slog.String("key", key),
)
}
}()
Expand All @@ -128,9 +125,8 @@ func (p *portForwarder) run(streamConn httpstream.Connection) {

n, cliAddr, err := udpConn.ReadFrom(buf)
if err != nil {
klog.ErrorS(err, "Fail to read udp packet",
constants.LogFieldProtocol, p.ports.Protocol,
constants.LogFieldLocalAddr, localAddr,
l.Error("Fail to read udp packet",
slogutil.Error(err),
)
return
}
Expand All @@ -146,9 +142,8 @@ func (p *portForwarder) run(streamConn httpstream.Connection) {
track.Set(key, dataCh)
remoteAddr, err := p.addrGetter.Get()
if err != nil {
klog.ErrorS(err, "Fail to get remote address",
constants.LogFieldProtocol, p.ports.Protocol,
constants.LogFieldLocalAddr, localAddr,
l.Error("Fail to get remote address",
slogutil.Error(err),
)
continue
}
Expand Down
50 changes: 20 additions & 30 deletions cmd/client/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,8 @@ import (
"context"
"encoding/json"
"errors"
"flag"
"fmt"
"log/slog"
"net/http"
"os"
"os/signal"
Expand All @@ -22,11 +22,11 @@ import (
_ "k8s.io/client-go/plugin/pkg/client/auth"
"k8s.io/client-go/rest"
"k8s.io/client-go/transport/spdy"
"k8s.io/klog/v2"

"github.com/knight42/krelay/pkg/constants"
"github.com/knight42/krelay/pkg/ports"
"github.com/knight42/krelay/pkg/remoteaddr"
slogutil "github.com/knight42/krelay/pkg/slog"
"github.com/knight42/krelay/pkg/xnet"
)

Expand All @@ -41,6 +41,8 @@ type Options struct {
address string
// targetsFile is the file containing the list of targets.
targetsFile string

verbosity int
}

// setKubernetesDefaults sets default values on the provided client config for accessing the Kubernetes API.
Expand Down Expand Up @@ -153,7 +155,7 @@ func (o *Options) Run(ctx context.Context, args []string) error {
}
}

klog.InfoS("Creating krelay-server", "namespace", o.serverNamespace)
slog.Info("Creating krelay-server", slog.String("namespace", o.serverNamespace))
svrPodName, err := createServerPod(ctx, cs, o.serverImage, o.serverNamespace)
if err != nil {
return fmt.Errorf("create krelay-server pod: %w", err)
Expand All @@ -164,7 +166,7 @@ func (o *Options) Run(ctx context.Context, args []string) error {
if err != nil {
return fmt.Errorf("ensure krelay-server is running: %w", err)
}
klog.InfoS("krelay-server is running", "pod", svrPodName, "namespace", o.serverNamespace)
slog.Info("krelay-server is running", slog.String("pod", svrPodName), slog.String("namespace", o.serverNamespace))

transport, upgrader, err := spdy.RoundTripperFor(restCfg)
if err != nil {
Expand All @@ -191,7 +193,7 @@ func (o *Options) Run(ctx context.Context, args []string) error {
for _, pf := range portForwarders {
err := pf.listen(o.address)
if err != nil {
klog.ErrorS(err, "Fail to listen on port", "port", pf.ports.LocalPort)
slog.Error("Fail to listen on port", slog.Any("port", pf.ports.LocalPort), slog.Any("error", err))
} else {
succeeded = true
}
Expand All @@ -204,15 +206,14 @@ func (o *Options) Run(ctx context.Context, args []string) error {

select {
case <-streamConn.CloseChan():
klog.InfoS("Lost connection to krelay-server pod")
slog.Info("Lost connection to krelay-server pod")
case <-ctx.Done():
}

return nil
}

func main() {
klog.InitFlags(nil)
cf := genericclioptions.NewConfigFlags(true)
o := Options{
getter: cf,
Expand All @@ -237,40 +238,29 @@ service, ip and hostname rather than only pods.`,
})
}

h := slog.NewTextHandler(cmd.ErrOrStderr(), &slog.HandlerOptions{
Level: slogutil.MapVerbosityToLogLevel(o.verbosity),
})
slog.SetDefault(slog.New(h))
ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
defer cancel()
return o.Run(ctx, args)
},
SilenceUsage: true,
}
flags := c.Flags()
flags.AddGoFlagSet(flag.CommandLine)
cf.AddFlags(flags)
flags.SortFlags = false
flags.StringVar(cf.KubeConfig, "kubeconfig", *cf.KubeConfig, "Path to the kubeconfig file to use for CLI requests.")
flags.StringVarP(cf.Namespace, "namespace", "n", *cf.Namespace, "If present, the namespace scope for this CLI request")
flags.StringVar(cf.Context, "context", *cf.Context, "The name of the kubeconfig context to use")
flags.StringVar(cf.ClusterName, "cluster", *cf.ClusterName, "The name of the kubeconfig cluster to use")

flags.BoolVarP(&printVersion, "version", "V", false, "Print version info and exit.")
flags.StringVar(&o.address, "address", "127.0.0.1", "Address to listen on. Only accepts IP addresses as a value.")
flags.StringVarP(&o.targetsFile, "file", "f", "", "Forward to the targets specified in the given file, with one target per line.")
flags.StringVar(&o.serverImage, "server.image", "ghcr.io/knight42/krelay-server:v0.0.2", "The krelay-server image to use.")
flags.StringVar(&o.serverNamespace, "server.namespace", metav1.NamespaceDefault, "The namespace in which krelay-server is located.")
flags.StringVarP(&o.targetsFile, "file", "f", "", "Forward to the targets specified in the given file, with one target per line.")

// I do not want these flags to show up in --help.
hiddenFlags := []string{
"add_dir_header",
"log_flush_frequency",
"alsologtostderr",
"log_backtrace_at",
"log_dir",
"log_file",
"log_file_max_size",
"one_output",
"logtostderr",
"skip_headers",
"skip_log_headers",
"stderrthreshold",
"vmodule",
}
for _, flagName := range hiddenFlags {
_ = flags.MarkHidden(flagName)
}
flags.IntVarP(&o.verbosity, "v", "v", 3, "Number for the log level verbosity. The bigger the more verbose.")

_ = c.Execute()
}
30 changes: 15 additions & 15 deletions cmd/client/tcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,13 @@ package main

import (
"io"
"log/slog"
"net"

"k8s.io/apimachinery/pkg/util/httpstream"
"k8s.io/klog/v2"

"github.com/knight42/krelay/pkg/constants"
slogutil "github.com/knight42/krelay/pkg/slog"
"github.com/knight42/krelay/pkg/xio"
"github.com/knight42/krelay/pkg/xnet"
)
Expand All @@ -16,18 +17,17 @@ func handleTCPConn(clientConn net.Conn, serverConn httpstream.Connection, dstAdd
defer clientConn.Close()

requestID := xnet.NewRequestID()
kvs := []any{constants.LogFieldRequestID, requestID}
defer klog.V(4).InfoS("handleTCPConn exit", kvs...)
klog.InfoS("Handling tcp connection",
constants.LogFieldRequestID, requestID,
constants.LogFieldDestAddr, xnet.JoinHostPort(dstAddr.String(), dstPort),
constants.LogFieldLocalAddr, clientConn.LocalAddr().String(),
"clientAddr", clientConn.RemoteAddr().String(),
l := slog.With(slog.String(constants.LogFieldRequestID, requestID))
defer l.Debug("handleTCPConn exit")
l.Info("Handling tcp connection",
slog.String(constants.LogFieldDestAddr, xnet.JoinHostPort(dstAddr.String(), dstPort)),
slog.String(constants.LogFieldLocalAddr, clientConn.LocalAddr().String()),
slog.String("clientAddr", clientConn.RemoteAddr().String()),
)

dataStream, errorChan, err := createStream(serverConn, requestID)
if err != nil {
klog.ErrorS(err, "Fail to create stream", kvs...)
l.Error("Fail to create stream", slogutil.Error(err))
return
}

Expand All @@ -39,18 +39,18 @@ func handleTCPConn(clientConn net.Conn, serverConn httpstream.Connection, dstAdd
}
_, err = xio.WriteFull(dataStream, hdr.Marshal())
if err != nil {
klog.ErrorS(err, "Fail to write header", kvs...)
l.Error("Fail to write header", slogutil.Error(err))
return
}

var ack xnet.Acknowledgement
err = ack.FromReader(dataStream)
if err != nil {
klog.ErrorS(err, "Fail to receive ack", kvs...)
l.Error("Fail to receive ack", slogutil.Error(err))
return
}
if ack.Code != xnet.AckCodeOK {
klog.ErrorS(ack.Code, "Fail to connect", kvs...)
l.Error("Fail to connect", slogutil.Error(ack.Code))
return
}

Expand All @@ -60,7 +60,7 @@ func handleTCPConn(clientConn net.Conn, serverConn httpstream.Connection, dstAdd
go func() {
// Copy from the remote side to the local port.
if _, err := io.Copy(clientConn, dataStream); err != nil && !xnet.IsClosedConnectionError(err) {
klog.ErrorS(err, "Fail to copy from remote stream to local connection", kvs...)
l.Error("Fail to copy from remote stream to local connection", slogutil.Error(err))
}

// inform the select below that the remote copy is done
Expand All @@ -73,7 +73,7 @@ func handleTCPConn(clientConn net.Conn, serverConn httpstream.Connection, dstAdd

// Copy from the local port to the remote side.
if _, err := io.Copy(dataStream, clientConn); err != nil && !xnet.IsClosedConnectionError(err) {
klog.ErrorS(err, "Fail to copy from local connection to remote stream", kvs...)
l.Error("Fail to copy from local connection to remote stream", slogutil.Error(err))
// break out of the select below without waiting for the other copy to finish
close(localError)
}
Expand All @@ -88,6 +88,6 @@ func handleTCPConn(clientConn net.Conn, serverConn httpstream.Connection, dstAdd
// always expect something on errorChan (it may be nil)
err = <-errorChan
if err != nil {
klog.ErrorS(err, "Unexpected error from stream", kvs...)
l.Error("Unexpected error from stream", slogutil.Error(err))
}
}
Loading

0 comments on commit cad54d6

Please sign in to comment.