From a34172e4c659c88f53e0b026b53bf2f8adc7f2e2 Mon Sep 17 00:00:00 2001 From: Christopher Teubert Date: Fri, 18 Aug 2023 14:03:55 -0700 Subject: [PATCH] Make future loading optional --- examples/basic_example.py | 7 ++----- examples/horizon.py | 6 ++---- examples/predict_specific_event.py | 4 +--- examples/sensitivity.py | 14 +++++--------- examples/state_limits.py | 12 ++++-------- src/progpy/predictors/monte_carlo.py | 7 +++++-- src/progpy/predictors/unscented_transform.py | 7 +++++-- tests/test_predictors.py | 12 +++++------- 8 files changed, 29 insertions(+), 40 deletions(-) diff --git a/examples/basic_example.py b/examples/basic_example.py index 17481bb7..edd28473 100644 --- a/examples/basic_example.py +++ b/examples/basic_example.py @@ -20,9 +20,6 @@ def run_example(): # Step 1: Setup model & future loading m = ThrownObject(process_noise = 1) - def future_loading(t, x = None): - # No load for a thrown object - return m.InputContainer({}) initial_state = m.initialize() # Step 2: Demonstrating state estimator @@ -42,7 +39,7 @@ def future_loading(t, x = None): # Step 2c: Perform state estimation step, given some measurement, above what's expected example_measurements = m.OutputContainer({'x': 7.5}) t = 0.1 - u = future_loading(t) + u = m.InputContainer({}) filt.estimate(t, u, example_measurements) # Update state, given (example) sensor data # Step 2d: Print & Plot Resulting Posterior State @@ -65,7 +62,7 @@ def future_loading(t, x = None): # Step 3b: Perform a prediction NUM_SAMPLES = 50 STEP_SIZE = 0.01 - mc_results = mc.predict(filt.x, future_loading, n_samples = NUM_SAMPLES, dt=STEP_SIZE, save_freq=STEP_SIZE) + mc_results = mc.predict(filt.x, n_samples = NUM_SAMPLES, dt=STEP_SIZE, save_freq=STEP_SIZE) print('Predicted time of event (ToE): ', mc_results.time_of_event.mean) # Here there are 2 events predicted, when the object starts falling, and when it impacts the ground. diff --git a/examples/horizon.py b/examples/horizon.py index ec081147..b484ae1c 100644 --- a/examples/horizon.py +++ b/examples/horizon.py @@ -18,9 +18,7 @@ def run_example(): # Step 1: Setup model & future loading - def future_loading(t, x = None): - return {} - m = ThrownObject(process_noise = 0.25, measurement_noise = 0.2) + m = ThrownObject(process_noise=0.25, measurement_noise=0.2) initial_state = m.initialize() # Step 2: Demonstrating state estimator @@ -53,7 +51,7 @@ def future_loading(t, x = None): PREDICTION_HORIZON = 7.75 samples = filt.x # Since we're using a particle filter, which is also sample-based, we can directly use the samples, without changes STEP_SIZE = 0.01 - mc_results = mc.predict(samples, future_loading, dt=STEP_SIZE, horizon = PREDICTION_HORIZON) + mc_results = mc.predict(samples, dt=STEP_SIZE, horizon=PREDICTION_HORIZON) print("\nPredicted Time of Event:") metrics = mc_results.time_of_event.metrics() pprint(metrics) # Note this takes some time diff --git a/examples/predict_specific_event.py b/examples/predict_specific_event.py index f9c2c3d0..2093e971 100644 --- a/examples/predict_specific_event.py +++ b/examples/predict_specific_event.py @@ -12,8 +12,6 @@ def run_example(): m = ThrownObject() initial_state = m.initialize() load = m.InputContainer({}) # Optimization - create once - def future_loading(t, x = None): - return load ## State Estimation - perform a single ukf state estimate step filt = state_estimators.UnscentedKalmanFilter(m, initial_state) @@ -24,7 +22,7 @@ def future_loading(t, x = None): pred = predictors.UnscentedTransformPredictor(m) # Predict with a step size of 0.1 - mc_results = pred.predict(filt.x, future_loading, dt=0.1, save_freq= 1, events=['impact']) + mc_results = pred.predict(filt.x, dt=0.1, save_freq= 1, events=['impact']) # Print Results for i, time in enumerate(mc_results.times): diff --git a/examples/sensitivity.py b/examples/sensitivity.py index f6003962..375f0988 100644 --- a/examples/sensitivity.py +++ b/examples/sensitivity.py @@ -14,22 +14,18 @@ def run_example(): # Step 1: Create instance of model m = ThrownObject() - # Step 2: Setup for simulation - def future_load(t, x=None): - return m.InputContainer({}) - - # Step 3: Setup range on parameters considered + # Step 2: Setup range on parameters considered thrower_height_range = np.arange(1.2, 2.1, 0.1) - # Step 4: Sim for each + # Step 3: Sim for each event = 'impact' eods = np.empty(len(thrower_height_range)) for (i, thrower_height) in zip(range(len(thrower_height_range)), thrower_height_range): m.parameters['thrower_height'] = thrower_height - simulated_results = m.simulate_to_threshold(future_load, threshold_keys=[event], dt =1e-3, save_freq =10) + simulated_results = m.simulate_to_threshold(threshold_keys=[event], dt =1e-3, save_freq =10) eods[i] = simulated_results.times[-1] - # Step 5: Analysis + # Step 4: Analysis print('For a reasonable range of heights, impact time is between {} and {}'.format(round(eods[0],3), round(eods[-1],3))) sensitivity = (eods[-1]-eods[0])/(thrower_height_range[-1] - thrower_height_range[0]) print(' - Average sensitivity: {} s per cm height'.format(round(sensitivity/100, 6))) @@ -40,7 +36,7 @@ def future_load(t, x=None): eods = np.empty(len(throw_speed_range)) for (i, throw_speed) in zip(range(len(throw_speed_range)), throw_speed_range): m.parameters['throwing_speed'] = throw_speed - simulated_results = m.simulate_to_threshold(future_load, threshold_keys=[event], options={'dt':1e-3, 'save_freq':10}) + simulated_results = m.simulate_to_threshold(threshold_keys=[event], options={'dt':1e-3, 'save_freq':10}) eods[i] = simulated_results.times[-1] print('\nFor a reasonable range of throwing speeds, impact time is between {} and {}'.format(round(eods[0],3), round(eods[-1],3))) diff --git a/examples/state_limits.py b/examples/state_limits.py index 0d15f409..a60696ea 100644 --- a/examples/state_limits.py +++ b/examples/state_limits.py @@ -15,11 +15,7 @@ def run_example(): # Step 1: Create instance of model (without drag) m = ThrownObject( cd = 0 ) - # Step 2: Setup for simulation - def future_load(t, x=None): - return {} - - # add state limits + # Step 2: add state limits m.state_limits = { # object may not go below ground height 'x': (0, inf), @@ -30,7 +26,7 @@ def future_load(t, x=None): # Step 3: Simulate to impact event = 'impact' - simulated_results = m.simulate_to_threshold(future_load, threshold_keys=[event], dt=0.005, save_freq=1) + simulated_results = m.simulate_to_threshold(threshold_keys=[event], dt=0.005, save_freq=1) # Print states print('Example 1') @@ -42,7 +38,7 @@ def future_load(t, x=None): x0 = m.initialize(u = {}, z = {}) x0['x'] = -1 - simulated_results = m.simulate_to_threshold(future_load, threshold_keys=[event], dt=0.005, save_freq=1, x = x0) + simulated_results = m.simulate_to_threshold(threshold_keys=[event], dt=0.005, save_freq=1, x=x0) # Print states print('Example 2') @@ -57,7 +53,7 @@ def future_load(t, x=None): m.parameters['g'] = -50000000 print('Example 3') - simulated_results = m.simulate_to_threshold(future_load, threshold_keys=[event], dt=0.005, save_freq=0.3, x = x0, print = True, progress = False) + simulated_results = m.simulate_to_threshold(threshold_keys=[event], dt=0.005, save_freq=0.3, x=x0, print=True, progress=False) # Note that the limits can also be applied manually using the apply_limits function print('limiting states') diff --git a/src/progpy/predictors/monte_carlo.py b/src/progpy/predictors/monte_carlo.py index 923b684f..cd6490a0 100644 --- a/src/progpy/predictors/monte_carlo.py +++ b/src/progpy/predictors/monte_carlo.py @@ -31,7 +31,7 @@ class MonteCarlo(Predictor): 'n_samples': None } - def predict(self, state: UncertainData, future_loading_eqn: Callable, **kwargs) -> PredictionResults: + def predict(self, state: UncertainData, future_loading_eqn: Callable = None, **kwargs) -> PredictionResults: """ Perform a single prediction @@ -39,7 +39,7 @@ def predict(self, state: UncertainData, future_loading_eqn: Callable, **kwargs) ---------- state : UncertainData Distribution representing current state of the system - future_loading_eqn : function (t, x) -> z + future_loading_eqn : function (t, x=None) -> z, optional Function to generate an estimate of loading at future time t, and state x Keyword Arguments @@ -77,6 +77,9 @@ def predict(self, state: UncertainData, future_loading_eqn: Callable, **kwargs) else: raise TypeError("state must be UncertainData, dict, or StateContainer") + if future_loading_eqn is None: + future_loading_eqn = lambda t, x=None: self.model.InputContainer({}) + params = deepcopy(self.parameters) # copy parameters params.update(kwargs) # update for specific run params['print'] = False diff --git a/src/progpy/predictors/unscented_transform.py b/src/progpy/predictors/unscented_transform.py index 8288e2dd..e788ec0d 100644 --- a/src/progpy/predictors/unscented_transform.py +++ b/src/progpy/predictors/unscented_transform.py @@ -123,14 +123,14 @@ def state_transition(x, dt): self.filter = kalman.UnscentedKalmanFilter(num_states, num_measurements, self.parameters['dt'], measure, state_transition, self.sigma_points) self.filter.Q = self.parameters['Q'] - def predict(self, state, future_loading_eqn: Callable, **kwargs) -> PredictionResults: + def predict(self, state, future_loading_eqn: Callable = None, **kwargs) -> PredictionResults: """ Perform a single prediction Parameters ---------- state (UncertaintData): Distribution of states - future_loading_eqn : function (t, x={}) -> z + future_loading_eqn: function (t, x=None) -> z, optional Function to generate an estimate of loading at future time t options (optional, kwargs): configuration options\n Any additional configuration values. Note: These parameters can also be specified in the predictor constructor. The following configuration parameters are supported: \n @@ -169,6 +169,9 @@ def predict(self, state, future_loading_eqn: Callable, **kwargs) -> PredictionRe else: raise TypeError("state must be UncertainData, dict, or StateContainer") + if future_loading_eqn is None: + future_loading_eqn = lambda t, x=None: self.model.InputContainer({}) + params = deepcopy(self.parameters) # copy parameters params.update(kwargs) # update for specific run diff --git a/tests/test_predictors.py b/tests/test_predictors.py index 00efa3c8..879f9657 100644 --- a/tests/test_predictors.py +++ b/tests/test_predictors.py @@ -66,10 +66,9 @@ def test_UTP_ThrownObject(self): m = ThrownObject() pred = UnscentedTransformPredictor(m) samples = MultivariateNormalDist(['x', 'v'], [1.83, 40], [[0.1, 0.01], [0.01, 0.1]]) - def future_loading(t, x={}): - return {} - mc_results = pred.predict(samples, future_loading, dt=0.01, save_freq=1) + # No future loading (i.e., no load) + mc_results = pred.predict(samples, dt=0.01, save_freq=1) self.assertAlmostEqual(mc_results.time_of_event.mean['impact'], 8.21, 0) self.assertAlmostEqual(mc_results.time_of_event.mean['falling'], 4.15, 0) # self.assertAlmostEqual(mc_results.times[-1], 9, 1) # Saving every second, last time should be around the 1s after impact event (because one of the sigma points fails afterwards) @@ -126,10 +125,9 @@ def future_loading(t, x=None): def test_MC(self): m = ThrownObject() mc = MonteCarlo(m) - def future_loading(t=None, x=None): - return {} - - mc.predict(m.initialize(), future_loading, dt=0.2, num_samples=3, save_freq=1) + + # Test with empty future loading (i.e., no load) + mc.predict(m.initialize(), dt=0.2, num_samples=3, save_freq=1) def test_prediction_mvnormaldist(self): times = list(range(10))