Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with
or
.
Download ZIP

Comparing changes

Choose two branches to see what’s changed or to start a new pull request. If you need to, you can also compare across forks.

Open a pull request

Create a new pull request by comparing changes across two branches. If you need to, you can also compare across forks.
base fork: jedi4ever/mcollective-cpp-agents
base: 3bf24bc445
...
head fork: jedi4ever/mcollective-cpp-agents
compare: 528b7e4c0f
  • 2 commits
  • 8 files changed
  • 0 commit comments
  • 1 contributor
View
9 libstomp-src/Makefile
@@ -23,7 +23,7 @@ main.o: main.cpp
$(CXX) -Os -c main.cpp -lmcollective -lstomp
listener.o: listener.cpp
- $(CXX) -Os -fPIC -c listener.cpp -o listener.o -I/usr/include/apr-1 -I.
+ $(CXX) -Os -fPIC -c listener.cpp -o listener.o -I/usr/include/apr-1 -I. -I/opt/include/yaml-cpp
stomp.o: stomp.c
gcc -Os -fPIC -c stomp.c -I. -I/usr/include/apr-1 -lapr-1
@@ -34,8 +34,11 @@ agent/base.o: agent/base.cpp
agent/discovery.o: agent/discovery.cpp
$(CXX) -Os -fPIC -c agent/discovery.cpp -o agent/discovery.o -I/usr/include/apr-1 -I. -I/opt/include/yaml-cpp
-libmcollective.so: listener.o agent/base.o agent/discovery.o stomp.o
- $(CXX) -Os -shared -Wl -o libmcollective.so agent/base.o listener.o agent/discovery.o stomp.o
+agent/puppetd.o: agent/puppetd.cpp
+ $(CXX) -Os -fPIC -c agent/puppetd.cpp -o agent/puppetd.o -I/usr/include/apr-1 -I. -I/opt/include/yaml-cpp
+
+libmcollective.so: listener.o agent/base.o agent/discovery.o agent/puppetd.o stomp.o
+ $(CXX) -Os -shared -Wl -o libmcollective.so agent/base.o listener.o agent/discovery.o stomp.o agent/puppetd.o
clean:
rm *.o agent/*.o *.so $(PROGRAM)
View
152 libstomp-src/agent/base.cpp
@@ -29,7 +29,16 @@ namespace Mcollective
return reason;
}
- BaseAgent::BaseAgent (stomp_connection * connection,
+
+ string BaseAgent::agentName() {
+ //std::string name = "base";
+ //return name;
+ };
+
+ BaseAgent::BaseAgent () {
+ }
+
+ void BaseAgent::init (stomp_connection * connection,
apr_pool_t * pool)
{
//_pool = pool;
@@ -46,7 +55,7 @@ namespace Mcollective
rc == APR_SUCCESS || die (-2, "Could not connect", rc);
fprintf (stdout, "OK\n");
-
+ name=agentName();
fprintf (stdout, "Sending connect message.");
{
@@ -73,8 +82,9 @@ namespace Mcollective
frame.command = command;
frame.headers = apr_hash_make(_pool);
-
- apr_hash_set(frame.headers, "destination", APR_HASH_KEY_STRING, "/topic/mcollective.discovery.command");
+ std::string topic= "/topic/mcollective."+name+".command";
+ std::cout << topic ;
+ apr_hash_set(frame.headers, "destination", APR_HASH_KEY_STRING, topic.c_str());
frame.body_length = -1;
frame.body = NULL;
@@ -88,9 +98,141 @@ namespace Mcollective
};
- void BaseAgent::handle (stomp_frame *) {
+ void BaseAgent::reply(string requestid,YAML::Emitter *reply_message_body_yaml) {
+ std::cout << "requestid to reply with" << requestid;
+ /////////// MD5 sign body
+ std::string reply_message_body = (*reply_message_body_yaml).c_str ();
+ std::cout << "@@" << reply_message_body << "@@" << std::endl;
+ // Append PSK to it
+ std::string psk = "unset";
+ std::string body_psk = reply_message_body;
+ body_psk.append (psk);
+ std::stringstream md5sumstream;
+
+ // MD5 - https://gist.github.com/2389719
+ // but needs a real string
+ // http://social.msdn.microsoft.com/Forums/en/Vsexpressvc/thread/e1774395-ba99-4fe6-98eb-2224a67984b9
+ unsigned char md5_result[MD5_DIGEST_LENGTH];
+ const unsigned char *constStr =
+ reinterpret_cast < const unsigned char *>(body_psk.c_str ());
+ MD5 (constStr, body_psk.length (), md5_result);
+ for (int i = 0; i < MD5_DIGEST_LENGTH; i++)
+ {
+ printf ("%02x", md5_result[i]);
+ char digit[2];
+ sprintf (digit, "%02x", md5_result[i]);
+ md5sumstream << digit;
+ }
+ printf ("\n");
+
+
+ std::cout << "md5 stream:" << md5sumstream.str () << std::endl;
+ std::string hash = md5sumstream.str ();
+ std::cout << "hash:" << hash << std::endl;
+
+ std::string topic= "/topic/mcollective."+name+".reply";
+
+ // Construct answer
+
+ YAML::Emitter reply_message_yaml;
+
+ reply_message_yaml << YAML::BeginMap;
+ reply_message_yaml << YAML::Key << ":msgtime";
+ reply_message_yaml << YAML::Value << 1010101;
+ reply_message_yaml << YAML::Key << ":requestid";
+ reply_message_yaml << YAML::Value << requestid;
+ reply_message_yaml << YAML::Key << ":body";
+ reply_message_yaml << YAML::Value << reply_message_body.c_str();
+ reply_message_yaml << YAML::Key << ":senderid";
+ reply_message_yaml << YAML::Value << "mcpp";
+ reply_message_yaml << YAML::Key << ":senderagent";
+ //reply_message_yaml << YAML::Value << "discovery";
+ reply_message_yaml << YAML::Value << name;
+ reply_message_yaml << YAML::Key << ":msgtarget";
+ reply_message_yaml << YAML::Value << topic;
+
+
+ reply_message_yaml << YAML::Key << ":hash";
+ reply_message_yaml << YAML::Value << hash;
+ reply_message_yaml << YAML::EndMap;
+
+
+ // Put it in a string
+ std::string reply_message = reply_message_yaml.c_str ();
+ cout << reply_message;
+
+ ///Send it
+
+ stomp_frame reply_frame;
+ char command[] = "SEND";
+ reply_frame.command = command;
+ reply_frame.headers = apr_hash_make (_pool);
+ apr_hash_set (reply_frame.headers, "destination", APR_HASH_KEY_STRING,
+ topic.c_str());
+
+ reply_frame.body_length = -1;
+ char *caution = const_cast<char *>(reply_message.c_str());
+
+ reply_frame.body = caution;
+ apr_status_t rc;
+ rc = stomp_write (_connection, &reply_frame, _pool);
+ //rc == APR_SUCCESS || die (-2, "Could not send frame", rc);
+
};
+ void BaseAgent::handle (stomp_frame * frame)
+ {
+ fprintf (stdout, "Response: %s, %s\n", frame->command, frame->body);
+ std::string msg (frame->body);
+ if (strlen (frame->body) == 0)
+ {
+ cout << "No body in packet";
+ return;
+ }
+
+ std::stringstream msg_stream (msg);
+ cout << "******\n";
+ cout << msg_stream.str ();
+ cout << "******\n";
+
+ //Parse msg stream
+ YAML::Parser msg_parser (msg_stream);
+ YAML::Node msg_doc;
+ msg_parser.GetNextDocument (msg_doc);
+
+ for (YAML::Iterator it = msg_doc.begin (); it != msg_doc.end (); ++it)
+ {
+ std::string key, value;
+ it.first () >> key;
+ //std::cout << "Key: " << key << std::endl;
+ }
+
+
+ // Body seems to be multiline string of yaml
+ // Parsing strings http://stackoverflow.com/questions/2813030/yaml-cpp-parsing-strings
+ std::string body;
+ msg_doc[":body"] >> body;
+
+ std::stringstream body_stream (body);
+ cout << "******\n";
+ cout << body_stream.str ();
+ cout << "******\n";
+ YAML::Parser body_parser (body_stream);
+ YAML::Node body_doc;
+ std::string action;
+ body_parser.GetNextDocument (body_doc);
+ //body_doc >> action;
+ //std::cout << action;
+ receive(&msg_doc,&body_doc);
+
+
+
+ };
+
+
+ void BaseAgent::receive(YAML::Node *msg_doc, YAML::Node *body_doc) {
+ };
+
void BaseAgent::start() {
printf("lalalaal");
while (1)
View
15 libstomp-src/agent/base.h
@@ -1,13 +1,26 @@
+#include <yaml.h>
+
+#ifndef __MCOLLECTIVEBASEAGENT__
+#define __MCOLLECTIVEBASEAGENT__
+
namespace Mcollective
{
class BaseAgent {
public: stomp_connection *_connection;
public: apr_pool_t *_pool;
+ public: std::string name;
public:
- BaseAgent (stomp_connection *, apr_pool_t *);
+ BaseAgent ();
virtual void handle (stomp_frame *);
public: virtual void start();
+ virtual void init (stomp_connection *, apr_pool_t *);
+
+ public: virtual std::string agentName();
+ virtual void receive(YAML::Node *msg_doc, YAML::Node *body_doc);
+ virtual void reply(std::string request_id, YAML::Emitter *reply_message_yaml);
};
}
+
+#endif
View
136 libstomp-src/agent/discovery.cpp
@@ -19,13 +19,18 @@ using namespace std;
namespace Mcollective
{
- DiscoveryAgent::DiscoveryAgent (stomp_connection * connection,
- apr_pool_t * pool): BaseAgent::BaseAgent(connection,pool)
+ DiscoveryAgent::DiscoveryAgent (): BaseAgent::BaseAgent()
{
};
+ string DiscoveryAgent::agentName() {
+ std::string name = "discovery";
+ return name;
+ }
+
void DiscoveryAgent::receive(YAML::Node *msg_doc, YAML::Node *body_doc)
{
+ printf("yes we are receiving");
std::string requestid;
std::string senderid;
std::string msgtarget;
@@ -33,6 +38,7 @@ namespace Mcollective
(*msg_doc)[":requestid"] >> requestid;
(*msg_doc)[":senderid"] >> senderid;
+// std::cout << "We received requesid" << requestid;
///////////// Construct body
// Construct YAML body
YAML::Emitter reply_message_body_yaml;
@@ -40,132 +46,6 @@ namespace Mcollective
reply(requestid,&reply_message_body_yaml);
};
- void DiscoveryAgent::reply(string requestid,YAML::Emitter *reply_message_body_yaml) {
- /////////// MD5 sign body
- std::string reply_message_body = reply_message_body_yaml->c_str ();
- std::cout << reply_message_body << std::endl;
- // Append PSK to it
- std::string psk = "unset";
- std::string body_psk = reply_message_body;
- body_psk.append (psk);
- std::stringstream md5sumstream;
-
- // MD5 - https://gist.github.com/2389719
- // but needs a real string
- // http://social.msdn.microsoft.com/Forums/en/Vsexpressvc/thread/e1774395-ba99-4fe6-98eb-2224a67984b9
- unsigned char md5_result[MD5_DIGEST_LENGTH];
- const unsigned char *constStr =
- reinterpret_cast < const unsigned char *>(body_psk.c_str ());
- MD5 (constStr, body_psk.length (), md5_result);
- for (int i = 0; i < MD5_DIGEST_LENGTH; i++)
- {
- printf ("%02x", md5_result[i]);
- char digit[2];
- sprintf (digit, "%02x", md5_result[i]);
- md5sumstream << digit;
- }
- printf ("\n");
-
-
- std::cout << "md5 stream:" << md5sumstream.str () << std::endl;
- std::string hash = md5sumstream.str ();
- std::cout << "hash:" << hash << std::endl;
-
- // Construct answer
-
- YAML::Emitter reply_message_yaml;
-
- reply_message_yaml << YAML::BeginMap;
- reply_message_yaml << YAML::Key << ":msgtime";
- reply_message_yaml << YAML::Value << 1010101;
- reply_message_yaml << YAML::Key << ":requestid";
- reply_message_yaml << YAML::Value << requestid;
- reply_message_yaml << YAML::Key << ":body";
- reply_message_yaml << YAML::Value << reply_message_body;
- reply_message_yaml << YAML::Key << ":senderid";
- reply_message_yaml << YAML::Value << "mcpp";
- reply_message_yaml << YAML::Key << ":senderagent";
- reply_message_yaml << YAML::Value << "discovery";
- reply_message_yaml << YAML::Key << ":msgtarget";
- reply_message_yaml << YAML::Value << "/topic/mcollective.discovery.reply";
-
- reply_message_yaml << YAML::Key << ":hash";
- reply_message_yaml << YAML::Value << hash;
- reply_message_yaml << YAML::EndMap;
-
-
- // Put it in a string
- std::string reply_message = reply_message_yaml.c_str ();
- cout << reply_message;
-
- ///Send it
-
- stomp_frame reply_frame;
- char command[] = "SEND";
- reply_frame.command = command;
- reply_frame.headers = apr_hash_make (_pool);
- apr_hash_set (reply_frame.headers, "destination", APR_HASH_KEY_STRING,
- "/topic/mcollective.discovery.reply");
-
- reply_frame.body_length = -1;
- char *caution = const_cast<char *>(reply_message.c_str());
-
- reply_frame.body = caution;
- apr_status_t rc;
- rc = stomp_write (_connection, &reply_frame, _pool);
- //rc == APR_SUCCESS || die (-2, "Could not send frame", rc);
-
- };
-
- void DiscoveryAgent::handle (stomp_frame * frame)
- {
- fprintf (stdout, "Response: %s, %s\n", frame->command, frame->body);
- std::string msg (frame->body);
- if (strlen (frame->body) == 0)
- {
- cout << "No body in packet";
- return;
- }
-
- std::stringstream msg_stream (msg);
- cout << "******\n";
- cout << msg_stream.str ();
- cout << "******\n";
-
- //Parse msg stream
- YAML::Parser msg_parser (msg_stream);
- YAML::Node msg_doc;
- msg_parser.GetNextDocument (msg_doc);
-
- for (YAML::Iterator it = msg_doc.begin (); it != msg_doc.end (); ++it)
- {
- std::string key, value;
- it.first () >> key;
- //std::cout << "Key: " << key << std::endl;
- }
-
-
-
- // Body seems to be multiline string of yaml
- // Parsing strings http://stackoverflow.com/questions/2813030/yaml-cpp-parsing-strings
- std::string body;
- msg_doc[":body"] >> body;
-
- std::stringstream body_stream (body);
- cout << "******\n";
- cout << body_stream.str ();
- cout << "******\n";
- YAML::Parser body_parser (body_stream);
- YAML::Node body_doc;
- std::string action;
- body_parser.GetNextDocument (body_doc);
- body_doc >> action;
- std::cout << action;
- receive(&msg_doc,&body_doc);
-
-
-
- };
}
View
12 libstomp-src/agent/discovery.h
@@ -4,15 +4,13 @@
namespace Mcollective
{
- class DiscoveryAgent : public BaseAgent {
+ class DiscoveryAgent : public BaseAgent {
- public:
- DiscoveryAgent (stomp_connection *, apr_pool_t *);
+ public: DiscoveryAgent ();
- virtual void handle (stomp_frame *);
- void receive(YAML::Node *msg_doc, YAML::Node *body_doc);
- void reply(std::string request_id, YAML::Emitter *reply_message_yaml);
+ public: std::string agentName();
+ public: void receive(YAML::Node *msg_doc, YAML::Node *body_doc);
- };
+ };
}
View
89 libstomp-src/agent/puppetd.cpp
@@ -0,0 +1,89 @@
+#include "puppetd.h"
+#include "stomp.h"
+#include "yaml.h"
+#include <openssl/md5.h>
+
+#include <fstream>
+#include <sstream>
+#include <iostream>
+#include <string>
+
+#include <pthread.h>
+
+#include <stdio.h>
+#include <stdlib.h>
+#include <string.h>
+
+using namespace std;
+
+namespace Mcollective
+{
+
+ PuppetdAgent::PuppetdAgent (): BaseAgent::BaseAgent()
+ {
+ };
+
+ string PuppetdAgent::agentName() {
+ std::string name = "puppetd";
+ return name;
+ };
+ void PuppetdAgent::receive(YAML::Node *msg_doc, YAML::Node *body_doc)
+ {
+ printf("yes we are receiving");
+ std::string requestid;
+ std::string senderid;
+ std::string msgtarget;
+ (*msg_doc)[":msgtarget"] >> msgtarget;
+ (*msg_doc)[":requestid"] >> requestid;
+ (*msg_doc)[":senderid"] >> senderid;
+
+ ///////////// Construct body
+ // Construct YAML body
+
+ YAML::Emitter data;
+ data << YAML::BeginMap;
+ data << YAML::Key << ":status";
+ data << YAML::Value << "running";
+ data << YAML::Key << ":running";
+ data << YAML::Value << "1";
+ data << YAML::Key << ":disabled";
+ data << YAML::Value << "0";
+ data << YAML::Key << ":idling";
+ data << YAML::Value << "0";
+ data << YAML::Key << ":stopped";
+ data << YAML::Value << "0";
+ data << YAML::Key << ":lastrun";
+ data << YAML::Value << "0";
+ data << YAML::Key << ":output";
+ data << YAML::Value << "Houston we got a lift off!";
+ data << YAML::EndMap;
+
+ YAML::Emitter r;
+ r << YAML::BeginMap;
+ r << YAML::Key << ":statuscode";
+ r << YAML::Value << "0";
+ r << YAML::Key << ":statusmsg";
+ r << YAML::Value << "Houston we got a lift off!";
+ r << YAML::Key << ":data";
+ r << YAML::Value << data.c_str();
+ r << YAML::EndMap;
+
+
+ reply(requestid,&r);
+ };
+
+
+}
+
+/*
+ reply[:status] = puppet_daemon_status
+ reply[:running] = reply[:status] == 'running' ? 1 : 0
+ reply[:enabled] = reply[:status] == 'disabled' ? 0 : 1
+ reply[:idling] = reply[:status] == 'idling' ? 1 : 0
+ reply[:stopped] = reply[:status] == 'stopped' ? 1 : 0
+ reply[:lastrun] = 0
+ reply[:lastrun] = File.stat(@statefile).mtime.to_i if File.exists?(@statefile)
+ reply[:output] = "Currently #{reply[:status]}; last completed run #{Time.now.to_i - reply[:lastrun]} seconds ag
+"
+ end
+*/
View
17 libstomp-src/agent/puppetd.h
@@ -0,0 +1,17 @@
+#include "stomp.h"
+#include "base.h"
+#include "yaml.h"
+
+namespace Mcollective
+{
+ class PuppetdAgent : public BaseAgent {
+
+ public: PuppetdAgent ();
+
+ public: std::string agentName();
+
+ public: void receive(YAML::Node *msg_doc, YAML::Node *body_doc);
+
+ };
+}
+
View
16 libstomp-src/listener.cpp
@@ -1,4 +1,5 @@
#include "agent/discovery.h"
+#include "agent/puppetd.h"
#include "listener.h"
#include "stomp.h"
#include <pthread.h>
@@ -32,11 +33,22 @@ namespace Mcollective
void *DiscoveryThread(void *threadarg) {
struct thread_data *my_data;
my_data = (struct thread_data *) threadarg;
- DiscoveryAgent agent(my_data->connection,my_data->pool);
+ DiscoveryAgent agent;
+ agent.init(my_data->connection,my_data->pool);
printf("starting");
agent.start();
}
+ void *PuppetdThread(void *threadarg) {
+ struct thread_data *my_data;
+ my_data = (struct thread_data *) threadarg;
+ PuppetdAgent agent;
+ agent.init(my_data->connection,my_data->pool);
+ printf("starting");
+ agent.start();
+ }
+
+
Listener::Listener ()
{
}
@@ -59,7 +71,7 @@ namespace Mcollective
thread_data_array[0].thread_id = 0;
rc = pthread_create(&threads[0], NULL, DiscoveryThread, (void *) &thread_data_array[0] );
thread_data_array[1].thread_id = 1;
- rc = pthread_create(&threads[1], NULL, DiscoveryThread, (void *) &thread_data_array[1] );
+ rc = pthread_create(&threads[1], NULL, PuppetdThread, (void *) &thread_data_array[1] );
}

No commit comments for this range

Something went wrong with that request. Please try again.