Permalink
Browse files

done server streaming

  • Loading branch information...
1 parent 18df4b6 commit 7e7d1d4c15ff6740a949a9ac20d5a16cde7f273b @neuecc committed Dec 11, 2016
@@ -0,0 +1,41 @@
+/*
+ This is a configuration file for the SwitchStartupProject Visual Studio Extension
+ See https://bitbucket.org/thirteen/switchstartupproject/src/tip/Configuration.md
+*/
+{
+ /* Configuration File Version */
+ "Version": 3,
+
+ /* Create an item in the dropdown list for each project in the solution? */
+ "ListAllProjects": true,
+
+ /*
+ Dictionary of named startup project configurations with one or multiple
+ startup projects and optional command line arguments.
+ Example:
+
+ "MultiProjectConfigurations": {
+ "A + B": {
+ "Projects": {
+ "MyProjectA": {},
+ "MyProjectB": {
+ "CommandLineArguments": "1234"
+ }
+ }
+ },
+ "D": {
+ "Projects": {
+ "MyProjectD": {}
+ }
+ }
+ },
+ */
+ "MultiProjectConfigurations": {
+ "Server + Client": {
+ "Projects": {
+ "Sandbox.ConsoleServer": {},
+ "Sandbox.ConsoleClient": {}
+ }
+ }
+ }
+}
@@ -21,6 +21,7 @@ static void Main(string[] args)
UnaryRun(c).GetAwaiter().GetResult();
ClientStreamRun(c).GetAwaiter().GetResult();
+ ServerStreamRun(c).GetAwaiter().GetResult();
}
static async Task UnaryRun(IMyFirstService client)
@@ -60,6 +61,26 @@ static void Main(string[] args)
Console.WriteLine(ex);
}
}
+
+ static async Task ServerStreamRun(IMyFirstService client)
+ {
+ try
+ {
+ var stream = await client.StreamingTwo(10, 20, 3);
+
+ await stream.ResponseStream.ForEachAsync(x =>
+ {
+ Console.WriteLine("ServerStream Response:" + x);
+ });
+
+ var stream2 = client.StreamingTwo2(10, 20, 3);
+ await stream2.ResponseStream.ForEachAsync(x => { });
+ }
+ catch (Exception ex)
+ {
+ Console.WriteLine(ex);
+ }
+ }
}
public class ClientSimu : MagicOnionClientBase<IMyFirstService>, IMyFirstService
@@ -75,6 +96,21 @@ protected ClientSimu(CallInvoker callInvoker) : base(callInvoker)
return Task.FromResult(result);
}
+ public Task<ServerStreamingResult<string>> StreamingTwo(int x, int y, int z)
+ {
+ // throw new NotImplementedException();
+ byte[] request = null; // marshalling
+
+ var callResult = callInvoker.AsyncServerStreamingCall<byte[], byte[]>(null, host, option, request);
+ var result = new ServerStreamingResult<string>(callResult, null); // response marshaller
+ return Task.FromResult(result);
+ }
+
+ public ServerStreamingResult<string> StreamingTwo2(int x, int y, int z)
+ {
+ throw new NotImplementedException();
+ }
+
public Task<UnaryResult<string>> SumAsync(int x, int y)
{
throw new NotImplementedException();
@@ -27,16 +27,41 @@ public UnaryResult<string> SumAsync2(int x, int y)
public async Task<ClientStreamingResult<int, string>> StreamingOne()
{
+ Logger.Debug($"Called StreamingOne");
+
var stream = GetClientStreamingContext<int, string>();
await stream.ForEachAsync(x =>
{
- Console.WriteLine("Client Stream Received:" + x);
-
+ Logger.Debug("Client Stream Received:" + x);
});
return stream.Result("finished");
}
+
+ public async Task<ServerStreamingResult<string>> StreamingTwo(int x, int y, int z)
+ {
+ Logger.Debug($"Called StreamingTwo - x:{x} y:{y} z:{z}");
+
+ var stream = GetServerStreamingContext<string>();
+
+ var acc = 0;
+ for (int i = 0; i < z; i++)
+ {
+ acc = acc + x + y;
+ await stream.WriteAsync(acc.ToString());
+ }
+
+ return stream.Result();
+ }
+
+ public ServerStreamingResult<string> StreamingTwo2(int x, int y, int z)
+ {
+ Logger.Debug($"Called StreamingTwo2 - x:{x} y:{y} z:{z}");
+
+ var stream = GetServerStreamingContext<string>();
+ return stream.Result();
+ }
}
}
@@ -13,5 +13,7 @@ public interface IMyFirstService : IService<IMyFirstService>
UnaryResult<string> SumAsync2(int x, int y);
Task<ClientStreamingResult<int, string>> StreamingOne();
+ Task<ServerStreamingResult<string>> StreamingTwo(int x, int y, int z);
+ ServerStreamingResult<string> StreamingTwo2(int x, int y, int z);
}
}
@@ -0,0 +1,34 @@
+using Grpc.Core;
+using System;
+using System.Collections.Generic;
+using System.Linq;
+using System.Text;
+using System.Threading.Tasks;
+
+namespace MagicOnion
+{
+ public static class AsyncStreamReaderExtensions
+ {
+ public static async Task ForEachAsync<T>(this IAsyncStreamReader<T> stream, Action<T> action)
+ {
+ using (stream)
+ {
+ while (await stream.MoveNext())
+ {
+ action(stream.Current);
+ }
+ }
+ }
+
+ public static async Task ForEachAsync<T>(this IAsyncStreamReader<T> stream, Func<T, Task> actionAction)
+ {
+ using (stream)
+ {
+ while (await stream.MoveNext())
+ {
+ await actionAction(stream.Current);
+ }
+ }
+ }
+ }
+}
@@ -124,7 +124,7 @@ static void DefineStaticConstructor(TypeBuilder typeBuilder, Type resolverType,
il.Emit(OpCodes.Newobj, bytesMethod.GetConstructors()[0]);
il.Emit(OpCodes.Stsfld, def.FieldMethod);
- if (def.MethodType == MethodType.Unary)
+ if (def.MethodType == MethodType.Unary || def.MethodType == MethodType.ServerStreaming)
{
il.Emit(OpCodes.Ldtoken, resolverType);
il.Emit(OpCodes.Call, getTypeFromHandle);
@@ -296,8 +296,16 @@ static void DefineMethods(TypeBuilder typeBuilder, Type resolverType, Type inter
switch (def.MethodType)
{
case MethodType.Unary:
+ case MethodType.ServerStreaming:
il.DeclareLocal(typeof(byte[])); // request
- il.DeclareLocal(typeof(AsyncUnaryCall<byte[]>)); // callResult
+ if (def.MethodType == MethodType.Unary)
+ {
+ il.DeclareLocal(typeof(AsyncUnaryCall<byte[]>)); // callResult
+ }
+ else
+ {
+ il.DeclareLocal(typeof(AsyncServerStreamingCall<byte[]>));
+ }
il.Emit(OpCodes.Ldsfld, def.FieldRequestMarshaller);
il.Emit(OpCodes.Callvirt, def.FieldRequestMarshaller.FieldType.GetProperty("Serializer").GetGetMethod());
@@ -328,12 +336,26 @@ static void DefineMethods(TypeBuilder typeBuilder, Type resolverType, Type inter
il.Emit(OpCodes.Ldarg_0);
il.Emit(OpCodes.Ldfld, optionField);
il.Emit(OpCodes.Ldloc_0);
- il.Emit(OpCodes.Callvirt, typeof(CallInvoker).GetMethod("AsyncUnaryCall").MakeGenericMethod(typeof(byte[]), typeof(byte[])));
+ if (def.MethodType == MethodType.Unary)
+ {
+ il.Emit(OpCodes.Callvirt, typeof(CallInvoker).GetMethod("AsyncUnaryCall").MakeGenericMethod(typeof(byte[]), typeof(byte[])));
+ }
+ else
+ {
+ il.Emit(OpCodes.Callvirt, typeof(CallInvoker).GetMethod("AsyncServerStreamingCall").MakeGenericMethod(typeof(byte[]), typeof(byte[])));
+ }
il.Emit(OpCodes.Stloc_1);
il.Emit(OpCodes.Ldloc_1);
il.Emit(OpCodes.Ldsfld, def.FieldResponseMarshaller);
- il.Emit(OpCodes.Newobj, typeof(UnaryResult<>).MakeGenericType(def.ResponseType).GetConstructors()[0]);
+ if (def.MethodType == MethodType.Unary)
+ {
+ il.Emit(OpCodes.Newobj, typeof(UnaryResult<>).MakeGenericType(def.ResponseType).GetConstructors()[0]);
+ }
+ else
+ {
+ il.Emit(OpCodes.Newobj, typeof(ServerStreamingResult<>).MakeGenericType(def.ResponseType).GetConstructors()[0]);
+ }
if (def.ResponseIsTask)
{
il.Emit(OpCodes.Call, typeof(Task).GetMethod("FromResult").MakeGenericMethod(typeof(UnaryResult<>).MakeGenericType(def.ResponseType)));
@@ -362,8 +384,6 @@ static void DefineMethods(TypeBuilder typeBuilder, Type resolverType, Type inter
il.Emit(OpCodes.Call, typeof(Task).GetMethod("FromResult").MakeGenericMethod(typeof(ClientStreamingResult<,>).MakeGenericType(def.RequestType, def.ResponseType)));
}
break;
- case MethodType.ServerStreaming:
- break;
case MethodType.DuplexStreaming:
break;
default:
@@ -407,6 +427,12 @@ static Type UnwrapResponseType(MethodDefinition def, out MethodType methodType,
requestTypeIfExists = genArgs[0];
return genArgs[1];
}
+ else if (returnType == typeof(ServerStreamingResult<>))
+ {
+ methodType = MethodType.ServerStreaming;
+ requestTypeIfExists = null;
+ return t.GetGenericArguments()[0];
+ }
else
{
//methodType = MethodType.Unary; // TODO:others...
@@ -58,6 +58,7 @@
</Reference>
</ItemGroup>
<ItemGroup>
+ <Compile Include="AsyncStreamReaderExtensions.cs" />
<Compile Include="ClientStreamingResult.cs" />
<Compile Include="Client\DynamicClientBuilder.cs" />
<Compile Include="Client\MagicOnionClient.cs" />
@@ -71,6 +72,7 @@
<Compile Include="MagicOnionMarshallers.cs" />
<Compile Include="Nil.cs" />
<Compile Include="Properties\AssemblyInfo.cs" />
+ <Compile Include="ServerStreamingResult.cs" />
<Compile Include="Server\MagicOnionEngine.cs" />
<Compile Include="Server\MagicOnionOptions.cs" />
<Compile Include="Server\MethodHandler.cs" />
@@ -11,7 +11,7 @@
namespace MagicOnion
{
- internal class MarshallingAsyncStreamReader<TRequest, TResponse> : IAsyncStreamReader<TRequest>
+ internal class MarshallingAsyncStreamReader<TRequest> : IAsyncStreamReader<TRequest>
{
readonly IAsyncStreamReader<byte[]> inner;
readonly Marshaller<TRequest> marshaller;
@@ -43,6 +43,37 @@ public void Dispose()
}
}
+ internal class MarshallingAsyncStreamWriter<T> : IAsyncStreamWriter<T>
+ {
+ readonly IAsyncStreamWriter<byte[]> inner;
+ readonly Marshaller<T> marshaller;
+
+ public MarshallingAsyncStreamWriter(IAsyncStreamWriter<byte[]> inner, Marshaller<T> marshaller)
+ {
+ this.inner = inner;
+ this.marshaller = marshaller;
+ }
+
+ public WriteOptions WriteOptions
+ {
+ get
+ {
+ return inner.WriteOptions;
+ }
+
+ set
+ {
+ inner.WriteOptions = value;
+ }
+ }
+
+ public Task WriteAsync(T message)
+ {
+ var bytes = marshaller.Serializer(message);
+ return inner.WriteAsync(bytes);
+ }
+ }
+
internal class MarshallingServerStreamWriter<TResponse> : IServerStreamWriter<TResponse>
{
readonly IServerStreamWriter<byte[]> inner;
@@ -74,7 +105,6 @@ public Task WriteAsync(TResponse message)
}
}
-
internal class MarshallingClientStreamWriter<T> : IClientStreamWriter<T>
{
readonly IClientStreamWriter<byte[]> inner;
Oops, something went wrong.

0 comments on commit 7e7d1d4

Please sign in to comment.