Skip to content
This repository has been archived by the owner on Feb 12, 2023. It is now read-only.

Commit

Permalink
feat(Execution): better subscription model with example
Browse files Browse the repository at this point in the history
  • Loading branch information
Marek Magdziak committed Jun 18, 2017
1 parent 04dd8ab commit 6c103c0
Show file tree
Hide file tree
Showing 29 changed files with 2,355 additions and 70 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
using GraphQLCore.Type;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;

namespace GraphQLCore.GraphiQLExample.Middlewares.GraphQLWs
{
public class GraphQLInitHandler : IGraphQLWsHandler
{
public async Task Handle(WebSocket socket, string clientId, IGraphQLSchema schema, WsInputObject input)
{
var dataString = JsonConvert.SerializeObject( new
{
type = "init_success"
});

var resultBuffer = System.Text.Encoding.UTF8.GetBytes(dataString);

await socket.SendAsync(
new ArraySegment<byte>(resultBuffer), WebSocketMessageType.Text, true, CancellationToken.None);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
using GraphQLCore.Type;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;

namespace GraphQLCore.GraphiQLExample.Middlewares.GraphQLWs
{
public class GraphQLSubscriptionEndHandler : IGraphQLWsHandler
{
public async Task Handle(WebSocket socket, string clientId, IGraphQLSchema schema, WsInputObject input)
{
await Task.Yield();

if (input.Id.HasValue)
{
schema.Unsubscribe(clientId, input.Id.Value);
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
using GraphQLCore.Exceptions;
using GraphQLCore.Type;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;

namespace GraphQLCore.GraphiQLExample.Middlewares.GraphQLWs
{
public class GraphQLSubscriptionStartHandler : IGraphQLWsHandler
{
public async Task Handle(WebSocket socket, string clientId, IGraphQLSchema schema, WsInputObject input)
{
try
{
await Subscribe(socket, clientId, schema, input);
}
catch (GraphQLValidationException ex)
{
await SendResponseToGraphQLValidationException(socket, input.Id.Value, ex);
}
catch (GraphQLException ex)
{
await SendResponseToGraphQLException(socket, input.Id.Value, ex);
}
catch (Exception ex)
{
await SendReponseToException(socket, input.Id.Value, ex);
}
}

private static async Task Subscribe(WebSocket socket, string clientId, IGraphQLSchema schema, WsInputObject input)
{
var data = schema.Execute(input.Query, null, null, clientId, input.Id.Value);

var dataString = JsonConvert.SerializeObject(new { id = input.Id, type = "subscription_success" });
var resultBuffer = System.Text.Encoding.UTF8.GetBytes(dataString);

await socket.SendAsync(
new ArraySegment<byte>(resultBuffer), WebSocketMessageType.Text, true, CancellationToken.None);
}

private static async Task SendResponseToGraphQLValidationException(WebSocket socket, int id, GraphQLValidationException ex)
{
var dataString = JsonConvert.SerializeObject(new
{
id,
type = "subscription_fail",
payload = new
{
errors = ex.Errors
}
});

await SendResponse(socket, dataString);
}

private static async Task SendResponseToGraphQLException(WebSocket socket, int id, GraphQLException ex)
{
var dataString = JsonConvert.SerializeObject(new
{
id,
type = "subscription_fail",
payload = new
{
errors = new dynamic[] { new { message = ex.Message + "\n" + ex.StackTrace } }
}
});

await SendResponse(socket, dataString);
}

private static async Task SendReponseToException(WebSocket socket, int id, Exception ex)
{
var dataString = JsonConvert.SerializeObject( new
{
id,
type = "subscription_fail",
payload = new
{
errors = new dynamic[] { new { message = ex.Message + "\n" + ex.StackTrace } }
}
});

await SendResponse(socket, dataString);
}

private static async Task SendResponse(WebSocket socket, string dataString)
{
var resultBuffer = System.Text.Encoding.UTF8.GetBytes(dataString);

await socket.SendAsync(
new ArraySegment<byte>(resultBuffer), WebSocketMessageType.Text, true, CancellationToken.None);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using GraphQLCore.Type;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.WebSockets;
using System.Threading.Tasks;

namespace GraphQLCore.GraphiQLExample.Middlewares.GraphQLWs
{
public interface IGraphQLWsHandler
{
Task Handle(WebSocket socket, string clientId, IGraphQLSchema schema, WsInputObject input);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

namespace GraphQLCore.GraphiQLExample.Middlewares.GraphQLWs
{
public class WsInputObject
{
[JsonProperty(PropertyName = "type")]
public string Type { get; set; }

[JsonProperty(PropertyName = "query")]
public string Query { get; set; }

[JsonProperty(PropertyName = "id")]
public int? Id { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,156 @@
using GraphQLCore.Exceptions;
using GraphQLCore.GraphiQLExample.Middlewares.GraphQLWs;
using GraphQLCore.Type;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Http;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net.WebSockets;
using System.Threading;
using System.Threading.Tasks;

namespace GraphQLCore.GraphiQLExample.Middlewares
{
public static class GraphQLWsMiddleware
{
private static Dictionary<string, IGraphQLWsHandler> handlers = new Dictionary<string, IGraphQLWsHandler>()
{
{ "init", new GraphQLInitHandler() },
{ "subscription_start", new GraphQLSubscriptionStartHandler() },
{ "subscription_end", new GraphQLSubscriptionEndHandler() }
};

public static void AddGraphQLWs(this IApplicationBuilder app)
{
app.Use(Middleware);
}

private static async Task Middleware(HttpContext context, Func<Task> next)
{
if (context.Request.Path == "/graphql")
{
if (context.WebSockets.IsWebSocketRequest)
{
var webSocket = await context.WebSockets.AcceptWebSocketAsync();

await StartCommunication(context, webSocket);
}
else
{
context.Response.StatusCode = 400;
}
}
else
{
await next();
}
}

private static async Task StartCommunication(HttpContext context, WebSocket webSocket)
{
var clientId = GenerateClientId();

var onDataReceived = GetCallback(webSocket, clientId);

var schema = GetSchema(context, onDataReceived);

var result = await MainLoop(webSocket, clientId, schema);

schema.Unsubscribe(clientId);
schema.OnSubscriptionMessageReceived -= onDataReceived;

await webSocket.CloseAsync(result.CloseStatus.Value, result.CloseStatusDescription, CancellationToken.None);
}

private static string GenerateClientId()
{
return Guid.NewGuid().ToString();
}

private static IGraphQLSchema GetSchema(HttpContext context, SubscriptionMessageReceived received)
{
var schema = context.RequestServices.GetService(typeof(IGraphQLSchema)) as IGraphQLSchema;
schema.OnSubscriptionMessageReceived += received;
return schema;
}

private static async Task<WebSocketReceiveResult> MainLoop(WebSocket webSocket, string clientId, IGraphQLSchema schema)
{
var buffer = new byte[1024 * 4];
var result = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);

GetKeepAliveTask(webSocket, result);

while (!result.CloseStatus.HasValue)
{
var text = System.Text.Encoding.UTF8.GetString(buffer);
var input = JsonConvert.DeserializeObject<WsInputObject>(text);

if (handlers.ContainsKey(input.Type))
{
await handlers[input.Type].Handle(webSocket, clientId, schema, input);
}

buffer = new byte[1024 * 4];
result = await webSocket.ReceiveAsync(new ArraySegment<byte>(buffer), CancellationToken.None);
}

return result;
}

private static void GetKeepAliveTask(WebSocket webSocket, WebSocketReceiveResult result)
{
var keepAliveTask = Task.Run(async () =>
{
await Task.Yield();
while (!result.CloseStatus.HasValue)
{
await Task.Delay(1000);
var dataString = JsonConvert.SerializeObject(new
{
type = "keepalive"
});
var resultBuffer = System.Text.Encoding.UTF8.GetBytes(dataString);
await webSocket.SendAsync(
new ArraySegment<byte>(resultBuffer), WebSocketMessageType.Text, true, CancellationToken.None);
}
});
}

private static SubscriptionMessageReceived GetCallback(WebSocket webSocket, string clientId)
{
return async (string msgClientId, int subscriptionId, dynamic subscriptionData) =>
{
try
{
if (clientId == msgClientId)
{
var ds = JsonConvert.SerializeObject(new
{
id = subscriptionId,
type = "subscription_data",
payload = new
{
data = subscriptionData
}
});
var db = System.Text.Encoding.UTF8.GetBytes(ds);
await webSocket.SendAsync(
new ArraySegment<byte>(db), WebSocketMessageType.Text, true, CancellationToken.None);
}
}
catch (Exception ex)
{
}
};
}
}
}
4 changes: 3 additions & 1 deletion examples/GraphQLCore.GraphiQLExample/Schema/Mutation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,9 @@ public class Mutation : GraphQLObjectType

public Mutation() : base("Mutation", "")
{
this.Field("addDroid", (NonNullable<Droid> droid) => this.CreateAndGet(droid));
this.Field("addDroid", (NonNullable<Droid> droid) => this.CreateAndGet(droid))
.OnChannel("characters")
.OnChannel("droid");
}

private Droid CreateAndGet(Droid droid)
Expand Down
3 changes: 3 additions & 0 deletions examples/GraphQLCore.GraphiQLExample/Schema/StarWarsSchema.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ public StarWarsSchema()
{
var rootQuery = new Query();
var rootMutation = new Mutation();
var subscriptionType = new Subscription();

this.AddKnownType(new GraphQLCharacterUnion());
this.AddKnownType(new GraphQLCharacterInterface());
Expand All @@ -17,9 +18,11 @@ public StarWarsSchema()
this.AddKnownType(new GraphQLDroidInputObject());
this.AddKnownType(rootQuery);
this.AddKnownType(rootMutation);
this.AddKnownType(subscriptionType);

this.Query(rootQuery);
this.Mutation(rootMutation);
this.Subscription(subscriptionType);
}
}
}
Loading

0 comments on commit 6c103c0

Please sign in to comment.