/
base.rb
132 lines (103 loc) · 3.41 KB
/
base.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
# frozen_string_literal: true
require "time"
module WSDirector
module Protocols
# Base protocol describes basic actions
class Base
include WSDirector::Utils
def initialize(task)
@task = task
end
def init_client(**options)
@client = build_client(**options)
end
def handle_step(step)
type = step.delete("type")
raise Error, "Unknown step: #{type}" unless respond_to?(type)
public_send(type, step)
end
# Sleeps for a specified number of seconds.
#
# If "shift" is provided than the initial value is
# shifted by random number from (-shift, shift).
#
# Set "debug" to true to print the delay time.
def sleep(step)
delay = step.fetch("time").to_f
shift = step.fetch("shift", 0).to_f
delay = delay - shift * rand + shift * rand
print("Sleep for #{delay}s") if step.fetch("debug", false)
Kernel.sleep delay if delay > 0
end
# Prints provided message
def debug(step)
print(step.fetch("message"))
end
def receive(step)
expected = step.fetch("data")
received = client.receive
raise UnmatchedExpectationError, prepare_receive_error(expected, received) unless
receive_matches?(expected, received)
rescue ThreadError
raise NoMessageError, "Expected to receive #{expected} but nothing has been received"
end
# rubocop: disable Metrics/CyclomaticComplexity
def receive_all(step)
messages = step.delete("messages")
raise ArgumentError, "Messages array must be specified" if
messages.nil? || messages.empty?
expected =
Hash[messages.map do |msg|
multiplier = parse_multiplier(msg.delete("multiplier") || "1")
[msg["data"], multiplier]
end]
total_expected = expected.values.sum
total_received = 0
total_expected.times do
received = client.receive
total_received += 1
match = expected.find { |k, _| receive_matches?(k, received) }
raise UnexpectedMessageError, "Unexpected message received: #{received}" if
match.nil?
expected[match.first] -= 1
expected.delete(match.first) if expected[match.first].zero?
end
rescue ThreadError
raise NoMessageError,
"Expected to receive #{total_expected} messages " \
"but received only #{total_received}"
end
# rubocop: enable Metrics/CyclomaticComplexity
def send(step)
data = step.fetch("data")
data = JSON.generate(data) if data.is_a?(Hash)
client.send(data)
end
def wait_all(_step)
task.global_holder.wait_all
end
def to_proc
proc { |step| handle_step(step) }
end
private
attr_reader :client, :task
def build_client(**options)
Client.new(**options)
end
def receive_matches?(expected, received)
received = JSON.parse(received) if expected.is_a?(Hash)
received == expected
end
def prepare_receive_error(expected, received)
<<~MSG
Action failed: #receive
-- expected: #{expected}
++ got: #{received}
MSG
end
def print(msg)
$stdout.puts "DEBUG #{Time.now.iso8601} client=#{client.id} #{msg}\n"
end
end
end
end