# STARS Currencies

- Authenticate with the STARS API
- Retrieve users for a VGS
- Retrieve engineering currencies for a user
- Determine currencies expiring within 30 days
- [Follow the guide](https://rafac.sharepoint.com/sites/msteams_5e36f3/_layouts/15/stream.aspx?id=%2Fsites%2Fmsteams%5F5e36f3%2FShared%20Documents%2FGeneral%2FSTARS%5FPower%20BI%20Connectivity%2Emp4&ct=1764703518233&or=OWA%2DNT%2DMail&cid=dc46e390%2D934d%2D4e31%2D596c%2D1ef5bd9b6f10&ga=1&referrer=StreamWebApp%2EWeb&referrerScenario=AddressBarCopied%2Eview%2E0646d6b0%2Dcf0e%2D4714%2D8d85%2D70c8e3dcb969&isDarkMode=true)

## User Information

In [None]:
# Load environment variables
import os
import requests

from pathlib import Path
from dotenv import load_dotenv

# Get root directory
root_dir = Path.cwd().parent
env_path = root_dir / ".env"

load_dotenv(env_path)

STARS_URI = os.environ["STARS_URI"]
RESOURCE_ID = os.environ["RESOURCE_ID"]
_API_KEY = os.environ["STARS_API_KEY"]


def auth_header() -> dict:
    """Construct headers for STARS API requests."""
    return {
        "Authorization": _API_KEY,
    }


def get_person(person_id: str) -> dict:
    """Retrieve user information from STARS API."""
    # Call STARS API to get person details
    url = f"{STARS_URI}/person/personnel"
    response = requests.get(url, params={"ids": person_id}, headers=auth_header())
    response.raise_for_status()

    # Check response and return data
    if not response.json()["data"]:
        raise ValueError(f"Person with ID {person_id} not found.")

    return response.json()["data"][0]


response = get_person(RESOURCE_ID)
response

In [None]:
"""Get user information from USER_ID"""


def get_user(user_id: str) -> dict:
    """Retrieve user information from STARS API."""
    # Call STARS API to get user details
    url = f"{STARS_URI}/user/users/"
    response = requests.get(url, params={"ids": user_id}, headers=auth_header())
    response.raise_for_status()

    # Check response and return data
    if not response.json()["data"]:
        raise ValueError(f"User with ID {user_id} not found.")

    return response.json()["data"][0]


# Get user ID from person data
user_id = get_person(RESOURCE_ID)["userId"]
response = get_user(user_id)
response

In [None]:
"""Get user email from USER_ID"""

USER_ID = os.environ["USER_ID"]

user_email = get_user(USER_ID)["email"]
user_email

## Engineering Authorisations

In [None]:
"""List engineering authorisations for a user"""

from urllib.parse import quote


def get_eng_auths_for_user(person_id: str) -> list[dict]:
    """Retrieve engineering authorisations for a person from STARS API."""
    # URL encode resource ID
    resource_id = quote(person_id, safe="")

    # Call STARS API to get engineering authorisations
    extra = "?view=Current&baseDate=2025-12-03"
    url = f"{STARS_URI}/eng/personnel/{resource_id}/auths{extra}"
    response = requests.get(url, headers=auth_header())
    response.raise_for_status()

    return response.json()["data"]


eng_auths = get_eng_auths_for_user(RESOURCE_ID)[:2]
eng_auths

In [None]:
"""Get auths by expiry date"""

import datetime

ORG_UNIT_ID = os.environ["ORG_UNIT_ID"]


def get_expiring_auths_by_date(unit_id: str, expiry_date: str) -> list[dict]:
    """Get expiring auths after by a given date for a unit"""
    url = f"{STARS_URI}/eng/personnel/auths"
    params = {
        "view": "Expiring",
        "trade": "",
        "baseDate": datetime.date.fromisoformat(expiry_date),
        "orgUnitID": unit_id,
    }
    response = requests.get(url, params=params, headers=auth_header())
    response.raise_for_status()

    # Validate response
    if not response.json()["data"]:
        raise ValueError(f"No auths found for unit ID: {unit_id}")

    return response.json()["data"]


expiring_auths = get_expiring_auths_by_date(ORG_UNIT_ID, "2025-12-30")[:2]
expiring_auths

In [None]:
"""Join user and their auths"""

ORG_UNIT_ID = os.environ["ORG_UNIT_ID"]
RESOURCE_ID = os.environ["RESOURCE_ID"]


def get_user_from_resource(person_id: str) -> dict:
    """Get user info from resource (person) ID"""
    person = get_person(person_id)
    user_info = get_user(person["userId"])
    return user_info


expiring_auth = get_expiring_auths_by_date(ORG_UNIT_ID, "2025-12-30")[0]
user_info = get_user_from_resource(expiring_auth["resourceId"])

# Join user info to the auth.
expiring_auth["user_info"] = user_info
expiring_auth

## Store Expiring Auths

In [None]:
"""Append users to each expiring auth"""

In [None]:
"""Group by user rather than auth"""

In [None]:
"""Connect to MongoDB"""

In [None]:
"""Update expiring auths collection for the unit"""

## Email Users

In [None]:
"""Create email client"""

In [None]:
"""Send email to test user with expiring auths"""