diff --git a/insonmnia/node/config.go b/insonmnia/node/config.go index b5c05319e..b3635d8f7 100644 --- a/insonmnia/node/config.go +++ b/insonmnia/node/config.go @@ -9,6 +9,7 @@ import ( "github.com/sonm-io/core/insonmnia/logging" "github.com/sonm-io/core/insonmnia/matcher" "github.com/sonm-io/core/insonmnia/npp" + "github.com/sonm-io/core/insonmnia/ssh" ) type nodeConfig struct { @@ -18,15 +19,16 @@ type nodeConfig struct { } type Config struct { - Node nodeConfig `yaml:"node"` - NPP npp.Config `yaml:"npp"` - Log logging.Config `yaml:"log"` - Blockchain *blockchain.Config `yaml:"blockchain"` - Eth accounts.EthConfig `yaml:"ethereum" required:"false"` - DWH dwh.YAMLConfig `yaml:"dwh"` - MetricsListenAddr string `yaml:"metrics_listen_addr" default:"127.0.0.1:14003"` - Benchmarks benchmarks.Config `yaml:"benchmarks"` - Matcher *matcher.YAMLConfig `yaml:"matcher"` + Node nodeConfig `yaml:"node"` + NPP npp.Config `yaml:"npp"` + Log logging.Config `yaml:"log"` + Blockchain *blockchain.Config `yaml:"blockchain"` + Eth accounts.EthConfig `yaml:"ethereum" required:"false"` + DWH dwh.YAMLConfig `yaml:"dwh"` + MetricsListenAddr string `yaml:"metrics_listen_addr" default:"127.0.0.1:14003"` + Benchmarks benchmarks.Config `yaml:"benchmarks"` + Matcher *matcher.YAMLConfig `yaml:"matcher"` + SSH *ssh.ProxyServerConfig `yaml:"ssh"` } // NewConfig loads localNode config from given .yaml file diff --git a/insonmnia/node/mod.go b/insonmnia/node/mod.go index 72ac42098..c05242fbe 100644 --- a/insonmnia/node/mod.go +++ b/insonmnia/node/mod.go @@ -59,6 +59,10 @@ func New(ctx context.Context, cfg *Config, options ...Option) (*Node, error) { ) } + if cfg.SSH != nil { + serverOptions = append(serverOptions, WithSSH(*cfg.SSH, transportCredentials, remoteOptions.eth.Market(), log.Sugar())) + } + server, err := newServer(cfg.Node, services, serverOptions...) if err != nil { return nil, fmt.Errorf("failed to build Node instance: %s", err) diff --git a/insonmnia/node/options.go b/insonmnia/node/options.go index 635e7401a..f187edd33 100644 --- a/insonmnia/node/options.go +++ b/insonmnia/node/options.go @@ -5,7 +5,9 @@ import ( "crypto/sha256" "github.com/ethereum/go-ethereum/crypto" + "github.com/sonm-io/core/blockchain" "github.com/sonm-io/core/insonmnia/auth" + "github.com/sonm-io/core/insonmnia/ssh" "github.com/sonm-io/core/util/rest" "github.com/sonm-io/core/util/xgrpc" "go.uber.org/zap" @@ -37,12 +39,14 @@ type serverOptions struct { allowREST bool optionsREST []rest.Option exposeGRPCMetrics bool + sshProxy SSHServer log *zap.Logger } func newServerOptions() *serverOptions { return &serverOptions{ - log: zap.NewNop(), + sshProxy: &ssh.NilSSHProxyServer{}, + log: zap.NewNop(), } } @@ -89,6 +93,19 @@ func WithGRPCServerMetrics() ServerOption { } } +func WithSSH(cfg ssh.ProxyServerConfig, credentials credentials.TransportCredentials, market blockchain.MarketAPI, log *zap.SugaredLogger) ServerOption { + return func(o *serverOptions) error { + server, err := ssh.NewSSHProxyServer(cfg, credentials, market, log) + if err != nil { + return err + } + + o.sshProxy = server + + return nil + } +} + func WithServerLog(log *zap.Logger) ServerOption { return func(o *serverOptions) error { o.log = log diff --git a/insonmnia/node/server.go b/insonmnia/node/server.go index 9ca2fdc66..fe8c921d0 100644 --- a/insonmnia/node/server.go +++ b/insonmnia/node/server.go @@ -53,6 +53,10 @@ type Services interface { Interceptor() grpc.UnaryServerInterceptor } +type SSHServer interface { + Serve(ctx context.Context) error +} + // Server is a server for LocalNode instance. // // Its responsibility is to manage network part, i.e. creating TCP servers and @@ -64,6 +68,7 @@ type Server struct { // Servers for processing requests. serverGRPC *grpc.Server serverREST *rest.Server + serverSSH SSHServer log *zap.SugaredLogger } @@ -107,6 +112,7 @@ func newServer(cfg nodeConfig, services Services, options ...ServerOption) (*Ser GRPC: toLocalAddrs(listenersGRPC), REST: toLocalAddrs(listenersREST), }, + serverSSH: opts.sshProxy, } if opts.allowGRPC { @@ -169,7 +175,10 @@ func (m *Server) Serve(ctx context.Context) error { wg.Go(func() error { return m.serveHTTP(ctx, network.ListenersREST...) }) - // TODO: Also add debug server and ssh. + wg.Go(func() error { + return m.serverSSH.Serve(ctx) + }) + // TODO: Also add debug server. <-ctx.Done() diff --git a/insonmnia/npp/dial.go b/insonmnia/npp/dial.go index 73aa59b13..49fcecc45 100644 --- a/insonmnia/npp/dial.go +++ b/insonmnia/npp/dial.go @@ -4,6 +4,7 @@ package npp import ( "context" + "fmt" "net" "time" @@ -81,6 +82,7 @@ func (m *Dialer) DialContext(ctx context.Context, addr auth.Addr) (net.Conn, err nppChannel <- newConnTuple(nil, err) return } + //defer puncher.Close() nppChannel <- newConnTuple(puncher.Dial(ethAddr)) }() @@ -104,6 +106,11 @@ func (m *Dialer) DialContext(ctx context.Context, addr auth.Addr) (net.Conn, err } } + if m.relayDialer == nil { + log.Warn("failed to connect using NPP - all methods failed") + return nil, fmt.Errorf("failed to connect using NPP - all methods failed") + } + log.Debug("connecting using Relay") channel := make(chan connTuple) go func() { diff --git a/insonmnia/npp/options.go b/insonmnia/npp/options.go index 2b0725081..4ac04c9dd 100644 --- a/insonmnia/npp/options.go +++ b/insonmnia/npp/options.go @@ -7,6 +7,7 @@ import ( "github.com/sonm-io/core/insonmnia/npp/relay" "github.com/sonm-io/core/insonmnia/npp/rendezvous" + "github.com/sonm-io/core/proto" "go.uber.org/zap" "google.golang.org/grpc/credentials" ) @@ -23,6 +24,7 @@ type options struct { nppMaxBackoffInterval time.Duration relayListener *relay.Listener relayDialer *relay.Dialer + protocol string } func newOptions() *options { @@ -31,6 +33,7 @@ func newOptions() *options { nppBacklog: 128, nppMinBackoffInterval: 500 * time.Millisecond, nppMaxBackoffInterval: 8000 * time.Millisecond, + protocol: sonm.DefaultNPPProtocol, } } @@ -49,7 +52,7 @@ func WithRendezvous(cfg rendezvous.Config, credentials credentials.TransportCred for _, addr := range cfg.Endpoints { client, err := newRendezvousClient(ctx, addr, credentials) if err == nil { - return newNATPuncher(ctx, cfg, client) + return newNATPuncher(ctx, cfg, client, o.protocol) } } @@ -112,3 +115,10 @@ func WithRelayDialer(dialer *relay.Dialer) Option { return nil } } + +func WithProtocol(protocol string) Option { + return func(o *options) error { + o.protocol = protocol + return nil + } +} diff --git a/insonmnia/npp/puncher.go b/insonmnia/npp/puncher.go index 50d285f7e..bf805358f 100644 --- a/insonmnia/npp/puncher.go +++ b/insonmnia/npp/puncher.go @@ -50,6 +50,7 @@ type natPuncher struct { log *zap.Logger client *rendezvousClient + protocol string listener net.Listener listenerChannel chan connTuple @@ -57,7 +58,7 @@ type natPuncher struct { timeout time.Duration } -func newNATPuncher(ctx context.Context, cfg rendezvous.Config, client *rendezvousClient) (NATPuncher, error) { +func newNATPuncher(ctx context.Context, cfg rendezvous.Config, client *rendezvousClient, proto string) (NATPuncher, error) { // It's important here to reuse the Rendezvous client local address for // successful NAT penetration in the case of cone NAT. listener, err := reuseport.Listen(protocol, client.LocalAddr().String()) @@ -71,6 +72,7 @@ func newNATPuncher(ctx context.Context, cfg rendezvous.Config, client *rendezvou ctx: ctx, log: ctxlog.G(ctx), client: client, + protocol: proto, listenerChannel: channel, listener: listener, @@ -153,7 +155,7 @@ func (m *natPuncher) resolve(ctx context.Context, addr common.Address) (*sonm.Re } request := &sonm.ConnectRequest{ - Protocol: protocol, + Protocol: m.protocol, PrivateAddrs: []*sonm.Addr{}, ID: addr.Bytes(), } @@ -173,6 +175,7 @@ func (m *natPuncher) publish(ctx context.Context) (*sonm.RendezvousReply, error) } request := &sonm.PublishRequest{ + Protocol: m.protocol, PrivateAddrs: []*sonm.Addr{}, } diff --git a/insonmnia/ssh/config.go b/insonmnia/ssh/config.go new file mode 100644 index 000000000..d2a422945 --- /dev/null +++ b/insonmnia/ssh/config.go @@ -0,0 +1,11 @@ +package ssh + +import ( + "github.com/sonm-io/core/insonmnia/npp" +) + +// ProxyServerConfig specifies SSH proxy server configuration. +type ProxyServerConfig struct { + Addr string `yaml:"endpoint" required:"true"` + NPP npp.Config `yaml:"npp" required:"true"` +} diff --git a/insonmnia/ssh/proxy.go b/insonmnia/ssh/proxy.go new file mode 100644 index 000000000..ea97ae431 --- /dev/null +++ b/insonmnia/ssh/proxy.go @@ -0,0 +1,331 @@ +package ssh + +import ( + "context" + "fmt" + "io" + "math/big" + "net" + "os" + "strings" + + "github.com/sonm-io/core/blockchain" + "github.com/sonm-io/core/insonmnia/auth" + "github.com/sonm-io/core/insonmnia/npp" + "github.com/sonm-io/core/proto" + "go.uber.org/zap" + "golang.org/x/crypto/ssh" + "golang.org/x/crypto/ssh/agent" + "golang.org/x/sync/errgroup" + "google.golang.org/grpc/credentials" + + sshd "github.com/gliderlabs/ssh" +) + +const ( + proto = "ssh" + sshAgentSockName = "SSH_AUTH_SOCK" +) + +type NilSSHProxyServer struct{} + +func (m *NilSSHProxyServer) Serve(ctx context.Context) error { + return nil +} + +type SSHProxyServer struct { + cfg ProxyServerConfig + market blockchain.MarketAPI + options []npp.Option + log *zap.SugaredLogger +} + +func convertHostSigners(v []ssh.Signer) []sshd.Signer { + var converted []sshd.Signer + for id := range v { + converted = append(converted, v[id]) + } + + return converted +} + +// NewSSHProxyServer constructs a new SSH proxy server that will serve SSH +// connections in remote containers by smart-forwarding traffic via itself. +// +// The server requires SSH agent running on the host system with appropriate +// keys loaded in it. While running it will NOT modify the data within the +// agent. +// +// Example of external usage: "ssh :@ -p ". +func NewSSHProxyServer(cfg ProxyServerConfig, credentials credentials.TransportCredentials, market blockchain.MarketAPI, log *zap.SugaredLogger) (*SSHProxyServer, error) { + options := []npp.Option{ + npp.WithProtocol(proto), + npp.WithRendezvous(cfg.NPP.Rendezvous, credentials), + // TODO: Activate relay, but for now disable for rendezvous testing. + npp.WithLogger(log.Desugar()), + } + + m := &SSHProxyServer{ + cfg: cfg, + market: market, + options: options, + log: log, + } + + return m, nil +} + +// Serve starts serving the SSH proxy server until the specified context is +// canceled or a critical error occurs. +func (m *SSHProxyServer) Serve(ctx context.Context) error { + m.log.Infof("exposing SSH server on %s", m.cfg.Addr) + defer m.log.Infof("stopped SSH server on %s", m.cfg.Addr) + + agentSock, err := net.Dial("unix", os.Getenv(sshAgentSockName)) + if err != nil { + return fmt.Errorf("failed to open ssh agent socket: %v", err) + } + defer agentSock.Close() + + hostSigners, err := agent.NewClient(agentSock).Signers() + if err != nil { + return fmt.Errorf("failed to extract signers from ssh agent: %v", err) + } + if len(hostSigners) == 0 { + return fmt.Errorf("failed to extract signers from ssh agent: no identities known to the agent") + } + + nppDialer, err := npp.NewDialer(m.options...) + if err != nil { + return err + } + defer nppDialer.Close() + + connHandler := &connHandler{ + market: m.market, + nppDialer: nppDialer, + hostSigners: hostSigners, + log: m.log, + } + + server := &sshd.Server{ + Addr: m.cfg.Addr, + Handler: connHandler.onHandle, + HostSigners: convertHostSigners(hostSigners), + } + + wg, ctx := errgroup.WithContext(ctx) + wg.Go(func() error { + return server.ListenAndServe() + }) + + <-ctx.Done() + server.Close() + + return wg.Wait() +} + +type connHandler struct { + market blockchain.MarketAPI + nppDialer *npp.Dialer + hostSigners []ssh.Signer + log *zap.SugaredLogger +} + +func (m *connHandler) onHandle(session sshd.Session) { + defer session.Close() + + if err := m.handle(session); err != nil { + session.Write(formatErr(err)) + session.Exit(1) + return + } + + session.Exit(0) +} + +func (m *connHandler) handle(session sshd.Session) error { + m.log.Infof("accepted SSH connection from %s", session.RemoteAddr()) + + pty, windows, isTty := session.Pty() + m.log.Debugw("handling SSH connection", + zap.Bool("tty", isTty), + zap.String("user", session.User()), + zap.String("terminal", pty.Term), + zap.String("publicKey", safeFingerprintSHA256(session.PublicKey())), + ) + + user, err := parseUserIdentity(session.User()) + if err != nil { + return err + } + + m.log.Debugw("resolving worker remote using passed user identity", zap.Any("user", user)) + addr, err := m.resolve(session.Context(), user.DealID) + if err != nil { + return err + } + + m.log.Debugf("resolved remote: %s", addr.String()) + + conn, err := m.nppDialer.Dial(*addr) + if err != nil { + return err + } + + m.log.Debugf("connected to remote endpoint %s", conn.RemoteAddr()) + + cfg := &ssh.ClientConfig{ + User: user.TaskID, + Auth: []ssh.AuthMethod{ + ssh.PublicKeys(m.hostSigners...), + }, + HostKeyCallback: ssh.InsecureIgnoreHostKey(), + } + + clientConn, channels, requests, err := ssh.NewClientConn(conn, addr.String(), cfg) + if err != nil { + return err + } + + client := ssh.NewClient(clientConn, channels, requests) + defer client.Close() + + remoteSession, err := client.NewSession() + if err != nil { + return err + } + defer remoteSession.Close() + + if isTty { + if err := remoteSession.RequestPty(pty.Term, pty.Window.Height, pty.Window.Width, ssh.TerminalModes{}); err != nil { + return err + } + } + + for _, env := range session.Environ() { + parts := strings.SplitN(env, "=", 2) + if len(parts) != 2 { + return fmt.Errorf("invalid environment variable: %s", env) + } + + remoteSession.Setenv(parts[0], parts[1]) + } + + stdin, err := remoteSession.StdinPipe() + if err != nil { + return err + } + stdout, err := remoteSession.StdoutPipe() + if err != nil { + return err + } + stderr, err := remoteSession.StderrPipe() + if err != nil { + return err + } + if err := remoteSession.Start(strings.Join(session.Command(), " ")); err != nil { + return err + } + + m.log.Infof("opened SSH tunnel between %s -> %s, version %s", session.LocalAddr(), clientConn.RemoteAddr(), clientConn.ClientVersion()) + + forwardFunc := func(direction string, rd io.Reader, wr io.Writer) error { + m.log.Infof("forwarding %s", direction) + + written, err := io.Copy(wr, rd) + if err != nil { + m.log.With(zap.Error(err)).Warnf("failed to forward %s", direction) + return err + } + + m.log.Infof("finished forwarding %s: %d bytes written", direction, written) + + return nil + } + + wg, ctx := errgroup.WithContext(session.Context()) + wg.Go(func() error { + return forwardFunc("-> stdin", session, stdin) + }) + // TODO: stdout/stderr intermixing is possible. How to get with it? + wg.Go(func() error { + return forwardFunc("<- stdout", stdout, session) + }) + wg.Go(func() error { + return forwardFunc("<- stderr", stderr, session) + }) + wg.Go(func() error { + for window := range windows { + m.log.Debugf("detected window change: %dx%d", window.Height, window.Width) + if err := remoteSession.WindowChange(window.Height, window.Width); err != nil { + return err + } + } + + return nil + }) + wg.Go(func() error { + // When we're closing session first. + <-ctx.Done() + remoteSession.Close() + return nil + }) + wg.Go(func() error { + // When remote session is finished. + err := rem.oteSession.Wait() + remoteSession.Close() + session.Close() + return err + }) + + return wg.Wait() +} + +func (m *connHandler) resolve(ctx context.Context, dealID *big.Int) (*auth.Addr, error) { + deal, err := m.market.GetDealInfo(ctx, dealID) + if err != nil { + return nil, fmt.Errorf("failed to resolve `%s` deal into ETH address: %v", dealID.String(), err) + } + + if deal.Status == sonm.DealStatus_DEAL_CLOSED { + return nil, fmt.Errorf("failed to resolve `%s` deal into ETH address: deal is closed", dealID.String()) + } + + return auth.NewAddr(deal.GetSupplierID().Unwrap().Hex()) +} + +type userIdentity struct { + DealID *big.Int + TaskID string +} + +func parseUserIdentity(user string) (*userIdentity, error) { + parts := strings.Split(user, ".") + + if len(parts) != 2 { + return nil, fmt.Errorf("user identity must be in format :, but received `%s`", user) + } + + dealID, ok := new(big.Int).SetString(parts[0], 10) + if !ok { + return nil, fmt.Errorf("deal ID must be a number, but received `%s`", parts[0]) + } + + return &userIdentity{ + DealID: dealID, + TaskID: parts[1], + }, nil +} + +func formatErr(err error) []byte { + return []byte(fmt.Sprintf("Failed to ssh: %s.\n", err.Error())) +} + +func safeFingerprintSHA256(publicKey ssh.PublicKey) string { + if publicKey == nil { + return "" + } + + return ssh.FingerprintSHA256(publicKey) +} diff --git a/insonmnia/worker/config.go b/insonmnia/worker/config.go index 4991306a0..cdc37e5bf 100644 --- a/insonmnia/worker/config.go +++ b/insonmnia/worker/config.go @@ -16,11 +16,6 @@ import ( "github.com/sonm-io/core/util/debug" ) -type SSHConfig struct { - BindEndpoint string `required:"true" yaml:"bind"` - PrivateKeyPath string `required:"true" yaml:"private_key_path"` -} - type ResourcesConfig struct { Cgroup string `required:"true" yaml:"cgroup"` Resources *specs.LinuxResources `required:"false" yaml:"resources"` diff --git a/insonmnia/worker/container.go b/insonmnia/worker/container.go index a168f5034..f80a9f82a 100644 --- a/insonmnia/worker/container.go +++ b/insonmnia/worker/container.go @@ -146,7 +146,7 @@ func (c *containerDescriptor) execCommand(ctx context.Context, cmd []string, env if !ok { return } - c.log.Info("resizing tty to %dx%d", w.Height, w.Width) + c.log.Infof("resizing tty to %dx%d", w.Height, w.Width) err = c.client.ContainerExecResize(ctx, execId.ID, types.ResizeOptions{Height: uint(w.Height), Width: uint(w.Width)}) if err != nil { log.G(ctx).Warn("ContainerExecResize finished with error", zap.Error(err)) diff --git a/insonmnia/worker/options.go b/insonmnia/worker/options.go index 1f58eeef3..8e6e2f2ab 100644 --- a/insonmnia/worker/options.go +++ b/insonmnia/worker/options.go @@ -106,10 +106,6 @@ func (m *options) SetupDefaults() error { return err } - if err := m.setupSSH(); err != nil { - return err - } - if err := m.setupOverseer(); err != nil { return err } @@ -279,7 +275,16 @@ func (m *options) setupNetworkOptions() error { return errors.New("failed to get public IPs") } -func (m *options) setupSSH() error { +func (m *options) setupSSH(view OverseerView) error { + if m.cfg.SSH != nil { + ssh, err := NewSSHServer(*m.cfg.SSH, m.creds, view, ctxlog.S(m.ctx)) + if err != nil { + return err + } + m.ssh = ssh + return nil + } + if m.ssh == nil { m.ssh = nilSSH{} } diff --git a/insonmnia/worker/server.go b/insonmnia/worker/server.go index f125ae10c..0152c2944 100644 --- a/insonmnia/worker/server.go +++ b/insonmnia/worker/server.go @@ -2,6 +2,7 @@ package worker import ( "bytes" + context2 "context" "encoding/json" "errors" "fmt" @@ -18,6 +19,7 @@ import ( "github.com/docker/go-connections/nat" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/crypto" + "github.com/gliderlabs/ssh" "github.com/gogo/protobuf/proto" "github.com/grpc-ecosystem/go-grpc-prometheus" log "github.com/noxiouz/zapctx/ctxlog" @@ -73,6 +75,31 @@ var ( } ) +type overseerView struct { + worker *Worker +} + +func (m *overseerView) ContainerInfo(id string) (*ContainerInfo, bool) { + return m.worker.GetContainerInfo(id) +} + +func (m *overseerView) IdentityLevel(id string) (pb.IdentityLevel, error) { + plan, err := m.worker.AskPlanByTaskID(id) + if err != nil { + return pb.IdentityLevel_UNKNOWN, err + } + + return plan.Identity, nil +} + +func (m *overseerView) ExecIdentity() pb.IdentityLevel { + return m.worker.cfg.SSH.Identity +} + +func (m *overseerView) Exec(ctx context2.Context, id string, cmd []string, env []string, isTty bool, wCh <-chan ssh.Window) (types.HijackedResponse, error) { + return m.worker.ovs.Exec(ctx, id, cmd, env, isTty, wCh) +} + // Worker holds information about jobs, make orders to Observer and communicates with Worker type Worker struct { *options @@ -151,20 +178,25 @@ func NewWorker(opts ...Option) (m *Worker, err error) { return nil, err } + if err := m.setupSSH(&overseerView{worker: m}); err != nil { + m.Close() + return nil, err + } + return m, nil } // Serve starts handling incoming API gRPC requests func (m *Worker) Serve() error { - defer m.Close() - m.startTime = time.Now() if err := m.waitMasterApproved(); err != nil { + m.Close() return err } relayListener, err := relay.NewListener(m.cfg.NPP.Relay.Endpoints, m.key, log.G(m.ctx)) if err != nil { + m.Close() return err } @@ -177,14 +209,26 @@ func (m *Worker) Serve() error { ) if err != nil { log.G(m.ctx).Error("failed to listen", zap.String("address", m.cfg.Endpoint), zap.Error(err)) + m.Close() return err } m.listener = listener - log.G(m.ctx).Info("listening for gRPC API connections", zap.Stringer("address", listener.Addr())) - err = m.externalGrpc.Serve(listener) + wg, ctx := errgroup.WithContext(m.ctx) + wg.Go(func() error { + return m.RunSSH(ctx) + }) + wg.Go(func() error { + log.S(m.ctx).Infof("listening for gRPC API connections on %s", listener.Addr()) + defer log.S(m.ctx).Infof("finished listening for gRPC API connections on %s", listener.Addr()) + + return m.externalGrpc.Serve(listener) + }) + + <-ctx.Done() + m.Close() - return err + return wg.Wait() } func (m *Worker) waitMasterApproved() error { @@ -379,17 +423,6 @@ func (m *Worker) GetContainerInfo(id string) (*ContainerInfo, bool) { return info, ok } -func (m *Worker) getContainerIdByTaskId(id string) (string, bool) { - m.mu.Lock() - defer m.mu.Unlock() - - info, ok := m.containers[id] - if ok { - return info.ID, ok - } - return "", ok -} - func (m *Worker) Devices(ctx context.Context, request *pb.Empty) (*pb.DevicesReply, error) { return m.hardware.IntoProto(), nil } @@ -779,7 +812,7 @@ func (m *Worker) TaskLogs(request *pb.TaskLogsRequest, server pb.Worker_TaskLogs return err } log.G(m.ctx).Info("handling TaskLogs request", zap.Any("request", request)) - cid, ok := m.getContainerIdByTaskId(request.Id) + containerInfo, ok := m.GetContainerInfo(request.Id) if !ok { return status.Errorf(codes.NotFound, "no job with id %s", request.Id) } @@ -792,7 +825,7 @@ func (m *Worker) TaskLogs(request *pb.TaskLogsRequest, server pb.Worker_TaskLogs Tail: request.Tail, Details: request.Details, } - reader, err := m.ovs.Logs(server.Context(), cid, opts) + reader, err := m.ovs.Logs(server.Context(), containerInfo.ID, opts) if err != nil { return err } @@ -855,8 +888,8 @@ func (m *Worker) TaskStatus(ctx context.Context, req *pb.ID) (*pb.TaskStatusRepl return reply, nil } -func (m *Worker) RunSSH() error { - return m.ssh.Run() +func (m *Worker) RunSSH(ctx context.Context) error { + return m.ssh.Run(ctx) } // RunBenchmarks perform benchmarking of Worker's resources. diff --git a/insonmnia/worker/ssh.go b/insonmnia/worker/ssh.go index e358f1534..f7da97007 100644 --- a/insonmnia/worker/ssh.go +++ b/insonmnia/worker/ssh.go @@ -1,104 +1,131 @@ package worker import ( + "context" + "fmt" "io" "io/ioutil" "net" + "strings" + "github.com/docker/docker/api/types" "github.com/docker/docker/pkg/stdcopy" - "github.com/gliderlabs/ssh" - log "github.com/noxiouz/zapctx/ctxlog" + "github.com/sonm-io/core/insonmnia/npp" + "github.com/sonm-io/core/proto" "go.uber.org/zap" - gossh "golang.org/x/crypto/ssh" + "golang.org/x/crypto/ssh" + "google.golang.org/grpc/credentials" + + sshd "github.com/gliderlabs/ssh" +) + +const ( + sshStatusOK sshStatus = 0 + sshStatusServerError = 255 ) +type sshStatus int + +type SSHConfig struct { + Endpoint string `yaml:"endpoint" required:"true"` + PrivateKeyPath string `yaml:"private_key_path" required:"true"` + NPP npp.Config `yaml:"npp"` + Identity sonm.IdentityLevel `yaml:"identity" default:"identified"` +} + type SSH interface { - Run() error - Close() + Run(ctx context.Context) error + Close() error } type nilSSH struct{} -func (nilSSH) Run() error { - return nil -} +func (nilSSH) Run(ctx context.Context) error { return nil } +func (nilSSH) Close() error { return nil } -func (nilSSH) Close() {} +// OverseerView is a bridge between keeping "Worker" as a parameter and +// slightly more decomposed architecture. +type OverseerView interface { + ContainerInfo(id string) (*ContainerInfo, bool) + IdentityLevel(id string) (sonm.IdentityLevel, error) + ExecIdentity() sonm.IdentityLevel + Exec(ctx context.Context, id string, cmd []string, env []string, isTty bool, wCh <-chan sshd.Window) (types.HijackedResponse, error) +} -type sshServer struct { - worker *Worker - laddr string - privateKeyPath string - listener net.Listener - server *ssh.Server +type connHandler struct { + overseer OverseerView + log *zap.SugaredLogger } -func NewSSH(worker *Worker, config *SSHConfig) (SSH, error) { - ret := sshServer{ - laddr: config.BindEndpoint, - privateKeyPath: config.PrivateKeyPath, - worker: worker, +func newConnHandler(overseer OverseerView, log *zap.SugaredLogger) *connHandler { + return &connHandler{ + overseer: overseer, + log: log, } - return &ret, nil } -func (s *sshServer) Run() error { - l, err := net.Listen("tcp", s.laddr) - if err != nil { - return err - } - s.listener = l - s.server = &ssh.Server{} - if len(s.privateKeyPath) != 0 { - pkeyData, err := ioutil.ReadFile(s.privateKeyPath) - if err != nil { - return err - } - pkey, err := gossh.ParsePrivateKey(pkeyData) - if err != nil { - return err - } - s.server.HostSigners = append(s.server.HostSigners, pkey) +func (m *connHandler) Verify(ctx sshd.Context, key sshd.PublicKey) bool { + if err := m.verify(ctx.User(), key); err != nil { + m.log.Warnw("verification failed", zap.Error(err)) + return false } - s.server.Handle(s.onSession) - s.server.PublicKeyHandler = s.verify - return s.server.Serve(s.listener) + + return true } -func (s *sshServer) verify(ctx ssh.Context, key ssh.PublicKey) bool { - cinfo, ok := s.worker.GetContainerInfo(ctx.User()) +func (m *connHandler) verify(taskID string, key sshd.PublicKey) error { + m.log.Debugf("public key %s verification from user %s", ssh.FingerprintSHA256(key), taskID) + + containerInfo, ok := m.overseer.ContainerInfo(taskID) if !ok { - return false + return fmt.Errorf("container `%s` not found", taskID) + } + + if !sshd.KeysEqual(containerInfo.PublicKey, key) { + return fmt.Errorf("provided public key `%s` is not equal with the specified key `%s`", ssh.FingerprintSHA256(key), ssh.FingerprintSHA256(containerInfo.PublicKey)) } - log.G(s.worker.ctx).Info("verifying public key") - return ssh.KeysEqual(cinfo.PublicKey, key) + + return nil } -func (s *sshServer) onSession(session ssh.Session) { - status := s.process(session) - session.Exit(status) - log.G(s.worker.ctx).Info("finished processing ssh session", zap.Int("status", status)) +func (m *connHandler) onSession(session sshd.Session) { + status, err := m.process(session) + if err != nil { + session.Write([]byte(capitalize(err.Error()) + "\n")) + m.log.Warnw("failed to process ssh session", zap.Error(err)) + } + + session.Exit(int(status)) + + m.log.Infof("finished processing ssh session with %d status", int(status)) } -func (s *sshServer) process(session ssh.Session) (status int) { - status = 255 +func (m *connHandler) process(session sshd.Session) (sshStatus, error) { + m.log.Debugf("processing %v", session.RemoteAddr()) _, wCh, isTty := session.Pty() cmd := session.Command() if len(cmd) == 0 { - cmd = append(cmd, "login", "-f", "root") + cmd = []string{"login", "-f", "root"} + } + + identity, err := m.overseer.IdentityLevel(session.User()) + if err != nil { + return sshStatusServerError, fmt.Errorf("failed to extract identity level for task `%s`: %v", session.User(), err) + } + + if identity < m.overseer.ExecIdentity() { + return sshStatusServerError, fmt.Errorf("identity level `%s` does not allow to exec ssh: must be `%s` or higher", identity.String(), m.overseer.ExecIdentity()) } - cid, ok := s.worker.getContainerIdByTaskId(session.User()) + + containerInfo, ok := m.overseer.ContainerInfo(session.User()) if !ok { - msg := "could not find container by task " + string(session.User()+"\n") - session.Write([]byte(msg)) - log.G(s.worker.ctx).Warn(msg) - return + return sshStatusServerError, fmt.Errorf("failed to find container for task `%s`", session.User()) } - stream, err := s.worker.ovs.Exec(s.worker.ctx, cid, cmd, session.Environ(), isTty, wCh) + + stream, err := m.overseer.Exec(session.Context(), containerInfo.ID, cmd, session.Environ(), isTty, wCh) if err != nil { - session.Write([]byte(err.Error())) - return + return sshStatusServerError, err } defer stream.Close() outputErr := make(chan error) @@ -120,24 +147,82 @@ func (s *sshServer) process(session ssh.Session) (status int) { err = <-outputErr if err != nil { - status = 0 - } else { - log.G(s.worker.ctx).Warn("io error during ssh session:", zap.Error(err)) + m.log.Warnw("I/O error during SSH session", zap.Error(err)) + return sshStatusServerError, nil + } + + return sshStatusOK, nil +} + +type sshServer struct { + cfg SSHConfig + credentials credentials.TransportCredentials + server *sshd.Server + log *zap.SugaredLogger +} + +func NewSSHServer(cfg SSHConfig, credentials credentials.TransportCredentials, overseer OverseerView, log *zap.SugaredLogger) (*sshServer, error) { + privateKeyData, err := ioutil.ReadFile(cfg.PrivateKeyPath) + if err != nil { + return nil, err + } + + privateKey, err := ssh.ParsePrivateKey(privateKeyData) + if err != nil { + return nil, err + } + + connHandler := newConnHandler(overseer, log) + + server := &sshd.Server{ + Handler: connHandler.onSession, + HostSigners: []sshd.Signer{privateKey}, + PublicKeyHandler: connHandler.Verify, + } + + m := &sshServer{ + cfg: cfg, + credentials: credentials, + server: server, + log: log, + } + + return m, nil +} + +func (m *sshServer) Run(ctx context.Context) error { + m.log.Info("running ssh server") + defer m.log.Info("stopped ssh server") + + listener, err := m.newListener(ctx) + if err != nil { + return err } - return + defer listener.Close() + + return m.server.Serve(listener) } -func (s *sshServer) Close() { - if s.server != nil { - log.G(s.worker.ctx).Info("closing ssh server") - s.server.Close() +func (m *sshServer) newListener(ctx context.Context) (net.Listener, error) { + nppOptions := []npp.Option{ + npp.WithProtocol("ssh"), + npp.WithRendezvous(m.cfg.NPP.Rendezvous, m.credentials), + // TODO: Relay. + npp.WithLogger(m.log.Desugar()), } + + return npp.NewListener(ctx, m.cfg.Endpoint, nppOptions...) +} + +func (m *sshServer) Close() error { + m.log.Info("closing ssh server") + + return m.server.Close() } func parsePublicKey(key string) (ssh.PublicKey, error) { var publicKey ssh.PublicKey if len(key) != 0 { - var err error k, _, _, _, err := ssh.ParseAuthorizedKey([]byte(key)) if err != nil { return nil, err @@ -147,3 +232,11 @@ func parsePublicKey(key string) (ssh.PublicKey, error) { return publicKey, nil } + +func capitalize(s string) string { + if len(s) == 0 { + return s + } + + return strings.ToUpper(s[:1]) + s[1:] +}