diff --git a/.github/workflows/python-publish.yml b/.github/workflows/python-publish.yml index b888113..751b4de 100644 --- a/.github/workflows/python-publish.yml +++ b/.github/workflows/python-publish.yml @@ -8,16 +8,16 @@ name: Upload Python Package -on: workflow_dispatch +on: + release: + types: [published] permissions: contents: read jobs: deploy: - runs-on: ubuntu-latest - steps: - uses: actions/checkout@v3 - name: Set up Python diff --git a/.pylintrc b/.pylintrc index e6d1055..662f255 100644 --- a/.pylintrc +++ b/.pylintrc @@ -1,13 +1,5 @@ [pylint.messages_control] disable = - C0303, # Trailing whitespace - C0103, # Variable names - C0305, # Trailing newlines - C0304, # Missing final line C0301, # Line too long I1101, E1101, # C-modules members - W0621, # Redefine outer name R0913 # Too many arguments - -[MASTER] -ignore-paths = ^tests/ # Ignore the tests folder diff --git a/ITK_dev_shared_components/SAP/sap_login.py b/ITK_dev_shared_components/SAP/sap_login.py deleted file mode 100644 index c2c98f5..0000000 --- a/ITK_dev_shared_components/SAP/sap_login.py +++ /dev/null @@ -1,119 +0,0 @@ -"""This module provides a functions to open SAP GUI.""" - -import os -import pathlib -import subprocess -import time -from selenium import webdriver -from selenium.webdriver.common.by import By -import pywintypes -from ITK_dev_shared_components.SAP import multi_session - - -def login_using_portal(username:str, password:str): - """Open KMD Portal in Edge, login and start SAP GUI. - - Args: - user: KMD Portal username. - password: KMD Portal password. - """ - driver = webdriver.Chrome() - driver.implicitly_wait(10) - driver.get('https://portal.kmd.dk/irj/portal') - driver.maximize_window() - - #Login - user_field = driver.find_element(By.ID, 'logonuidfield') - pass_field = driver.find_element(By.ID, 'logonpassfield') - login_button = driver.find_element(By.ID, 'buttonLogon') - - user_field.clear() - user_field.send_keys(username) - - pass_field.clear() - pass_field.send_keys(password) - - login_button.click() - - #Opus - mine_genveje = driver.find_element(By.CSS_SELECTOR, "div[title='Mine Genveje']") - mine_genveje.click() - - #Wait for download and launch file - _wait_for_download() - - driver.quit() - - _wait_for_sap_to_open() - - -def _wait_for_download(): - """Private function that checks if the SAP.erp file has been downloaded. - - Raises: - TimeoutError: If the file hasn't been downloaded within 5 seconds. - """ - downloads_folder = str(pathlib.Path.home() / "Downloads") - for _ in range(10): - for file in os.listdir(downloads_folder): - if file.endswith(".sap"): - path = os.path.join(downloads_folder, file) - os.startfile(path) - return - - time.sleep(0.5) - raise TimeoutError(f".SAP file not found in {downloads_folder}") - - -def login_using_cli(username: str, password: str, client='751', system='P02') -> None: - """Open and login to SAP with commandline expressions. - - Args: - username (str): AZ username - password (str): password - client (str, optional): Kommune ID (Aarhus = 751). Defaults to '751'. - system (str, optional): Environment SID (e.g. P02 = 'KMD OPUS Produktion [P02]'). Defaults to 'P02'. - """ - - command_args = [ - r"C:\Program Files (x86)\SAP\FrontEnd\SAPgui\sapshcut.exe", - f"-system={system}", - f"-client={client}", - f"-user={username}", - f"-pw={password}" - ] - - subprocess.run(command_args, check=False) - - _wait_for_sap_to_open() - - -def _wait_for_sap_to_open() -> None: - """Check every second for 10 seconds if the SAP Gui scripting engine is available. - - Raises: - TimeoutError: If SAP doesn't start within 10 seconds. - """ - for _ in range(10): - time.sleep(1) - try: - sessions = multi_session.get_all_SAP_sessions() - if len(sessions) > 0: - return - except pywintypes.com_error: - pass - - raise TimeoutError("SAP didn't respond within 10 seconds.") - -def kill_sap(): - """Kills all SAP processes currently running.""" - os.system("taskkill /F /IM saplogon.exe") - -if __name__=="__main__": - # user = "az12345" - # password = "Hunter2" - # login_using_portal(user, password) - # login_using_cli(user, password) - kill_sap() - - diff --git a/ITK_dev_shared_components/SAP/tree_util.py b/ITK_dev_shared_components/SAP/tree_util.py deleted file mode 100644 index 034186e..0000000 --- a/ITK_dev_shared_components/SAP/tree_util.py +++ /dev/null @@ -1,71 +0,0 @@ -"""This module provides static functions to peform common tasks with SAP GuiTree COM objects.""" - -def get_node_key_by_text(tree, text: str, fuzzy=False) -> str: - """Get the node key of a node based on its text. - tree: A SAP GuiTree object. - text: The text to search for. - fuzzy: Whether to check if the node text just contains the search text. - """ - for key in tree.GetAllNodeKeys(): - t = tree.GetNodeTextByKey(key) - - if t == text or (fuzzy and text in t): - return key - - raise ValueError(f"No node with the text '{text}' was found.") - -def get_item_by_text(tree, text: str, fuzzy=False) -> tuple[str,str]: - """Get the node key and item name of an item based on its text. - tree: A SAP GuiTree object. - text: The text to search for. - fuzzy: Whether to check if the item text just contains the search text. - """ - for key in tree.GetAllNodeKeys(): - for name in tree.GetColumnNames(): - t = tree.GetItemText(key, name) - - if t == text or (fuzzy and text in t): - return (key, name) - - raise ValueError(f"No item with the text '{text}' was found.") - -def check_all_check_boxes(tree) -> None: - """Find and check all checkboxes in the tree. - tree: A SAP GuiTree object. - """ - for key in tree.GetAllNodeKeys(): - for name in tree.GetColumnNames(): - if tree.GetItemType(key, name) == 3: - tree.ChangeCheckBox(key, name, True) - -def uncheck_all_check_boxes(tree) -> None: - """Find and uncheck all checkboxes in the tree. - tree: A SAP GuiTree object. - """ - for key in tree.GetAllNodeKeys(): - for name in tree.GetColumnNames(): - if tree.GetItemType(key, name) == 3: - tree.ChangeCheckBox(key, name, False) - - - - -if __name__ == '__main__': - from ITK_dev_shared_components.SAP import multi_session - - session = multi_session.spawn_sessions(1)[0] - - tree = session.findById('/app/con[0]/ses[0]/wnd[1]/usr/cntlCONTAINER_PSOBKEY/shellcont/shell/shellcont[1]/shell[1]') - - # print([tree.GetNodeTextByKey(key) for key in tree.GetAllNodeKeys()]) - # print([tree.GetItemText(key, '&Hierarchy') for key in tree.GetAllNodeKeys()]) - - # print(list(tree.GetColumnNames())) - - # print(tree.GetItemText(' 2', '&Hierarchy')) - - # key, name = get_item_by_text(tree, '2291987', True) - # tree.ChangeCheckBox(key, name, True) - - check_all_check_boxes(tree) - uncheck_all_check_boxes(tree) \ No newline at end of file diff --git a/ITK_dev_shared_components/SAP/__init__.py b/itk_dev_shared_components/__init__.py similarity index 100% rename from ITK_dev_shared_components/SAP/__init__.py rename to itk_dev_shared_components/__init__.py diff --git a/ITK_dev_shared_components/__init__.py b/itk_dev_shared_components/graph/__init__.py similarity index 100% rename from ITK_dev_shared_components/__init__.py rename to itk_dev_shared_components/graph/__init__.py diff --git a/itk_dev_shared_components/graph/authentication.py b/itk_dev_shared_components/graph/authentication.py new file mode 100644 index 0000000..cd66de6 --- /dev/null +++ b/itk_dev_shared_components/graph/authentication.py @@ -0,0 +1,63 @@ +"""This module is responsible for authenticating a Microsoft Graph +connection.""" + +import msal + +# pylint: disable-next=too-few-public-methods +class GraphAccess: + """An object that handles access to the Graph api. + This object should not be created directly but instead + using one of the authorize methods in the graph.authentication module. + """ + def __init__(self, app: msal.PublicClientApplication, scopes: list[str]) -> str: + self.app = app + self.scopes = scopes + + def get_access_token(self): + """Get the access token to Graph. + This function automatically reuses an existing token + or refreshes an expired one. + + Raises: + RuntimeError: If the access token couldn't be acquired. + + Returns: + str: The Graph access token. + """ + account = self.app.get_accounts()[0] + token = self.app.acquire_token_silent(self.scopes, account) + + if "access_token" in token: + return token['access_token'] + + if 'error_description' in token: + raise RuntimeError(f"Token could not be acquired. {token['error_description']}") + + raise RuntimeError("Something went wrong. No error description was returned from Graph.") + + +def authorize_by_username_password(username: str, password: str, *, client_id: str, tenant_id: str) -> GraphAccess: + """Get a bearer token for the given user. + This is used in most other Graph API calls. + + Args: + username: The username of the user (email address). + password: The password of the user. + client_id: The Graph API client id in 8-4-4-12 format. + tenant_id: The Graph API tenant id in 8-4-4-12 format. + + Returns: + GraphAccess: The GraphAccess object used to authorize Graph access. + """ + authority = f"https://login.microsoftonline.com/{tenant_id}" + scopes = ["https://graph.microsoft.com/.default"] + + app = msal.PublicClientApplication(client_id, authority=authority) + app.acquire_token_by_username_password(username, password, scopes) + + graph_access = GraphAccess(app, scopes) + + # Test connection + graph_access.get_access_token() + + return graph_access diff --git a/itk_dev_shared_components/graph/mail.py b/itk_dev_shared_components/graph/mail.py new file mode 100644 index 0000000..5bd120a --- /dev/null +++ b/itk_dev_shared_components/graph/mail.py @@ -0,0 +1,317 @@ +"""This module is responsible for accessing emails using the Microsoft Graph API.""" + +from dataclasses import dataclass, field +import io + +import requests +from bs4 import BeautifulSoup + +from itk_dev_shared_components.graph.authentication import GraphAccess + + +@dataclass +# pylint: disable-next=too-many-instance-attributes +class Email: + """A class representing an email.""" + user: str + id: str = field(repr=False) + received_time: str + sender: str + receivers: list[str] + subject: str + body: str = field(repr=False) + body_type: str + has_attachments: bool + + def get_text(self) -> str: + """Get the body as plain text. + If the body is html it's converted to plaintext. + If the body is text it's returned as is. + + Returns: + str: The body as plain text. + """ + if self.body_type == 'html': + soup = BeautifulSoup(self.body, "html.parser") + return soup.get_text().strip() + + return self.body + + +@dataclass +class Attachment: + """A dataclass representing an email Attachment. + It contains the graph id, name and size of the attachment. + To get the actual data call graph.mail.get_attachment_data. + """ + email: Email = field(repr=False) + id: str = field(repr=False) + name: str + size: int + + +def get_emails_from_folder(user: str, folder_path: str, graph_access: GraphAccess) -> tuple[Email]: + """Get all emails from the specified user and folder. + You need to authorize against Graph to get the GraphAccess before using this function + see the graph.authentication module. + + Args: + user: The user who owns the folder. + folder_path: The absolute path of the folder e.g. 'Inbox/Economy/May' + graph_access: The GraphAccess object used to authenticate. + + Returns: + tuple[Email]: The emails from the given folder. + """ + folder_id = get_folder_id_from_path(user, folder_path, graph_access) + + endpoint = f"https://graph.microsoft.com/v1.0/users/{user}/mailFolders/{folder_id}/messages?$top=1000" + + response = _get_request(endpoint, graph_access) + emails_raw = response.json()['value'] + + return _unpack_email_response(user, emails_raw) + + +def get_email_as_mime(email: Email, graph_access: GraphAccess) -> io.BytesIO: + """Get an email as a file-like object in MIME format. + + Args: + email: The email to get as MIME. + graph_access: The GraphAccess object used to authenticate. + + Returns: + io.BytesIO: A file-like object of the MIME file. + """ + endpoint = f"https://graph.microsoft.com/v1.0/users/{email.user}/messages/{email.id}/$value" + response = _get_request(endpoint, graph_access) + data = response.content + return io.BytesIO(data) + + +def get_folder_id_from_path(user: str, folder_path: str, graph_access: GraphAccess) -> str: + """Get the Graph id of a folder based on the path of the folder. + You need to authorize against Graph to get the GraphAccess before using this function + see the graph.authentication module. + + Args: + user: The user who owns the folder. + folder_path: The absolute path of the folder e.g. 'Inbox/Economy/May' + graph_access: The GraphAccess object used to authenticate. + + Raises: + ValueError: If a folder in the path can't be found. + + Returns: + str: The UUID of the folder in Graph. + """ + folders = folder_path.split("/") + main_folder = folders[0] + child_folders = folders[1:] + + folder_id = None + + # Get main folder + endpoint = f"https://graph.microsoft.com/v1.0/users/{user}/mailFolders" + response = _get_request(endpoint, graph_access).json() + folder_id = _find_folder(response, main_folder) + if folder_id is None: + raise ValueError(f"Top level folder '{main_folder}' was not found for user '{user}'.") + + # Get child folders + for child_folder in child_folders: + endpoint = f"https://graph.microsoft.com/v1.0/users/{user}/mailFolders/{folder_id}/childFolders" + response = _get_request(endpoint, graph_access).json() + folder_id = _find_folder(response, child_folder) + if folder_id is None: + raise ValueError(f"Child folder '{child_folder}' not found under '{main_folder}' for user '{user}'.") + + return folder_id + + +def list_email_attachments(email: Email, graph_access: GraphAccess) -> tuple[Attachment]: + """List all attachments of the given email. This function only gets the id, name and size + of the attachment. Use get_attachment_data to get the actual data of an attachment. + + Args: + email: The email which attachments to list. + graph_access: The GraphAccess object used to authenticate. + + Returns: + tuple[Attachment]: A tuple of Attachment objects describing the attachments. + """ + endpoint = f"https://graph.microsoft.com/v1.0/users/{email.user}/messages/{email.id}/attachments?$select=name,size,id" + response = _get_request(endpoint, graph_access).json() + + attachments = [] + for att in response['value']: + attachments.append(Attachment(email, att['id'], att['name'], att['size'])) + + return tuple(attachments) + + +def get_attachment_data(attachment: Attachment, graph_access: GraphAccess) -> io.BytesIO: + """Get a file-like object representing the attachment. + + Args: + attachment: The attachment to get. + graph_access: The GraphAccess object used to authenticate. + + Returns: + io.BytesIO: A file-like object representing the attachment. + """ + email = attachment.email + endpoint = f"https://graph.microsoft.com/v1.0/users/{email.user}/messages/{email.id}/attachments/{attachment.id}/$value" + response = _get_request(endpoint, graph_access) + data_bytes = response.content + return io.BytesIO(data_bytes) + + +def move_email(email: Email, folder_path: str, graph_access: GraphAccess, *, well_known_folder: bool=False) -> None: + """Move an email to another folder under the same user. + If well_known_folder is true, the folder path is assumed to be a well defined folder. + See https://learn.microsoft.com/en-us/graph/api/resources/mailfolder?view=graph-rest-1.0 + for a list of well defined folder names. + + Args: + email: The email to move. + folder_path: The absolute path to the new folder. E.g. 'Inbox/Economy/May' + graph_access: The GraphAccess object used to authenticate. + well_known_folder: Whether the path is a 'well known folder'. Defaults to False. + """ + if well_known_folder: + folder_id = folder_path + else: + folder_id = get_folder_id_from_path(email.user, folder_path, graph_access) + + endpoint = f"https://graph.microsoft.com/v1.0/users/{email.user}/messages/{email.id}/move" + + token = graph_access.get_access_token() + headers = { + 'Authorization': f"Bearer {token}", + 'Content-Type': "application/json" + } + + body = { + 'destinationId': folder_id + } + + response = requests.post( + url=endpoint, + headers=headers, + json=body, + timeout=30 + ) + response.raise_for_status() + + new_id = response.json()['id'] + email.id = new_id + + +def delete_email(email: Email, graph_access: GraphAccess, *, permanent: bool=False) -> None: + """Delete an email from the mailbox. + If permanent is true the email is completely removed from the user's mailbox. + If permanent is false the email is instead moved to the Deleted Items folder. + + Args: + email: The email to delete. + graph_access: The GraphAccess object used to authenticate. + permanent: Whether to permanently remove the email or not. Defaults to False. + """ + if permanent: + endpoint = f"https://graph.microsoft.com/v1.0/users/{email.user}/messages/{email.id}" + + token = graph_access.get_access_token() + headers = {'Authorization': f"Bearer {token}"} + + response = requests.delete( + url=endpoint, + headers=headers, + timeout=30 + ) + response.raise_for_status() + else: + move_email(email, "deleteditems", graph_access, well_known_folder=True) + + +def _find_folder(response: dict, target_folder: str) -> str: + """Find the target folder in + + Args: + response: The json dict of the HTTP response. + target_folder: The folder to find. + + Returns: + str: The id of the target folder. + """ + for g_folder in response['value']: + if g_folder['displayName'] == target_folder: + return g_folder['id'] + return None + + +def _unpack_email_response(user: str, emails_raw: list[dict[str, str]]) -> tuple[Email]: + """Unpack a json HTTP response and create a list of Email objects. + + Args: + user: The user who owns the email folder. + json: The json dictionary created by response.json(). + + Returns: + tuple[Email]: A tuple of Email objects. + """ + emails = [] + + for email in emails_raw: + mail_id = email['id'] + received_time = email['receivedDateTime'] + sender = email['from']['emailAddress']['address'] + receivers = [r['emailAddress']['address'] for r in email['toRecipients']] + subject = email['subject'] + body = email['body']['content'] + body_type = email['body']['contentType'] + has_attachments = email['hasAttachments'] + + emails.append( + Email( + user, + mail_id, + received_time, + sender, + receivers, + subject, + body, + body_type, + has_attachments + ) + ) + + return tuple(emails) + + +def _get_request(endpoint: str, graph_access: GraphAccess) -> requests.models.Response: + """Sends a get request to the given Graph endpoint using the GraphAccess + and returns the json object of the response. + + Args: + endpoint: The URL of the Graph endpoint. + graph_access: The GraphAccess object used to authenticate. + timeout: Timeout in seconds of the HTTP request. Defaults to 10. + + Returns: + Response: The response object of the GET request. + + Raises: + HTTPError: Any errors raised while performing GET request. + """ + token = graph_access.get_access_token() + headers = {'Authorization': f"Bearer {token}"} + + response = requests.get( + endpoint, + headers=headers, + timeout=30 + ) + response.raise_for_status() + + return response diff --git a/itk_dev_shared_components/sap/__init__.py b/itk_dev_shared_components/sap/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/ITK_dev_shared_components/SAP/gridview_util.py b/itk_dev_shared_components/sap/gridview_util.py similarity index 80% rename from ITK_dev_shared_components/SAP/gridview_util.py rename to itk_dev_shared_components/sap/gridview_util.py index f927675..8889c74 100644 --- a/ITK_dev_shared_components/SAP/gridview_util.py +++ b/itk_dev_shared_components/sap/gridview_util.py @@ -1,4 +1,4 @@ -"""This module provides static functions to peform common tasks with SAP GuiGridView COM objects.""" +"""This module provides static functions to perform common tasks with SAP GuiGridView COM objects.""" def scroll_entire_table(grid_view, return_to_top=False) -> None: """This function scrolls through the entire table to load all cells. @@ -6,14 +6,13 @@ def scroll_entire_table(grid_view, return_to_top=False) -> None: Args: grid_view: A SAP GuiGridView object. return_to_top: Whether to return the table to the first row after scrolling. Defaults to False. - - Returns: - _type_: _description_ - """ + """ + if grid_view.RowCount == 0 or grid_view.VisibleRowCount == 0: + return for i in range(0, grid_view.RowCount, grid_view.VisibleRowCount): grid_view.FirstVisibleRow = i - + if return_to_top: grid_view.FirstVisibleRow = 0 @@ -28,7 +27,7 @@ def get_all_rows(grid_view, pre_load=True) -> tuple[tuple[str]]: Returns: tuple[tuple[str]]: A 2D tuple of all cell values in the gridview. - """ + """ if pre_load: scroll_entire_table(grid_view, True) @@ -43,9 +42,9 @@ def get_all_rows(grid_view, pre_load=True) -> tuple[tuple[str]]: for c in columns: v = grid_view.GetCellValue(r, c) row_data.append(v) - + output.append(tuple(row_data)) - + return tuple(output) @@ -60,7 +59,7 @@ def get_row(grid_view, row:int, scroll_to_row=False) -> tuple[str]: Returns: tuple[str]: A tuple of the row's data. - """ + """ if scroll_to_row: grid_view.FirstVisibleRow = row @@ -83,7 +82,7 @@ def iterate_rows(grid_view) -> tuple[str]: Yields: tuple[str]: A tuple of the next row's data. - """ + """ row = 0 while row < grid_view.RowCount: @@ -104,7 +103,7 @@ def get_column_titles(grid_view) -> tuple[str]: Returns: tuple[str]: A tuple of the gridview's column titles. - """ + """ return tuple(grid_view.GetColumnTitles(c)[0] for c in grid_view.ColumnOrder) @@ -123,7 +122,7 @@ def find_row_index_by_value(grid_view, column:str, value:str) -> int: Returns: int: The index of the first row which column value matches the given value. - """ + """ if column not in grid_view.ColumnOrder: raise ValueError(f"Column '{column}' not in grid_view") @@ -132,14 +131,14 @@ def find_row_index_by_value(grid_view, column:str, value:str) -> int: # Only scroll when row isn't visible if not grid_view.FirstVisibleRow <= row <= grid_view.FirstVisibleRow + grid_view.VisibleRowCount-1: grid_view.FirstVisibleRow = row - + if grid_view.GetCellValue(row, column) == value: return row - + return -1 -def find_all_row_indecies_by_value(grid_view, column:str, value:str) -> list[int]: - """Find all indecies of the rows where the given column's value +def find_all_row_indices_by_value(grid_view, column:str, value:str) -> list[int]: + """Find all indices of the rows where the given column's value match the given value. Returns an empty list if no row is found. Args: @@ -151,8 +150,8 @@ def find_all_row_indecies_by_value(grid_view, column:str, value:str) -> list[int ValueError: If the column name doesn't exist in the grid view. Returns: - list[int]: A list of row indecies where the value matches. - """ + list[int]: A list of row indices where the value matches. + """ if column not in grid_view.ColumnOrder: raise ValueError(f"Column '{column}' not in grid_view") @@ -162,39 +161,8 @@ def find_all_row_indecies_by_value(grid_view, column:str, value:str) -> list[int # Only scroll when row isn't visible if not grid_view.FirstVisibleRow <= row <= grid_view.FirstVisibleRow + grid_view.VisibleRowCount-1: grid_view.FirstVisibleRow = row - + if grid_view.GetCellValue(row, column) == value: rows.append(row) - - return rows - - - - -if __name__=='__main__': - import win32com.client - - SAP = win32com.client.GetObject("SAPGUI") - app = SAP.GetScriptingEngine - connection = app.Connections(0) - session = connection.Sessions(0) - - table = session.findById("wnd[0]/usr/cntlGRID1/shellcont/shell") - - rows = find_all_row_indecies_by_value(table, "ZZ_PARTNER", '15879880') - print(rows) - table.setCurrentCell(rows[0], "ZZ_PARTNER") - - # print(get_row(table, 1, True)) - - # scroll_entire_table(table) - - # data = get_all_rows(table) - # print(len(data), len(data[0])) - - # for r in iterate_rows(table): - # print(r) - - # print(get_column_titles(table)) - + return rows diff --git a/ITK_dev_shared_components/SAP/multi_session.py b/itk_dev_shared_components/sap/multi_session.py similarity index 80% rename from ITK_dev_shared_components/SAP/multi_session.py rename to itk_dev_shared_components/sap/multi_session.py index f1ce075..b3d01de 100644 --- a/ITK_dev_shared_components/SAP/multi_session.py +++ b/itk_dev_shared_components/sap/multi_session.py @@ -5,13 +5,15 @@ import time import threading from typing import Callable +import math + import pythoncom import win32com.client import win32gui def run_with_session(session_index:int, func:Callable, args:tuple) -> None: - """Run a function in a sepcific session based on the sessions index. - This function is meant to be run inside a seperate thread. + """Run a function in a specific session based on the sessions index. + This function is meant to be run inside a separate thread. The function must take a session object as its first argument. Note that this function will not spawn the sessions before running, use spawn_sessions to do that. @@ -19,8 +21,8 @@ def run_with_session(session_index:int, func:Callable, args:tuple) -> None: pythoncom.CoInitialize() - SAP = win32com.client.GetObject("SAPGUI") - app = SAP.GetScriptingEngine + sap = win32com.client.GetObject("SAPGUI") + app = sap.GetScriptingEngine connection = app.Connections(0) session = connection.Sessions(session_index) @@ -28,6 +30,7 @@ def run_with_session(session_index:int, func:Callable, args:tuple) -> None: pythoncom.CoUninitialize() + def run_batch(func:Callable, args:tuple[tuple], num_sessions=6) -> None: """Run a function in parallel sessions. The function will be run {num_sessions} times with args[i] as input. @@ -40,7 +43,7 @@ def run_batch(func:Callable, args:tuple[tuple], num_sessions=6) -> None: for i in range(num_sessions): t = ExThread(target=run_with_session, args=(i, func, args[i])) threads.append(t) - + for t in threads: t.start() for t in threads: @@ -49,6 +52,7 @@ def run_batch(func:Callable, args:tuple[tuple], num_sessions=6) -> None: if t.error: raise t.error + def run_batches(func:Callable, args:tuple[tuple], num_sessions=6): """Run a function in parallel batches. This function runs the input function for each set of arguments in args. @@ -62,16 +66,25 @@ def run_batches(func:Callable, args:tuple[tuple], num_sessions=6): batch = args[b:b+num_sessions] run_batch(func, args, len(batch)) + def spawn_sessions(num_sessions=6) -> list: """A function to spawn multiple sessions of SAP. This function will attempt to spawn the desired number of sessions. - If the current number of open sessions exceeds the desired number of sessions + If the current number of already open sessions exceeds the desired number of sessions the already open sessions will not be closed to match the desired number. The number of sessions must be between 1 and 6. - Returns a list of all open sessions. + + Args: + num_sessions: The number of sessions desired. Defaults to 6. + + Raises: + ValueError: If the number of sessions is not between 1 and 6. + + Returns: + tuple: A tuple of all currently open sessions. """ - SAP = win32com.client.GetObject("SAPGUI") - app = SAP.GetScriptingEngine + sap = win32com.client.GetObject("SAPGUI") + app = sap.GetScriptingEngine connection = app.Connections(0) session = connection.Sessions(0) @@ -82,25 +95,17 @@ def spawn_sessions(num_sessions=6) -> list: for _ in range(num_sessions - connection.Sessions.count): session.CreateSession() - + # Wait for the sessions to spawn while connection.Sessions.count < num_sessions: time.sleep(0.1) - sessions = list(connection.Sessions) + sessions = tuple(connection.Sessions) num_sessions = len(sessions) - if num_sessions == 1: - c = 1 - elif num_sessions <= 4: - c = 2 - elif num_sessions <= 6: - c = 3 - - if num_sessions < 3: - r = 1 - else: - r = 2 + # Calculate number of columns and rows + c = math.ceil(math.sqrt(num_sessions)) + r = math.ceil(num_sessions / c) w, h = 1920//c, 1040//r @@ -111,20 +116,22 @@ def spawn_sessions(num_sessions=6) -> list: x = i % c * w y = i // c * h win32gui.MoveWindow(hwnd, x, y, w, h, True) - + return sessions -def get_all_SAP_sessions() -> tuple: + +def get_all_sap_sessions() -> tuple: """Returns a tuple of all open SAP sessions (on connection index 0). Returns: tuple: A tuple of SAP GuiSession objects. """ - SAP = win32com.client.GetObject("SAPGUI") - app = SAP.GetScriptingEngine + sap = win32com.client.GetObject("SAPGUI") + app = sap.GetScriptingEngine connection = app.Connections(0) return tuple(connection.Sessions) + class ExThread(threading.Thread): """A thread with a handle to get an exception raised inside the thread: ExThread.error""" def __init__(self, *args, **kwargs): @@ -136,5 +143,3 @@ def run(self): self._target(*self._args, **self._kwargs) except Exception as e: # pylint: disable=broad-exception-caught self.error = e - - diff --git a/ITK_dev_shared_components/SAP/opret_kundekontakt.py b/itk_dev_shared_components/sap/opret_kundekontakt.py similarity index 76% rename from ITK_dev_shared_components/SAP/opret_kundekontakt.py rename to itk_dev_shared_components/sap/opret_kundekontakt.py index adc8168..306abff 100644 --- a/ITK_dev_shared_components/SAP/opret_kundekontakt.py +++ b/itk_dev_shared_components/sap/opret_kundekontakt.py @@ -1,17 +1,17 @@ -"""This module provides a single function to conviniently peform the action 'opret-kundekontaker' in SAP.""" +"""This module provides a single function to conveniently perform the action 'opret-kundekontaker' in SAP.""" from typing import Literal import win32clipboard -from ITK_dev_shared_components.SAP import tree_util +from itk_dev_shared_components.sap import tree_util -def opret_kundekontakter(session, fp:str, aftaler:list[str] | None, - art:Literal[' ', 'Automatisk', 'Fakturagrundlag', 'Fuldmagt ifm. værge', 'Konverteret', 'Myndighedshenvend.', 'Orientering', 'Returpost', 'Ringeaktivitet', 'Skriftlig henvend.', 'Telefonisk henvend.'], +def opret_kundekontakter(session, fp:str, aftaler:list[str] | None, + art:Literal[' ', 'Automatisk', 'Fakturagrundlag', 'Fuldmagt ifm. værge', 'Konverteret', 'Myndighedshenvend.', 'Orientering', 'Returpost', 'Ringeaktivitet', 'Skriftlig henvend.', 'Telefonisk henvend.'], notat:str, lock=None) -> None: """Creates a kundekontakt on the given FP and aftaler. Args: - session (COM Object): The SAP session to peform the action. + session (COM Object): The SAP session to preform the action. fp (str): The forretningspartner number. aftaler (list[str] | None): A list of aftaler to put the kundekontakt on. If empty or None the kundekontakt will be created on fp-level. art (str): The art of the kundekontakt. @@ -41,11 +41,11 @@ def opret_kundekontakter(session, fp:str, aftaler:list[str] | None, # Go to editor and paste (lock if multithreaded) session.findById("wnd[0]/usr/subNOTICE:SAPLEENO:1002/btnEENO_TEXTE-EDITOR").press() - if lock: + if lock: lock.acquire() - _setClipboard(notat) + _set_clipboard(notat) session.findById("wnd[0]/tbar[1]/btn[9]").press() - if lock: + if lock: lock.release() # Back and save @@ -53,21 +53,13 @@ def opret_kundekontakter(session, fp:str, aftaler:list[str] | None, session.findById("wnd[0]/tbar[0]/btn[11]").press() -def _setClipboard(text:str) -> None: +def _set_clipboard(text:str) -> None: + """Private function to set text to the clipboard. + + Args: + text: Text to set to clipboard. + """ win32clipboard.OpenClipboard() win32clipboard.EmptyClipboard() win32clipboard.SetClipboardText(text) win32clipboard.CloseClipboard() - - -if __name__ == '__main__': - from ITK_dev_shared_components.SAP import multi_session - from datetime import datetime - - session = multi_session.spawn_sessions(1)[0] - fp = '25564617' - aftaler = ['2291987', '2421562', '2311094'] - art = 'Orientering' - notat = 'Test '+ str(datetime.now()) - - opret_kundekontakter(session, fp, aftaler, 'Automatisk', notat) \ No newline at end of file diff --git a/itk_dev_shared_components/sap/sap_login.py b/itk_dev_shared_components/sap/sap_login.py new file mode 100644 index 0000000..27ffc78 --- /dev/null +++ b/itk_dev_shared_components/sap/sap_login.py @@ -0,0 +1,186 @@ +"""This module provides functions to handle opening and closing SAP Gui +as well as a function to change user passwords.""" + +import os +import pathlib +import subprocess +import time + +from selenium import webdriver +from selenium.webdriver.common.by import By +import pywintypes +import win32com.client + +from itk_dev_shared_components.sap import multi_session + + +def login_using_portal(username:str, password:str): + """Open KMD Portal in Edge, login and start SAP GUI. + Args: + user: KMD Portal username. + password: KMD Portal password. + """ + driver = webdriver.Chrome() + driver.implicitly_wait(10) + driver.get('https://portal.kmd.dk/irj/portal') + driver.maximize_window() + + #Login + user_field = driver.find_element(By.ID, 'logonuidfield') + pass_field = driver.find_element(By.ID, 'logonpassfield') + login_button = driver.find_element(By.ID, 'buttonLogon') + user_field.clear() + user_field.send_keys(username) + pass_field.clear() + pass_field.send_keys(password) + login_button.click() + + #Opus + mine_genveje = driver.find_element(By.CSS_SELECTOR, "div[title='Mine Genveje']") + mine_genveje.click() + + #Wait for download and launch file + _wait_for_download() + + driver.quit() + _wait_for_sap_session(10) + + +def _wait_for_download(): + """Private function that checks if the SAP.erp file has been downloaded. + Raises: + TimeoutError: If the file hasn't been downloaded within 5 seconds. + """ + downloads_folder = str(pathlib.Path.home() / "Downloads") + for _ in range(10): + for file in os.listdir(downloads_folder): + if file.endswith(".sap"): + path = os.path.join(downloads_folder, file) + os.startfile(path) + return + + time.sleep(0.5) + raise TimeoutError(f".sap file not found in {downloads_folder}") + + +def login_using_cli(username: str, password: str, client:str='751', system:str='P02', timeout:int=10) -> None: + """Open and login to SAP with commandline expressions. + Args: + username: AZ username + password: password + client: Kommune ID (Aarhus = 751). Defaults to '751'. + system: Environment SID (e.g. P02 = 'KMD OPUS Produktion [P02]'). Defaults to 'P02'. + timeout: The time in seconds to wait for SAP Logon to start. Defaults to 10. + Raises: + TimeoutError: If SAP doesn't start within timeout limit. + ValueError: If SAP is unable to log in using the given credentials. + """ + + command_args = [ + r"C:\Program Files (x86)\SAP\FrontEnd\SAPgui\sapshcut.exe", + f"-system={system}", + f"-client={client}", + f"-user={username}", + f"-pw={password}" + ] + + subprocess.run(command_args, check=False) + _wait_for_sap_session(timeout) + if not _check_for_splash_screen(): + raise ValueError("Unable to log in. Please check username and password.") + + +def _wait_for_sap_session(timeout:int) -> None: + """Check every second if the SAP Gui scripting engine is available until timeout is reached. + Args: + timeout: The time in seconds to wait for SAP Logon to start. Defaults to 10. + Raises: + TimeoutError: If SAP doesn't start within timeout limit. + """ + for _ in range(timeout): + time.sleep(1) + try: + sessions = multi_session.get_all_sap_sessions() + if len(sessions) > 0: + return + except pywintypes.com_error: + pass + + raise TimeoutError(f"SAP didn't respond within timeout limit: {timeout} seconds.") + + +def _check_for_splash_screen() -> bool: + """Check if the splash screen image is currently present. + Returns: + bool: True if the splash screen image is currently present. + """ + session = multi_session.get_all_sap_sessions()[0] + image = session.findById("wnd[0]/usr/cntlIMAGE_CONTAINER/shellcont/shell/shellcont[1]/shell", False) + + return image is not None + + +def change_password(username:str, old_password:str, new_password:str, + client:str='751', + system:str='...KMD OPUS Produktion [P02]', + timeout:int=10) -> None: + """Change the password of a user in SAP Gui. Closes SAP when done. + Args: + username: The username of the user. + old_password: The current password of the user. + new_password: The new password to change to. + client: The client number. Defaults to '751'. + system: The description string of the connection as displayed in SAP Logon. Defaults to '...KMD OPUS Produktion [P02]'. + timeout: The time in seconds to wait for SAP Logon to start. Defaults to 10. + Raises: + TimeoutError: If the connection couldn't be established within the timeout limit. + ValueError: If the current credentials are not valid or if the password can't be changed. + ValueError: If the new password is not valid. + + """ + + subprocess.Popen(r"C:\Program Files (x86)\SAP\FrontEnd\SAPgui\saplogon.exe") #pylint: disable=consider-using-with + + # Wait for SAP Logon to open + for _ in range(timeout): + time.sleep(1) + try: + sap = win32com.client.GetObject("SAPGUI") + app = sap.GetScriptingEngine + app.OpenConnection(system) + break + except pywintypes.com_error: + pass + else: + raise TimeoutError(f"SAP Logon didn't open within timeout limit: {timeout} seconds.") + + session = multi_session.get_all_sap_sessions()[0] + + # Enter credentials + session.findById("wnd[0]/usr/txtRSYST-MANDT").text = client + session.findById("wnd[0]/usr/txtRSYST-BNAME").text = username + session.findById("wnd[0]/usr/pwdRSYST-BCODE").text = old_password + session.findById("wnd[0]/tbar[1]/btn[5]").press() + + # Check status bar + status_bar = session.findById("wnd[0]/sbar") + if status_bar.MessageType != 'S': + text = status_bar.Text + kill_sap() + raise ValueError(f"Password change was blocked: {text}") + + # Enter new password + session.findById("wnd[1]/usr/pwdRSYST-NCODE").text = new_password + session.findById("wnd[1]/usr/pwdRSYST-NCOD2").text = new_password + session.findById("wnd[1]/tbar[0]/btn[0]").press() + + if not _check_for_splash_screen(): + kill_sap() + raise ValueError("New password couldn't be set. Please check password requirements.") + + kill_sap() + + +def kill_sap(): + """Kills all SAP processes currently running.""" + os.system("taskkill /F /IM saplogon.exe > NUL 2>&1") diff --git a/ITK_dev_shared_components/SAP/sap_util.py b/itk_dev_shared_components/sap/sap_util.py similarity index 75% rename from ITK_dev_shared_components/SAP/sap_util.py rename to itk_dev_shared_components/sap/sap_util.py index 698d17a..70d022d 100644 --- a/ITK_dev_shared_components/SAP/sap_util.py +++ b/itk_dev_shared_components/sap/sap_util.py @@ -1,7 +1,7 @@ """This module provides miscellaneous static functions to peform common tasks in SAP.""" def print_all_descendants(container, max_depth=-1, indent=0): - """Prints the object and all of its decendants recursivly + """Prints the object and all of its descendants recursively to the console. Args: @@ -23,13 +23,3 @@ def print_all_descendants(container, max_depth=-1, indent=0): print_all_descendants(child, max_depth, indent+1) else: print(indent_text) - - -if __name__=='__main__': - from ITK_dev_shared_components.SAP import multi_session - - session = multi_session.spawn_sessions(1)[0] - - usr = session.FindById('/app/con[0]/ses[0]/wnd[0]/usr') - - print_all_descendants(usr) \ No newline at end of file diff --git a/itk_dev_shared_components/sap/tree_util.py b/itk_dev_shared_components/sap/tree_util.py new file mode 100644 index 0000000..3eeca44 --- /dev/null +++ b/itk_dev_shared_components/sap/tree_util.py @@ -0,0 +1,71 @@ +"""This module provides static functions to perform common tasks with SAP GuiTree COM objects.""" + +def get_node_key_by_text(tree, text: str, fuzzy: bool=False) -> str: + """Get the node key of a node based on its text. + + Args: + tree: A SAP GuiTree object. + text: The text to search for. + fuzzy: Whether to check if the node text just contains the search text. + + Raises: + ValueError: If no node is found with the given text. + + Returns: + str: The node key of the found node. + """ + for key in tree.GetAllNodeKeys(): + t = tree.GetNodeTextByKey(key) + + if t == text or (fuzzy and text in t): + return key + + raise ValueError(f"No node with the text '{text}' was found.") + + +def get_item_by_text(tree, text: str, fuzzy: bool=False) -> tuple[str,str]: + """Get the node key and item name of an item based on its text. + + Args: + tree: A SAP GuiTree object. + text: The text to search for. + fuzzy: Whether to check if the item text just contains the search text. + + Raises: + ValueError: If no tem is found with the given text. + + Returns: + tuple[str,str]: The node key and item name of the found item. + """ + for key in tree.GetAllNodeKeys(): + for name in tree.GetColumnNames(): + t = tree.GetItemText(key, name) + + if t == text or (fuzzy and text in t): + return (key, name) + + raise ValueError(f"No item with the text '{text}' was found.") + + +def check_all_check_boxes(tree) -> None: + """Find and check all checkboxes in the tree. + + Args: + tree: A SAP GuiTree object. + """ + for key in tree.GetAllNodeKeys(): + for name in tree.GetColumnNames(): + if tree.GetItemType(key, name) == 3: + tree.ChangeCheckBox(key, name, True) + + +def uncheck_all_check_boxes(tree) -> None: + """Find and uncheck all checkboxes in the tree. + + Args: + tree: A SAP GuiTree object. + """ + for key in tree.GetAllNodeKeys(): + for name in tree.GetColumnNames(): + if tree.GetItemType(key, name) == 3: + tree.ChangeCheckBox(key, name, False) diff --git a/pyproject.toml b/pyproject.toml index 469e0d0..da6691e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -3,8 +3,8 @@ requires = ["setuptools>=65.0"] build-backend = "setuptools.build_meta" [project] -name = "ITK_dev_shared_components" -version = "0.0.6" +name = "itk_dev_shared_components" +version = "1.0.0" authors = [ { name="ITK Development", email="itk-rpa@mkb.aarhus.dk" }, ] @@ -18,7 +18,10 @@ classifiers = [ ] dependencies = [ "pywin32 >= 306", - "selenium >= 4.10.0" + "selenium >= 4.10.0", + "msal >= 1.24.1", + "requests >= 2.31.0", + "beautifulsoup4 >= 4.12.2" ] [project.urls] diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/run_tests.bat b/tests/run_tests.bat new file mode 100644 index 0000000..c1e6516 --- /dev/null +++ b/tests/run_tests.bat @@ -0,0 +1,26 @@ +@echo off + +:: Change dir to parent dir +echo Changing directory... +cd /d %~dp0.. + + +choice /C YN /M "Do you want to reset venv?" + +if errorlevel 2 ( + echo Activating excisting venv... + call .venv\Scripts\activate + +) else ( + echo Setting up new venv... + python -m venv .venv + call .venv\Scripts\activate + + echo Installing package... + pip install . +) + +echo Running unit tests... +python -m unittest discover + +pause \ No newline at end of file diff --git a/tests/test_graph/__init__.py b/tests/test_graph/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_graph/test_mail.py b/tests/test_graph/test_mail.py new file mode 100644 index 0000000..681baf2 --- /dev/null +++ b/tests/test_graph/test_mail.py @@ -0,0 +1,78 @@ +"""Tests relating to the graph.mail module.""" + +import unittest +import json +import os + +from itk_dev_shared_components.graph import authentication, mail + +class EmailTest(unittest.TestCase): + """Tests relating to the graph.mail module.""" + @classmethod + def setUpClass(cls) -> None: + # Get Graph credentials from the environment variables. + credentials = json.loads(os.environ['GraphAPI']) + client_id = credentials['client_id'] + tenant_id = credentials['tenant_id'] + username = credentials['username'] + password = credentials['password'] + + cls.graph_access = authentication.authorize_by_username_password(username, password, tenant_id=tenant_id, client_id=client_id) + + # Define mail user and folders + cls.user = "itk-rpa@mkb.aarhus.dk" + cls.folder1 = "Indbakke/Graph Test/Undermappe" + cls.folder2 = "Indbakke/Graph Test/Undermappe2" + + def test_correct_usage(self): + """Test all functions relating to the mail part of Graph. + Use a test email in the given users mailbox in the given folder. + Delete email permanently is not tested. + """ + # Get email from test folder + emails = mail.get_emails_from_folder(self.user, self.folder1, self.graph_access) + self.assertNotEqual(len(emails), 0, "No emails found in test folder!") + self.assertEqual(len(emails), 1, "More than 1 email found in test folder!") + email = emails[0] + + # Check email subject and body + self.assertEqual(email.subject, "Test subject") + self.assertTrue(email.get_text().startswith("Test text")) + + # List attachments + attachments = mail.list_email_attachments(email, self.graph_access) + self.assertEqual(len(attachments), 3) + + # Get attachment object 'Test document.pdf' + attachment = next((att for att in attachments if att.name == "Test document.pdf"), None) + self.assertIsNotNone(attachment) + + # Get attachment file and read first 4 bytes + file = mail.get_attachment_data(attachment, self.graph_access) + self.assertEqual(file.read(4), b'%PDF') + + # Get email as MIME and read first 8 bytes + file = mail.get_email_as_mime(email, self.graph_access) + self.assertEqual(file.read(8), b'Received') + + # Move the email + mail.move_email(email, self.folder2, self.graph_access) + + # Delete to deleted items folder + mail.delete_email(email, self.graph_access) + + # Move email back + mail.move_email(email, self.folder1, self.graph_access) + + + def test_wrong_usage(self): + """Test that raised errors actually get raised.""" + with self.assertRaises(ValueError): + mail.get_folder_id_from_path(self.user, "Foo/Bar", self.graph_access) + + with self.assertRaises(ValueError): + mail.get_folder_id_from_path(self.user, "Indbakke/FooBar", self.graph_access) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/test_gridview_util.py b/tests/test_gridview_util.py deleted file mode 100644 index e1bd259..0000000 --- a/tests/test_gridview_util.py +++ /dev/null @@ -1,75 +0,0 @@ -import unittest -import os -from ITK_dev_shared_components.SAP import gridview_util, sap_login, multi_session - -class test_gridview_util(unittest.TestCase): - @classmethod - def setUpClass(cls): - user, password = os.environ['SAP Login'].split(';') - sap_login.login_using_cli(user, password) - session = multi_session.get_all_SAP_sessions()[0] - session.startTransaction("fmcacov") - session.findById("wnd[0]/usr/ctxtGPART_DYN").text = "25564617" - session.findById("wnd[0]").sendVKey(0) - - @classmethod - def tearDownClass(cls): - sap_login.kill_sap() - - def setUp(self) -> None: - session = multi_session.get_all_SAP_sessions()[0] - self.table = session.findById("wnd[0]/usr/tabsDATA_DISP/tabpDATA_DISP_FC1/ssubDATA_DISP_SCA:RFMCA_COV:0202/cntlRFMCA_COV_0100_CONT5/shellcont/shell") - - def test_scroll_entire_table(self): - gridview_util.scroll_entire_table(self.table) - gridview_util.scroll_entire_table(self.table, True) - - def test_get_all_rows(self): - result = gridview_util.get_all_rows(self.table) - self.assertGreater(len(result), 0) - self.assertGreater(len(result[0]), 0) - - def test_get_row(self): - result = gridview_util.get_row(self.table, 0, False) - self.assertGreater(len(result), 0) - - result = gridview_util.get_row(self.table, 0, True) - self.assertGreater(len(result), 0) - - def test_iterate_rows(self): - for row in gridview_util.iterate_rows(self.table): - self.assertGreater(len(row), 0) - - def test_get_column_titles(self): - result = gridview_util.get_column_titles(self.table) - self.assertGreater(len(result), 0) - - def test_find_row_index_by_value(self): - result = gridview_util.find_row_index_by_value(self.table, "TXTU2", "Test Deltrans") - self.assertNotEqual(result, -1) - - result = gridview_util.find_row_index_by_value(self.table, "TXTU2", "Foo") - self.assertEqual(result, -1) - - with self.assertRaises(ValueError): - gridview_util.find_row_index_by_value(self.table, "Foo", "Bar") - - - def test_find_all_row_indecies_by_value(self): - result = gridview_util.find_all_row_indecies_by_value(self.table, "TXTU2", "Gebyr") - self.assertGreater(len(result), 0) - - result = gridview_util.find_all_row_indecies_by_value(self.table, "TXTU2", "Foo") - self.assertEqual(len(result), 0) - - with self.assertRaises(ValueError): - gridview_util.find_all_row_indecies_by_value(self.table, "Foo", "Bar") - - - - - - - -if __name__ == '__main__': - unittest.main() \ No newline at end of file diff --git a/tests/test_sap/__init__.py b/tests/test_sap/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/test_sap/test_gridview_util.py b/tests/test_sap/test_gridview_util.py new file mode 100644 index 0000000..b033a44 --- /dev/null +++ b/tests/test_sap/test_gridview_util.py @@ -0,0 +1,95 @@ +"""Tests relating to the module SAP.gridview_util.""" + +import unittest +import os +from itk_dev_shared_components.sap import gridview_util, sap_login, multi_session + +# Some tests might look similiar, and we want this. pylint: disable=duplicate-code + +class TestGridviewUtil(unittest.TestCase): + """Tests relating to the module SAP.gridview_util.""" + @classmethod + def setUpClass(cls): + """Launch SAP and navigate to fmcacov on FP 25564617 (Test person).""" + sap_login.kill_sap() + + user, password = os.environ['SAP Login'].split(';') + sap_login.login_using_cli(user, password) + session = multi_session.get_all_sap_sessions()[0] + session.startTransaction("fmcacov") + session.findById("wnd[0]/usr/ctxtGPART_DYN").text = "25564617" + session.findById("wnd[0]").sendVKey(0) + + @classmethod + def tearDownClass(cls): + sap_login.kill_sap() + + def setUp(self) -> None: + # Find SAP gridview (table) object for testing + session = multi_session.get_all_sap_sessions()[0] + self.table = session.findById("wnd[0]/usr/tabsDATA_DISP/tabpDATA_DISP_FC1/ssubDATA_DISP_SCA:RFMCA_COV:0202/cntlRFMCA_COV_0100_CONT5/shellcont/shell") + + def test_scroll_entire_table(self): + """Test scroll_entire_table. Assume success if no errors.""" + gridview_util.scroll_entire_table(self.table) + gridview_util.scroll_entire_table(self.table, True) + + def test_get_all_rows(self): + """Test get all rows of table. + Assume success if any rows and columns are loaded. + """ + result = gridview_util.get_all_rows(self.table) + self.assertGreater(len(result), 0) + self.assertGreater(len(result[0]), 0) + + def test_get_row(self): + """Test getting a single row. + Assume success if any columns are loaded. + """ + result = gridview_util.get_row(self.table, 0, False) + self.assertGreater(len(result), 0) + + result = gridview_util.get_row(self.table, 0, True) + self.assertGreater(len(result), 0) + + def test_iterate_rows(self): + """Test iterating through all rows. + Assume success if any columns are loaded for each row. + """ + for row in gridview_util.iterate_rows(self.table): + self.assertGreater(len(row), 0) + + def test_get_column_titles(self): + """Test getting column titles. + Assume success if any titles are loaded. + """ + result = gridview_util.get_column_titles(self.table) + self.assertGreater(len(result), 0) + + def test_find_row_index_by_value(self): + """Test finding a single row by column value.""" + # Test finding an actual value. + index = gridview_util.find_row_index_by_value(self.table, "TXTU2", "Test Deltrans") + self.assertNotEqual(index, -1) + + # Test NOT finding a wrong value. + index = gridview_util.find_row_index_by_value(self.table, "TXTU2", "Foo") + self.assertEqual(index, -1) + + # Test error on wrong column name. + with self.assertRaises(ValueError): + gridview_util.find_row_index_by_value(self.table, "Foo", "Bar") + + def test_find_all_row_indices_by_value(self): + """Test finding all rows by column value.""" + # Test finding an actual value. + indices = gridview_util.find_all_row_indices_by_value(self.table, "TXTU2", "Gebyr") + self.assertGreater(len(indices), 0) + + # Test NOT finding a wrong value. + indices = gridview_util.find_all_row_indices_by_value(self.table, "TXTU2", "Foo") + self.assertEqual(len(indices), 0) + + # Test error on wrong column name. + with self.assertRaises(ValueError): + gridview_util.find_all_row_indices_by_value(self.table, "Foo", "Bar") diff --git a/tests/test_sap/test_multi_session.py b/tests/test_sap/test_multi_session.py new file mode 100644 index 0000000..5e89358 --- /dev/null +++ b/tests/test_sap/test_multi_session.py @@ -0,0 +1,67 @@ +"""Tests relating to the module SAP.multi_session.""" + +import unittest +import os +import threading +from itk_dev_shared_components.sap import sap_login, multi_session, opret_kundekontakt + +class TestMultiSession(unittest.TestCase): + """Tests relating to the module SAP.multi_session.""" + + def setUp(self): + sap_login.kill_sap() + user, password = os.environ['SAP Login'].split(';') + sap_login.login_using_cli(user, password) + + def tearDown(self): + sap_login.kill_sap() + + def test_spawn_sessions(self): + """Test spawning of multiple sessions. + It should only be possible to spawn between 1-6 sessions. + Also test getting already open sessions. + """ + with self.assertRaises(ValueError): + multi_session.spawn_sessions(0) + + with self.assertRaises(ValueError): + multi_session.spawn_sessions(7) + + sessions = multi_session.spawn_sessions(1) + self.assertEqual(len(sessions), 1) + + sessions = multi_session.spawn_sessions(6) + self.assertEqual(len(sessions), 6) + + sessions = multi_session.get_all_sap_sessions() + self.assertEqual(len(sessions), 6) + + def test_run_batches(self): + """Test running a task in parallel in multiple sessions. + This also tests: + run_batch, run_with_session, ExThread + """ + + num_sessions = 6 + + multi_session.spawn_sessions(num_sessions) + + #Data + lock = threading.Lock() + + data = [ + ("25564617", None, 'Orientering', f"Hej {i}", lock) for i in range(12) + ] + + # Test with 12 cases + multi_session.run_batches(opret_kundekontakt.opret_kundekontakter, data, num_sessions) + + # Test with 5 cases + multi_session.run_batches(opret_kundekontakt.opret_kundekontakter, data[0:5], num_sessions) + + # Test with 1 case + multi_session.run_batches(opret_kundekontakt.opret_kundekontakter, data[0:1], num_sessions) + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_sap/test_opret_kundekontakt.py b/tests/test_sap/test_opret_kundekontakt.py new file mode 100644 index 0000000..78b13e3 --- /dev/null +++ b/tests/test_sap/test_opret_kundekontakt.py @@ -0,0 +1,36 @@ +"""Test relating to the module SAP.opret_kundekontakt.""" + +import unittest +import os +from itk_dev_shared_components.sap import sap_login, multi_session, opret_kundekontakt + +class TestOpretKundekontakt(unittest.TestCase): + """Test relating to the module SAP.opret_kundekontakt.""" + def setUp(self): + sap_login.kill_sap() + user, password = os.environ['SAP Login'].split(';') + sap_login.login_using_cli(user, password) + + def tearDown(self): + sap_login.kill_sap() + + + def test_opret_kundekontakt(self): + """Test the function opret_kundekontakter.""" + fp = "25564617" + aftaler = ("2544577", "1990437", "1473781") + + session = multi_session.get_all_sap_sessions()[0] + + # Test with 3 aftaler + opret_kundekontakt.opret_kundekontakter(session, fp, aftaler, 'Orientering', "Test 1") + + # Test with 1 aftale + opret_kundekontakt.opret_kundekontakter(session, fp, aftaler[0:1], 'Automatisk', "Test 2") + + # Test with 0 aftaler + opret_kundekontakt.opret_kundekontakter(session, fp, None, 'Returpost', "Test 3") + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_sap/test_sap_login.py b/tests/test_sap/test_sap_login.py new file mode 100644 index 0000000..4f4786d --- /dev/null +++ b/tests/test_sap/test_sap_login.py @@ -0,0 +1,56 @@ +"""Tests relating to the module SAP.sap_login.""" + +import os +import unittest +from tkinter import simpledialog + +from itk_dev_shared_components.sap import sap_login + +class TestSapLogin(unittest.TestCase): + """Tests relating to the module SAP.sap_login.""" + + @classmethod + def setUpClass(cls) -> None: + """Show popups that asks for username, password and new password + used in the following tests. + """ + cls.username, cls.password = os.environ['SAP Login'].split(';') + cls.new_password = simpledialog.askstring("Enter new password", "Enter the new password to be used in testing the change SAP password function.\nRemember to write down the new password!\nLeave empty to skip test_change_password.") + + def setUp(self) -> None: + sap_login.kill_sap() + + def tearDown(self) -> None: + sap_login.kill_sap() + + def test_login_with_cli(self): + """Test login using the SAP cli interface. + Username and password is found in a environment variable. + """ + sap_login.login_using_cli(self.username, self.password) + + sap_login.kill_sap() + + with self.assertRaises(ValueError): + sap_login.login_using_cli("Foo", "Bar") + + def test_change_password(self): + """Test the function change password. + Due to a limit in SAP you can only run this function once per day. + If no new_password is entered in the setup + """ + if not self.new_password: + raise unittest.SkipTest("Test not run because new_password was missing.") + + sap_login.change_password(self.username, self.password, self.new_password) + self.password = self.new_password + + with self.assertRaises(ValueError): + sap_login.change_password(self.username, "Foo", self.new_password) + + with self.assertRaises(ValueError): + sap_login.change_password(self.username, self.password, "") + + +if __name__ == '__main__': + unittest.main() diff --git a/tests/test_sap/test_tree_util.py b/tests/test_sap/test_tree_util.py new file mode 100644 index 0000000..eaadf33 --- /dev/null +++ b/tests/test_sap/test_tree_util.py @@ -0,0 +1,92 @@ +"""Tests relating to the module SAP.tree_util.""" + +import unittest +import os + +from itk_dev_shared_components.sap import tree_util, sap_login, multi_session + +class TestTreeUtil(unittest.TestCase): + """Tests relating to the module SAP.tree_util.""" + @classmethod + def setUpClass(cls): + sap_login.kill_sap() + navigate_to_test_page() + + @classmethod + def tearDownClass(cls): + sap_login.kill_sap() + + + def test_get_node_key_by_text(self): + """Test get_node_key_by_test. + Test that strict search and fuzzy search works + and throws errors on nonsense input. + """ + session = multi_session.get_all_sap_sessions()[0] + tree = session.findById("wnd[0]/shellcont/shell") + + result = tree_util.get_node_key_by_text(tree, "25564617") + self.assertEqual(result, "GP0000000001") + + result = tree_util.get_node_key_by_text(tree, "556461", True) + self.assertEqual(result, "GP0000000001") + + with self.assertRaises(ValueError): + tree_util.get_node_key_by_text(tree, "Foo Bar") + + with self.assertRaises(ValueError): + tree_util.get_node_key_by_text(tree, "Foo Bar", True) + + + def test_get_item_by_text(self): + """Test get_item_by_text. + Test that strict search and fuzzy search works + and throws errors on nonsense input. + """ + session = multi_session.get_all_sap_sessions()[0] + tree = session.findById("wnd[0]/shellcont/shell") + + result = tree_util.get_item_by_text(tree, "25564617") + self.assertEqual(result, ("GP0000000001", "Column1")) + + result = tree_util.get_item_by_text(tree, "556461", True) + self.assertEqual(result, ("GP0000000001", "Column1")) + + with self.assertRaises(ValueError): + tree_util.get_item_by_text(tree, "Foo Bar") + + with self.assertRaises(ValueError): + tree_util.get_item_by_text(tree, "Foo Bar", True) + + + def test_check_uncheck_all_check_boxes(self): + """Test check_all_check_boxes and uncheck_all_check_boxes.""" + # Open popup with tree containing many checkboxes. + session = multi_session.get_all_sap_sessions()[0] + session.findById("wnd[0]/shellcont/shell").nodeContextMenu("GP0000000001") + session.findById("wnd[0]/shellcont/shell").selectContextMenuItem("FLERE") + + # Test on tree with checkboxes + tree = session.findById("wnd[1]/usr/cntlCONTAINER_PSOBKEY/shellcont/shell/shellcont[1]/shell[1]") + tree_util.check_all_check_boxes(tree) + tree_util.uncheck_all_check_boxes(tree) + + session.findById("wnd[1]/usr/cntlCONTAINER_PSOBKEY/shellcont/shell/shellcont[1]/shell[0]").pressButton("CANCEL") + + # Test on tree without checkboxes + tree = session.findById("wnd[0]/shellcont/shell") + tree_util.check_all_check_boxes(tree) + tree_util.uncheck_all_check_boxes(tree) + + +def navigate_to_test_page(): + """Launch SAP and navigate to fmcacov on FP 25564617.""" + user, password = os.environ['SAP Login'].split(';') + sap_login.login_using_cli(user, password) + session = multi_session.get_all_sap_sessions()[0] + session.startTransaction("fmcacov") + session.findById("wnd[0]/usr/ctxtGPART_DYN").text = "25564617" + session.findById("wnd[0]").sendVKey(0) + +if __name__ == '__main__': + unittest.main()