From cf4038fcf2dfad23f42005caa88dbf99a9dceeae Mon Sep 17 00:00:00 2001 From: Travis Jeffery Date: Sun, 23 Oct 2016 19:40:39 -0400 Subject: [PATCH] add api to append messages --- cluster/broker.go | 15 ------ cluster/controller.go | 44 ----------------- cluster/partition.go | 17 ------- cluster/topic_partition.go | 43 +++++++++++++++++ commitlog/commitlog.go | 44 ++++++++++++----- commitlog/commitlog_test.go | 32 +++++++++---- commitlog/index.go | 18 +++---- commitlog/index_test.go | 6 +-- commitlog/reader.go | 4 +- commitlog/segment.go | 67 ++++++++++---------------- commitlog/util.go | 4 +- main.go | 1 + server/server.go | 52 +++++++++++++------- store/store.go | 95 +++++++++++++++++++++++++++---------- 14 files changed, 246 insertions(+), 196 deletions(-) delete mode 100644 cluster/broker.go delete mode 100644 cluster/controller.go delete mode 100644 cluster/partition.go create mode 100644 cluster/topic_partition.go diff --git a/cluster/broker.go b/cluster/broker.go deleted file mode 100644 index ffdacd4f..00000000 --- a/cluster/broker.go +++ /dev/null @@ -1,15 +0,0 @@ -package cluster - -type BrokerOptions struct { - ID string -} - -type Broker struct { - BrokerOptions -} - -func NewBroker(opts BrokerOptions) *Broker { - return &Broker{ - BrokerOptions: opts, - } -} diff --git a/cluster/controller.go b/cluster/controller.go deleted file mode 100644 index afe59d91..00000000 --- a/cluster/controller.go +++ /dev/null @@ -1,44 +0,0 @@ -package cluster - -import ( - "errors" - - "github.com/travisjeffery/jocko/store" -) - -type Controller struct { - Store *store.Store -} - -func (c *Controller) CreateTopic(topic string, partitions int) error { - for _, t := range c.Store.Topics() { - if t == topic { - return errors.New("topic exists already") - } - } - numPartitions, err := c.Store.NumPartitions() - if err != nil { - return err - } - if partitions != 0 { - numPartitions = partitions - } - brokers, err := c.Store.Brokers() - if err != nil { - return err - } - for i := 0; i < numPartitions; i++ { - broker := brokers[i%len(brokers)] - partition := store.TopicPartition{ - Partition: i, - Topic: topic, - Leader: broker, - PreferredLeader: broker, - Replicas: []string{broker}, - } - if err := c.Store.AddPartition(partition); err != nil { - return err - } - } - return nil -} diff --git a/cluster/partition.go b/cluster/partition.go deleted file mode 100644 index 35b27378..00000000 --- a/cluster/partition.go +++ /dev/null @@ -1,17 +0,0 @@ -package cluster - -import "github.com/travisjeffery/jocko/commitlog" - -type PartitionOptions struct { - Topic string -} - -type Partition struct { - PartitionOptions - ID int - CommitLog *commitlog.CommitLog -} - -func NewPartition(opts PartitionOptions) *Partition { - return &Partition{} -} diff --git a/cluster/topic_partition.go b/cluster/topic_partition.go new file mode 100644 index 00000000..99a0109a --- /dev/null +++ b/cluster/topic_partition.go @@ -0,0 +1,43 @@ +package cluster + +import ( + "fmt" + "path" + + "github.com/travisjeffery/jocko/commitlog" +) + +type TopicPartition struct { + Topic string `json:"topic"` + Partition int `json:"partition"` + + // broker ids + Replicas []string `json:"replicas"` + Leader string `json:"leader"` + PreferredLeader string `json:"preferred_leader"` + + CommitLog *commitlog.CommitLog `json:"-"` +} + +func (p TopicPartition) String() string { + return fmt.Sprintf("%s-%d", p.Topic, p.Partition) +} + +// OpenCommitLog opens a commit log for the partition at the path. +func (partition *TopicPartition) OpenCommitLog(logDir string) error { + var err error + partition.CommitLog, err = commitlog.New(commitlog.Options{ + Path: path.Join(logDir, partition.String()), + SegmentBytes: 1024, + }) + if err != nil { + return err + } + if err = partition.CommitLog.Init(); err != nil { + return err + } + if err = partition.CommitLog.Open(); err != nil { + return err + } + return nil +} diff --git a/commitlog/commitlog.go b/commitlog/commitlog.go index f0f05120..04f16ceb 100644 --- a/commitlog/commitlog.go +++ b/commitlog/commitlog.go @@ -6,15 +6,28 @@ import ( "path/filepath" "sync" "sync/atomic" + "time" "github.com/pkg/errors" ) +type Message struct { + Offset int64 `json:"offset"` + Value []byte `json:"value"` + Timestamp time.Time `json:"timestamp"` +} + +type MessageSet struct { + Offset int64 `json:"offset"` + MessageSize int32 `json:"message_size"` + Payload []byte `json:"payload"` +} + type CommitLog struct { Options name string mu sync.RWMutex - segments []*segment + segments []*Segment vActiveSegment atomic.Value } @@ -41,7 +54,7 @@ func New(opts Options) (*CommitLog, error) { return l, nil } -func (l *CommitLog) init() error { +func (l *CommitLog) Init() error { err := os.MkdirAll(l.Path, 0755) if err != nil { return errors.Wrap(err, "mkdir failed") @@ -49,7 +62,7 @@ func (l *CommitLog) init() error { return nil } -func (l *CommitLog) open() error { +func (l *CommitLog) Open() error { _, err := ioutil.ReadDir(l.Path) if err != nil { return errors.Wrap(err, "read dir failed") @@ -66,21 +79,26 @@ func (l *CommitLog) open() error { return nil } -func (l *CommitLog) deleteAll() error { +func (l *CommitLog) DeleteAll() error { return os.RemoveAll(l.Path) } -func (l *CommitLog) Write(p []byte) (n int, err error) { +func (l *CommitLog) Append(m MessageSet) error { l.mu.Lock() defer l.mu.Unlock() - if l.checkSplit() { - if err = l.split(); err != nil { - return 0, err + if err := l.split(); err != nil { + return err } } - - return l.activeSegment().Write(p) + position := l.activeSegment().Position + if _, err := l.activeSegment().Write(m.Payload); err != nil { + return err + } + return l.activeSegment().Index.WriteEntry(Entry{ + Offset: m.Offset, + Position: position, + }) } func (l *CommitLog) Read(p []byte) (n int, err error) { @@ -104,9 +122,9 @@ func (l *CommitLog) split() error { } func (l *CommitLog) newestOffset() int64 { - return l.activeSegment().NextOffset() + return l.activeSegment().NextOffset } -func (l *CommitLog) activeSegment() *segment { - return l.vActiveSegment.Load().(*segment) +func (l *CommitLog) activeSegment() *Segment { + return l.vActiveSegment.Load().(*Segment) } diff --git a/commitlog/commitlog_test.go b/commitlog/commitlog_test.go index dc06003e..7181f890 100644 --- a/commitlog/commitlog_test.go +++ b/commitlog/commitlog_test.go @@ -8,11 +8,15 @@ import ( "os" "path/filepath" "testing" + + "github.com/stretchr/testify/assert" ) func TestNewCommitLog(t *testing.T) { path := filepath.Join(os.TempDir(), fmt.Sprintf("commitlogtest%d", rand.Int63())) - fmt.Println(path) + os.RemoveAll(path) + os.MkdirAll(path, 0755) + opts := Options{ Path: path, SegmentBytes: 6, @@ -20,31 +24,43 @@ func TestNewCommitLog(t *testing.T) { l, err := New(opts) // remove old data - l.deleteAll() + assert.NoError(t, l.DeleteAll()) - l.init() - l.open() + assert.NoError(t, l.Init()) + assert.NoError(t, l.Open()) if err != nil { t.Fatal(err) } - _, err = l.Write([]byte("one")) + err = l.Append(MessageSet{ + Offset: 0, + Payload: []byte("one"), + }) if err != nil { t.Error(err) } - _, err = l.Write([]byte("two")) + err = l.Append(MessageSet{ + Offset: 1, + Payload: []byte("two"), + }) if err != nil { t.Error(err) } - _, err = l.Write([]byte("three")) + err = l.Append(MessageSet{ + Offset: 2, + Payload: []byte("three"), + }) if err != nil { t.Error(err) } - _, err = l.Write([]byte("four")) + err = l.Append(MessageSet{ + Offset: 3, + Payload: []byte("four"), + }) if err != nil { t.Error(err) } diff --git a/commitlog/index.go b/commitlog/index.go index 4b315e7c..f988dc4f 100644 --- a/commitlog/index.go +++ b/commitlog/index.go @@ -27,25 +27,25 @@ type index struct { offset int64 } -type entry struct { +type Entry struct { Offset int64 Position int64 } -// relEntry is an entry relative to the base offset +// relEntry is an Entry relative to the base offset type relEntry struct { Offset int8 Position int8 } -func newRelEntry(e entry, baseOffset int64) relEntry { +func newRelEntry(e Entry, baseOffset int64) relEntry { return relEntry{ Offset: int8(e.Offset - baseOffset), Position: int8(e.Position), } } -func (rel relEntry) fill(e *entry, baseOffset int64) { +func (rel relEntry) fill(e *Entry, baseOffset int64) { e.Offset = baseOffset + int64(rel.Offset) e.Position = int64(rel.Position) } @@ -68,6 +68,7 @@ func newIndex(opts options) (idx *index, err error) { idx = &index{ options: opts, } + idx.file, err = os.OpenFile(opts.path, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) if err != nil { return nil, errors.Wrap(err, "open file failed") @@ -83,8 +84,9 @@ func newIndex(opts options) (idx *index, err error) { if err != nil { return nil, errors.Wrap(err, "file truncate failed") } + } else { + idx.offset = size } - idx.offset = size idx.mmap, err = gommap.Map(idx.file.Fd(), gommap.PROT_READ|gommap.PROT_WRITE, gommap.MAP_SHARED) if err != nil { return nil, errors.Wrap(err, "mmap file failed") @@ -92,7 +94,7 @@ func newIndex(opts options) (idx *index, err error) { return idx, nil } -func (idx *index) WriteEntry(entry entry) (err error) { +func (idx *index) WriteEntry(entry Entry) (err error) { b := new(bytes.Buffer) relEntry := newRelEntry(entry, idx.baseOffset) if err = binary.Write(b, binary.BigEndian, relEntry); err != nil { @@ -105,7 +107,7 @@ func (idx *index) WriteEntry(entry entry) (err error) { return nil } -func (idx *index) ReadEntry(e *entry, offset int64) error { +func (idx *index) ReadEntry(e *Entry, offset int64) error { p := make([]byte, entryWidth) copy(p, idx.mmap[offset:offset+entryWidth]) b := bytes.NewReader(p) @@ -119,7 +121,7 @@ func (idx *index) ReadEntry(e *entry, offset int64) error { } func (idx *index) ReadAt(p []byte, offset int64) (n int, err error) { - n = copy(idx.mmap[idx.offset:idx.offset+entryWidth], []byte("hellohellomanman")) + n = copy(idx.mmap[idx.offset:idx.offset+entryWidth], p) return n, nil } diff --git a/commitlog/index_test.go b/commitlog/index_test.go index 29d07df1..cb2da721 100644 --- a/commitlog/index_test.go +++ b/commitlog/index_test.go @@ -23,9 +23,9 @@ func TestIndex(t *testing.T) { if err != nil { t.Fatal(err) } - entries := []entry{} + entries := []Entry{} for i := 0; i < 3; i++ { - entries = append(entries, entry{ + entries = append(entries, Entry{ int64(i), int64(i * 5), }) @@ -38,7 +38,7 @@ func TestIndex(t *testing.T) { if err = idx.Sync(); err != nil { t.Fatal(err) } - act := &entry{} + act := &Entry{} for i, exp := range entries { if err = idx.ReadEntry(act, int64(i*entryWidth)); err != nil { t.Fatal(err) diff --git a/commitlog/reader.go b/commitlog/reader.go index 7dc3f5d7..7b3727e4 100644 --- a/commitlog/reader.go +++ b/commitlog/reader.go @@ -8,8 +8,8 @@ import ( ) type Reader struct { - segment *segment - segments []*segment + segment *Segment + segments []*Segment idx int mu sync.Mutex offset int64 diff --git a/commitlog/segment.go b/commitlog/segment.go index 5afaa8ec..5b2d7b09 100644 --- a/commitlog/segment.go +++ b/commitlog/segment.go @@ -15,18 +15,18 @@ const ( indexNameFormat = "%020d.index" ) -type segment struct { +type Segment struct { writer io.Writer reader io.Reader log *os.File - index *index - baseOffset int64 - nextOffset int64 - bytes int64 + Index *index + BaseOffset int64 + NextOffset int64 + Position int64 maxBytes int64 } -func NewSegment(path string, baseOffset int64, maxBytes int64) (*segment, error) { +func NewSegment(path string, baseOffset int64, maxBytes int64) (*Segment, error) { logPath := filepath.Join(path, fmt.Sprintf(logNameFormat, baseOffset)) log, err := os.OpenFile(logPath, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) if err != nil { @@ -47,72 +47,53 @@ func NewSegment(path string, baseOffset int64, maxBytes int64) (*segment, error) return nil, err } - s := &segment{ + s := &Segment{ log: log, - index: index, + Index: index, writer: log, reader: log, - bytes: fi.Size(), + Position: fi.Size(), maxBytes: maxBytes, - baseOffset: baseOffset, - nextOffset: baseOffset, + BaseOffset: baseOffset, + NextOffset: baseOffset, } return s, nil } -func (s *segment) NextOffset() int64 { - return s.nextOffset -} - -func (s *segment) IsFull() bool { - return s.bytes >= s.maxBytes +func (s *Segment) IsFull() bool { + return s.Position >= s.maxBytes } -func (s *segment) Write(p []byte) (n int, err error) { +func (s *Segment) Write(p []byte) (n int, err error) { n, err = s.writer.Write(p) if err != nil { return n, errors.Wrap(err, "log write failed") } - - _, err = s.index.Write([]byte(fmt.Sprintf("%d,%d\n", s.nextOffset, s.bytes))) - if err != nil { - return 0, errors.Wrap(err, "index write failed") - } - - err = s.index.WriteEntry(entry{ - Offset: s.nextOffset, - Position: s.bytes, - }) - if err != nil { - return 0, err - } - - s.nextOffset += 1 - s.bytes += int64(n) - + s.NextOffset++ + s.Position += int64(n) return n, nil } -func (s *segment) Read(p []byte) (n int, err error) { +func (s *Segment) Read(p []byte) (n int, err error) { return s.reader.Read(p) } -func (s *segment) ReadAt(p []byte, off int64) (n int, err error) { +func (s *Segment) ReadAt(p []byte, off int64) (n int, err error) { return s.log.ReadAt(p, off) } -func (s *segment) Close() error { +func (s *Segment) Close() error { if err := s.log.Close(); err != nil { return err } - return s.index.Close() + return s.Index.Close() } -func (s *segment) findEntry(offset int64) (e *entry, err error) { - e = &entry{} - idx := sort.Search(int(s.index.bytes/entryWidth), func(i int) bool { - _ = s.index.ReadEntry(e, int64(i*entryWidth)) +func (s *Segment) findEntry(offset int64) (e *Entry, err error) { + e = &Entry{} + idx := sort.Search(int(s.Index.bytes/entryWidth), func(i int) bool { + _ = s.Index.ReadEntry(e, int64(i*entryWidth)) return int64(e.Offset) > offset || e.Offset == 0 }) if idx == -1 { diff --git a/commitlog/util.go b/commitlog/util.go index aa8e2822..268e8f3d 100644 --- a/commitlog/util.go +++ b/commitlog/util.go @@ -2,9 +2,9 @@ package commitlog import "sort" -func findSegment(segments []*segment, offset int64) (*segment, int) { +func findSegment(segments []*Segment, offset int64) (*Segment, int) { idx := sort.Search(len(segments), func(i int) bool { - return segments[i].baseOffset > offset + return segments[i].BaseOffset > offset }) - 1 if idx < 0 { diff --git a/main.go b/main.go index 57368a28..4e10a783 100644 --- a/main.go +++ b/main.go @@ -24,6 +24,7 @@ func main() { store := store.New(store.Options{ DataDir: *raftDir, BindAddr: *raftAddr, + LogDir: *logDir, }) if err := store.Open(); err != nil { fmt.Fprintf(os.Stderr, "Error opening raft store: %s\n", err) diff --git a/server/server.go b/server/server.go index 7a61433a..819a46a1 100644 --- a/server/server.go +++ b/server/server.go @@ -5,13 +5,12 @@ import ( "log" "net" "net/http" - "net/url" "os" "github.com/gorilla/handlers" "github.com/gorilla/mux" "github.com/pkg/errors" - "github.com/travisjeffery/jocko/cluster" + "github.com/travisjeffery/jocko/commitlog" "github.com/travisjeffery/jocko/store" ) @@ -48,21 +47,16 @@ type Server struct { addr string ln net.Listener - logger *log.Logger - store *store.Store - controller *cluster.Controller + logger *log.Logger + store *store.Store } func New(addr string, store *store.Store) *Server { - controller := &cluster.Controller{ - Store: store, - } logger := log.New(os.Stderr, "", log.LstdFlags) return &Server{ - addr: addr, - store: store, - logger: logger, - controller: controller, + addr: addr, + store: store, + logger: logger, } } @@ -196,12 +190,12 @@ type TopicRequest struct { func (s *Server) handleTopic(w http.ResponseWriter, r *http.Request) { var topic TopicRequest if err := json.NewDecoder(r.Body).Decode(&topic); err != nil { - s.logger.Print(errors.Wrap(err, "json decode failed")) - w.WriteHeader(http.StatusInternalServerError) + s.logger.Printf("[ERR] jocko: Failed to decode json; %v", errors.Wrap(err, "json decode failed")) + w.WriteHeader(http.StatusBadRequest) return } if s.store.IsController() { - err := s.controller.CreateTopic(topic.Topic, topic.Partitions) + err := s.store.CreateTopic(topic.Topic, topic.Partitions) if err != nil { s.logger.Printf("[ERR] jocko: Failed to create topic %s: %v", topic.Topic, err) w.WriteHeader(http.StatusInternalServerError) @@ -213,8 +207,34 @@ func (s *Server) handleTopic(w http.ResponseWriter, r *http.Request) { } } -func (s *Server) handleProduce(w http.ResponseWriter, r *http.Request) { +type ProduceRequest struct { + RequiredAcks int `json:"required_acks"` + Timeout int `json:"timeout"` + Partition int `json:"partition"` + Topic string `json:"topic"` + MessageSet commitlog.MessageSet `json:"message_set"` +} +func (s *Server) handleProduce(w http.ResponseWriter, r *http.Request) { + var produce ProduceRequest + if err := json.NewDecoder(r.Body).Decode(&produce); err != nil { + s.logger.Printf("[ERR] jocko: Failed to decode json; %v", errors.Wrap(err, "json decode failed")) + w.WriteHeader(http.StatusBadRequest) + return + } + partition, err := s.store.Partition(produce.Topic, produce.Partition) + if err != nil { + s.logger.Printf("[ERR] jocko: Failed to find partition; %v", err) + w.WriteHeader(http.StatusBadRequest) + return + } + err = partition.CommitLog.Append(produce.MessageSet) + if err != nil { + s.logger.Printf("[ERR] jocko: Failed to append messages; %v", err) + w.WriteHeader(http.StatusInternalServerError) + return + } + w.WriteHeader(http.StatusOK) } // Addr returns the address on which the Server is listening diff --git a/store/store.go b/store/store.go index a5107303..fb13343d 100644 --- a/store/store.go +++ b/store/store.go @@ -12,6 +12,7 @@ import ( "github.com/hashicorp/raft" raftboltdb "github.com/hashicorp/raft-boltdb" "github.com/pkg/errors" + "github.com/travisjeffery/jocko/cluster" ) const ( @@ -43,19 +44,10 @@ func newCommand(cmd CmdType, data interface{}) (c command, err error) { }, nil } -type TopicPartition struct { - Topic string `json:"topic"` - Partition int `json:"partition"` - - // broker ids - Replicas []string `json:"replicas"` - Leader string `json:"leader"` - PreferredLeader string `json:"preferred_leader"` -} - type Options struct { DataDir string BindAddr string + LogDir string numPartitions int transport raft.Transport @@ -66,8 +58,8 @@ type Store struct { mu sync.Mutex - partitions []*TopicPartition - topics map[string][]*TopicPartition + partitions []*cluster.TopicPartition + topics map[string][]*cluster.TopicPartition peerStore raft.PeerStore transport raft.Transport @@ -78,7 +70,7 @@ type Store struct { func New(options Options) *Store { return &Store{ - topics: make(map[string][]*TopicPartition), + topics: make(map[string][]*cluster.TopicPartition), Options: options, } } @@ -141,14 +133,27 @@ func (s *Store) Brokers() ([]string, error) { return s.peerStore.Peers() } -func (s *Store) Partitions() ([]*TopicPartition, error) { +func (s *Store) Partitions() ([]*cluster.TopicPartition, error) { return s.partitions, nil } -func (s *Store) PartitionsForTopic(topic string) (found []*TopicPartition, err error) { +func (s *Store) PartitionsForTopic(topic string) (found []*cluster.TopicPartition, err error) { return s.topics[topic], nil } +func (s *Store) Partition(topic string, partition int) (*cluster.TopicPartition, error) { + found, err := s.PartitionsForTopic(topic) + if err != nil { + return nil, err + } + for _, f := range found { + if f.Partition == partition { + return f, nil + } + } + return nil, errors.New("partition not found") +} + func (s *Store) NumPartitions() (int, error) { // TODO: need to get to get from store if s.numPartitions == 0 { @@ -159,7 +164,7 @@ func (s *Store) NumPartitions() (int, error) { } -func (s *Store) AddPartition(partition TopicPartition) error { +func (s *Store) AddPartition(partition cluster.TopicPartition) error { return s.apply(addPartition, partition) } @@ -176,23 +181,29 @@ func (s *Store) apply(cmdType CmdType, data interface{}) error { return f.Error() } -func (s *Store) addPartition(partition TopicPartition) { +func (s *Store) addPartition(partition *cluster.TopicPartition) { s.mu.Lock() defer s.mu.Unlock() - s.partitions = append(s.partitions, &partition) + s.partitions = append(s.partitions, partition) if v, ok := s.topics[partition.Topic]; ok { - s.topics[partition.Topic] = append(v, &partition) + s.topics[partition.Topic] = append(v, partition) } else { - s.topics[partition.Topic] = []*TopicPartition{&partition} + s.topics[partition.Topic] = []*cluster.TopicPartition{partition} + } + if s.IsLeaderOfPartition(partition) { + // need to open log here + if err := partition.OpenCommitLog(s.LogDir); err != nil { + // log or panic + } } } -func (s *Store) IsLeaderOfPartition(partition TopicPartition) bool { +func (s *Store) IsLeaderOfPartition(partition *cluster.TopicPartition) bool { // TODO: switch this to a map for perf s.mu.Lock() defer s.mu.Unlock() - for _, p := range s.partitions { - if p.Topic == partition.Topic && p.Partition == partition.Partition { + for _, p := range s.topics[partition.Topic] { + if p.Partition == partition.Partition { if partition.Leader == s.BrokerID() { return true } @@ -222,12 +233,12 @@ func (s *Store) Apply(l *raft.Log) interface{} { } switch c.Cmd { case addPartition: - var p TopicPartition + p := new(cluster.TopicPartition) b, err := c.Data.MarshalJSON() if err != nil { panic(errors.Wrap(err, "json unmarshal failed")) } - if err := json.Unmarshal(b, &p); err != nil { + if err := json.Unmarshal(b, p); err != nil { panic(errors.Wrap(err, "json unmarshal failed")) } s.addPartition(p) @@ -236,6 +247,40 @@ func (s *Store) Apply(l *raft.Log) interface{} { return nil } +// CreateTopic creates topic with partitions count. +func (s *Store) CreateTopic(topic string, partitions int) error { + for _, t := range s.Topics() { + if t == topic { + return errors.New("topic exists already") + } + } + numPartitions, err := s.NumPartitions() + if err != nil { + return err + } + if partitions != 0 { + numPartitions = partitions + } + brokers, err := s.Brokers() + if err != nil { + return err + } + for i := 0; i < numPartitions; i++ { + broker := brokers[i%len(brokers)] + partition := cluster.TopicPartition{ + Partition: i, + Topic: topic, + Leader: broker, + PreferredLeader: broker, + Replicas: []string{broker}, + } + if err := s.AddPartition(partition); err != nil { + return err + } + } + return nil +} + func (s *Store) Restore(rc io.ReadCloser) error { return nil }