diff --git a/diskwriter.go b/diskwriter.go index a465615c..e2d034c7 100644 --- a/diskwriter.go +++ b/diskwriter.go @@ -55,6 +55,7 @@ func NewDiskWriter(ctx context.Context, dest string, opt DiskWriterOpt) (*DiskWr eg: eg, ctx: ctx, cancel: cancel, + filter: opt.Filter, }, nil } diff --git a/receive.go b/receive.go index 233c28b7..8176f3e5 100644 --- a/receive.go +++ b/receive.go @@ -106,7 +106,12 @@ func (r *receiver) run(ctx context.Context) error { w := newDynamicWalker() - g.Go(func() error { + g.Go(func() (retErr error) { + defer func() { + if retErr != nil { + r.conn.SendMsg(&Packet{Type: PACKET_ERR, Data: []byte(retErr.Error())}) + } + }() destWalker := emptyWalker if !r.merge { destWalker = GetWalkerFn(r.dest) diff --git a/send.go b/send.go index c7b6382c..d721d1e2 100644 --- a/send.go +++ b/send.go @@ -90,6 +90,8 @@ func (s *sender) run(ctx context.Context) error { return err } switch p.Type { + case PACKET_ERR: + return errors.Errorf("error from receiver: %s", p.Data) case PACKET_REQ: if err := s.queue(p.ID); err != nil { return err diff --git a/wire.pb.go b/wire.pb.go index 263fb126..9d334bbd 100644 --- a/wire.pb.go +++ b/wire.pb.go @@ -29,6 +29,7 @@ const ( PACKET_REQ Packet_PacketType = 1 PACKET_DATA Packet_PacketType = 2 PACKET_FIN Packet_PacketType = 3 + PACKET_ERR Packet_PacketType = 4 ) var Packet_PacketType_name = map[int32]string{ @@ -36,12 +37,14 @@ var Packet_PacketType_name = map[int32]string{ 1: "PACKET_REQ", 2: "PACKET_DATA", 3: "PACKET_FIN", + 4: "PACKET_ERR", } var Packet_PacketType_value = map[string]int32{ "PACKET_STAT": 0, "PACKET_REQ": 1, "PACKET_DATA": 2, "PACKET_FIN": 3, + "PACKET_ERR": 4, } func (Packet_PacketType) EnumDescriptor() ([]byte, []int) { return fileDescriptorWire, []int{0, 0} } @@ -543,21 +546,22 @@ var ( func init() { proto.RegisterFile("wire.proto", fileDescriptorWire) } var fileDescriptorWire = []byte{ - // 253 bytes of a gzipped FileDescriptorProto + // 259 bytes of a gzipped FileDescriptorProto 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0x2a, 0xcf, 0x2c, 0x4a, 0xd5, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x4b, 0x2b, 0x2e, 0x2d, 0xc9, 0xcc, 0x91, 0xe2, - 0x2a, 0x2e, 0x49, 0x2c, 0x81, 0x88, 0x29, 0x9d, 0x65, 0xe4, 0x62, 0x0b, 0x48, 0x4c, 0xce, 0x4e, + 0x2a, 0x2e, 0x49, 0x2c, 0x81, 0x88, 0x29, 0xdd, 0x65, 0xe4, 0x62, 0x0b, 0x48, 0x4c, 0xce, 0x4e, 0x2d, 0x11, 0xd2, 0xe5, 0x62, 0x29, 0xa9, 0x2c, 0x48, 0x95, 0x60, 0x54, 0x60, 0xd4, 0xe0, 0x33, 0x92, 0xd4, 0x83, 0xa8, 0xd6, 0x83, 0xc8, 0x42, 0xa9, 0x90, 0xca, 0x82, 0xd4, 0x20, 0xb0, 0x32, 0x21, 0x05, 0x2e, 0x16, 0x90, 0x39, 0x12, 0x4c, 0x0a, 0x8c, 0x1a, 0xdc, 0x46, 0x3c, 0x30, 0xe5, 0xc1, 0x25, 0x89, 0x25, 0x41, 0x60, 0x19, 0x21, 0x3e, 0x2e, 0x26, 0x4f, 0x17, 0x09, 0x66, 0x05, 0x46, 0x0d, 0xde, 0x20, 0x26, 0x4f, 0x17, 0x21, 0x21, 0x2e, 0x96, 0x94, 0xc4, 0x92, 0x44, 0x09, - 0x16, 0x05, 0x46, 0x0d, 0x9e, 0x20, 0x30, 0x5b, 0xc9, 0x8f, 0x8b, 0x0b, 0x61, 0xb2, 0x10, 0x3f, + 0x16, 0x05, 0x46, 0x0d, 0x9e, 0x20, 0x30, 0x5b, 0x29, 0x8e, 0x8b, 0x0b, 0x61, 0xb2, 0x10, 0x3f, 0x17, 0x77, 0x80, 0xa3, 0xb3, 0xb7, 0x6b, 0x48, 0x7c, 0x70, 0x88, 0x63, 0x88, 0x00, 0x83, 0x10, 0x1f, 0x17, 0x17, 0x54, 0x20, 0xc8, 0x35, 0x50, 0x80, 0x11, 0x49, 0x81, 0x8b, 0x63, 0x88, 0xa3, - 0x00, 0x13, 0x92, 0x02, 0x37, 0x4f, 0x3f, 0x01, 0x66, 0x27, 0x9d, 0x0b, 0x0f, 0xe5, 0x18, 0x6e, - 0x3c, 0x94, 0x63, 0xf8, 0xf0, 0x50, 0x8e, 0xb1, 0xe1, 0x91, 0x1c, 0xe3, 0x8a, 0x47, 0x72, 0x8c, - 0x27, 0x1e, 0xc9, 0x31, 0x5e, 0x78, 0x24, 0xc7, 0xf8, 0xe0, 0x91, 0x1c, 0xe3, 0x8b, 0x47, 0x72, - 0x0c, 0x1f, 0x1e, 0xc9, 0x31, 0x4e, 0x78, 0x2c, 0xc7, 0x90, 0xc4, 0x06, 0x0e, 0x04, 0x63, 0x40, - 0x00, 0x00, 0x00, 0xff, 0xff, 0xda, 0x30, 0x43, 0x22, 0x26, 0x01, 0x00, 0x00, + 0x00, 0x13, 0x92, 0x02, 0x37, 0x4f, 0x3f, 0x01, 0x66, 0x24, 0xbe, 0x6b, 0x50, 0x90, 0x00, 0x8b, + 0x93, 0xce, 0x85, 0x87, 0x72, 0x0c, 0x37, 0x1e, 0xca, 0x31, 0x7c, 0x78, 0x28, 0xc7, 0xd8, 0xf0, + 0x48, 0x8e, 0x71, 0xc5, 0x23, 0x39, 0xc6, 0x13, 0x8f, 0xe4, 0x18, 0x2f, 0x3c, 0x92, 0x63, 0x7c, + 0xf0, 0x48, 0x8e, 0xf1, 0xc5, 0x23, 0x39, 0x86, 0x0f, 0x8f, 0xe4, 0x18, 0x27, 0x3c, 0x96, 0x63, + 0x48, 0x62, 0x03, 0x07, 0x8a, 0x31, 0x20, 0x00, 0x00, 0xff, 0xff, 0x8b, 0xce, 0x55, 0x3b, 0x36, + 0x01, 0x00, 0x00, } diff --git a/wire.proto b/wire.proto index d1943209..f9b33f31 100644 --- a/wire.proto +++ b/wire.proto @@ -10,6 +10,7 @@ message Packet { PACKET_REQ = 1; PACKET_DATA = 2; PACKET_FIN = 3; + PACKET_ERR = 4; } PacketType type = 1; Stat stat = 2;