Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Added a fake_time package to deterministically test time critical eve…
…nts.
- Loading branch information
1 parent
91d306a
commit f5fe9f6
Showing
2 changed files
with
295 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,138 @@ | ||
/* | ||
Copyright 2021 The Kubernetes Authors. | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package faketime | ||
|
||
import ( | ||
"fmt" | ||
"sync" | ||
"time" | ||
) | ||
|
||
// event holds the information to unblock sleeping processes. Calls to Advance() will push a new event | ||
// with the updated time to all event channels. Processes blocked by Sleep() will receive these events | ||
// and unblock when their sleep duration has expired. | ||
type event struct { | ||
time time.Time // current global time. | ||
cancel bool // determine whether the sleeping process should cancel sleep. | ||
} | ||
|
||
const ( | ||
// DefaultTime is used to initialize the global time. | ||
// This represents September 22, 2002 at 00:00:00. | ||
DefaultTime string = "020106 000000" | ||
) | ||
|
||
var ( | ||
// GlobalTime is the primary source of truth for measuring "fake" time. | ||
GlobalTime time.Time | ||
// mutex is used to prevent race conditions when reading or writing any global variable. | ||
mutex sync.Mutex | ||
// events holds all event channels for sleeping processes. | ||
events []chan event | ||
// SleepGroup is used to ensure all Sleep() processes are blocked and listening to their event channel. | ||
SleepGroup sync.WaitGroup | ||
// eventGroup is used after a call to Advance(), ensuring all sleeping processes have received their event. | ||
eventGroup sync.WaitGroup | ||
) | ||
|
||
// Init defines the starting state to mock the time.Sleep function. | ||
func Init() { | ||
var err error | ||
GlobalTime, err = time.Parse(DefaultTime, DefaultTime) | ||
if err != nil { | ||
msg := fmt.Sprintf("The default time %s could not be parsed in fake_time.Init()", DefaultTime) | ||
panic(msg) | ||
} | ||
mutex = sync.Mutex{} | ||
events = make([]chan event, 0) | ||
SleepGroup = sync.WaitGroup{} | ||
eventGroup = sync.WaitGroup{} | ||
} | ||
|
||
// Close cancels and closes all event channels, effectively unblocking all sleeping processes. | ||
func Close() { | ||
for _, e := range events { | ||
eventGroup.Add(1) | ||
e <- event{time: GlobalTime, cancel: true} | ||
close(e) | ||
} | ||
eventGroup.Wait() | ||
} | ||
|
||
// Advance adds the given duration to the global time, and pushes the new time to listeners | ||
// of the event channel. This advance in time MUST trickle down to all sleeping processes in order | ||
// to have a global consensus of time. Therefore, we must wait until every sleeping process has | ||
// received their event. | ||
func Advance(d time.Duration) { | ||
// Grab and hold the lock to prevent race conditions. | ||
mutex.Lock() | ||
defer mutex.Unlock() | ||
// Advance the global time. | ||
GlobalTime = GlobalTime.Add(d) | ||
// Update all sleeping processes of the new current time. | ||
for _, e := range events { | ||
eventGroup.Add(1) | ||
e <- event{time: GlobalTime, cancel: false} | ||
} | ||
// Block execution until all sleeping processes have received the event. | ||
eventGroup.Wait() | ||
} | ||
|
||
// Sleep creates a new event channel and blocks until the global time surpasses the given duration. | ||
func Sleep(d time.Duration) { | ||
var ec chan event | ||
var unblockAt time.Time | ||
|
||
mutex.Lock() | ||
// Create a new event channel for the sleeping process. | ||
ec = make(chan event) | ||
events = append(events, ec) | ||
// Determine when to stop sleeping. | ||
unblockAt = GlobalTime.Add(d) | ||
mutex.Unlock() | ||
|
||
// Acknowledge this sleeping process has finished initializing, and is now listening to | ||
// its event channel. This must be done to synchronize all sleeping processes before any | ||
// call to Advance() is made. | ||
SleepGroup.Done() | ||
// Block until it's time to wake up. | ||
for { | ||
if wakeUp(unblockAt, ec) { | ||
removeEventChannel(ec) | ||
return | ||
} | ||
} | ||
} | ||
|
||
// removeEventChannel removes the given event channel from the global collection of channels. | ||
func removeEventChannel(ec chan event) { | ||
for i := 0; i < len(events); i++ { | ||
if events[i] == ec { | ||
events = append(events[:i], events[i+1:]...) | ||
} | ||
} | ||
} | ||
|
||
// wakeUp only returns true if the given event channel has closed or the event time has surpassed | ||
// the given time. | ||
func wakeUp(alarm time.Time, ec chan event) bool { | ||
// Acknowledge this sleeping process has seen the new event upon return. | ||
defer eventGroup.Done() | ||
// Wait for a new event. | ||
event := <-ec | ||
return (event.cancel || event.time.After(alarm)) | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,157 @@ | ||
/* | ||
Copyright 2021 The Kubernetes Authors. | ||
Licensed under the Apache License, Version 2.0 (the "License"); | ||
you may not use this file except in compliance with the License. | ||
You may obtain a copy of the License at | ||
http://www.apache.org/licenses/LICENSE-2.0 | ||
Unless required by applicable law or agreed to in writing, software | ||
distributed under the License is distributed on an "AS IS" BASIS, | ||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
See the License for the specific language governing permissions and | ||
limitations under the License. | ||
*/ | ||
|
||
package faketime_test | ||
|
||
import ( | ||
"testing" | ||
"time" | ||
|
||
"github.com/stretchr/testify/require" | ||
ft "sigs.k8s.io/k8s-container-image-promoter/legacy/faketime" | ||
) | ||
|
||
// sleepingProcess holds the amount of time to sleep, as well as a unique identity to track | ||
// when it wakes up. | ||
type sleepingProcess struct { | ||
duration time.Duration | ||
id int | ||
} | ||
|
||
// sleep blocks the sleeping process, and inserts its id into the given map when unblocked. | ||
func (p sleepingProcess) sleep(m map[int]bool) { | ||
ft.Sleep(p.duration) | ||
m[p.id] = true | ||
} | ||
|
||
type sleepingProcesses []sleepingProcess | ||
|
||
// createSleepingProcesses returns when all provided sleeping processes are asleep. | ||
func (sp sleepingProcesses) sleep(m map[int]bool) { | ||
for _, p := range sp { | ||
ft.SleepGroup.Add(1) | ||
go p.sleep(m) | ||
} | ||
// Ensure all sleeping processes are actually asleep. | ||
ft.SleepGroup.Wait() | ||
} | ||
|
||
func TestAdvance(t *testing.T) { | ||
errorMessage := "The expected sleeping processes did not unblock." | ||
// This test initializes many sleeping processes and advances a single fixed amount of time. | ||
ft.Init() | ||
timeStep := time.Second * 30 | ||
// Record which sleeping processes wake up from sleep. | ||
awake := map[int]bool{} | ||
// Generate expected record after advancing time. | ||
expected := map[int]bool{0: true, 1: true} | ||
// Create multiple sleeping processes. | ||
sp := sleepingProcesses{ | ||
{ | ||
duration: time.Microsecond, | ||
id: 0, | ||
}, | ||
{ | ||
duration: time.Second, | ||
id: 1, | ||
}, | ||
{ | ||
duration: time.Minute, | ||
id: 2, | ||
}, | ||
{ | ||
duration: time.Hour, | ||
id: 3, | ||
}, | ||
} | ||
// Put all processes to sleep. | ||
sp.sleep(awake) | ||
|
||
// Advance time by the given time step. | ||
ft.Advance(timeStep) | ||
// Ensure only the first two sleeping processes woke up. | ||
require.EqualValues(t, expected, awake, errorMessage) | ||
// Stop all processes. | ||
ft.Close() | ||
|
||
// This test makes several time-steps, while adding new sleeping proccesses each step. | ||
ft.Init() | ||
timeSteps := []time.Duration{time.Second, time.Hour, time.Minute} | ||
step := 0 | ||
// Record which sleeping processes wake up from sleep. | ||
awake = map[int]bool{} | ||
// Begin with a few sleeping processes. | ||
sp = sleepingProcesses{ | ||
{ | ||
duration: time.Second * 2, | ||
id: 0, | ||
}, | ||
{ | ||
duration: time.Microsecond, | ||
id: 1, | ||
}, | ||
{ | ||
duration: time.Hour, | ||
id: 2, | ||
}, | ||
} | ||
// Put all processes to sleep. | ||
sp.sleep(awake) | ||
|
||
// Advance time by the first time-step. | ||
ft.Advance(timeSteps[step]) | ||
step++ | ||
// Determin which sleeping processes should now be awake. | ||
expected = map[int]bool{1: true} | ||
// Ensure the correct sleeping proccesses are awake. | ||
require.EqualValues(t, expected, awake, errorMessage) | ||
|
||
// Add another sleeping process. | ||
sp = sleepingProcesses{ | ||
{ | ||
duration: time.Hour + time.Second, | ||
id: 3, | ||
}, | ||
} | ||
sp.sleep(awake) | ||
|
||
// Advance time by another time-step. | ||
ft.Advance(timeSteps[step]) | ||
step++ | ||
// Determin which sleeping processes should now be awake. | ||
expected = map[int]bool{0: true, 1: true, 2: true} | ||
// Ensure the correct sleeping proccesses are awake. | ||
require.EqualValues(t, expected, awake, errorMessage) | ||
|
||
// Add another sleeping process. | ||
sp = sleepingProcesses{ | ||
{ | ||
duration: time.Millisecond, | ||
id: 4, | ||
}, | ||
} | ||
sp.sleep(awake) | ||
|
||
// Advance time by the final time-step. | ||
ft.Advance(timeSteps[step]) | ||
// Determin which sleeping processes should now be awake. | ||
expected = map[int]bool{0: true, 1: true, 2: true, 3: true, 4: true} | ||
// Ensure the correct sleeping proccesses are awake. | ||
require.EqualValues(t, expected, awake, errorMessage) | ||
|
||
// Stop all processes. | ||
ft.Close() | ||
} |