Skip to content

Commit

Permalink
Fixes issue markrendle#338. (SqlBulkInserter breaks when the records …
Browse files Browse the repository at this point in the history
…given to it aren't all the same.)
  • Loading branch information
Matt Richard committed Mar 25, 2014
1 parent 35e2e06 commit 8b7bf0b
Show file tree
Hide file tree
Showing 3 changed files with 112 additions and 4 deletions.
22 changes: 18 additions & 4 deletions Simple.Data.SqlServer/SqlBulkInserter.cs
Expand Up @@ -44,13 +44,15 @@ public class SqlBulkInserter : IBulkInserter
using (bulkCopy)
{
connection.OpenIfClosed();
foreach (var record in data)

var dataList = data.ToList();
foreach (var record in dataList)
{
if (count == 0)
{
dataTable = CreateDataTable(adapter, tableName, record.Keys, bulkCopy);
dataTable = CreateDataTable(adapter, tableName, dataList.SelectMany(r => r.Keys).Distinct(), bulkCopy);
}
dataTable.Rows.Add(record.Values.ToArray());
AddRow(dataTable, record);

if (++count%5000 == 0)
{
Expand Down Expand Up @@ -82,7 +84,7 @@ private SqlBulkCopyOptions BuildBulkCopyOptions(AdoAdapter adapter)
return options;
}

private DataTable CreateDataTable(AdoAdapter adapter, string tableName, ICollection<string> keys, SqlBulkCopy bulkCopy)
private DataTable CreateDataTable(AdoAdapter adapter, string tableName, IEnumerable<string> keys, SqlBulkCopy bulkCopy)
{
var table = adapter.GetSchema().FindTable(tableName);
var dataTable = new DataTable(table.ActualName);
Expand All @@ -104,5 +106,17 @@ private DataTable CreateDataTable(AdoAdapter adapter, string tableName, ICollect

return dataTable;
}

private void AddRow(DataTable dataTable, IDictionary<string, object> record)
{
var dataRow = dataTable.NewRow();
foreach (DataColumn column in dataTable.Columns)
{
if (record.ContainsKey(column.ColumnName))
dataRow[column] = record[column.ColumnName];
}
dataTable.Rows.Add(dataRow);
}

}
}
82 changes: 82 additions & 0 deletions Simple.Data.SqlTest/BulkInsertTest.cs
Expand Up @@ -52,6 +52,59 @@ public void BulkInsertUsesSchemaAndFireTriggers()
Assert.AreEqual(1000, rowsWhichWhereUpdatedByTrigger);
}


[Test]
public void BulkInsertRecordsWithDifferentColumnsProperlyInsertsData()
{
DatabaseHelper.Reset();

var db = DatabaseHelper.Open();
dynamic r1 = new SimpleRecord();
r1.FirstName = "Bob";
r1.LastName = "Dole";

dynamic r2 = new SimpleRecord();
r2.FirstName = "Bob";
r2.MiddleInitial = "L";
r2.LastName = "Saget";

db.OptionalColumnTest.Insert(new[] { r2, r1 });

var objs = db.OptionalColumnTest.All().ToList<OptionalColumnTestObject>();

var expected = new[] {new OptionalColumnTestObject("Bob", "Dole"), new OptionalColumnTestObject("Bob", "Saget", "L"),};

Assert.That(objs, Is.EquivalentTo(expected));

}

[Test]
public void BulkInsertRecordsWithDifferentColumnsAndFewerColumnsInFirstRecordProperlyInsertsData()
{
DatabaseHelper.Reset();

var db = DatabaseHelper.Open();

dynamic r1 = new SimpleRecord();
r1.FirstName = "Bob";
r1.LastName = "Dole";

dynamic r2 = new SimpleRecord();
r2.FirstName = "Bob";
r2.MiddleInitial = "L";
r2.LastName = "Saget";

db.OptionalColumnTest.Insert(new[] { r1, r2 });

var objs = db.OptionalColumnTest.All().ToList<OptionalColumnTestObject>();

var expected = new[] { new OptionalColumnTestObject("Bob", "Dole"), new OptionalColumnTestObject("Bob", "Saget", "L"), };

Assert.That(objs, Is.EquivalentTo(expected));

}


private static IEnumerable<SchemaItem> GenerateItems()
{
for (int i = 0; i < 1000; i++)
Expand All @@ -61,6 +114,35 @@ private static IEnumerable<SchemaItem> GenerateItems()
}
}

class OptionalColumnTestObject
{
public int Id { get; set; }
public string FirstName { get; set; }
public string LastName { get; set; }
public string MiddleInitial { get; set; }

public OptionalColumnTestObject() {}

public OptionalColumnTestObject(string first, string last, string middle = null)
{
FirstName = first;
LastName = last;
MiddleInitial = middle;
}

public override string ToString()
{
return string.Format("<FirstName={0}, LastName={1}, MiddleInitial={2}>", FirstName, LastName, MiddleInitial);
}

public override bool Equals(object obj)
{
var other = obj as OptionalColumnTestObject;
if (other == null) return false;
return other.FirstName == FirstName && other.LastName == LastName && other.MiddleInitial == MiddleInitial;
}
}

class SchemaItem
{
public SchemaItem(int id, string description)
Expand Down
12 changes: 12 additions & 0 deletions Simple.Data.SqlTest/Resources/DatabaseReset.txt
Expand Up @@ -86,6 +86,8 @@ BEGIN
DROP TABLE [dbo].[HierarchyIdTest]
IF EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[dbo].[TimestampTest]') AND type in (N'U'))
DROP TABLE [dbo].[TimestampTest]
IF EXISTS (SELECT * FROM sys.objects WHERE object_id = OBJECT_ID(N'[dbo].[OptionalColumnTest]') AND type in (N'U'))
DROP TABLE [dbo].[OptionalColumnTest]

CREATE TABLE [dbo].[Users] (
[Id] INT IDENTITY (1, 1) NOT NULL,
Expand Down Expand Up @@ -245,6 +247,16 @@ BEGIN
[Id] ASC
))

CREATE TABLE [dbo].[OptionalColumnTest](
[Id] [int] IDENTITY(1,1) NOT NULL,
[FirstName] [nvarchar](50),
[LastName] [nvarchar](50),
[MiddleInitial] [nvarchar](50),
)

ALTER TABLE [dbo].[OptionalColumnTest]
ADD CONSTRAINT [PK_OptionalColumnTest] PRIMARY KEY CLUSTERED ([Id] ASC)

BEGIN TRANSACTION
SET IDENTITY_INSERT [dbo].[Customers] ON
INSERT INTO [dbo].[Customers] ([CustomerId], [Name], [Address]) VALUES (1, N'Test', N'100 Road')
Expand Down

0 comments on commit 8b7bf0b

Please sign in to comment.