-
Notifications
You must be signed in to change notification settings - Fork 15
/
Copy pathstdin.rb
102 lines (86 loc) · 2.48 KB
/
stdin.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
# encoding: utf-8
require "logstash/inputs/base"
require "logstash/namespace"
require 'logstash/plugin_mixins/ecs_compatibility_support'
require "socket" # for Socket.gethostname
require "jruby-stdin-channel"
# Read events from standard input.
#
# By default, each event is assumed to be one line. If you
# want to join lines, you'll want to use the multiline codec.
class LogStash::Inputs::Stdin < LogStash::Inputs::Base
include LogStash::PluginMixins::ECSCompatibilitySupport(:disabled, :v1, :v8 => :v1)
config_name "stdin"
default :codec, "line"
READ_SIZE = 16384
# When a configuration is using this plugin
# We are defining a blocking pipeline which cannot be reloaded
def self.reloadable?
false
end
def initialize(*params)
super
@host_key = ecs_select[disabled: 'host', v1: '[host][hostname]']
@event_original_key = ecs_select[disabled: nil, v1: '[event][original]']
end
def register
begin
@stdin = StdinChannel::Reader.new
self.class.module_exec { alias_method :stdin_read, :channel_read }
self.class.module_exec { alias_method :stop, :channel_stop}
rescue => e
@logger.debug("fallback to reading from regular $stdin", :exception => e)
self.class.module_exec { alias_method :stdin_read, :default_read }
self.class.module_exec { alias_method :stop, :default_stop }
end
@host = Socket.gethostname
fix_streaming_codecs
end
def run(queue)
puts "The stdin plugin is now waiting for input:" if $stdin.tty?
while !stop?
if data = stdin_read
process(data, queue)
end
end
end
private
def process(data, queue)
@codec.decode(data) do |event|
decorate(event)
if @event_original_key && !event.include?(@event_original_key)
event.set(@event_original_key, data)
end
event.set(@host_key, @host) if !event.include?(@host_key)
queue << event
end
end
def default_stop
$stdin.close rescue nil
end
def channel_stop
@stdin.close rescue nil
end
def default_read
begin
return $stdin.sysread(READ_SIZE)
rescue IOError, EOFError
do_stop
rescue => e
# ignore any exception in the shutdown process
raise(e) unless stop?
end
nil
end
def channel_read
begin
return @stdin.read(READ_SIZE)
rescue IOError, EOFError, StdinChannel::ClosedChannelError
do_stop
rescue => e
# ignore any exception in the shutdown process
raise(e) unless stop?
end
nil
end
end