Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

udp json listener #477

Closed
wants to merge 1 commit into from
Closed
Changes from all commits
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.
+153 −1
Diff settings

Always

Just for now

@@ -35,6 +35,11 @@ read-timeout = "5s"
enabled = false
# port = 2003
# database = "" # store graphite data in this database

[input_plugins.udp]
enabled = true
port = 4444
database = "test"

# Raft configuration
[raft]
@@ -0,0 +1,104 @@
package udp

import (
"cluster"
log "code.google.com/p/log4go"
. "common"
"configuration"
"coordinator"
"net"
"encoding/json"
"protocol"
)

type Server struct {
listenAddress string
database string
coordinator coordinator.Coordinator
clusterConfig *cluster.ClusterConfiguration
conn *net.UDPConn
user *cluster.ClusterAdmin
shutdown chan bool
}

func NewServer(config *configuration.Configuration, coord coordinator.Coordinator, clusterConfig *cluster.ClusterConfiguration) *Server {
self := &Server{}

self.listenAddress = config.UdpInputPortString()
self.database = config.UdpInputDatabase
self.coordinator = coord
self.shutdown = make(chan bool, 1)
self.clusterConfig = clusterConfig

return self
}

func (self *Server) getAuth() {
// just use any (the first) of the list of admins.
names := self.clusterConfig.GetClusterAdmins()
self.user = self.clusterConfig.GetClusterAdmin(names[0])
}


func (self *Server) ListenAndServe() {
var err error

self.getAuth()

addr, err := net.ResolveUDPAddr("udp4", self.listenAddress)
if err != nil {
log.Error("UDPServer: ResolveUDPAddr: ", err)
return
}


if self.listenAddress != "" {
self.conn, err = net.ListenUDP("udp", addr)
if err != nil {
log.Error("UDPServer: Listen: ", err)
return
}
}
defer self.conn.Close()
self.HandleSocket(self.conn)
}

func (self *Server) HandleSocket(socket *net.UDPConn) {
buffer := make([]byte, 2048)

for {
n, _, err := socket.ReadFromUDP(buffer)
if err != nil || n == 0 {
log.Error("UDP ReadFromUDP error: %s", err)
continue
}

serializedSeries := []*SerializedSeries{}
err = json.Unmarshal(buffer[0:n], &serializedSeries)
if err != nil {
log.Error("UDP json error: %s", err)
continue
}

for _, s := range serializedSeries {
if len(s.Points) == 0 {
continue
}

series, err := ConvertToDataStoreSeries(s, SecondPrecision)
if err != nil {
log.Error("UDP cannot convert received data: %s", err)
continue
}

serie := []*protocol.Series{series}
err = self.coordinator.WriteSeriesData(self.user, "test", serie)
if err != nil {
log.Error("UDP cannot write data: %s", err)
continue
}
}

}

}
@@ -72,6 +72,11 @@ type GraphiteConfig struct {
Port int
Database string
}
type UdpInputConfig struct {
Enabled bool
Port int
Database string
}

type RaftConfig struct {
Port int
@@ -171,6 +176,7 @@ type WalConfig struct {

type InputPlugins struct {
Graphite GraphiteConfig `toml:"graphite"`
UdpInput UdpInputConfig `toml:"udp"`
}

type TomlConfiguration struct {
@@ -195,9 +201,15 @@ type Configuration struct {
ApiHttpCertPath string
ApiHttpPort int
ApiReadTimeout time.Duration

GraphiteEnabled bool
GraphitePort int
GraphiteDatabase string

UdpInputEnabled bool
UdpInputPort int
UdpInputDatabase string

RaftServerPort int
RaftTimeout duration
SeedServers []string
@@ -300,9 +312,15 @@ func parseTomlConfiguration(filename string) (*Configuration, error) {
ApiHttpCertPath: tomlConfiguration.HttpApi.SslCertPath,
ApiHttpSslPort: tomlConfiguration.HttpApi.SslPort,
ApiReadTimeout: apiReadTimeout,

GraphiteEnabled: tomlConfiguration.InputPlugins.Graphite.Enabled,
GraphitePort: tomlConfiguration.InputPlugins.Graphite.Port,
GraphiteDatabase: tomlConfiguration.InputPlugins.Graphite.Database,

UdpInputEnabled: tomlConfiguration.InputPlugins.UdpInput.Enabled,
UdpInputPort: tomlConfiguration.InputPlugins.UdpInput.Port,
UdpInputDatabase: tomlConfiguration.InputPlugins.UdpInput.Database,

RaftServerPort: tomlConfiguration.Raft.Port,
RaftTimeout: tomlConfiguration.Raft.Timeout,
RaftDir: tomlConfiguration.Raft.Dir,
@@ -410,6 +428,18 @@ func (self *Configuration) GraphitePortString() string {
return fmt.Sprintf("%s:%d", self.BindAddress, self.GraphitePort)
}

func (self *Configuration) UdpInputPortString() string {
if self.UdpInputPort <= 0 {
return ""
}

return fmt.Sprintf("%s:%d", self.BindAddress, self.UdpInputPort)
}

func (self *Configuration) ProtobufPortString() string {
return fmt.Sprintf("%s:%d", self.BindAddress, self.ProtobufPort)
}

func (self *Configuration) HostnameOrDetect() string {
if self.Hostname != "" {
return self.Hostname
@@ -3,6 +3,7 @@ package server
import (
"admin"
"api/graphite"
"api/udp"
"api/http"
"cluster"
"configuration"
@@ -20,6 +21,7 @@ type Server struct {
ClusterConfig *cluster.ClusterConfiguration
HttpApi *http.HttpServer
GraphiteApi *graphite.Server
UdpApi *udp.Server
AdminServer *admin.HttpServer
Coordinator coordinator.Coordinator
Config *configuration.Configuration
@@ -58,6 +60,7 @@ func NewServer(config *configuration.Configuration) (*Server, error) {
httpApi := http.NewHttpServer(config.ApiHttpPortString(), config.ApiReadTimeout, config.AdminAssetsDir, coord, coord, clusterConfig, raftServer)
httpApi.EnableSsl(config.ApiHttpSslPortString(), config.ApiHttpCertPath)
graphiteApi := graphite.NewServer(config, coord, clusterConfig)
udpApi := udp.NewServer(config, coord, clusterConfig)
adminServer := admin.NewHttpServer(config.AdminAssetsDir, config.AdminHttpPortString())

return &Server{
@@ -66,6 +69,7 @@ func NewServer(config *configuration.Configuration) (*Server, error) {
ClusterConfig: clusterConfig,
HttpApi: httpApi,
GraphiteApi: graphiteApi,
UdpApi: udpApi,
Coordinator: coord,
AdminServer: adminServer,
Config: config,
@@ -124,10 +128,19 @@ func (self *Server) ListenAndServe() error {
go self.GraphiteApi.ListenAndServe()
}
}

if self.Config.UdpInputEnabled {
if self.Config.UdpInputPort <= 0 || self.Config.UdpInputDatabase == "" {
log.Warn("Cannot start udp server. please check your configuration")
} else {
log.Info("Starting UDP Listener on port %d", self.Config.UdpInputPort)
go self.UdpApi.ListenAndServe()
}
}

// start processing continuous queries
self.RaftServer.StartProcessingContinuousQueries()

log.Info("Starting Http Api server on port %d", self.Config.ApiHttpPort)
self.HttpApi.ListenAndServe()

ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.