Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

- Some major api changes; improvements to subscriptions.

- Added 'syslogish' output
  • Loading branch information...
commit 789b3a9b5328505b0fe78ca6d3cf572e42d2f8e8 1 parent c946b93
@jordansissel authored
View
97 src/cabin/cabin.go
@@ -2,6 +2,8 @@ package cabin
import (
"time"
+ "fmt"
+ "os"
)
/* Make a new struct, but for now it has no members.
@@ -9,9 +11,32 @@ import (
*/
type Cabin struct {
/* Channels receive Event structs */
- channels []chan *Event
+ channels map[Subscription] EventChannel
+ notify_closed chan Subscription
}
+/* A channel which emits a sequence of identifiers */
+type Subscription int
+var idGeneratorChannel = make(chan Subscription, 1)
+
+func init() {
+ /* Start the 'id generator' process */
+ go generateIds(idGeneratorChannel)
+}
+
+func generateIds(channel chan Subscription) {
+ var i Subscription = 0
+ for ;; i++ {
+ channel <- i
+ }
+} /* generateIds */
+
+func nextId() Subscription {
+ return <- idGeneratorChannel
+}
+
+type EventChannel chan *Event
+
/* A cabin event. Simply a timestamp + an object */
type Event struct {
Timestamp time.Time
@@ -19,17 +44,49 @@ type Event struct {
Object interface{}
}
-/* Create a new Cabin instance */
-func New() *Cabin {
- cabin := new(Cabin)
- cabin.channels = make([] chan *Event, 0)
- return cabin
+type ReceiverFunc func(EventChannel)
+
+type Receiver interface {
+ Run(EventChannel) error
}
-func (cabin *Cabin) Subscribe(channel chan *Event) {
- cabin.channels = append(cabin.channels, channel)
+func (cabin *Cabin) Initialize() {
+ if cabin.channels == nil {
+ cabin.channels = make(map[Subscription] EventChannel)
+ }
+ if cabin.notify_closed == nil {
+ cabin.notify_closed = make(chan Subscription)
+ }
}
+func (cabin *Cabin) SubscribeFunc(receiver ReceiverFunc) Subscription {
+ cabin.Initialize()
+ channel := make(EventChannel)
+ id := nextId()
+ go cabin.activateSubscription(receiver, id, channel)
+ return id
+} /* Cabin.SubscribeFunc */
+
+func (cabin *Cabin) Subscribe(receiver Receiver) Subscription {
+ return cabin.SubscribeFunc(func(channel EventChannel) {
+ err := receiver.Run(channel)
+ /* TODO(sissel): We should really have an internal cabin that points at
+ * stderr */
+ fmt.Fprintf(os.Stderr, "Subscriber(%#v) failed: %s\n", receiver, err)
+ })
+} /* Cabin.Subscribe */
+
+func (cabin *Cabin) activateSubscription(receiver ReceiverFunc, id Subscription,
+ channel EventChannel) {
+ cabin.channels[id] = channel
+ defer func() {
+ cabin.notify_closed <- id
+ delete(cabin.channels, id)
+ }()
+
+ receiver(channel)
+} /* Cabin.activateSubscription */
+
/* Log an object */
func (cabin *Cabin) Log(object interface{}) {
event := &Event{Timestamp: time.Now().UTC(), Object: object}
@@ -37,4 +94,28 @@ func (cabin *Cabin) Log(object interface{}) {
for _, channel := range cabin.channels {
channel <- event
}
+} /* Cabin.Log */
+
+/* Formatted logging */
+func (cabin *Cabin) Logf(format string, args...interface{}) {
+ message := fmt.Sprintf(format, args...)
+ event := &Event{Timestamp: time.Now().UTC(), Object: message}
+
+ for _, channel := range cabin.channels {
+ channel <- event
+ }
+}
+
+/* Close. This will block until all subscribers have completed */
+func (cabin *Cabin) Close() {
+ count := 0
+ for _, channel := range(cabin.channels) {
+ close(channel)
+ count++
+ }
+
+ /* Wait for all the channels to close */
+ for ; count > 0 ; count-- {
+ <- cabin.notify_closed
+ }
}
View
30 src/cabin/output_syslogish.go
@@ -0,0 +1,30 @@
+package cabin
+
+import (
+ "net"
+ "fmt"
+ "os"
+)
+
+type Syslog struct {
+ Host string
+ Port uint
+}
+
+func (syslog Syslog) Run(channel EventChannel) error {
+ conn, err := net.Dial("tcp", fmt.Sprintf("%s:%d", syslog.Host, syslog.Port))
+ if err != nil {
+ return err
+ }
+
+ hostname, _ := os.Hostname()
+
+ for event := range(channel) {
+ /* TODO(sissel): Check for errors */
+ fmt.Fprintf(conn, "%s %s: %v\n",
+ formatTimestamp(event.Timestamp),
+ hostname, event.Object)
+ }
+
+ return nil /* No error, channel closed */
+}
View
10 src/cabin/stdout.go
@@ -4,10 +4,14 @@ import (
"os"
)
-func StdoutLogger(channel chan *Event) {
- for {
- event := <- channel
+type Stdout struct {
+}
+
+func (stdout Stdout) Run(channel EventChannel) error {
+ for event := range(channel) {
emit(os.Stdout, event)
}
+
+ return nil
}
Please sign in to comment.
Something went wrong with that request. Please try again.