Skip to content

Commit

Permalink
Add some more godoc
Browse files Browse the repository at this point in the history
  • Loading branch information
jacksontj committed Feb 12, 2019
1 parent d8fce0d commit dec9f34
Show file tree
Hide file tree
Showing 4 changed files with 33 additions and 2 deletions.
10 changes: 10 additions & 0 deletions pkg/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ var (
ConsulK8sLinkName = "external-k8s-link"
)

// DaemonConfig contains the configuration options for a katalog-sync-daemon
type DaemonConfig struct {
MinSyncInterval time.Duration `long:"min-sync-interval" description:"minimum duration allowed for sync" default:"500ms"`
MaxSyncInterval time.Duration `long:"max-sync-interval" description:"maximum duration allowed for sync" default:"5s"`
Expand All @@ -27,6 +28,7 @@ type DaemonConfig struct {
SyncTTLBuffer time.Duration `long:"sync-ttl-buffer-duration" description:"how much time to ensure is between sync time and ttl" default:"10s"`
}

// NewDaemon is a helper function to return a new *Daemon
func NewDaemon(c DaemonConfig, k8sClient Kubelet, consulClient *consulApi.Client) *Daemon {
return &Daemon{
c: c,
Expand Down Expand Up @@ -65,6 +67,10 @@ func (d *Daemon) doSync(ctx context.Context) error {
}
}

// Register handles a sidecar request for registration. This will block until
// (1) the pod excluding the sidecar container is ready
// (2) the service has been pushed to the agent services API
// (3) the entry shows up in the catalog API (meaning it synced to the cluster)
func (d *Daemon) Register(ctx context.Context, in *katalogsync.RegisterQuery) (*katalogsync.RegisterResult, error) {
if err := d.doSync(ctx); err != nil {
return nil, err
Expand Down Expand Up @@ -118,6 +124,9 @@ func (d *Daemon) Register(ctx context.Context, in *katalogsync.RegisterQuery) (*
}
}

// Deregister handles a sidecar request for deregistration. This will block until
// (2) the service has been removed from the agent services API
// (3) the entry has been removed from the catalog API (meaning it synced to the cluster)
func (d *Daemon) Deregister(ctx context.Context, in *katalogsync.DeregisterQuery) (*katalogsync.DeregisterResult, error) {
if err := d.doSync(ctx); err != nil {
return nil, err
Expand Down Expand Up @@ -364,6 +373,7 @@ func (d *Daemon) syncConsul() error {

type consulNodeFunc func(*consulApi.CatalogNode) bool

// ConsulNodeDoUntil is a helper to wait until a change has propogated into the CatalogAPI
func (d *Daemon) ConsulNodeDoUntil(ctx context.Context, nodeName string, opts *consulApi.QueryOptions, f consulNodeFunc) error {
for {
// If the client is no longer waiting, lets stop checking
Expand Down
4 changes: 4 additions & 0 deletions pkg/daemon/interface.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,18 @@ import (
k8sApi "k8s.io/api/core/v1"
)

// Kubelet encapsulates the interface for kubelet interaction
type Kubelet interface {
GetPodList() (*k8sApi.PodList, error)
}

// ConsulCatalog encapsulates the interface for interacting with the Catalog API
type ConsulCatalog interface {
Services() (map[string]*consulApi.AgentService, error)
}

// ConsulAgent encapsulates the interface for interacting with the local agent
// and service API
type ConsulAgent interface {
UpdateTTL(checkID, output, status string) error
Services() (map[string]*consulApi.AgentService, error)
Expand Down
4 changes: 4 additions & 0 deletions pkg/daemon/k8s.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,13 @@ import (
"k8s.io/client-go/rest"
)

// KubeletClientConfig holds the config options for connecting to the kubelet API
type KubeletClientConfig struct {
APIEndpoint string `long:"kubelet-api" description:"kubelet API endpoint" default:"http://localhost:10255/pods"`
InsecureSkipVerify bool `long:"kubelet-api-insecure-skip-verify" description:"skip verification of TLS certificate from kubelet API"`
}

// NewKubeletClient returns a new KubeletClient based on the given config
func NewKubeletClient(c KubeletClientConfig) (*KubeletClient, error) {
// creates the in-cluster config
config, err := rest.InClusterConfig()
Expand Down Expand Up @@ -42,11 +44,13 @@ func NewKubeletClient(c KubeletClientConfig) (*KubeletClient, error) {
return &KubeletClient{c: c, client: &http.Client{Transport: transport}}, nil
}

// KubeletClient is an HTTP client for kubelet that implements the Kubelet interface
type KubeletClient struct {
c KubeletClientConfig
client *http.Client
}

// GetPodList returns the list of pods the kubelet is managing
func (k *KubeletClient) GetPodList() (*k8sApi.PodList, error) {
// k8s testing
req, err := http.NewRequest("GET", k.c.APIEndpoint, nil)
Expand Down
17 changes: 15 additions & 2 deletions pkg/daemon/struct.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ var (
ConsulServiceCheckTTL = "katalog-sync.wish.com/service-check-ttl" // TTL for the service checks we put in consul
)

// NewPod returns a daemon pod based on a config and a k8s pod
func NewPod(pod k8sApi.Pod, dc *DaemonConfig) (*Pod, error) {
var sidecarState *SidecarState
// If we have an annotation saying we have a sidecar, lets load it
Expand Down Expand Up @@ -84,7 +85,7 @@ func NewPod(pod k8sApi.Pod, dc *DaemonConfig) (*Pod, error) {

}

// Our representation of a pod in k8s
// Pod is our representation of a pod in k8s
type Pod struct {
k8sApi.Pod
*SidecarState
Expand All @@ -100,7 +101,7 @@ func (p *Pod) HasChange(service *consulApi.AgentService) bool {
return false
}

// GetID returns an identifier that addresses this pod.
// GetServiceID returns an identifier that addresses this pod.
func (p *Pod) GetServiceID(serviceName string) string {
// ServiceID is katalog-sync_service_namespace_pod
return strings.Join([]string{
Expand All @@ -111,14 +112,17 @@ func (p *Pod) GetServiceID(serviceName string) string {
}, "_")
}

// UpdatePod updates the k8s pod
func (p *Pod) UpdatePod(k8sPod k8sApi.Pod) {
p.Pod = k8sPod
}

// GetServiceNames returns the list of service names defined in the k8s annotations
func (p *Pod) GetServiceNames() []string {
return strings.Split(p.Pod.ObjectMeta.Annotations[ConsulServiceNames], ",")
}

// HasServiceName returns whether a given name is one of the annotated service names for this pod
func (p *Pod) HasServiceName(n string) bool {
for _, name := range p.GetServiceNames() {
if name == n {
Expand All @@ -128,6 +132,8 @@ func (p *Pod) HasServiceName(n string) bool {
return false
}

// GetTags returns the tags for a given service for this pod
// This first checks the service-specific tags, and falls back to the service-level tags
func (p *Pod) GetTags(n string) []string {
if tagStr, ok := p.Pod.ObjectMeta.Annotations[ConsulServiceTagsOverride+n]; ok {
return strings.Split(tagStr, ",")
Expand All @@ -136,6 +142,8 @@ func (p *Pod) GetTags(n string) []string {
return strings.Split(p.Pod.ObjectMeta.Annotations[ConsulServiceTags], ",")
}

// GetPort returns the port for a given service for this pod
// This first checks the service-specific port, and falls back to the service-level port
func (p *Pod) GetPort(n string) int {
if portStr, ok := p.Pod.ObjectMeta.Annotations[ConsulServicePortOverride+n]; ok {
port, err := strconv.Atoi(portStr)
Expand Down Expand Up @@ -196,8 +204,10 @@ type SidecarState struct {
Ready bool
}

// SyncStatuses is a map of SyncStatus for each service defined in a pod (serviceName -> *SyncStatus)
type SyncStatuses map[string]*SyncStatus

// GetStatus returns the SyncStatus for the given serviceName
func (s SyncStatuses) GetStatus(n string) *SyncStatus {
status, ok := s[n]
if !ok {
Expand All @@ -207,6 +217,7 @@ func (s SyncStatuses) GetStatus(n string) *SyncStatus {
return status
}

// GetError returns the first error found in the set of SyncStatuses
func (s SyncStatuses) GetError() error {
for _, status := range s {
if status.LastError != nil {
Expand All @@ -217,11 +228,13 @@ func (s SyncStatuses) GetError() error {
return nil
}

// SyncStatus encapsulates the result of the last sync attempt
type SyncStatus struct {
LastUpdated time.Time
LastError error
}

// SetError sets the error and LastUpdated time for the status
func (s *SyncStatus) SetError(e error) {
s.LastError = e
s.LastUpdated = time.Now()
Expand Down

0 comments on commit dec9f34

Please sign in to comment.