Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix Okta Authentication and Validation - Ingestion #2955

Merged
merged 11 commits into from
Feb 26, 2022
26 changes: 26 additions & 0 deletions ingestion/examples/auth_examples/okta_example.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
{
"source": {
"type": "sample-data",
"config": {
"sample_data_folder": "./examples/sample_data"
}
},
"sink": {
"type": "metadata-rest",
"config": {}
},
"metadata_server": {
"type": "metadata-server",
"config": {
"api_endpoint": "http://localhost:8585/api",
"auth_provider_type": "okta",
"client_id": "{client_id}",
"org_url": "{Issuer URI}",
"private_key": "Public and Private Keypair",
ayush-shah marked this conversation as resolved.
Show resolved Hide resolved
"email": "email",
"scopes": [
"Authorization Server Scopes"
]
}
}
}
3 changes: 1 addition & 2 deletions ingestion/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,6 @@ def get_long_description():
"typing-inspect",
"pydantic>=1.7.4",
"pydantic[email]>=1.7.2",
"google>=3.0.0",
"google-auth>=1.33.0",
"python-dateutil>=2.8.1",
"email-validator>=1.0.3",
"wheel~=0.36.2",
Expand Down Expand Up @@ -78,6 +76,7 @@ def get_long_description():
"druid": {"pydruid>=0.6.2"},
"elasticsearch": {"elasticsearch~=7.13.1"},
"glue": {"boto3~=1.19.12"},
"google": {"google>=3.0.0", "google-auth>=1.33.0"},
"dynamodb": {"boto3~=1.19.12"},
"hive": {
"pyhive~=0.6.3",
Expand Down
2 changes: 2 additions & 0 deletions ingestion/src/metadata/cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import os
import pathlib
import sys
import traceback
from typing import List, Optional, Tuple

import click
Expand Down Expand Up @@ -81,6 +82,7 @@ def ingest(config: str) -> None:
workflow = Workflow.create(workflow_config)
except ValidationError as e:
click.echo(e, err=True)
logger.debug(traceback.print_exc())
sys.exit(1)

workflow.execute()
Expand Down
5 changes: 3 additions & 2 deletions ingestion/src/metadata/ingestion/api/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -162,15 +162,16 @@ def execute(self):
if hasattr(self, "sink"):
self.sink.write_record(processed_record)
self.report["sink"] = self.sink.get_status().as_obj()
if hasattr(self, "bulk_sink"):
ayush-shah marked this conversation as resolved.
Show resolved Hide resolved
self.bulk_sink.write_records()
self.report["Bulk_Sink"] = self.bulk_sink.get_status().as_obj()

def stop(self):
if hasattr(self, "processor"):
self.processor.close()
if hasattr(self, "stage"):
self.stage.close()
if hasattr(self, "bulk_sink"):
self.bulk_sink.write_records()
self.report["Bulk_Sink"] = self.bulk_sink.get_status().as_obj()
self.bulk_sink.close()
if hasattr(self, "sink"):
self.sink.close()
Expand Down
3 changes: 2 additions & 1 deletion ingestion/src/metadata/ingestion/ometa/ometa_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
working with OpenMetadata entities.
"""

import asyncio
import logging
import urllib
from typing import Dict, Generic, List, Optional, Type, TypeVar, Union, get_args
Expand Down Expand Up @@ -155,7 +156,7 @@ def __init__(self, config: MetadataServerConfig, raw_data: bool = False):
base_url=self.config.api_endpoint,
api_version=self.config.api_version,
auth_header="Authorization",
auth_token=self._auth_provider.auth_token(),
auth_token=asyncio.run(self._auth_provider.auth_token()),
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need asyncio here?

)
self.client = REST(client_config)
self._use_raw_data = raw_data
Expand Down
73 changes: 58 additions & 15 deletions ingestion/src/metadata/ingestion/ometa/openmetadata_rest.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,8 @@
import http.client
import json
import logging
import time
import uuid
from typing import List

import google.auth
import google.auth.transport.requests
from google.oauth2 import service_account
from jose import jwt
from pydantic import BaseModel

from metadata.config.common import ConfigModel
Expand Down Expand Up @@ -101,6 +95,7 @@ class MetadataServerConfig(ConfigModel):
email: str = None
audience: str = "https://www.googleapis.com/oauth2/v4/token"
auth_header: str = "Authorization"
scopes: List = []


class NoOpAuthenticationProvider(AuthenticationProvider):
Expand Down Expand Up @@ -144,6 +139,10 @@ def create(cls, config: MetadataServerConfig):
return cls(config)

def auth_token(self) -> str:
import google.auth
import google.auth.transport.requests
from google.oauth2 import service_account

credentials = service_account.IDTokenCredentials.from_service_account_file(
self.config.secret_key, target_audience=self.config.audience
)
Expand All @@ -164,21 +163,65 @@ def __init__(self, config: MetadataServerConfig):
def create(cls, config: MetadataServerConfig):
return cls(config)

def auth_token(self) -> str:
from okta.jwt import JWT # pylint: disable=import-outside-toplevel
async def auth_token(self) -> str:
import time
import uuid
from urllib.parse import quote, urlencode

from okta.cache.okta_cache import OktaCache
from okta.jwt import JWT, jwt
from okta.request_executor import RequestExecutor

my_pem, my_jwk = JWT.get_PEM_JWK(self.config.private_key)
issued_time = int(time.time())
expiry_time = issued_time + JWT.ONE_HOUR
generated_JWT_ID = str(uuid.uuid4())

_, my_jwk = JWT.get_PEM_JWK(self.config.private_key)
claims = {
"sub": self.config.client_id,
"iat": time.time(),
"exp": time.time() + JWT.ONE_HOUR,
"iat": issued_time,
"exp": expiry_time,
"iss": self.config.client_id,
"aud": self.config.org_url + JWT.OAUTH_ENDPOINT,
"jti": uuid.uuid4(),
"email": self.config.email,
"aud": self.config.org_url,
"jti": generated_JWT_ID,
}
token = jwt.encode(claims, my_jwk.to_dict(), JWT.HASH_ALGORITHM)
return token
config = {
"client": {
"orgUrl": self.config.org_url,
"authorizationMode": "BEARER",
"rateLimit": {},
"privateKey": self.config.private_key,
"clientId": self.config.client_id,
"token": token,
"scopes": self.config.scopes,
}
}
request_exec = RequestExecutor(
config=config, cache=OktaCache(ttl=expiry_time, tti=issued_time)
)
parameters = {
"grant_type": "client_credentials",
"scope": " ".join(config["client"]["scopes"]),
"client_assertion_type": "urn:ietf:params:oauth:client-assertion-type:jwt-bearer",
"client_assertion": token,
}
encoded_parameters = urlencode(parameters, quote_via=quote)
url = f"{self.config.org_url}?" + encoded_parameters
token_request_object = await request_exec.create_request(
"POST",
url,
None,
{
"Accept": "application/json",
"Content-Type": "application/x-www-form-urlencoded",
},
oauth=True,
)
_, res_details, res_json, err = await request_exec.fire_request(
token_request_object[0]
)
return json.loads(res_json).get("access_token")


class Auth0AuthenticationProvider(AuthenticationProvider):
Expand Down