Skip to content
Browse files

Avoid forcing the data into memory and thus creating multiple copies …

…of the same value and filling up the LOH
  • Loading branch information...
1 parent 63c3f3d commit 8b0b3409e9aa08dc74a5226770f454d44b630a02 @ayende ayende committed Dec 11, 2011
View
14 Bundles/Raven.Bundles.Replication/Reponsders/AttachmentReplicationResponder.cs
@@ -6,6 +6,7 @@
using System;
using System.Collections.Generic;
using System.ComponentModel.Composition;
+using System.IO;
using NLog;
using Raven.Abstractions.Data;
using Raven.Abstractions.Extensions;
@@ -132,12 +133,15 @@ private void ReplicateAttachment(IStorageActionsAccessor actions, string id, Rav
existingAttachment.Metadata.Add(ReplicationConstants.RavenReplicationConflict, RavenJToken.FromObject(true));
actions.Attachments.AddAttachment(existingDocumentConflictId, null, existingAttachment.Data, existingAttachment.Metadata);
+ var conflictAttachment = new RavenJObject
+ {
+ {"Conflicts", new RavenJArray(existingDocumentConflictId, newDocumentConflictId)
+ }
+ };
+ var memoryStream = new MemoryStream();
+ conflictAttachment.WriteTo(memoryStream);
actions.Attachments.AddAttachment(id, null,
- new RavenJObject
- {
- {"Conflicts", new RavenJArray(existingDocumentConflictId, newDocumentConflictId)
- }
- }.ToBytes(),
+ memoryStream.ToArray(),
new RavenJObject
{
{ReplicationConstants.RavenReplicationConflict, true},
View
15 Raven.Abstractions/Extensions/JsonExtensions.cs
@@ -58,21 +58,6 @@ public static RavenJObject ToJObject(this Stream self)
/// <summary>
/// Convert a RavenJToken to a byte array
/// </summary>
- public static byte[] ToBytes(this RavenJToken self)
- {
- using (var memoryStream = new MemoryStream())
- {
- self.WriteTo(new BsonWriter(memoryStream)
- {
- DateTimeKindHandling = DateTimeKind.Unspecified
- });
- return memoryStream.ToArray();
- }
- }
-
- /// <summary>
- /// Convert a RavenJToken to a byte array
- /// </summary>
public static void WriteTo(this RavenJToken self, Stream stream)
{
self.WriteTo(new BsonWriter(stream)
View
5 Raven.Database/Plugins/AbstractDocumentCodec.cs
@@ -4,15 +4,16 @@
// </copyright>
//-----------------------------------------------------------------------
using System.ComponentModel.Composition;
+using System.IO;
using Raven.Json.Linq;
namespace Raven.Database.Plugins
{
[InheritedExport]
public abstract class AbstractDocumentCodec
{
- public abstract byte[] Encode(string key, RavenJObject data, RavenJObject metadata, byte[] bytes);
+ public abstract Stream Encode(string key, RavenJObject data, RavenJObject metadata, Stream dataStream);
- public abstract byte[] Decode(string key, RavenJObject metadata, byte[] bytes);
+ public abstract Stream Decode(string key, RavenJObject metadata, Stream dataStream);
}
}
View
104 Raven.Storage.Esent/StorageActions/Documents.cs
@@ -6,6 +6,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
+using System.IO;
using System.Text;
using Microsoft.Isam.Esent.Interop;
using Raven.Abstractions;
@@ -132,10 +133,15 @@ private RavenJObject ReadDocumentDataInTransaction(string key, Guid etag, RavenJ
return cachedDocument.Document;
}
- var dataBuffer = Api.RetrieveColumn(session, DocumentsModifiedByTransactions, tableColumnsCache.DocumentsModifiedByTransactionsColumns["data"]);
- var data = documentCodecs.Aggregate(dataBuffer, (bytes, codec) => codec.Decode(key, metadata, bytes)).ToJObject();
- cacher.SetCachedDocument(key, etag, data, metadata);
- return data;
+ using (Stream stream = new ColumnStream(session, DocumentsModifiedByTransactions, tableColumnsCache.DocumentsModifiedByTransactionsColumns["data"]))
+ {
+ using(var aggregate = documentCodecs.Aggregate(stream, (bytes, codec) => codec.Decode(key, metadata, bytes)))
+ {
+ var data = aggregate.ToJObject();
+ cacher.SetCachedDocument(key, etag, data, metadata);
+ return data;
+ }
+ }
}
private RavenJObject ReadDocumentMetadata(string key, Guid existingEtag)
@@ -153,13 +159,18 @@ private RavenJObject ReadDocumentData(string key, Guid existingEtag, RavenJObjec
if (existingCachedDocument != null)
return existingCachedDocument.Document;
- var dataBuffer = Api.RetrieveColumn(session, Documents, tableColumnsCache.DocumentsColumns["data"]);
- var data = documentCodecs.Aggregate(dataBuffer, (bytes, codec) => codec.Decode(key, metadata, bytes)).ToJObject();
+ using (Stream stream = new ColumnStream(session, Documents, tableColumnsCache.DocumentsColumns["data"]))
+ {
+ using (var columnStream = documentCodecs.Aggregate(stream, (dataStream, codec) => codec.Decode(key, metadata, dataStream)))
+ {
+ var data = columnStream.ToJObject();
- cacher.SetCachedDocument(key, existingEtag, data, metadata);
+ cacher.SetCachedDocument(key, existingEtag, data, metadata);
- return data;
+ return data;
+ }
+ }
}
public IEnumerable<JsonDocument> GetDocumentsByReverseUpdateOrder(int start)
@@ -173,16 +184,20 @@ public IEnumerable<JsonDocument> GetDocumentsByReverseUpdateOrder(int start)
}
while (Api.TryMovePrevious(session, Documents))
{
- var data = Api.RetrieveColumn(session, Documents, tableColumnsCache.DocumentsColumns["data"]);
var metadata = Api.RetrieveColumn(session, Documents, tableColumnsCache.DocumentsColumns["metadata"]).ToJObject();
var key = Api.RetrieveColumnAsString(session, Documents, tableColumnsCache.DocumentsColumns["key"], Encoding.Unicode);
- data = documentCodecs.Aggregate(data, (bytes, codec) => codec.Decode(key, metadata, bytes));
+ RavenJObject dataAsJson;
+ using (Stream stream = new ColumnStream(session, Documents, tableColumnsCache.DocumentsColumns["data"]))
+ {
+ using (var aggregate = documentCodecs.Aggregate(stream, (bytes, codec) => codec.Decode(key, metadata, bytes)))
+ dataAsJson = aggregate.ToJObject();
+ }
yield return new JsonDocument
{
Key = key,
- DataAsJson = data.ToJObject(),
+ DataAsJson = dataAsJson,
NonAuthoritiveInformation = IsDocumentModifiedInsideTransaction(key),
LastModified = Api.RetrieveColumnAsDateTime(session, Documents, tableColumnsCache.DocumentsColumns["last_modified"]).Value,
Etag = Api.RetrieveColumn(session, Documents, tableColumnsCache.DocumentsColumns["etag"]).TransfromToGuidWithProperSorting(),
@@ -200,16 +215,19 @@ public IEnumerable<JsonDocument> GetDocumentsAfter(Guid etag)
yield break;
do
{
- var data = Api.RetrieveColumn(session, Documents, tableColumnsCache.DocumentsColumns["data"]);
var key = Api.RetrieveColumnAsString(session, Documents, tableColumnsCache.DocumentsColumns["key"], Encoding.Unicode);
var metadata = Api.RetrieveColumn(session, Documents, tableColumnsCache.DocumentsColumns["metadata"]).ToJObject();
- data = documentCodecs.Aggregate(data, (bytes, codec) => codec.Decode(key, metadata, bytes));
-
+ RavenJObject dataAsJson;
+ using (Stream stream = new ColumnStream(session, Documents, tableColumnsCache.DocumentsColumns["data"]))
+ {
+ using (var aggregate = documentCodecs.Aggregate(stream, (bytes, codec) => codec.Decode(key, metadata, bytes)))
+ dataAsJson = aggregate.ToJObject();
+ }
yield return new JsonDocument
{
Key = key,
- DataAsJson = data.ToJObject(),
+ DataAsJson = dataAsJson,
NonAuthoritiveInformation = IsDocumentModifiedInsideTransaction(key),
LastModified = Api.RetrieveColumnAsDateTime(session, Documents, tableColumnsCache.DocumentsColumns["last_modified"]).Value,
Etag = Api.RetrieveColumn(session, Documents, tableColumnsCache.DocumentsColumns["etag"]).TransfromToGuidWithProperSorting(),
@@ -238,16 +256,20 @@ public IEnumerable<JsonDocument> GetDocumentsWithIdStartingWith(string idPrefix,
start--;
}
- var data = Api.RetrieveColumn(session, Documents, tableColumnsCache.DocumentsColumns["data"]);
var key = Api.RetrieveColumnAsString(session, Documents, tableColumnsCache.DocumentsColumns["key"], Encoding.Unicode);
var metadata = Api.RetrieveColumn(session, Documents, tableColumnsCache.DocumentsColumns["metadata"]).ToJObject();
- data = documentCodecs.Aggregate(data, (bytes, codec) => codec.Decode(key, metadata, bytes));
+ RavenJObject dataAsJson;
+ using (Stream stream = new ColumnStream(session, Documents, tableColumnsCache.DocumentsColumns["data"]))
+ {
+ using (var aggregate = documentCodecs.Aggregate(stream, (bytes, codec) => codec.Decode(key, metadata, bytes)))
+ dataAsJson = aggregate.ToJObject();
+ }
yield return new JsonDocument
{
Key = key,
- DataAsJson = data.ToJObject(),
+ DataAsJson = dataAsJson,
NonAuthoritiveInformation = IsDocumentModifiedInsideTransaction(key),
LastModified = Api.RetrieveColumnAsDateTime(session, Documents, tableColumnsCache.DocumentsColumns["last_modified"]).Value,
Etag = Api.RetrieveColumn(session, Documents, tableColumnsCache.DocumentsColumns["etag"]).TransfromToGuidWithProperSorting(),
@@ -274,18 +296,23 @@ public IEnumerable<JsonDocument> GetDocumentsWithIdStartingWith(string idPrefix,
if (id > endId)
break;
- var data = Api.RetrieveColumn(session, Documents, tableColumnsCache.DocumentsColumns["data"]);
- logger.Debug("Document with id '{0}' was found, doc length: {1}", id, data.Length);
+ logger.Debug("Document with id '{0}' was found", id);
var etag = Api.RetrieveColumn(session, Documents, tableColumnsCache.DocumentsColumns["etag"]).TransfromToGuidWithProperSorting();
var metadata = Api.RetrieveColumn(session, Documents, tableColumnsCache.DocumentsColumns["metadata"]).ToJObject();
var key = Api.RetrieveColumnAsString(session, Documents, tableColumnsCache.DocumentsColumns["key"], Encoding.Unicode);
var modified = Api.RetrieveColumnAsDateTime(session, Documents, tableColumnsCache.DocumentsColumns["last_modified"]);
- data = documentCodecs.Aggregate(data, (bytes, codec) => codec.Decode(key, metadata, bytes));
+
+ RavenJObject dataAsJson;
+ using (Stream stream = new ColumnStream(session, Documents, tableColumnsCache.DocumentsColumns["data"]))
+ {
+ using (var aggregate = documentCodecs.Aggregate(stream, (dataStream, codec) => codec.Decode(key, metadata, dataStream)))
+ dataAsJson = aggregate.ToJObject();
+ }
var doc = new JsonDocument
{
Key = key,
- DataAsJson = data.ToJObject(),
+ DataAsJson = dataAsJson,
NonAuthoritiveInformation = IsDocumentModifiedInsideTransaction(key),
Etag = etag,
LastModified = modified.Value,
@@ -319,15 +346,25 @@ public Guid AddDocument(string key, Guid? etag, RavenJObject data, RavenJObject
}
Guid newEtag = uuidGenerator.CreateSequentialUuid();
- var bytes = documentCodecs.Aggregate(data.ToBytes(), (current, codec) => codec.Encode(key, data, metadata, current));
using (var update = new Update(session, Documents, isUpdate ? JET_prep.Replace : JET_prep.Insert))
{
Api.SetColumn(session, Documents, tableColumnsCache.DocumentsColumns["key"], key, Encoding.Unicode);
- Api.SetColumn(session, Documents, tableColumnsCache.DocumentsColumns["data"], bytes);
+ using(Stream stream = new ColumnStream(session, Documents, tableColumnsCache.DocumentsColumns["data"]))
+ {
+ using(var finalStream = documentCodecs.Aggregate(stream, (current, codec) => codec.Encode(key, data, metadata, current)))
+ {
+ data.WriteTo(finalStream);
+ }
+ }
+
Api.SetColumn(session, Documents, tableColumnsCache.DocumentsColumns["etag"], newEtag.TransformToValueForEsentSorting());
Api.SetColumn(session, Documents, tableColumnsCache.DocumentsColumns["last_modified"], SystemTime.UtcNow);
- Api.SetColumn(session, Documents, tableColumnsCache.DocumentsColumns["metadata"], metadata.ToBytes());
+
+ using(Stream stream = new ColumnStream(session, Documents, tableColumnsCache.DocumentsColumns["metadata"]))
+ {
+ metadata.WriteTo(stream);
+ }
update.Save();
}
@@ -366,15 +403,26 @@ public Guid AddDocumentInTransaction(string key, Guid? etag, RavenJObject data,
Api.MakeKey(session, DocumentsModifiedByTransactions, key, Encoding.Unicode, MakeKeyGrbit.NewKey);
var isUpdateInTransaction = Api.TrySeek(session, DocumentsModifiedByTransactions, SeekGrbit.SeekEQ);
- var bytes = documentCodecs.Aggregate(data.ToBytes(), (current, codec) => codec.Encode(key, data, metadata, current));
using (var update = new Update(session, DocumentsModifiedByTransactions, isUpdateInTransaction ? JET_prep.Replace : JET_prep.Insert))
{
Api.SetColumn(session, DocumentsModifiedByTransactions, tableColumnsCache.DocumentsModifiedByTransactionsColumns["key"], key, Encoding.Unicode);
- Api.SetColumn(session, DocumentsModifiedByTransactions, tableColumnsCache.DocumentsModifiedByTransactionsColumns["data"], bytes);
+
+ using (Stream stream = new ColumnStream(session, DocumentsModifiedByTransactions, tableColumnsCache.DocumentsModifiedByTransactionsColumns["data"]))
+ {
+ using (var finalStream = documentCodecs.Aggregate(stream, (current, codec) => codec.Encode(key, data, metadata, current)))
+ {
+ data.WriteTo(finalStream);
+ }
+ }
Api.SetColumn(session, DocumentsModifiedByTransactions,
tableColumnsCache.DocumentsModifiedByTransactionsColumns["etag"],
newEtag.TransformToValueForEsentSorting());
- Api.SetColumn(session, DocumentsModifiedByTransactions, tableColumnsCache.DocumentsModifiedByTransactionsColumns["metadata"], metadata.ToBytes());
+
+ using (Stream stream = new ColumnStream(session, DocumentsModifiedByTransactions, tableColumnsCache.DocumentsModifiedByTransactionsColumns["metadata"]))
+ {
+ metadata.WriteTo(stream);
+ }
+
Api.SetColumn(session, DocumentsModifiedByTransactions, tableColumnsCache.DocumentsModifiedByTransactionsColumns["last_modified"], SystemTime.UtcNow);
Api.SetColumn(session, DocumentsModifiedByTransactions, tableColumnsCache.DocumentsModifiedByTransactionsColumns["delete_document"], false);
Api.SetColumn(session, DocumentsModifiedByTransactions, tableColumnsCache.DocumentsModifiedByTransactionsColumns["locked_by_transaction"], transactionInformation.Id.ToByteArray());
View
32 Raven.Storage.Esent/StorageActions/General.cs
@@ -6,6 +6,7 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
+using System.IO;
using System.Text;
using Microsoft.Isam.Esent.Interop;
using NLog;
@@ -16,6 +17,7 @@
using Raven.Database.Plugins;
using Raven.Database.Storage;
using Raven.Database.Extensions;
+using Raven.Json.Linq;
namespace Raven.Storage.Esent.StorageActions
{
@@ -223,26 +225,36 @@ public void CompleteTransaction(Guid txId, Action<DocumentInTransactionData> per
do
{
// esent index ranges are approximate, and we need to check them ourselves as well
- if (Api.RetrieveColumnAsGuid(session, DocumentsModifiedByTransactions, tableColumnsCache.DocumentsModifiedByTransactionsColumns["locked_by_transaction"]) != txId)
+ if (
+ Api.RetrieveColumnAsGuid(session, DocumentsModifiedByTransactions,
+ tableColumnsCache.DocumentsModifiedByTransactionsColumns["locked_by_transaction"]) != txId)
continue;
- var data = Api.RetrieveColumn(session, DocumentsModifiedByTransactions, tableColumnsCache.DocumentsModifiedByTransactionsColumns["data"]);
- var metadata = Api.RetrieveColumn(session, DocumentsModifiedByTransactions, tableColumnsCache.DocumentsModifiedByTransactionsColumns["metadata"]);
- var key = Api.RetrieveColumnAsString(session, DocumentsModifiedByTransactions, tableColumnsCache.DocumentsModifiedByTransactionsColumns["key"], Encoding.Unicode);
-
- if(data != null && metadata != null)
+ var metadata = Api.RetrieveColumn(session, DocumentsModifiedByTransactions,
+ tableColumnsCache.DocumentsModifiedByTransactionsColumns["metadata"]);
+ var key = Api.RetrieveColumnAsString(session, DocumentsModifiedByTransactions,
+ tableColumnsCache.DocumentsModifiedByTransactionsColumns["key"],
+ Encoding.Unicode);
+
+ RavenJObject dataAsJson;
+ var metadataAsJson = metadata.ToJObject();
+ using (
+ Stream stream = new ColumnStream(session, DocumentsModifiedByTransactions,
+ tableColumnsCache.DocumentsModifiedByTransactionsColumns["data"]))
{
- var metadataAsJson = metadata.ToJObject();
- data = documentCodecs.Aggregate(data, (bytes, codec) => codec.Decode(key, metadataAsJson, bytes));
+ using (var data = documentCodecs.Aggregate(stream, (dataStream, codec) => codec.Decode(key, metadataAsJson, dataStream)))
+ dataAsJson = data.ToJObject();
}
+
documentsInTransaction.Add(new DocumentInTransactionData
{
- Data = data.ToJObject(),
+ Data = dataAsJson,
Delete =
Api.RetrieveColumnAsBoolean(session, DocumentsModifiedByTransactions,
tableColumnsCache.DocumentsModifiedByTransactionsColumns["delete_document"]).Value,
Etag = Api.RetrieveColumn(session, DocumentsModifiedByTransactions,
- tableColumnsCache.DocumentsModifiedByTransactionsColumns["etag"]).TransfromToGuidWithProperSorting(),
+ tableColumnsCache.DocumentsModifiedByTransactionsColumns["etag"]).
+ TransfromToGuidWithProperSorting(),
Key = key,
Metadata = metadata.ToJObject(),
});
View
7 Raven.Storage.Esent/StorageActions/MappedResults.cs
@@ -31,7 +31,12 @@ public void PutMappedResult(string view, string docId, string reduceKey, RavenJO
Api.SetColumn(session, MappedResults, tableColumnsCache.MappedResultsColumns["document_key"], docId, Encoding.Unicode);
Api.SetColumn(session, MappedResults, tableColumnsCache.MappedResultsColumns["reduce_key"], reduceKey, Encoding.Unicode);
Api.SetColumn(session, MappedResults, tableColumnsCache.MappedResultsColumns["reduce_key_and_view_hashed"], viewAndReduceKeyHashed);
- Api.SetColumn(session, MappedResults, tableColumnsCache.MappedResultsColumns["data"], data.ToBytes());
+
+ using(var columnStream = new ColumnStream(session, MappedResults, tableColumnsCache.MappedResultsColumns["data"]))
+ {
+ data.WriteTo(columnStream);
+ }
+
Api.SetColumn(session, MappedResults, tableColumnsCache.MappedResultsColumns["etag"], etag.TransformToValueForEsentSorting());
Api.SetColumn(session, MappedResults, tableColumnsCache.MappedResultsColumns["timestamp"], SystemTime.Now);
View
25 Raven.Storage.Managed/DocumentsStorageActions.cs
@@ -197,19 +197,18 @@ private RavenJObject ReadDocument(Tuple<MemoryStream, RavenJObject> stream, Json
if (stream.Item2 != null)
return stream.Item2;
- var memoryStream = stream.Item1;
+ RavenJObject result;
+ Stream docDataStream = stream.Item1;
if (documentCodecs.Count() > 0)
{
- byte[] buffer = memoryStream.GetBuffer();
var metadataCopy = (RavenJObject)metadata.Metadata.CloneToken() ;
- var dataBuffer = new byte[memoryStream.Length - memoryStream.Position];
- Buffer.BlockCopy(buffer, (int)memoryStream.Position, dataBuffer, 0,
- dataBuffer.Length);
- documentCodecs.Aggregate(dataBuffer, (bytes, codec) => codec.Value.Decode(metadata.Key, metadataCopy, bytes));
- memoryStream = new MemoryStream(dataBuffer);
+ using (docDataStream = documentCodecs.Aggregate(docDataStream, (dataStream, codec) => codec.Value.Decode(metadata.Key, metadataCopy, dataStream)))
+ result = docDataStream.ToJObject();
+ }
+ else
+ {
+ result = docDataStream.ToJObject();
}
-
- var result = memoryStream.ToJObject();
Debug.Assert(metadata.Etag != null);
documentCacher.SetCachedDocument(metadata.Key, metadata.Etag.Value, result, metadata.Metadata);
@@ -243,9 +242,11 @@ public Guid AddDocument(string key, Guid? etag, RavenJObject data, RavenJObject
metadata.WriteTo(ms);
- var bytes = documentCodecs.Aggregate(data.ToBytes(), (current, codec) => codec.Value.Encode(key, data, metadata, current));
-
- ms.Write(bytes, 0, bytes.Length);
+ using (var stream = documentCodecs.Aggregate<Lazy<AbstractDocumentCodec>, Stream>(ms, (dataStream, codec) => codec.Value.Encode(key, data, metadata, dataStream)))
+ {
+ data.WriteTo(stream);
+ stream.Flush();
+ }
var newEtag = generator.CreateSequentialUuid();
storage.Documents.Put(new RavenJObject
View
8 Raven.Storage.Managed/TransactionStorageActions.cs
@@ -63,9 +63,11 @@ public Guid AddDocumentInTransaction(string key, Guid? etag, RavenJObject data,
var ms = new MemoryStream();
metadata.WriteTo(ms);
- var dataBytes = documentCodecs.Aggregate(data.ToBytes(), (bytes, codec) => codec.Encode(key, data, metadata, bytes));
- ms.Write(dataBytes, 0, dataBytes.Length);
-
+ using (var stream = documentCodecs.Aggregate<Lazy<AbstractDocumentCodec>, Stream>(ms, (memoryStream, codec) => codec.Value.Encode(key, data, metadata, memoryStream)))
+ {
+ data.WriteTo(stream);
+ stream.Flush();
+ }
var newEtag = generator.CreateSequentialUuid();
storage.DocumentsModifiedByTransactions.Put(new RavenJObject
{

0 comments on commit 8b0b340

Please sign in to comment.
Something went wrong with that request. Please try again.