mpsc Channel and custom retry logic make for never-closing channel #775

@xrl

I am writing an AWS Kinesis client using the Rusoto AWS SDK, and I have hit a problem: I cannot retry failed requests without cloning the futures::sync::mpsc::channel Sender, and once I clone it I lose the drop semantics that normally close the Receiver.

The code spawns a thread that receives records, batches them, and keeps up to inflight requests running concurrently. This is great for throughput, since Kinesis has relatively high latency (50ms) and a small request size (500 records). These requests can fail for a number of reasons: the TCP connection sees an error, the Kinesis stream is under-provisioned for the current message rate (bursts, etc.), Kinesis itself returns errors, and so on. I simply need to retry requests when they fail.

My first stab at retrying requests has me cloning the Sender and moving the clone into the Receiver thread. The problem is that when the original Sender is dropped, the Receiver does not close automatically, because the clone is still alive. My Receiver thread now stays open forever and I am leaking threads in my code.
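The drop behaviour described above can be reproduced in isolation. The following is a minimal sketch that uses std::sync::mpsc as a stand-in for futures::sync::mpsc (an assumption made so the example needs no external crates; both channels stay open until every Sender is gone). The function name observe_close is hypothetical.

```rust
use std::sync::mpsc::{channel, Sender, TryRecvError};

/// Reports what the receiver observes after the original sender is dropped:
/// first while a clone is still alive, then after the clone is dropped too.
fn observe_close() -> (TryRecvError, TryRecvError) {
    let (tx, rx) = channel::<u32>();
    // Stand-in for the `retry_tx` handle that the worker thread would own.
    let retry_tx: Sender<u32> = tx.clone();

    // The caller drops its handle, but the channel is still open.
    drop(tx);
    let while_clone_alive = rx.try_recv().unwrap_err();

    // Only once the last sender is gone does the receiver see a disconnect.
    drop(retry_tx);
    let after_clone_dropped = rx.try_recv().unwrap_err();

    (while_clone_alive, after_clone_dropped)
}
```

Calling observe_close() yields TryRecvError::Empty followed by TryRecvError::Disconnected: as long as the worker holds its own clone, the receiver only ever sees an empty channel, never a closed one.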

So my question: is there a way to create a Sender which does NOT count toward the life of the channel? Or is there a way to close() a Sender that also closes all the other Senders? Either of these solutions may lose data for requests which still need to be retried, but perhaps it would be useful anyway? Or perhaps there is a better way to structure this retry logic? Could I do it in the AndThen to keep that initial request going?
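One way to approximate "closing all the Senders at once" is an explicit shutdown message. The sketch below again uses std::sync::mpsc rather than the futures channel, and the Msg enum and spawn_worker function are hypothetical names, not part of any library. As noted above, anything still waiting to be retried when the shutdown arrives is lost.

```rust
use std::sync::mpsc::{channel, Sender};
use std::thread;

/// Hypothetical message type: either a record or an explicit request to stop.
enum Msg<T> {
    Record(T),
    Shutdown,
}

/// Spawns a worker that owns a Sender clone (as a retry handle would) and
/// still exits when it receives `Msg::Shutdown`. The join handle yields the
/// number of records the worker saw before stopping.
fn spawn_worker() -> (Sender<Msg<u32>>, thread::JoinHandle<usize>) {
    let (tx, rx) = channel();
    let retry_tx = tx.clone();
    let handle = thread::spawn(move || {
        // Holding this clone means the channel never closes on its own.
        let _retry_tx = retry_tx;
        let mut seen = 0;
        for msg in rx {
            match msg {
                Msg::Record(_) => seen += 1,
                // Explicit stop: leave the loop although senders still exist.
                Msg::Shutdown => break,
            }
        }
        seen
    });
    (tx, handle)
}
```

The owner would send Msg::Shutdown just before dropping its handle, then join the thread. The cost is that every producer has to wrap records in Msg::Record, and the owner has to remember to send the shutdown.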

Here is code which spawns the Receiver thread and attempts retries:

pub fn kinesis_tx(stream_name: String, chan_buf: usize, inflight: usize) -> RecordsChannel {
    info!("CREATING kinesis channel");
    use futures::sync::mpsc::channel;
    use futures::{Future, Sink, Stream};

    let (tx, rx) = channel(chan_buf);
    let client = Arc::new(KinesisClient::simple(Region::UsWest2));

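    // NOTE: this clone is moved into the worker thread below, so the channel
    // stays open even after the caller drops the returned `tx`.
    let mut retry_tx = tx.clone().wait();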
    std::thread::spawn(move || {
        info!("STARTING kinesis batch put thread");
        let puts = rx.chunks(500)
            .map(|batch: Vec<PutRecordsRequestEntry>| {
                let input = PutRecordsInput {
                    records: batch,
                    stream_name: stream_name.clone(),
                };
                let chain = client.put_records(&input).then(|put_res| {
                    let retval: Result<
                        Result<PutRecordsOutput, (PutRecordsError, PutRecordsInput)>,
                        (),
                    > = match put_res {
                        Ok(res) => {
                            trace!("match put_res: it worked");
                            Ok(Ok(res))
                        }
                        Err(err) => {
                            trace!("match put_res: failed");
                            Ok(Err((err, input)))
                        }
                    };
                    retval
                });
                chain
            })
            .buffer_unordered(inflight);

        for put_res in puts.wait() {
            match put_res {
                Ok(Ok(put)) => {
                    if let Some(failed) = put.failed_record_count {
                        if failed > 0 {
                            error!("{} record(s) failed to commit to kinesis", failed);
                            let put: PutRecordsOutput = put;
                            for rec in put.records {
                                if rec.error_code.is_some() {
                                    error!("failed record: {:?}", rec);
                                }
                            }
                        }
                    }
                }
                Ok(Err((put_records_err, put_records_input))) => match put_records_err {
                    PutRecordsError::HttpDispatch(dispatch_err) => {
                        error!(
                            "http dispatch error: {:?}. retrying records...",
                            dispatch_err
                        );
                        for record in put_records_input.records {
                            match retry_tx.send(record) {
                                Ok(()) => debug!("Wait#send succeeded"),
                                Err(e) => error!("Wait#send error {:?}", e),
                            }
                        }
                    }
                    PutRecordsError::Unknown(raw_message) => {
                        error!("unknown error: '{:?}', retrying records...", raw_message);
                        for record in put_records_input.records {
                            match retry_tx.send(record) {
                                Ok(()) => debug!("Wait#send succeeded"),
                                Err(e) => error!("Wait#send error {:?}", e),
                            }
                        }
                    }
                    other => {
                        error!("unhandled kinesis error: {:?}", other);
                    }
                },
                other => {
                    error!("puts.wait() fallthrough: {:?}", other);
                }
            }
        }
        info!("STOPPING kinesis batch put thread");
    });

    tx
}
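A possible restructuring is to retry a failed batch in place instead of pushing its records back through the channel, so the worker never needs its own Sender and the loop ends once the caller drops theirs. The sketch below is synchronous and built on std::sync::mpsc, so it does not reproduce the inflight concurrency of the code above. The names run_worker, put and max_attempts are hypothetical, and put stands in for the PutRecords call.

```rust
use std::sync::mpsc::Receiver;

/// Drains `rx` in batches of up to `batch_size` records and calls `put` on
/// each batch. A failed batch is retried in place, up to `max_attempts` tries
/// in total. Returns the records that were given up on.
fn run_worker<T, F>(rx: Receiver<T>, batch_size: usize, max_attempts: usize, mut put: F) -> Vec<T>
where
    F: FnMut(&[T]) -> Result<(), String>,
{
    let mut abandoned = Vec::new();
    let mut batch = Vec::with_capacity(batch_size);
    loop {
        // `recv` fails only when every Sender is dropped and the queue is empty.
        let next = rx.recv().ok();
        let closed = next.is_none();
        if let Some(record) = next {
            batch.push(record);
        }
        // Flush on a full batch, or on whatever is left when the channel closes.
        if batch.len() >= batch_size || (closed && !batch.is_empty()) {
            let mut delivered = false;
            for _ in 0..max_attempts {
                if put(&batch).is_ok() {
                    delivered = true;
                    break;
                }
            }
            if delivered {
                batch.clear();
            } else {
                // Out of attempts: hand the records back instead of re-queueing.
                abandoned.append(&mut batch);
            }
        }
        if closed {
            return abandoned;
        }
    }
}
```

In the futures-based version, the equivalent would be to put the retry inside the per-batch future, so that buffer_unordered keeps driving the same request until it succeeds or runs out of attempts. A real client would probably also want a delay between attempts, which this sketch omits.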
