|
| 1 | +from pybricks.pupdevices import Motor |
| 2 | +from pybricks.parameters import Port, Direction |
| 3 | +from pybricks.robotics import DriveBase |
| 4 | +from pybricks.tools import wait, multitask, run_task |
| 5 | + |
| 6 | +ENDPOINT = 142 |
| 7 | +SPEED = 500 |
| 8 | + |
| 9 | + |
| 10 | +def is_close(motor, target): |
| 11 | + return abs(motor.angle() - target) < 5 |
| 12 | + |
| 13 | + |
| 14 | +# Spins freely. |
| 15 | +motor = Motor(Port.A) |
| 16 | + |
| 17 | +# Physically blocked in two directions. |
| 18 | +lever = Motor(Port.C) |
| 19 | + |
| 20 | + |
| 21 | +def reset(): |
| 22 | + for m in (motor, lever): |
| 23 | + m.run_target(SPEED, 0) |
| 24 | + assert is_close(m, 0) |
| 25 | + |
| 26 | + |
| 27 | +reset() |
| 28 | + |
| 29 | +# Should block until endpoint. |
| 30 | +assert lever.run_until_stalled(SPEED) == ENDPOINT |
| 31 | +assert lever.angle() == ENDPOINT |
| 32 | + |
| 33 | +# Should block until close to target |
| 34 | +lever.run_target(SPEED, 0) |
| 35 | +assert is_close(lever, 0) |
| 36 | + |
| 37 | + |
| 38 | +async def stall(): |
| 39 | + # Should return None for most movements. |
| 40 | + ret = await lever.run_target(SPEED, -90) |
| 41 | + assert is_close(lever, -90) |
| 42 | + assert ret is None |
| 43 | + |
| 44 | + # Should return value at end of stall awaitable. |
| 45 | + stall_angle = await lever.run_until_stalled(SPEED) |
| 46 | + assert stall_angle == ENDPOINT |
| 47 | + |
| 48 | + |
| 49 | +run_task(stall()) |
| 50 | + |
| 51 | + |
| 52 | +def is_coasting(motor): |
| 53 | + return motor.load() == 0 |
| 54 | + |
| 55 | + |
| 56 | +# Confirm that stop() coasts the motor. |
| 57 | +motor.run_angle(SPEED, 360) |
| 58 | +assert not is_coasting(motor) |
| 59 | +motor.stop() |
| 60 | +assert is_coasting(motor) |
| 61 | +reset() |
| 62 | + |
| 63 | + |
| 64 | +async def par1(expect_interruption=False): |
| 65 | + for i in range(4): |
| 66 | + await motor.run_angle(SPEED, 90) |
| 67 | + if expect_interruption: |
| 68 | + raise RuntimeError("Expected interruption, so shold never see this.") |
| 69 | + |
| 70 | + |
| 71 | +async def par2(): |
| 72 | + await wait(100) |
| 73 | + |
| 74 | + |
| 75 | +# Let the motor run on its own. |
| 76 | +reset() |
| 77 | +run_task(par1()) |
| 78 | +assert not is_coasting(motor) |
| 79 | +assert is_close(motor, 360) |
| 80 | + |
| 81 | +# Let the motor run in parallel to a task that does not affect it. |
| 82 | +reset() |
| 83 | +run_task(multitask(par1(), par2())) |
| 84 | +assert not is_coasting(motor) |
| 85 | +assert is_close(motor, 360) |
| 86 | + |
| 87 | +# Let the motor run in parallel to a short task as a race. This should cancel |
| 88 | +# the motor task early and coast it. |
| 89 | +reset() |
| 90 | +run_task(multitask(par1(True), par2(), race=True)) |
| 91 | +assert is_coasting(motor) |
| 92 | +assert not is_close(motor, 360) |
| 93 | + |
| 94 | + |
| 95 | +reset() |
| 96 | + |
| 97 | + |
| 98 | +async def par3(): |
| 99 | + await motor.run_target(SPEED, 36000) |
| 100 | + # We should never make it, but stop waiting and proceed instead. |
| 101 | + assert not is_close(motor, 36000) |
| 102 | + print("motor movement awaiting was cancelled") |
| 103 | + |
| 104 | + |
| 105 | +async def par4(): |
| 106 | + await wait(500) |
| 107 | + print("Going to take over the motor.") |
| 108 | + await motor.run_target(SPEED, 90) |
| 109 | + print("Finished turning after take over.") |
| 110 | + |
| 111 | + |
| 112 | +run_task(multitask(par3(), par4())) |
0 commit comments