This repository has been archived by the owner on Dec 7, 2022. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 13
/
redis.rb
132 lines (112 loc) · 4.19 KB
/
redis.rb
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
require 'redis'
require 'ostruct'
module MCollective
module Connector
# A basic connector for mcollective using Redis.
#
# It is not aimed at large deployments more aimed as a getting
# starter / testing style setup which would be easier for new
# users to evaluate mcollective
#
# It supports direct addressing and sub collectives
#
# We'd also add a registration plugin for it and a discovery
# plugin which means we can give a very solid fast first-user
# experience using this
class Redis<Base
def initialize
@config = Config.instance
@sources = []
@subscribed = false
end
def connect
@receiver_redis = ::Redis.new
@receiver_queue = Queue.new
@receiver_thread = nil
@sender_redis = ::Redis.new
@sender_queue = Queue.new
@sender_thread = nil
start_sender_thread
end
def subscribe(agent, type, collective)
unless @subscribed
if PluginManager["security_plugin"].initiated_by == :client
@sources << "mcollective::reply::%s::%d" % [@config.identity, $$]
else
@config.collectives.each do |collective|
@sources << "%s::server::direct::%s" % [collective, @config.identity]
@sources << "%s::server::agents" % collective
end
end
@subscribed = true
start_receiver_thread(@sources)
end
end
def unsubscribe(agent, type, collective); end
def disconnect; end
def receive
msg = @receiver_queue.pop
Message.new(msg.body, msg, :headers => msg.headers)
end
def publish(msg)
target = {:name => nil, :headers => {}, :name => nil}
if msg.type == :direct_request
msg.discovered_hosts.each do |node|
target[:name] = "%s::server::direct::%s" % [msg.collective, node]
target[:headers]["reply-to"] = msg.reply_to
Log.debug("Sending a direct message to Redis target '#{target[:name]}' with headers '#{target[:headers].inspect}'")
@sender_queue << {:channel => target[:name],
:body => msg.payload,
:headers => target[:headers]}
end
else
if msg.type == :reply
target[:name] = msg.request.headers["reply-to"]
target[:headers]["reply-to"] = "mcollective::reply::%s::%d" % [@config.identity, $$]
elsif msg.type == :request
target[:name] = "%s::server::agents" % msg.collective
target[:headers]["reply-to"] = msg.collective
end
Log.debug("Sending a broadcast message to Redis target '#{target[:name]}' with headers '#{target[:headers].inspect}'")
@sender_queue << {:channel => target[:name],
:body => msg.payload,
:headers => target[:headers]}
end
end
def start_receiver_thread(sources)
@receiver_thread = Thread.new do
@receiver_redis.subscribe(@sources) do |on|
on.subscribe do |channel, subscriptions|
Log.debug("Subscribed to %s" % channel)
end
on.message do |channel, message|
begin
decoded_msg = YAML.load(message)
new_message = OpenStruct.new
new_message.channel = channel
new_message.body = decoded_msg[:body]
new_message.headers = decoded_msg[:headers]
@receiver_queue << new_message
rescue => e
Log.warn("Failed to receive from the receiver source: %s: %s" % [e.class, e.to_s])
end
end
end
end
end
def start_sender_thread
@sender_thread = Thread.new do
loop do
begin
msg = @sender_queue.pop
encoded = {:body => msg[:body], :headers => msg[:headers]}.to_yaml
@sender_redis.publish(msg[:channel], encoded)
rescue => e
Log.warn("Could not publish message to redis: %s: %s" % [e.class, e.to_s])
end
end
end
end
end
end
end