-
Notifications
You must be signed in to change notification settings - Fork 12
/
decided.py
151 lines (116 loc) · 4.69 KB
/
decided.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
from twisted.internet.protocol import ClientFactory
from twisted.internet import reactor
from spacecraft.client_helpers import relative_angle
import spacecraft
class AbortExecutionException(Exception):
    """Raised by a decision node to stop processing the current message."""
class Decition(object):
    """A decision-tree node: a guarded action followed by child nodes.

    Parameters:
        action: callable invoked as ``action(value=..., message=...)`` when
            ``condition`` holds. Defaults to a no-op.
        value: forwarded to ``action`` as its ``value`` keyword.
        priority: stored on the node; ``execute`` does not consult it.
        condition: callable taking the message; the action and the children
            run only when it returns a true value.
        abort_condition: callable taking the message; when it returns a true
            value ``execute`` raises before doing anything else.
        children: optional list of nodes executed after the action.
    """

    # The default action accepts the ``value``/``message`` keywords that
    # execute() passes; a two-positional-argument lambda would raise
    # TypeError as soon as a node relying on the default action ran.
    def __init__(self, action=lambda value=0, message="": None, value=0,
                 priority=1, condition=lambda _: True,
                 abort_condition=lambda _: False, children=None):
        # The callables stay wrapped in tuples: execute() (and any outside
        # reader of these attributes) indexes into them.
        self.action = (action, value)
        self.priority = priority
        self.condition = (condition,)
        self.abort_condition = (abort_condition,)
        # Build a fresh list per instance instead of sharing a default.
        self.children = [] if children is None else children

    def execute(self, message):
        """Run this node, then its children, against ``message``.

        Raises:
            AbortExecutionException: when ``abort_condition`` holds for this
                node or for any child that gets executed.
        """
        if self.abort_condition[0](message):
            raise AbortExecutionException()
        if not self.condition[0](message):
            return
        action, value = self.action
        action(value=value, message=message)
        # NOTE(review): the source lost its indentation; children are run
        # only when the condition holds, which is what frenzy_decition()
        # needs (its always-aborting child must fire only when damaged).
        for child in self.children:
            child.execute(message)
class BaseDecitionTreeClient(spacecraft.server.ClientBase):
    """Client that runs its active list of decisions on every message.

    ``self.current`` holds the decisions executed by ``messageReceived``.
    Subclasses override ``_define_behaviors`` and ``_initial_behavior`` to
    build their decision lists and pick the starting one.
    """

    name = 'decitiontree'

    def _define_behaviors(self):
        """Hook for subclasses to build decision lists; a no-op here."""

    def _initial_behavior(self):
        """Start with an empty decision list."""
        self.current = []

    def throttle(self, value=0, message=""):
        """Send a 'throttle' command with ``value``; ``message`` is unused."""
        self.command('throttle', value=value)

    def fire(self, value=0, message=""):
        """Send a 'fire' command; both arguments are unused."""
        self.command('fire')

    def turn(self, value=0, message=""):
        """Send a 'turn' command with ``value``; ``message`` is unused."""
        self.command('turn', value=value)

    def messageReceived(self, message):
        """Remember ``message`` and run the active decisions against it.

        The first call also runs the two setup hooks. An
        AbortExecutionException raised by a decision ends processing of
        this message without propagating.
        """
        self.message = message
        first_message = not getattr(self, 'initialized', False)
        if first_message:
            # Flag is set before the hooks run, so they execute only once.
            self.initialized = True
            self._define_behaviors()
            self._initial_behavior()
        active = self.current
        try:
            for node in active:
                node.execute(message)
        except AbortExecutionException:
            return
class DecitionTreeClient(BaseDecitionTreeClient):
    """Decision-tree client with three behaviors: passive, avoid and frenzy.

    Each behavior is a list of Decition nodes built by ``_define_behaviors``;
    the ``to_*`` actions switch ``self.current`` between them.
    """

    def player_near(self, message):
        """Return True when a sensor message lists a player in 'proximity'."""
        if message['type'] != 'sensor':
            return False
        # any() instead of the truthiness of filter(): on Python 3 filter()
        # returns an iterator, which is truthy even when nothing matches.
        return any(obj['object_type'] == 'player'
                   for obj in message['proximity'])

    def player_far(self, message):
        """Return the negation of ``player_near``."""
        return not self.player_near(message)

    def is_healthy(self, message):
        """Return True unless a sensor message reports health of 50 or less.

        A missing 'status' or 'health' entry counts as health 100, and
        non-sensor messages always count as healthy.
        """
        if message['type'] == 'sensor':
            return message.get('status', {}).get('health', 100) > 50
        return True

    def is_damaged(self, message):
        """Return the negation of ``is_healthy``."""
        return not self.is_healthy(message)

    def to_passive(self, value=0, message=""):
        """Action: make the passive list the active one; arguments unused."""
        self.current = self.passive

    def to_avoid(self, value=0, message=""):
        """Action: make the avoid list the active one; arguments unused."""
        self.current = self.avoid

    def to_frenzy(self, value=0, message=""):
        """Action: make the frenzy list the active one; arguments unused."""
        self.current = self.frenzy

    def smart_turn(self):
        """Return ``relative_angle`` for the first player in proximity.

        Reads the last received message (``self.message``). Returns None
        when that message is not a sensor message or lists no player,
        instead of failing on unassigned locals.
        """
        message = self.message
        if message['type'] != 'sensor':
            return None
        for obj in message['proximity']:
            if obj['object_type'] != 'player':
                continue
            position = message['gps']['position']
            target = obj['position']
            return relative_angle(position[0], position[1],
                                  target[0], target[1],
                                  message['gps']['angle'])
        return None

    def can_smart_turn(self, message=None):
        """Return True when ``smart_turn()`` lies strictly within +/-0.3.

        ``message`` is accepted and ignored so the method can serve as a
        Decition condition, which is always called with the message; the
        angle itself comes from ``self.message``.
        """
        angle = self.smart_turn()
        return angle is not None and -0.3 < angle < 0.3

    def _apply_smart_turn(self, value=0, message=""):
        """Action: turn by the angle ``smart_turn()`` reports right now.

        Computing the angle when the action runs, rather than when the
        behaviors are defined, keeps it in step with the latest message.
        """
        angle = self.smart_turn()
        if angle is not None:
            self.turn(value=angle)

    def frenzy_decition(self):
        """Build the node that switches to frenzy when damaged.

        Its child always aborts, so once the switch happens the remaining
        decisions of the current list are skipped for this message.
        """
        return Decition(action=self.to_frenzy, condition=self.is_damaged, children=[
            Decition(abort_condition=lambda _: True)
        ])

    def _define_behaviors(self):
        """Build the passive, avoid and frenzy decision lists."""
        self.passive = [
            Decition(action=self.fire),
            Decition(action=self.throttle, value=.2),
            Decition(action=self.turn, value=.2),
            self.frenzy_decition(),
            Decition(action=self.to_avoid, condition=self.player_near),
        ]
        self.avoid = [
            Decition(action=self.fire),
            # The angle is resolved per message by the action itself; calling
            # smart_turn() here would freeze it and fail when the first
            # message has no player in proximity.
            Decition(action=self._apply_smart_turn,
                     condition=self.can_smart_turn),
            Decition(action=self.turn, value=.3),
            Decition(action=self.throttle, value=1),
            self.frenzy_decition(),
            Decition(action=self.to_passive, condition=self.player_far),
        ]
        self.frenzy = [
            Decition(action=self.throttle, value=.7, condition=self.player_far),
            Decition(action=self.throttle, value=1, condition=self.player_near),
            Decition(action=self.turn, value=.1, condition=self.player_far),
            Decition(action=self.turn, value=.2, condition=self.player_near),
            Decition(action=self.fire),
        ]

    def _initial_behavior(self):
        """Start in the passive behavior."""
        self.current = self.passive
def main():
    """Connect a DecitionTreeClient to localhost:11106 through the reactor."""
    client_factory = ClientFactory()
    client_factory.protocol = DecitionTreeClient
    reactor.connectTCP("localhost", 11106, client_factory)
if __name__ == "__main__":
    # Script entry point: register main() with the reactor, then start it.
    reactor.callWhenRunning(main)
    reactor.run()