Skip to content

Commit

Permalink
feat: added deployment and replicaset log output during app deploy
Browse files Browse the repository at this point in the history
  • Loading branch information
arthurcgc committed May 25, 2021
1 parent e8d8fdb commit 7344d4d
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 8 deletions.
2 changes: 1 addition & 1 deletion etc/tsuru-local.conf
@@ -1,5 +1,5 @@
listen: "0.0.0.0:8080"
host: <your inet ipv4>:8080
host: http://192.168.1.104:8080
use-tls: false
database:
url: 127.0.0.1:27017
Expand Down
87 changes: 80 additions & 7 deletions provision/kubernetes/deploy.go
Expand Up @@ -954,6 +954,38 @@ func listOptsForPodEvent(podName string) metav1.ListOptions {
}
}

func listOptsForResourceEvent(resourceType, resourceName string) metav1.ListOptions {
selector := map[string]string{
"involvedObject.kind": resourceType,
}
if resourceName != "" {
selector["involvedObject.name"] = resourceName
}
return metav1.ListOptions{
FieldSelector: labels.SelectorFromSet(labels.Set(selector)).String(),
}
}

func filteredResourceEvents(ctx context.Context, client *ClusterClient, evtResourceVersion, resourceType, resourceName, namespace string) (watch.Interface, error) {
var err error
client, err = NewClusterClient(client.Cluster)
if err != nil {
return nil, err
}
err = client.SetTimeout(time.Hour)
if err != nil {
return nil, err
}
opts := listOptsForResourceEvent(resourceType, resourceName)
opts.Watch = true
opts.ResourceVersion = evtResourceVersion
evtWatch, err := client.CoreV1().Events(namespace).Watch(ctx, opts)
if err != nil {
return nil, errors.WithStack(err)
}
return evtWatch, nil
}

func isDeploymentEvent(msg watch.Event, dep *appsv1.Deployment) bool {
evt, ok := msg.Object.(*apiv1.Event)
return ok && strings.HasPrefix(evt.Name, dep.Name)
Expand Down Expand Up @@ -986,18 +1018,43 @@ func monitorDeployment(ctx context.Context, client *ClusterClient, dep *appsv1.D
if err != nil {
return revision, err
}
watch, err := filteredPodEvents(ctx, client, evtResourceVersion, "", ns)
watchPods, err := filteredPodEvents(ctx, client, evtResourceVersion, "", ns)
if err != nil {
return revision, err
}
watchCh := watch.ResultChan()
watchPodCh := watchPods.ResultChan()
defer func() {
watch.Stop()
if watchCh != nil {
watchPods.Stop()
if watchPodCh != nil {
// Drain watch channel to avoid goroutine leaks.
<-watchCh
<-watchPodCh
}
}()
watchDep, err := filteredResourceEvents(ctx, client, evtResourceVersion, "Deployment", dep.Name, ns)
if err != nil {
return revision, err
}
watchDepCh := watchDep.ResultChan()
defer func() {
watchDep.Stop()
if watchDepCh != nil {
// Drain watch channel to avoid goroutine leaks.
<-watchDepCh
}
}()
watchReplicaSet, err := filteredResourceEvents(ctx, client, evtResourceVersion, "ReplicaSet", "", ns)
if err != nil {
return revision, err
}
watchRepCh := watchReplicaSet.ResultChan()
defer func() {
watchReplicaSet.Stop()
if watchRepCh != nil {
// Drain watch channel to avoid goroutine leaks.
<-watchRepCh
}
}()

fmt.Fprintf(w, "\n---- Updating units [%s] [version %d] ----\n", processName, version.Version())
kubeConf := getKubeConfig()
timer := time.NewTimer(kubeConf.DeploymentProgressTimeout)
Expand Down Expand Up @@ -1073,9 +1130,25 @@ func monitorDeployment(ctx context.Context, client *ClusterClient, dep *appsv1.D
}
select {
case <-time.After(100 * time.Millisecond):
case msg, isOpen := <-watchCh:
case msg, isOpen := <-watchPodCh:
if !isOpen {
watchPodCh = nil
break
}
if isDeploymentEvent(msg, dep) {
fmt.Fprintf(w, " ---> %s\n", formatEvtMessage(msg, false))
}
case msg, isOpen := <-watchDepCh:
if !isOpen {
watchDepCh = nil
break
}
if isDeploymentEvent(msg, dep) {
fmt.Fprintf(w, " ---> %s\n", formatEvtMessage(msg, false))
}
case msg, isOpen := <-watchRepCh:
if !isOpen {
watchCh = nil
watchRepCh = nil
break
}
if isDeploymentEvent(msg, dep) {
Expand Down

0 comments on commit 7344d4d

Please sign in to comment.