This repository has been archived by the owner. It is now read-only.
Browse files

Revert "api: Query cluster information from cache"

This reverts commit 60e155a.
It's just not quite ready for prime-time yet practically speaking.
  • Loading branch information...
ejj committed Nov 10, 2017
1 parent de660db commit cd1b39b5ec5eb7af2cd5571b7f9d5586a58bdc7e
Showing with 122 additions and 202 deletions.
  1. +27 −113 api/server/server.go
  2. +56 −89 api/server/server_test.go
  3. +15 −0 cli/command/show.go
  4. +24 −0 cli/command/show_test.go
@@ -8,7 +8,6 @@ import (
@@ -39,16 +38,6 @@ type server struct {
// The credentials to use while connecting to clients in the cluster.
clientCreds connection.Credentials
// clusterInfo is a cache of the cluster's state to avoid requerying the
// entire cluster's state for every `kelda show` call. Its key is a database
// table, and its value is a slice of rows for that database.
// The state is kept in sync by the syncClusterInfo goroutine. Because reading
// and writing occur in different goroutines, callers must first obtain the
// clusterInfoLock before reading or writing to the map.
clusterInfo map[db.TableType]interface{}
clusterInfoLock sync.Mutex
// Run starts a server that responds to connections from the CLI. It runs on both
@@ -83,22 +72,14 @@ func Run(conn db.Conn, listenAddr string, runningOnDaemon bool,
apiServer := server{
conn: conn,
runningOnDaemon: runningOnDaemon,
clientCreds: creds,
clusterInfo: map[db.TableType]interface{}{},
pb.RegisterAPIServer(s, &apiServer)
if runningOnDaemon {
go apiServer.syncClusterInfo()
apiServer := server{conn, runningOnDaemon, creds}
pb.RegisterAPIServer(s, apiServer)
return nil
func (s *server) SetSecret(ctx context.Context, msg *pb.Secret) (*pb.SecretReply, error) {
func (s server) SetSecret(ctx context.Context, msg *pb.Secret) (*pb.SecretReply, error) {
// If this method is called while running on the daemon, forward the secret
// assignment to the leader. The assignment is synchronous, so the user
// will get immediate feedback on whether or not the secret was successfully
@@ -127,7 +108,7 @@ func (s *server) SetSecret(ctx context.Context, msg *pb.Secret) (*pb.SecretReply
// Query proxies certain table requests (e.g. Container and Connection) to the
// cluster. This is necessary because some tables are only used on the minions,
// and aren't synced back to the daemon.
func (s *server) Query(cts context.Context, query *pb.DBQuery) (*pb.QueryReply, error) {
func (s server) Query(cts context.Context, query *pb.DBQuery) (*pb.QueryReply, error) {
var rows interface{}
var err error
@@ -150,7 +131,7 @@ func (s *server) Query(cts context.Context, query *pb.DBQuery) (*pb.QueryReply,
return &pb.QueryReply{TableContents: string(json)}, nil
func (s *server) queryLocal(table db.TableType) (interface{}, error) {
func (s server) queryLocal(table db.TableType) (interface{}, error) {
switch table {
case db.MachineTable:
return s.conn.SelectFromMachine(nil), nil
@@ -171,103 +152,36 @@ func (s *server) queryLocal(table db.TableType) (interface{}, error) {
func (s *server) queryFromDaemon(table db.TableType) (interface{}, error) {
func (s server) queryFromDaemon(table db.TableType) (
interface{}, error) {
switch table {
case db.MachineTable, db.BlueprintTable:
return s.queryLocal(table)
case db.ContainerTable, db.ConnectionTable, db.LoadBalancerTable, db.ImageTable:
defer s.clusterInfoLock.Unlock()
return s.clusterInfo[table], nil
return nil, fmt.Errorf("unrecognized table: %s", table)
// syncClusterInfo periodically connects to the cluster and writes the state
// of the cluster into the `clusterInfo` map. It tracks the Container, LoadBalancer,
// Image, and Connection tables.
func (s *server) syncClusterInfo() {
for range time.Tick(5 * time.Second) {
func (s *server) syncClusterInfoOnce() {
machines := s.conn.SelectFromMachine(func(m db.Machine) bool {
return m.Status == db.Connected
if len(machines) == 0 {
leaderClient, err := newLeaderClient(machines, s.clientCreds)
var leaderClient client.Client
leaderClient, err := newLeaderClient(s.conn.SelectFromMachine(nil), s.clientCreds)
if err != nil {
if _, ok := err.(client.NoLeaderError); ok {
log.WithError(err).Debug("Failed to connect to leader. This " +
"should recover soon if the cluster was just booted.")
} else {
log.WithError(err).Warn("Failed to connect to leader")
return nil, err
defer leaderClient.Close()
// syncSpec defines how to query information from the cluster so that it can
// be stored in the daemon's cluster information cache.
type syncSpec struct {
table db.TableType
queryFunc func(client.Client) (interface{}, error)
syncSpecs := []syncSpec{
func(c client.Client) (interface{}, error) {
return s.getClusterContainers(c)
func(c client.Client) (interface{}, error) {
return c.QueryConnections()
func(c client.Client) (interface{}, error) {
return c.QueryImages()
func(c client.Client) (interface{}, error) {
return c.QueryLoadBalancers()
var wg sync.WaitGroup
for _, toSync := range syncSpecs {
go func(toSync syncSpec) {
defer wg.Done()
res, err := toSync.queryFunc(leaderClient)
if err != nil {
log.WithError(err).WithField("table", toSync.table).Error(
"Failed to query table ")
s.clusterInfo[toSync.table] = res
switch table {
case db.ContainerTable:
return s.getClusterContainers(leaderClient)
case db.ConnectionTable:
return leaderClient.QueryConnections()
case db.LoadBalancerTable:
return leaderClient.QueryLoadBalancers()
case db.ImageTable:
return leaderClient.QueryImages()
return nil, fmt.Errorf("unrecognized table: %s", table)
func (s *server) QueryMinionCounters(ctx context.Context, in *pb.MinionCountersRequest) (
func (s server) QueryMinionCounters(ctx context.Context, in *pb.MinionCountersRequest) (
*pb.CountersReply, error) {
if !s.runningOnDaemon {
return nil, errDaemonOnlyRPC
@@ -290,12 +204,12 @@ func (s *server) QueryMinionCounters(ctx context.Context, in *pb.MinionCountersR
return reply, nil
func (s *server) QueryCounters(ctx context.Context, in *pb.CountersRequest) (
func (s server) QueryCounters(ctx context.Context, in *pb.CountersRequest) (
*pb.CountersReply, error) {
return &pb.CountersReply{Counters: counter.Dump()}, nil
func (s *server) Deploy(cts context.Context, deployReq *pb.DeployRequest) (
func (s server) Deploy(cts context.Context, deployReq *pb.DeployRequest) (
*pb.DeployReply, error) {
if !s.runningOnDaemon {
@@ -341,12 +255,12 @@ func (s *server) Deploy(cts context.Context, deployReq *pb.DeployRequest) (
return &pb.DeployReply{}, nil
func (s *server) Version(_ context.Context, _ *pb.VersionRequest) (
func (s server) Version(_ context.Context, _ *pb.VersionRequest) (
*pb.VersionReply, error) {
return &pb.VersionReply{Version: version.Version}, nil
func (s *server) getClusterContainers(leaderClient client.Client) (interface{}, error) {
func (s server) getClusterContainers(leaderClient client.Client) (interface{}, error) {
leaderContainers, err := leaderClient.QueryContainers()
if err != nil {
return nil, err
Oops, something went wrong.

0 comments on commit cd1b39b

Please sign in to comment.