From f5fe9f69cd63cbd1b3cc47421cefb24f4c87c2c4 Mon Sep 17 00:00:00 2001 From: Tyler Ferrara Date: Mon, 26 Jul 2021 20:41:00 -0400 Subject: [PATCH] Added a fake_time package to deterministically test time critical events. --- legacy/faketime/faketime.go | 138 +++++++++++++++++++++++++++ legacy/faketime/faketime_test.go | 157 +++++++++++++++++++++++++++++++ 2 files changed, 295 insertions(+) create mode 100644 legacy/faketime/faketime.go create mode 100644 legacy/faketime/faketime_test.go diff --git a/legacy/faketime/faketime.go b/legacy/faketime/faketime.go new file mode 100644 index 00000000..c08d6290 --- /dev/null +++ b/legacy/faketime/faketime.go @@ -0,0 +1,138 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package faketime + +import ( + "fmt" + "sync" + "time" +) + +// event holds the information to unblock sleeping processes. Calls to Advance() will push a new event +// with the updated time to all event channels. Processes blocked by Sleep() will receive these events +// and unblock when their sleep duration has expired. +type event struct { + time time.Time // current global time. + cancel bool // determine whether the sleeping process should cancel sleep. +} + +const ( + // DefaultTime is used to initialize the global time. + // This represents September 22, 2002 at 00:00:00. + DefaultTime string = "020106 000000" +) + +var ( + // GlobalTime is the primary source of truth for measuring "fake" time. + GlobalTime time.Time + // mutex is used to prevent race conditions when reading or writing any global variable. + mutex sync.Mutex + // events holds all event channels for sleeping processes. + events []chan event + // SleepGroup is used to ensure all Sleep() processes are blocked and listening to their event channel. + SleepGroup sync.WaitGroup + // eventGroup is used after a call to Advance(), ensuring all sleeping processes have received their event. + eventGroup sync.WaitGroup +) + +// Init defines the starting state to mock the time.Sleep function. +func Init() { + var err error + GlobalTime, err = time.Parse(DefaultTime, DefaultTime) + if err != nil { + msg := fmt.Sprintf("The default time %s could not be parsed in fake_time.Init()", DefaultTime) + panic(msg) + } + mutex = sync.Mutex{} + events = make([]chan event, 0) + SleepGroup = sync.WaitGroup{} + eventGroup = sync.WaitGroup{} +} + +// Close cancels and closes all event channels, effectively unblocking all sleeping processes. +func Close() { + for _, e := range events { + eventGroup.Add(1) + e <- event{time: GlobalTime, cancel: true} + close(e) + } + eventGroup.Wait() +} + +// Advance adds the given duration to the global time, and pushes the new time to listeners +// of the event channel. This advance in time MUST trickle down to all sleeping processes in order +// to have a global consensus of time. Therefore, we must wait until every sleeping process has +// received their event. +func Advance(d time.Duration) { + // Grab and hold the lock to prevent race conditions. + mutex.Lock() + defer mutex.Unlock() + // Advance the global time. + GlobalTime = GlobalTime.Add(d) + // Update all sleeping processes of the new current time. + for _, e := range events { + eventGroup.Add(1) + e <- event{time: GlobalTime, cancel: false} + } + // Block execution until all sleeping processes have received the event. + eventGroup.Wait() +} + +// Sleep creates a new event channel and blocks until the global time surpasses the given duration. +func Sleep(d time.Duration) { + var ec chan event + var unblockAt time.Time + + mutex.Lock() + // Create a new event channel for the sleeping process. + ec = make(chan event) + events = append(events, ec) + // Determine when to stop sleeping. + unblockAt = GlobalTime.Add(d) + mutex.Unlock() + + // Acknowledge this sleeping process has finished initializing, and is now listening to + // its event channel. This must be done to synchronize all sleeping processes before any + // call to Advance() is made. + SleepGroup.Done() + // Block until it's time to wake up. + for { + if wakeUp(unblockAt, ec) { + removeEventChannel(ec) + return + } + } +} + +// removeEventChannel removes the given event channel from the global collection of channels. +func removeEventChannel(ec chan event) { + for i := 0; i < len(events); i++ { + if events[i] == ec { + events = append(events[:i], events[i+1:]...) + } + } +} + +// wakeUp only returns true if the given event channel has closed or the event time has surpassed +// the given time. +func wakeUp(alarm time.Time, ec chan event) bool { + // Acknowledge this sleeping process has seen the new event upon return. + defer eventGroup.Done() + // Wait for a new event. + event := <-ec + return (event.cancel || event.time.After(alarm)) +} diff --git a/legacy/faketime/faketime_test.go b/legacy/faketime/faketime_test.go new file mode 100644 index 00000000..4f9fd742 --- /dev/null +++ b/legacy/faketime/faketime_test.go @@ -0,0 +1,157 @@ +/* +Copyright 2021 The Kubernetes Authors. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ + +package faketime_test + +import ( + "testing" + "time" + + "github.com/stretchr/testify/require" + ft "sigs.k8s.io/k8s-container-image-promoter/legacy/faketime" +) + +// sleepingProcess holds the amount of time to sleep, as well as a unique identity to track +// when it wakes up. +type sleepingProcess struct { + duration time.Duration + id int +} + +// sleep blocks the sleeping process, and inserts its id into the given map when unblocked. +func (p sleepingProcess) sleep(m map[int]bool) { + ft.Sleep(p.duration) + m[p.id] = true +} + +type sleepingProcesses []sleepingProcess + +// createSleepingProcesses returns when all provided sleeping processes are asleep. +func (sp sleepingProcesses) sleep(m map[int]bool) { + for _, p := range sp { + ft.SleepGroup.Add(1) + go p.sleep(m) + } + // Ensure all sleeping processes are actually asleep. + ft.SleepGroup.Wait() +} + +func TestAdvance(t *testing.T) { + errorMessage := "The expected sleeping processes did not unblock." + // This test initializes many sleeping processes and advances a single fixed amount of time. + ft.Init() + timeStep := time.Second * 30 + // Record which sleeping processes wake up from sleep. + awake := map[int]bool{} + // Generate expected record after advancing time. + expected := map[int]bool{0: true, 1: true} + // Create multiple sleeping processes. + sp := sleepingProcesses{ + { + duration: time.Microsecond, + id: 0, + }, + { + duration: time.Second, + id: 1, + }, + { + duration: time.Minute, + id: 2, + }, + { + duration: time.Hour, + id: 3, + }, + } + // Put all processes to sleep. + sp.sleep(awake) + + // Advance time by the given time step. + ft.Advance(timeStep) + // Ensure only the first two sleeping processes woke up. + require.EqualValues(t, expected, awake, errorMessage) + // Stop all processes. + ft.Close() + + // This test makes several time-steps, while adding new sleeping proccesses each step. + ft.Init() + timeSteps := []time.Duration{time.Second, time.Hour, time.Minute} + step := 0 + // Record which sleeping processes wake up from sleep. + awake = map[int]bool{} + // Begin with a few sleeping processes. + sp = sleepingProcesses{ + { + duration: time.Second * 2, + id: 0, + }, + { + duration: time.Microsecond, + id: 1, + }, + { + duration: time.Hour, + id: 2, + }, + } + // Put all processes to sleep. + sp.sleep(awake) + + // Advance time by the first time-step. + ft.Advance(timeSteps[step]) + step++ + // Determin which sleeping processes should now be awake. + expected = map[int]bool{1: true} + // Ensure the correct sleeping proccesses are awake. + require.EqualValues(t, expected, awake, errorMessage) + + // Add another sleeping process. + sp = sleepingProcesses{ + { + duration: time.Hour + time.Second, + id: 3, + }, + } + sp.sleep(awake) + + // Advance time by another time-step. + ft.Advance(timeSteps[step]) + step++ + // Determin which sleeping processes should now be awake. + expected = map[int]bool{0: true, 1: true, 2: true} + // Ensure the correct sleeping proccesses are awake. + require.EqualValues(t, expected, awake, errorMessage) + + // Add another sleeping process. + sp = sleepingProcesses{ + { + duration: time.Millisecond, + id: 4, + }, + } + sp.sleep(awake) + + // Advance time by the final time-step. + ft.Advance(timeSteps[step]) + // Determin which sleeping processes should now be awake. + expected = map[int]bool{0: true, 1: true, 2: true, 3: true, 4: true} + // Ensure the correct sleeping proccesses are awake. + require.EqualValues(t, expected, awake, errorMessage) + + // Stop all processes. + ft.Close() +}