Skip to content

Commit

Permalink
Merge pull request #8 from robertbetts/develop
Browse files Browse the repository at this point in the history
Prep for release 0.1.6
  • Loading branch information
robertbetts committed Jul 20, 2023
2 parents f55e019 + 5e603ed commit 3a1ee25
Show file tree
Hide file tree
Showing 40 changed files with 2,358 additions and 720 deletions.
123 changes: 123 additions & 0 deletions example_src/msal_flows/acquire_token_on_behalf_of_example.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
"""
Example created By Robert Betts copying code layout from confidential_client_certificate_example.py
and username_password_example.py
The configuration file would look like this:
{
"authority": "https://login.microsoftonline.com/organizations",
"client_id": "your_client_id",
"username": "your_username@your_tenant.com",
"password": "This is a sample only. You better NOT persist your password.",
"scope": ["User.ReadBasic.All"],
// You can find the other permission names from this document
// https://docs.microsoft.com/en-us/graph/permissions-reference
"endpoint": "https://graph.microsoft.com/v1.0/users"
// You can find more Microsoft Graph API endpoints from Graph Explorer
// https://developer.microsoft.com/en-us/graph/graph-explorer
}
You can then run this sample with a JSON configuration file:
python sample.py parameters.json
"""

import sys # For simplicity, we'll read config file from 1st CLI param sys.argv[1]
import json
import logging

import requests
import msal


# Optional logging
logging.basicConfig(level=logging.DEBUG) # Enable DEBUG log for entire script
logging.getLogger("msal").setLevel(logging.INFO) # Optionally disable MSAL DEBUG logs


config = {
"authority": "https://localhost:5005/adfs",
"client_id": "CLIENT-5700-DEV",
"client_secret": "your_client_secret",
"username": "your_username@your_tenant.com",
"password": "This is a sample only. You better NOT persist your password.",
"scope": ["URI:API:CLIENT-5700-API"],
"endpoint": "http://localhost:5700/mock-api/api/private",
"scope2": ["URI:API:CLIENT-5800-API"],
"endpoint2": "http://localhost:5800/mock-api/api/private",
"thumbprint": "thumbprint_value",
"private_key_file": "src/openid_whisperer/demo_certs/key.pem",
}


# Create a preferably long-lived app instance which maintains a token cache.
http_client = requests.session()
http_client.verify = False
app = msal.ConfidentialClientApplication(
config["client_id"],
authority=config["authority"],
client_credential={
# "thumbprint": config["thumbprint"],
"private_key": open(config['private_key_file']).read(),
"thumbprint": "thumbprint_value".encode("utf-8").hex(),
},
validate_authority=False,
verify=False,
http_client=http_client,
# token_cache=... # Default cache is in memory only.
# You can learn how to use SerializableTokenCache from
# https://msal-python.readthedocs.io/en/latest/#msal.SerializableTokenCache
)


# The pattern to acquire a token looks like this.
result = None

# Firstly, check the cache to see if this end user has signed in before
accounts = app.get_accounts(username=config["username"])
if accounts:
logging.info("Account(s) exists in cache, probably with token too. Let's try.")
result = app.acquire_token_silent(config["scope"], account=accounts[0])

if not result:
logging.info("No suitable token exists in cache. Let's get a new one from AAD.")
# See this page for constraints of Username Password Flow.
# https://github.com/AzureAD/microsoft-authentication-library-for-python/wiki/Username-Password-Authentication
result = app.acquire_token_by_username_password(
config["username"], config["password"], scopes=config["scope"]
)

if "error" in result:
print(result.get("error"))
print(result.get("error_description"))
print(result.get("correlation_id")) # You may need this when reporting a bug
sys.exit()

print(result)
user_assertion = result["access_token"]
logging.info("Now acquire an OBO token from AAD.")
obo_result = app.acquire_token_on_behalf_of(
user_assertion=result["access_token"], scopes=config["scope2"], claims_challenge=None
)

if "access_token" in obo_result:

logging.info("Using OBO token to mke a request to API 2")
graph_data = requests.get( # Use token to call downstream service
config["endpoint2"],
headers={"Authorization": "Bearer " + obo_result["access_token"]},
verify=False,
).json()
print("API call result: %s" % json.dumps(graph_data, indent=2))
else:
print(obo_result.get("error"))
print(obo_result.get("error_description"))
print(obo_result.get("correlation_id")) # You may need this when reporting a bug
if 65001 in obo_result.get(
"error_codes", []
): # Not mean to be coded programatically, but...
# AAD requires user consent for U/P flow
print(
"Visit this to consent:", app.get_authorization_request_url(config["scope2"])
)
94 changes: 94 additions & 0 deletions example_src/msal_flows/confidential_client_certificate_sample.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,94 @@
"""
The configuration file would look like this (sans those // comments):
{
"authority": "https://login.microsoftonline.com/Enter_the_Tenant_Name_Here",
"client_id": "your_client_id",
"scope": ["https://graph.microsoft.com/.default"],
// Specific to Client Credentials Grant i.e. acquire_token_for_client(),
// you don't specify, in the code, the individual scopes you want to access.
// Instead, you statically declared them when registering your application.
// Therefore the only possible scope is "resource/.default"
// (here "https://graph.microsoft.com/.default")
// which means "the static permissions defined in the application".
"thumbprint": "790E... The thumbprint generated by AAD when you upload your public cert",
"private_key_file": "filename.pem",
// For information about generating thumbprint and private key file, refer:
// https://github.com/AzureAD/microsoft-authentication-library-for-python/wiki/Client-Credentials#client-credentials-with-certificate
"endpoint": "https://graph.microsoft.com/v1.0/users"
// For this resource to work, you need to visit Application Permissions
// page in portal, declare scope User.Read.All, which needs admin consent
// https://github.com/Azure-Samples/ms-identity-python-daemon/blob/master/2-Call-MsGraph-WithCertificate/README.md
}
You can then run this sample with a JSON configuration file:
python sample.py parameters.json
"""

import sys # For simplicity, we'll read config file from 1st CLI param sys.argv[1]
import json
import logging

import requests
import msal


# Optional logging
logging.basicConfig(level=logging.DEBUG) # Enable DEBUG log for entire script
logging.getLogger("msal").setLevel(logging.INFO) # Optionally disable MSAL DEBUG logs

config = {
"authority": "https://localhost:5005/adfs",
"client_id": "CLIENT-90274-DEV",
"client_secret": "your_client_secret",
"username": "your_username@your_tenant.com",
"password": "This is a sample only. You better NOT persist your password.",
"scope": ["URI:API:CLIENT-90274-API"],
"endpoint": "http://localhost:5700/mock-api/api/private",
"thumbprint": "thumbprint_value",
"private_key_file": "src/openid_whisperer/demo_certs/key.pem",
}

# Create a preferably long-lived app instance which maintains a token cache.
app = msal.ConfidentialClientApplication(
config["client_id"],
authority=config["authority"],
client_credential={
# "thumbprint": config["thumbprint"],
"private_key": open(config['private_key_file']).read(),
"thumbprint": "thumbprint_value".encode("utf-8").hex(),
},
verify=False,
# token_cache=... # Default cache is in memory only.
# You can learn how to use SerializableTokenCache from
# https://msal-python.readthedocs.io/en/latest/#msal.SerializableTokenCache
)

# The pattern to acquire a token looks like this.
result = None

# Firstly, looks up a token from cache
# Since we are looking for token for the current app, NOT for an end user,
# notice we give account parameter as None.
result = app.acquire_token_silent(config["scope"], account=None)

if not result:
logging.info("No suitable token exists in cache. Let's get a new one from AAD.")
result = app.acquire_token_for_client(scopes=config["scope"])

if "access_token" in result:
# Calling endpoint using the access token
response = requests.get( # Use token to call downstream service
config["endpoint"],
headers={'Authorization': 'Bearer ' + result['access_token']},
verify=False,
)
data = response.json()
print("Endpoint API call result: %s" % json.dumps(data, indent=2))
else:
print(result.get("error"))
print(result.get("error_description"))
print(result.get("correlation_id")) # You may need this when reporting a bug
111 changes: 67 additions & 44 deletions example_src/msal_flows/device_flow_sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,44 +30,55 @@

def submit_credentials_with_challenge_code(config, challenge_info):
"""Mock the end user authenticating and submitting the user code provided to them."""
url_parts = urlparse(challenge_info["verification_uri"])
query = url_parts.query
query_items: List[tuple[str, str]] = [
(item.split("=", 1)[0], item.split("=", 1)[1])
for item in [part for part in query.split("&")]
]
query_params = dict(query_items)
logging.info(pformat(query_params))

auth_url = f"{url_parts.scheme}://{url_parts.netloc}{url_parts.path}"
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/json",
}
data = {
"response_type": query_params["response_type"],
"client_id": query_params["client_id"],
"prompt": query_params["prompt"],
"scope": query_params["scope"],
"resource": query_params["resource"],
"UserName": config["username"],
"Password": config["password"],
"CodeChallenge": challenge_info["user_code"],
"code_challenge_method": query_params["code_challenge_method"],
"code_challenge": challenge_info.get("code_challenge", ""),
}
response = requests.post(auth_url, data=data, headers=headers, verify=False)
logging.info(response.status_code)
if response.status_code != 200:
logging.info("Error processing end user verification of the user code")
sys.exit()
else:
logging.info("End User code_challenge submitted")
assert (
"Success, you have validated the user code provided to you."
in response.text
try:
url_parts = urlparse(challenge_info["verification_uri"])
query = url_parts.query
query_items: List[tuple[str, str]] = [
(item.split("=", 1)[0], item.split("=", 1)[1])
for item in [part for part in query.split("&")]
]
query_params = dict(query_items)
print(
"Responding to the following authentication request:\n{pformat(query_params)}\n"
)

auth_url = f"{url_parts.scheme}://{url_parts.netloc}{url_parts.path}"
headers = {
"Content-Type": "application/x-www-form-urlencoded",
"Accept": "application/json",
}
data = {
"response_type": query_params["response_type"],
"client_id": query_params["client_id"],
"prompt": query_params["prompt"],
"scope": query_params["scope"],
"resource": query_params["resource"],
"UserName": config["username"],
"Password": config["password"],
"CodeChallenge": challenge_info["user_code"],
"code_challenge_method": query_params["code_challenge_method"],
"code_challenge": challenge_info.get("code_challenge", ""),
}
device_response = requests.post(
auth_url, data=data, headers=headers, verify=False
)
print(
f"\nEnd User code_challenge submission response: {device_response.status_code}"
)

if device_response.status_code != 200:
print("\nError processing end user verification of the user code")
sys.exit()
else:
assert (
"Success, you have validated the user code provided to you."
in device_response.text
)
print("\nSuccessful verification of the user code")
except Exception as e:
print("\nError during verification of the user code")
logging.exception(e)


# Optional logging
logging.basicConfig(level=logging.DEBUG) # Enable DEBUG log for entire script
Expand All @@ -80,9 +91,10 @@ def submit_credentials_with_challenge_code(config, challenge_info):
"username": "your_username@your_tenant.com",
"password": "This is a sample only. You better NOT persist your password.",
"scope": ["URI:API:CLIENT-90274-API"],
"endpoint": "https://localhost:5700/mock-api/api/private",
"endpoint": "http://localhost:5700/mock-api/api/private",
}


# Create a preferably long-lived app instance which maintains a token cache.
app = msal.PublicClientApplication(
config["client_id"],
Expand Down Expand Up @@ -120,29 +132,40 @@ def submit_credentials_with_challenge_code(config, challenge_info):
"Fail to create device flow. Err: %s" % json.dumps(flow, indent=4)
)

print(flow["message"])
print(f'\nDevice Prompt:\n{flow["message"]}\n\n')
sys.stdout.flush() # Some terminal needs this to ensure the message is shown

# Short cust to emulate what the user would do
submit_credentials_with_challenge_code(config, flow)

# Ideally you should wait here, in order to save some unnecessary polling
# input("Press Enter after signing in from another device to proceed, CTRL+C to abort.")

# Short cust to emulate what the user would do
submit_credentials_with_challenge_code(config, flow)

result = app.acquire_token_by_device_flow(flow) # By default it will block
# You can follow this instruction to shorten the block time
# https://msal-python.readthedocs.io/en/latest/#msal.PublicClientApplication.acquire_token_by_device_flow
# or you may even turn off the blocking behavior,
# and then keep calling acquire_token_by_device_flow(flow) in your own customized loop.

if "access_token" in result:
print(f"\nDevice code process successful.")
print(f'Obtained Bearer token: {result["access_token"]}')
print(f'Usering token to access {config["endpoint"]}')
# Calling graph using the access token
graph_data = requests.get( # Use token to call downstream service
response = requests.get( # Use token to call downstream service
config["endpoint"],
headers={"Authorization": "Bearer " + result["access_token"]},
verify=False,
).json()
print("API call result: %s" % json.dumps(graph_data, indent=2))
)
try:
graph_data = response.json()
print("API call result: %s" % json.dumps(graph_data, indent=2))
except Exception as e:
print(e)
print(
f"Expected JSON response from endpoint:\nstatus_code:{response.status_code}\nurl:{response.url}"
)

else:
print(result.get("error"))
print(result.get("error_description"))
Expand Down
3 changes: 2 additions & 1 deletion example_src/msal_flows/interactive_sample.py
Original file line number Diff line number Diff line change
Expand Up @@ -35,9 +35,10 @@
"username": "your_username@your_tenant.com",
"password": "This is a sample only. You better NOT persist your password.",
"scope": ["URI:API:CLIENT-90274-API"],
"endpoint": "https://localhost:5700/mock-api/api/private",
"endpoint": "http://localhost:5700/mock-api/api/private",
}


# Create a preferably long-lived app instance which maintains a token cache.
app = msal.PublicClientApplication(
config["client_id"],
Expand Down
2 changes: 1 addition & 1 deletion example_src/msal_flows/migrate_rt.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def get_preexisting_rt_and_their_scopes_from_elsewhere():
"username": "your_username@your_tenant.com",
"password": "This is a sample only. You better NOT persist your password.",
"scope": ["URI:API:CLIENT-90274-API"],
"endpoint": "https://localhost:5700/mock-api/api/private",
"endpoint": "http://localhost:5700/mock-api/api/private",
}

app = msal.PublicClientApplication(
Expand Down

0 comments on commit 3a1ee25

Please sign in to comment.