Skip to content

Commit

Permalink
Fixing FirstOrDefaultAsync for compiled queries
Browse files Browse the repository at this point in the history
  • Loading branch information
sebastienros committed Sep 18, 2018
1 parent 4798e0d commit 877a9cb
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 17 deletions.
8 changes: 8 additions & 0 deletions src/YesSql.Core/Services/DefaultQuery.cs
Expand Up @@ -844,6 +844,14 @@ protected async Task<T> FirstOrDefaultImpl()
// Commit any pending changes before doing a query (auto-flush)
await _query._session.CommitAsync();

if (_query._compiledQuery != null && _query._queryState._parameterBindings != null)
{
foreach (var binding in _query._queryState._parameterBindings)
{
binding(_query._compiledQuery, _query._queryState._sqlBuilder);
}
}

_query.Page(1, 0);

if (typeof(IIndex).IsAssignableFrom(typeof(T)))
Expand Down
85 changes: 68 additions & 17 deletions src/YesSql.Core/Session.cs
Expand Up @@ -34,12 +34,18 @@ public class Session : ISession
private ISqlDialect _dialect;
protected bool _cancel;
protected List<IIndexProvider> _indexes;
private static ConcurrentDictionary<string, ConcurrentDictionary<Type, object>> _compiledQueries = new ConcurrentDictionary<string, ConcurrentDictionary<Type, object>>();
protected string _tablePrefix;

private static object _synLock = new object();
private static ConcurrentDictionary<Type, ConcurrentDictionary<Type, object>> _compiledQueryCaches;
private static ConcurrentDictionary<Type, object> _compiledQueryCache;
private static Type _compiledQueryConnectionType;

public Session(Store store, IsolationLevel isolationLevel)
{
_store = store;
_isolationLevel = isolationLevel;
_tablePrefix = _store.Configuration.TablePrefix;
}

[DebuggerBrowsable(DebuggerBrowsableState.Never)]
Expand Down Expand Up @@ -160,7 +166,7 @@ private async Task SaveEntityAsync(object entity)

doc.Content = Store.Configuration.ContentSerializer.Serialize(entity);

await new CreateDocumentCommand(doc, _store.Configuration.TablePrefix).ExecuteAsync(_connection, _transaction, _dialect);
await new CreateDocumentCommand(doc, _tablePrefix).ExecuteAsync(_connection, _transaction, _dialect);

await MapNew(doc, entity);
}
Expand Down Expand Up @@ -216,7 +222,7 @@ private async Task<Document> GetDocumentByIdAsync(int id)

var documentTable = CollectionHelper.Current.GetPrefixedName(YesSql.Store.DocumentTable);

var command = "select * from " + _dialect.QuoteForTableName(_store.Configuration.TablePrefix + documentTable) + " where " + _dialect.QuoteForColumnName("Id") + " = @Id";
var command = "select * from " + _dialect.QuoteForTableName(_tablePrefix + documentTable) + " where " + _dialect.QuoteForColumnName("Id") + " = @Id";
var key = new WorkerQueryKey(nameof(GetDocumentByIdAsync), new [] { id });
var result = await _store.ProduceAsync(key, () => _connection.QueryAsync<Document>(command, new { Id = id }, _transaction));

Expand Down Expand Up @@ -264,7 +270,7 @@ private async Task DeleteEntityAsync(object obj)
await MapDeleted(doc, obj);

// The command needs to come after any index deletiong because of the database constraints
_commands.Add(new DeleteDocumentCommand(doc, _store.Configuration.TablePrefix));
_commands.Add(new DeleteDocumentCommand(doc, _tablePrefix));
}
}
}
Expand All @@ -284,7 +290,7 @@ private async Task DeleteEntityAsync(object obj)
Demand();

var documentTable = CollectionHelper.Current.GetPrefixedName(YesSql.Store.DocumentTable);
var command = "select * from " + _dialect.QuoteForTableName(_store.Configuration.TablePrefix + documentTable) + " where " + _dialect.QuoteForColumnName("Id") + " " + _dialect.InOperator("@Ids");
var command = "select * from " + _dialect.QuoteForTableName(_tablePrefix + documentTable) + " where " + _dialect.QuoteForColumnName("Id") + " " + _dialect.InOperator("@Ids");

var key = new WorkerQueryKey(nameof(GetAsync), ids);
var documents = await _store.ProduceAsync(key, () =>
Expand Down Expand Up @@ -355,19 +361,64 @@ public IQuery Query()
{
Demand();

return new DefaultQuery(_connection, _transaction, this, _store.Configuration.TablePrefix);
return new DefaultQuery(_connection, _transaction, this, _tablePrefix);
}

public IQuery<T> ExecuteQuery<T>(ICompiledQuery<T> compiledQuery) where T : class
{
// There is a cache of queries per dialect
var connectionName = _connection.GetType().Name.ToLower();
if (compiledQuery == null)
{
throw new ArgumentNullException(nameof(compiledQuery));
}

ConcurrentDictionary<Type, object> cache;

var cache = _compiledQueries.GetOrAdd(connectionName, name => new ConcurrentDictionary<Type, object>());
// There is a cache of queries per dialect. We still optimize for the common case
// which is that a single connection type is used.
var connectionType = _connection.GetType();

if (_compiledQueryCache == null)
{
lock(_synLock)
{
if (_compiledQueryCache == null)
{
_compiledQueryCache = new ConcurrentDictionary<Type, object>();
_compiledQueryConnectionType = connectionType;
}
}

cache = _compiledQueryCache;
}
else
{
if (_compiledQueryConnectionType == connectionType)
{
// Most common case: single connection type, already initialized
cache = _compiledQueryCache;
}
else
{
// We are using multiple connection types
if (_compiledQueryCaches == null)
{
// The caches are not initialized
lock (_synLock)
{
if (_compiledQueryCaches == null)
{
_compiledQueryCaches = new ConcurrentDictionary<Type, ConcurrentDictionary<Type, object>>();
}
}
}

cache = _compiledQueryCaches.GetOrAdd(connectionType, name => new ConcurrentDictionary<Type, object>());
}
}

var query = (DefaultQuery.Query<T>)cache.GetOrAdd(compiledQuery.GetType(), t => compiledQuery.Query().Compile().Invoke(this.Query().For<T>(false)));
var queryState = query._query._queryState;
IQuery newQuery = new DefaultQuery(_connection, _transaction, this, _store.Configuration.TablePrefix, queryState, compiledQuery);
IQuery newQuery = new DefaultQuery(_connection, _transaction, this, _tablePrefix, queryState, compiledQuery);
return newQuery.For<T>();
}

Expand Down Expand Up @@ -627,7 +678,7 @@ private async Task ReduceAsync()
{
if (index == null)
{
_commands.Add(new DeleteReduceIndexCommand(dbIndex, _store.Configuration.TablePrefix));
_commands.Add(new DeleteReduceIndexCommand(dbIndex, _tablePrefix));
}
else
{
Expand All @@ -638,15 +689,15 @@ private async Task ReduceAsync()
deletedDocumentIds = deletedDocumentIds.Where(x => !common.Contains(x)).ToArray();

// Update updated, new and deleted linked documents
_commands.Add(new UpdateIndexCommand(index, addedDocumentIds, deletedDocumentIds, _store.Configuration.TablePrefix));
_commands.Add(new UpdateIndexCommand(index, addedDocumentIds, deletedDocumentIds, _tablePrefix));
}
}
else
{
if (index != null)
{
// The index is new
_commands.Add(new CreateIndexCommand(index, addedDocumentIds, _store.Configuration.TablePrefix));
_commands.Add(new CreateIndexCommand(index, addedDocumentIds, _tablePrefix));
}
}
}
Expand All @@ -657,7 +708,7 @@ private async Task<ReduceIndex> ReduceForAsync(IndexDescriptor descriptor, objec
{
Demand();

var name = _store.Configuration.TablePrefix + descriptor.IndexType.Name;
var name = _tablePrefix + descriptor.IndexType.Name;
var sql = "select * from " + _dialect.QuoteForTableName(name) + " where " + _dialect.QuoteForColumnName(descriptor.GroupKey.Name) + " = @currentKey";

var index = await _connection.QueryAsync(descriptor.IndexType, sql, new { currentKey }, _transaction);
Expand Down Expand Up @@ -729,11 +780,11 @@ private async Task MapNew(Document document, object obj)
{
if (index.Id == 0)
{
_commands.Add(new CreateIndexCommand(index, Enumerable.Empty<int>(), _store.Configuration.TablePrefix));
_commands.Add(new CreateIndexCommand(index, Enumerable.Empty<int>(), _tablePrefix));
}
else
{
_commands.Add(new UpdateIndexCommand(index, Enumerable.Empty<int>(), Enumerable.Empty<int>(), _store.Configuration.TablePrefix));
_commands.Add(new UpdateIndexCommand(index, Enumerable.Empty<int>(), Enumerable.Empty<int>(), _tablePrefix));
}
}
else
Expand All @@ -760,7 +811,7 @@ private async Task MapDeleted(Document document, object obj)
// If the mapped elements are not meant to be reduced, delete
if (descriptor.Reduce == null || descriptor.Delete == null)
{
_commands.Add(new DeleteMapIndexCommand(descriptor.IndexType, document.Id, _store.Configuration.TablePrefix, _dialect));
_commands.Add(new DeleteMapIndexCommand(descriptor.IndexType, document.Id, _tablePrefix, _dialect));
}
else
{
Expand Down
6 changes: 6 additions & 0 deletions test/YesSql.Tests/CoreTests.cs
Expand Up @@ -654,6 +654,12 @@ public async Task ShouldQueryWithCompiledQueries()
Assert.Equal(2, (await session.ExecuteQuery(new PersonByNameOrAgeQuery(12, null)).ListAsync()).Count());
Assert.Empty(await session.ExecuteQuery(new PersonByNameOrAgeQuery(10, null)).ListAsync());
}

// FirstOrDefaultAsync
using (var session = _store.CreateSession())
{
Assert.Equal("Bill", (await session.ExecuteQuery(new PersonByNameOrAgeQuery(50, null)).FirstOrDefaultAsync()).Firstname);
}
}

[Fact]
Expand Down

0 comments on commit 877a9cb

Please sign in to comment.