-
Notifications
You must be signed in to change notification settings - Fork 1
/
core.rb
175 lines (141 loc) · 5.18 KB
/
core.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
# $LICENSE
# Copyright 2013-2014 Spotify AB. All rights reserved.
#
# The contents of this file are licensed under the Apache License, Version 2.0
# (the "License"); you may not use this file except in compliance with the
# License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
# License for the specific language governing permissions and limitations under
# the License.
require 'json'
require 'eventmachine'
require_relative 'channel'
require_relative 'debug'
require_relative 'lifecycle'
require_relative 'logging'
require_relative 'plugin_channel'
require_relative 'processor'
require_relative 'protocol'
require_relative 'utils'
require_relative 'core/emitter'
require_relative 'core/interface'
require_relative 'core/processor'
require_relative 'core/reporter'
require_relative 'statistics/collector'
module FFWD
class Core
include FFWD::Lifecycle
include FFWD::Logging
def initialize plugins, opts={}
@tunnel_plugins = plugins[:tunnel] || []
@input_plugins = plugins[:input] || []
@output_plugins = plugins[:output] || []
@statistics_opts = opts[:statistics]
@debug_opts = opts[:debug]
@core_opts = opts[:core] || {}
@processors = FFWD::Processor.load_processors(opts[:processor] || {})
@output_channel = FFWD::PluginChannel.build 'core.output'
@input_channel = FFWD::PluginChannel.build 'core.input'
@system_channel = Channel.new log, "system_channel"
memory_config = (@core_opts[:memory] || {})
@memory_limit = (memory_config[:limit] || 1000).to_f.round(3)
@memory_limit95 = @memory_limit * 0.95
if @memory_limit < 0
raise "memory limit must be non-negative number"
end
@emitter = Core::Emitter.build @output_channel, @core_opts
@processor = Core::Processor.build @input_channel, @emitter, @processors
@debug = nil
if @debug_opts
@debug = FFWD::Debug.setup @debug_opts
@debug.monitor @input_channel, FFWD::Debug::Input
@debug.monitor @output_channel, FFWD::Debug::Output
@debug.depend_on self
end
# Configuration for statistics module.
@statistics = nil
if config = @statistics_opts
@statistics = FFWD::Statistics::Collector.build(
@emitter, @system_channel, config)
@statistics.depend_on self
end
@interface = Core::Interface.new(
@input_channel, @output_channel,
@tunnel_plugins, @statistics, @debug, @processors, @core_opts
)
@interface.depend_on self
@input_instances = @input_plugins.map do |factory|
factory.call @interface
end
@output_instances = @output_plugins.map do |factory|
factory.call @interface
end
unless @statistics.nil?
reporters = [@input_channel, @output_channel, @processor]
reporters += @input_instances.select{|i| FFWD.is_reporter?(i)}
reporters += @output_instances.select{|i| FFWD.is_reporter?(i)}
@statistics.register self, "core", Core::Reporter.new(reporters)
end
# Make the core-related channels depend on core.
# They will then be orchestrated with core when it's being
# started/stopped.
@input_channel.depend_on self
@output_channel.depend_on self
end
# Main entry point.
#
# Since all components are governed by the lifecycle of core, it should
# mostly be a matter of calling 'start'.
def run
# What to do when we receive a shutdown signal?
shutdown_handler = proc do
# Hack to get out of trap context and into EM land.
EM.add_timer(0) do
log.info "Shutting down"
stop
EM.stop
end
end
EM.run do
Signal.trap("INT", &shutdown_handler)
Signal.trap("TERM", &shutdown_handler)
start
setup_memory_monitor
end
stopping do
end
end
private
# Sets up a memory monitor based of :core -> :memory -> :limit.
# Will warn at least once before shutting down.
def setup_memory_monitor
if @memory_limit == 0
log.warning "WARNING!!! YOU ARE RUNNING FFWD WITHOUT A MEMORY LIMIT, THIS COULD DAMAGE YOUR SYSTEM"
log.warning "To configure it, set the (:core -> :memory -> :limit) option to a non-zero number!"
return
end
log.info "Memory limited to #{@memory_limit} MB (:core -> :memory -> :limit)"
memory_one_warning = false
@system_channel.subscribe do |system|
memory = system[:memory]
mb = (memory[:resident].to_f / 1000000).round(3)
if memory_one_warning and mb > @memory_limit
log.error "Memory limit exceeded (#{mb}/#{@memory_limit} MB): SHUTTING DOWN"
EM.stop
next
end
if mb > @memory_limit95
log.warning "Memory limit almost reached (#{mb}/#{@memory_limit} MB)"
memory_one_warning = true
else
memory_one_warning = false
end
end
end
end
end