Permalink
Browse files

Avoid forcing the data into memory and thus creating multiple copies …

…of the same value and filling up the LOH
  • Loading branch information...
1 parent 63c3f3d commit 8b0b3409e9aa08dc74a5226770f454d44b630a02 @ayende ayende committed Dec 11, 2011
@@ -6,6 +6,7 @@
using System;
using System.Collections.Generic;
using System.ComponentModel.Composition;
+using System.IO;
using NLog;
using Raven.Abstractions.Data;
using Raven.Abstractions.Extensions;
@@ -132,12 +133,15 @@ private void ReplicateAttachment(IStorageActionsAccessor actions, string id, Rav
existingAttachment.Metadata.Add(ReplicationConstants.RavenReplicationConflict, RavenJToken.FromObject(true));
actions.Attachments.AddAttachment(existingDocumentConflictId, null, existingAttachment.Data, existingAttachment.Metadata);
+ var conflictAttachment = new RavenJObject
+ {
+ {"Conflicts", new RavenJArray(existingDocumentConflictId, newDocumentConflictId)
+ }
+ };
+ var memoryStream = new MemoryStream();
+ conflictAttachment.WriteTo(memoryStream);
actions.Attachments.AddAttachment(id, null,
- new RavenJObject
- {
- {"Conflicts", new RavenJArray(existingDocumentConflictId, newDocumentConflictId)
- }
- }.ToBytes(),
+ memoryStream.ToArray(),
new RavenJObject
{
{ReplicationConstants.RavenReplicationConflict, true},
@@ -58,21 +58,6 @@ public static RavenJObject ToJObject(this Stream self)
/// <summary>
/// Convert a RavenJToken to a byte array
/// </summary>
- public static byte[] ToBytes(this RavenJToken self)
- {
- using (var memoryStream = new MemoryStream())
- {
- self.WriteTo(new BsonWriter(memoryStream)
- {
- DateTimeKindHandling = DateTimeKind.Unspecified
- });
- return memoryStream.ToArray();
- }
- }
-
- /// <summary>
- /// Convert a RavenJToken to a byte array
- /// </summary>
public static void WriteTo(this RavenJToken self, Stream stream)
{
self.WriteTo(new BsonWriter(stream)
@@ -4,15 +4,16 @@
// </copyright>
//-----------------------------------------------------------------------
using System.ComponentModel.Composition;
+using System.IO;
using Raven.Json.Linq;
namespace Raven.Database.Plugins
{
[InheritedExport]
public abstract class AbstractDocumentCodec
{
- public abstract byte[] Encode(string key, RavenJObject data, RavenJObject metadata, byte[] bytes);
+ public abstract Stream Encode(string key, RavenJObject data, RavenJObject metadata, Stream dataStream);
- public abstract byte[] Decode(string key, RavenJObject metadata, byte[] bytes);
+ public abstract Stream Decode(string key, RavenJObject metadata, Stream dataStream);
}
}
@@ -6,6 +6,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
+using System.IO;
using System.Text;
using Microsoft.Isam.Esent.Interop;
using Raven.Abstractions;
@@ -132,10 +133,15 @@ private RavenJObject ReadDocumentDataInTransaction(string key, Guid etag, RavenJ
return cachedDocument.Document;
}
- var dataBuffer = Api.RetrieveColumn(session, DocumentsModifiedByTransactions, tableColumnsCache.DocumentsModifiedByTransactionsColumns["data"]);
- var data = documentCodecs.Aggregate(dataBuffer, (bytes, codec) => codec.Decode(key, metadata, bytes)).ToJObject();
- cacher.SetCachedDocument(key, etag, data, metadata);
- return data;
+ using (Stream stream = new ColumnStream(session, DocumentsModifiedByTransactions, tableColumnsCache.DocumentsModifiedByTransactionsColumns["data"]))
+ {
+ using(var aggregate = documentCodecs.Aggregate(stream, (bytes, codec) => codec.Decode(key, metadata, bytes)))
+ {
+ var data = aggregate.ToJObject();
+ cacher.SetCachedDocument(key, etag, data, metadata);
+ return data;
+ }
+ }
}
private RavenJObject ReadDocumentMetadata(string key, Guid existingEtag)
@@ -153,13 +159,18 @@ private RavenJObject ReadDocumentData(string key, Guid existingEtag, RavenJObjec
if (existingCachedDocument != null)
return existingCachedDocument.Document;
- var dataBuffer = Api.RetrieveColumn(session, Documents, tableColumnsCache.DocumentsColumns["data"]);
- var data = documentCodecs.Aggregate(dataBuffer, (bytes, codec) => codec.Decode(key, metadata, bytes)).ToJObject();
+ using (Stream stream = new ColumnStream(session, Documents, tableColumnsCache.DocumentsColumns["data"]))
+ {
+ using (var columnStream = documentCodecs.Aggregate(stream, (dataStream, codec) => codec.Decode(key, metadata, dataStream)))
+ {
+ var data = columnStream.ToJObject();
- cacher.SetCachedDocument(key, existingEtag, data, metadata);
+ cacher.SetCachedDocument(key, existingEtag, data, metadata);
- return data;
+ return data;
+ }
+ }
}
public IEnumerable<JsonDocument> GetDocumentsByReverseUpdateOrder(int start)
@@ -173,16 +184,20 @@ public IEnumerable<JsonDocument> GetDocumentsByReverseUpdateOrder(int start)
}
while (Api.TryMovePrevious(session, Documents))
{
- var data = Api.RetrieveColumn(session, Documents, tableColumnsCache.DocumentsColumns["data"]);
var metadata = Api.RetrieveColumn(session, Documents, tableColumnsCache.DocumentsColumns["metadata"]).ToJObject();
var key = Api.RetrieveColumnAsString(session, Documents, tableColumnsCache.DocumentsColumns["key"], Encoding.Unicode);
- data = documentCodecs.Aggregate(data, (bytes, codec) => codec.Decode(key, metadata, bytes));
+ RavenJObject dataAsJson;
+ using (Stream stream = new ColumnStream(session, Documents, tableColumnsCache.DocumentsColumns["data"]))
+ {
+ using (var aggregate = documentCodecs.Aggregate(stream, (bytes, codec) => codec.Decode(key, metadata, bytes)))
+ dataAsJson = aggregate.ToJObject();
+ }
yield return new JsonDocument
{
Key = key,
- DataAsJson = data.ToJObject(),
+ DataAsJson = dataAsJson,
NonAuthoritiveInformation = IsDocumentModifiedInsideTransaction(key),
LastModified = Api.RetrieveColumnAsDateTime(session, Documents, tableColumnsCache.DocumentsColumns["last_modified"]).Value,
Etag = Api.RetrieveColumn(session, Documents, tableColumnsCache.DocumentsColumns["etag"]).TransfromToGuidWithProperSorting(),
@@ -200,16 +215,19 @@ public IEnumerable<JsonDocument> GetDocumentsAfter(Guid etag)
yield break;
do
{
- var data = Api.RetrieveColumn(session, Documents, tableColumnsCache.DocumentsColumns["data"]);
var key = Api.RetrieveColumnAsString(session, Documents, tableColumnsCache.DocumentsColumns["key"], Encoding.Unicode);
var metadata = Api.RetrieveColumn(session, Documents, tableColumnsCache.DocumentsColumns["metadata"]).ToJObject();
- data = documentCodecs.Aggregate(data, (bytes, codec) => codec.Decode(key, metadata, bytes));
-
+ RavenJObject dataAsJson;
+ using (Stream stream = new ColumnStream(session, Documents, tableColumnsCache.DocumentsColumns["data"]))
+ {
+ using (var aggregate = documentCodecs.Aggregate(stream, (bytes, codec) => codec.Decode(key, metadata, bytes)))
+ dataAsJson = aggregate.ToJObject();
+ }
yield return new JsonDocument
{
Key = key,
- DataAsJson = data.ToJObject(),
+ DataAsJson = dataAsJson,
NonAuthoritiveInformation = IsDocumentModifiedInsideTransaction(key),
LastModified = Api.RetrieveColumnAsDateTime(session, Documents, tableColumnsCache.DocumentsColumns["last_modified"]).Value,
Etag = Api.RetrieveColumn(session, Documents, tableColumnsCache.DocumentsColumns["etag"]).TransfromToGuidWithProperSorting(),
@@ -238,16 +256,20 @@ public IEnumerable<JsonDocument> GetDocumentsWithIdStartingWith(string idPrefix,
start--;
}
- var data = Api.RetrieveColumn(session, Documents, tableColumnsCache.DocumentsColumns["data"]);
var key = Api.RetrieveColumnAsString(session, Documents, tableColumnsCache.DocumentsColumns["key"], Encoding.Unicode);
var metadata = Api.RetrieveColumn(session, Documents, tableColumnsCache.DocumentsColumns["metadata"]).ToJObject();
- data = documentCodecs.Aggregate(data, (bytes, codec) => codec.Decode(key, metadata, bytes));
+ RavenJObject dataAsJson;
+ using (Stream stream = new ColumnStream(session, Documents, tableColumnsCache.DocumentsColumns["data"]))
+ {
+ using (var aggregate = documentCodecs.Aggregate(stream, (bytes, codec) => codec.Decode(key, metadata, bytes)))
+ dataAsJson = aggregate.ToJObject();
+ }
yield return new JsonDocument
{
Key = key,
- DataAsJson = data.ToJObject(),
+ DataAsJson = dataAsJson,
NonAuthoritiveInformation = IsDocumentModifiedInsideTransaction(key),
LastModified = Api.RetrieveColumnAsDateTime(session, Documents, tableColumnsCache.DocumentsColumns["last_modified"]).Value,
Etag = Api.RetrieveColumn(session, Documents, tableColumnsCache.DocumentsColumns["etag"]).TransfromToGuidWithProperSorting(),
@@ -274,18 +296,23 @@ public IEnumerable<JsonDocument> GetDocumentsWithIdStartingWith(string idPrefix,
if (id > endId)
break;
- var data = Api.RetrieveColumn(session, Documents, tableColumnsCache.DocumentsColumns["data"]);
- logger.Debug("Document with id '{0}' was found, doc length: {1}", id, data.Length);
+ logger.Debug("Document with id '{0}' was found", id);
var etag = Api.RetrieveColumn(session, Documents, tableColumnsCache.DocumentsColumns["etag"]).TransfromToGuidWithProperSorting();
var metadata = Api.RetrieveColumn(session, Documents, tableColumnsCache.DocumentsColumns["metadata"]).ToJObject();
var key = Api.RetrieveColumnAsString(session, Documents, tableColumnsCache.DocumentsColumns["key"], Encoding.Unicode);
var modified = Api.RetrieveColumnAsDateTime(session, Documents, tableColumnsCache.DocumentsColumns["last_modified"]);
- data = documentCodecs.Aggregate(data, (bytes, codec) => codec.Decode(key, metadata, bytes));
+
+ RavenJObject dataAsJson;
+ using (Stream stream = new ColumnStream(session, Documents, tableColumnsCache.DocumentsColumns["data"]))
+ {
+ using (var aggregate = documentCodecs.Aggregate(stream, (dataStream, codec) => codec.Decode(key, metadata, dataStream)))
+ dataAsJson = aggregate.ToJObject();
+ }
var doc = new JsonDocument
{
Key = key,
- DataAsJson = data.ToJObject(),
+ DataAsJson = dataAsJson,
NonAuthoritiveInformation = IsDocumentModifiedInsideTransaction(key),
Etag = etag,
LastModified = modified.Value,
@@ -319,15 +346,25 @@ public Guid AddDocument(string key, Guid? etag, RavenJObject data, RavenJObject
}
Guid newEtag = uuidGenerator.CreateSequentialUuid();
- var bytes = documentCodecs.Aggregate(data.ToBytes(), (current, codec) => codec.Encode(key, data, metadata, current));
using (var update = new Update(session, Documents, isUpdate ? JET_prep.Replace : JET_prep.Insert))
{
Api.SetColumn(session, Documents, tableColumnsCache.DocumentsColumns["key"], key, Encoding.Unicode);
- Api.SetColumn(session, Documents, tableColumnsCache.DocumentsColumns["data"], bytes);
+ using(Stream stream = new ColumnStream(session, Documents, tableColumnsCache.DocumentsColumns["data"]))
+ {
+ using(var finalStream = documentCodecs.Aggregate(stream, (current, codec) => codec.Encode(key, data, metadata, current)))
+ {
+ data.WriteTo(finalStream);
+ }
+ }
+
Api.SetColumn(session, Documents, tableColumnsCache.DocumentsColumns["etag"], newEtag.TransformToValueForEsentSorting());
Api.SetColumn(session, Documents, tableColumnsCache.DocumentsColumns["last_modified"], SystemTime.UtcNow);
- Api.SetColumn(session, Documents, tableColumnsCache.DocumentsColumns["metadata"], metadata.ToBytes());
+
+ using(Stream stream = new ColumnStream(session, Documents, tableColumnsCache.DocumentsColumns["metadata"]))
+ {
+ metadata.WriteTo(stream);
+ }
update.Save();
}
@@ -366,15 +403,26 @@ public Guid AddDocumentInTransaction(string key, Guid? etag, RavenJObject data,
Api.MakeKey(session, DocumentsModifiedByTransactions, key, Encoding.Unicode, MakeKeyGrbit.NewKey);
var isUpdateInTransaction = Api.TrySeek(session, DocumentsModifiedByTransactions, SeekGrbit.SeekEQ);
- var bytes = documentCodecs.Aggregate(data.ToBytes(), (current, codec) => codec.Encode(key, data, metadata, current));
using (var update = new Update(session, DocumentsModifiedByTransactions, isUpdateInTransaction ? JET_prep.Replace : JET_prep.Insert))
{
Api.SetColumn(session, DocumentsModifiedByTransactions, tableColumnsCache.DocumentsModifiedByTransactionsColumns["key"], key, Encoding.Unicode);
- Api.SetColumn(session, DocumentsModifiedByTransactions, tableColumnsCache.DocumentsModifiedByTransactionsColumns["data"], bytes);
+
+ using (Stream stream = new ColumnStream(session, DocumentsModifiedByTransactions, tableColumnsCache.DocumentsModifiedByTransactionsColumns["data"]))
+ {
+ using (var finalStream = documentCodecs.Aggregate(stream, (current, codec) => codec.Encode(key, data, metadata, current)))
+ {
+ data.WriteTo(finalStream);
+ }
+ }
Api.SetColumn(session, DocumentsModifiedByTransactions,
tableColumnsCache.DocumentsModifiedByTransactionsColumns["etag"],
newEtag.TransformToValueForEsentSorting());
- Api.SetColumn(session, DocumentsModifiedByTransactions, tableColumnsCache.DocumentsModifiedByTransactionsColumns["metadata"], metadata.ToBytes());
+
+ using (Stream stream = new ColumnStream(session, DocumentsModifiedByTransactions, tableColumnsCache.DocumentsModifiedByTransactionsColumns["metadata"]))
+ {
+ metadata.WriteTo(stream);
+ }
+
Api.SetColumn(session, DocumentsModifiedByTransactions, tableColumnsCache.DocumentsModifiedByTransactionsColumns["last_modified"], SystemTime.UtcNow);
Api.SetColumn(session, DocumentsModifiedByTransactions, tableColumnsCache.DocumentsModifiedByTransactionsColumns["delete_document"], false);
Api.SetColumn(session, DocumentsModifiedByTransactions, tableColumnsCache.DocumentsModifiedByTransactionsColumns["locked_by_transaction"], transactionInformation.Id.ToByteArray());
@@ -6,6 +6,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
+using System.IO;
using System.Text;
using Microsoft.Isam.Esent.Interop;
using NLog;
@@ -16,6 +17,7 @@
using Raven.Database.Plugins;
using Raven.Database.Storage;
using Raven.Database.Extensions;
+using Raven.Json.Linq;
namespace Raven.Storage.Esent.StorageActions
{
@@ -223,26 +225,36 @@ public void CompleteTransaction(Guid txId, Action<DocumentInTransactionData> per
do
{
// esent index ranges are approximate, and we need to check them ourselves as well
- if (Api.RetrieveColumnAsGuid(session, DocumentsModifiedByTransactions, tableColumnsCache.DocumentsModifiedByTransactionsColumns["locked_by_transaction"]) != txId)
+ if (
+ Api.RetrieveColumnAsGuid(session, DocumentsModifiedByTransactions,
+ tableColumnsCache.DocumentsModifiedByTransactionsColumns["locked_by_transaction"]) != txId)
continue;
- var data = Api.RetrieveColumn(session, DocumentsModifiedByTransactions, tableColumnsCache.DocumentsModifiedByTransactionsColumns["data"]);
- var metadata = Api.RetrieveColumn(session, DocumentsModifiedByTransactions, tableColumnsCache.DocumentsModifiedByTransactionsColumns["metadata"]);
- var key = Api.RetrieveColumnAsString(session, DocumentsModifiedByTransactions, tableColumnsCache.DocumentsModifiedByTransactionsColumns["key"], Encoding.Unicode);
-
- if(data != null && metadata != null)
+ var metadata = Api.RetrieveColumn(session, DocumentsModifiedByTransactions,
+ tableColumnsCache.DocumentsModifiedByTransactionsColumns["metadata"]);
+ var key = Api.RetrieveColumnAsString(session, DocumentsModifiedByTransactions,
+ tableColumnsCache.DocumentsModifiedByTransactionsColumns["key"],
+ Encoding.Unicode);
+
+ RavenJObject dataAsJson;
+ var metadataAsJson = metadata.ToJObject();
+ using (
+ Stream stream = new ColumnStream(session, DocumentsModifiedByTransactions,
+ tableColumnsCache.DocumentsModifiedByTransactionsColumns["data"]))
{
- var metadataAsJson = metadata.ToJObject();
- data = documentCodecs.Aggregate(data, (bytes, codec) => codec.Decode(key, metadataAsJson, bytes));
+ using (var data = documentCodecs.Aggregate(stream, (dataStream, codec) => codec.Decode(key, metadataAsJson, dataStream)))
+ dataAsJson = data.ToJObject();
}
+
documentsInTransaction.Add(new DocumentInTransactionData
{
- Data = data.ToJObject(),
+ Data = dataAsJson,
Delete =
Api.RetrieveColumnAsBoolean(session, DocumentsModifiedByTransactions,
tableColumnsCache.DocumentsModifiedByTransactionsColumns["delete_document"]).Value,
Etag = Api.RetrieveColumn(session, DocumentsModifiedByTransactions,
- tableColumnsCache.DocumentsModifiedByTransactionsColumns["etag"]).TransfromToGuidWithProperSorting(),
+ tableColumnsCache.DocumentsModifiedByTransactionsColumns["etag"]).
+ TransfromToGuidWithProperSorting(),
Key = key,
Metadata = metadata.ToJObject(),
});
@@ -31,7 +31,12 @@ public void PutMappedResult(string view, string docId, string reduceKey, RavenJO
Api.SetColumn(session, MappedResults, tableColumnsCache.MappedResultsColumns["document_key"], docId, Encoding.Unicode);
Api.SetColumn(session, MappedResults, tableColumnsCache.MappedResultsColumns["reduce_key"], reduceKey, Encoding.Unicode);
Api.SetColumn(session, MappedResults, tableColumnsCache.MappedResultsColumns["reduce_key_and_view_hashed"], viewAndReduceKeyHashed);
- Api.SetColumn(session, MappedResults, tableColumnsCache.MappedResultsColumns["data"], data.ToBytes());
+
+ using(var columnStream = new ColumnStream(session, MappedResults, tableColumnsCache.MappedResultsColumns["data"]))
+ {
+ data.WriteTo(columnStream);
+ }
+
Api.SetColumn(session, MappedResults, tableColumnsCache.MappedResultsColumns["etag"], etag.TransformToValueForEsentSorting());
Api.SetColumn(session, MappedResults, tableColumnsCache.MappedResultsColumns["timestamp"], SystemTime.Now);
Oops, something went wrong.

0 comments on commit 8b0b340

Please sign in to comment.