/
sequence_shot.py
223 lines (167 loc) · 8.3 KB
/
sequence_shot.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
"""A shot in MPF."""
import uuid
from collections import namedtuple
from typing import List, Dict, Set
import mpf.core.delays
from mpf.core.events import event_handler
from mpf.core.mode import Mode
from mpf.core.player import Player
from mpf.core.mode_device import ModeDevice
from mpf.core.system_wide_device import SystemWideDevice
ActiveSequence = namedtuple("ActiveSequence", ["id", "current_position_index", "next_event"])
class SequenceShot(SystemWideDevice, ModeDevice):
"""A device which represents a sequence shot."""
config_section = 'sequence_shots'
collection = 'sequence_shots'
class_label = 'sequence_shot'
__slots__ = ["delay", "active_sequences", "active_delays", "_sequence_events", "_delay_events", "_start_time"]
def __init__(self, machine, name):
"""Initialize sequence shot."""
super().__init__(machine, name)
self.delay = mpf.core.delays.DelayManager(self.machine)
self.active_sequences = list() # type: List[ActiveSequence]
self.active_delays = set() # type: Set[str]
self._sequence_events = [] # type: List[str]
self._delay_events = {} # type: Dict[str, int]
self._start_time = None
@property
def can_exist_outside_of_game(self):
"""Return true if this device can exist outside of a game."""
return True
async def device_added_system_wide(self):
"""Register switch handlers on load."""
await super().device_added_system_wide()
self._register_handlers()
def device_loaded_in_mode(self, mode: Mode, player: Player):
"""Register switch handlers on mode start."""
super().device_loaded_in_mode(mode, player)
self._register_handlers()
def device_removed_from_mode(self, mode):
"""Unregister switch handlers on mode end."""
del mode
self._remove_handlers()
self.reset_all_sequences()
self.delay.clear()
async def _initialize(self):
await super()._initialize()
if self.config['switch_sequence'] and self.config['event_sequence']:
raise AssertionError("Sequence shot {} only supports switch_sequence or event_sequence".format(self.name))
self._sequence_events = self.config['event_sequence']
for switch in self.config['switch_sequence']:
self._sequence_events.append(self.machine.switch_controller.get_active_event_for_switch(switch.name))
def _register_handlers(self):
for event in set(self._sequence_events):
self.machine.events.add_handler(event, self._sequence_advance, event_name=event)
for switch in self.config['cancel_switches']:
self.machine.switch_controller.add_switch_handler_obj(
switch, self.event_cancel, 1)
for switch, ms in list(self.config['delay_switch_list'].items()):
self.machine.switch_controller.add_switch_handler_obj(
switch, self._delay_switch_hit, 1, callback_kwargs={"name": switch.name, "ms": ms})
for event, ms in list(self.config['delay_event_list'].items()):
self.machine.events.add_handler(event, self._delay_switch_hit, name=event, ms=ms)
def _remove_handlers(self):
self.machine.events.remove_handler(self._sequence_advance)
self.machine.events.remove_handler(self._delay_switch_hit)
for switch in self.config['cancel_switches']:
self.machine.switch_controller.remove_switch_handler(
switch.name, self.event_cancel, 1)
for switch in list(self.config['delay_switch_list'].keys()):
self.machine.switch_controller.remove_switch_handler(
switch.name, self._delay_switch_hit, 1)
def _sequence_advance(self, event_name, **kwargs):
# Since we can track multiple simultaneous sequences (e.g. two balls
# going into an orbit in a row), we first have to see whether this
# switch is starting a new sequence or continuing an existing one
del kwargs
# mark playfield active
if self.config['playfield']:
self.config['playfield'].mark_playfield_active_from_device_action()
self.debug_log("Sequence advance: %s", event_name)
if event_name == self._sequence_events[0]:
if len(self._sequence_events) > 1:
# start a new sequence
self._start_new_sequence()
elif not self.active_delays:
# if it only has one step it will finish right away
self._completed()
else:
# Get the seq_id of the first sequence this switch is next for.
# This is not a loop because we only want to advance 1 sequence
seq = next((x for x in self.active_sequences if
x.next_event == event_name), None)
if seq:
# advance this sequence
self._advance_sequence(seq)
def _start_new_sequence(self):
# If the sequence hasn't started, make sure we're not within the
# delay_switch hit window
if self.active_delays:
self.debug_log("There's a delay timer in effect from %s. Sequence will not be started.",
self.active_delays)
return
#record start time
self._start_time = self.machine.clock.get_time()
# create a new sequence
seq_id = uuid.uuid4()
next_event = self._sequence_events[1]
self.debug_log("Setting up a new sequence. Next: %s", next_event)
self.active_sequences.append(ActiveSequence(seq_id, 0, next_event))
# if this sequence has a time limit, set that up
if self.config['sequence_timeout']:
self.debug_log("Setting up a sequence timer for %sms",
self.config['sequence_timeout'])
self.delay.reset(name=seq_id,
ms=self.config['sequence_timeout'],
callback=self._sequence_timeout,
seq_id=seq_id)
def _advance_sequence(self, sequence: ActiveSequence):
# Remove this sequence from the list
self.active_sequences.remove(sequence)
if sequence.current_position_index == (len(self._sequence_events) - 2): # complete
self.debug_log("Sequence complete!")
self.delay.remove(sequence.id)
self._completed()
else:
current_position_index = sequence.current_position_index + 1
next_event = self._sequence_events[current_position_index + 1]
self.debug_log("Advancing the sequence. Next: %s", next_event)
self.active_sequences.append(ActiveSequence(sequence.id, current_position_index, next_event))
def _completed(self):
#measure the elapsed time between start and completion of the sequence
if self._start_time is not None:
elapsed = self.machine.clock.get_time() - self._start_time
else:
elapsed = 0
"""Post sequence complete event including its elapsed time to complete."""
self.machine.events.post("{}_hit".format(self.name), elapsed=elapsed)
'''event: (name)_hit
desc: The sequence_shot called (name) was just completed.
'''
@event_handler(0)
def event_cancel(self, **kwargs):
"""Event handler for cancel event."""
del kwargs
self.reset_all_sequences()
def reset_all_sequences(self):
"""Reset all sequences."""
seq_ids = [x.id for x in self.active_sequences]
for seq_id in seq_ids:
self.delay.remove(seq_id)
self.active_sequences = list()
def _delay_switch_hit(self, name, ms, **kwargs):
del kwargs
self.debug_log("Delaying sequence by %sms", ms)
self.delay.reset(name=name + '_delay_timer',
ms=ms,
callback=self._release_delay,
delay_name=name)
self.active_delays.add(name)
def _release_delay(self, delay_name):
self.active_delays.remove(delay_name)
def _sequence_timeout(self, seq_id):
"""Sequence timeouted."""
self.debug_log("Sequence %s timeouted", seq_id)
self.active_sequences = [x for x in self.active_sequences
if x[0] != seq_id]
self.machine.events.post("{}_timeout".format(self.name))