Skip to content

Commit

Permalink
Implemented proper, join-aware paging in SQL Server
Browse files Browse the repository at this point in the history
  • Loading branch information
markrendle committed Nov 6, 2012
1 parent 5bcbf2e commit 54581ae
Show file tree
Hide file tree
Showing 11 changed files with 216 additions and 172 deletions.
212 changes: 106 additions & 106 deletions Simple.Data.Ado.Test/ProviderHelperTest.cs
Original file line number Original file line Diff line number Diff line change
@@ -1,118 +1,118 @@
using System; using System;
using System.Collections.Generic; using System.Collections.Generic;
using System.Linq; using System.Linq;
using System.Text; using System.Text;
using NUnit.Framework; using NUnit.Framework;
using System.Data; using System.Data;
using Simple.Data.Ado.Schema; using Simple.Data.Ado.Schema;
using System.ComponentModel.Composition; using System.ComponentModel.Composition;


namespace Simple.Data.Ado.Test namespace Simple.Data.Ado.Test
{ {
[TestFixture] [TestFixture]
public class ProviderHelperTest public class ProviderHelperTest
{ {
[Test] [Test]
public void ShouldNotRequestExportableTypeFromServiceProvider() public void ShouldNotRequestExportableTypeFromServiceProvider()
{ {
var helper = new ProviderHelper(); var helper = new ProviderHelper();
var connectionProvider = new StubConnectionAndServiceProvider(); var connectionProvider = new StubConnectionAndServiceProvider();
var actual = helper.GetCustomProvider<ITestInterface>(connectionProvider); var actual = helper.GetCustomProvider<ITestInterface>(connectionProvider);
Assert.IsNull(connectionProvider.RequestedServiceType); Assert.IsNull(connectionProvider.RequestedServiceType);
} }


[Test] [Test]
public void ShouldRequestNonExportedTypeFromServiceProvider() public void ShouldRequestNonExportedTypeFromServiceProvider()
{ {
var helper = new ProviderHelper(); var helper = new ProviderHelper();
var connectionProvider = new StubConnectionAndServiceProvider(); var connectionProvider = new StubConnectionAndServiceProvider();
var actual = helper.GetCustomProvider<IQueryPager>(connectionProvider); var actual = helper.GetCustomProvider<IQueryPager>(connectionProvider);
Assert.AreEqual(typeof(IQueryPager), connectionProvider.RequestedServiceType); Assert.AreEqual(typeof(IQueryPager), connectionProvider.RequestedServiceType);
} }


[Test] [Test]
public void ShouldReturnNonExportedTypeFromServiceProvider() public void ShouldReturnNonExportedTypeFromServiceProvider()
{ {
var helper = new ProviderHelper(); var helper = new ProviderHelper();
var connectionProvider = new StubConnectionAndServiceProvider(); var connectionProvider = new StubConnectionAndServiceProvider();
var actual = helper.GetCustomProvider<IQueryPager>(connectionProvider); var actual = helper.GetCustomProvider<IQueryPager>(connectionProvider);
Assert.IsInstanceOf(typeof(IQueryPager), actual); Assert.IsInstanceOf(typeof(IQueryPager), actual);
} }


[Test] [Test]
public void ShouldFindProviderUsingAssemblyAttribute() public void ShouldFindProviderUsingAssemblyAttribute()
{ {
IConnectionProvider provider; IConnectionProvider provider;
Assert.True(ProviderHelper.TryLoadAssemblyUsingAttribute("Test", null, out provider)); Assert.True(ProviderHelper.TryLoadAssemblyUsingAttribute("Test", null, out provider));
Assert.IsNotNull(provider); Assert.IsNotNull(provider);
Assert.IsInstanceOf<StubConnectionProvider>(provider); Assert.IsInstanceOf<StubConnectionProvider>(provider);
} }


public class StubConnectionAndServiceProvider : IConnectionProvider, IServiceProvider public class StubConnectionAndServiceProvider : IConnectionProvider, IServiceProvider
{ {
public void SetConnectionString(string connectionString) public void SetConnectionString(string connectionString)
{ {
throw new NotImplementedException(); throw new NotImplementedException();
} }


public IDbConnection CreateConnection() public IDbConnection CreateConnection()
{ {
throw new NotImplementedException(); throw new NotImplementedException();
} }


public ISchemaProvider GetSchemaProvider() public ISchemaProvider GetSchemaProvider()
{ {
throw new NotImplementedException(); throw new NotImplementedException();
} }


public string ConnectionString public string ConnectionString
{ {
get { throw new NotImplementedException(); } get { throw new NotImplementedException(); }
} }


public bool SupportsCompoundStatements public bool SupportsCompoundStatements
{ {
get { throw new NotImplementedException(); } get { throw new NotImplementedException(); }
} }


public string GetIdentityFunction() public string GetIdentityFunction()
{ {
throw new NotImplementedException(); throw new NotImplementedException();
} }


public bool SupportsStoredProcedures public bool SupportsStoredProcedures
{ {
get { throw new NotImplementedException(); } get { throw new NotImplementedException(); }
} }


public IProcedureExecutor GetProcedureExecutor(AdoAdapter adapter, ObjectName procedureName) public IProcedureExecutor GetProcedureExecutor(AdoAdapter adapter, ObjectName procedureName)
{ {
throw new NotImplementedException(); throw new NotImplementedException();
} }


public Type RequestedServiceType { get; private set; } public Type RequestedServiceType { get; private set; }
public Object GetService(Type serviceType) public Object GetService(Type serviceType)
{ {
this.RequestedServiceType = serviceType; this.RequestedServiceType = serviceType;
return new StubQueryPager(); return new StubQueryPager();
} }
} }


public class StubQueryPager : IQueryPager public class StubQueryPager : IQueryPager
{ {
public IEnumerable<string> ApplyLimit(string sql, int take) public IEnumerable<string> ApplyLimit(string sql, int take)
{ {
throw new NotImplementedException(); throw new NotImplementedException();
} }


public IEnumerable<string> ApplyPaging(string sql, int skip, int take) public IEnumerable<string> ApplyPaging(string sql, string[] keys, int skip, int take)
{ {
throw new NotImplementedException(); throw new NotImplementedException();
} }
} }


public interface ITestInterface { } public interface ITestInterface { }
[Export(typeof(ITestInterface))] [Export(typeof(ITestInterface))]
public class TestClass : ITestInterface { } public class TestClass : ITestInterface { }
} }
} }
14 changes: 11 additions & 3 deletions Simple.Data.Ado/AdoAdapterQueryRunner.cs
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -151,7 +151,7 @@ private ICommandBuilder[] GetPagedQueryCommandBuilders(ref SimpleQuery query, In
} }
else else
{ {
ApplyPaging(commandBuilders, mainCommandBuilder, skipClause, takeClause, query.Clauses.OfType<WithClause>().Any(), queryPager); ApplyPaging(query, commandBuilders, mainCommandBuilder, skipClause, takeClause, query.Clauses.OfType<WithClause>().Any(), queryPager);
} }
} }
return commandBuilders.ToArray(); return commandBuilders.ToArray();
Expand All @@ -168,7 +168,7 @@ private void DeferPaging(ref SimpleQuery query, ICommandBuilder mainCommandBuild
commandBuilders.Add(commandBuilder); commandBuilders.Add(commandBuilder);
} }


private void ApplyPaging(List<ICommandBuilder> commandBuilders, ICommandBuilder mainCommandBuilder, SkipClause skipClause, TakeClause takeClause, bool hasWithClause, IQueryPager queryPager) private void ApplyPaging(SimpleQuery query, List<ICommandBuilder> commandBuilders, ICommandBuilder mainCommandBuilder, SkipClause skipClause, TakeClause takeClause, bool hasWithClause, IQueryPager queryPager)
{ {
const int maxInt = 2147483646; const int maxInt = 2147483646;


Expand All @@ -179,9 +179,17 @@ private void ApplyPaging(List<ICommandBuilder> commandBuilders, ICommandBuilder
} }
else else
{ {
var table = _adapter.GetSchema().FindTable(query.TableName);
if (table.PrimaryKey == null || table.PrimaryKey.Length == 0)
{
throw new AdoAdapterException("Cannot apply paging to a table with no primary key.");
}
var keys = table.PrimaryKey.AsEnumerable()
.Select(k => string.Format("{0}.{1}", table.QualifiedName, _adapter.GetSchema().QuoteObjectName(k)))
.ToArray();
int skip = skipClause == null ? 0 : skipClause.Count; int skip = skipClause == null ? 0 : skipClause.Count;
int take = takeClause == null ? maxInt : takeClause.Count; int take = takeClause == null ? maxInt : takeClause.Count;
commandTexts = queryPager.ApplyPaging(mainCommandBuilder.Text, skip, take); commandTexts = queryPager.ApplyPaging(mainCommandBuilder.Text, keys, skip, take);
} }


commandBuilders.AddRange( commandBuilders.AddRange(
Expand Down
2 changes: 1 addition & 1 deletion Simple.Data.Ado/IQueryPager.cs
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ namespace Simple.Data.Ado
public interface IQueryPager public interface IQueryPager
{ {
IEnumerable<string> ApplyLimit(string sql, int take); IEnumerable<string> ApplyLimit(string sql, int take);
IEnumerable<string> ApplyPaging(string sql, int skip, int take); IEnumerable<string> ApplyPaging(string sql, string[] keys, int skip, int take);
} }
} }
28 changes: 20 additions & 8 deletions Simple.Data.Ado/Joiner.cs
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -121,12 +121,12 @@ private string MakeJoinText(Table rightTable, string alias, ForeignKey foreignKe
builder.AppendFormat(" JOIN {0}", rightTable.QualifiedName); builder.AppendFormat(" JOIN {0}", rightTable.QualifiedName);
if (!string.IsNullOrWhiteSpace(alias)) builder.Append(" " + _schema.QuoteObjectName(alias)); if (!string.IsNullOrWhiteSpace(alias)) builder.Append(" " + _schema.QuoteObjectName(alias));
builder.Append(" ON ("); builder.Append(" ON (");
builder.Append(FormatJoinExpression(foreignKey, 0, alias)); builder.Append(FormatJoinExpression(foreignKey, 0, rightTable, alias));


for (int i = 1; i < foreignKey.Columns.Length; i++) for (int i = 1; i < foreignKey.Columns.Length; i++)
{ {
builder.Append(" AND "); builder.Append(" AND ");
builder.Append(FormatJoinExpression(foreignKey, i, alias)); builder.Append(FormatJoinExpression(foreignKey, i, rightTable, alias));
} }
builder.Append(")"); builder.Append(")");
return builder.ToString(); return builder.ToString();
Expand Down Expand Up @@ -167,13 +167,25 @@ private SimpleExpression CreateJoinExpression(ObjectReference table, ForeignKey
return masterObjectReference == detailObjectReference; return masterObjectReference == detailObjectReference;
} }


private string FormatJoinExpression(ForeignKey foreignKey, int columnIndex, string alias) private string FormatJoinExpression(ForeignKey foreignKey, int columnIndex, Table rightTable, string alias)
{ {
var leftTable = string.IsNullOrWhiteSpace(alias) if (rightTable.ActualName == foreignKey.MasterTable.Name &&
? _schema.QuoteObjectName(foreignKey.MasterTable) rightTable.Schema == foreignKey.MasterTable.Schema)
: _schema.QuoteObjectName(alias); {
return string.Format("{0}.{1} = {2}.{3}", leftTable, _schema.QuoteObjectName(foreignKey.UniqueColumns[columnIndex]), var rightTableName = string.IsNullOrWhiteSpace(alias)
_schema.QuoteObjectName(foreignKey.DetailTable), _schema.QuoteObjectName(foreignKey.Columns[columnIndex])); ? _schema.QuoteObjectName(foreignKey.MasterTable)
: _schema.QuoteObjectName(alias);
return string.Format("{0}.{1} = {2}.{3}",
rightTableName, _schema.QuoteObjectName(foreignKey.UniqueColumns[columnIndex]),
_schema.QuoteObjectName(foreignKey.DetailTable), _schema.QuoteObjectName(foreignKey.Columns[columnIndex])
);
}

var leftTableName = string.IsNullOrWhiteSpace(alias)
? _schema.QuoteObjectName(foreignKey.DetailTable)
: _schema.QuoteObjectName(alias);
return string.Format("{0}.{1} = {2}.{3}", _schema.QuoteObjectName(foreignKey.MasterTable), _schema.QuoteObjectName(foreignKey.UniqueColumns[columnIndex]),
leftTableName, _schema.QuoteObjectName(foreignKey.Columns[columnIndex]));
} }


private string JoinKeyword private string JoinKeyword
Expand Down
2 changes: 1 addition & 1 deletion Simple.Data.SqlCe40/SqlCe40QueryPager.cs
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public IEnumerable<string> ApplyLimit(string sql, int take)
yield return SelectMatch.Replace(sql, match => match.Value + " TOP(" + take + ") "); yield return SelectMatch.Replace(sql, match => match.Value + " TOP(" + take + ") ");
} }


public IEnumerable<string> ApplyPaging(string sql, int skip, int take) public IEnumerable<string> ApplyPaging(string sql, string[] keys, int skip, int take)
{ {
if (sql.IndexOf("order by", StringComparison.InvariantCultureIgnoreCase) < 0) if (sql.IndexOf("order by", StringComparison.InvariantCultureIgnoreCase) < 0)
{ {
Expand Down
4 changes: 2 additions & 2 deletions Simple.Data.SqlCe40Test/SqlCe40QueryPagerTest.cs
Original file line number Original file line Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ public void ShouldApplyPagingUsingOrderBy()
var expected = new[]{ var expected = new[]{
"select a,b,c from d where a = 1 order by c offset 5 rows fetch next 10 rows only"}; "select a,b,c from d where a = 1 order by c offset 5 rows fetch next 10 rows only"};


var pagedSql = new SqlCe40QueryPager().ApplyPaging(sql, 5, 10); var pagedSql = new SqlCe40QueryPager().ApplyPaging(sql, new string[0], 5, 10);
var modified = pagedSql.Select(x=> Normalize.Replace(x, " ").ToLowerInvariant()); var modified = pagedSql.Select(x=> Normalize.Replace(x, " ").ToLowerInvariant());


Assert.IsTrue(expected.SequenceEqual(modified)); Assert.IsTrue(expected.SequenceEqual(modified));
Expand All @@ -42,7 +42,7 @@ public void ShouldApplyPagingUsingOrderByFirstColumnIfNotAlreadyOrdered()
var expected = new[]{ var expected = new[]{
"select a,b,c from d where a = 1 order by a offset 10 rows fetch next 20 rows only"}; "select a,b,c from d where a = 1 order by a offset 10 rows fetch next 20 rows only"};


var pagedSql = new SqlCe40QueryPager().ApplyPaging(sql, 10, 20); var pagedSql = new SqlCe40QueryPager().ApplyPaging(sql, new string[0], 10, 20);
var modified = pagedSql.Select(x => Normalize.Replace(x, " ").ToLowerInvariant()); var modified = pagedSql.Select(x => Normalize.Replace(x, " ").ToLowerInvariant());


Assert.IsTrue(expected.SequenceEqual(modified)); Assert.IsTrue(expected.SequenceEqual(modified));
Expand Down
Loading

0 comments on commit 54581ae

Please sign in to comment.