18 changes: 18 additions & 0 deletions .github/workflows/python-package.yml
@@ -228,6 +228,24 @@ jobs:
         run: pip install --upgrade --upgrade-strategy eager -e .
       - name: Run tests
         run: python -m tests.test_examples
+  test_horizon:
+    timeout-minutes: 5
+    runs-on: ubuntu-latest
+    steps:
+      - uses: actions/checkout@v3
+      - name: Set up Python
+        uses: actions/setup-python@v4
+        with:
+          python-version: '3.7'
+      - name: Install dependencies cache
+        uses: actions/cache@v2
+        with:
+          path: ~/.cache/pip
+          key: pip-cache
+      - name: Update
+        run: pip install --upgrade --upgrade-strategy eager -e .
+      - name: Run tests
+        run: python -m tests.test_horizon
   test_linear_model:
     timeout-minutes: 5
     runs-on: ubuntu-latest
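To reproduce the new CI job locally, the same two steps can be run from a checkout (a sketch; assumes a working Python environment, mirroring the commands in the workflow above):

    pip install --upgrade --upgrade-strategy eager -e .
    python -m tests.test_horizon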
36 changes: 12 additions & 24 deletions examples/horizon.py
@@ -12,46 +12,34 @@
 ii) Time event is predicted to occur (with uncertainty)
 """
 
+import numpy as np
 from progpy.models.thrown_object import ThrownObject
-from progpy import *
+from progpy.predictors import MonteCarlo
+from progpy.uncertain_data import MultivariateNormalDist
 from pprint import pprint
 
 def run_example():
     # Step 1: Setup model & future loading
-    m = ThrownObject(process_noise=0.25, measurement_noise=0.2)
+    m = ThrownObject(process_noise=0.5, measurement_noise=0.15)
     initial_state = m.initialize()
 
-    # Step 2: Demonstrating state estimator
-    print("\nPerforming State Estimation Step...")
-
-    # Step 2a: Setup
     NUM_SAMPLES = 1000
-    filt = state_estimators.ParticleFilter(m, initial_state, num_particles = NUM_SAMPLES)
-    # VVV Uncomment this to use UKF State Estimator VVV
-    # filt = state_estimators.UnscentedKalmanFilter(batt, initial_state)
-
-    # Step 2b: One step of state estimator
-    u = m.InputContainer({})  # No input for ThrownObject
-    filt.estimate(0.1, u, m.output(initial_state))
+    x = MultivariateNormalDist(initial_state.keys(), initial_state.values(), np.diag([x_i*0.01 for x_i in initial_state.values()]))
 
-    # Note: in a prognostic application the above state estimation
-    # step would be repeated each time there is new data.
-    # Here we're doing one step to demonstrate how the state estimator is used
-
-    # Step 3: Demonstrating Predictor
+    # Step 2: Demonstrating Predictor
     print("\nPerforming Prediction Step...")
 
-    # Step 3a: Setup Predictor
-    mc = predictors.MonteCarlo(m)
+    # Step 2a: Setup Predictor
+    mc = MonteCarlo(m)
 
-    # Step 3b: Perform a prediction
+    # Step 2b: Perform a prediction
     # THIS IS WHERE WE DIVERGE FROM THE THROWN_OBJECT_EXAMPLE
     # Here we set a prediction horizon
     # We're saying we are not interested in any events that occur after this time
-    PREDICTION_HORIZON = 7.75
-    samples = filt.x  # Since we're using a particle filter, which is also sample-based, we can directly use the samples, without changes
+    PREDICTION_HORIZON = 7.7
     STEP_SIZE = 0.01
-    mc_results = mc.predict(samples, dt=STEP_SIZE, horizon=PREDICTION_HORIZON)
+    mc_results = mc.predict(x, n_samples=NUM_SAMPLES, dt=STEP_SIZE, horizon=PREDICTION_HORIZON)
 
     print("\nPredicted Time of Event:")
     metrics = mc_results.time_of_event.metrics()
     pprint(metrics)  # Note this takes some time
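After this change the example no longer runs a state estimator: prediction starts from a MultivariateNormalDist built directly around the initial state. A minimal sketch of inspecting the result, continuing from the example above and mirroring the indexing used in tests/test_horizon.py below (illustrative only, not part of the PR):

    # Events that occur past the 7.7 s horizon are left as None in time_of_event
    impact_times = [mc_results.time_of_event[i]['impact']
                    for i in range(NUM_SAMPLES)
                    if mc_results.time_of_event[i]['impact'] is not None]
    print(f"{len(impact_times)} of {NUM_SAMPLES} samples reached 'impact' within the horizon")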
6 changes: 6 additions & 0 deletions src/progpy/predictors/monte_carlo.py
@@ -115,6 +115,7 @@ def predict(self, state: UncertainData, future_loading_eqn: Callable = None, **k

         # Perform prediction
         t0 = params.get('t0', 0)
+        HORIZON = params.get('horizon', float('inf'))  # Save the horizon to be used later
         for x in state:
             first_output = self.model.output(x)
 
@@ -123,6 +124,7 @@ def predict(self, state: UncertainData, future_loading_eqn: Callable = None, **k
 
             params['t0'] = t0
             params['x'] = x
+            params['horizon'] = HORIZON  # reset to the initial horizon for each sample
 
             if 'save_freq' in params and not isinstance(params['save_freq'], tuple):
                 params['save_freq'] = (params['t0'], params['save_freq'])
@@ -144,6 +146,10 @@ def predict(self, state: UncertainData, future_loading_eqn: Callable = None, **k
 
             # Non-vectorized prediction
             while len(events_remaining) > 0:  # Still events to predict
+                # Since horizon is relative to t0 (the starting point of the overall prediction),
+                # subtract the time already simulated (params['t0'] - t0) from the initial
+                # horizon before each subsequent simulation
+                params['horizon'] = HORIZON - (params['t0'] - t0)
                 (t, u, xi, z, es) = simulate_to_threshold(future_loading_eqn,
                     first_output,
                     threshold_keys = events_remaining,
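The fix works because simulate_to_threshold treats horizon as relative to the t0 it is handed, and params['t0'] advances each time an event is reached. A small numeric sketch of that bookkeeping (illustrative values only, not from the PR):

    HORIZON = 7.7  # horizon passed to predict(), relative to the prediction start
    t0 = 0.0       # start of the overall prediction
    # Suppose the first event is reached at t = 3.2, so the next
    # simulate_to_threshold call starts from params['t0'] = 3.2
    params = {'t0': 3.2}
    params['horizon'] = HORIZON - (params['t0'] - t0)  # 4.5 s of budget remains
    assert abs(params['horizon'] - 4.5) < 1e-9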
6 changes: 6 additions & 0 deletions tests/__main__.py
@@ -11,6 +11,7 @@
 from tests.test_ensemble import main as ensemble_main
 from tests.test_estimate_params import main as estimate_params_main
 from tests.test_examples import main as examples_main
+from tests.test_horizon import main as horizon_main
 from tests.test_linear_model import main as linear_main
 from tests.test_metrics import main as metrics_main
 from tests.test_pneumatic_valve import main as pneumatic_valve_main
Expand Down Expand Up @@ -77,6 +78,11 @@
     except Exception:
         was_successful = False
 
+    try:
+        horizon_main()
+    except Exception:
+        was_successful = False
+
     try:
         linear_main()
     except Exception:
78 changes: 78 additions & 0 deletions tests/test_horizon.py
@@ -0,0 +1,78 @@
+# Copyright © 2021 United States Government as represented by the Administrator of the
+# National Aeronautics and Space Administration. All Rights Reserved.
+
+from io import StringIO
+import sys
+import unittest
+
+from progpy import predictors
+from progpy.models import ThrownObject
+
+class TestHorizon(unittest.TestCase):
+    def setUp(self):
+        # set stdout (so it won't print)
+        sys.stdout = StringIO()
+
+    def tearDown(self):
+        sys.stdout = sys.__stdout__
+
+    def test_horizon_ex(self):
+        # Setup model
+        m = ThrownObject(process_noise=0.25, measurement_noise=0.2)
+        # Change parameters (to make simulation faster)
+        m.parameters['thrower_height'] = 1.0
+        m.parameters['throwing_speed'] = 10.0
+        initial_state = m.initialize()
+
+        # Define future loading (necessary for prediction call)
+        def future_loading(t, x=None):
+            return {}
+
+        # Setup Predictor (smaller sample size for efficiency)
+        mc = predictors.MonteCarlo(m)
+        mc.parameters['n_samples'] = 50
+
+        # Perform a prediction
+        # With this horizon, all samples will reach 'falling', but only some will reach 'impact'
+        PREDICTION_HORIZON = 2.127
+        STEP_SIZE = 0.001
+        mc_results = mc.predict(initial_state, future_loading, dt=STEP_SIZE, horizon=PREDICTION_HORIZON)
+
+        # 'falling' happens before the horizon is met
+        falling_res = [mc_results.time_of_event[iter]['falling'] for iter in range(mc.parameters['n_samples']) if mc_results.time_of_event[iter]['falling'] is not None]
+        self.assertEqual(len(falling_res), mc.parameters['n_samples'])
+
+        # 'impact' happens around the horizon, so some samples have reached this event and others haven't
+        impact_res = [mc_results.time_of_event[iter]['impact'] for iter in range(mc.parameters['n_samples']) if mc_results.time_of_event[iter]['impact'] is not None]
+        self.assertLess(len(impact_res), mc.parameters['n_samples'])
+
+        # Try again with a very low prediction horizon, where no events are reached
+        # Note: here we count how many None values there are for each event (in the above and below examples, we count values that are NOT None)
+        mc_results_no_event = mc.predict(initial_state, future_loading, dt=STEP_SIZE, horizon=0.3)
+        falling_res_no_event = [mc_results_no_event.time_of_event[iter]['falling'] for iter in range(mc.parameters['n_samples']) if mc_results_no_event.time_of_event[iter]['falling'] is None]
+        impact_res_no_event = [mc_results_no_event.time_of_event[iter]['impact'] for iter in range(mc.parameters['n_samples']) if mc_results_no_event.time_of_event[iter]['impact'] is None]
+        self.assertEqual(len(falling_res_no_event), mc.parameters['n_samples'])
+        self.assertEqual(len(impact_res_no_event), mc.parameters['n_samples'])
+
+        # Finally, try without a horizon; all events should be reached for all samples
+        mc_results_all_event = mc.predict(initial_state, future_loading, dt=STEP_SIZE)
+        falling_res_all_event = [mc_results_all_event.time_of_event[iter]['falling'] for iter in range(mc.parameters['n_samples']) if mc_results_all_event.time_of_event[iter]['falling'] is not None]
+        impact_res_all_event = [mc_results_all_event.time_of_event[iter]['impact'] for iter in range(mc.parameters['n_samples']) if mc_results_all_event.time_of_event[iter]['impact'] is not None]
+        self.assertEqual(len(falling_res_all_event), mc.parameters['n_samples'])
+        self.assertEqual(len(impact_res_all_event), mc.parameters['n_samples'])
+
+# This allows the module to be executed directly
+def run_tests():
+    unittest.main()
+
+def main():
+    load_test = unittest.TestLoader()
+    runner = unittest.TextTestRunner()
+    print("\n\nTesting Horizon functionality")
+    result = runner.run(load_test.loadTestsFromTestCase(TestHorizon)).wasSuccessful()
+
+    if not result:
+        raise Exception("Failed test")
+
+if __name__ == '__main__':
+    main()
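A plausible reading of the horizon value 2.127 used in the test above (a sketch; assumes the ThrownObject ballistics with g = 9.81 m/s² and the overridden thrower_height = 1.0 m, throwing_speed = 10.0 m/s): the noise-free times of the two events bracket the horizon, so 'falling' is always reached and 'impact' only sometimes.

    import math
    g, v0, h0 = 9.81, 10.0, 1.0
    t_falling = v0 / g                               # apex at ~1.02 s, well inside the 2.127 s horizon
    t_impact = (v0 + math.sqrt(v0**2 + 2*g*h0)) / g  # ground hit at ~2.13 s, just past 2.127 s
    print(t_falling, t_impact)

With process noise 0.25, individual samples land on either side of the 2.127 s horizon, which is what the assertLess check relies on.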