Skip to content

Commit

Permalink
Implement a working collector MVP
Browse files Browse the repository at this point in the history
  • Loading branch information
hellais committed Jun 1, 2018
1 parent 7111769 commit 922a71d
Show file tree
Hide file tree
Showing 10 changed files with 550 additions and 9 deletions.
37 changes: 36 additions & 1 deletion Gopkg.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions cmd/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ func init() {

RootCmd.PersistentFlags().StringVar(&cfgFile, "config", "", "config file (default is ./ooni-collector.toml)")
RootCmd.PersistentFlags().StringP("log-level", "", "info", "Set the log level")
RootCmd.PersistentFlags().StringP("data-root", "", "/var/ooni-collector", "In which directory we should be writing working files to")
viper.BindPFlag("core.log-level", RootCmd.PersistentFlags().Lookup("log-level"))
viper.BindPFlag("core.data-root", RootCmd.PersistentFlags().Lookup("data-root"))
}

// initConfig reads in config file and ENV variables if set.
Expand Down
12 changes: 9 additions & 3 deletions collector/api/v1/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,12 +13,18 @@ var log = apexLog.WithFields(apexLog.Fields{

// BindAPI bind all the request handlers and middleware
func BindAPI(router *gin.Engine) error {

// This is to support legacy clients
router.POST("/report", handler.CreateReportHandler)
router.PUT("/report", handler.DeprecatedUpdateReportHandler)
router.POST("/report/:reportID", handler.UpdateReportHandler)
router.POST("/report/:reportID/close", handler.CloseReportHandler)

v1 := router.Group("/api/v1")

v1.POST("/report", handler.CreateReportHandler)
v1.PUT("/report", handler.DeprecatedUpdateReportHandler)
v1.POST("/report/:report_id", handler.UpdateReportHandler)
v1.POST("/report/:report_id/close", handler.CloseReportHandler)
v1.POST("/report/:reportID", handler.UpdateReportHandler)
v1.POST("/report/:reportID/close", handler.CloseReportHandler)

return nil
}
33 changes: 32 additions & 1 deletion collector/collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,12 @@ package collector
import (
"fmt"
"net/http"
"os"

"github.com/ooni/collector/collector/api/v1"
"github.com/ooni/collector/collector/middleware"
"github.com/ooni/collector/collector/paths"
"github.com/ooni/collector/storage"

apexLog "github.com/apex/log"
"github.com/facebookgo/grace/gracehttp"
Expand All @@ -17,13 +21,40 @@ var log = apexLog.WithFields(apexLog.Fields{
"cmd": "ooni-collector",
})

// Start the registry server
func initDataRoot() error {
requiredDirs := []string{
paths.ReportDir(),
paths.TempReportDir(),
paths.BadgerDir(),
}
for _, path := range requiredDirs {
if _, err := os.Stat(path); os.IsNotExist(err) {
err := os.Mkdir(path, 0700)
if err != nil {
return err
}
}
}
return nil
}

// Start the collector server
func Start() {
var (
err error
)

if err = initDataRoot(); err != nil {
log.WithError(err).Error("failed to init data root")
}
storage, err := middleware.InitStorageMiddleware(storage.New(paths.BadgerDir()))
if err != nil {
log.WithError(err).Error("failed to init storage middleware")
return
}

router := gin.Default()
router.Use(storage.MiddlewareFunc())
err = apiv1.BindAPI(router)
if err != nil {
log.WithError(err).Error("failed to BindAPI")
Expand Down
214 changes: 210 additions & 4 deletions collector/handler/handler.go
Original file line number Diff line number Diff line change
@@ -1,37 +1,243 @@
package handler

import (
"encoding/json"
"errors"
"net/http"
"os"
"path/filepath"
"regexp"
"time"

apexLog "github.com/apex/log"
"github.com/gin-gonic/gin"
"github.com/ooni/collector/collector/paths"
"github.com/ooni/collector/collector/report"
"github.com/ooni/collector/storage"
)

var log = apexLog.WithFields(apexLog.Fields{
"pkg": "handler",
"cmd": "ooni-collector",
})

const backendVersion = "2.0.0-alpha"

func createNewReport(store *storage.Storage, req CreateReportRequest) (string, error) {
reportID := report.GenReportID(req.ProbeASN)
tmpPath := filepath.Join(paths.TempReportDir(), reportID)
meta := report.Metadata{
ReportID: reportID,
TestName: req.TestName,
ProbeASN: req.ProbeASN,
ProbeCC: "",
SoftwareName: req.SoftwareName,
SoftwareVersion: req.SoftwareVersion,
CreationTime: time.Now().UTC(),
LastUpdateTime: time.Now().UTC(),
ReportFilePath: tmpPath,
Closed: false,
EntryCount: 0,
}
store.SetReport(&meta)
os.OpenFile(tmpPath, os.O_RDONLY|os.O_CREATE, 0700)

return meta.ReportID, nil
}

// CreateReportRequest what a client sends as a request to create a new report
type CreateReportRequest struct {
SoftwareName string `json:"software_name"`
SoftwareVersion string `json:"software_version"`
TestName string `json:"test_name" binding:""`
TestVersion string `json:"test_version"`
ProbeASN string `json:"probe_asn"`
Content string `json:"content"`
}

var softwareVersionRegexp = regexp.MustCompile("^[0-9A-Za-z_.+-]+$")
var testNameRegexp = regexp.MustCompile("^[a-zA-Z0-9_\\- ]+$")
var probeASNRegexp = regexp.MustCompile("^AS[0-9]+$")
var probeCCRegexp = regexp.MustCompile("^[A-Z]{2}$")

func validateRequest(req *CreateReportRequest) error {
if softwareVersionRegexp.MatchString(req.SoftwareName) != true {
return errors.New("Invalid software_name")
}
if testNameRegexp.MatchString(req.TestName) != true {
return errors.New("Invalid test_name")
}
if probeASNRegexp.MatchString(req.ProbeASN) != true {
return errors.New("Invalid probe_asn")
}
return nil
}

// CreateReportHandler for report creation
func CreateReportHandler(c *gin.Context) {
store := c.MustGet("Storage").(*storage.Storage)

var req CreateReportRequest

if err := c.BindJSON(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
if err := validateRequest(&req); err != nil {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}

reportID, err := createNewReport(store, req)
if err != nil {
// XXX check this against the spec
c.JSON(http.StatusBadRequest, gin.H{
"error": "invalid request",
})
return
}

c.JSON(http.StatusOK, gin.H{
"backend_version": "XXX",
"report_id": "XXX",
"test_helper_address": "XXX",
"supported_formats": "XXX",
"backend_version": backendVersion,
"report_id": reportID,
"supported_formats": []string{"json"},
})
return
}

// DeprecatedUpdateReportHandler is for legacy clients
func DeprecatedUpdateReportHandler(c *gin.Context) {
c.JSON(http.StatusOK, gin.H{
"status": "success",
})
return
}

// ErrReportIsClosed indicates the report has already been closed
var ErrReportIsClosed = errors.New("Report is already closed")

func addBackendExtra(meta *report.Metadata, entry *report.MeasurementEntry) {
entry.BackendVersion = backendVersion
entry.BackendExtra.SubmissionTime = meta.LastUpdateTime
}

func writeEntry(store *storage.Storage, entry *report.MeasurementEntry) error {
meta, err := store.GetReport(entry.ReportID)
if err != nil {
return err
}
if meta.Closed == true {
return ErrReportIsClosed
}
if meta.ProbeCC == "" {
if probeCCRegexp.MatchString(entry.ProbeCC) != true {
log.Debugf("entry: %v", entry)
log.Debugf("Invalid probe cc: \"%s\"", entry.ProbeCC)
return errors.New("Invalid probe_cc")
}
meta.ProbeCC = entry.ProbeCC
}
meta.LastUpdateTime = time.Now().UTC()
meta.EntryCount++
addBackendExtra(meta, entry)

f, err := os.OpenFile(meta.ReportFilePath, os.O_APPEND|os.O_WRONLY, 0700)
if err != nil {
return err
}
defer f.Close()

enc := json.NewEncoder(f)
err = enc.Encode(entry)
if err != nil {
return err
}

if err = store.SetReport(meta); err != nil {
return err
}

return nil
}

type UpdateReportRequest struct {
Content report.MeasurementEntry `json:"content" binding:"required"`
Format string `json:"format"`
}

// UpdateReportHandler appends to an open report
func UpdateReportHandler(c *gin.Context) {
var err error

store := c.MustGet("Storage").(*storage.Storage)
reportID := c.Param("reportID")

var req UpdateReportRequest
if err = c.BindJSON(&req); err != nil {
log.WithError(err).Error("failed to bindJSON")
return
}
entry := req.Content

// We overwrite the reportID so that there cannot be any mismatch of th
entry.ReportID = reportID

err = writeEntry(store, &entry)
if err != nil {
if err == storage.ErrReportNotFound {
log.WithError(err).Debug("report not found error")
// XXX use the correct return value
c.JSON(http.StatusNotFound, gin.H{
"status": "not found",
})
}
log.WithError(err).Error("got an invalid request")
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
return
}
c.JSON(http.StatusOK, gin.H{
"status": "success",
})
return
}

func closeReport(store *storage.Storage, reportID string) error {
meta, err := store.GetReport(reportID)
if err != nil {
return err
}
dstPath := paths.ClosedReportPath(meta)
if meta.EntryCount > 0 {
err = os.Rename(meta.ReportFilePath, dstPath)
if err != nil {
return err
}
} else {
// There is no need to keep closed empty reports
os.Remove(meta.ReportFilePath)
}
meta.Closed = true

if err = store.SetReport(meta); err != nil {
return err
}

return nil
}

// CloseReportHandler moves the report to the report-dir
func CloseReportHandler(c *gin.Context) {
store := c.MustGet("Storage").(*storage.Storage)
reportID := c.Param("reportID")

err := closeReport(store, reportID)
if err != nil {
// XXX return proper error
c.JSON(http.StatusNotAcceptable, gin.H{
"error": "something",
})
return
}
c.JSON(http.StatusOK, gin.H{
"status": "success",
})
Expand Down
Loading

0 comments on commit 922a71d

Please sign in to comment.