Skip to content
This repository has been archived by the owner on Feb 12, 2023. It is now read-only.

Commit

Permalink
feat(Execution): asynchronous query execution
Browse files Browse the repository at this point in the history
  • Loading branch information
Marek Magdziak committed Jun 12, 2017
1 parent 4978fbf commit d219fe6
Show file tree
Hide file tree
Showing 9 changed files with 419 additions and 151 deletions.
7 changes: 4 additions & 3 deletions src/GraphQLCore/Events/IEventBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,16 @@
using System.Collections.Generic;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;

namespace GraphQLCore.Events
{
public delegate void MessageReceived(OnMessageReceivedEventArgs args);
public delegate Task MessageReceived(OnMessageReceivedEventArgs args);

public interface IEventBus
{
event MessageReceived OnMessageReceived;
void Publish(object data, string channel);
void Subscribe(EventBusSubscription eventBusSubscription);
Task Publish(object data, string channel);
Task Subscribe(EventBusSubscription eventBusSubscription);
}
}
9 changes: 6 additions & 3 deletions src/GraphQLCore/Events/InMemoryEventBus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading.Tasks;

public class InMemoryEventBus : IEventBus
{
Expand All @@ -16,7 +17,7 @@ public InMemoryEventBus()
this.subscriptions = new List<EventBusSubscription>();
}

public void Publish(object data, string channel)
public async Task Publish(object data, string channel)
{
if (this.OnMessageReceived == null)
return;
Expand All @@ -25,7 +26,7 @@ public void Publish(object data, string channel)
{
if ((bool)subscription.Filter.Compile().DynamicInvoke(data))
{
this.OnMessageReceived(new OnMessageReceivedEventArgs()
await this.OnMessageReceived(new OnMessageReceivedEventArgs()
{
ClientId = subscription.ClientId,
Channel = subscription.Channel,
Expand All @@ -35,8 +36,10 @@ public void Publish(object data, string channel)
}
}

public void Subscribe(EventBusSubscription subscription)
public async Task Subscribe(EventBusSubscription subscription)
{
await Task.Yield();

if (!this.subscriptions.Any(e =>
e.ClientId == subscription.ClientId &&
e.Filter.ToString() == e.Filter.ToString() &&
Expand Down
87 changes: 46 additions & 41 deletions src/GraphQLCore/Execution/ExecutionManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
using System.Dynamic;
using System.Linq;
using System.Linq.Expressions;
using System.Threading.Tasks;
using Type;
using Type.Introspection;
using Validation;
Expand Down Expand Up @@ -43,12 +44,12 @@ public void Dispose()
{
}

public dynamic Execute()
public async Task<dynamic> ExecuteAsync()
{
return this.Execute(null);
return await this.ExecuteAsync(null);
}

public dynamic Execute(string operationToExecute)
public async Task<dynamic> ExecuteAsync(string operationToExecute)
{
foreach (var definition in this.ast.Definitions)
this.ResolveDefinition(definition, operationToExecute);
Expand All @@ -65,9 +66,11 @@ public dynamic Execute(string operationToExecute)
var operationType = this.GetOperationRootType();

if (this.Operation.Operation == OperationType.Subscription)
return this.ComposeResultForSubscriptions(operationType, this.Operation);
else
return this.ComposeResultForQueryAndMutation(operationType, this.Operation);
return await this.ComposeResultForSubscriptions(operationType, this.Operation);
else if (this.Operation.Operation == OperationType.Query)
return await this.ComposeResultForQuery(operationType, this.Operation);
else //Mutation
return await this.ComposeResultForMutation(operationType, this.Operation);
}

private void ValidateAstAndThrowErrorWhenFaulty()
Expand Down Expand Up @@ -114,11 +117,11 @@ private IValidationRule[] GetValidationRules()
};
}

private void AppendIntrospectionInfo(
private async Task AppendIntrospectionInfo(
FieldScope scope, Dictionary<string, IList<GraphQLFieldSelection>> fields, dynamic resultObject)
{
var introspectedSchema = this.IntrospectSchemaIfRequested(scope, fields);
var introspectedField = this.IntrospectTypeIfRequested(scope, fields);
var introspectedSchema = await this.IntrospectSchemaIfRequested(scope, fields);
var introspectedField = await this.IntrospectTypeIfRequested(scope, fields);

if (introspectedSchema != null)
resultObject.__schema = introspectedSchema;
Expand All @@ -127,28 +130,44 @@ private IValidationRule[] GetValidationRules()
resultObject.__type = introspectedField;
}

public dynamic ComposeResultForSubscriptions(GraphQLComplexType type, GraphQLOperationDefinition operationDefinition)
public async Task<dynamic> ComposeResultForSubscriptions(GraphQLComplexType type, GraphQLOperationDefinition operationDefinition)
{
var context = this.CreateExecutionContext(operationDefinition);

var scope = new FieldScope(context, type, null);

return this.ProcessSubscriptions(
return await this.ProcessSubscriptions(
(GraphQLSubscriptionType)type,
context.FieldCollector,
scope);
}

public dynamic ComposeResultForQueryAndMutation(GraphQLComplexType type, GraphQLOperationDefinition operationDefinition)
public async Task<dynamic> ComposeResultForQuery(
GraphQLComplexType type, GraphQLOperationDefinition operationDefinition)
{
var context = this.CreateExecutionContext(operationDefinition);
var scope = new FieldScope(context, type, null);

return this.ResolveQueryAndMutationResult(
type,
operationDefinition.SelectionSet,
context.FieldCollector,
scope);
var fields = context.FieldCollector.CollectFields(type, operationDefinition.SelectionSet);
var resultObject = await scope.GetObject(fields);

await this.AppendIntrospectionInfo(scope, fields, resultObject);

return resultObject;
}

public async Task<dynamic> ComposeResultForMutation(
GraphQLComplexType type, GraphQLOperationDefinition operationDefinition)
{
var context = this.CreateExecutionContext(operationDefinition);
var scope = new FieldScope(context, type, null);

var fields = context.FieldCollector.CollectFields(type, operationDefinition.SelectionSet);
var resultObject = await scope.GetObjectSynchronously(fields);

await this.AppendIntrospectionInfo(scope, fields, resultObject);

return resultObject;
}

private ExecutionContext CreateExecutionContext(GraphQLOperationDefinition operationDefinition)
Expand All @@ -169,7 +188,7 @@ private ExecutionContext CreateExecutionContext(GraphQLOperationDefinition opera
};
}

private ExpandoObject ProcessSubscriptions(
private async Task<ExpandoObject> ProcessSubscriptions(
GraphQLSubscriptionType type,
IFieldCollector fieldCollector,
FieldScope scope)
Expand All @@ -180,7 +199,7 @@ private ExecutionContext CreateExecutionContext(GraphQLOperationDefinition opera

foreach (var field in fields)
{
var subscriptionId = this.RegisterSubscription(
var subscriptionId = await this.RegisterSubscription(
field.Value.Single(),
type,
this.ast,
Expand All @@ -192,7 +211,7 @@ private ExecutionContext CreateExecutionContext(GraphQLOperationDefinition opera
return result;
}

private long RegisterSubscription(
private async Task<long> RegisterSubscription(
GraphQLFieldSelection fieldSelection,
GraphQLSubscriptionType type,
GraphQLDocument document,
Expand All @@ -201,9 +220,9 @@ private ExecutionContext CreateExecutionContext(GraphQLOperationDefinition opera
var fieldInfo = type.GetFieldInfo(fieldSelection.Name.Value) as GraphQLSubscriptionTypeFieldInfo;

Expression<Func<object, bool>> filter
= entity => (bool)scope.InvokeWithArguments(fieldSelection.Arguments.ToList(), fieldInfo.Filter, entity);
= entity => (bool)scope.InvokeWithArgumentsSync(fieldSelection.Arguments.ToList(), fieldInfo.Filter, entity);

type.EventBus.Subscribe(EventBusSubscription.Create(
await type.EventBus.Subscribe(EventBusSubscription.Create(
fieldInfo.Channel,
Guid.NewGuid().ToString(),
this.Operation.Name.Value,
Expand All @@ -214,20 +233,6 @@ private ExecutionContext CreateExecutionContext(GraphQLOperationDefinition opera
return 5456;
}

private dynamic ResolveQueryAndMutationResult(
GraphQLComplexType type,
GraphQLSelectionSet selectionSet,
IFieldCollector fieldCollector,
FieldScope scope)
{
var fields = fieldCollector.CollectFields(type, selectionSet);
var resultObject = scope.GetObject(fields);

this.AppendIntrospectionInfo(scope, fields, resultObject);

return resultObject;
}

private VariableResolver CreateVariableResolver()
{
return new VariableResolver(
Expand Down Expand Up @@ -266,15 +271,15 @@ private GraphQLComplexType GetOperationRootType()
return (string name) => this.graphQLSchema.IntrospectType(name);
}

private object IntrospectSchemaIfRequested(
private async Task<object> IntrospectSchemaIfRequested(
FieldScope scope, IDictionary<string, IList<GraphQLFieldSelection>> fields)
{
if (fields.ContainsKey("__schema"))
{
var field = fields["__schema"].Single();
fields.Remove("__schema");

return scope.CompleteValue(
return await scope.CompleteValue(
this.graphQLSchema.IntrospectedSchema,
this.graphQLSchema.IntrospectedSchema.GetType(),
field,
Expand All @@ -284,19 +289,19 @@ private GraphQLComplexType GetOperationRootType()
return null;
}

private object IntrospectTypeIfRequested(
private async Task<object> IntrospectTypeIfRequested(
FieldScope scope, IDictionary<string, IList<GraphQLFieldSelection>> fields)
{
if (fields.ContainsKey("__type"))
{
var field = fields["__type"].Single();
fields.Remove("__type");

var value = scope.InvokeWithArguments(
var value = await scope.InvokeWithArguments(
field.Arguments.ToList(),
this.GetTypeIntrospectionLambda());

return scope.CompleteValue(
return await scope.CompleteValue(
value,
value.GetType(),
field,
Expand Down
Loading

0 comments on commit d219fe6

Please sign in to comment.