Skip to content

Commit

Permalink
Use client.id to connect to Confluent Cloud Kafka (#155) (#156)
Browse files Browse the repository at this point in the history
  • Loading branch information
jordivicedo authored Jul 18, 2023
1 parent 17fc0dd commit 33faf3c
Show file tree
Hide file tree
Showing 4 changed files with 252 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
using System;
using System.Collections.Generic;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;
using FluentAssertions;
using Xunit;

namespace QuixStreams.Streaming.UnitTests;

public class StreamingClientShould
{
[Theory]
[InlineData("confluent-testTopic")]
[InlineData("quixdev-secondTest")]
[InlineData("different-topic")]
public void GetTopicConsumer_ShouldUseClientId(string topicName)
{
// Arrange
var messageHandler = new MockHttpMessageHandler(new Dictionary<string, string>()
{
{ "/workspaces", workspaces },
{ "/topics", topics }
});
var client = new HttpClient(messageHandler);
var streamingClient = new QuixStreamingClient(httpClient: client, token: "faketoken");

// Act
var topicConsumer = streamingClient.GetTopicConsumer(topicName);

// Assert
topicConsumer.Should().NotBeNull();
}

private string topics = @"
[
{
""id"": ""confluent-testTopic"",
""name"": ""confluent-testTopic"",
""workspaceId"": ""confluent"",
""status"": ""Ready"",
},
{
""id"": ""quixdev-secondTest"",
""name"": ""quixdev-secondTest"",
""workspaceId"": ""quixdev"",
""status"": ""Ready"",
},
{
""id"": ""different-topic"",
""name"": ""different-topic"",
""workspaceId"": ""different"",
""status"": ""Ready"",
}
]";

private string workspaces = @"
[
{
""workspaceId"": ""confluent""
,""name"": ""Confluent Kafka Workspace"",
""status"": ""Ready"",
""broker"": {
""address"": ""xxxxx:9092"",
""securityMode"": ""SaslSsl"",
""sslPassword"": """",
""saslMechanism"": ""Plain"",
""username"": ""xxxxx"",
""password"": ""xxxxx"",
""hasCertificate"": false
},
""brokerSettings"": {
""brokerType"": ""ConfluentCloud"",
""syncTopics"": false,
""confluentCloudSettings"": {
""clientID"": ""testclientid""
}
}
},
{
""workspaceId"": ""quixdev"",
""name"": ""Shared Kafka Workspace"",
""status"": ""Ready"",
""broker"": {
""address"": ""xxxx:9092"",
""securityMode"": ""SaslSsl"",
""sslPassword"": ""xxxx"",
""saslMechanism"": ""ScramSha256"",
""username"": ""xxxx"",
""password"": ""xxxx"",
""hasCertificate"": false
},
""brokerSettings"": {
""brokerType"": ""SharedKafka""
}
},
{
""workspaceId"": ""different"",
""name"": ""Unknown Kafka Workspace"",
""status"": ""Ready"",
""broker"": {
""address"": ""xxxx:9092"",
""securityMode"": ""SaslSsl"",
""sslPassword"": ""xxxx"",
""saslMechanism"": ""ScramSha256"",
""username"": ""xxxx"",
""password"": ""xxxx"",
""hasCertificate"": false
},
""brokerSettings"": {
""brokerType"": ""ThisIsNew""
}
}
]";

private class MockHttpMessageHandler : HttpMessageHandler
{
private readonly Dictionary<string, string> responses;

public MockHttpMessageHandler(Dictionary<string, string> responses)
{
this.responses = responses;
}

protected override async Task<HttpResponseMessage> SendAsync(HttpRequestMessage request,
CancellationToken cancellationToken)
{
foreach (var keyValuePair in responses)
{
if (request.RequestUri != null && request.RequestUri.ToString().Contains(keyValuePair.Key))
{
return new HttpResponseMessage
{
StatusCode = HttpStatusCode.OK,
Content = new StringContent(keyValuePair.Value)
};
}
}

throw new Exception("URL not found");
}
}
}
62 changes: 62 additions & 0 deletions src/CsharpClient/QuixStreams.Streaming/QuixApi/Portal/Workspace.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using Newtonsoft.Json;

namespace QuixStreams.Streaming.QuixApi.Portal
{
Expand Down Expand Up @@ -36,6 +37,11 @@ public string WorkspaceId
/// Broker connection details of the Workspace
/// </summary>
public WorkspaceBrokerDetails Broker { get; set; }

/// <summary>
/// The broker settings
/// </summary>
public WorkspaceBrokerSettings BrokerSettings { get; set; }
}

/// <summary>
Expand Down Expand Up @@ -176,4 +182,60 @@ internal enum WorkspaceStatus
/// </summary>
Disabled
}

internal enum WorkspaceBrokerType
{
/// <summary>
/// Unknown broker type
/// </summary>
Unknown,

/// <summary>
/// Quix shared managed Kafka cluster
/// </summary>
SharedKafka,

/// <summary>
/// Confluent Cloud Kafka cluster
/// </summary>
ConfluentCloud,

/// <summary>
/// Self Hosted Kafka cluster
/// </summary>
SelfHosted,

/// <summary>
/// Amazon Msk Kafka cluster
/// </summary>
AmazonMsk
}

/// <summary>
/// Describes the workspace broker settings
/// </summary>
internal class WorkspaceBrokerSettings
{
/// <summary>
/// Broker type
/// </summary>
[JsonConverter(typeof(WorkspaceBrokerTypeJsonConverter))]
public WorkspaceBrokerType BrokerType { get; set; }

/// <summary>
/// Confluent Cloud settings
/// </summary>
public ConfluentCloudSettings ConfluentCloudSettings { get; set; }
}

/// <summary>
/// Describes Confluent cluster settings
/// </summary>
internal class ConfluentCloudSettings
{
/// <summary>
/// Partnership ID
/// </summary>
public string ClientID { get; set; }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
using System;
using Newtonsoft.Json;
using QuixStreams.Streaming.QuixApi.Portal;

namespace QuixStreams.Streaming.QuixApi
{
public class WorkspaceBrokerTypeJsonConverter : JsonConverter
{
public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
{
writer.WriteValue(value.ToString());
}

public override object ReadJson(JsonReader reader, Type objectType, object existingValue, JsonSerializer serializer)
{
if (reader.TokenType == JsonToken.String)
{
string value = reader.Value.ToString();
if (Enum.TryParse(value, out WorkspaceBrokerType result))
{
return result;
}
}

return WorkspaceBrokerType.Unknown;
}

public override bool CanConvert(Type objectType)
{
return objectType == typeof(WorkspaceBrokerType);
}
}
}
13 changes: 13 additions & 0 deletions src/CsharpClient/QuixStreams.Streaming/QuixStreamingClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -458,6 +458,19 @@ private async Task<KafkaStreamingClient> CreateStreamingClientForWorkspace(Works
securityOptions.UseSasl = false;
}

try
{
if (ws.BrokerSettings?.BrokerType == WorkspaceBrokerType.ConfluentCloud &&
!string.IsNullOrWhiteSpace(ws.BrokerSettings.ConfluentCloudSettings.ClientID))
{
brokerProperties["client.id"] = ws.BrokerSettings.ConfluentCloudSettings.ClientID;
}
}
catch (Exception ex)
{
this.logger.LogTrace(ex, "Failed to set Confluent client id");
}

var client = new KafkaStreamingClient(ws.Broker.Address, securityOptions, brokerProperties, debug);
return wsToStreamingClientDict.GetOrAdd(ws.WorkspaceId, client);
}
Expand Down

0 comments on commit 33faf3c

Please sign in to comment.