### 🛠️ Initialize Notebook Variables

**Only modify entries under _USER CONFIGURATION_.**

In [None]:
import utils
from apimtypes import *

# ------------------------------
#    USER CONFIGURATION
# ------------------------------

# Settings intended to be edited per run. Everything below the SYSTEM
# CONFIGURATION banner is derived from these values.
#   rg_location - passed to the notebook helper below (an Azure region name)
#   index       - passed to both the resource-group-name lookup and the notebook helper
#   apim_sku    - passed to the notebook helper
#   deployment  - infrastructure to target; also used to derive the resource group name
#   api_prefix  - prepended to every API path defined in this cell
#   tags        - attached to every API defined in this cell
rg_location = 'eastus2'
index       = 1
apim_sku    = APIM_SKU.BASICV2              # Options: 'BASICV2', 'STANDARDV2', 'PREMIUMV2'
deployment  = INFRASTRUCTURE.SIMPLE_APIM    # Options: 'AFD_APIM_PE', 'APIM_ACA', 'SIMPLE_APIM'
api_prefix  = 'authX-pro-'                                  # ENTER A PREFIX FOR THE APIS TO REDUCE COLLISION POTENTIAL WITH OTHER SAMPLES
tags        = ['authX-pro', 'jwt', 'policy-fragment']       # ENTER DESCRIPTIVE TAG(S)



# ------------------------------
#    SYSTEM CONFIGURATION
# ------------------------------

# Create the notebook helper with JWT support
# NOTE(review): the positional `True` is presumably the flag that enables the JWT
# support mentioned above (the helper later exposes jwt_key_name / jwt_key_value*);
# confirm against NotebookHelper's signature before changing it.
sample_folder    = 'authX-pro'
rg_name          = utils.get_infra_rg_name(deployment, index)
supported_infras = [INFRASTRUCTURE.AFD_APIM_PE, INFRASTRUCTURE.APIM_ACA, INFRASTRUCTURE.SIMPLE_APIM]
nb_helper        = utils.NotebookHelper(sample_folder, rg_name, rg_location, deployment, supported_infras, True, index = index, apim_sku = apim_sku)

# Set up the named values
# The first entry carries the helper's JWT signing key (base64 form); the remaining
# entries expose the HR role identifiers under the names the policies refer to.
# NOTE(review): the trailing `True` on the key entry presumably marks it as a secret - confirm in apimtypes.
nvs: List[NamedValue] = [
    NamedValue(nb_helper.jwt_key_name, nb_helper.jwt_key_value_bytes_b64, True),
    NamedValue('HRMemberRoleId', Role.HR_MEMBER),
    NamedValue('HRAssociateRoleId', Role.HR_ASSOCIATE),
    NamedValue('HRAdministratorRoleId', Role.HR_ADMINISTRATOR)
]

# Set up the policy fragments
# The dict maps placeholders in the XML file to named-value names: the signing-key
# named value created above and the 'HRMemberRoleId' named value.
pf_authx_hr_member_xml = utils.read_policy_xml('pf-authx-hr-member.xml', {
    'jwt_signing_key'   : nb_helper.jwt_key_name,
    'hr_member_role_id' : 'HRMemberRoleId'
}, sample_folder)

pfs: List[PolicyFragment] = [
    PolicyFragment('AuthX-HR-Member', pf_authx_hr_member_xml, 'Authenticates and authorizes HR members.')
]

# Define the Products
# The product policy receives the same two substitutions as the policy fragment above.
pol_hr_product = utils.read_policy_xml('hr_product.xml', {
    'jwt_signing_key'   : nb_helper.jwt_key_name, 
    'hr_member_role_id' : 'HRMemberRoleId'
}, sample_folder)

# hr_product_name is reused below to attach both APIs to this product.
# NOTE(review): the positional 'published', True, False arguments are defined by
# Product in apimtypes (not visible here) - check that signature before editing them.
hr_product_name = 'hr'
products: List[Product] = [
    Product(hr_product_name, 'Human Resources', 
            'Product for Human Resources APIs providing access to employee data, organizational structure, benefits information, and HR management services. Includes JWT-based authentication for HR members.', 
            'published', True, False, pol_hr_product)
]

# Define the APIs and their operations and policies
# These three policy files are loaded without a substitution dict and are shared
# by both APIs below: one API-level policy plus one policy each for GET and POST.
pol_hr_all_operations_pro = utils.read_policy_xml('hr_all_operations_pro.xml', sample_name = sample_folder)
pol_hr_get                = utils.read_policy_xml('hr_get.xml', sample_name = sample_folder)
pol_hr_post               = utils.read_policy_xml('hr_post.xml', sample_name = sample_folder)

# API 1: Employees (HR)
# The prefixed path is passed as both the first and third API argument; it is also
# reused by the verification cell when calling the gateway.
hr_employees_path = f'{api_prefix}employees'
hr_employees_get  = GET_APIOperation('Gets the employees', pol_hr_get,)
hr_employees_post = POST_APIOperation('Creates a new employee', pol_hr_post)
hr_employees      = API(hr_employees_path, 'Employees Pro', hr_employees_path, 'This is a Human Resources API for employee information', pol_hr_all_operations_pro,
                        operations = [hr_employees_get, hr_employees_post], tags = tags, productNames = [hr_product_name], subscriptionRequired = False)

# API 2: Benefits (HR)
# Mirrors API 1: same policies, tags, product assignment, and subscriptionRequired = False.
hr_benefits_path = f'{api_prefix}benefits'
hr_benefits_get  = GET_APIOperation('Gets employee benefits', pol_hr_get)
hr_benefits_post = POST_APIOperation('Creates employee benefits', pol_hr_post)
hr_benefits      = API(hr_benefits_path, 'Benefits Pro', hr_benefits_path, 'This is a Human Resources API for employee benefits', pol_hr_all_operations_pro,
                        operations = [hr_benefits_get, hr_benefits_post], tags = tags, productNames = [hr_product_name], subscriptionRequired = False)

# APIs Array
apis: List[API] = [hr_employees, hr_benefits]

utils.print_ok('Notebook initialized')

### 🚀 Deploy Infrastructure and APIs

Creates the Bicep deployment in the previously specified resource group. A Bicep parameters file, `params.json`, will be created prior to execution.

In [None]:
# Build the bicep parameters: each entry wraps the serialized form of one
# collection from the initialization cell in a {'value': [...]} envelope.
bicep_parameters = {
    parameter_name: {'value': [entry.to_dict() for entry in collection]}
    for parameter_name, collection in (
        ('apis', apis),
        ('namedValues', nvs),
        ('policyFragments', pfs),
        ('products', products),
    )
}

# Deploy the sample through the notebook helper
output = nb_helper.deploy_sample(bicep_parameters)

# Stop the notebook here when the deployment did not succeed; the verification
# cell relies on the outputs extracted below.
if not output.success:
    utils.print_error('Deployment failed!')
    raise SystemExit(1)

# Extract deployment outputs for testing
apim_name        = output.get('apimServiceName', 'APIM Service Name')
apim_gateway_url = output.get('apimResourceGatewayURL', 'APIM API Gateway URL')
apim_products    = output.getJson('productOutputs', 'Products')

utils.print_ok('Deployment completed successfully')

### ✅ Verify API Request Success

Assert that the deployment was successful by making simple calls to APIM. 

❗️ If the infrastructure shields APIM and requires a different ingress (e.g. Azure Front Door), the request to the APIM gateway URL will fail by design. Obtain the Front Door endpoint hostname and try that instead.

In [None]:
import utils
from apimrequests import ApimRequests
from apimtesting import ApimTesting
from apimtypes import Role
from users import UserHelper
from authfactory import AuthFactory

# Set up the test tracker and pick the subscription key of the first product
# returned by the deployment (the only product this sample defines).
tests = ApimTesting("AuthX-Pro Sample Tests", sample_folder, deployment)
hr_product_apim_subscription_key = apim_products[0]['subscriptionPrimaryKey']

# Preflight: resolve the URL to test against. Per the helper's intent, deployments
# fronted by Azure Front Door are assumed not to expose APIM directly, so the
# Front Door URL is used instead of the APIM gateway URL.
endpoint_url = utils.test_url_preflight_check(deployment, rg_name, apim_gateway_url)

# 1) HR Administrator
# Sign a JWT for the HR Administrator user with the helper's symmetric key.
hr_admin_user = UserHelper.get_user_by_role(Role.HR_ADMINISTRATOR)
encoded_jwt_token_hr_admin = AuthFactory.create_symmetric_jwt_token_for_user(hr_admin_user, nb_helper.jwt_key_value)
print(f'\nJWT token for HR Admin:\n{encoded_jwt_token_hr_admin}')  # this value is used to call the APIs via APIM

# Requests object carrying both the HR product subscription key and the admin token
reqsApimAdmin = ApimRequests(endpoint_url, hr_product_apim_subscription_key)
reqsApimAdmin.headers['Authorization'] = f'Bearer {encoded_jwt_token_hr_admin}'

# Call APIM: each row is (request method, API path, progress message, expected result)
for send_request, api_path, request_msg, expected_result in (
    (reqsApimAdmin.singleGet,  hr_employees_path, 'Calling GET Employees API via API Management Gateway URL. Expect 200.',  'Successful GET'),
    (reqsApimAdmin.singlePost, hr_employees_path, 'Calling POST Employees API via API Management Gateway URL. Expect 200.', 'Successful POST'),
    (reqsApimAdmin.singleGet,  hr_benefits_path,  'Calling GET Benefits API via API Management Gateway URL. Expect 200.',   'Successful GET'),
    (reqsApimAdmin.singlePost, hr_benefits_path,  'Calling POST Benefits API via API Management Gateway URL. Expect 200.',  'Successful POST'),
):
    output = send_request(api_path, msg = request_msg)
    tests.verify(output, expected_result)

# 2) HR Associate
# Sign a JWT for the HR Associate user with the same symmetric key.
hr_associate_user = UserHelper.get_user_by_role(Role.HR_ASSOCIATE)
encoded_jwt_token_hr_associate = AuthFactory.create_symmetric_jwt_token_for_user(hr_associate_user, nb_helper.jwt_key_value)
print(f'\nJWT token for HR Associate:\n{encoded_jwt_token_hr_associate}')  # this value is used to call the APIs via APIM

# Requests object carrying both the HR product subscription key and the associate token
reqsApimAssociate = ApimRequests(endpoint_url, hr_product_apim_subscription_key)
reqsApimAssociate.headers['Authorization'] = f'Bearer {encoded_jwt_token_hr_associate}'

# Call APIM: GETs are expected to succeed, POSTs are expected to be denied for this role
for send_request, api_path, request_msg, expected_result in (
    (reqsApimAssociate.singleGet,  hr_employees_path, 'Calling GET Employees API via API Management Gateway URL. Expect 200.',  'Successful GET'),
    (reqsApimAssociate.singlePost, hr_employees_path, 'Calling POST Employees API via API Management Gateway URL. Expect 403.', 'Access denied - no matching roles found'),
    (reqsApimAssociate.singleGet,  hr_benefits_path,  'Calling GET Benefits API via API Management Gateway URL. Expect 200.',   'Successful GET'),
    (reqsApimAssociate.singlePost, hr_benefits_path,  'Calling POST Benefits API via API Management Gateway URL. Expect 403.',  'Access denied - no matching roles found'),
):
    output = send_request(api_path, msg = request_msg)
    tests.verify(output, expected_result)

# 3) HR Administrator but no HR product subscription key (api-key)
# Same admin token, but the requests object is created without a subscription key.
reqsApimAdminNoHrProduct = ApimRequests(endpoint_url)
reqsApimAdminNoHrProduct.headers['Authorization'] = f'Bearer {encoded_jwt_token_hr_admin}'

output = reqsApimAdminNoHrProduct.singleGet(
    hr_employees_path,
    msg = 'Calling GET Employees API via API Management Gateway URL but with no HR product subscription key. Expect 403.'
)
tests.verify(output, 'Access denied - no matching product found')

# 4) HR Associate but no HR product subscription key (api-key)
# Same associate token, again without a subscription key.
reqsApimAssociateNoHrProduct = ApimRequests(endpoint_url)
reqsApimAssociateNoHrProduct.headers['Authorization'] = f'Bearer {encoded_jwt_token_hr_associate}'

output = reqsApimAssociateNoHrProduct.singleGet(
    hr_employees_path,
    msg = 'Calling GET Employees API via API Management Gateway URL but with no HR product subscription key. Expect 403.'
)
tests.verify(output, 'Access denied - no matching product found')

# Report the collected verification results
tests.print_summary()

utils.print_ok('All done!')