Multiple threads writing to the same Pipe #33

Closed
francisminu opened this issue Apr 1, 2019 · 10 comments

francisminu commented Apr 1, 2019

Hi Marc,

I just started using Pipes and it works great.
I also make use of dotnetty for the connection part.
Basic implementation is:

  1. Dotnetty makes the connection, gets the data and I write the data directly to Pipe.
  2. Dotnetty reads 1024 bytes at a time and if the message is huge, it makes a second read and continues until the stream is empty.
  3. If I have 2 clients connected and messages come in from both at the same time, I am currently seeing issues.
    Message format:
    This is what is happening:
    At 10.03.30.000: (size of message: let's say 4000)
    Dotnetty reads at first :
    This is directly put into Pipe
    Dotnetty reads second bunch:
    This is directly put into pipe
    Now Pipe has
    Now, there is one more part of message1 that is to be read by Dotnetty -
    Now, at almost exactly the same time (only microseconds apart), another, smaller message comes in.
    Dotnetty connects to client 2 and gets the message

    This is put into Pipe
    So, Pipe now looks like below:

    Now, the custom filter condition that I have is, check if the data in pipe has STX and ETX... At this point in time, the condition is met, as it sees an STX and an ETX. But, as far as the messages are concerned, they are 2 different messages.

This happened because the clients sent the message at the exact same time and there is only one Pipe object all through the application.
Could you please suggest a solution for the above?
-Thanks in advance!

Owner

mgravell commented Apr 2, 2019

can I ask some clarifying questions? it sounds like you have:

  • multiple writers
  • each writer might sometimes write multiple frames to represent a single payload (with some kind of complete/incomplete marker)
  • (the actual problem) currently, frames from different writers are becoming interleaved, making it hard to know which logical stream a second message is related to

is that right?

if so, there are two options:

  1. make sure each frame has some kind of sequence identifier in it, so that if the consumer sees [1, incomplete] [1, incomplete] [2, complete] [1, complete] it knows that all the "1s" go together, and that the "2" is by itself (or however it works out)
  2. use a locking primitive such that you retain a logical lock over the stream for the duration of a multi-frame write, so that you write [1, incomplete] [1, incomplete] [1, complete] [2, complete]

The first obviously requires support in your framing protocol, so may not be possible if your frame semantics are baked and don't support it. The second option could use something like the MutexSlim that is in the library.

Obviously also: 1 puts most of the work on the consumer to parse frame streams correctly and dispatch them appropriately; 2 puts most of the work on the producer to write messages as contiguous frame sequences. 2 is simpler, in every way - but has the disadvantage that large multi-frame messages may "block" other writers (quotes because "block" here doesn't necessarily mean "block a thread" - it could block on an async-acquire of a mutex).
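As a rough illustration of the consumer-side work in option 1, here is a minimal sketch that groups frames by a stream identifier. The `FrameDemultiplexer` class, the `TryAccept` method, and the idea that each frame has already been parsed into an id, a complete flag, and a payload are all assumptions made for this example; they are not part of the library or of the protocol discussed in this thread.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical consumer-side helper: collects frames per stream id until
// a frame marked "complete" arrives for that id.
public sealed class FrameDemultiplexer
{
    private readonly Dictionary<int, List<byte>> _partial =
        new Dictionary<int, List<byte>>();

    // Returns true and the assembled payload once the frame flagged as
    // complete has been seen for this stream id; otherwise returns false.
    public bool TryAccept(int streamId, bool isComplete, byte[] payload, out byte[] message)
    {
        if (!_partial.TryGetValue(streamId, out List<byte> pending))
        {
            pending = new List<byte>();
            _partial[streamId] = pending;
        }

        pending.AddRange(payload);

        if (!isComplete)
        {
            message = Array.Empty<byte>();
            return false;
        }

        _partial.Remove(streamId);
        message = pending.ToArray();
        return true;
    }
}
```

Fed the sequence [1, incomplete] [1, incomplete] [2, complete] [1, complete], this would hand back message 2 first and then the three joined payloads of message 1.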

Note that if you have multiple writers, you should already be using a locking primitive anyway, since you can only have a single pipe writer. So in many ways, the second option just extends the logical scope of that synchronization to be "entire message sequence" rather than "single frame".
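A minimal sketch of option 2, holding one lock for the whole multi-frame write. It uses the BCL `SemaphoreSlim` as a stand-in for `MutexSlim`, purely because its API is widely known; the `SerializedPipeWriter` class and `WriteMessageAsync` method are hypothetical names for this example.

```csharp
using System;
using System.Collections.Generic;
using System.IO.Pipelines;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical wrapper: serializes whole messages onto one shared PipeWriter.
public sealed class SerializedPipeWriter
{
    private readonly PipeWriter _writer;

    // Assumption: SemaphoreSlim(1, 1) is used here in place of MutexSlim.
    private readonly SemaphoreSlim _gate = new SemaphoreSlim(1, 1);

    public SerializedPipeWriter(PipeWriter writer)
    {
        _writer = writer ?? throw new ArgumentNullException(nameof(writer));
    }

    // Writes every frame of one logical message while holding the lock,
    // so frames from other callers cannot land in between them.
    public async Task WriteMessageAsync(IEnumerable<ReadOnlyMemory<byte>> frames)
    {
        await _gate.WaitAsync();
        try
        {
            foreach (ReadOnlyMemory<byte> frame in frames)
            {
                // WriteAsync copies the data into the pipe and flushes it.
                await _writer.WriteAsync(frame);
            }
        }
        finally
        {
            _gate.Release();
        }
    }
}
```

The trade-off is the one described above: while one caller is inside `WriteMessageAsync`, other callers wait (asynchronously) until its last frame has been written.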

If I've misunderstood the question, please let me know!

Author

francisminu commented Apr 2, 2019

Thank you so much for your prompt response. I am still in a hustle here.
If possible, could you please take a look at the below:
This is what I have now. I make use of the same PipeWriter instance all the time.

public class SocketPipe: ISocketPipe
{
        private readonly ILoggerService _loggerService;
        
        public SocketPipe(ILoggerService loggerService)
        {
            _loggerService = loggerService;
            if(_loggerService == null)
            {
                throw new ArgumentNullException($"ILoggerService object is required.");
            }
            Pipe = new Pipe();
        }

        private Pipe Pipe { get; set; }

        public Pipe GetPipe()
        {
            return Pipe ?? (Pipe = new Pipe());
        }

        public PipeReader GetPipeReader()
        {
            if(Pipe == null)
            {
                Pipe = new Pipe();
            }
            return Pipe.Reader;
        }

        public PipeWriter GetPipeWriter()
        {
            if (Pipe == null)
            {
                Pipe = new Pipe();
            }
            return Pipe.Writer;
        }

        public async Task WriteToPipe(object msg)
        {
            var data = (byte[])msg;
            var pipeWriter = GetPipeWriter();
            try
            {
                
                Memory<byte> memory = pipeWriter.GetMemory(data.Length);
                Memory<byte> dataMemory = new Memory<byte>(data);
                dataMemory.CopyTo(memory);

                // Tell the PipeWriter how much was read
                pipeWriter.Advance(data.Length);
            }
            catch(Exception ex)
            {
                _loggerService.Error(ex, new LogMessage { MessageText = "Exception occurred in Socket Pipes." });
            }
            // Make the data available to the PipeReader
            FlushResult result = await pipeWriter.FlushAsync();
            if (result.IsCompleted)
            {
                pipeWriter.Complete();
            }
        }

        public Task<string> ReadFromPipe()
        {
            var reader = GetPipeReader();
            var pipeHasData = reader.TryRead(out ReadResult inputMessage);
            if (!pipeHasData || inputMessage.Buffer.IsEmpty)
                return Task.FromResult("");
            var inboundMessage = "";
            ReadOnlySequence<byte> buffer = inputMessage.Buffer;

            var stxPosition = buffer.PositionOf(Convert.ToByte(0x02));
            var etxPosition = buffer.PositionOf(Convert.ToByte(0x03));

            if (stxPosition == null && etxPosition == null)
            {
                reader.AdvanceTo(buffer.End);
            }
            else
            {
                var labelNames = new[] { "Server" };
                var labelValues = new[] { Environment.MachineName };
                var discardedMessagesCountGauge = Metrics.CreateGauge("discarded_messages_count", "Number of messages discarded while being read from Pipe", labelNames);
                if (stxPosition != null & etxPosition != null)
                {
                    if (CheckMessageFrameSequence(stxPosition, etxPosition))
                    {
                        _loggerService.Info(new LogMessage
                        {
                            ClassName = "SocketPipe",
                            MethodName = "ReadFromPipe",
                            MessageText = "Valid message obtained."
                        });
                        var nextPosition = buffer.GetPosition(1, etxPosition.Value);
                        buffer = buffer.Slice(buffer.GetPosition(1, stxPosition.Value), etxPosition.Value);
                        if (buffer.IsSingleSegment)
                        {
                            inboundMessage = Encoding.ASCII.GetString(buffer.First.Span);
                        }
                        else
                        {
                            foreach (var segment in buffer)
                            {
                                inboundMessage = inboundMessage + Encoding.ASCII.GetString(segment.Span);
                            }
                        }
                        reader.AdvanceTo(nextPosition);
                    }
                    else
                    {
                        
                        discardedMessagesCountGauge.WithLabels(labelValues).Inc();
                        if (discardedMessagesCountGauge.Value > 10000)
                        {
                            discardedMessagesCountGauge.WithLabels(labelValues).Set(0);
                        }

                        // For logging discarded message
                        
                        buffer = buffer.Slice(buffer.Start, stxPosition.Value);
                        var discardedMessage = GetDiscardedMessage(buffer);
                        _loggerService.Warning(new LogMessage
                        {
                            ClassName = "SocketPipe",
                            MethodName = "ReadFromPipe",
                            MessageText = $"Discarded message: Current data in Pipe has ETX before the first occurrence of STX. Data till STX is discarded. Discarded data: {discardedMessage}"
                        });
                        reader.AdvanceTo(stxPosition.Value);
                    }
                }
                else if (stxPosition == null)
                {
                    discardedMessagesCountGauge.WithLabels(labelValues).Inc();
                    if (discardedMessagesCountGauge.Value > 10000)
                    {
                        discardedMessagesCountGauge.WithLabels(labelValues).Set(0);
                    }
                    // For logging discarded message
                    var discardedMessage = GetDiscardedMessage(buffer);

                    _loggerService.Warning(new LogMessage
                    {
                        ClassName = "SocketPipe",
                        MethodName = "ReadFromPipe",
                        MessageText = $"Discarded message: Buffer has only ETX and no corresponding STX. Removing entire data from Pipe. Discarded data: {discardedMessage}"
                    });
                    reader.AdvanceTo(buffer.End);
                }
                else
                {
                    _loggerService.Warning(new LogMessage
                    {
                        MessageText = "Only STX found in buffer. Acceptable message not received. Hence not reading from the buffer."
                    });
                    reader.AdvanceTo(stxPosition.Value, buffer.End);
                }
            }

            if (inputMessage.IsCompleted)
            {
                reader.Complete();
            }
            return Task.FromResult(inboundMessage);
        }

        #region PrivateMethods

        private bool CheckMessageFrameSequence(SequencePosition? stxPos, SequencePosition? etxPos)
        {
            if (stxPos != null && etxPos != null)
                return stxPos.Value.GetInteger() < etxPos.Value.GetInteger();
            _loggerService.Warning(new LogMessage
            {
                ClassName = "SocketPipe",
                MethodName = "CheckMessageFrameSequence",
                MessageText = $"Null value for stxPos/etxPos. stxPos: {stxPos} etxPos: {etxPos}"
            });
            return false;
        }

        private string GetDiscardedMessage(ReadOnlySequence<byte> buffer)
        {
            var discardedMessage = "";
            if (buffer.IsSingleSegment)
            {
                discardedMessage = Encoding.ASCII.GetString(buffer.First.Span);
            }
            else
            {
                foreach (var segment in buffer)
                {
                    discardedMessage = discardedMessage + Encoding.ASCII.GetString(segment.Span);
                }
            }
            return discardedMessage;
        }


        #endregion

    }

The caller function:

private async Task<string> ProcessData(object msg)
        {
            try
            {
                await _pipe.WriteToPipe(msg);

                var dataReadFromPipe = await _pipe.ReadFromPipe();
                return dataReadFromPipe;
            }
            catch(Exception ex)
            {
                _loggerService.Error(ex, new LogMessage { MessageText = "Error occured in ProcessData" });
                return "";
            }
        }

ProcessData gets called whenever DotNetty receives data.
Could you please take a look to see if the implementation is correct at all? I just started using the Pipe feature and this is how I came about the implementation. Please let me know if this is not the correct implementation.

Thanks,
Minu

(edit by Marc: code formatting)

Owner

mgravell commented Apr 2, 2019

right; looking at public async Task WriteToPipe(object msg) first; I'm guessing that whatever framing protocol you're using is already built into msg at this point? i.e. it already has some kind of delimiter to allow individual messages to be deframed?

It isn't clear to me how you are handling concurrency; the original question seemed to be talking about multiple writers, but this code doesn't protect against that at all. So: can you clarify what the expected behaviour is here? Note: pipelines itself provides zero protection against concurrent writes, so if you need to support multiple writers, you'll need to add that yourself. This would be easy for me to add with MutexSlim, so please let me know what you intend to happen here - I may be able to help.

You then do this:

                Memory<byte> memory = pipeWriter.GetMemory(data.Length);
                Memory<byte> dataMemory = new Memory<byte>(data);
                dataMemory.CopyTo(memory);

                // Tell the PipeWriter how much was read
                pipeWriter.Advance(data.Length);
...
            FlushResult result = await pipeWriter.FlushAsync();

The problem here is: sizes. GetMemory isn't guaranteed to support arbitrary sizes - the idea is that you ask for something, do what you can (exploiting what it gave you, which might be more than you asked for). Fortunately, the method to safely do everything here already exists. Everything above is simply:

var result = await pipeWriter.WriteAsync(data);

You then do this:

            if (result.IsCompleted)
            {
                pipeWriter.Complete();
            }

It isn't clear to me what you're doing here, but: calling Complete() on a PipeWriter is saying "I'm all done, no more data will be forthcoming". Since the original question suggested multiple frames, I suspect this is just... incorrect and should be removed.

next up: ReadFromPipe

Owner

mgravell commented Apr 2, 2019

You start off by doing:

            var pipeHasData = reader.TryRead(out ReadResult inputMessage);
            if (!pipeHasData || inputMessage.Buffer.IsEmpty)
                return Task.FromResult("");

but... we expect there to not be a response yet (sockets being async), so... I can't see any point in even testing this - just lose it? At the moment, it looks like it will always incorrectly return an empty string? You almost always want to write a loop here, that keeps doing an async read until either it has the data it needs, or there isn't enough data and the incoming stream is terminated.

You also want to be very careful about how you consume data; for example:

            if (stxPosition == null && etxPosition == null)
            {
                reader.AdvanceTo(buffer.End);

that looks very bad to me; that appears to say "if we haven't found our frame delimiters, then consume the data we've peeked at", when what you really want to say is "if we haven't found our frame delimiters, consume nothing, but record that we've inspected everything"; my IDE is updating right now, but I believe that is something like:

reader.AdvanceTo(buffer.Start, buffer.End);

(i.e. the first parameter is "consumed" - nothing, and the second parameter is "inspected" - everything)

Again, inside a loop, this means that your next ReadAsync should get more data including the data you've already looked at once, hopefully now as a complete frame. Again, this would typically be in a loop that runs until you have dequeued a message.
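A minimal sketch of such a read loop, assuming the STX (0x02) / ETX (0x03) delimiters from the code earlier in this thread. The `FrameReader` class and `ReadFrameAsync` method are hypothetical names; the sketch returns null if the writer completes before a full frame arrives, and it does not discard junk that precedes an STX until a frame is found.

```csharp
using System;
using System.Buffers;
using System.IO.Pipelines;
using System.Text;
using System.Threading.Tasks;

public static class FrameReader
{
    private const byte Stx = 0x02;
    private const byte Etx = 0x03;

    // Keeps reading until one STX...ETX frame is available, then returns
    // its contents (without the delimiters) as an ASCII string.
    public static async Task<string> ReadFrameAsync(PipeReader reader)
    {
        while (true)
        {
            ReadResult result = await reader.ReadAsync();
            ReadOnlySequence<byte> buffer = result.Buffer;

            SequencePosition? stx = buffer.PositionOf(Stx);
            if (stx != null)
            {
                // Only look for ETX after the STX we found.
                ReadOnlySequence<byte> afterStx = buffer.Slice(buffer.GetPosition(1, stx.Value));
                SequencePosition? etx = afterStx.PositionOf(Etx);
                if (etx != null)
                {
                    byte[] payload = afterStx.Slice(0, etx.Value).ToArray();
                    // Consume everything up to and including the ETX.
                    reader.AdvanceTo(buffer.GetPosition(1, etx.Value));
                    return Encoding.ASCII.GetString(payload);
                }
            }

            if (result.IsCompleted)
            {
                // No more data will arrive, so no complete frame is possible.
                reader.AdvanceTo(buffer.End);
                return null;
            }

            // Consumed nothing, examined everything: the next ReadAsync
            // waits for more data and returns it along with what we saw.
            reader.AdvanceTo(buffer.Start, buffer.End);
        }
    }
}
```

Because nothing is consumed until a whole frame is present, a frame that arrives in several chunks is seen again in full on a later iteration.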

The same issue of concurrency arises. In terms of timing, note that you want to acquire the read lock before you release the write lock, otherwise you get into a race condition where multiple writers can compete and get out-of-order.

Finally, we have:

            if (inputMessage.IsCompleted)
            {
                reader.Complete();
            }

which like above, looks incorrect. reader.Complete means "I'm done, I won't be reading any more", but you mention multiple frames, so this is probably incorrect.


Has that been helpful at all??

Author

francisminu commented Apr 3, 2019

Thank you so much Marc. I will try to clean up the implementation as per the suggestions you have provided.
The reason why I have
reader.AdvanceTo(buffer.End);
is because:
Let's say the pipe has the following at present: testdatatrial123
There is no STX or ETX. If new (proper) data comes in, it will be STX test1232432 ETX, and I do not need the earlier testdatatrial123 data, since there cannot be an STX before that data anyway.

Also, in ReadFromPipe, the reason I do not have a loop is that ours is a kind of synchronous implementation: the client connects and expects a response on the same connection, so I need to return the data for a request in the same flow.

I have not worked with MutexSlim before, I will take a look.
Thank you once again for the inputs, they were really helpful.

Thanks,

Owner

mgravell commented Apr 4, 2019

K; I'm going to close this down as I don't think it is really an "issues in this library" thing - happy to offer more guidance as needed though : stay in touch!

mgravell closed this as completed Apr 4, 2019
@francisminu
Author

@mgravell
Hi Marc,

I know this is not the place to ask questions. Please let me know if you want me to send this question via some other medium.

I was just going through the samples once again and now I am a bit more confused. The owner doesn't seem to respond, so I thought you might be the best person next.
As per the sample,
https://github.com/davidfowl/TcpEcho/blob/master/src/PipelinesServer/Program.cs

Does it mean that a Pipe instance is created for every client that is connected to the server?

At present, my implementation is such that I have a global instance of Pipe that is registered via Dependency Injection and I use the same instance. All the clients write to the same Pipe instance. Could you verify if that approach is correct?

Thanks,
Minu Francis

Owner

mgravell commented Apr 12, 2019 via email

Author

francisminu commented Apr 12, 2019

@mgravell Thank you for the response Marc.
This is what our requirement is:

  1. We have a TCP Server running
  2. There are several TCP clients that connect to this server. They connect, send a message, and then expect a response back (kind of a synchronous model).
  3. Once the response is sent, the client is disconnected.
  4. In other words, every message from a client opens a connection to our server; once that message is processed and the response is sent, the client disconnects, and this repeats.
  5. For the socket implementation, we have used DotNetty so that it takes care of the connect and disconnect part.
  6. DotNetty has specific callbacks called ChannelActive (when connected) and ChannelInactive (when disconnected). I can make use of these to create an instance of Pipe each time a connection is made (this is what I was not sure about, whether I should do so; it was my earlier question today).

So, at present what happens is:
Client A -> Connects (Message size: 2048)
DotNetty reads only 1024 at a time, so it reads 1024 and writes to the Pipe (which is a global instance).
At about the same time, another client, Client B -> Connects (Message size: 1024)
DotNetty reads and writes to the same pipe.

Since it is the same global instance of Pipe that I am using, the Pipe now has data in the form:
PartOfMessage1_Message2_SecondPartOfMessage1
Which is now completely wrong as the messages have got mixed up now.

So, my question is whether I should create a Pipe in each ChannelActive and call PipeWriter.Complete in ChannelInactive, or continue with the global Pipe instance.

If I can go ahead with the global Pipe instance: I came across something called Mutex. Is that the same as the MutexSlim that you mentioned earlier?

Thanks,
Minu Francis

Owner

mgravell commented Apr 12, 2019 via email
