Skip to content
This repository
  • 3 commits
  • 13 files changed
  • 0 comments
  • 1 contributor
May 17, 2012
Pierre-Alexandre Meyer jmx: add support for Arecibo
Expose certain beans and metrics to Arecibo.

Signed-off-by: Pierre-Alexandre Meyer <pierre@ning.com>
6026491
Pierre-Alexandre Meyer scribe: start in the background
Make the start() method non-blocking.

Signed-off-by: Pierre-Alexandre Meyer <pierre@ning.com>
296f3be
Pierre-Alexandre Meyer hadoop: allow dashes in event names
Signed-off-by: Pierre-Alexandre Meyer <pierre@ning.com>
225478d
8  pom.xml
@@ -170,6 +170,14 @@
170 170
             <artifactId>nagios</artifactId>
171 171
         </dependency>
172 172
         <dependency>
  173
+            <groupId>com.ning.arecibo</groupId>
  174
+            <artifactId>arecibo-jmx</artifactId>
  175
+        </dependency>
  176
+        <dependency>
  177
+            <groupId>com.ning.arecibo</groupId>
  178
+            <artifactId>arecibo-metrics</artifactId>
  179
+        </dependency>
  180
+        <dependency>
173 181
             <groupId>com.sun.jersey</groupId>
174 182
             <artifactId>jersey-core</artifactId>
175 183
         </dependency>
10  src/main/java/com/ning/metrics/collector/StandaloneCollectorServer.java
@@ -16,6 +16,8 @@
16 16
 
17 17
 package com.ning.metrics.collector;
18 18
 
  19
+import com.ning.arecibo.jmx.AreciboProfile;
  20
+import com.ning.arecibo.metrics.AreciboMetricsReporter;
19 21
 import com.ning.metrics.collector.binder.config.CollectorConfig;
20 22
 import com.ning.metrics.collector.endpoint.servers.JettyServer;
21 23
 import com.ning.metrics.collector.endpoint.servers.ScribeServer;
@@ -39,19 +41,22 @@
39 41
     private final ScribeServer scribeServer;
40 42
     private final ServiceCheck serviceCheck;
41 43
     private final ServiceMonitor serviceMonitor;
  44
+    private final AreciboProfile areciboProfile;
42 45
 
43 46
     @Inject
44 47
     public StandaloneCollectorServer(final CollectorConfig config,
45 48
                                      final JettyServer jettyServer,
46 49
                                      final ScribeServer scribeServer,
47 50
                                      final ServiceCheck serviceCheck,
48  
-                                     final ServiceMonitor serviceMonitor)
  51
+                                     final ServiceMonitor serviceMonitor,
  52
+                                     final AreciboProfile areciboProfile)
49 53
     {
50 54
         this.config = config;
51 55
         this.jettyServer = jettyServer;
52 56
         this.scribeServer = scribeServer;
53 57
         this.serviceCheck = serviceCheck;
54 58
         this.serviceMonitor = serviceMonitor;
  59
+        this.areciboProfile = areciboProfile;
55 60
     }
56 61
 
57 62
     private void start()
@@ -71,6 +76,9 @@ private void start()
71 76
 
72 77
         // Talk to Nagios
73 78
         serviceMonitor.registerServiceCheck(config.getNagiosServiceName(), serviceCheck);
  79
+
  80
+        // Report metrics to Arecibo
  81
+        AreciboMetricsReporter.enable(areciboProfile);
74 82
     }
75 83
 
76 84
     private void stop()
4  src/main/java/com/ning/metrics/collector/binder/config/CollectorConfig.java
@@ -392,4 +392,8 @@
392 392
     @Config("collector.shiro.configPath")
393 393
     @DefaultNull
394 394
     String getShiroConfigPath();
  395
+
  396
+    @Config("collector.arecibo.profile")
  397
+    @Default("com.ning.arecibo.jmx:name=AreciboProfile")
  398
+    String getAreciboProfile();
395 399
 }
27  src/main/java/com/ning/metrics/collector/endpoint/servers/ScribeServer.java
@@ -16,9 +16,10 @@
16 16
 
17 17
 package com.ning.metrics.collector.endpoint.servers;
18 18
 
  19
+import com.ning.metrics.collector.binder.config.CollectorConfig;
  20
+
19 21
 import com.google.inject.Inject;
20 22
 import com.mogwee.executors.FailsafeScheduledExecutor;
21  
-import com.ning.metrics.collector.binder.config.CollectorConfig;
22 23
 import org.apache.thrift.TProcessor;
23 24
 import org.apache.thrift.protocol.TBinaryProtocol;
24 25
 import org.apache.thrift.server.TNonblockingServer;
@@ -58,12 +59,26 @@ public ScribeServer(final Iface eventRequestHandler, final CollectorConfig confi
58 59
      */
59 60
     public void start() throws TTransportException
60 61
     {
61  
-        final TNonblockingServerTransport socket = new TNonblockingServerSocket(config.getScribePort());
62  
-        final TProcessor processor = new Processor(eventRequestHandler);
  62
+        final Executor executor = new FailsafeScheduledExecutor(1, "ScribeServer");
  63
+        executor.execute(new Runnable()
  64
+        {
  65
+            @Override
  66
+            public void run()
  67
+            {
  68
+                try {
  69
+                    final TNonblockingServerTransport socket = new TNonblockingServerSocket(config.getScribePort());
  70
+                    final TProcessor processor = new Processor(eventRequestHandler);
63 71
 
64  
-        server = new TNonblockingServer(new TNonblockingServer.Args(socket).processor(processor).protocolFactory(new TBinaryProtocol.Factory()));
65  
-        log.info(String.format("Starting terminal Scribe server on port %d", config.getScribePort()));
66  
-        server.serve();
  72
+                    server = new TNonblockingServer(new TNonblockingServer.Args(socket).processor(processor).protocolFactory(new TBinaryProtocol.Factory()));
  73
+                    log.info(String.format("Starting terminal Scribe server on port %d", config.getScribePort()));
  74
+                    server.serve();
  75
+                }
  76
+                catch (TTransportException e) {
  77
+                    log.warn("Unable to start the Scribe server", e);
  78
+                    Thread.currentThread().interrupt();
  79
+                }
  80
+            }
  81
+        });
67 82
     }
68 83
 
69 84
     /**
11  src/main/java/com/ning/metrics/collector/guice/ServerModule.java
@@ -16,6 +16,8 @@
16 16
 
17 17
 package com.ning.metrics.collector.guice;
18 18
 
  19
+import com.ning.arecibo.jmx.AreciboMonitoringModule;
  20
+import com.ning.arecibo.metrics.guice.AreciboMetricsModule;
19 21
 import com.ning.metrics.collector.binder.config.CollectorConfig;
20 22
 import com.ning.metrics.collector.binder.config.CollectorConfigurationObjectFactory;
21 23
 import com.ning.metrics.collector.endpoint.resources.ScribeModule;
@@ -59,6 +61,7 @@ protected void configureServlets()
59 61
         installStats();
60 62
         installHealthChecks();
61 63
         installJMX();
  64
+        installArecibo(config);
62 65
         installNagios(config);
63 66
         installF5();
64 67
         installJaxrsSupport(config);
@@ -101,6 +104,13 @@ protected void installJMX()
101 104
         install(new MBeanModule());
102 105
     }
103 106
 
  107
+    protected void installArecibo(final CollectorConfig config)
  108
+    {
  109
+        install(new AreciboMonitoringModule(config.getAreciboProfile()));
  110
+        // Expose metrics objects to Arecibo
  111
+        install(new AreciboMetricsModule());
  112
+    }
  113
+
104 114
     protected void installNagios(final CollectorConfig config)
105 115
     {
106 116
         bind(ServiceCheck.class).to(CollectorServiceCheck.class).asEagerSingleton();
@@ -111,7 +121,6 @@ protected void installNagios(final CollectorConfig config)
111 121
             final ServiceMonitor monitor = new FakeNagiosMonitor(config.getNagiosCheckRate());
112 122
             bind(ServiceMonitor.class).toInstance(monitor);
113 123
         }
114  
-
115 124
     }
116 125
 
117 126
     protected void installF5()
6  src/main/java/com/ning/metrics/collector/hadoop/processing/HadoopWriterFactory.java
@@ -18,6 +18,8 @@
18 18
 
19 19
 import com.google.inject.Inject;
20 20
 import com.mogwee.executors.FailsafeScheduledExecutor;
  21
+
  22
+import com.ning.arecibo.jmx.Monitored;
21 23
 import com.ning.metrics.collector.binder.config.CollectorConfig;
22 24
 import com.ning.metrics.serialization.hadoop.FileSystemAccess;
23 25
 import com.ning.metrics.serialization.writer.CallbackHandler;
@@ -200,7 +202,7 @@ private void incrementFlushCount(final HashMap<String, Integer> flushesPerEvent,
200 202
      *
201 203
      * @return cutoff time in milliseconds
202 204
      */
203  
-    @Managed(description = "Cutoff time for files to be sent to HDFS")
  205
+    @Monitored(description = "Cutoff time for files to be sent to HDFS")
204 206
     public long getCutoffTime()
205 207
     {
206 208
         return cutoffTime;
@@ -230,7 +232,7 @@ public void disableFlush()
230 232
         flushEnabled.set(false);
231 233
     }
232 234
 
233  
-    @Managed(description = "Number of local files not yet pushed to HDFS")
  235
+    @Monitored(description = "Number of local files not yet pushed to HDFS")
234 236
     public int nbLocalFiles()
235 237
     {
236 238
         return LocalSpoolManager.findFilesInSpoolDirectory(new File(config.getSpoolDirectoryName())).size();
2  src/main/java/com/ning/metrics/collector/hadoop/processing/LocalSpoolManager.java
@@ -46,7 +46,7 @@
46 46
 {
47 47
     private static final Logger log = LoggerFactory.getLogger(LocalSpoolManager.class);
48 48
 
49  
-    private static final Pattern filenamePattern = Pattern.compile("\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}-\\d{1,5}-[a-zA-Z0-9]{4}-(\\d{4}-\\d{2}-\\d{2}T\\d{2}.\\d{2}.\\d{2}.\\d{3})\\.[a-zA-Z0-9]*\\.[a-zA-Z]*");
  49
+    private static final Pattern filenamePattern = Pattern.compile("\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}\\.\\d{1,3}-\\d{1,5}-[a-zA-Z0-9]{4}-(\\d{4}-\\d{2}-\\d{2}T\\d{2}.\\d{2}.\\d{2}.\\d{3})\\.[a-zA-Z0-9-]*\\.[a-zA-Z]*");
50 50
 
51 51
     // Can't use : in the pattern - Hadoop chokes on it when building the .crc Path
52 52
     protected static final DateTimeFormatter dateFormatter = DateTimeFormat.forPattern("yyyy-MM-dd'T'HH.mm.ss.SSS").withZone(DateTimeZone.UTC);
15  src/main/java/com/ning/metrics/collector/hadoop/processing/WriterStats.java
@@ -32,7 +32,8 @@
32 32
  * under the License.
33 33
  */
34 34
 
35  
-import org.weakref.jmx.Managed;
  35
+import com.ning.arecibo.jmx.Monitored;
  36
+import com.ning.arecibo.jmx.MonitoringType;
36 37
 
37 38
 import java.util.concurrent.atomic.AtomicLong;
38 39
 
@@ -80,37 +81,37 @@ public void registerHdfsFlush()
80 81
         hdfsFlushes.incrementAndGet();
81 82
     }
82 83
 
83  
-    @Managed(description = "Number of ignored events")
  84
+    @Monitored(description = "Number of ignored events", monitoringType = {MonitoringType.VALUE, MonitoringType.RATE})
84 85
     public long getIgnoredEvents()
85 86
     {
86 87
         return ignoredEvents.get();
87 88
     }
88 89
 
89  
-    @Managed(description = "Number of locally enqueued events")
  90
+    @Monitored(description = "Number of locally enqueued events", monitoringType = {MonitoringType.VALUE, MonitoringType.RATE})
90 91
     public long getEnqueuedEvents()
91 92
     {
92 93
         return enqueuedEvents.get();
93 94
     }
94 95
 
95  
-    @Managed(description = "Number of dropped events - queues being full")
  96
+    @Monitored(description = "Number of dropped events - queues being full", monitoringType = {MonitoringType.VALUE, MonitoringType.RATE})
96 97
     public long getDroppedEvents()
97 98
     {
98 99
         return droppedEvents.get();
99 100
     }
100 101
 
101  
-    @Managed(description = "Number of successfully written events")
  102
+    @Monitored(description = "Number of successfully written events", monitoringType = {MonitoringType.VALUE, MonitoringType.RATE})
102 103
     public long getWrittenEvents()
103 104
     {
104 105
         return writtenEvents.get();
105 106
     }
106 107
 
107  
-    @Managed(description = "Number of events that could not be written due to an I/O error")
  108
+    @Monitored(description = "Number of events that could not be written due to an I/O error", monitoringType = {MonitoringType.VALUE, MonitoringType.RATE})
108 109
     public long getErroredEvents()
109 110
     {
110 111
         return erroredEvents.get();
111 112
     }
112 113
 
113  
-    @Managed(description = "Number of files written to Hadoop")
  114
+    @Monitored(description = "Number of files written to Hadoop", monitoringType = {MonitoringType.VALUE, MonitoringType.RATE})
114 115
     public long getHdfsFlushes()
115 116
     {
116 117
         return hdfsFlushes.get();
5  src/main/java/com/ning/metrics/collector/processing/EventCollector.java
@@ -16,6 +16,8 @@
16 16
 
17 17
 package com.ning.metrics.collector.processing;
18 18
 
  19
+import com.ning.arecibo.jmx.Monitored;
  20
+import com.ning.arecibo.jmx.MonitoringType;
19 21
 import com.ning.metrics.collector.hadoop.processing.EventSpoolDispatcher;
20 22
 import com.ning.metrics.collector.realtime.EventListenerDispatcher;
21 23
 import com.ning.metrics.serialization.event.Event;
@@ -27,7 +29,6 @@
27 29
 import com.yammer.metrics.core.MetricName;
28 30
 import org.slf4j.Logger;
29 31
 import org.slf4j.LoggerFactory;
30  
-import org.weakref.jmx.Managed;
31 32
 
32 33
 import java.util.concurrent.TimeUnit;
33 34
 
@@ -102,7 +103,7 @@ public boolean collectEvent(final Event event)
102 103
         }
103 104
     }
104 105
 
105  
-    @Managed(description = "Number of events in memory (spool queue)")
  106
+    @Monitored(description = "Number of events in memory (spool queue)", monitoringType = {MonitoringType.VALUE, MonitoringType.RATE})
106 107
     public int getQueueSizes()
107 108
     {
108 109
         int length = 0;
5  src/main/java/com/ning/metrics/collector/realtime/EventListenerDispatcher.java
@@ -17,10 +17,11 @@
17 17
 package com.ning.metrics.collector.realtime;
18 18
 
19 19
 import com.google.inject.Inject;
  20
+
  21
+import com.ning.arecibo.jmx.Monitored;
20 22
 import com.ning.metrics.serialization.event.Event;
21 23
 import org.slf4j.Logger;
22 24
 import org.slf4j.LoggerFactory;
23  
-import org.weakref.jmx.Managed;
24 25
 
25 26
 import java.util.Map;
26 27
 import java.util.concurrent.ConcurrentHashMap;
@@ -74,7 +75,7 @@ public synchronized void offer(final Event event)
74 75
         }
75 76
     }
76 77
 
77  
-    @Managed
  78
+    @Monitored
78 79
     public int getNbOfListeners()
79 80
     {
80 81
         return listeners.size();
18  src/main/java/com/ning/metrics/collector/realtime/EventQueueStats.java
@@ -16,7 +16,8 @@
16 16
 
17 17
 package com.ning.metrics.collector.realtime;
18 18
 
19  
-import org.weakref.jmx.Managed;
  19
+import com.ning.arecibo.jmx.Monitored;
  20
+import com.ning.arecibo.jmx.MonitoringType;
20 21
 
21 22
 import java.util.Collection;
22 23
 import java.util.concurrent.atomic.AtomicLong;
@@ -34,9 +35,9 @@
34 35
     private final Collection<Object> queue;
35 36
 
36 37
     private final long maxQueueLength;
37  
-    
  38
+
38 39
     public EventQueueStats(final Collection<Object> queue,
39  
-            final long maxQueueLength)
  40
+                           final long maxQueueLength)
40 41
     {
41 42
         this.queue = queue;
42 43
         this.maxQueueLength = maxQueueLength;
@@ -62,42 +63,31 @@ public void registerEventSendingErrored()
62 63
         erroredEvents.incrementAndGet();
63 64
     }
64 65
 
65  
-    @Managed(description = "Number of locally enqueued events")
66 66
     public long getEnqueuedEvents()
67 67
     {
68 68
         return enqueuedEvents.get();
69 69
     }
70 70
 
71  
-    @Managed(description = "Number of dropped events")
72 71
     public long getDroppedEvents()
73 72
     {
74 73
         return droppedEvents.get();
75 74
     }
76 75
 
77  
-    @Managed(description = "Number of successfully sent events")
78 76
     public long getSentEvents()
79 77
     {
80 78
         return sentEvents.get();
81 79
     }
82 80
 
83  
-    @Managed(description = "Number of events that could not be sent due to an error")
84 81
     public long getErroredEvents()
85 82
     {
86 83
         return erroredEvents.get();
87 84
     }
88 85
 
89  
-    @Managed(description = "Current length of the underlying queue")
90 86
     public long getQueueSize()
91 87
     {
92 88
         return queue.size();
93 89
     }
94 90
 
95  
-    @Managed(description = "Maximum length of the underlying queue")
96  
-    public long getMaxQueueSize()
97  
-    {
98  
-        return maxQueueLength;
99  
-    }
100  
-    
101 91
     /**
102 92
      * Unit test hook
103 93
      */
16  src/main/java/com/ning/metrics/collector/realtime/GlobalEventQueueStats.java
@@ -16,7 +16,8 @@
16 16
 
17 17
 package com.ning.metrics.collector.realtime;
18 18
 
19  
-import org.weakref.jmx.Managed;
  19
+import com.ning.arecibo.jmx.Monitored;
  20
+import com.ning.arecibo.jmx.MonitoringType;
20 21
 
21 22
 import java.util.Collection;
22 23
 import java.util.Map;
@@ -29,8 +30,7 @@
29 30
     private final Map<String, EventQueueStats> stats = new ConcurrentHashMap<String, EventQueueStats>();
30 31
     private final AtomicLong ignoredEvents = new AtomicLong(0);
31 32
 
32  
-    public EventQueueStats createLocalStats(final String eventType, final Collection<Object> queue,
33  
-            long maxQueueLength)
  33
+    public EventQueueStats createLocalStats(final String eventType, final Collection<Object> queue, final long maxQueueLength)
34 34
     {
35 35
         final EventQueueStats localStats = new EventQueueStats(queue, maxQueueLength);
36 36
         // We are guaranteed to have at most one stats object per event type (see EventQueueProcessorImpl)
@@ -46,13 +46,13 @@ public void registerEventIgnored()
46 46
         ignoredEvents.incrementAndGet();
47 47
     }
48 48
 
49  
-    @Managed(description = "Number of ignored events (type not enabled)")
  49
+    @Monitored(description = "Number of ignored events (type not enabled)", monitoringType = {MonitoringType.VALUE, MonitoringType.RATE})
50 50
     public long getIgnoredEvents()
51 51
     {
52 52
         return ignoredEvents.get();
53 53
     }
54 54
 
55  
-    @Managed(description = "Number of enqueued events")
  55
+    @Monitored(description = "Number of enqueued events", monitoringType = {MonitoringType.VALUE, MonitoringType.RATE})
56 56
     public long getEnqueuedEvents()
57 57
     {
58 58
         long enqueuedEvents = 0;
@@ -62,7 +62,7 @@ public long getEnqueuedEvents()
62 62
         return enqueuedEvents;
63 63
     }
64 64
 
65  
-    @Managed(description = "Number of dropped events")
  65
+    @Monitored(description = "Number of dropped events", monitoringType = {MonitoringType.VALUE, MonitoringType.RATE})
66 66
     public long getDroppedEvents()
67 67
     {
68 68
         long droppedEvents = 0;
@@ -72,7 +72,7 @@ public long getDroppedEvents()
72 72
         return droppedEvents;
73 73
     }
74 74
 
75  
-    @Managed(description = "Number of successfully sent events")
  75
+    @Monitored(description = "Number of successfully sent events", monitoringType = {MonitoringType.VALUE, MonitoringType.RATE})
76 76
     public long getSentEvents()
77 77
     {
78 78
         long sentEvents = 0;
@@ -82,7 +82,7 @@ public long getSentEvents()
82 82
         return sentEvents;
83 83
     }
84 84
 
85  
-    @Managed(description = "Number of events that could not be sent due to an error")
  85
+    @Monitored(description = "Number of events that could not be sent due to an error", monitoringType = {MonitoringType.VALUE, MonitoringType.RATE})
86 86
     public long getErroredEvents()
87 87
     {
88 88
         long erroredEvents = 0;
5  src/test/java/com/ning/metrics/collector/hadoop/processing/TestLocalSpoolManager.java
@@ -63,5 +63,10 @@ public void testReverseEngineerFilename() throws Exception
63 63
         Assert.assertEquals(spoolManager.getEventName(), "SpamDocumentClassified2");
64 64
         Assert.assertEquals(spoolManager.getSerializationType(), SerializationType.THRIFT);
65 65
         Assert.assertEquals(spoolManager.getTimeStamp(), LocalSpoolManager.dateFormatter.parseDateTime("2012-01-06T00.39.58.726"));
  66
+
  67
+        spoolManager = new LocalSpoolManager(config, new File("10.18.81.236-8080-I3wp-2012-03-23T01.08.32.469.aaa-bbb-ccc.thrift"));
  68
+        Assert.assertEquals(spoolManager.getEventName(), "aaa-bbb-ccc");
  69
+        Assert.assertEquals(spoolManager.getSerializationType(), SerializationType.THRIFT);
  70
+        Assert.assertEquals(spoolManager.getTimeStamp(), LocalSpoolManager.dateFormatter.parseDateTime("2012-03-23T01.08.32.469"));
66 71
     }
67 72
 }

No commit comments for this range

Something went wrong with that request. Please try again.