forked from samus/mongodb-csharp
/
InsertMessage.cs
147 lines (125 loc) · 5.17 KB
/
InsertMessage.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
using System;
using System.Collections.Generic;
using System.IO;
using MongoDB.Bson;
namespace MongoDB.Protocol
{
/// <summary>
/// The OP_INSERT message is used to insert one or more documents into a collection.
/// </summary>
/// <remarks>
/// struct {
/// MsgHeader header; // standard message header
/// int32 ZERO; // 0 - reserved for future use
/// cstring fullCollectionName; // "dbname.collectionname"
/// BSON[] documents; // one or more documents to insert into the collection
/// }
/// </remarks>
public class InsertMessage : MessageBase, IRequestMessage
{
private readonly BsonWriterSettings _bsonWriterSettings;
private readonly List<MessageChunk> _chunks = new List<MessageChunk>();
/// <summary>
/// Initializes a new instance of the <see cref="InsertMessage"/> class.
/// </summary>
public InsertMessage(BsonWriterSettings bsonWriterSettings)
{
if(bsonWriterSettings == null)
throw new ArgumentNullException("bsonWriterSettings");
_bsonWriterSettings = bsonWriterSettings;
Header = new MessageHeader(OpCode.Insert);
}
/// <summary>
/// Gets or sets the full name of the collection.
/// </summary>
/// <value>The full name of the collection.</value>
public string FullCollectionName { get; set; }
/// <summary>
/// Gets or sets the documents.
/// </summary>
/// <value>The documents.</value>
public object[] Documents { get; set; }
/// <summary>
/// Writes the specified stream.
/// </summary>
/// <param name="stream">The stream.</param>
public void Write(Stream stream){
var bstream = new BufferedStream(stream);
var bwriter = new BsonWriter(bstream, _bsonWriterSettings);
ChunkMessage(bwriter);
foreach(var chunk in _chunks)
WriteChunk(bstream, chunk);
}
/// <summary>
/// Breaks down an insert message that may be too large into managable sizes.
/// When inserting only one document there will be only one chunk. However chances
/// are that when inserting thousands of documents at once there will be many.
/// </summary>
protected void ChunkMessage(BsonWriter writer){
var baseSize = CalculateBaseSize(writer);
var chunk = new MessageChunk{Size = baseSize, Documents = new List<object>()};
foreach(var document in Documents){
var documentSize = writer.CalculateSize(document);
if(documentSize + baseSize >= MaximumMessageSize)
throw new MongoException("Document is too big to fit in a message.");
if(documentSize + chunk.Size > MaximumMessageSize){
_chunks.Add(chunk);
chunk = new MessageChunk{Size = baseSize, Documents = new List<object>()};
}
chunk.Documents.Add(document);
chunk.Size += documentSize;
}
_chunks.Add(chunk);
}
/// <summary>
/// The base size that all chunks will have.
/// </summary>
protected int CalculateBaseSize(BsonWriter writer){
var size = 4; //first int32
size += writer.CalculateSize(FullCollectionName, false);
size += Header.MessageLength;
return size;
}
/// <summary>
/// Writes out a header and the chunk of documents.
/// </summary>
/// <param name = "stream"></param>
/// <param name = "chunk"></param>
protected void WriteChunk(Stream stream, MessageChunk chunk){
WriteHeader(new BinaryWriter(stream), chunk.Size);
var writer = new BsonWriter(stream, _bsonWriterSettings);
writer.WriteValue(BsonType.Integer, 0);
writer.Write(FullCollectionName, false);
foreach(var document in chunk.Documents)
writer.WriteObject(document);
writer.Flush();
}
/// <summary>
/// Writes the header.
/// </summary>
/// <param name="writer">The writer.</param>
/// <param name="messageSize">Size of the MSG.</param>
protected void WriteHeader(BinaryWriter writer, int messageSize){
var header = Header;
writer.Write(messageSize);
writer.Write(header.RequestId);
writer.Write(header.ResponseTo);
writer.Write((int)header.OpCode);
writer.Flush();
}
/// <summary>
///
/// </summary>
protected struct MessageChunk
{
/// <summary>
///
/// </summary>
public List<object> Documents;
/// <summary>
///
/// </summary>
public int Size;
}
}
}