-
Notifications
You must be signed in to change notification settings - Fork 0
/
stream.rb
154 lines (129 loc) · 3.04 KB
/
stream.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
# -*- coding: utf-8 -*-
require 'anytick'
require 'digest'
module Riser
# Thin wrapper around an IO-like object. Every operation is delegated to
# the wrapped object held in @io; subclasses override individual methods to
# add buffering or logging.
class Stream
  # Anytick's DefineMethod rule processes the backtick-quoted method
  # definition used for #gets below.
  extend Anytick.rule(Anytick::DefineMethod)
  # Refinement defined elsewhere in the project.
  # NOTE(review): presumably adapts StringIO to the IO methods used here -- confirm.
  using CompatibleStringIO

  # io:: the IO-like object every call is delegated to.
  def initialize(io)
    @io = io
  end

  # Returns the result of calling to_io on the wrapped object.
  def to_io
    @io.to_io
  end

  # compatible with Ruby 2.6 and 2.7
  # (the `(...)` forwarding syntax is kept inside a backtick literal; the
  # literal's text is left verbatim, hence the unindented lines)
  `def gets(...)
@io.gets(...)
end
`

  # Delegates to @io.read(size) and returns its result.
  def read(size)
    @io.read(size)
  end

  # Delegates to @io.readpartial(maxlen, outbuf) and returns its result.
  def readpartial(maxlen, outbuf=nil)
    @io.readpartial(maxlen, outbuf)
  end

  # Delegates to @io.write(message) and returns its result.
  def write(message)
    @io.write(message)
  end

  # Writes message through #write (so subclass overrides apply) and returns
  # self to allow chaining: stream << a << b
  def <<(message)
    write(message)
    self
  end

  # Flushes the wrapped object and returns self.
  def flush
    @io.flush
    self
  end

  # Flushes pending output, then closes the wrapped object. Returns nil.
  #
  # The wrapped object is closed even when #flush raises, so a failed final
  # flush cannot leave it open; the flush error still propagates to the
  # caller (unless @io.close itself raises, in which case that error wins).
  def close
    flush
    nil
  ensure
    @io.close
  end
end
# Stream that collects written data in an in-memory binary buffer and sends
# it to the wrapped object once the buffer holds at least +buffer_limit+
# bytes, or when #flush is called.
class WriteBufferStream < Stream
  # io::           the IO-like object passed on to Stream#initialize.
  # buffer_limit:: buffered byte count at or above which #write flushes
  #                (default 16 KiB).
  def initialize(io, buffer_limit=1024*16)
    super(io)
    @buffer_limit = buffer_limit
    @buffer_string = ''.b
  end

  # Writes the whole buffer to @io, repeating on short writes until every
  # byte has been handed over, then clears the buffer and flushes @io.
  # Returns the total number of bytes written.
  def write_and_flush
    write_bytes = @io.write(@buffer_string)
    while write_bytes < @buffer_string.bytesize
      # resume from the first byte that has not been written yet
      write_bytes += @io.write(@buffer_string.byteslice(write_bytes..-1))
    end
    @buffer_string.clear
    @io.flush
    write_bytes
  end
  private :write_and_flush

  # Appends message to the buffer and flushes when the limit is reached.
  #
  # message is converted with to_s first, so non-String objects are accepted
  # the same way IO#write accepts them. Returns the number of bytes taken
  # from message, matching the Integer result of Stream#write, instead of
  # nil or the size of an unrelated flush.
  def write(message)
    data = message.to_s.b
    @buffer_string << data
    write_and_flush if @buffer_string.bytesize >= @buffer_limit
    data.bytesize
  end

  # Sends any buffered data to @io. Does nothing when the buffer is empty.
  # Returns self.
  def flush
    write_and_flush unless @buffer_string.empty?
    self
  end
end
# Stream that reports every operation to a logger, prefixing each message
# with a tag identifying the wrapped object. Reads, writes, flush and close
# are logged at info level; construction is logged at debug level.
class LoggingStream < Stream
  # Refinement defined elsewhere in the project.
  # NOTE(review): presumably supplies to_io/to_i for StringIO -- confirm.
  using CompatibleStringIO

  # Builds the log tag for io.
  #
  # The tag always contains the first 7 hex digits of the SHA-256 of
  # io.to_s and the value of io.to_io.to_i. When the underlying object
  # responds to remote_address, the peer address is appended as well.
  # Returns the tag as a String.
  def self.make_tag(io)
    hex = Digest::SHA256.hexdigest(io.to_s)[0, 7]
    io = io.to_io
    fd = io.to_i
    return "[#{hex},#{fd}]" unless io.respond_to? :remote_address

    # Look the peer address up once and reuse it: asking again in each
    # branch repeats the lookup and could raise if the peer went away
    # between the two calls.
    addr = io.remote_address
    if addr.ip?
      # expected only stream type
      "[#{hex},#{fd},tcp://#{addr.inspect_sockaddr}]"
    elsif addr.unix?
      "[#{hex},#{fd},unix:#{addr.unix_path}]"
    else
      "[#{hex},#{fd},unknown:#{addr.inspect_sockaddr}]"
    end
  end

  # io::     the IO-like object passed on to Stream#initialize.
  # logger:: object receiving the log messages; must respond to debug,
  #          debug?, info and info?.
  def initialize(io, logger)
    super(io)
    @logger = logger
    @tag = self.class.make_tag(io)
    @logger.debug("#{@tag} start") if @logger.debug?
  end

  # compatible with Ruby 2.6 and 2.7
  # (the literal's text is left verbatim, hence the unindented lines and
  # the escaped interpolation markers)
  `def gets(...)
line = super
@logger.info("\#{@tag} r \#{line.inspect}") if @logger.info?
line
end
`

  # Reads via Stream#read, logs the data read, and returns it.
  def read(size)
    data = super
    @logger.info("#{@tag} r #{data.inspect}") if @logger.info?
    data
  end

  # Reads via Stream#readpartial, logs the data read, and returns it.
  def readpartial(maxlen, outbuf=nil)
    data = super
    @logger.info("#{@tag} r #{data.inspect}") if @logger.info?
    data
  end

  # Logs message, then writes it via Stream#write and returns that result.
  def write(message)
    @logger.info("#{@tag} w #{message.inspect}") if @logger.info?
    super
  end

  # Logs the flush, then flushes via Stream#flush and returns that result.
  def flush
    @logger.info("#{@tag} flush") if @logger.info?
    super
  end

  # Closes via Stream#close, then logs the close. The log line is written
  # only when the close completed without raising. Returns the result of
  # Stream#close.
  def close
    ret_val = super
    @logger.info("#{@tag} close") if @logger.info?
    ret_val
  end
end
end
# Local Variables:
# mode: Ruby
# indent-tabs-mode: nil
# End: