Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix etcd in proxy for namespace awareness #1850

Merged
merged 1 commit into from
Oct 16, 2014
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
81 changes: 51 additions & 30 deletions pkg/proxy/config/etcd.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,10 +116,50 @@ func (s ConfigSourceEtcd) Run() {
}
}

// decodeServices recurses from the root of the service storage directory into each namespace to get each service and endpoint object
func (s ConfigSourceEtcd) decodeServices(node *etcd.Node, retServices []api.Service, retEndpoints []api.Endpoints) ([]api.Service, []api.Endpoints, error) {
// TODO this needs to go against API server desperately, so much redundant error prone code here
// we hit a namespace boundary, recurse until we find actual nodes
if node.Dir == true {
for _, n := range node.Nodes {
var err error // Don't shadow the ret* variables.
retServices, retEndpoints, err = s.decodeServices(n, retServices, retEndpoints)
if err != nil {
return retServices, retEndpoints, err
}
}
return retServices, retEndpoints, nil
}

// we have an actual service node
var svc api.Service
err := latest.Codec.DecodeInto([]byte(node.Value), &svc)
if err != nil {
glog.Errorf("Failed to load Service: %s (%#v)", node.Value, err)
} else {
// so we got a service we can handle, and now get endpoints
retServices = append(retServices, svc)
// get the endpoints
endpoints, err := s.GetEndpoints(svc.Namespace, svc.ID)
if err != nil {
if tools.IsEtcdNotFound(err) {
glog.V(4).Infof("Unable to get endpoints for %s %s : %v", svc.Namespace, svc.ID, err)
}
glog.Errorf("Couldn't get endpoints for %s %s : %v skipping", svc.Namespace, svc.ID, err)
endpoints = api.Endpoints{}
} else {
glog.V(3).Infof("Got service: %s %s on localport %d mapping to: %s", svc.Namespace, svc.ID, svc.Port, endpoints)
}
retEndpoints = append(retEndpoints, endpoints)
}
return retServices, retEndpoints, nil
}

// GetServices finds the list of services and their endpoints from etcd.
// This operation is akin to a set a known good at regular intervals.
func (s ConfigSourceEtcd) GetServices() ([]api.Service, []api.Endpoints, error) {
response, err := s.client.Get(registryRoot+"/specs", true, false)
// this is a recursive query now that services are namespaced under "/registry/services/specs/<ns>/<name>"
response, err := s.client.Get(registryRoot+"/specs", false, true)
if err != nil {
if tools.IsEtcdNotFound(err) {
glog.V(4).Infof("Failed to get the key %s: %v", registryRoot, err)
Expand All @@ -128,40 +168,18 @@ func (s ConfigSourceEtcd) GetServices() ([]api.Service, []api.Endpoints, error)
}
return []api.Service{}, []api.Endpoints{}, err
}
// this code needs to go through the API server in the future, this is one big hack
if response.Node.Dir == true {
retServices := make([]api.Service, len(response.Node.Nodes))
retEndpoints := make([]api.Endpoints, len(response.Node.Nodes))
// Ok, so we have directories, this list should be the list
// of services. Find the local port to listen on and remote endpoints
// and create a Service entry for it.
for i, node := range response.Node.Nodes {
var svc api.Service
err = latest.Codec.DecodeInto([]byte(node.Value), &svc)
if err != nil {
glog.Errorf("Failed to load Service: %s (%#v)", node.Value, err)
continue
}
retServices[i] = svc
endpoints, err := s.GetEndpoints(svc.ID)
if err != nil {
if tools.IsEtcdNotFound(err) {
glog.V(4).Infof("Unable to get endpoints for %s : %v", svc.ID, err)
}
glog.Errorf("Couldn't get endpoints for %s : %v skipping", svc.ID, err)
endpoints = api.Endpoints{}
} else {
glog.V(3).Infof("Got service: %s on localport %d mapping to: %s", svc.ID, svc.Port, endpoints)
}
retEndpoints[i] = endpoints
}
return retServices, retEndpoints, err
retServices := []api.Service{}
retEndpoints := []api.Endpoints{}
return s.decodeServices(response.Node, retServices, retEndpoints)
}
return nil, nil, fmt.Errorf("did not get the root of the registry %s", registryRoot)
}

// GetEndpoints finds the list of endpoints of the service from etcd.
func (s ConfigSourceEtcd) GetEndpoints(service string) (api.Endpoints, error) {
key := path.Join(registryRoot, "endpoints", service)
func (s ConfigSourceEtcd) GetEndpoints(namespace, service string) (api.Endpoints, error) {
key := path.Join(registryRoot, "endpoints", namespace, service)
response, err := s.client.Get(key, true, false)
if err != nil {
glog.Errorf("Failed to get the key: %s %v", key, err)
Expand Down Expand Up @@ -195,7 +213,10 @@ func (s ConfigSourceEtcd) WatchForChanges() {
if !ok {
break
}
s.ProcessChange(watchResponse)
// only listen for non directory changes
if watchResponse.Node.Dir == false {
s.ProcessChange(watchResponse)
}
}
}

Expand Down