Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 12 additions & 7 deletions .pubnub.yml
Original file line number Diff line number Diff line change
@@ -1,8 +1,13 @@
name: c-sharp
version: "7.3.6"
version: "7.3.7"
schema: 1
scm: github.com/pubnub/c-sharp
changelog:
- date: 2025-04-07
version: v7.3.7
changes:
- type: improvement
text: "Enhance request handling with detailed response interface."
- date: 2025-04-03
version: v7.3.6
changes:
Expand Down Expand Up @@ -877,7 +882,7 @@ features:
- QUERY-PARAM
supported-platforms:
-
version: Pubnub 'C#' 7.3.6
version: Pubnub 'C#' 7.3.7
platforms:
- Windows 10 and up
- Windows Server 2008 and up
Expand All @@ -888,7 +893,7 @@ supported-platforms:
- .Net Framework 4.6.1+
- .Net Framework 6.0
-
version: PubnubPCL 'C#' 7.3.6
version: PubnubPCL 'C#' 7.3.7
platforms:
- Xamarin.Android
- Xamarin.iOS
Expand All @@ -908,7 +913,7 @@ supported-platforms:
- .Net Core
- .Net 6.0
-
version: PubnubUWP 'C#' 7.3.6
version: PubnubUWP 'C#' 7.3.7
platforms:
- Windows Phone 10
- Universal Windows Apps
Expand All @@ -932,7 +937,7 @@ sdks:
distribution-type: source
distribution-repository: GitHub
package-name: Pubnub
location: https://github.com/pubnub/c-sharp/releases/tag/v7.3.6.0
location: https://github.com/pubnub/c-sharp/releases/tag/v7.3.7.0
requires:
-
name: ".Net"
Expand Down Expand Up @@ -1215,7 +1220,7 @@ sdks:
distribution-type: source
distribution-repository: GitHub
package-name: PubNubPCL
location: https://github.com/pubnub/c-sharp/releases/tag/v7.3.6.0
location: https://github.com/pubnub/c-sharp/releases/tag/v7.3.7.0
requires:
-
name: ".Net Core"
Expand Down Expand Up @@ -1574,7 +1579,7 @@ sdks:
distribution-type: source
distribution-repository: GitHub
package-name: PubnubUWP
location: https://github.com/pubnub/c-sharp/releases/tag/v7.3.6.0
location: https://github.com/pubnub/c-sharp/releases/tag/v7.3.7.0
requires:
-
name: "Universal Windows Platform Development"
Expand Down
4 changes: 4 additions & 0 deletions CHANGELOG
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
v7.3.7 - April 07 2025
-----------------------------
- Modified: enhance request handling with detailed response interface.

v7.3.6 - April 03 2025
-----------------------------
- Fixed: reafctor: Removed excess logging from transport module. Removed redundant logging from transport layer and added thread id information for tracking http request.
Expand Down
2 changes: 1 addition & 1 deletion src/Api/PubnubApi/EndPoint/Files/SendFileOperation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -426,7 +426,7 @@ private async Task<PNResult<PNFileUploadResult>> ProcessFileUpload()
publishFailed = true;
returnValue.Status = publishFileMessageStatus;
logger?.Debug($"PublishFileMessage Failed. retry count={currentFileRetryCount}");
await Task.Delay(1000);
await Task.Delay(1000).ConfigureAwait(false);
}
} while (publishFailed && currentFileRetryCount <= publishFileRetryLimit &&
!(publishFileMessageStatus?.StatusCode != 400 || publishFileMessageStatus.StatusCode != 403));
Expand Down
6 changes: 3 additions & 3 deletions src/Api/PubnubApi/EndPoint/PubSub/PublishOperation.cs
Original file line number Diff line number Diff line change
Expand Up @@ -204,8 +204,8 @@ internal void Publish(string channel, object message, bool storeInHistory, int t

PubnubInstance.transportMiddleware.Send(transportRequest).ContinueWith(t =>
{
var transportResponse = t.Result;
if (transportResponse.Error == null)
var transportResponse = t?.Result;
if (transportResponse is { Error: null })
{
var responseString = Encoding.UTF8.GetString(transportResponse.Content);
if (!string.IsNullOrEmpty(responseString))
Expand Down Expand Up @@ -304,7 +304,7 @@ internal async Task<PNResult<PNPublishResult>> Publish(string channel, object me
var transportResponse =
await PubnubInstance.transportMiddleware.Send(transportRequest).ConfigureAwait(false);
logger?.Debug($"Publish() got transport response: {transportResponse?.StatusCode}\n error: {transportResponse?.Error?.Message}\nstackTrace: {transportResponse?.Error?.StackTrace}");
if (transportResponse.Error == null)
if (transportResponse is { Error: null, Content: not null })
{
string responseString = Encoding.UTF8.GetString(transportResponse.Content);
PNStatus errorStatus = GetStatusIfError<PNPublishResult>(requestState, responseString);
Expand Down
19 changes: 10 additions & 9 deletions src/Api/PubnubApi/EndPoint/PubSub/SubscribeManager2.cs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ public async Task<Tuple<HandshakeResponse, PNStatus>> HandshakeRequest(PNOperati
if (config.MaintainPresenceState) presenceState = BuildJsonUserState(channels, channelGroups, true);
var requestParameter = CreateSubscribeRequestParameter(channels: channels, channelGroups: channelGroups, timetoken: timetoken.GetValueOrDefault(), region: region.GetValueOrDefault(), stateJsonValue: presenceState, initialSubscribeUrlParams: initialSubscribeUrlParams, externalQueryParam: externalQueryParam);
var transportRequest = pubnubInstance.transportMiddleware.PreapareTransportRequest(requestParameter: requestParameter, operationType: PNOperationType.PNSubscribeOperation);
cancellationTokenSource = transportRequest.CancellationTokenSource;
RequestState<HandshakeResponse> pubnubRequestState = new RequestState<HandshakeResponse>
{
Channels = channels,
Expand All @@ -43,16 +42,14 @@ public async Task<Tuple<HandshakeResponse, PNStatus>> HandshakeRequest(PNOperati
Region = region.GetValueOrDefault(),
TimeQueued = DateTime.Now
};
var transportResponse = await pubnubInstance.transportMiddleware.Send(transportRequest: transportRequest);

var transportResponse = await pubnubInstance.transportMiddleware.Send(transportRequest: transportRequest).ConfigureAwait(false);
if (transportResponse.StatusCode == Constants.HttpRequestSuccessStatusCode && transportResponse.Error == null && transportResponse.Content != null) {
var responseJson = Encoding.UTF8.GetString(transportResponse.Content);
logger?.Debug($"Handshake Effect received json: {responseJson}");
PNStatus status = new PNStatus(null, PNOperationType.PNSubscribeOperation, PNStatusCategory.PNConnectedCategory, channels, channelGroups);
HandshakeResponse handshakeResponse = jsonLibrary.DeserializeToObject<HandshakeResponse>(responseJson);
return new Tuple<HandshakeResponse, PNStatus>(handshakeResponse, status);
}

PNStatus errStatus;
if (transportResponse.Error != null) {
PNStatusCategory category = PNStatusCategoryHelper.GetPNStatusCategory(transportResponse.Error);
Expand Down Expand Up @@ -89,7 +86,8 @@ internal async Task<Tuple<ReceivingResponse<object>, PNStatus>> ReceiveRequest<T
string channelsJsonState = BuildJsonUserState(channels, channelGroups, false);
var requestParameter = CreateSubscribeRequestParameter(channels: channels, channelGroups: channelGroups, timetoken: timetoken.GetValueOrDefault(), region: region.GetValueOrDefault(), stateJsonValue: channelsJsonState, initialSubscribeUrlParams: initialSubscribeUrlParams, externalQueryParam: externalQueryParam);
var transportRequest = pubnubInstance.transportMiddleware.PreapareTransportRequest(requestParameter: requestParameter, operationType: PNOperationType.PNSubscribeOperation);
cancellationTokenSource = transportRequest.CancellationTokenSource;
if (timetoken > 0)
cancellationTokenSource = transportRequest.CancellationTokenSource;
RequestState<ReceivingResponse<object>> pubnubRequestState = new RequestState<ReceivingResponse<object>>
{
Channels = channels,
Expand All @@ -100,14 +98,15 @@ internal async Task<Tuple<ReceivingResponse<object>, PNStatus>> ReceiveRequest<T
TimeQueued = DateTime.Now
};

var transportResponse = await pubnubInstance.transportMiddleware.Send(transportRequest: transportRequest);
var transportResponse = await pubnubInstance.transportMiddleware.Send(transportRequest: transportRequest).ConfigureAwait(false);
if (transportResponse.Content != null && transportResponse.Error == null && transportResponse.StatusCode == Constants.HttpRequestSuccessStatusCode) {
var responseJson = Encoding.UTF8.GetString(transportResponse.Content);
logger?.Debug($"Receiving Effect received json: {responseJson}");
PNStatus status = new PNStatus(null, PNOperationType.PNSubscribeOperation, PNStatusCategory.PNConnectedCategory, channels, channelGroups);
ReceivingResponse<object> receiveResponse = jsonLibrary.DeserializeToObject<ReceivingResponse<object>>(responseJson);
return new Tuple<ReceivingResponse<object>, PNStatus>(receiveResponse, status);
}
if (transportResponse.IsCancelled) return resp;
PNStatus errStatus;
if (transportResponse.Error != null) {
PNStatusCategory category = PNStatusCategoryHelper.GetPNStatusCategory(transportResponse.Error);
Expand All @@ -132,13 +131,14 @@ internal void ReceiveRequestCancellation()
if (cancellationTokenSource != null) {
cancellationTokenSource.Cancel();
cancellationTokenSource.Dispose();
cancellationTokenSource = null;
} else {
logger?.Trace($"SubscribeManager RequestCancellation. No request to cancel.");
logger?.Trace($"ReceiveRequestCancellation: No active request found to cancel.");
}
logger?.Trace($"SubscribeManager ReceiveRequestCancellation. Done.");
logger?.Trace($"ReceiveRequestCancellation: Active request found and cancelled.");
} catch (Exception ex)
{
logger?.Trace($"SubscribeManager ReceiveRequestCancellation Exception: {ex}");
logger?.Trace($"ReceiveRequestCancellation Exception: {ex}");
}
}

Expand All @@ -148,6 +148,7 @@ internal void ReceiveReconnectRequestCancellation()
if (cancellationTokenSource != null) {
cancellationTokenSource.Cancel();
cancellationTokenSource.Dispose();
cancellationTokenSource = null;
} else {
logger?.Trace($"SubscribeManager ReceiveReconnectRequestCancellation. No request to cancel.");
}
Expand Down
55 changes: 24 additions & 31 deletions src/Api/PubnubApi/EventEngine/Common/Delay.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ namespace PubnubApi.EventEngine.Common
public class Delay
{
public bool Cancelled { get; private set; } = false;
private readonly TaskCompletionSource<object> taskCompletionSource = new TaskCompletionSource<object>();
private readonly object monitor = new object();
private readonly TaskCompletionSource<object> taskCompletionSource = new ();
private readonly CancellationTokenSource cancellationTokenSource = new ();
private readonly int milliseconds;

public Delay(int milliseconds)
Expand All @@ -17,44 +17,37 @@ public Delay(int milliseconds)

public Task Start()
{
#if NETFX_CORE || WINDOWS_UWP || UAP || NETSTANDARD10 || NETSTANDARD11 || NETSTANDARD12
Task taskAwaiter = Task.Factory.StartNew(AwaiterLoop);
taskAwaiter.Wait();
#else
Thread awaiterThread = new Thread(AwaiterLoop);
awaiterThread.Start();
#endif
AwaiterLoop();
return taskCompletionSource.Task; }

public void Cancel()
{
lock (monitor)
{
Cancelled = true;
Monitor.Pulse(monitor);
}
Cancelled = true;
cancellationTokenSource.Cancel();
}

private void AwaiterLoop()
private async void AwaiterLoop()
{
while(true)
if (Cancelled)
{
taskCompletionSource.TrySetCanceled();
return;
}
try
{
await Task.Delay(milliseconds, cancellationTokenSource.Token).ConfigureAwait(false);
}
catch (TaskCanceledException e)
{
taskCompletionSource.TrySetCanceled();
return;
}
if (Cancelled)
{
lock (monitor)
{
if (Cancelled)
{
taskCompletionSource.TrySetCanceled();
break;
}
Monitor.Wait(monitor, milliseconds);
if (Cancelled)
{
taskCompletionSource.TrySetCanceled();
break;
}
taskCompletionSource.TrySetResult(null);
}
taskCompletionSource.TrySetCanceled();
return;
}
taskCompletionSource.TrySetResult(null);
}
}
}
2 changes: 1 addition & 1 deletion src/Api/PubnubApi/EventEngine/Core/EffectDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public async Task Dispatch(IEffectInvocation invocation) {
if (handler.IsBackground(invocation))
FireAndForget(handler, invocation);
else
await handler.Run(invocation);
await handler.Run(invocation).ConfigureAwait(false);
}
}

Expand Down
8 changes: 4 additions & 4 deletions src/Api/PubnubApi/EventEngine/Core/Engine.cs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ private async Task Transition(IEvent e)
return;
}

await ExecuteStateChange(currentState, stateInvocationPair.State, stateInvocationPair.Invocations);
await ExecuteStateChange(currentState, stateInvocationPair.State, stateInvocationPair.Invocations).ConfigureAwait(false);

this.currentState = stateInvocationPair.State;
}
Expand All @@ -78,16 +78,16 @@ private async Task ExecuteStateChange(State sourceState, State targetState, IEnu
logger?.Debug($"Exiting state {sourceState}");
foreach (var effectInvocation in sourceState.OnExit ?? emptyInvocationList) {
logger?.Debug($"Dispatching effect: {effectInvocation}");
await dispatcher.Dispatch(effectInvocation);
await dispatcher.Dispatch(effectInvocation).ConfigureAwait(false);
}
foreach (var effectInvocation in invocations ?? emptyInvocationList) {
logger?.Debug($"Dispatching effect: {effectInvocation}");
await dispatcher.Dispatch(effectInvocation);
await dispatcher.Dispatch(effectInvocation).ConfigureAwait(false);
}
logger?.Debug($"Entering state {targetState}");
foreach (var effectInvocation in targetState.OnEntry ?? emptyInvocationList) {
logger?.Debug($"Dispatching effect: {effectInvocation}");
await dispatcher.Dispatch(effectInvocation);
await dispatcher.Dispatch(effectInvocation).ConfigureAwait(false);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ public override async Task Run(DelayedHeartbeatInvocation invocation)
return;
}
retryDelay = new Delay(retryConfiguration.RetryPolicy.GetDelay(invocation.RetryCount, invocation.Reason, null));
await retryDelay.Start();
await retryDelay.Start().ConfigureAwait(false);
if (!retryDelay.Cancelled)
await MakeHeartbeatRequest(invocation);
await MakeHeartbeatRequest(invocation).ConfigureAwait(false);
}

private void EnqueueHeartbeatGiveUpEvent()
Expand All @@ -49,7 +49,7 @@ private async Task MakeHeartbeatRequest(DelayedHeartbeatInvocation invocation)
var resp = await heartbeatOperation.HeartbeatRequest<string>(
invocation.Input.Channels.ToArray(),
invocation.Input.ChannelGroups.ToArray()
);
).ConfigureAwait(false);
switch (resp) {
case { } when resp.Error:
eventQueue.Enqueue(new Events.HeartbeatFailureEvent() { retryCount = invocation.RetryCount + 1, Status = resp });
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public override async Task Run(HeartbeatInvocation invocation)
var resp = await heartbeatOperation.HeartbeatRequest<string>(
invocation.Input.Channels.ToArray(),
invocation.Input.ChannelGroups.ToArray()
);
).ConfigureAwait(false);
switch (resp) {
case { } when resp.Error:
eventQueue.Enqueue(new Events.HeartbeatFailureEvent() { retryCount = 0, Status = resp });
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ public override async Task Run(LeaveInvocation invocation)
await leaveOperation.LeaveRequest<string>(
invocation.Input.Channels?.ToArray(),
invocation.Input.ChannelGroups?.ToArray()
);
).ConfigureAwait(false);
} catch (Exception) { }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ public override bool IsBackground(WaitInvocation invocation)
public override async Task Run(WaitInvocation invocation)
{
retryDelay = new Delay((int)(pnConfiguration.PresenceInterval * 1000));
await retryDelay.Start();
await retryDelay.Start().ConfigureAwait(false);
if (!retryDelay.Cancelled)
eventQueue.Enqueue(new Events.TimesUpEvent());
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ internal HandshakeEffectHandler(SubscribeManager2 manager, EventQueue eventQueue

public override async Task Run(HandshakeInvocation invocation)
{
var response = await MakeHandshakeRequest(invocation);
var response = await MakeHandshakeRequest(invocation).ConfigureAwait(false);
SubscriptionCursor cursor = null;
if (response.Item1 != null)
{
Expand Down Expand Up @@ -89,7 +89,7 @@ public override bool IsBackground(HandshakeInvocation invocation)
null,
invocation.InitialSubscribeQueryParams,
invocation.ExternalQueryParams
);
).ConfigureAwait(false);
}

public override async Task Cancel()
Expand Down Expand Up @@ -133,9 +133,9 @@ public override async Task Run(HandshakeReconnectInvocation invocation)
else
{
retryDelay = new Delay(retryConfiguration.RetryPolicy.GetDelay(invocation.AttemptedRetries, invocation.Reason, null));
await retryDelay.Start();
await retryDelay.Start().ConfigureAwait(false);
if (!retryDelay.Cancelled)
await handshakeEffectHandler.Run(invocation as HandshakeInvocation);
await handshakeEffectHandler.Run(invocation as HandshakeInvocation).ConfigureAwait(false);
}
}
catch (Exception ex)
Expand All @@ -152,7 +152,7 @@ public override async Task Cancel()
{
retryDelay.Cancel();
}
await handshakeEffectHandler.Cancel();
await handshakeEffectHandler.Cancel().ConfigureAwait(false);
}
}
}
Loading
Loading