Navigation Menu

Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove /data/process_continuous_queries endpoint #7431

Closed
mark-rushakoff opened this issue Oct 7, 2016 · 3 comments
Closed

Remove /data/process_continuous_queries endpoint #7431

mark-rushakoff opened this issue Oct 7, 2016 · 3 comments

Comments

@mark-rushakoff
Copy link
Contributor

The endpoint has been marked with a TODO to remove it for over 6 months. I don't believe we intended to ship the endpoint with 1.0, and I don't believe removing it would qualify as a breaking change.

I removed the endpoint and began removing the ContinuousQuerier field from the http Handler, but then I stumbled across TestServer_ContinuousQuery, which depended on the endpoint. I don't have time right now to dig into porting the tests to the continuous_querier package, so ideally someone more familiar with CQ internals could port them.

Here's the patch that removes CQs from HTTP. The code compiles, but the Server CQ test obviously fails.

diff --git a/cmd/influxd/run/server.go b/cmd/influxd/run/server.go
index b313b95..225aab8 100644
--- a/cmd/influxd/run/server.go
+++ b/cmd/influxd/run/server.go
@@ -267,13 +267,6 @@ func (s *Server) appendHTTPDService(c httpd.Config) {
    srv.Handler.PointsWriter = s.PointsWriter
    srv.Handler.Version = s.buildInfo.Version

-   // If a ContinuousQuerier service has been started, attach it.
-   for _, srvc := range s.Services {
-       if cqsrvc, ok := srvc.(continuous_querier.ContinuousQuerier); ok {
-           srv.Handler.ContinuousQuerier = cqsrvc
-       }
-   }
-
    s.Services = append(s.Services, srv)
 }

diff --git a/services/httpd/handler.go b/services/httpd/handler.go
index f84a13d..9524ae3 100644
--- a/services/httpd/handler.go
+++ b/services/httpd/handler.go
@@ -24,7 +24,6 @@ import (
    "github.com/influxdata/influxdb/influxql"
    "github.com/influxdata/influxdb/models"
    "github.com/influxdata/influxdb/monitor"
-   "github.com/influxdata/influxdb/services/continuous_querier"
    "github.com/influxdata/influxdb/services/meta"
    "github.com/influxdata/influxdb/uuid"
 )
@@ -88,8 +87,6 @@ type Handler struct {
        WritePoints(database, retentionPolicy string, consistencyLevel models.ConsistencyLevel, points []models.Point) error
    }

-   ContinuousQuerier continuous_querier.ContinuousQuerier
-
    Config    *Config
    Logger    *log.Logger
    CLFLogger *log.Logger
@@ -143,11 +140,6 @@ func NewHandler(c Config) *Handler {
            "status-head",
            "HEAD", "/status", false, true, h.serveStatus,
        },
-       // TODO: (corylanou) remove this and associated code
-       Route{ // Tell data node to run CQs that should be run
-           "process-continuous-queries",
-           "POST", "/data/process_continuous_queries", false, false, h.serveProcessContinuousQueries,
-       },
    }...)

    return h
@@ -156,7 +148,6 @@ func NewHandler(c Config) *Handler {
 // Statistics maintains statistics for the httpd service.
 type Statistics struct {
    Requests                     int64
-   CQRequests                   int64
    QueryRequests                int64
    WriteRequests                int64
    PingRequests                 int64
@@ -182,7 +173,6 @@ func (h *Handler) Statistics(tags map[string]string) []models.Statistic {
        Tags: tags,
        Values: map[string]interface{}{
            statRequest:                      atomic.LoadInt64(&h.stats.Requests),
-           statCQRequest:                    atomic.LoadInt64(&h.stats.CQRequests),
            statQueryRequest:                 atomic.LoadInt64(&h.stats.QueryRequests),
            statWriteRequest:                 atomic.LoadInt64(&h.stats.WriteRequests),
            statPingRequest:                  atomic.LoadInt64(&h.stats.PingRequests),
@@ -276,47 +266,6 @@ func (h *Handler) writeHeader(w http.ResponseWriter, code int) {
    w.WriteHeader(code)
 }

-func (h *Handler) serveProcessContinuousQueries(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) {
-   atomic.AddInt64(&h.stats.CQRequests, 1)
-
-   // If the continuous query service isn't configured, return 404.
-   if h.ContinuousQuerier == nil {
-       h.writeHeader(w, http.StatusNotImplemented)
-       return
-   }
-
-   q := r.URL.Query()
-
-   // Get the database name (blank means all databases).
-   db := q.Get("db")
-   // Get the name of the CQ to run (blank means run all).
-   name := q.Get("name")
-   // Get the time for which the CQ should be evaluated.
-   t := time.Now()
-   var err error
-   s := q.Get("time")
-   if s != "" {
-       t, err = time.Parse(time.RFC3339Nano, s)
-       if err != nil {
-           // Try parsing as an int64 nanosecond timestamp.
-           i, err := strconv.ParseInt(s, 10, 64)
-           if err != nil {
-               h.writeHeader(w, http.StatusBadRequest)
-               return
-           }
-           t = time.Unix(0, i)
-       }
-   }
-
-   // Pass the request to the CQ service.
-   if err := h.ContinuousQuerier.Run(db, name, t); err != nil {
-       h.writeHeader(w, http.StatusBadRequest)
-       return
-   }
-
-   h.writeHeader(w, http.StatusNoContent)
-}
-
 // serveQuery parses an incoming query and, if valid, executes the query.
 func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user *meta.UserInfo) {
    atomic.AddInt64(&h.stats.QueryRequests, 1)
diff --git a/services/httpd/service.go b/services/httpd/service.go
index 064bbb5..412c3c9 100644
--- a/services/httpd/service.go
+++ b/services/httpd/service.go
@@ -20,7 +20,6 @@ import (
 // statistics gathered by the httpd package.
 const (
    statRequest                      = "req"                // Number of HTTP requests served
-   statCQRequest                    = "cqReq"              // Number of CQ-execute requests served
    statQueryRequest                 = "queryReq"           // Number of query requests served
    statWriteRequest                 = "writeReq"           // Number of write requests serverd
    statPingRequest                  = "pingReq"            // Number of ping requests served
@jwilder
Copy link
Contributor

jwilder commented Oct 13, 2016

That endpoint was used internally by the clustering implementation, which was removed in 0.11. We can remove it.

@jwilder
Copy link
Contributor

jwilder commented Oct 18, 2016

@mark-rushakoff Do you have a branch you could push to PR this change?

@mark-rushakoff
Copy link
Contributor Author

@jwilder I don't have the code on a branch, just the inline patch.

server_test.go depends on being able to trigger a CQ at will, so I think we need to either keep the endpoint, find a way to enable the endpoint only during tests, or find a non-HTTP way to trigger CQs in tests.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants