diff --git a/internal/producing/server.go b/internal/producing/server.go index 3a3e80f..3c018c1 100644 --- a/internal/producing/server.go +++ b/internal/producing/server.go @@ -19,8 +19,6 @@ import ( "github.com/barcostreams/barco/internal/utils" "github.com/julienschmidt/httprouter" "github.com/rs/zerolog/log" - "golang.org/x/net/http2" - "golang.org/x/net/http2/h2c" ) type Producer interface { @@ -81,14 +79,9 @@ func (p *producer) AcceptConnections() error { fmt.Fprint(w, "Producer server doesn't allow getting topic messages\n") }) - h2s := &http2.Server{} server := &http.Server{ Addr: address, - Handler: h2c.NewHandler(router, h2s), - } - - if err := http2.ConfigureServer(server, h2s); err != nil { - return err + Handler: router, } c := make(chan bool, 1) @@ -128,6 +121,7 @@ func (p *producer) OnReroutedMessage( func (p *producer) postMessage(w http.ResponseWriter, r *http.Request, ps httprouter.Params) error { metrics.ProducerMessagesReceived.Inc() metrics.ProducerMessagesBodyBytes.Add(float64(r.ContentLength)) + return p.handleMessage( ps.ByName("topic"), r.URL.Query(), r.ContentLength, r.Header.Get(types.ContentTypeHeaderKey), r.Body) } diff --git a/internal/test/integration/roundtrip_test.go b/internal/test/integration/roundtrip_test.go index 77311ae..7bc11e0 100644 --- a/internal/test/integration/roundtrip_test.go +++ b/internal/test/integration/roundtrip_test.go @@ -113,9 +113,6 @@ var _ = Describe("A 3 node cluster", func() { Expect(item.records[0].timestamp.UnixMilli()).To(BeNumerically("<=", time.Now().UnixMilli())) Expect(item.records[0].body).To(Equal(message)) - // Test with HTTP/1 - expectOk(NewTestClient(&TestClientOptions{HttpVersion: 1}).ProduceJson(0, "abc", message, "")) - client.Close() }) diff --git a/internal/test/integration/test_client.go b/internal/test/integration/test_client.go index 9cbfe12..030605e 100644 --- a/internal/test/integration/test_client.go +++ b/internal/test/integration/test_client.go @@ -20,11 +20,12 @@ import ( // Represents a Barco Client used for integration tests type TestClient struct { - client *http.Client + consumerClient *http.Client + producerClient *http.Client } type TestClientOptions struct { - HttpVersion int + } const ( @@ -38,7 +39,7 @@ func NewTestClient(options *TestClientOptions) *TestClient { options = &TestClientOptions{} } - client := &http.Client{ + consumerClient := &http.Client{ Transport: &http2.Transport{ StrictMaxConcurrentStreams: true, // Do not create additional connections AllowHTTP: true, @@ -53,16 +54,16 @@ func NewTestClient(options *TestClientOptions) *TestClient { }, } - if options.HttpVersion == 1 { - client = &http.Client{ - Transport: &http.Transport{ - MaxConnsPerHost: 1, - }, - } + + producerClient := &http.Client{ + Transport: &http.Transport{ + MaxConnsPerHost: 1, + }, } return &TestClient{ - client: client, + consumerClient: consumerClient, + producerClient: producerClient, } } @@ -76,7 +77,7 @@ func (c *TestClient) ProduceNDJson(ordinal int, topic string, message string, pa func (c *TestClient) produce(ordinal int, topic string, message string, partitionKey string, contentType string) *http.Response { url := c.ProducerUrl(ordinal, topic, partitionKey) - resp, err := c.client.Post(url, contentType, strings.NewReader(message)) + resp, err := c.producerClient.Post(url, contentType, strings.NewReader(message)) Expect(err).NotTo(HaveOccurred()) return resp } @@ -92,7 +93,7 @@ func (c *TestClient) ProducerUrl(ordinal int, topic string, partitionKey string) func (c *TestClient) RegisterAsConsumer(clusterSize int, message string) { for i := 0; i < clusterSize; i++ { url := fmt.Sprintf("http://127.0.0.%d:%d/%s", i+1, consumerPort, conf.ConsumerRegisterUrl) - resp, err := c.client.Post(url, "application/json", strings.NewReader(message)) + resp, err := c.consumerClient.Post(url, "application/json", strings.NewReader(message)) Expect(err).NotTo(HaveOccurred()) Expect(resp.StatusCode).To(Equal(http.StatusOK)) } @@ -103,7 +104,7 @@ func (c *TestClient) ConsumerPoll(ordinal int) *http.Response { var resp *http.Response for i := 0; i < 10; i++ { var err error - resp, err = c.client.Post(url, "application/json", strings.NewReader("")) + resp, err = c.consumerClient.Post(url, "application/json", strings.NewReader("")) Expect(err).NotTo(HaveOccurred()) Expect(resp.StatusCode).To(BeNumerically(">=", http.StatusOK)) Expect(resp.StatusCode).To(BeNumerically("<", 300)) @@ -119,7 +120,7 @@ func (c *TestClient) ConsumerPoll(ordinal int) *http.Response { func (c *TestClient) ConsumerCommit(ordinal int) *http.Response { url := fmt.Sprintf("http://127.0.0.%d:%d/%s", ordinal+1, consumerPort, conf.ConsumerManualCommitUrl) - resp, err := c.client.Post(url, "application/json", strings.NewReader("")) + resp, err := c.consumerClient.Post(url, "application/json", strings.NewReader("")) Expect(err).NotTo(HaveOccurred()) Expect(resp.StatusCode).To(BeNumerically(">=", http.StatusOK)) Expect(resp.StatusCode).To(BeNumerically("<", 300)) @@ -127,7 +128,8 @@ func (c *TestClient) ConsumerCommit(ordinal int) *http.Response { } func (c *TestClient) Close() { - c.client.CloseIdleConnections() + c.producerClient.CloseIdleConnections() + c.consumerClient.CloseIdleConnections() } func ReadBody(resp *http.Response) string {