Navigation Menu

Skip to content

Commit

Permalink
Make use of async thread pool in SendPushToUser and SendKickToUser as…
Browse files Browse the repository at this point in the history
… well
  • Loading branch information
felipejfc committed Jul 29, 2019
1 parent 5bd9bcf commit ceb94a8
Showing 1 changed file with 60 additions and 54 deletions.
114 changes: 60 additions & 54 deletions pitaya-sharp/NPitaya/src/PitayaCluster.API.cs
Expand Up @@ -219,78 +219,84 @@ public static void Terminate()
return retServer;
}

public static unsafe bool SendPushToUser(string frontendId, string serverType, string route, string uid,
public static unsafe Task<bool> SendPushToUser(string frontendId, string serverType, string route, string uid,
object pushMsg)
{
bool ok = false;
MemoryBuffer inMemBuf = new MemoryBuffer();
MemoryBuffer* outMemBufPtr = null;
var retError = new Error();

var push = new Push
return _rpcTaskFactory.StartNew(() =>
{
Route = route,
Uid = uid,
Data = ByteString.CopyFrom(SerializerUtils.SerializeOrRaw(pushMsg, _serializer))
};
bool ok = false;
MemoryBuffer inMemBuf = new MemoryBuffer();
MemoryBuffer* outMemBufPtr = null;
var retError = new Error();
try
{
var data = push.ToByteArray();
fixed (byte* p = data)
var push = new Push
{
inMemBuf.data = (IntPtr) p;
inMemBuf.size = data.Length;
IntPtr inMemBufPtr = new StructWrapper(inMemBuf);
Route = route,
Uid = uid,
Data = ByteString.CopyFrom(SerializerUtils.SerializeOrRaw(pushMsg, _serializer))
};
ok = PushInternal(frontendId, serverType, inMemBufPtr, &outMemBufPtr, ref retError);
if (!ok) // error
try
{
var data = push.ToByteArray();
fixed (byte* p = data)
{
Logger.Error($"Push failed: ({retError.code}: {retError.msg})");
return false;
}
inMemBuf.data = (IntPtr) p;
inMemBuf.size = data.Length;
IntPtr inMemBufPtr = new StructWrapper(inMemBuf);
return true;
ok = PushInternal(frontendId, serverType, inMemBufPtr, &outMemBufPtr, ref retError);
if (!ok) // error
{
Logger.Error($"Push failed: ({retError.code}: {retError.msg})");
return false;
}
return true;
}
}
}
finally
{
if (outMemBufPtr != null) FreeMemoryBufferInternal(outMemBufPtr);
}
finally
{
if (outMemBufPtr != null) FreeMemoryBufferInternal(outMemBufPtr);
}
});
}

public static unsafe bool SendKickToUser(string frontendId, string serverType, KickMsg kick)
public static unsafe Task<bool> SendKickToUser(string frontendId, string serverType, KickMsg kick)
{
bool ok = false;
MemoryBuffer inMemBuf = new MemoryBuffer();
MemoryBuffer* outMemBufPtr = null;
var retError = new Error();

try
return _rpcTaskFactory.StartNew(() =>
{
var data = kick.ToByteArray();
fixed (byte* p = data)
bool ok = false;
MemoryBuffer inMemBuf = new MemoryBuffer();
MemoryBuffer* outMemBufPtr = null;
var retError = new Error();
try
{
inMemBuf.data = (IntPtr) p;
inMemBuf.size = data.Length;
IntPtr inMemBufPtr = new StructWrapper(inMemBuf);
ok = KickInternal(frontendId, serverType, inMemBufPtr, &outMemBufPtr, ref retError);
if (!ok) // error
var data = kick.ToByteArray();
fixed (byte* p = data)
{
Logger.Error($"Push failed: ({retError.code}: {retError.msg})");
return false;
}
inMemBuf.data = (IntPtr) p;
inMemBuf.size = data.Length;
IntPtr inMemBufPtr = new StructWrapper(inMemBuf);
ok = KickInternal(frontendId, serverType, inMemBufPtr, &outMemBufPtr, ref retError);
if (!ok) // error
{
Logger.Error($"Push failed: ({retError.code}: {retError.msg})");
return false;
}
var kickAns = new KickAnswer();
kickAns.MergeFrom(new CodedInputStream(outMemBufPtr->GetData()));
var kickAns = new KickAnswer();
kickAns.MergeFrom(new CodedInputStream(outMemBufPtr->GetData()));
return kickAns.Kicked;
return kickAns.Kicked;
}
}
}
finally
{
if (outMemBufPtr != null) FreeMemoryBufferInternal(outMemBufPtr);
}
finally
{
if (outMemBufPtr != null) FreeMemoryBufferInternal(outMemBufPtr);
}
});
}

public static unsafe Task<T> Rpc<T>(string serverId, Route route, object msg)
Expand Down

0 comments on commit ceb94a8

Please sign in to comment.