@@ -22,7 +22,6 @@ import (
"io/ioutil"
"math"
"math/rand"
"net/http"
"os"
"os/exec"
"path/filepath"
@@ -87,16 +86,6 @@ type ContainerFailures struct {
 	restarts int
 }
 
-type RCConfig struct {
-	Client        *client.Client
-	Image         string
-	Name          string
-	Namespace     string
-	PollInterval  int
-	PodStatusFile *os.File
-	Replicas      int
-}
-
 func Logf(format string, a ...interface{}) {
 	fmt.Fprintf(GinkgoWriter, "INFO: "+format+"\n", a...)
 }
@@ -596,16 +585,16 @@ func (p PodDiff) Print(ignorePhases util.StringSet) {
 }
 
 // Diff computes a PodDiff given 2 lists of pods.
-func Diff(oldPods []api.Pod, curPods []api.Pod) PodDiff {
+func Diff(oldPods *api.PodList, curPods *api.PodList) PodDiff {
 	podInfoMap := PodDiff{}
 
 	// New pods will show up in the curPods list but not in oldPods. They have oldhostname/phase == nonexist.
-	for _, pod := range curPods {
+	for _, pod := range curPods.Items {
 		podInfoMap[pod.Name] = &podInfo{hostname: pod.Spec.Host, phase: string(pod.Status.Phase), oldHostname: nonExist, oldPhase: nonExist}
 	}
 
 	// Deleted pods will show up in the oldPods list but not in curPods. They have a hostname/phase == nonexist.
-	for _, pod := range oldPods {
+	for _, pod := range oldPods.Items {
 		if info, ok := podInfoMap[pod.Name]; ok {
 			info.oldHostname, info.oldPhase = pod.Spec.Host, string(pod.Status.Phase)
 		} else {
@@ -619,20 +608,12 @@ func Diff(oldPods []api.Pod, curPods []api.Pod) PodDiff {
 // It will wait for all pods it spawns to become "Running".
 // It's the caller's responsibility to clean up externally (i.e. use the
 // namespace lifecycle for handling cleanup).
-func RunRC(config RCConfig) error {
+func RunRC(c *client.Client, name string, ns, image string, replicas int) error {
 	var last int
-	c := config.Client
-	name := config.Name
-	ns := config.Namespace
-	image := config.Image
-	replicas := config.Replicas
-	interval := config.PollInterval
 
 	maxContainerFailures := int(math.Max(1.0, float64(replicas)*.01))
 	current := 0
 	same := 0
-	label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
-	podLists := newFifoQueue()
 
 	By(fmt.Sprintf("Creating replication controller %s", name))
 	rc := &api.ReplicationController{
@@ -666,52 +647,35 @@ func RunRC(config RCConfig) error {
 	}
 	Logf("Created replication controller with name: %v, namespace: %v, replica count: %v", rc.Name, ns, rc.Spec.Replicas)
 
-	// Create a routine to query for the list of pods
-	stop := make(chan struct{})
-	go func(stop <-chan struct{}, n string, ns string, l labels.Selector, i int) {
-		for {
-			select {
-			case <-stop:
-				return
-			default:
-				p, err := c.Pods(ns).List(l, fields.Everything())
-				if err != nil {
-					Logf("Warning: Failed to get pod list: %v", err)
-				} else {
-					podLists.Push(p.Items)
-				}
-				time.Sleep(time.Duration(i) * time.Second)
-			}
-		}
-	}(stop, name, ns, label, interval)
-	defer close(stop)
 
 	By(fmt.Sprintf("Making sure all %d replicas of rc %s in namespace %s exist", replicas, name, ns))
-	failCount := int(25 / interval)
+	label := labels.SelectorFromSet(labels.Set(map[string]string{"name": name}))
+	pods, err := listPods(c, ns, label, fields.Everything())
+	if err != nil {
+		return fmt.Errorf("Error listing pods: %v", err)
+	}
+	current = len(pods.Items)
+	failCount := 5
 	for same < failCount && current < replicas {
-		time.Sleep(time.Duration(float32(interval)*1.1) * time.Second)
-
-		// Greedily read all existing entries in the queue until
-		// all pods are found submitted or the queue is empty
-		for podLists.Len() > 0 && current < replicas {
-			item := podLists.Pop()
-			pods := item.value.([]api.Pod)
-			current = len(pods)
-			Logf("Controller %s: Found %d pods out of %d", name, current, replicas)
-			if last < current {
-				same = 0
-			} else if last == current {
-				same++
-			} else if current < last {
-				return fmt.Errorf("Controller %s: Number of submitted pods dropped from %d to %d", name, last, current)
-			}
-
-			if same >= failCount {
-				return fmt.Errorf("Controller %s: No pods submitted for the last %d checks", name, failCount)
-			}
-			last = current
-		}
+		Logf("Controller %s: Found %d pods out of %d", name, current, replicas)
+		if last < current {
+			same = 0
+		} else if last == current {
+			same++
+		} else if current < last {
+			return fmt.Errorf("Controller %s: Number of submitted pods dropped from %d to %d", name, last, current)
+		}
+
+		if same >= failCount {
+			return fmt.Errorf("Controller %s: No pods submitted for the last %d checks", name, failCount)
+		}
+
+		last = current
+		time.Sleep(5 * time.Second)
+		pods, err = listPods(c, ns, label, fields.Everything())
+		if err != nil {
+			return fmt.Errorf("Error listing pods: %v", err)
+		}
+		current = len(pods.Items)
 	}
 	if current != replicas {
 		return fmt.Errorf("Controller %s: Only found %d replicas out of %d", name, current, replicas)
@@ -721,91 +685,82 @@ func RunRC(config RCConfig) error {
By (fmt .Sprintf ("Waiting for all %d replicas to be running with a max container failures of %d" , replicas , maxContainerFailures ))
same = 0
last = 0
failCount = int ( 100 / interval )
failCount = 10
current = 0
var oldPods []api.Pod
podLists .Reset ()
foundAllPods := false
oldPods := & api.PodList {}
for same < failCount && current < replicas {
time .Sleep (time .Duration (float32 (interval )* 1.1 ) * time .Second )
// Greedily read all existing entries in the queue until
// either all pods are running or the queue is empty
for podLists .Len () > 0 && current < replicas {
item := podLists .Pop ()
current = 0
waiting := 0
pending := 0
unknown := 0
inactive := 0
failedContainers := 0
currentPods := item .value .([]api.Pod )
for _ , p := range currentPods {
if p .Status .Phase == api .PodRunning {
current ++
for _ , v := range FailedContainers (p ) {
failedContainers = failedContainers + v .restarts
}
} else if p .Status .Phase == api .PodPending {
if p .Spec .Host == "" {
waiting ++
} else {
pending ++
}
} else if p .Status .Phase == api .PodSucceeded || p .Status .Phase == api .PodFailed {
inactive ++
} else if p .Status .Phase == api .PodUnknown {
unknown ++
current = 0
waiting := 0
pending := 0
unknown := 0
inactive := 0
failedContainers := 0
time .Sleep (10 * time .Second )
// TODO: Use a reflector both to put less strain on the cluster and
// for more clarity.
currentPods , err := listPods (c , ns , label , fields .Everything ())
if err != nil {
return fmt .Errorf ("Error listing pods: %v" , err )
}
for _ , p := range currentPods .Items {
if p .Status .Phase == api .PodRunning {
current ++
for _ , v := range FailedContainers (p ) {
failedContainers = failedContainers + v .restarts
}
} else if p .Status .Phase == api .PodPending {
if p .Spec .Host == "" {
waiting ++
} else {
pending ++
}
} else if p .Status .Phase == api .PodSucceeded || p .Status .Phase == api .PodFailed {
inactive ++
} else if p .Status .Phase == api .PodUnknown {
unknown ++
}
Logf ("Pod States: %d running, %d pending, %d waiting, %d inactive, %d unknown " , current , pending , waiting , inactive , unknown )
if config .PodStatusFile != nil {
fmt .Fprintf (config .PodStatusFile , "%s, %d, running, %d, pending, %d, waiting, %d, inactive, %d, unknown\n " , item .createTime , current , pending , waiting , inactive , unknown )
}
if foundAllPods && len (currentPods ) != len (oldPods ) {
// This failure mode includes:
// kubelet is dead, so node controller deleted pods and rc creates more
// - diagnose by noting the pod diff below.
// pod is unhealthy, so replication controller creates another to take its place
// - diagnose by comparing the previous "2 Pod states" lines for inactive pods
errorStr := fmt .Sprintf ("Number of reported pods changed: %d vs %d" , len (currentPods ), len (oldPods ))
Logf ("%v, pods that changed since the last iteration:" , errorStr )
Diff (oldPods , currentPods ).Print (util .NewStringSet ())
return fmt .Errorf (errorStr )
}
if last < current {
same = 0
} else if last == current {
same ++
} else if current < last {
// The pod failed or succeeded, or was somehow pushed out of running by the kubelet.
errorStr := fmt .Sprintf ("Number of running pods dropped from %d to %d" , last , current )
Logf ("%v, pods that changed since the last iteration:" , errorStr )
Diff (oldPods , currentPods ).Print (util .NewStringSet ())
return fmt .Errorf (errorStr )
}
if same >= failCount {
// Most times this happens because a few nodes have kubelet problems, and their pods are
// stuck in pending.
errorStr := fmt .Sprintf ("No pods started for the last %d checks" , failCount )
Logf ("%v, pods currently in pending:" , errorStr )
Diff (currentPods , make ([]api.Pod , 0 )).Print (util .NewStringSet (string (api .PodRunning )))
return fmt .Errorf (errorStr )
}
if ! foundAllPods {
foundAllPods = len (currentPods ) == replicas
}
last = current
oldPods = currentPods
}
Logf ("Pod States: %d running, %d pending, %d waiting, %d inactive, %d unknown " , current , pending , waiting , inactive , unknown )
if len (currentPods .Items ) != len (pods .Items ) {
// This failure mode includes:
// kubelet is dead, so node controller deleted pods and rc creates more
// - diagnose by noting the pod diff below.
// pod is unhealthy, so replication controller creates another to take its place
// - diagnose by comparing the previous "2 Pod states" lines for inactive pods
errorStr := fmt .Sprintf ("Number of reported pods changed: %d vs %d" , len (currentPods .Items ), len (pods .Items ))
Logf ("%v, pods that changed since the last iteration:" , errorStr )
Diff (oldPods , currentPods ).Print (util .NewStringSet ())
return fmt .Errorf (errorStr )
}
if last < current {
same = 0
} else if last == current {
same ++
} else if current < last {
// The pod failed or succeeded, or was somehow pushed out of running by the kubelet.
errorStr := fmt .Sprintf ("Number of running pods dropped from %d to %d" , last , current )
Logf ("%v, pods that changed since the last iteration:" , errorStr )
Diff (oldPods , currentPods ).Print (util .NewStringSet ())
return fmt .Errorf (errorStr )
}
if same >= failCount {
// Most times this happens because a few nodes have kubelet problems, and their pods are
// stuck in pending.
errorStr := fmt .Sprintf ("No pods started for the last %d checks" , failCount )
Logf ("%v, pods currently in pending:" , errorStr )
Diff (currentPods , & api.PodList {}).Print (util .NewStringSet (string (api .PodRunning )))
return fmt .Errorf (errorStr )
}
last = current
oldPods = currentPods
if failedContainers > maxContainerFailures {
return fmt .Errorf ("%d containers failed which is more than allowed %d" , failedContainers , maxContainerFailures )
}
if failedContainers > maxContainerFailures {
return fmt .Errorf ("%d containers failed which is more than allowed %d" , failedContainers , maxContainerFailures )
}
}
if current != replicas {
@@ -1059,7 +1014,7 @@ type LatencyMetric struct {
 }
 
 func ReadLatencyMetrics(c *client.Client) ([]LatencyMetric, error) {
-	body, err := getMetrics(c)
+	body, err := c.Get().AbsPath("/metrics").DoRaw()
 	if err != nil {
 		return nil, err
 	}
@@ -1115,74 +1070,3 @@ func HighLatencyRequests(c *client.Client, threshold time.Duration, ignoredResou
 	return len(badMetrics), nil
 }
-
-// Retrieve metrics information
-func getMetrics(c *client.Client) (string, error) {
-	body, err := c.Get().AbsPath("/metrics").DoRaw()
-	if err != nil {
-		return "", err
-	}
-	return string(body), nil
-}
-
-// Retrieve debug information
-func getDebugInfo(c *client.Client) (map[string]string, error) {
-	data := make(map[string]string)
-	for _, key := range []string{"block", "goroutine", "heap", "threadcreate"} {
-		resp, err := http.Get(c.Get().AbsPath(fmt.Sprintf("debug/pprof/%s", key)).URL().String() + "?debug=2")
-		body, err := ioutil.ReadAll(resp.Body)
-		resp.Body.Close()
-		if err != nil {
-			Logf("Warning: Error trying to fetch %s debug data: %v", key, err)
-		}
-		data[key] = string(body)
-	}
-	return data, nil
-}
-
-func writePerfData(c *client.Client, dirName string, postfix string) error {
-	fname := fmt.Sprintf("%s/metrics_%s.txt", dirName, postfix)
-	handler, err := os.Create(fname)
-	if err != nil {
-		return fmt.Errorf("Error creating file '%s': %v", fname, err)
-	}
-
-	metrics, err := getMetrics(c)
-	if err != nil {
-		return fmt.Errorf("Error retrieving metrics: %v", err)
-	}
-
-	_, err = handler.WriteString(metrics)
-	if err != nil {
-		return fmt.Errorf("Error writing metrics: %v", err)
-	}
-
-	err = handler.Close()
-	if err != nil {
-		return fmt.Errorf("Error closing '%s': %v", fname, err)
-	}
-
-	debug, err := getDebugInfo(c)
-	if err != nil {
-		return fmt.Errorf("Error retrieving debug information: %v", err)
-	}
-
-	for key, value := range debug {
-		fname := fmt.Sprintf("%s/%s_%s.txt", dirName, key, postfix)
-		handler, err = os.Create(fname)
-		if err != nil {
-			return fmt.Errorf("Error creating file '%s': %v", fname, err)
-		}
-		_, err = handler.WriteString(value)
-		if err != nil {
-			return fmt.Errorf("Error writing %s: %v", key, err)
-		}
-		err = handler.Close()
-		if err != nil {
-			return fmt.Errorf("Error closing '%s': %v", fname, err)
-		}
-	}
-	return nil
-}
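
Call sites change accordingly: parameters are now passed to RunRC directly rather than
through an RCConfig. A minimal sketch of an updated caller, assuming a configured
*client.Client named c and the package's Failf helper; the controller name, namespace,
and image below are illustrative, not taken from this diff:

	// Start 10 replicas and wait for all of them to reach Running.
	// RunRC returns a non-nil error on any failure mode it checks for:
	// stalled pod submission, a drop in the pod count, or container
	// restarts above maxContainerFailures.
	if err := RunRC(c, "example-rc", "e2e-test-ns", "kubernetes/pause", 10); err != nil {
		Failf("Error running RC: %v", err)
	}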