Commit

Add reusable chunk to reduce memory consumption
howryu committed Mar 4, 2019
1 parent aac43be commit 122ffcd
Showing 15 changed files with 530 additions and 12 deletions.
2 changes: 1 addition & 1 deletion Snowflake.Data.Tests/SFDbCommandIT.cs
@@ -90,7 +90,7 @@ public void TestSimpleLargeResultSet()
conn.Open();

IDbCommand cmd = conn.CreateCommand();
cmd.CommandText = "select seq4(), uniform(1, 10, 42) from table(generator(rowcount => 200000)) v order by 1";
cmd.CommandText = "select seq4(), uniform(1, 10, 42) from table(generator(rowcount => 1000000)) v order by 1";
IDataReader reader = cmd.ExecuteReader();
int counter = 0;
while (reader.Read())
67 changes: 67 additions & 0 deletions Snowflake.Data.Tests/SFReusableChunkTest.cs
@@ -0,0 +1,67 @@
namespace Snowflake.Data.Tests
{
using NUnit.Framework;
using System.IO;
using System.Text;
using Snowflake.Data.Core;

[TestFixture]
class SFReusableChunkTest
{
[Test]
public void TestSimpleChunk()
{
string data = "[ [\"1\", \"1.234\", \"abcde\"], [\"2\", \"5.678\", \"fghi\"] ]";
byte[] bytes = Encoding.UTF8.GetBytes(data);
Stream stream = new MemoryStream(bytes);
IChunkParser parser = new ReusableChunkParser(stream);

ExecResponseChunk chunkInfo = new ExecResponseChunk()
{
url = "fake",
uncompressedSize = 100,
rowCount = 2
};

SFReusableChunk chunk = new SFReusableChunk(3);
chunk.Reset(chunkInfo, 0);

parser.ParseChunk(chunk);

Assert.AreEqual("1", chunk.ExtractCell(0, 0));
Assert.AreEqual("1.234", chunk.ExtractCell(0, 1));
Assert.AreEqual("abcde", chunk.ExtractCell(0, 2));
Assert.AreEqual("2", chunk.ExtractCell(1, 0));
Assert.AreEqual("5.678", chunk.ExtractCell(1, 1));
Assert.AreEqual("fghi", chunk.ExtractCell(1, 2));
}

[Test]
public void TestChunkWithNull()
{
string data = "[ [null, \"1.234\", null], [\"2\", null, \"fghi\"] ]";
byte[] bytes = Encoding.UTF8.GetBytes(data);
Stream stream = new MemoryStream(bytes);
IChunkParser parser = new ReusableChunkParser(stream);

ExecResponseChunk chunkInfo = new ExecResponseChunk()
{
url = "fake",
uncompressedSize = 100,
rowCount = 2
};

SFReusableChunk chunk = new SFReusableChunk(3);
chunk.Reset(chunkInfo, 0);

parser.ParseChunk(chunk);

Assert.AreEqual(null, chunk.ExtractCell(0, 0));
Assert.AreEqual("1.234", chunk.ExtractCell(0, 1));
Assert.AreEqual(null, chunk.ExtractCell(0, 2));
Assert.AreEqual("2", chunk.ExtractCell(1, 0));
Assert.AreEqual(null, chunk.ExtractCell(1, 1));
Assert.AreEqual("fghi", chunk.ExtractCell(1, 2));
}
}
}
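
Note: SFReusableChunk itself is among the 15 changed files, but its diff is not shown in this excerpt. A minimal sketch of the idea the tests above exercise (one flat, reusable cell buffer plus offset/length bookkeeping in place of a fresh string[,] per chunk) could look like the following; everything beyond the Reset/AddCell/ExtractCell surface used by the tests is an assumption, including the use of uncompressedSize for presizing.

namespace Snowflake.Data.Core
{
    // Hypothetical sketch, not the committed class: cell text accumulates in
    // one shared buffer, and Reset() rewinds the write positions so the
    // buffers survive from one chunk to the next.
    internal class SFReusableChunkSketch
    {
        private readonly int colCount;
        private char[] data = new char[1 << 12];  // grown on demand, never shrunk
        private int[] offsets = new int[64];
        private int[] lengths = new int[64];      // -1 marks a SQL NULL
        private int cellCount;
        private int dataLength;

        internal SFReusableChunkSketch(int colCount)
        {
            this.colCount = colCount;
        }

        internal void Reset(ExecResponseChunk chunkInfo, int chunkIndex)
        {
            // Assumed use of the new uncompressedSize field: presize once so
            // AddCell rarely grows the buffer mid-parse.
            if (data.Length < chunkInfo.uncompressedSize)
                data = new char[chunkInfo.uncompressedSize];
            cellCount = 0;
            dataLength = 0;
        }

        internal void AddCell(string val)
        {
            if (cellCount == offsets.Length)
            {
                System.Array.Resize(ref offsets, offsets.Length * 2);
                System.Array.Resize(ref lengths, lengths.Length * 2);
            }
            if (val == null)
            {
                lengths[cellCount++] = -1;
                return;
            }
            while (dataLength + val.Length > data.Length)
                System.Array.Resize(ref data, data.Length * 2);
            val.CopyTo(0, data, dataLength, val.Length);
            offsets[cellCount] = dataLength;
            lengths[cellCount++] = val.Length;
            dataLength += val.Length;
        }

        internal string ExtractCell(int rowIndex, int colIndex)
        {
            int cell = rowIndex * colCount + colIndex;
            if (lengths[cell] == -1) return null;
            return new string(data, offsets[cell], lengths[cell]);
        }
    }
}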
4 changes: 2 additions & 2 deletions Snowflake.Data/Core/ChunkDeserializer.cs
@@ -14,13 +14,13 @@ internal ChunkDeserializer(Stream stream)
this.stream = stream;
}

- public void ParseChunk(SFResultChunk chunk)
+ public void ParseChunk(IResultChunk chunk)
{
// parse results row by row
using (StreamReader sr = new StreamReader(stream))
using (JsonTextReader jr = new JsonTextReader(sr))
{
- chunk.rowSet = JsonSerializer.Deserialize<string[,]>(jr);
+ ((SFResultChunk)chunk).rowSet = JsonSerializer.Deserialize<string[,]>(jr);
}
}
}
2 changes: 1 addition & 1 deletion Snowflake.Data/Core/ChunkDownloaderFactory.cs
@@ -19,7 +19,7 @@ class ChunkDownloaderFactory
}
else
{
- return new SFBlockingChunkDownloader(responseData.rowType.Count,
+ return new SFBlockingChunkDownloaderV3(responseData.rowType.Count,
responseData.chunks,
responseData.qrmk,
responseData.chunkHeaders,
6 changes: 3 additions & 3 deletions Snowflake.Data/Core/ChunkStreamingParser.cs
@@ -14,7 +14,7 @@ internal ChunkStreamingParser(Stream stream)
this.stream = stream;
}

- public void ParseChunk(SFResultChunk chunk)
+ public void ParseChunk(IResultChunk chunk)
{
// parse results row by row
using (StreamReader sr = new StreamReader(stream))
@@ -23,7 +23,7 @@ public void ParseChunk(SFResultChunk chunk)
int row = 0;
int col = 0;

- var outputMatrix = new string[chunk.rowCount, chunk.colCount];
+ var outputMatrix = new string[chunk.GetRowCount(), ((SFResultChunk)chunk).colCount];

while (jr.Read())
{
@@ -55,7 +55,7 @@ public void ParseChunk(SFResultChunk chunk)
}
}

- chunk.rowSet = outputMatrix;
+ ((SFResultChunk)chunk).rowSet = outputMatrix;
}
}
}
4 changes: 3 additions & 1 deletion Snowflake.Data/Core/HttpUtil.cs
@@ -36,7 +36,9 @@ static private void initHttpClient()
ServicePointManager.UseNagleAlgorithm = false;
ServicePointManager.CheckCertificateRevocationList = true;

- HttpUtil.httpClient = new HttpClient(new RetryHandler(new HttpClientHandler()));
+ HttpUtil.httpClient = new HttpClient(new RetryHandler(new HttpClientHandler(){
+     AutomaticDecompression = DecompressionMethods.GZip | DecompressionMethods.Deflate
+ }));
}

class RetryHandler : DelegatingHandler
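
With automatic decompression enabled, gzip- or deflate-encoded response bodies (result chunks are typically served compressed) reach the chunk parsers as plain JSON bytes. The same handler setting in isolation, using only the standard HttpClient API:

using System.Net;
using System.Net.Http;

internal static class DecompressingClientExample
{
    internal static HttpClient Create()
    {
        var handler = new HttpClientHandler
        {
            // Inflate gzip/deflate response bodies transparently; callers
            // then read the decompressed stream directly.
            AutomaticDecompression = DecompressionMethods.GZip | DecompressionMethods.Deflate
        };
        return new HttpClient(handler);
    }
}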
2 changes: 1 addition & 1 deletion Snowflake.Data/Core/IChunkParser.cs
@@ -8,6 +8,6 @@ interface IChunkParser
/// Parse source data stream; the result will be stored into SFResultChunk.rowSet
/// </summary>
/// <param name="chunk"></param>
- void ParseChunk(SFResultChunk chunk);
+ void ParseChunk(IResultChunk chunk);
}
}
3 changes: 3 additions & 0 deletions Snowflake.Data/Core/RestResponse.cs
@@ -230,5 +230,8 @@ internal class ExecResponseChunk

[JsonProperty(PropertyName = "rowCount")]
internal int rowCount { get; set; }
+
+ [JsonProperty(PropertyName = "uncompressedSize")]
+ internal int uncompressedSize { get; set; }
}
}
48 changes: 48 additions & 0 deletions Snowflake.Data/Core/ReusableChunkParser.cs
@@ -0,0 +1,48 @@
using System.IO;
using Newtonsoft.Json;

namespace Snowflake.Data.Core
{
using Snowflake.Data.Client;

class ReusableChunkParser : IChunkParser
{
private readonly Stream stream;

internal ReusableChunkParser(Stream stream)
{
this.stream = stream;
}

public void ParseChunk(IResultChunk chunk)
{
SFReusableChunk rc = (SFReusableChunk)chunk;
// parse results row by row
using (StreamReader sr = new StreamReader(stream))
using (JsonTextReader jr = new JsonTextReader(sr))
{
while (jr.Read())
{
switch (jr.TokenType)
{
case JsonToken.StartArray:
case JsonToken.None:
case JsonToken.EndArray:
break;

case JsonToken.Null:
rc.AddCell(null);
break;

case JsonToken.String:
rc.AddCell((string)jr.Value);
break;

default:
throw new SnowflakeDbException(SFError.INTERNAL_ERROR, $"Unexpected token type: {jr.TokenType}");
}
}
}
}
}
}
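
Unlike ChunkDeserializer above, this parser never materializes a string[,]; tokens stream straight into the chunk's cell buffer. A rough sketch of how a downloader might drive it, reusing one chunk object across downloads (DownloadAsync and Consume below are hypothetical stand-ins for plumbing this page does not show):

using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;

internal static class ReuseDriverSketch
{
    // Hypothetical driver: one SFReusableChunk instance is recycled across
    // downloads, so the per-chunk string[,] allocation disappears.
    internal static async Task DrainChunksAsync(List<ExecResponseChunk> chunkInfos, int colCount)
    {
        var chunk = new SFReusableChunk(colCount);
        for (int i = 0; i < chunkInfos.Count; i++)
        {
            using (Stream stream = await DownloadAsync(chunkInfos[i]))
            {
                chunk.Reset(chunkInfos[i], i);  // rewind write positions, keep capacity
                IChunkParser parser = new ReusableChunkParser(stream);
                parser.ParseChunk(chunk);
                Consume(chunk);                 // read cells via ExtractCell before the next Reset
            }
        }
    }

    // Stand-ins, not real driver APIs:
    private static Task<Stream> DownloadAsync(ExecResponseChunk info) =>
        Task.FromResult<Stream>(new MemoryStream());

    private static void Consume(SFReusableChunk chunk) { }
}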
4 changes: 2 additions & 2 deletions Snowflake.Data/Core/SFBlockingChunkDownloader.cs
@@ -100,7 +100,7 @@ private void FillDownloads()

public Task<IResultChunk> GetNextChunkAsync()
{
- if (_downloadTasks.IsAddingCompleted)
+ if (_downloadTasks.IsCompleted)
{
return Task.FromResult<IResultChunk>(null);
}
@@ -123,7 +123,7 @@ private async Task<IResultChunk> DownloadChunkAsync(DownloadContext downloadContext)
qrmk = downloadContext.qrmk,
// s3 download request timeout to one hour
timeout = TimeSpan.FromHours(1),
- httpRequestTimeout = TimeSpan.FromSeconds(16),
+ httpRequestTimeout = TimeSpan.FromSeconds(32),
chunkHeaders = downloadContext.chunkHeaders
};

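
The change from IsAddingCompleted to IsCompleted in GetNextChunkAsync is a semantic fix, not a rename: on a BlockingCollection, IsAddingCompleted turns true the moment CompleteAdding() is called, while IsCompleted additionally requires the collection to be empty, so the old check could signal end-of-results while downloaded chunks were still queued. A small standalone illustration:

using System;
using System.Collections.Concurrent;

internal static class BlockingCollectionDemo
{
    internal static void Main()
    {
        var tasks = new BlockingCollection<int>();
        tasks.Add(42);
        tasks.CompleteAdding();

        // True immediately after CompleteAdding(), even with an item queued.
        Console.WriteLine(tasks.IsAddingCompleted);  // True
        // Requires adding complete AND the collection drained.
        Console.WriteLine(tasks.IsCompleted);        // False

        tasks.Take();
        Console.WriteLine(tasks.IsCompleted);        // True
    }
}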
