/
order_server_singlethread.py
93 lines (76 loc) · 2.53 KB
/
order_server_singlethread.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import rclpy
from rclpy.executors import ExternalShutdownException
from rclpy.node import Node
from ros2_example_msgs.srv import OrderCreate
import datetime
import time
class OrderServer(Node):
"""
発注サービスServerノード
"""
def __init__(self) -> None:
"""
ノードの初期化
"""
super().__init__("order_server")
# ロガー取得
self.logger = self.get_logger()
# order_createサービスserverの定義
self.srv = self.create_service(
OrderCreate,
"order_create",
self._order_create,
)
# 開始時刻を取得
self._start_time = datetime.datetime.now()
# 5.0秒周期で実行されるROSタイマーの定義
self.timer = self.create_timer(5.0, self._timer_callback)
def _timer_callback(self) -> None:
"""
timerコールバック
"""
# 開始経過時刻を取得
elps = datetime.datetime.now() - self._start_time
self.logger.info("[{}]<SVR>timer_callback:開始".format(elps))
time.sleep(3.0) # 3.0秒停止
# 終了経過時間を取得
elps = datetime.datetime.now() - self._start_time
self.logger.info("[{}]<SVR>timer_callback:終了".format(elps))
def _order_create(self, req, rsp):
"""
order_createサービスコールバック
"""
# 開始経過時刻を取得
elps = datetime.datetime.now() - self._start_time
self.logger.info(
"[{}]<SVR>発注リクエスト受信(通貨ペア[{}] 数量[{}])".format(
elps, req.instrument, req.units
)
)
time.sleep(1.0) # 1.0秒停止
# 終了経過時間を取得
elps = datetime.datetime.now() - self._start_time
self.logger.info(
"[{}]<SVR>発注レスポンス送信(通貨ペア[{}] 数量[{}])".format(
elps, req.instrument, req.units
)
)
rsp.result = True
return rsp
def main(args: list[str] | None = None) -> None:
# ROSの初期化
rclpy.init(args=args)
# order_serverノードの作成
os = OrderServer()
os.get_logger().info("order_server start!")
try:
# ノードの実行開始
rclpy.spin(os)
except (KeyboardInterrupt, ExternalShutdownException):
pass
else:
# ROSのシャットダウン
rclpy.shutdown()
finally:
# ノードの破棄
os.destroy_node()