Permalink
Browse files

talk, code, etc

  • Loading branch information...
1 parent 5bd3c24 commit 2f392ee52f28d038b6e8fb199c42e513303402a7 @neophenix committed Jul 20, 2012
View
8 README.md
@@ -1,4 +1,10 @@
StateOfTheMQ
============
-Slides / code from my State of the MQ talk for OSCON 2012
+Slides / code from my State of the MQ talk for OSCON 2012
+
+Other Repos
+===========
+
+fq - https://github.com/postwait/fq
+Perl Kafka lib (experimental) - https://github.com/neophenix/Kafka
View
64 clients/kafka/consumer.pl
@@ -0,0 +1,64 @@
+#!/usr/bin/perl
+
+$| = 1;
+
+use lib './lib';
+use strict;
+use Kafka::Consumer;
+use Kafka::FetchRequest;
+use Kafka::OffsetRequest;
+use Data::Dumper;
+use Getopt::Long;
+use Time::HiRes qw(gettimeofday);
+
+my $host = undef;
+GetOptions(
+ "host|h=s" => \$host
+);
+
+my $c = Kafka::Consumer->new(host => $host, port => 9092);
+my $f = Kafka::FetchRequest->new(topic => 'test', partition => 0, offset => 0);
+my $o = Kafka::OffsetRequest->new(topic => 'test', partition => 0);
+
+my $lng_offset = 0;
+my $offset = 0;
+
+my $msg_cnt = 0;
+my ($low, $high, $run, $total_s, $total_m, $avg) = (50000, 0, 0, 0, 0, 0);
+my $t0 = gettimeofday();
+my $t1 = gettimeofday();
+my $per_sec = 0;
+while (1) {
+ eval {
+ my $m = $c->fetch($f);
+ if ( $m && ! $m->{'error'} ) {
+ $t1 = gettimeofday();
+ if ( $t1 - $t0 >= 1 ) {
+ $per_sec = ($msg_cnt / ($t1 - $t0));
+ $high = $per_sec if ( $run > 1 && $per_sec > $high );
+ $low = $per_sec if ( $run > 1 && $per_sec < $low );
+ $total_s += ($t1 - $t0) if ( $run > 1 );
+ $total_m += $msg_cnt if ( $run > 1 );
+ $avg = ($total_m / $total_s) if ( $run > 1 );
+ $run++;
+ print "MSG/s: $per_sec\t\tAvg: $avg\n";
+
+ $t0 = gettimeofday();
+ $msg_cnt = 0;
+ }
+
+ foreach my $msg ( @{$m->get_messages()} ) {
+ $msg_cnt++;
+ # print $msg->message() . "\n";
+ }
+ #print $m->get_messages()->[0]->message() . "\n";
+ $lng_offset = $offset;
+ $offset += $m->total_bytes();
+ $f->offset($offset);
+ }
+ };
+ if ( $@ ) {
+ #warn $@;
+ $offset = $lng_offset;
+ }
+}
View
80 clients/kafka/producer.pl
@@ -0,0 +1,80 @@
+#!/usr/bin/perl
+
+$| = 1;
+
+use lib './lib';
+use strict;
+
+use Data::Dumper;
+use Kafka::Producer;
+use POSIX;
+use Getopt::Long;
+use Time::HiRes qw(gettimeofday);
+use JSON;
+
+my %children;
+my $g_run_forrest_run = 1;
+$SIG{TERM} = \&sigterm;
+$SIG{CHLD} = \&REAPER;
+
+my $children = 1;
+my $limit = 0;
+my $host = undef;
+GetOptions(
+ "children|c=i" => \$children,
+ "limit|l=i" => \$limit,
+ "host|h=s" => \$host
+);
+
+my $json;
+#open IN,"<../messages.json";
+open IN,"<../sensor.json";
+while (<IN>) { $json .= $_ }
+close IN;
+my $messages = decode_json($json);
+
+for ( my $i = 0; $i < $children; ++$i ) {
+ &saturate();
+}
+pause() while(scalar(keys(%children)));
+
+sub saturate {
+ my $pid = fork();
+ if($pid) {
+ print "Child $pid started\n";
+ $children{$pid}++;
+ return $pid;
+ }
+
+ my $cnt = 0;
+ my $p = Kafka::Producer->new(host => $host, port => 9092);
+ my $t0 = gettimeofday();
+ while ($g_run_forrest_run ) {
+ $p->send(topic => 'test', messages => [ $messages->[int(rand(4))] ]);
+ if ( $limit ) {
+ ++$cnt;
+ if ( $cnt == $limit ) {
+ my $t1 = gettimeofday();
+ print "reached my limit, sleeping (".($t1 - $t0).")\n";
+ sleep(1);
+ $cnt = 0;
+ $t0 = gettimeofday();
+ }
+ }
+ }
+ exit 0;
+}
+
+sub sigterm {
+ $g_run_forrest_run = 0;
+}
+
+sub REAPER {
+ my $child;
+ while ($child = waitpid(-1, WNOHANG)){
+ last if $child == -1;
+ print "Child $child exited\n";
+ delete $children{$child};
+ }
+ $SIG{CHLD} = \&REAPER;
+}
View
100 clients/kestrel/consumer.pl
@@ -0,0 +1,100 @@
+#!/usr/bin/perl
+
+use strict;
+
+$| = 1;
+
+use Data::Dumper;
+use Net::Kestrel;
+use Getopt::Long;
+use Time::HiRes qw(gettimeofday);
+use POSIX;
+use JSON;
+
+my $children = 10;
+my $host = undef;
+GetOptions(
+ "children|c=i" => \$children,
+ "host|h=s" => \$host
+);
+
+my $DEBUG = 1;
+
+my %children;
+my $g_run_forrest_run = 1;
+$SIG{TERM} = \&sigterm;
+$SIG{CHLD} = \&REAPER;
+$SIG{INT} = \&sigterm;
+
+for ( my $i = 0; $i < $children; ++$i ) {
+ &run($i);
+}
+pause() while(scalar(keys(%children)));
+
+exit 0;
+
+sub run {
+ my $id = shift;
+ my $pid = fork();
+ if($pid) {
+ print "Child $pid started\n";
+ $children{$pid}++;
+ return $pid;
+ }
+
+ my ($mq, $q);
+ my ($low, $high, $run, $total_s, $total_m, $avg) = (50000, 0, 0, 0, 0, 0);
+
+ my $queue = 'testing-kestrel';
+ my $k = Net::Kestrel->new(host => $host);
+
+ print "RUNNING...\n" if $DEBUG;
+ my $i = 0;
+ my $t0 = gettimeofday();
+ my $t1 = gettimeofday();
+ my $per_sec = 0;
+ eval {
+ while ( $g_run_forrest_run ) {
+ my $v = $k->get($queue);
+ if ( $v ) {
+ $i++;
+ $t1 = gettimeofday();
+ if ( $t1 - $t0 >= 1 ) {
+ $per_sec = ($i / ($t1 - $t0));
+ $high = $per_sec if ( $run > 1 && $per_sec > $high );
+ $low = $per_sec if ( $run > 1 && $per_sec < $low );
+ $total_s += ($t1 - $t0) if ( $run > 1 );
+ $total_m += $i if ( $run > 1 );
+ $avg = ($total_m / $total_s) if ( $run > 1 );
+ $run++;
+ print "MSG/s: $per_sec\t\tAvg: $avg\n";
+
+ $t0 = gettimeofday();
+ $i = 0;
+ }
+ }
+ }
+ };
+warn $@;
+ &stop($low, $high, $avg);
+}
+
+sub stop {
+ my ($low, $high, $avg) = @_;
+ print "Totals: [$low] [$high] [$avg]\n";
+ exit 0;
+}
+
+sub sigterm {
+ $g_run_forrest_run = 0;
+}
+
+sub REAPER {
+ my $child;
+ while ($child = waitpid(-1, WNOHANG)){
+ last if $child == -1;
+ print "Child $child exited\n";
+ delete $children{$child};
+ }
+ $SIG{CHLD} = \&REAPER;
+}
View
84 clients/kestrel/producer.pl
@@ -0,0 +1,84 @@
+#!/usr/bin/perl
+
+$| = 1;
+
+use lib './lib';
+use strict;
+
+use Data::Dumper;
+use Net::Kestrel;
+use POSIX;
+use Getopt::Long;
+use Time::HiRes qw(gettimeofday);
+use JSON;
+
+my %children;
+my $g_run_forrest_run = 1;
+$SIG{TERM} = \&sigterm;
+$SIG{CHLD} = \&REAPER;
+
+my $children = 1;
+my $limit = 0;
+my $host = undef;
+GetOptions(
+ "children|c=i" => \$children,
+ "limit|l=i" => \$limit,
+ "host|h=s" => \$host
+);
+
+my $json;
+#open IN,"<../messages.json";
+open IN,"<../sensor.json";
+while (<IN>) { $json .= $_ }
+close IN;
+my $messages = decode_json($json);
+
+for ( my $i = 0; $i < $children; ++$i ) {
+ &saturate();
+}
+pause() while(scalar(keys(%children)));
+
+sub saturate {
+ my $pid = fork();
+ if($pid) {
+ print "Child $pid started\n";
+ $children{$pid}++;
+ return $pid;
+ }
+
+ my $cnt = 0;
+
+ my $queue = 'testing-kestrel';
+
+ my $k = Net::Kestrel->new(host => $host);
+ $k->flush($queue);
+ my $t0 = gettimeofday();
+ while ($g_run_forrest_run ) {
+ $k->put($queue, $messages->[int(rand(4))]);
+ if ( $limit ) {
+ ++$cnt;
+ if ( $cnt == $limit ) {
+ my $t1 = gettimeofday();
+ print "reached my limit, sleeping (".($t1 - $t0).")\n";
+ sleep(1);
+ $cnt = 0;
+ $t0 = gettimeofday();
+ }
+ }
+ }
+ exit 0;
+}
+
+sub sigterm {
+ $g_run_forrest_run = 0;
+}
+
+sub REAPER {
+ my $child;
+ while ($child = waitpid(-1, WNOHANG)){
+ last if $child == -1;
+ print "Child $child exited\n";
+ delete $children{$child};
+ }
+ $SIG{CHLD} = \&REAPER;
+}
View
6 clients/messages.json
@@ -0,0 +1,6 @@
+[
+ "z015sLYW_utbx2Jw6dssFsR3EYL59JlAbN8CYe9GAX0yp8ZIEMDDuXkJpoBgxzmNlNMeqfQMorU2kfKkMPd7PgxRk6uiRAFSPWa6OyHI_i92SkVWy9c8zRldeMH77GAwtBpM5JBfu8ZnEmaqbXibgkKdoVrIdPCL7P6fdAdaHXoV7rgztkDGsNpXsQJmOtfDglEcwogWAuLrZFVjkVrYkc3r0dxtWfcr8W26dDDBgMtyZB6HN9lBgvdua8xvjsBuiLczwAoGtX5UaYMLaHJYAUVB7rbKyLP5OYWBUsHito_rDHjOxzt5kVBvj4d3M06lQknUGprbH2VXGe9Knb0Oaw0NSfEo_ZPaR5Vb92X55WlBqg7TJdHci5k7EO5s4gGVnITIEKnSO_bN1B9BfwqnwlrwdvxX2kpl4fxtvG4e1Vs3YAAFYEDhkhcmXeH6ckmezcRVKpEZROH_sBRmTDqf_1MmFpzKOoLhI4CbxifQsxXsv4K_dbhkCro8vatDZDOFkBfTNAg0eJke7CZpBdMmV8LVqQFoFolTrx7q83b8d6iivMCoRTPUdL41N0_yTAthcbmOaye9DGAO4EwrGN1SgWC0K4brZZCO9lk6f6SksHX1ylIs0Px4Wx6QumS0jBIs3_CecEamm_gzVfTue63lYmsZvCvo0hM4ggoETG1BxeLur5HJ5x_KHpi1EcaNW9oDL2H2CodlUtrOtNdqqE1aZj4w4tMLZdX4hQR5gvvgdBBTugiqDNjb3CEra5xOpaxtzzPlt0dVDdHhKVVwWHtdB5OwknXDs0AOugK5idak1_9ycQvnkziQQKXuAE1ufRm9hMm5b32FRIzjbfooxiJHMcPc7z6gLX0gTGGAgkR9SpZn8_02mj6glxzoSr4G5ruU0dylTrgL_Sjaaqf7aEumSrxDIFW7QxdbzunqKaEt612bT_NFVsCvq4BIQVQGWqsWUBIqYB67PmUIIiqC2OJXr0hjLLAVhDeY98elTlmJrW3yCIc125aC94b8uJtA20rprvpQUStHHT8dtnTj1qQJq7n_iVFeF8cVsSqcOaplzYzPya0YGYjmqF5yKbQPEDiREx8EliubcKcBXCj4rZ0wxHdDaHu4ZD_EFaYpgALGIrUgSjQx1RLm_9rpu9oWg7IIOz24rv3MyJrI0lfcZ5MORuPexcIOAARQq0cHGc_Qw7zINlWgYUGjma38EUnK3BL8lj7krUGmnacYzwl0N838Mas4L42u7vQbovw7jDN3etQaVk1B5rbwO_s0MBdzs6A7R8nNs9OOpj9gDbVi178J70HQu0f7PxVz11f7F2S088DVXLrJuX7GHQJcJEbNd6uMA4xezS5DBPH6teKooO8wtsw32lzrafDvr_VbfURK8AAKnnIBkl0WJec7ByhMqw400bKt_xYRkKNR9madj9zbWmRjR07SyHfUUiAuthwxG9onIm2_com8jJxP1VrL2QIbC9IewKK4YtNAaflCSJkET3faxBJEmDOZ4iGOUVFa1_plMCnU6zbm9v_ZuIXltwsodU0cAzCRCbMNB8N9mOIDdUQTMR1kh_trSkOPFVJoqtyUnkyBhWHLS8JOfQ3YAINEbWAo7m4kcjZw_tcPYmQDG9yGLgBpTrc1w82xazTG0_ih4YGlQrjvdr2o7RSFO75w4lJKTBx5HxwQNWxmGth8uOxaM0HhkKB4Fof25nmdtr_dkeROZrZ8aI3hL3Egz9VVwBWYtambFImmmEhxuUXPhZX12YRvtLXB0uYprrQ8rW7JVw4M5nD2x3hbbD3BZnWfqdnB",
+ "PKOH3jAcEC0NP1ducpOOIm57_9HtVoa1dXjXJ6E2qqoRxNVhFVoaomT7JgLGcTqrxvz6dYM2yx8cfTkMl6jiodYmIg0PyPxXW45p1XJRcGszgChCucWOqg2EgUgJxIq357ohakQwCH5ykDXkhtbQBhoJOkAL2GTIFY2lN49Ypd0z7B_MM_vOtGxyej776QRCoQWD9_VXIfuJU58j7zuYCff1ryTnXNTebb8ZuGbRowVaJ5tYEW5Ors5X6SmPQdw7J7UmmBE04HMptBp0pvupB7pNjMqU8LW4tadrRhsKigo9DZsIk9ypBDDj5RzrcGgyJ7y94RA3txZXVebDOkWbXO88ghn_BarSn64qFeVUV6jEw4zPbJEsA3khnt2JUdFFGMbYKZYlKmLqm4jGuXu8R8uw8LshU5d8AXxcAOl5PNYBNpPLZ8TDDuFKv1crDx9EcZadMZIOv0LcqQ6b9tqs_Tf4OD3GuB95PReUkiOgECW_o8h4lH1cXST73bsqjWjnZZ1666AO826oFOfq3ACKriQUtBzfATa1fCcW6ACLMSkqSqKdBez5AQHWrS8YrdvFU7LbgsGkXsaSbVsQytG7lBEUdq2M4jtGhBVM9MnjbPayFMQEJ0Gacv5Dk0HhEaBSiJHkMXaiPNOnE5Q_DIxBe0BCM01FNVV2oJEVMdkHwJ5YHre1FI_SvrxGPFhuC07MEqaJxaihMapVrRYPMy0SXOSrAxDV71JasQU6tG0urebS8g8JN3TPw4uNfGpxf_sDitNjFUxi2UDk3jUGcsR_VR276h3ludrB5idQhNWXgZl4vz4KIci7q0VA45YEWqByytOlwB80e_9kQ7CQ_RYmw5bhg7YYHA7HmId8rPYqQrxSYsRCeOjmtZgjZ_uQfcN53nJh1hJ0VYDwVVCPB7MzIvECtHzKRZInQ9NplApIA1xwTjVIH3ltdRC0sxBLQ4F2hChU4DuvfrbOl1O14P3ELkGPoNDw6ST9GiKSGGJWUnRADvHRo4IjZN9JhGLNrzBCsAtEphbf1fP6B8GiRkKcrmscqIQibbRd__sRMVoqK8WJGpvYpy7MyTEazuilpjbLxk5cGqLwOme8aAzZlqiKkFsha9qADvA8gQWEk6Z3wAvNQFrdDHz5aTDqTdAube_GCwbqCFXDBZUECGrUHs58nqYNfd29QwhhPy8rhUsIaWn3SBKhWhrmKfm8FHAHfanMsycqC6ovoQnNDhDUFt2bOaOMmNRZEF64u5nAX9S3uweEj6cON2MpimgGTEx0gFcGj9TUxAKvdCQJzH6qROHI95s_wXZv66XZnaf2sc7uOyWtvqXhYkUMH8FS3t0kn10F1SQMHvbGnGRw5NGtZQoyAFijFPZ0mQR_Eww9mO7tMQP7TDt0z2f1EHDEd440Tz1_hf548na0W6xl3nOJAWG7CL93pCOi7fk4uLsBBU5YxYv7AXUnhBrfQw_fy1PC8lpDnSZV_8n5pOHMOOn0TO6XMw1O9WUOPD3slKkGBUac2DKxezpvVAD0tDArX499wLDINOp8ciV2P_xxxRSOXW2NQUe6chyELhNDmebKI_lGCfyH0D6S884cX6e8WGDe4GSsqn9yHJPzRAliYOHn3l3B6mE_zRv7LxFT1BTXPGqqXNObUqTqsTVuRTo8P3VNOO04T5p_tkMqoDDTjUTKjFe7XCbXzBRpDjyc_0mfj6sv3TH_o0RUBfpN9Rmjcc5MUbyA3MXXfc7Ub8nDsSVaCkZmNXFgWv8zxIKf_B7FbNjJGbRO9KGhVC76DZXOxtplwZ77T62vnYYRWGy3nOSPJ7XN4tNf69DHR0YEnd2EsuxJZ0n96xsC93vuIZ6F86papAN7AeTsYlOHot3GPmRDETceAUu2Ko_o3LtkS7mZNUs7g4n3QRF1tdTtj1PAe_5YoTaYNNmp10_TZ7Aij3wvjDJuakoHBBHs2kClj33EvFv2_Hyv4oo2Otnv6Gk4zMhliRdSg4P26DwunZ6dcR2IrF7TtuEruLJSAvoPeF_7yBSXZbb8rGU3eggQYnuDSwAV5kKUEngs1iG5ECf1e_buHcyj44Z3B_qX9rv9pZJM8_fAhiiVGbnaNKGtJOqtU1eD_4SpAVNPU07PnxgoaON435ZfKoWx41mRNQvsFgLK985aNe5hPuSfyGfx8FtdCUnh34VvZz7btxJV98YiJufIoLAmdmq_GHogumcnH6tPcAL0rFxiOOC7NyoOZMl0u3wph8r3H9zDyBPbkV4lJmubyDv0Rywv5TFlznsYmFTGO8i9L5thfIx7x1twJjwLiSlkuBuDdSgAS5oW41IqCW5perNeMAw2U_6WxO0X9ltqGlequqCsy9XWmouo5jSQZ2ExJyjWR1nu4Es7nWjD69BkB_A2gRbFpMU1YlI_KzjirvBLqX54bFBCmKRV0uGNNoyabX4p1TWBYEWEQqCn4Ni9LTBBiogPfIwoRW2i_QhCxwmxwi1ygWXOqPxZWdHpMa5X3iS1N1x__8w_9RVPL4XCJaQ8r1oxv0IG3lEVN3C7k_0vfRx9kqBmLzjyjPz5ogOcvqdnLBGIHcVHbxJshuXPBuxGRGXVZaTw1btxW6ChBvmCcmXBqt92BJpQa8Ajs5Tcjwt78a4HaDm0eCs3d2N0P3noJevQN7MXS4OfsHJg8JIqXM7dpv4zkFjubbszhhjIdsJqXCbp3FiMsSzJBpAJXJXQNfhT58ecx1JlPoPKAVbwbl86wt70ECTbUmAm85oUZjNXOhnY38mpB6Ps4O2lsppNd9Xw7_qBl_Wj_ZF_Lrfe00iG9HG85ZEv9vpvRyrlrhDQztON3IEL3vRAiDszUpy4HDDzBn7PJFv6P0IhThFKzThtYMGkPXnOKC8HRoQnQzP9taZa5G6jZsXOJKjsljTIJohQ_XkhbXet4x9RkSyfYkx9_SsBFu0IzvxmdMeGwQrmuWj5ByhCSr8gtJHvVWcthV97_6dRC11SWq7GvVHugQXF2a6OllxvEQdH8x36OF8iS65m1kpPfkoZvzg6MBbsCDukNm_SKfvgAogI1IduWuOYPGUfXwC2Ju1YwzbJ1rjcXeu4qJsAFPnr3xp9aEtZkqPZib6iTh8tAMyI2AmBowZCK_9sMSUGSXA68w5eXlW1fSb2MPIdTb3wmcz0W_KQM90SxDcaYaewKfaRZABIWekQgp69i12KzcmtqFQtuF85RoPMYSs3rfNEAYUD13kFzAQZQuIiKRxt0ZULsp4JJ2mOKKhAroBSPEM0h9QfnpmTRgWAvn04kBK_Xu_YobKDvDVJXz4hjOPiCRDJiK1oiObcaKY54MCYHtyprgpru3Jeht9UfuOJY4jVdKMCi6OnAFyjo_cER1UdE85d7yLd3wJRgdTLNNXdZ9rSAiPl5AwzadwUov5arsYjo1mNuvJao_ax1VhiC7J1FaQBOXON963ULwjFWPjjd88KohybtT2vRShnQMny1zkfCg_mVvFiGBLBIRthdPZTEEE8A24fCs8XZXiWReb6Q0nE2gI2TcFNv_zQDKIFQksGkQ6oGLjPeHGnyD4H3EBmeIVa4cB0HXIEFz8pnChgIGqQNESPC2ch5BKTLlDPUnn_NfLH42ZpUHLS35ST6OQw_IuHExoOvUArY86nDx3xNVIG1W0Gl6ppffEOeuN0lJUKGlXO1RLz06tPE4PWVmqtURF1eyrWMuTzQoyD2kwQ8_HyfyIzQHPDQAlrk3EaSJIb5E87JGW4PtAHEqPkUK04LqtL69_J5RQmxz7JwlrT1t7PJ8L2HhOpmJYg6LrZrlBgE0qdy4KxCLST5mUZKlLQU2b_2PUm7nJIt5cz7by1ATgeVNKiBbYeOUSP_KpBpC4t2nII78juwt_78UyjawwjVa7pK_UNkqYWdwK8DuacoUQqzTaGENzYM83fKWgRzn4SA52ZHqTs0uD9CHhP0HMnv7AAxtg1P5CESYfrNG4lsfGNPXC2VIutM8KcX4TyMeagi48mWRgnqWnIQ9DLkRf6mQdrdQrCmf09BoZ4ldvRrV2rAKgHqrbsNznWjCmeBOKzWxrEDZxiUe8Iyx837awwtzwOciItO7HTpuzoGVBq9rTEkNEvb85u7vGFvfbTlaysxSbCXvYjjFlDkLLvM3X9t9yXZf5PHDEehAEBAyS5brKkQZz8AC5ur44_tlFefEBf5II2M4iIMRuh3Wvil7Y_qV0mIgon8qDxpXXzaP9t1loSxurF5Nq9_niEARKX0I85miaMlabq8OvFJAiGDQqgJRrNaYG0OVVhMeufhrxs6sacOXHtY4NWXOrUhoUrrgTfi9md9zPDNXIcxQ3TI6iHhPUmShIjsBbK1ga_RvRRkdm8GSAeygt6DmSBNzsAz8OUcSUX3F2OVBLc9Z3lJ9Gh2_ybvTULoLLEo1p9M_buVIOPOj7hxE70apOV_zXYLFGTDmzBIhlLEAWsajzdRNW7BvL1yYmTpaXgaOepUtHy9vRKxMicDapM3ETpw0TafUfUd3Y5cKy_L_Hh56MZwR96fTndc4hKSmLB_UwUfyH5IyxBpnFEo9faSd39HZNtQGzWREkTV4o7u8tG_TMtYLN050h8zrxM4STZt8uGzX3NbDWKPg2qoUZy7hK0RskFW9tQcUNUDusbDhFHP702XAKGuN5mWotGMQMMGZqP4iF7FHSFSjnNA00kfBhlFQQdKpofiOBa1ftwS01edNRZJ_nwSl_IB65MdV9r5NLps_quNMPSBYc4SqzA7vsQsK3CJGtLxWT89xHgnc1VUDynmv6bzSZ0Ctjo7nAMaegOdbbXYY4TquYu4bRMk_uzRa14olNeP1rXsPEeCrOg1gdSazXy42zr6pFLYnzpD6Ad4x8RNE6BJjOcV7jZlWbruRndfxXw2SWhWk8TvnabrQhYs5FWrzj0rj3SOEyCdfXsKrnZO9gSQJBIzUcai8fJ1JSu0MUlGud7mNipIlODtN4wANIA36XeaGUOXirjbUfX49PRsiMebO7k8roOwV2YtTXi73VnSq1V7sfdeiRyyTGj8SUpw_ScHctd8YhkM6e48YbbfHH5IAx6ymsGZyk4KCQJqX1IrnI4SnECgZM3hs69sBXTUyBKxQw8S88V4p4BPOkNlPxRyneRMP23bNCGjKKpgfnru6EuYULEhfdSO5yXpismaXCYIp4O4Hgu020DEEUz8CMJXCIw5CgXdCmMncjEBufAVsiDAoynmOv3ea_imv9rSClWndyaQUFHMVssIv8PfeRnoATIGpLL5aKx4UmNw7YewU5xujhfO3ZRAdI5oreL6BpIL2_bHpAFglfYVAVOT7jZ6IvzHkz8gWurXOQSYycW2Z7fVq5MZeX1GXUqRCcZGYbygkqEPByvXC75zB9v7baCCVZP9NxZAd4FUf5tMQBhEgnzGjZqnUC6xTz05PFWGpzrWJ6cHgu5X6pD7sjHEtDOPki8pxSVeN3E3xfQPbyVIq9RAb9SXsofTdpmojA713ZJ5Gay0me1fnck88kgSO0iZ9jEPTVP8VLZ4P5fA8cyBEyd2OekM9t7WHOJlbKsuxDf8unZEd80pTq4yKAIZQMagNbSxpiDssbtJ6QINBNrUeuXWNejM_z0s6VTe8peVl4sTuA9dOWhewsqj4l1sn96orbJFCbwxo9AHByLtue1WtMaS9jtY2Ym5A5rPghjCmVVQuQZum8pUHSwY0ezPUzXvzCQD6QaUsz0tqvAanRJkIsIqZ4QCkRsMEofmrHC8GX7Txk4mibifBoYNCbCAgNBoT6bOKvqy9krMjVrottd3OwhRyYc0wKiK4B7Oy3R7JhmaOO1ValqoCba0asGsOT1Tx2Uoqp5CQ0S0HWjoSILuGIVYu8O86oOndeIljtsAuW5iT9iNdvssaoVhyVsOzLtwwnkuG828lZM2_TG61AN1cHyfj_gU1MkABUU_8RQmcEeGccgNNrUDEO6yTMMmQKftjznxO6z5u3QgTJJmqaTykr3kdtKpnkflEX5jhWRL5O9eKXhJi9ayEYJZpeZfXzmIOR6Vcu2R_OJRDJIwFPVzrOct5GlAdzZuLlRboOvyqNphYMn",
+ "xjzUQtijeFKn8fsCstGtNRgAFoMgiIxzYBnmeZwhBFjRWOQ5LAKaO5AV7Xs_G86RFRzlSWMh0Nb_GrCZMNvBefqmsXmN_lkLhSHZUk_DnaOzKTkN3AUiETnyZjVwcBcoz43gQfwRJgykl3P8zTMTFiHNYUmH_iiumzyMKTFyA2GznqoD3OfsGtrHr1zriJoiK2lU8mV1HHM6icmaHr8JoLgb4EBnmHoZZiz1HuVxMQZ3Px5qVSLpx3EaznkyRGgUFTZbMnOMd0JlAIeTrWVivIQCi3KjO4ibtkEIEZBz9E44AvSoro7v519jmXetrJnqeaylpn1ObKFvHhtT9XXC3o1YKXAn",
+ "cKDAH6nfU8mTIvroNlOHfpZO4Zm9c17gIC2WorWC_j68PSJyb5Tc5TtVhgPj4Niw2ppmkYZdPGbPFa7MuiH0naJzeQ9UCsFe0rpAT5Ei6llaGtPjKrXcLVnatixuOD_CLYPMj_eDLN0Y19TidRGx5faZvhcL_MJOhqW8vrBXGi55W1F0h1Vvc5J1zzKW5jsvmzaMy0ThiDBEsUe2WQ3eQOO5urPKCuOTjeQFkcZZGm5cTE_5wUjq"
+]
View
107 clients/rabbitmq/consumer.pl
@@ -0,0 +1,107 @@
+#!/usr/bin/perl
+
+use strict;
+
+$| = 1;
+
+use Data::Dumper;
+use Net::RabbitMQ;
+use Getopt::Long;
+use Time::HiRes qw(gettimeofday);
+use POSIX;
+use JSON;
+
+my $children = 10;
+my $etype = 'topic';
+my $rkey = '#';
+my $host = undef;
+GetOptions(
+ "children|c=i" => \$children,
+ "exchange|e=s" => \$etype,
+ "routing|r=s" => \$rkey,
+ "host|h=s" => \$host
+);
+
+my $DEBUG = 1;
+
+my %children;
+my $g_run_forrest_run = 1;
+$SIG{TERM} = \&sigterm;
+$SIG{CHLD} = \&REAPER;
+$SIG{INT} = \&sigterm;
+
+for ( my $i = 0; $i < $children; ++$i ) {
+ &run($i);
+}
+pause() while(scalar(keys(%children)));
+
+exit 0;
+
+sub run {
+ my $id = shift;
+ my $pid = fork();
+ if($pid) {
+ print "Child $pid started\n";
+ $children{$pid}++;
+ return $pid;
+ }
+
+ my ($mq, $q);
+ my ($low, $high, $run, $total_s, $total_m, $avg) = (50000, 0, 0, 0, 0, 0);
+
+ print "CONNECTING...\n" if $DEBUG;
+ $mq = Net::RabbitMQ->new();
+ $mq->connect($host, { user => 'guest', password => 'guest' });
+ $mq->channel_open(1);
+ $mq->exchange_declare(1, 'saturate', { exchange_type => $etype, durable => 0, auto_delete => 1 });
+ $q = $mq->queue_declare(1, 'getting.saturated.'.$$, {exclusive => 1, auto_delete => 1});
+ $mq->queue_bind(1, $q, 'saturate', '#');
+ $mq->consume(1, $q);
+
+ print "RUNNING...\n" if $DEBUG;
+ my $i = 0;
+ my $t0 = gettimeofday();
+ my $t1 = gettimeofday();
+ my $per_sec = 0;
+ eval {
+ while ( my $p = $mq->recv() ) {
+ $i++;
+ $t1 = gettimeofday();
+ if ( $t1 - $t0 >= 1 ) {
+ $per_sec = ($i / ($t1 - $t0));
+ $high = $per_sec if ( $run > 1 && $per_sec > $high );
+ $low = $per_sec if ( $run > 1 && $per_sec < $low );
+ $total_s += ($t1 - $t0) if ( $run > 1 );
+ $total_m += $i if ( $run > 1 );
+ $avg = ($total_m / $total_s) if ( $run > 1 );
+ $run++;
+ print "MSG/s: $per_sec\t\tAvg: $avg\n";
+
+ $t0 = gettimeofday();
+ $i = 0;
+ }
+ last if ( ! $g_run_forrest_run );
+ }
+ };
+ &stop($low, $high, $avg);
+}
+
+sub stop {
+ my ($low, $high, $avg) = @_;
+ print "Totals: [$low] [$high] [$avg]\n";
+ exit 0;
+}
+
+sub sigterm {
+ $g_run_forrest_run = 0;
+}
+
+sub REAPER {
+ my $child;
+ while ($child = waitpid(-1, WNOHANG)){
+ last if $child == -1;
+ print "Child $child exited\n";
+ delete $children{$child};
+ }
+ $SIG{CHLD} = \&REAPER;
+}
View
82 clients/rabbitmq/producer.pl
@@ -0,0 +1,82 @@
+#!/opt/OMNIperl/bin/perl
+
+$| = 1;
+
+use strict;
+
+use Data::Dumper;
+use Net::RabbitMQ;
+use POSIX;
+use Getopt::Long;
+use Time::HiRes qw(gettimeofday);
+use JSON;
+
+my %children;
+my $g_run_forrest_run = 1;
+$SIG{TERM} = \&sigterm;
+$SIG{CHLD} = \&REAPER;
+
+my $json;
+#open MSGS, "<../messages.json";
+open MSGS, "<../sensor.json";
+while (<MSGS>) { $json .= $_ }
+close MSGS;
+$json = decode_json($json);
+
+my $children = 10;
+my $limit = 0;
+my $host = undef;
+GetOptions(
+ "children|c=i" => \$children,
+ "limit|l=i" => \$limit,
+ "host|h=s" => \$host
+);
+
+for ( my $i = 0; $i < $children; ++$i ) {
+ &saturate();
+}
+pause() while(scalar(keys(%children)));
+
+sub saturate {
+ my $pid = fork();
+ if($pid) {
+ print "Child $pid started\n";
+ $children{$pid}++;
+ return $pid;
+ }
+
+ my $mq = Net::RabbitMQ->new();
+ $mq->connect($host, { user => 'guest', password => 'guest' });
+ $mq->channel_open(1);
+
+ my $cnt = 0;
+ my $t0 = gettimeofday();
+ while ($g_run_forrest_run ) {
+ $mq->publish(1, "saturate.".(int(rand(3))), $json->[int(rand(4))], { exchange => "saturate" });
+ if ( $limit ) {
+ ++$cnt;
+ if ( $cnt == $limit ) {
+ my $t1 = gettimeofday();
+ print "reached my limit, sleeping (".($t1 - $t0).")\n";
+ sleep(1);
+ $cnt = 0;
+ $t0 = gettimeofday();
+ }
+ }
+ }
+ exit 0;
+}
+
+sub sigterm {
+ $g_run_forrest_run = 0;
+}
+
+sub REAPER {
+ my $child;
+ while ($child = waitpid(-1, WNOHANG)){
+ last if $child == -1;
+ print "Child $child exited\n";
+ delete $children{$child};
+ }
+ $SIG{CHLD} = \&REAPER;
+}
View
37 clients/redis/consumer.js
@@ -0,0 +1,37 @@
+#!/usr/bin/node
+
+var redis = require('redis'),
+ client = redis.createClient("6379", "");
+
+var low = 50000,
+ high = 0,
+ run = 0,
+ total_m = 0,
+ total_s = 0,
+ avg = 0,
+ msgs = 0,
+ per_sec = 0;
+
+setInterval(function() {
+ if ( msgs > 0 ) {
+ if ( run > 1 && msgs > high ) high = msgs;
+ if ( run > 1 && msgs < low ) low = msgs;
+ total_m += msgs;
+ total_s++;
+ avg = total_m / total_s;
+ run++;
+
+ console.log("Totals ["+low+"] ["+high+"] ["+avg+"]");
+ msgs = 0;
+ }
+}, 1000);
+
+client.on("error", function (err) {
+ console.log("Error " + err);
+});
+client.on("message", function (channel, message) {
+ msgs++;
+});
+client.on("ready", function () {
+ client.subscribe("1");
+});
View
81 clients/redis/producer.pl
@@ -0,0 +1,81 @@
+#!/usr/bin/perl
+
+$| = 1;
+
+use strict;
+
+use Data::Dumper;
+use Redis::hiredis;
+use POSIX;
+use Getopt::Long;
+use Time::HiRes qw(gettimeofday);
+use JSON;
+
+my %children;
+my $g_run_forrest_run = 1;
+$SIG{TERM} = \&sigterm;
+$SIG{CHLD} = \&REAPER;
+
+my $json;
+#open MSGS, "<../messages.json";
+open MSGS, "<../sensor.json";
+while (<MSGS>) { $json .= $_ }
+close MSGS;
+$json = decode_json($json);
+
+my $children = 10;
+my $limit = 0;
+my $host = undef;
+GetOptions(
+ "children|c=i" => \$children,
+ "limit|l=i" => \$limit,
+ "host|s=s" => \$host
+);
+
+for ( my $i = 0; $i < $children; ++$i ) {
+ &saturate();
+}
+pause() while(scalar(keys(%children)));
+
+sub saturate {
+ my $pid = fork();
+ if($pid) {
+ print "Child $pid started\n";
+ $children{$pid}++;
+ return $pid;
+ }
+
+ my $redis = Redis::hiredis->new();
+ $redis->connect($host);
+
+ my $cnt = 0;
+ my $t0 = gettimeofday();
+ while ($g_run_forrest_run ) {
+ my $res = $redis->publish(1, $json->[int(rand(4))]);
+ if ( $limit ) {
+ ++$cnt;
+ if ( $cnt == $limit ) {
+ my $t1 = gettimeofday();
+ print "reached my limit, sleeping (".($t1 - $t0).")\n";
+ sleep(1);
+ $cnt = 0;
+ $t0 = gettimeofday();
+ }
+ }
+ }
+ exit 0;
+}
+
+sub sigterm {
+ $g_run_forrest_run = 0;
+}
+
+sub REAPER {
+ my $child;
+ while ($child = waitpid(-1, WNOHANG)){
+ last if $child == -1;
+ print "Child $child exited\n";
+ delete $children{$child};
+ }
+ $SIG{CHLD} = \&REAPER;
+}
View
6 clients/sensor.json
@@ -0,0 +1,6 @@
+[
+ "1321037501.774294 Sensor1 9472",
+ "1321037502.057538 Sensor2 32",
+ "1321037502.059937 Sensor3 78123123",
+ "1321037502.062430 Sensor4 2147483647"
+]
View
BIN state_of_the_mq.pdf
Binary file not shown.

0 comments on commit 2f392ee

Please sign in to comment.