In [None]:
import os

from ldap3 import Server, Connection, ALL, ObjectDef, AttrDef, Entry, Writer
import pandas as pd
from tqdm.auto import tqdm
from dotenv import load_dotenv

from nyborg_rpa.utils.ms_graph import MSGraphClient

In [None]:
load_dotenv(dotenv_path=r"J:\RPA\.baseflow\.env", override=True)

In [None]:
ms_graph_client = MSGraphClient(
    client_id=os.environ["MS_GRAPH_CLIENT_ID"],
    client_secret=os.environ["MS_GRAPH_CLIENT_SECRET"],
    tenant_id=os.environ["MS_GRAPH_TENANT_ID"],
)

In [None]:
server = Server(
    host=os.environ["AD_LDAP_HOST"],
    port=int(os.environ["AD_LDAP_PORT"]),
    use_ssl=True,
    get_info=ALL,
)

conn = Connection(
    server,
    user=os.environ["AD_LDAP_USER"],
    password=os.environ["AD_LDAP_PASSWORD"],
    authentication="SIMPLE",
    auto_bind=True,
)

In [None]:
eid_users = ms_graph_client.get_paged("https://graph.microsoft.com/beta/users")

for user in eid_users:

    # extract info
    upn: str = user["userPrincipalName"]
    username, domain = upn.split("@")
    mail: str = user.get("mail")
    proxies = list(user.get("proxyAddresses", []))
    existing_smtp = [p for p in proxies if p.lower().startswith("smtp:")]
    smtp_domains = {p.split("@")[1].lower() for p in existing_smtp if "@" in p}
    # proxies_lower = [p.lower() for p in proxies]
    # expected_smtp = [f"smtp:{upn.lower()}", f"smtp:{upn.split('@')[0].lower()}@nyborg365.onmicrosoft.com"]

    # add info
    user["errors"] = []
    user["smtps"] = existing_smtp
    user["smtp_count"] = len(existing_smtp)

    last_sync = user.get("onPremisesLastSyncDateTime")
    user["days_since_sync"] = (pd.to_datetime("now", utc=True) - pd.to_datetime(last_sync, utc=True)).days if last_sync else None
    user["days_since_creation"] = (pd.to_datetime("now", utc=True) - pd.to_datetime(user.get("createdDateTime"), utc=True)).days if user.get("createdDateTime") else None

    active_plans = [plan for plan in user.get("assignedPlans", []) if plan.get("capabilityStatus", "") != "Deleted"]
    user["has_exchange"] = "exchange" in str(active_plans).lower()

In [None]:
obj_def = ObjectDef(["person"], conn)
attributes = ["objectSid", "distinguishedName", "userPrincipalName", "mail", "mailNickname", "proxyAddresses", "userAccountControl"]

for attr in attributes:
    obj_def += AttrDef(attr)

conn.extend.standard.paged_search(
    search_base="OU=Nyborg,DC=NYBORG,DC=DK",
    search_filter="(objectClass=person)",
    attributes=attributes,
    paged_size=1000,
    generator=False,
    get_operational_attributes=True,
)

entries: list[Entry] = conn.entries
len(entries)

In [None]:
EXCLUDED_USERNAMES = {
    "linpe",
    "band",
}

EXCLUDED_OU_PATHS = {
    "OU=Administration IT,OU=IT og Digitalisering",
    "OU=Ekstern,OU=IT og Digitalisering",
    "OU=ImportBruger,OU=IT og Digitalisering",
    "OU=ServiceBrugere,OU=IT og Digitalisering",
    "OU=Ã˜stfyns Museer",
}

In [None]:
candidates = []
for entry in entries:

    user = {"errors": []}
    for key, v in entry._state.attributes.items():
        user[key] = v.value if v.definition.single_value else v.values

    sid: str = user.get("objectSid")
    upn: str = user.get("userPrincipalName") or ""
    dn: str = user.get("distinguishedName") or ""
    uac = user.get("userAccountControl", 0)
    username, domain = upn.split("@") if "@" in upn else ("", "")
    mail: str = user.get("mail") or ""
    mail_nickname: str = user.get("mailNickname") or ""

    proxy_addrs = user.get("proxyAddresses") or []
    smtps = [addr for addr in proxy_addrs if addr.lower().startswith("smtp:")]
    primary_smtp = next((addr for addr in smtps if addr.startswith("SMTP:")), "")
    expected_primary_smtp = f"SMTP:{upn.lower()}"

    # exceptions
    if not upn or not sid or not dn:
        continue

    # skip disabled users using userAccountControl flag
    # https://learn.microsoft.com/en-us/troubleshoot/windows-server/active-directory/useraccountcontrol-manipulate-account-properties#list-of-property-flags
    if uac != 512:
        print(f"skipping disabled {username!r}...")
        continue

    if username in EXCLUDED_USERNAMES:
        print(f"skipping excluded username {username!r}...")
        continue

    if excluded_ou := next((ou for ou in EXCLUDED_OU_PATHS if ou in dn), None):
        print(f"skipping {username!r} in excluded OU path {excluded_ou!r}...")
        continue

    # type assertions
    assert isinstance(sid, str)
    assert isinstance(upn, str)
    assert isinstance(mail, str)
    assert isinstance(mail_nickname, str)
    assert isinstance(proxy_addrs, list)
    assert all(isinstance(addr, str) for addr in proxy_addrs)

    # verify that user exists in Entra ID
    eid_user = next((u for u in eid_users if u["onPremisesSecurityIdentifier"] == sid), None)
    if not eid_user:
        continue

    # add metadata to user
    user["primary_smtp"] = primary_smtp
    user["smtps"] = smtps
    user["eid_smtps"] = eid_user["smtps"]
    user["has_exchange"] = eid_user["has_exchange"]
    user["entry"] = entry

    # mail
    if not mail:
        user["errors"] += ["missing mail"]

    elif mail.lower() != upn.lower():
        user["errors"] += ["wrong mail"]

    # mailNickname
    if not mail_nickname:
        user["errors"] += ["missing nickname"]

    elif mail_nickname.lower() != username.lower():
        user["errors"] += ["wrong nickname"]

    # proxyAddresses
    if not primary_smtp:
        user["errors"] += ["missing primary SMTP"]

    elif primary_smtp.lower() != expected_primary_smtp.lower():
        user["errors"] += ["wrong primary SMTP"]

    # save candidates with errors
    if user["errors"]:
        candidates += [user]

In [None]:
df = pd.DataFrame(candidates).replace({pd.NA: None}).replace({None: ""})
print(f"Found {len(candidates)} candidates")

if candidates:
    df = df[["userPrincipalName", "errors", "mail", "mailNickname", "primary_smtp", "smtps", "eid_smtps", "has_exchange"]]

In [None]:
df

In [None]:
modifications = []
for user in tqdm(candidates):

    upn = user["userPrincipalName"]
    sid = user["objectSid"]

    eid_user = next(u for u in eid_users if u["onPremisesSecurityIdentifier"] == sid)
    entry = next(e for e in entries if "userPrincipalName" in e and e["userPrincipalName"] == upn)
    entry = entry.entry_writable(object_def=obj_def)

    # mail
    if "missing mail" in user["errors"]:
        entry["mail"] = upn

    # mailNickname
    if "missing nickname" in user["errors"]:
        entry["mailNickname"] = upn.split("@")[0]

    # proxyAddresses
    if any(e in user["errors"] for e in ["missing primary SMTP", "wrong primary SMTP"]):

        new_smtps = [f"SMTP:{upn}"]
        candidate_smtps: list[str] = [s.replace(" ", "") for s in eid_user["smtps"] + user["smtps"]]
        for smtp in candidate_smtps:
            if smtp.lower() not in [s.lower() for s in new_smtps]:
                addr = smtp.split(":", 1)[1]
                new_smtps += [f"smtp:{addr}"]

        existing_proxy_addrs: list[str] = [addr for addr in (user["proxyAddresses"] or []) if not addr.lower().startswith("smtp:")]
        entry["proxyAddresses"] = list(dict.fromkeys([*new_smtps, *existing_proxy_addrs]))


    # commit changes
    print(f"Committing changes for {upn} with changes: {entry.entry_changes}")
    modifications += [entry.entry_changes]
    writer: Writer = entry.entry_cursor
    writer.commit()
