Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Second step in decoupling the service controller, use the apiserver for writes too. #1465

Merged
merged 1 commit into from
Sep 26, 2014
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
20 changes: 20 additions & 0 deletions pkg/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -448,6 +448,26 @@ func (c *Client) WatchEndpoints(label, field labels.Selector, resourceVersion ui
Watch()
}

func (c *Client) CreateEndpoints(endpoints *api.Endpoints) (*api.Endpoints, error) {
result := &api.Endpoints{}
err := c.Post().Path("endpoints").Body(endpoints).Do().Into(result)
return result, err
}

func (c *Client) UpdateEndpoints(endpoints *api.Endpoints) (*api.Endpoints, error) {
result := &api.Endpoints{}
if endpoints.ResourceVersion == 0 {
return nil, fmt.Errorf("invalid update object, missing resource version: %v", endpoints)
}
err := c.Put().
Path("endpoints").
Path(endpoints.ID).
Body(endpoints).
Do().
Into(result)
return result, err
}

// ServerVersion retrieves and parses the server's version.
func (c *Client) ServerVersion() (*version.Info, error) {
body, err := c.Get().AbsPath("/version").Do().Raw()
Expand Down
35 changes: 31 additions & 4 deletions pkg/registry/endpoint/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,13 @@ package endpoint

import (
"errors"
"fmt"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/apiserver"
"github.com/GoogleCloudPlatform/kubernetes/pkg/labels"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
"github.com/GoogleCloudPlatform/kubernetes/pkg/watch"
)

Expand Down Expand Up @@ -56,14 +59,38 @@ func (rs *REST) Watch(ctx api.Context, label, field labels.Selector, resourceVer
return rs.registry.WatchEndpoints(ctx, label, field, resourceVersion)
}

// Create satisfies the RESTStorage interface but is unimplemented.
// Create satisfies the RESTStorage interface.
func (rs *REST) Create(ctx api.Context, obj runtime.Object) (<-chan runtime.Object, error) {
return nil, errors.New("unimplemented")
endpoints, ok := obj.(*api.Endpoints)
if !ok {
return nil, fmt.Errorf("not an endpoints: %#v", obj)
}
if len(endpoints.ID) == 0 {
return nil, fmt.Errorf("id is required: %#v", obj)
}
endpoints.CreationTimestamp = util.Now()
return apiserver.MakeAsync(func() (runtime.Object, error) {
err := rs.registry.UpdateEndpoints(ctx, endpoints)
if err != nil {
return nil, err
}
return rs.registry.GetEndpoints(ctx, endpoints.ID)
}), nil
}

// Update satisfies the RESTStorage interface but is unimplemented.
// Update satisfies the RESTStorage interface.
func (rs *REST) Update(ctx api.Context, obj runtime.Object) (<-chan runtime.Object, error) {
return nil, errors.New("unimplemented")
endpoints, ok := obj.(*api.Endpoints)
if !ok {
return nil, fmt.Errorf("not an endpoints: %#v", obj)
}
return apiserver.MakeAsync(func() (runtime.Object, error) {
err := rs.registry.UpdateEndpoints(ctx, endpoints)
if err != nil {
return nil, err
}
return rs.registry.GetEndpoints(ctx, endpoints.ID)
}), nil
}

// Delete satisfies the RESTStorage interface but is unimplemented.
Expand Down
59 changes: 54 additions & 5 deletions pkg/service/endpoints_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"fmt"
"net"
"strconv"
"strings"

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
Expand Down Expand Up @@ -72,11 +73,35 @@ func (e *EndpointController) SyncServiceEndpoints() error {
}
endpoints[ix] = net.JoinHostPort(pod.CurrentState.PodIP, strconv.Itoa(port))
}
// TODO: this is totally broken, we need to compute this and store inside an AtomicUpdate loop.
err = e.serviceRegistry.UpdateEndpoints(api.NewContext(), &api.Endpoints{
JSONBase: api.JSONBase{ID: service.ID},
Endpoints: endpoints,
})
currentEndpoints, err := e.client.GetEndpoints(service.ID)
if err != nil {
// TODO this is brittle as all get out, refactor the client libraries to return a structured error.
if strings.Contains(err.Error(), "(404)") {
currentEndpoints = &api.Endpoints{
JSONBase: api.JSONBase{
ID: service.ID,
},
}
} else {
glog.Errorf("Error getting endpoints: %#v", err)
continue
}
}
newEndpoints := &api.Endpoints{}
*newEndpoints = *currentEndpoints
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

(Nit; suggest in the future: newEndpoints := *currentEndpoints)

newEndpoints.Endpoints = endpoints

if currentEndpoints.ResourceVersion == 0 {
// No previous endpoints, create them
_, err = e.client.CreateEndpoints(newEndpoints)
} else {
// Pre-existing
if endpointsEqual(currentEndpoints, endpoints) {
glog.V(2).Infof("endpoints are equal for %s, skipping update", service.ID)
continue
}
_, err = e.client.UpdateEndpoints(newEndpoints)
}
if err != nil {
glog.Errorf("Error updating endpoints: %#v", err)
continue
Expand All @@ -85,6 +110,30 @@ func (e *EndpointController) SyncServiceEndpoints() error {
return resultErr
}

func containsEndpoint(endpoints *api.Endpoints, endpoint string) bool {
if endpoints == nil {
return false
}
for ix := range endpoints.Endpoints {
if endpoints.Endpoints[ix] == endpoint {
return true
}
}
return false
}

func endpointsEqual(e *api.Endpoints, endpoints []string) bool {
if len(e.Endpoints) != len(endpoints) {
return false
}
for _, endpoint := range endpoints {
if !containsEndpoint(e, endpoint) {
return false
}
}
return true
}

// findPort locates the container port for the given manifest and portName.
func findPort(manifest *api.ContainerManifest, portName util.IntOrString) (int, error) {
if ((portName.Kind == util.IntstrString && len(portName.StrVal) == 0) ||
Expand Down
110 changes: 96 additions & 14 deletions pkg/service/endpoints_controller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,10 @@ import (

"github.com/GoogleCloudPlatform/kubernetes/pkg/api"
_ "github.com/GoogleCloudPlatform/kubernetes/pkg/api/latest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/api/v1beta1"
"github.com/GoogleCloudPlatform/kubernetes/pkg/client"
"github.com/GoogleCloudPlatform/kubernetes/pkg/registry/registrytest"
"github.com/GoogleCloudPlatform/kubernetes/pkg/runtime"
"github.com/GoogleCloudPlatform/kubernetes/pkg/util"
)

Expand Down Expand Up @@ -127,7 +129,7 @@ type serverResponse struct {
obj interface{}
}

func makeTestServer(t *testing.T, podResponse serverResponse, serviceResponse serverResponse) *httptest.Server {
func makeTestServer(t *testing.T, podResponse serverResponse, serviceResponse serverResponse, endpointsResponse serverResponse) (*httptest.Server, *util.FakeHandler) {
fakePodHandler := util.FakeHandler{
StatusCode: podResponse.statusCode,
ResponseBody: util.EncodeJSON(podResponse.obj),
Expand All @@ -136,20 +138,27 @@ func makeTestServer(t *testing.T, podResponse serverResponse, serviceResponse se
StatusCode: serviceResponse.statusCode,
ResponseBody: util.EncodeJSON(serviceResponse.obj),
}
fakeEndpointsHandler := util.FakeHandler{
StatusCode: endpointsResponse.statusCode,
ResponseBody: util.EncodeJSON(endpointsResponse.obj),
}
mux := http.NewServeMux()
mux.Handle("/api/v1beta1/pods", &fakePodHandler)
mux.Handle("/api/v1beta1/services", &fakeServiceHandler)
mux.Handle("/api/v1beta1/endpoints", &fakeEndpointsHandler)
mux.Handle("/api/v1beta1/endpoints/", &fakeEndpointsHandler)
mux.HandleFunc("/", func(res http.ResponseWriter, req *http.Request) {
t.Errorf("unexpected request: %v", req.RequestURI)
res.WriteHeader(http.StatusNotFound)
})
return httptest.NewServer(mux)
return httptest.NewServer(mux), &fakeEndpointsHandler
}

func TestSyncEndpointsEmpty(t *testing.T) {
testServer := makeTestServer(t,
testServer, _ := makeTestServer(t,
serverResponse{http.StatusOK, newPodList(0)},
serverResponse{http.StatusOK, api.ServiceList{}})
serverResponse{http.StatusOK, api.ServiceList{}},
serverResponse{http.StatusOK, api.Endpoints{}})
client := client.NewOrDie(testServer.URL, "v1beta1", nil)
serviceRegistry := registrytest.ServiceRegistry{}
endpoints := NewEndpointController(&serviceRegistry, client)
Expand All @@ -159,9 +168,10 @@ func TestSyncEndpointsEmpty(t *testing.T) {
}

func TestSyncEndpointsError(t *testing.T) {
testServer := makeTestServer(t,
testServer, _ := makeTestServer(t,
serverResponse{http.StatusOK, newPodList(0)},
serverResponse{http.StatusInternalServerError, api.ServiceList{}})
serverResponse{http.StatusInternalServerError, api.ServiceList{}},
serverResponse{http.StatusOK, api.Endpoints{}})
client := client.NewOrDie(testServer.URL, "v1beta1", nil)
serviceRegistry := registrytest.ServiceRegistry{
Err: fmt.Errorf("test error"),
Expand All @@ -172,29 +182,100 @@ func TestSyncEndpointsError(t *testing.T) {
}
}

func TestSyncEndpointsItems(t *testing.T) {
func TestSyncEndpointsItemsPreexisting(t *testing.T) {
serviceList := api.ServiceList{
Items: []api.Service{
{
JSONBase: api.JSONBase{ID: "foo"},
Selector: map[string]string{
"foo": "bar",
},
},
},
}
testServer, endpointsHandler := makeTestServer(t,
serverResponse{http.StatusOK, newPodList(1)},
serverResponse{http.StatusOK, serviceList},
serverResponse{http.StatusOK, api.Endpoints{
JSONBase: api.JSONBase{
ID: "foo",
ResourceVersion: 1,
},
Endpoints: []string{"6.7.8.9:1000"},
}})
client := client.NewOrDie(testServer.URL, "v1beta1", nil)
serviceRegistry := registrytest.ServiceRegistry{}
endpoints := NewEndpointController(&serviceRegistry, client)
if err := endpoints.SyncServiceEndpoints(); err != nil {
t.Errorf("unexpected error: %v", err)
}
data := runtime.EncodeOrDie(v1beta1.Codec, &api.Endpoints{
JSONBase: api.JSONBase{
ID: "foo",
ResourceVersion: 1,
},
Endpoints: []string{"1.2.3.4:8080"},
})
endpointsHandler.ValidateRequest(t, "/api/v1beta1/endpoints/foo", "PUT", &data)
}

func TestSyncEndpointsItemsPreexistingIdentical(t *testing.T) {
serviceList := api.ServiceList{
Items: []api.Service{
{
JSONBase: api.JSONBase{ID: "foo"},
Selector: map[string]string{
"foo": "bar",
},
},
},
}
testServer := makeTestServer(t,
testServer, endpointsHandler := makeTestServer(t,
serverResponse{http.StatusOK, newPodList(1)},
serverResponse{http.StatusOK, serviceList})
serverResponse{http.StatusOK, serviceList},
serverResponse{http.StatusOK, api.Endpoints{
JSONBase: api.JSONBase{
ResourceVersion: 1,
},
Endpoints: []string{"1.2.3.4:8080"},
}})
client := client.NewOrDie(testServer.URL, "v1beta1", nil)
serviceRegistry := registrytest.ServiceRegistry{}
endpoints := NewEndpointController(&serviceRegistry, client)
if err := endpoints.SyncServiceEndpoints(); err != nil {
t.Errorf("unexpected error: %v", err)
}
if len(serviceRegistry.Endpoints.Endpoints) != 1 ||
serviceRegistry.Endpoints.Endpoints[0] != "1.2.3.4:8080" {
t.Errorf("Unexpected endpoints update: %#v", serviceRegistry.Endpoints)
endpointsHandler.ValidateRequest(t, "/api/v1beta1/endpoints/foo", "GET", nil)
}

func TestSyncEndpointsItems(t *testing.T) {
serviceList := api.ServiceList{
Items: []api.Service{
{
JSONBase: api.JSONBase{ID: "foo"},
Selector: map[string]string{
"foo": "bar",
},
},
},
}
testServer, endpointsHandler := makeTestServer(t,
serverResponse{http.StatusOK, newPodList(1)},
serverResponse{http.StatusOK, serviceList},
serverResponse{http.StatusOK, api.Endpoints{}})
client := client.NewOrDie(testServer.URL, "v1beta1", nil)
serviceRegistry := registrytest.ServiceRegistry{}
endpoints := NewEndpointController(&serviceRegistry, client)
if err := endpoints.SyncServiceEndpoints(); err != nil {
t.Errorf("unexpected error: %v", err)
}
data := runtime.EncodeOrDie(v1beta1.Codec, &api.Endpoints{
JSONBase: api.JSONBase{
ResourceVersion: 0,
},
Endpoints: []string{"1.2.3.4:8080"},
})
endpointsHandler.ValidateRequest(t, "/api/v1beta1/endpoints", "POST", &data)
}

func TestSyncEndpointsPodError(t *testing.T) {
Expand All @@ -207,9 +288,10 @@ func TestSyncEndpointsPodError(t *testing.T) {
},
},
}
testServer := makeTestServer(t,
testServer, _ := makeTestServer(t,
serverResponse{http.StatusInternalServerError, api.PodList{}},
serverResponse{http.StatusOK, serviceList})
serverResponse{http.StatusOK, serviceList},
serverResponse{http.StatusOK, api.Endpoints{}})
client := client.NewOrDie(testServer.URL, "v1beta1", nil)
serviceRegistry := registrytest.ServiceRegistry{
List: api.ServiceList{
Expand Down