Skip to content

Commit

Permalink
Merge 5d5b65f into 4daffbf
Browse files Browse the repository at this point in the history
  • Loading branch information
jcjones committed Feb 19, 2015
2 parents 4daffbf + 5d5b65f commit 6053426
Show file tree
Hide file tree
Showing 9 changed files with 745 additions and 7 deletions.
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,3 +22,7 @@ _testmain.go
*.exe
*.test
*.prof
*.coverprofile

boulder-start/boulder-start
activity-monitor/activity-monitor
11 changes: 5 additions & 6 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ language: go

go:
- 1.4.1
- tip

matrix:
fast_finish: true
Expand All @@ -12,12 +11,12 @@ before_install:
- go get golang.org/x/tools/cmd/cover
- go get github.com/golang/lint/golint
- go get github.com/mattn/goveralls
- go get github.com/modocache/gover

script:
- go vet -x ./...
- $HOME/gopath/bin/golint ./...
- go test -v ./...
- go test -covermode=count -coverprofile=profile.cov .

after_script:
- $HOME/gopath/bin/goveralls -coverprofile=profile.cov -service=travis-ci
- go test -covermode=count -coverprofile=main.coverprofile .
- go test -covermode=count -coverprofile=analysisengine.coverprofile ./analysisengine
- $HOME/gopath/bin/gover
- $HOME/gopath/bin/goveralls -coverprofile=gover.coverprofile -service=travis-ci
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ This is an initial implementation of an ACME-based CA. The [ACME protocol](http


[![Build Status](https://travis-ci.org/letsencrypt/boulder.svg)](https://travis-ci.org/letsencrypt/boulder)
[![Coverage Status](https://coveralls.io/repos/letsencrypt/boulder/badge.svg)](https://coveralls.io/r/letsencrypt/boulder)
[![Docker Repository on Quay.io](https://quay.io/repository/letsencrypt/boulder/status "Docker Repository on Quay.io")](https://quay.io/repository/letsencrypt/boulder)

Docker
Expand Down
178 changes: 178 additions & 0 deletions activity-monitor/activity-monitor.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
// Copyright 2014 ISRG. All rights reserved
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.

package main

import (
"github.com/codegangsta/cli"
"github.com/letsencrypt/boulder"
"github.com/letsencrypt/boulder/analysisengine"
"github.com/streadway/amqp"
"log"
"net/url"
"os"
)

const (
QueueName = "Monitor"
AmqpExchange = "boulder"
AmqpExchangeType = "topic"
AmqpInternal = false
AmqpDurable = false
AmqpDeleteUnused = false
AmqpExclusive = false
AmqpNoWait = false
AmqpNoLocal = false
AmqpAutoAck = false
AmqpMandatory = false
AmqpImmediate = false
)


func startMonitor(AmqpUrl string, logger *boulder.JsonLogger) {

ae := analysisengine.NewLoggingAnalysisEngine(logger)

// For convenience at the broker, identifiy ourselves by hostname
consumerTag, err := os.Hostname()
if err != nil {
log.Fatalf("Could not determine hostname")
}

conn, err := amqp.Dial(AmqpUrl)
if err != nil {
log.Fatalf("Could not connect to AMQP server: %s", err)
return
}

rpcCh, err := conn.Channel()
if err != nil {
log.Fatalf("Could not start channel: %s", err)
return
}

err = rpcCh.ExchangeDeclare(
AmqpExchange,
AmqpExchangeType,
AmqpDurable,
AmqpDeleteUnused,
AmqpInternal,
AmqpNoWait,
nil)
if err != nil {
log.Fatalf("Could not declare exchange: %s", err)
return
}

_, err = rpcCh.QueueDeclare(
QueueName,
AmqpDurable,
AmqpDeleteUnused,
AmqpExclusive,
AmqpNoWait,
nil)
if err != nil {
log.Fatalf("Could not declare queue: %s", err)
return
}

err = rpcCh.QueueBind(
QueueName,
"#", //wildcard
AmqpExchange,
false,
nil)
if err != nil {
log.Fatalf("Could not bind queue: %s", err)
return
}

deliveries, err := rpcCh.Consume(
QueueName,
consumerTag,
AmqpAutoAck,
AmqpExclusive,
AmqpNoLocal,
AmqpNoWait,
nil)
if err != nil {
log.Fatalf("Could not subscribe to queue: %s", err)
return
}

// Run forever.
for d := range deliveries {
// Pass each message to the Analysis Engine
ae.ProcessMessage(d)
// Only ack the delivery we actually handled (ackMultiple=false)
const ackMultiple = false
d.Ack(ackMultiple)
}
}

func main() {
app := cli.NewApp()
app.Name = "activity-monitor"
app.Usage = "Monitor Boulder's communications."
app.Version = "0.0.0"

// Specify AMQP Server
app.Flags = []cli.Flag{
cli.StringFlag{
Name: "amqp",
Value: "amqp://guest:guest@localhost:5672",
Usage: "AMQP Broker String",
},
cli.StringFlag{
Name: "jsonlog",
Usage: "JSON logging server and port (e.g., tcp://localhost:515)",
},
cli.BoolFlag{
Name: "stdout",
Usage: "Enable debug logging to stdout",
},
cli.IntFlag{
Name: "level",
Value: 4,
Usage: "Minimum Level to log (0-7), 7=Debug",
},
}

app.Action = func(c *cli.Context) {
logger := boulder.NewJsonLogger("am")

// Parse SysLog URL if one was provided
if c.GlobalString("jsonlog") == "" {
log.Println("No external logging server; defaulting to stdout.")
logger.EnableStdOut(true)
} else {
syslogU, err := url.Parse(c.GlobalString("jsonlog"))
if err != nil {
log.Fatalf("Could not parse Syslog URL: %s", err)
return
}

logger.SetEndpoint(syslogU.Scheme, syslogU.Host)
err = logger.Connect()
if err != nil {
log.Fatalf("Could not open remote syslog: %s", err)
return
}

logger.EnableStdOut(c.GlobalBool("stdout"))

}

logger.SetLevel(c.GlobalInt("level"))

startMonitor( c.GlobalString("amqp"), logger )
}

err := app.Run(os.Args)
if err != nil {
log.Fatalf("Could not start: %s", err)
return
}
}
34 changes: 33 additions & 1 deletion amqp-rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@ import (
// XXX: I *think* these constants are appropriate.
// We will probably want to tweak these in the future.
const (
AmqpExchange = ""
AmqpExchange = "boulder"
AmqpExchangeType = "topic"
AmqpInternal = false
AmqpDurable = false
AmqpDeleteUnused = false
AmqpExclusive = false
Expand All @@ -48,6 +50,19 @@ func amqpConnect(url string) (ch *amqp.Channel, err error) {

// A simplified way to declare and subscribe to an AMQP queue
func amqpSubscribe(ch *amqp.Channel, name string) (msgs <-chan amqp.Delivery, err error) {
err = ch.ExchangeDeclare(
AmqpExchange,
AmqpExchangeType,
AmqpDurable,
AmqpDeleteUnused,
AmqpInternal,
AmqpNoWait,
nil)
if err != nil {
log.Fatalf("Could not declare exchange: %s", err)
return
}

q, err := ch.QueueDeclare(
name,
AmqpDurable,
Expand All @@ -56,6 +71,18 @@ func amqpSubscribe(ch *amqp.Channel, name string) (msgs <-chan amqp.Delivery, er
AmqpNoWait,
nil)
if err != nil {
log.Fatalf("Could not declare queue: %s", err)
return
}

err = ch.QueueBind(
name,
name,
AmqpExchange,
false,
nil)
if err != nil {
log.Fatalf("Could not bind queue: %s", err)
return
}

Expand All @@ -67,6 +94,11 @@ func amqpSubscribe(ch *amqp.Channel, name string) (msgs <-chan amqp.Delivery, er
AmqpNoLocal,
AmqpNoWait,
nil)
if err != nil {
log.Fatalf("Could not subscribe to queue: %s", err)
return
}

return
}

Expand Down
33 changes: 33 additions & 0 deletions analysisengine/analysis-engine.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2014 ISRG. All rights reserved
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package analysisengine

import (
"github.com/letsencrypt/boulder"
"github.com/streadway/amqp"
)

// This file analyzes messages obtained from the Message Broker to determine
// whether the system as a whole is functioning correctly.

// Interface all Analysis Engines share
type AnalysisEngine interface {
ProcessMessage(amqp.Delivery)
}

// An Analysis Engine that just logs to the JSON Logger.
type LoggingAnalysisEngine struct {
jsonLogger *boulder.JsonLogger
}

func (eng *LoggingAnalysisEngine) ProcessMessage(delivery amqp.Delivery) {
// Send the entire message contents to the syslog server for debugging.
eng.jsonLogger.Debug("Message contents", delivery)
}

// Construct a new Analysis Engine.
func NewLoggingAnalysisEngine(logger *boulder.JsonLogger) AnalysisEngine {
return &LoggingAnalysisEngine{jsonLogger: logger}
}
23 changes: 23 additions & 0 deletions analysisengine/analysis-engine_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Copyright 2014 ISRG. All rights reserved
// This Source Code Form is subject to the terms of the Mozilla Public
// License, v. 2.0. If a copy of the MPL was not distributed with this
// file, You can obtain one at http://mozilla.org/MPL/2.0/.
package analysisengine

import (
"github.com/letsencrypt/boulder"
"github.com/streadway/amqp"
"testing"
)


func TestNewLoggingAnalysisEngine(t *testing.T) {
log := boulder.NewJsonLogger("newEngine")
ae := NewLoggingAnalysisEngine(log)

// Trivially check an empty mock message
d := &amqp.Delivery{}
ae.ProcessMessage(*d)

// Nothing to assert
}

0 comments on commit 6053426

Please sign in to comment.