Skip to content

Commit

Permalink
Drop support for HTTP/2 on the producer server
Browse files Browse the repository at this point in the history
HTTP/2 can break request and responses into multiple streamids
each, interleaving _partial_ request bodies.
If we don't allocate memory to read in parrallel, we can't
independently move forward a request body without moving the
whole stream forward. The resolution is to remove HTTP/2 support.

Fixes #56.
  • Loading branch information
jorgebay committed Oct 14, 2022
1 parent 155bbf0 commit 5e65e9b
Show file tree
Hide file tree
Showing 3 changed files with 19 additions and 26 deletions.
10 changes: 2 additions & 8 deletions internal/producing/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import (
"github.com/barcostreams/barco/internal/utils"
"github.com/julienschmidt/httprouter"
"github.com/rs/zerolog/log"
"golang.org/x/net/http2"
"golang.org/x/net/http2/h2c"
)

type Producer interface {
Expand Down Expand Up @@ -81,14 +79,9 @@ func (p *producer) AcceptConnections() error {
fmt.Fprint(w, "Producer server doesn't allow getting topic messages\n")
})

h2s := &http2.Server{}
server := &http.Server{
Addr: address,
Handler: h2c.NewHandler(router, h2s),
}

if err := http2.ConfigureServer(server, h2s); err != nil {
return err
Handler: router,
}

c := make(chan bool, 1)
Expand Down Expand Up @@ -128,6 +121,7 @@ func (p *producer) OnReroutedMessage(
func (p *producer) postMessage(w http.ResponseWriter, r *http.Request, ps httprouter.Params) error {
metrics.ProducerMessagesReceived.Inc()
metrics.ProducerMessagesBodyBytes.Add(float64(r.ContentLength))

return p.handleMessage(
ps.ByName("topic"), r.URL.Query(), r.ContentLength, r.Header.Get(types.ContentTypeHeaderKey), r.Body)
}
Expand Down
3 changes: 0 additions & 3 deletions internal/test/integration/roundtrip_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,9 +113,6 @@ var _ = Describe("A 3 node cluster", func() {
Expect(item.records[0].timestamp.UnixMilli()).To(BeNumerically("<=", time.Now().UnixMilli()))
Expect(item.records[0].body).To(Equal(message))

// Test with HTTP/1
expectOk(NewTestClient(&TestClientOptions{HttpVersion: 1}).ProduceJson(0, "abc", message, ""))

client.Close()
})

Expand Down
32 changes: 17 additions & 15 deletions internal/test/integration/test_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,12 @@ import (

// Represents a Barco Client used for integration tests
type TestClient struct {
client *http.Client
consumerClient *http.Client
producerClient *http.Client
}

type TestClientOptions struct {
HttpVersion int

}

const (
Expand All @@ -38,7 +39,7 @@ func NewTestClient(options *TestClientOptions) *TestClient {
options = &TestClientOptions{}
}

client := &http.Client{
consumerClient := &http.Client{
Transport: &http2.Transport{
StrictMaxConcurrentStreams: true, // Do not create additional connections
AllowHTTP: true,
Expand All @@ -53,16 +54,16 @@ func NewTestClient(options *TestClientOptions) *TestClient {
},
}

if options.HttpVersion == 1 {
client = &http.Client{
Transport: &http.Transport{
MaxConnsPerHost: 1,
},
}

producerClient := &http.Client{
Transport: &http.Transport{
MaxConnsPerHost: 1,
},
}

return &TestClient{
client: client,
consumerClient: consumerClient,
producerClient: producerClient,
}
}

Expand All @@ -76,7 +77,7 @@ func (c *TestClient) ProduceNDJson(ordinal int, topic string, message string, pa

func (c *TestClient) produce(ordinal int, topic string, message string, partitionKey string, contentType string) *http.Response {
url := c.ProducerUrl(ordinal, topic, partitionKey)
resp, err := c.client.Post(url, contentType, strings.NewReader(message))
resp, err := c.producerClient.Post(url, contentType, strings.NewReader(message))
Expect(err).NotTo(HaveOccurred())
return resp
}
Expand All @@ -92,7 +93,7 @@ func (c *TestClient) ProducerUrl(ordinal int, topic string, partitionKey string)
func (c *TestClient) RegisterAsConsumer(clusterSize int, message string) {
for i := 0; i < clusterSize; i++ {
url := fmt.Sprintf("http://127.0.0.%d:%d/%s", i+1, consumerPort, conf.ConsumerRegisterUrl)
resp, err := c.client.Post(url, "application/json", strings.NewReader(message))
resp, err := c.consumerClient.Post(url, "application/json", strings.NewReader(message))
Expect(err).NotTo(HaveOccurred())
Expect(resp.StatusCode).To(Equal(http.StatusOK))
}
Expand All @@ -103,7 +104,7 @@ func (c *TestClient) ConsumerPoll(ordinal int) *http.Response {
var resp *http.Response
for i := 0; i < 10; i++ {
var err error
resp, err = c.client.Post(url, "application/json", strings.NewReader(""))
resp, err = c.consumerClient.Post(url, "application/json", strings.NewReader(""))
Expect(err).NotTo(HaveOccurred())
Expect(resp.StatusCode).To(BeNumerically(">=", http.StatusOK))
Expect(resp.StatusCode).To(BeNumerically("<", 300))
Expand All @@ -119,15 +120,16 @@ func (c *TestClient) ConsumerPoll(ordinal int) *http.Response {

func (c *TestClient) ConsumerCommit(ordinal int) *http.Response {
url := fmt.Sprintf("http://127.0.0.%d:%d/%s", ordinal+1, consumerPort, conf.ConsumerManualCommitUrl)
resp, err := c.client.Post(url, "application/json", strings.NewReader(""))
resp, err := c.consumerClient.Post(url, "application/json", strings.NewReader(""))
Expect(err).NotTo(HaveOccurred())
Expect(resp.StatusCode).To(BeNumerically(">=", http.StatusOK))
Expect(resp.StatusCode).To(BeNumerically("<", 300))
return resp
}

func (c *TestClient) Close() {
c.client.CloseIdleConnections()
c.producerClient.CloseIdleConnections()
c.consumerClient.CloseIdleConnections()
}

func ReadBody(resp *http.Response) string {
Expand Down

0 comments on commit 5e65e9b

Please sign in to comment.