Skip to content
This repository has been archived by the owner on Jan 24, 2024. It is now read-only.

[oauthclient] Create a zero-dependencies jar: - remove Async HTTP client and use the standard JDK Http client - shade and relocate Jackson Databind, used for JSON #1979

71 changes: 47 additions & 24 deletions oauth-client/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,6 @@
<name>StreamNative :: Pulsar Protocol Handler :: OAuth 2.0 Client</name>
<description>OAuth 2.0 login callback handler for Kafka client</description>

<properties>
<async-http-client.version>2.12.1</async-http-client.version>
<guava.version>31.1-jre</guava.version>
</properties>

<!-- include the dependencies -->
<dependencies>
<dependency>
Expand All @@ -45,32 +40,14 @@
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>

<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-annotations</artifactId>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.asynchttpclient</groupId>
<artifactId>async-http-client</artifactId>
<version>${async-http-client.version}</version>
</dependency>

<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>${guava.version}</version>
<scope>provided</scope>
</dependency>

<dependency>
<groupId>org.testng</groupId>
<artifactId>testng</artifactId>
Expand All @@ -88,5 +65,51 @@
<artifactId>test-listener</artifactId>
<scope>test</scope>
</dependency>

<dependency>
<groupId>com.github.tomakehurst</groupId>
<artifactId>wiremock</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<!-- <build>-->
gaoran10 marked this conversation as resolved.
Show resolved Hide resolved
<!-- <plugins>-->
<!-- <plugin>-->
<!-- <groupId>org.apache.maven.plugins</groupId>-->
<!-- <artifactId>maven-shade-plugin</artifactId>-->
<!-- <version>${maven-shade-plugin.version}</version>-->
<!-- <executions>-->
<!-- <execution>-->
<!-- <phase>package</phase>-->
<!-- <goals>-->
<!-- <goal>shade</goal>-->
<!-- </goals>-->
<!-- <configuration>-->
<!-- <createDependencyReducedPom>true</createDependencyReducedPom>-->
<!-- <relocations>-->
<!-- <relocation>-->
<!-- <pattern>com.fasterxml.jackson</pattern>-->
<!-- <shadedPattern>kafka.oauthclient.com.fasterxml.jackson</shadedPattern>-->
<!-- </relocation>-->
<!-- </relocations>-->
<!-- <artifactSet>-->
<!-- <includes>-->
<!-- <include>com.fasterxml.jackson.core:*</include>-->
<!-- </includes>-->
<!-- </artifactSet>-->
<!-- <filters>-->
<!-- <filter>-->
<!-- <artifact>com.fasterxml.jackson.core:*</artifact>-->
<!-- <excludes>-->
<!-- <exclude>META-INF/**</exclude>-->
<!-- </excludes>-->
<!-- </filter>-->
<!-- </filters>-->
<!-- </configuration>-->
<!-- </execution>-->
<!-- </executions>-->
<!-- </plugin>-->
<!-- </plugins>-->
<!-- </build>-->
</project>
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,23 @@

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectReader;
import com.google.common.annotations.VisibleForTesting;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.io.UnsupportedEncodingException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;
import java.net.URLConnection;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.stream.Collectors;
import lombok.Getter;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClient;
import org.asynchttpclient.DefaultAsyncHttpClientConfig;
import org.asynchttpclient.Response;

/**
* The OAuth 2.0 client credential flow.
Expand All @@ -52,74 +46,79 @@ public class ClientCredentialsFlow implements Closeable {
private final Duration connectTimeout = Duration.ofSeconds(10);
private final Duration readTimeout = Duration.ofSeconds(30);
private final ClientConfig clientConfig;
private final AsyncHttpClient httpClient;

public ClientCredentialsFlow(ClientConfig clientConfig) {
this.clientConfig = clientConfig;
this.httpClient = new DefaultAsyncHttpClient(new DefaultAsyncHttpClientConfig.Builder()
.setFollowRedirect(true)
.setConnectTimeout((int) connectTimeout.toMillis())
.setReadTimeout((int) readTimeout.toMillis())
.build());
}

@VisibleForTesting
protected ClientCredentialsFlow(ClientConfig clientConfig, AsyncHttpClient httpClient) {
this.clientConfig = clientConfig;
this.httpClient = httpClient;
}

public OAuthBearerTokenImpl authenticate() throws IOException {
final String tokenEndPoint = findAuthorizationServer().getTokenEndPoint();
final ClientInfo clientInfo = clientConfig.getClientInfo();
final URL url = new URL(tokenEndPoint);
HttpURLConnection con = (HttpURLConnection) url.openConnection();
try {
con.setReadTimeout((int) readTimeout.toMillis());
con.setConnectTimeout((int) connectTimeout.toMillis());
con.setDoOutput(true);
con.setRequestMethod("POST");
con.setRequestProperty("Content-Type", "application/x-www-form-urlencoded");
con.setRequestProperty("Accept", "application/json");
final String body = buildClientCredentialsBody(clientInfo);
final Response response = httpClient.preparePost(tokenEndPoint)
.setHeader("Accept", "application/json")
.setHeader("Content-Type", "application/x-www-form-urlencoded")
.setBody(body)
.execute()
.get();
switch (response.getStatusCode()) {
case 200:
OAuthBearerTokenImpl token = TOKEN_RESULT_READER.readValue(response.getResponseBodyAsBytes());
String tenant = clientInfo.getTenant();
// Add tenant for multi-tenant.
if (tenant != null) {
token.setTenant(tenant);
}
return token;
try (OutputStream o = con.getOutputStream()) {
o.write(body.getBytes(StandardCharsets.UTF_8));
}
try (InputStream in = con.getInputStream()) {
OAuthBearerTokenImpl token = TOKEN_RESULT_READER.readValue(in);
String tenant = clientInfo.getTenant();
// Add tenant for multi-tenant.
if (tenant != null) {
token.setTenant(tenant);
}
return token;
}
} catch (IOException err) {
switch (con.getResponseCode()) {
case 400: // Bad request
case 401: // Unauthorized
throw new IOException(OBJECT_MAPPER.writeValueAsString(
TOKEN_ERROR_READER.readValue(response.getResponseBodyAsBytes())));
case 401: { // Unauthorized
IOException error;
try {
error = new IOException(OBJECT_MAPPER.writeValueAsString(
TOKEN_ERROR_READER.readValue(con.getErrorStream())));
error.addSuppressed(err);
} catch (Exception ignoreJsonError) {
err.addSuppressed(ignoreJsonError);
throw err;
}
throw error;
}
default:
throw new IOException("Failed to perform HTTP request: "
+ response.getStatusCode() + " " + response.getStatusText());
throw new IOException("Failed to perform HTTP request to " + tokenEndPoint
+ ":" + con.getResponseCode() + " " + con.getResponseMessage(), err);
}
} catch (UnsupportedEncodingException | InterruptedException
| ExecutionException | JsonProcessingException e) {
throw new IOException(e);
} finally {
con.disconnect();
}
}

@Override
public void close() throws IOException {
httpClient.close();
}

@VisibleForTesting
Metadata findAuthorizationServer() throws IOException {
// See RFC-8414 for this well-known URI
final URL wellKnownMetadataUrl = URI.create(clientConfig.getIssuerUrl().toExternalForm()
+ "/.well-known/openid-configuration").normalize().toURL();
final URLConnection connection = wellKnownMetadataUrl.openConnection();
connection.setConnectTimeout((int) connectTimeout.toMillis());
connection.setReadTimeout((int) readTimeout.toMillis());
connection.setRequestProperty("Accept", "application/json");
final HttpURLConnection connection = (HttpURLConnection) wellKnownMetadataUrl.openConnection();
try {
connection.setConnectTimeout((int) connectTimeout.toMillis());
connection.setReadTimeout((int) readTimeout.toMillis());
connection.setRequestProperty("Accept", "application/json");

try (InputStream inputStream = connection.getInputStream()) {
return METADATA_READER.readValue(inputStream);
try (InputStream inputStream = connection.getInputStream()) {
return METADATA_READER.readValue(inputStream);
}
} finally {
connection.disconnect();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,19 +13,19 @@
*/
package io.streamnative.pulsar.handlers.kop.security.oauth;

import static org.mockito.ArgumentMatchers.anyString;
import static com.github.tomakehurst.wiremock.client.WireMock.configureFor;
import static com.github.tomakehurst.wiremock.client.WireMock.stubFor;
import static com.github.tomakehurst.wiremock.client.WireMock.urlPathEqualTo;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;

import com.github.tomakehurst.wiremock.WireMockServer;
import com.github.tomakehurst.wiremock.client.WireMock;
import com.github.tomakehurst.wiremock.core.WireMockConfiguration;
import java.io.IOException;
import java.util.Collections;
import java.util.Objects;
import java.util.concurrent.ExecutionException;
import org.asynchttpclient.AsyncHttpClient;
import org.asynchttpclient.BoundRequestBuilder;
import org.asynchttpclient.ListenableFuture;
import org.asynchttpclient.Response;
import org.testng.Assert;
import org.testng.annotations.Test;

Expand Down Expand Up @@ -56,37 +56,36 @@ public void testLoadPrivateKey() {
}

@Test
public void testTenantToken() throws ExecutionException, InterruptedException, IOException {
AsyncHttpClient mockHttpClient = mock(AsyncHttpClient.class);
final ClientCredentialsFlow flow = spy(new ClientCredentialsFlow(ClientConfigHelper.create(
"http://localhost:4444",
Objects.requireNonNull(
getClass().getClassLoader().getResource("private_key.json")).toString()
), mockHttpClient));
public void testTenantToken() throws IOException {
WireMockServer mockOauthServer = new WireMockServer(WireMockConfiguration.wireMockConfig().dynamicPort());
BewareMyPower marked this conversation as resolved.
Show resolved Hide resolved
try {
mockOauthServer.start();
final ClientCredentialsFlow flow = spy(new ClientCredentialsFlow(ClientConfigHelper.create(
mockOauthServer.url("/"),
Objects.requireNonNull(
getClass().getClassLoader().getResource("private_key.json")).toString()
)));

ClientCredentialsFlow.Metadata mockMetadata = mock(ClientCredentialsFlow.Metadata.class);
doReturn("mockTokenEndPoint").when(mockMetadata).getTokenEndPoint();
ClientCredentialsFlow.Metadata mockMetadata = mock(ClientCredentialsFlow.Metadata.class);
doReturn(mockOauthServer.url("/mockTokenEndPoint")).when(mockMetadata).getTokenEndPoint();

doReturn(mockMetadata).when(flow).findAuthorizationServer();
doReturn(mockMetadata).when(flow).findAuthorizationServer();

BoundRequestBuilder mockBuilder = mock(BoundRequestBuilder.class);
ListenableFuture<Response> mockFuture = mock(ListenableFuture.class);
Response mockResponse = mock(Response.class);
doReturn(200).when(mockResponse).getStatusCode();
String responseString = "{\n"
+ " \"access_token\":\"my-token\",\n"
+ " \"expires_in\":42,\n"
+ " \"scope\":\"test\"\n"
+ "}";
doReturn(responseString.getBytes()).when(mockResponse).getResponseBodyAsBytes();
doReturn(mockResponse).when(mockFuture).get();
doReturn(mockFuture).when(mockBuilder).execute();
doReturn(mockBuilder).when(mockHttpClient).preparePost(anyString());
doReturn(mockBuilder).when(mockBuilder).setHeader(anyString(), anyString());
doReturn(mockBuilder).when(mockBuilder).setBody(anyString());
String responseString = "{\n"
+ " \"access_token\":\"my-token\",\n"
+ " \"expires_in\":42,\n"
+ " \"scope\":\"test\"\n"
+ "}";
configureFor("localhost", mockOauthServer.port());
stubFor(WireMock.post(urlPathEqualTo("/mockTokenEndPoint"))
.willReturn(WireMock.ok(responseString))
);
OAuthBearerTokenImpl token = flow.authenticate();
Assert.assertEquals(token.value(), "my-tenant" + OAuthBearerTokenImpl.DELIMITER + "my-token");
Assert.assertEquals(token.scope(), Collections.singleton("test"));
} finally {
mockOauthServer.shutdown();
}

OAuthBearerTokenImpl token = flow.authenticate();
Assert.assertEquals(token.value(), "my-tenant" + OAuthBearerTokenImpl.DELIMITER + "my-token");
Assert.assertEquals(token.scope(), Collections.singleton("test"));
}
}
9 changes: 8 additions & 1 deletion pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@
<kafka.version>2.8.0</kafka.version>
<lombok.version>1.18.24</lombok.version>
<mockito.version>4.11.0</mockito.version>
<wiremock.version>3.0.0-beta-2</wiremock.version>
<pulsar.group.id>io.streamnative</pulsar.group.id>
<pulsar.version>3.0.0.1</pulsar.version>
<slf4j.version>1.7.25</slf4j.version>
Expand All @@ -70,7 +71,7 @@
<maven-checkstyle-plugin.version>3.1.1</maven-checkstyle-plugin.version>
<maven-compiler-plugin.version>3.8.1</maven-compiler-plugin.version>
<maven-javadoc-plugin.version>3.4.1</maven-javadoc-plugin.version>
<maven-shade-plugin.version>3.4.0</maven-shade-plugin.version>
<maven-shade-plugin.version>3.4.1</maven-shade-plugin.version>
<maven-surefire-plugin.version>3.0.0-M3</maven-surefire-plugin.version>
<puppycrawl.checkstyle.version>8.37</puppycrawl.checkstyle.version>
<spotbugs-maven-plugin.version>4.2.2</spotbugs-maven-plugin.version>
Expand Down Expand Up @@ -250,6 +251,12 @@
<version>${mockito.version}</version>
</dependency>

<dependency>
<groupId>com.github.tomakehurst</groupId>
<artifactId>wiremock</artifactId>
<version>${wiremock.version}</version>
</dependency>

<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-inline</artifactId>
Expand Down
Loading