Skip to content

Commit

Permalink
Rewrite of async StreamReader/StreamWritter operations to not fail on…
Browse files Browse the repository at this point in the history
… subsequent async call. Fixes #9761
  • Loading branch information
marek-safar committed Feb 1, 2013
1 parent 6347323 commit 185743c
Show file tree
Hide file tree
Showing 6 changed files with 269 additions and 60 deletions.
202 changes: 162 additions & 40 deletions mcs/class/corlib/System.IO/StreamReader.cs
Expand Up @@ -8,7 +8,7 @@
//
// (C) Ximian, Inc. http://www.ximian.com
// Copyright (C) 2004 Novell (http://www.novell.com)
// Copyright 2011 Xamarin Inc.
// Copyright 2011, 2013 Xamarin Inc.
//
// Permission is hereby granted, free of charge, to any person obtaining
// a copy of this software and associated documentation files (the
Expand Down Expand Up @@ -381,43 +381,48 @@ public void DiscardBufferedData ()
}

// the buffer is empty, fill it again
private int ReadBuffer ()
// Keep in sync with ReadBufferAsync
int ReadBuffer ()
{
pos = 0;
int cbEncoded = 0;

// keep looping until the decoder gives us some chars
decoded_count = 0;
int parse_start = 0;
do
{
cbEncoded = base_stream.Read (input_buffer, 0, buffer_size);

do {
var cbEncoded = base_stream.Read (input_buffer, 0, buffer_size);
if (cbEncoded <= 0)
return 0;

mayBlock = (cbEncoded < buffer_size);
if (do_checks > 0){
Encoding old = encoding;
parse_start = DoChecks (cbEncoded);
if (old != encoding){
int old_decoded_size = old.GetMaxCharCount (buffer_size) + 1;
int new_decoded_size = encoding.GetMaxCharCount (buffer_size) + 1;
if (old_decoded_size != new_decoded_size)
decoded_buffer = new char [new_decoded_size];
decoder = encoding.GetDecoder ();
}
do_checks = 0;
cbEncoded -= parse_start;
}

decoded_count += decoder.GetChars (input_buffer, parse_start, cbEncoded, decoded_buffer, 0);
parse_start = 0;
decoded_count = ReadBufferCore (cbEncoded);
} while (decoded_count == 0);

return decoded_count;
}

int ReadBufferCore (int cbEncoded)
{
int parse_start;

mayBlock = cbEncoded < buffer_size;
if (do_checks > 0){
Encoding old = encoding;
parse_start = DoChecks (cbEncoded);
if (old != encoding){
int old_decoded_size = old.GetMaxCharCount (buffer_size) + 1;
int new_decoded_size = encoding.GetMaxCharCount (buffer_size) + 1;
if (old_decoded_size != new_decoded_size)
decoded_buffer = new char [new_decoded_size];
decoder = encoding.GetDecoder ();
}
do_checks = 0;
cbEncoded -= parse_start;
} else {
parse_start = 0;
}

return decoder.GetChars (input_buffer, parse_start, cbEncoded, decoded_buffer, 0);
}

//
// Peek can block:
// http://connect.microsoft.com/VisualStudio/feedback/ViewFeedback.aspx?FeedbackID=96484
Expand Down Expand Up @@ -451,6 +456,7 @@ public override int Read ()
return decoded_buffer [pos++];
}

// Keep in sync with ReadAsync
public override int Read ([In, Out] char[] buffer, int index, int count)
{
if (buffer == null)
Expand All @@ -466,8 +472,7 @@ public override int Read ([In, Out] char[] buffer, int index, int count)
CheckState ();

int chars_read = 0;
while (count > 0)
{
while (count > 0) {
if (pos >= decoded_count && ReadBuffer () == 0)
return chars_read > 0 ? chars_read : 0;

Expand Down Expand Up @@ -512,6 +517,7 @@ int FindNextEOL ()
return -1;
}

// Keep in sync with ReadLineAsync
public override string ReadLine()
{
CheckState ();
Expand All @@ -523,7 +529,7 @@ public override string ReadLine()
int end = FindNextEOL ();
if (end < decoded_count && end >= begin)
return new string (decoded_buffer, begin, end - begin);
else if (end == -2)
if (end == -2)
return line_builder.ToString (0, line_builder.Length);

if (line_builder == null)
Expand Down Expand Up @@ -555,23 +561,23 @@ public override string ReadLine()
return sb.ToString (0, sb.Length);
}
return line_builder.ToString (0, line_builder.Length);
} else if (end == -2)
}

if (end == -2)
return line_builder.ToString (0, line_builder.Length);
}
}

// Keep in sync with ReadToEndAsync
public override string ReadToEnd()
{
CheckState ();

StringBuilder text = new StringBuilder ();

int size = decoded_buffer.Length;
char [] buffer = new char [size];
int len;

while ((len = Read (buffer, 0, size)) > 0)
text.Append (buffer, 0, len);
do {
text.Append (decoded_buffer, pos, decoded_count - pos);
} while (ReadBuffer () != 0);

return text.ToString ();
}
Expand All @@ -582,7 +588,7 @@ void CheckState ()
throw new ObjectDisposedException ("StreamReader", "Cannot read from a closed StreamReader");

#if NET_4_5
if (async_task != null && async_task.IsCompleted)
if (async_task != null && !async_task.IsCompleted)
throw new InvalidOperationException ();
#endif
}
Expand All @@ -607,19 +613,60 @@ public override int ReadBlock ([In, Out] char[] buffer, int index, int count)

public override Task<int> ReadAsync (char[] buffer, int index, int count)
{
if (buffer == null)
throw new ArgumentNullException ("buffer");
if (index < 0)
throw new ArgumentOutOfRangeException ("index", "< 0");
if (count < 0)
throw new ArgumentOutOfRangeException ("count", "< 0");
// re-ordered to avoid possible integer overflow
if (index > buffer.Length - count)
throw new ArgumentException ("index + count > buffer.Length");

CheckState ();

Task<int> res;
async_task = res = base.ReadAsync (buffer, index, count);
async_task = res = ReadAsyncCore (buffer, index, count);
return res;
}

async Task<int> ReadAsyncCore (char[] buffer, int index, int count)
{
int chars_read = 0;

while (count > 0) {
if (pos >= decoded_count && await ReadBufferAsync ().ConfigureAwait (false) == 0)
return chars_read > 0 ? chars_read : 0;

int cch = Math.Min (decoded_count - pos, count);
Array.Copy (decoded_buffer, pos, buffer, index, cch);
pos += cch;
index += cch;
count -= cch;
chars_read += cch;
if (mayBlock)
break;
}

return chars_read;
}

public override Task<int> ReadBlockAsync (char[] buffer, int index, int count)
{
if (buffer == null)
throw new ArgumentNullException ("buffer");
if (index < 0)
throw new ArgumentOutOfRangeException ("index", "< 0");
if (count < 0)
throw new ArgumentOutOfRangeException ("count", "< 0");
// re-ordered to avoid possible integer overflow
if (index > buffer.Length - count)
throw new ArgumentException ("index + count > buffer.Length");

CheckState ();

Task<int> res;
async_task = res = base.ReadBlockAsync (buffer, index, count);
async_task = res = ReadAsyncCore (buffer, index, count);
return res;
}

Expand All @@ -628,19 +675,94 @@ public override Task<string> ReadLineAsync ()
CheckState ();

Task<string> res;
async_task = res = base.ReadLineAsync ();
async_task = res = ReadLineAsyncCore ();
return res;
}

async Task<string> ReadLineAsyncCore ()
{
if (pos >= decoded_count && await ReadBufferAsync ().ConfigureAwait (false) == 0)
return null;

int begin = pos;
int end = FindNextEOL ();
if (end < decoded_count && end >= begin)
return new string (decoded_buffer, begin, end - begin);
if (end == -2)
return line_builder.ToString (0, line_builder.Length);

if (line_builder == null)
line_builder = new StringBuilder ();
else
line_builder.Length = 0;

while (true) {
if (foundCR) // don't include the trailing CR if present
decoded_count--;

line_builder.Append (decoded_buffer, begin, decoded_count - begin);
if (await ReadBufferAsync ().ConfigureAwait (false) == 0) {
if (line_builder.Capacity > 32768) {
StringBuilder sb = line_builder;
line_builder = null;
return sb.ToString (0, sb.Length);
}
return line_builder.ToString (0, line_builder.Length);
}

begin = pos;
end = FindNextEOL ();
if (end < decoded_count && end >= begin) {
line_builder.Append (decoded_buffer, begin, end - begin);
if (line_builder.Capacity > 32768) {
StringBuilder sb = line_builder;
line_builder = null;
return sb.ToString (0, sb.Length);
}
return line_builder.ToString (0, line_builder.Length);
}

if (end == -2)
return line_builder.ToString (0, line_builder.Length);
}
}

public override Task<string> ReadToEndAsync ()
{
CheckState ();

Task<string> res;
async_task = res = base.ReadToEndAsync ();
async_task = res = ReadToEndAsyncCore ();
return res;
}

async Task<string> ReadToEndAsyncCore ()
{
StringBuilder text = new StringBuilder ();

do {
text.Append (decoded_buffer, pos, decoded_count - pos);
} while (await ReadBufferAsync () != 0);

return text.ToString ();
}

async Task<int> ReadBufferAsync ()
{
pos = 0;

// keep looping until the decoder gives us some chars
decoded_count = 0;
do {
var cbEncoded = await base_stream.ReadAsync (input_buffer, 0, buffer_size).ConfigureAwait (false);
if (cbEncoded <= 0)
return 0;

decoded_count = ReadBufferCore (cbEncoded);
} while (decoded_count == 0);

return decoded_count;
}
#endif
}
}

0 comments on commit 185743c

Please sign in to comment.