Skip to content

Commit

Permalink
Implement the rest of the logger interfaces
Browse files Browse the repository at this point in the history
  • Loading branch information
dtomcej committed Mar 13, 2020
1 parent 8116418 commit 0528771
Show file tree
Hide file tree
Showing 13 changed files with 300 additions and 134 deletions.
44 changes: 23 additions & 21 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"github.com/containous/maesh/pkg/deploylog"
"github.com/containous/traefik/v2/pkg/safe"
"github.com/gorilla/mux"
log "github.com/sirupsen/logrus"
"github.com/sirupsen/logrus"
kubeerror "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/selection"
Expand All @@ -27,6 +27,7 @@ type Interface interface {

// API is an implementation of an api.
type API struct {
log logrus.FieldLogger
router *mux.Router
readiness bool
lastConfiguration *safe.Safe
Expand All @@ -44,8 +45,9 @@ type podInfo struct {
}

// NewAPI creates a new api.
func NewAPI(apiPort int32, apiHost string, lastConfiguration *safe.Safe, deployLog deploylog.Interface, podLister listers.PodLister, meshNamespace string) *API {
func NewAPI(log logrus.FieldLogger, apiPort int32, apiHost string, lastConfiguration *safe.Safe, deployLog deploylog.Interface, podLister listers.PodLister, meshNamespace string) *API {
a := &API{
log: log,
readiness: false,
lastConfiguration: lastConfiguration,
apiPort: apiPort,
Expand All @@ -64,7 +66,7 @@ func NewAPI(apiPort int32, apiHost string, lastConfiguration *safe.Safe, deployL

// Init handles any api initialization.
func (a *API) Init() error {
log.Debugln("API.Init")
a.log.Debugln("API.Init")

a.router = mux.NewRouter()

Expand All @@ -79,20 +81,20 @@ func (a *API) Init() error {

// Start runs the API.
func (a *API) Start() {
log.Debugln("API.Start")
a.log.Debugln("API.Start")

go a.Run()
}

// Run wraps the listenAndServe method.
func (a *API) Run() {
log.Error(http.ListenAndServe(fmt.Sprintf("%s:%d", a.apiHost, a.apiPort), a.router))
a.log.Error(http.ListenAndServe(fmt.Sprintf("%s:%d", a.apiHost, a.apiPort), a.router))
}

// EnableReadiness enables the readiness flag in the API.
func (a *API) EnableReadiness() {
if !a.readiness {
log.Debug("Controller Readiness enabled")
a.log.Debug("Controller Readiness enabled")

a.readiness = true
}
Expand All @@ -103,7 +105,7 @@ func (a *API) getCurrentConfiguration(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")

if err := json.NewEncoder(w).Encode(a.lastConfiguration.Get()); err != nil {
log.Error(err)
a.log.Error(err)
}
}

Expand All @@ -116,7 +118,7 @@ func (a *API) getReadiness(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")

if err := json.NewEncoder(w).Encode(a.readiness); err != nil {
log.Error(err)
a.log.Error(err)
}
}

Expand All @@ -126,14 +128,14 @@ func (a *API) getDeployLog(w http.ResponseWriter, r *http.Request) {

data, err := json.Marshal(entries)
if err != nil {
writeErrorResponse(w, fmt.Sprintf("unable to marshal deploy log entries: %v", err), http.StatusInternalServerError)
a.writeErrorResponse(w, fmt.Sprintf("unable to marshal deploy log entries: %v", err), http.StatusInternalServerError)
return
}

w.Header().Set("Content-Type", "application/json")

if _, err := w.Write(data); err != nil {
log.Error(err)
a.log.Error(err)
}
}

Expand All @@ -145,14 +147,14 @@ func (a *API) getMeshNodes(w http.ResponseWriter, r *http.Request) {

requirement, err := labels.NewRequirement("component", selection.Equals, []string{"maesh-mesh"})
if err != nil {
log.Error(err)
a.log.Error(err)
}

sel = sel.Add(*requirement)

podList, err := a.podLister.Pods(a.meshNamespace).List(sel)
if err != nil {
writeErrorResponse(w, fmt.Sprintf("unable to retrieve pod list: %v", err), http.StatusInternalServerError)
a.writeErrorResponse(w, fmt.Sprintf("unable to retrieve pod list: %v", err), http.StatusInternalServerError)
return
}

Expand All @@ -178,7 +180,7 @@ func (a *API) getMeshNodes(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")

if err := json.NewEncoder(w).Encode(podInfoList); err != nil {
log.Error(err)
a.log.Error(err)
}
}

Expand All @@ -189,43 +191,43 @@ func (a *API) getMeshNodeConfiguration(w http.ResponseWriter, r *http.Request) {
pod, err := a.podLister.Pods(a.meshNamespace).Get(vars["node"])
if err != nil {
if kubeerror.IsNotFound(err) {
writeErrorResponse(w, fmt.Sprintf("unable to find pod: %s", vars["node"]), http.StatusNotFound)
a.writeErrorResponse(w, fmt.Sprintf("unable to find pod: %s", vars["node"]), http.StatusNotFound)
return
}

writeErrorResponse(w, fmt.Sprintf("unable to retrieve pod: %v", err), http.StatusInternalServerError)
a.writeErrorResponse(w, fmt.Sprintf("unable to retrieve pod: %v", err), http.StatusInternalServerError)

return
}

resp, err := http.Get(fmt.Sprintf("http://%s:8080/api/rawdata", pod.Status.PodIP))
if err != nil {
writeErrorResponse(w, fmt.Sprintf("unable to get configuration from pod: %v", err), http.StatusBadGateway)
a.writeErrorResponse(w, fmt.Sprintf("unable to get configuration from pod: %v", err), http.StatusBadGateway)
return
}

defer resp.Body.Close()

body, err := ioutil.ReadAll(resp.Body)
if err != nil {
writeErrorResponse(w, fmt.Sprintf("unable to get configuration response body from pod: %v", err), http.StatusBadGateway)
a.writeErrorResponse(w, fmt.Sprintf("unable to get configuration response body from pod: %v", err), http.StatusBadGateway)
return
}

w.Header().Set("Content-Type", "application/json")

if _, err := w.Write(body); err != nil {
log.Error(err)
a.log.Error(err)
}
}

func writeErrorResponse(w http.ResponseWriter, errorMessage string, status int) {
func (a *API) writeErrorResponse(w http.ResponseWriter, errorMessage string, status int) {
w.WriteHeader(status)
log.Error(errorMessage)
a.log.Error(errorMessage)

w.Header().Set("Content-Type", "text/plain; charset=us-ascii")

if _, err := w.Write([]byte(errorMessage)); err != nil {
log.Error(err)
a.log.Error(err)
}
}
53 changes: 41 additions & 12 deletions pkg/api/api_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import (
"net"
"net/http"
"net/http/httptest"
"os"
"testing"
"time"

Expand All @@ -14,6 +15,7 @@ import (
"github.com/containous/traefik/v2/pkg/safe"
"github.com/containous/traefik/v2/pkg/testhelpers"
"github.com/gorilla/mux"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
)

Expand All @@ -23,7 +25,12 @@ var (

func TestEnableReadiness(t *testing.T) {
config := safe.Safe{}
api := NewAPI(9000, localhost, &config, nil, nil, "foo")
log := logrus.New()

log.SetOutput(os.Stdout)
log.SetLevel(logrus.DebugLevel)

api := NewAPI(log, 9000, localhost, &config, nil, nil, "foo")

assert.Equal(t, false, api.readiness)

Expand Down Expand Up @@ -55,7 +62,12 @@ func TestGetReadiness(t *testing.T) {
t.Run(test.desc, func(t *testing.T) {
t.Parallel()
config := safe.Safe{}
api := NewAPI(9000, localhost, &config, nil, nil, "foo")
log := logrus.New()

log.SetOutput(os.Stdout)
log.SetLevel(logrus.DebugLevel)

api := NewAPI(log, 9000, localhost, &config, nil, nil, "foo")
api.readiness = test.readiness

res := httptest.NewRecorder()
Expand All @@ -70,7 +82,12 @@ func TestGetReadiness(t *testing.T) {

func TestGetCurrentConfiguration(t *testing.T) {
config := safe.Safe{}
api := NewAPI(9000, localhost, &config, nil, nil, "foo")
log := logrus.New()

log.SetOutput(os.Stdout)
log.SetLevel(logrus.DebugLevel)

api := NewAPI(log, 9000, localhost, &config, nil, nil, "foo")

config.Set("foo")

Expand All @@ -84,11 +101,15 @@ func TestGetCurrentConfiguration(t *testing.T) {

func TestGetDeployLog(t *testing.T) {
config := safe.Safe{}
log := deploylog.NewDeployLog(1000)
api := NewAPI(9000, localhost, &config, log, nil, "foo")
log := logrus.New()

log.SetOutput(os.Stdout)
log.SetLevel(logrus.DebugLevel)

deploylog := deploylog.NewDeployLog(log, 1000)
api := NewAPI(log, 9000, localhost, &config, deploylog, nil, "foo")
currentTime := time.Now()
log.LogDeploy(currentTime, "foo", "bar", true, "blabla")
deploylog.LogDeploy(currentTime, "foo", "bar", true, "blabla")

data, err := currentTime.MarshalJSON()
assert.NoError(t, err)
Expand Down Expand Up @@ -137,14 +158,18 @@ func TestGetMeshNodes(t *testing.T) {
t.Run(test.desc, func(t *testing.T) {
t.Parallel()
config := safe.Safe{}
log := deploylog.NewDeployLog(1000)
log := logrus.New()

log.SetOutput(os.Stdout)
log.SetLevel(logrus.DebugLevel)

deploylog := deploylog.NewDeployLog(log, 1000)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

clientMock := k8s.NewClientMock(ctx.Done(), test.mockFile, false)
api := NewAPI(9000, localhost, &config, log, clientMock.PodLister, "foo")

api := NewAPI(log, 9000, localhost, &config, deploylog, clientMock.PodLister, "foo")
res := httptest.NewRecorder()
req := testhelpers.MustNewRequest(http.MethodGet, "/api/status/nodes", nil)

Expand Down Expand Up @@ -183,14 +208,18 @@ func TestGetMeshNodeConfiguration(t *testing.T) {

for _, test := range testCases {
config := safe.Safe{}
log := deploylog.NewDeployLog(1000)
log := logrus.New()

log.SetOutput(os.Stdout)
log.SetLevel(logrus.DebugLevel)

deploylog := deploylog.NewDeployLog(log, 1000)

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

clientMock := k8s.NewClientMock(ctx.Done(), test.mockFile, false)
api := NewAPI(9000, localhost, &config, log, clientMock.PodLister, "foo")

api := NewAPI(log, 9000, localhost, &config, deploylog, clientMock.PodLister, "foo")
res := httptest.NewRecorder()
req := testhelpers.MustNewRequest(http.MethodGet, "/api/status/node/mesh-pod-1/configuration", nil)

Expand Down
12 changes: 6 additions & 6 deletions pkg/controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,11 @@ func (c *Controller) init() {
c.kubernetesFactory = informers.NewSharedInformerFactoryWithOptions(c.clients.GetKubernetesClient(), k8s.ResyncPeriod)
c.ServiceLister = c.kubernetesFactory.Core().V1().Services().Lister()

c.serviceManager = NewShadowServiceManager(c.ServiceLister, c.meshNamespace, c.tcpStateTable, c.defaultMode, c.minHTTPPort, c.maxHTTPPort, c.clients.GetKubernetesClient())
c.serviceManager = NewShadowServiceManager(c.log, c.ServiceLister, c.meshNamespace, c.tcpStateTable, c.defaultMode, c.minHTTPPort, c.maxHTTPPort, c.clients.GetKubernetesClient())

// configRefreshChan is used to trigger configuration refreshes and deploys.
c.configRefreshChan = make(chan string)
c.handler = NewHandler(c.ignored, c.serviceManager, c.configRefreshChan)
c.handler = NewHandler(c.log, c.ignored, c.serviceManager, c.configRefreshChan)

c.kubernetesFactory.Core().V1().Services().Informer().AddEventHandler(c.handler)
c.kubernetesFactory.Core().V1().Endpoints().Informer().AddEventHandler(c.handler)
Expand All @@ -154,8 +154,8 @@ func (c *Controller) init() {
c.PodLister = c.kubernetesFactory.Core().V1().Pods().Lister()
c.EndpointsLister = c.kubernetesFactory.Core().V1().Endpoints().Lister()

c.deployLog = deploylog.NewDeployLog(1000)
c.api = api.NewAPI(c.apiPort, c.apiHost, &c.lastConfiguration, c.deployLog, c.PodLister, c.meshNamespace)
c.deployLog = deploylog.NewDeployLog(c.log, 1000)
c.api = api.NewAPI(c.log, c.apiPort, c.apiHost, &c.lastConfiguration, c.deployLog, c.PodLister, c.meshNamespace)

if c.smiEnabled {
// Create new SharedInformerFactories, and register the event handler to informers.
Expand All @@ -175,13 +175,13 @@ func (c *Controller) init() {
c.TCPRouteLister = c.specsFactory.Specs().V1alpha1().TCPRoutes().Lister()
c.TrafficSplitLister = c.splitFactory.Split().V1alpha2().TrafficSplits().Lister()

c.provider = smi.New(c.defaultMode, c.tcpStateTable, c.ignored, c.ServiceLister, c.EndpointsLister, c.PodLister, c.TrafficTargetLister, c.HTTPRouteGroupLister, c.TCPRouteLister, c.TrafficSplitLister, c.minHTTPPort, c.maxHTTPPort)
c.provider = smi.New(c.log, c.defaultMode, c.tcpStateTable, c.ignored, c.ServiceLister, c.EndpointsLister, c.PodLister, c.TrafficTargetLister, c.HTTPRouteGroupLister, c.TCPRouteLister, c.TrafficSplitLister, c.minHTTPPort, c.maxHTTPPort)

return
}

// If SMI is not configured, use the kubernetes provider.
c.provider = kubernetes.New(c.defaultMode, c.tcpStateTable, c.ignored, c.ServiceLister, c.EndpointsLister, c.minHTTPPort, c.maxHTTPPort)
c.provider = kubernetes.New(c.log, c.defaultMode, c.tcpStateTable, c.ignored, c.ServiceLister, c.EndpointsLister, c.minHTTPPort, c.maxHTTPPort)
}

// Run is the main entrypoint for the controller.
Expand Down
12 changes: 12 additions & 0 deletions pkg/controller/controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ package controller

import (
"context"
"os"
"testing"

"github.com/containous/maesh/pkg/k8s"
"github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
)

Expand All @@ -21,9 +23,14 @@ func TestNewController(t *testing.T) {
defer cancel()

clientMock := k8s.NewClientMock(ctx.Done(), "mock.yaml", false)
log := logrus.New()

log.SetOutput(os.Stdout)
log.SetLevel(logrus.DebugLevel)

// Create a new controller with base HTTP mode.
controller, err := NewMeshController(clientMock, MeshControllerConfig{
Log: log,
SMIEnabled: false,
DefaultMode: "http",
Namespace: meshNamespace,
Expand All @@ -43,9 +50,14 @@ func TestNewControllerWithSMI(t *testing.T) {
defer cancel()

clientMock := k8s.NewClientMock(ctx.Done(), "mock.yaml", true)
log := logrus.New()

log.SetOutput(os.Stdout)
log.SetLevel(logrus.DebugLevel)

// Create a new controller with base HTTP mode, in SMI mode.
controller, err := NewMeshController(clientMock, MeshControllerConfig{
Log: log,
SMIEnabled: true,
DefaultMode: "http",
Namespace: meshNamespace,
Expand Down

0 comments on commit 0528771

Please sign in to comment.