From b2a194db3f0b64ad78d90b9b12816394b02866ab Mon Sep 17 00:00:00 2001 From: Felix Deimel Date: Tue, 24 May 2022 16:12:58 +0200 Subject: [PATCH] Updated 1Password Dynamic Folder sample to V2.0.4 Improvements: * Support for importing regular credentials instead of dynamic credentials (not recommended though!) --- Dynamic Folder/1Password/1Password (Python).rdfe | 16 +++++++++++++--- 1 file changed, 13 insertions(+), 3 deletions(-) diff --git a/Dynamic Folder/1Password/1Password (Python).rdfe b/Dynamic Folder/1Password/1Password (Python).rdfe index aafccb0..05292e8 100644 --- a/Dynamic Folder/1Password/1Password (Python).rdfe +++ b/Dynamic Folder/1Password/1Password (Python).rdfe @@ -5,7 +5,7 @@ "Type": "DynamicFolder", "Name": "1Password (Python)", "Description": "This Dynamic Folder sample allows you to import dynamic credentials from 1Password.", - "Notes": "\n\n\n

1Password Dynamic Folder sample

\n\n

Version: 2.0.3
\nAuthor: Royal Apps

\n\n

This Dynamic Folder sample allows you to import credentials from 1Password. The 1Password command-line tool is required and the path where it is installed must be configured in the "Custom Properties" section. Also, your 1Password login details must be provided in the "Login" section of the custom properties.

\n\n

Items are imported as Dynamic Credentials. This means that the username and password fields will remain empty after reloading the dynamic folder and only be requested when a connection is established that uses one of the credentials of this dynamic folder.

\n\n

By default, items of all vaults are imported. If you only want to retrieve items of a specific vault, you can configure the vault's name in the "Filter" section of the "Custom Properties".

\n\n

Requirements

\n\n\n\n

 

\n\n

Setup

\n\n\n", + "Notes": "\n\n\n

1Password Dynamic Folder sample

\n\n

Version: 2.0.4
\nAuthor: Royal Apps

\n\n

This Dynamic Folder sample allows you to import credentials from 1Password. The 1Password command-line tool is required and the path where it is installed must be configured in the "Custom Properties" section. Also, your 1Password login details must be provided in the "Login" section of the custom properties.

\n\n

Items are imported as Dynamic Credentials. This means that the username and password fields will remain empty after reloading the dynamic folder and only be requested when a connection is established that uses one of the credentials of this dynamic folder. While you can instruct the script to import regular credentials (by setting the "Dynamic Credentials" setting in the "Custom Properties" section to "No", it's not(!) recommended to do so as it has many downsides. For instance, it will take much longer to retrieve all items because a web request to 1Password's web service will need to be made for each individual item. 1Password also enforces API rate limits, so if you have a lot of items, you might not be able to retrieve all of them before hitting the rate limit.

\n\n

By default, items of all vaults are imported. If you only want to retrieve items of a specific vault, you can configure the vault's name in the "Filter" section of the "Custom Properties".

\n\n

Requirements

\n\n\n\n

 

\n\n

Setup

\n\n\n", "CustomProperties": [ { "Name": "Command Line Tool Configuration", @@ -56,12 +56,22 @@ "Name": "Vault", "Type": "Text", "Value": "" + }, + { + "Name": "Configuration", + "Type": "Header", + "Value": "" + }, + { + "Name": "Dynamic Credentials", + "Type": "YesNo", + "Value": "True" } ], - "Script": "from __future__ import print_function\nfrom functools import partial\nfrom sys import platform as _platform\nfrom subprocess import Popen, PIPE\n\nimport tempfile\nimport sys\nimport json\nimport subprocess\nimport os\nimport base64\n\nop_path_windows = r\"$CustomProperty.OPPathWindows$\"\nop_path_macOS = r\"$CustomProperty.OPPathmacOS$\"\nsign_in_address = r\"$CustomProperty.SignInAddress$\"\nemail_address = r\"$CustomProperty.EmailAddress$\"\nsecret_key = r\"$CustomProperty.SecretKey$\"\nmaster_password = r\"$CustomProperty.MasterPassword$\"\nfilter_vault = r\"$CustomProperty.Vault$\"\n# item_id = r\"$DynamicCredential.EffectiveID$\"\n\nclass RoyalUtils:\n @staticmethod\n def is_macOS():\n plat = _platform.lower()\n\n return plat.startswith(\"darwin\")\n\n @staticmethod\n def get_last_line(the_string: str):\n stripped_str = the_string.strip()\n\n if \"\\n\" in stripped_str:\n lines = stripped_str.splitlines()\n stripped_str = lines[len(lines) - 1]\n\n return stripped_str\n\n @staticmethod\n def random_uuid():\n uuid = base64.b32encode(os.urandom(16)).decode().lower().rstrip(\"=\")\n\n return uuid\n\n @staticmethod\n def exit_with_error(message, exception=None):\n printError = partial(print, file=sys.stderr) # python2 compatibility\n\n exception_message = str(exception) if exception else \"N/A\"\n\n full_message = message + exception_message\n\n printError(full_message)\n sys.exit(1)\n\n @staticmethod\n def to_json(obj, pretty=False):\n return json.dumps(obj, indent=4) if pretty else json.dumps(obj)\n\n @staticmethod\n def decode_to_utf8_string(potential_bytes):\n if isinstance(potential_bytes, str):\n return potential_bytes\n else:\n return potential_bytes.decode(\"utf-8\")\n\nif RoyalUtils.is_macOS():\n import pexpect\nelse:\n import wexpect\n\nclass RoyalInputPrompt:\n @staticmethod\n def show(title: str, message: str, defaultValue: str):\n if RoyalUtils.is_macOS(): # macOS\n return RoyalInputPrompt._show_macOS(title, message, defaultValue)\n else: # Windows\n return RoyalInputPrompt._show_windows(title, message, defaultValue)\n\n @staticmethod\n def _escape_string_quotes_js(target_string: str):\n escaped_string = target_string.replace('\"', '\\\\\"')\n\n return escaped_string\n\n @staticmethod\n def _escape_string_quotes_vbs(target_string: str):\n escaped_string = target_string.replace('\"', '\\\"\\\"')\n\n return escaped_string\n\n @staticmethod\n def _show_macOS(title: str, message: str, defaultValue: str):\n script = f\"\"\"\n function showInputPrompt(title, message, defaultValue) {{\n let app = Application.currentApplication();\n app.includeStandardAdditions = true;\n \n var value = \"\";\n \n try {{\n let response = app.displayDialog(message, {{\n withTitle: title,\n defaultAnswer: defaultValue,\n withIcon: \"note\",\n buttons: [ \"Cancel\", \"OK\" ],\n defaultButton: \"OK\",\n cancelButton: \"Cancel\"\n }});\n \n if (response.buttonReturned == \"OK\") {{\n value = response.textReturned;\n }}\n }} catch {{ }}\n \n return value;\n }}\n\n function run(argv) {{\n let value = showInputPrompt(\"{RoyalInputPrompt._escape_string_quotes_js(title)}\", \"{RoyalInputPrompt._escape_string_quotes_js(message)}\", \"{RoyalInputPrompt._escape_string_quotes_js(defaultValue)}\");\n \n return value;\n }}\"\"\"\n\n encoding = \"utf-8\"\n\n cmd = [ \"osascript\", \"-l\", \"JavaScript\", \"-\" ]\n\n proc = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)\n stdout, _ = proc.communicate(script.encode(encoding))\n\n return_value = RoyalUtils.decode_to_utf8_string(stdout)\n\n return return_value\n\n @staticmethod\n def _show_windows(title: str, message: str, defaultValue: str):\n return_value = \"\"\n\n script = f\"\"\"\n WScript.Echo InputBox(\"{RoyalInputPrompt._escape_string_quotes_vbs(message)}\", \"{RoyalInputPrompt._escape_string_quotes_vbs(title)}\", \"{RoyalInputPrompt._escape_string_quotes_vbs(defaultValue)}\")\n \"\"\"\n\n encoding = \"utf-8\"\n\n temp_file = tempfile.NamedTemporaryFile(suffix=\".vbs\", mode=\"w\", encoding=encoding, delete=False)\n temp_file_path = temp_file.name\n\n temp_file.write(script)\n temp_file.flush()\n\n temp_file.close()\n\n try:\n cmd = [ \"cscript.exe\", \"/Nologo\", temp_file_path ]\n\n proc = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)\n stdout, _ = proc.communicate()\n\n return_value = RoyalUtils.decode_to_utf8_string(stdout)\n except:\n pass\n finally:\n if os.path.exists(temp_file_path):\n os.remove(temp_file_path)\n\n return return_value\n\nclass OnePassword:\n config_path = \"\"\n config_file_path = \"\"\n\n op_path = \"\"\n session_token = \"\"\n account_shorthand = \"\"\n\n op_cli_version = \"\"\n is_op_cli_v2 = False\n\n unknown_error_string = \"An unknown error occurred.\"\n\n def __init__(self, op_path):\n self.config_path = os.path.expanduser(\"~/.config/op\")\n self.config_file_path = os.path.join(self.config_path, \"config\")\n\n self.op_path = op_path\n\n op_cli_version = \"1\"\n\n try:\n op_cli_version = self.get_cli_version()\n except:\n pass\n\n if not op_cli_version:\n op_cli_version = \"1\"\n\n self.op_cli_version = op_cli_version\n self.is_op_cli_v2 = op_cli_version.startswith(\"2\")\n \n def get_cli_version(self):\n cmd_get_version = [ self.op_path, \"--version\" ]\n op = subprocess.Popen(cmd_get_version, stdout=subprocess.PIPE, stderr=subprocess.PIPE)\n\n (output, err) = op.communicate()\n exit_code = op.wait()\n\n success = exit_code == 0\n\n op_cli_version = \"\"\n\n if success:\n op_cli_version = RoyalUtils.decode_to_utf8_string(output)\n op_cli_version = RoyalUtils.get_last_line(op_cli_version)\n else:\n if not err:\n err = self.unknown_error_string\n else:\n err = RoyalUtils.decode_to_utf8_string(err)\n\n raise Exception(err)\n \n return op_cli_version\n \n def get_device_id(self):\n device_id = os.environ.get(\"OP_DEVICE\")\n\n return device_id\n \n def set_device_id(self, device_id):\n os.environ[\"OP_DEVICE\"] = device_id\n\n if not os.path.exists(self.config_file_path):\n config_json = RoyalUtils.to_json({ \"device\": device_id }, True)\n\n try:\n file = open(self.config_file_path, \"w\")\n\n file.write(config_json)\n file.close()\n\n if RoyalUtils.is_macOS:\n os.chmod(self.config_file_path, 0o600)\n \n return True\n except:\n return False\n \n return True\n \n def get_account_shorthand(self, sign_in_address, email_address):\n shorthand = sign_in_address.replace(\".\", \"_\") + \"__\" + email_address.replace(\".\", \"_\").replace(\"@\", \"_\").replace(\"+\", \"_\")\n\n return shorthand\n\n def sign_out(self):\n cmd_signout = [ ]\n\n if self.account_shorthand:\n cmd_signout = [\n self.op_path,\n \"signout\",\n \"--account\", self.account_shorthand\n ]\n else:\n cmd_signout = [\n self.op_path,\n \"signout\"\n ]\n\n op = subprocess.Popen(cmd_signout, stdout=subprocess.PIPE, stderr=subprocess.PIPE)\n \n (output, err) = op.communicate()\n exit_code = op.wait()\n\n success = exit_code == 0\n\n if success:\n self.session_token = \"\"\n self.account_shorthand = \"\"\n else:\n if not err:\n err = self.unknown_error_string\n else:\n err = RoyalUtils.decode_to_utf8_string(err)\n\n raise Exception(err)\n \n def sign_in(self, sign_in_address, email_address, secret_key, master_password, second_try=False):\n shorthand = self.get_account_shorthand(sign_in_address, email_address)\n\n timeout = 10\n\n cmd = self.op_path\n\n args = [ ]\n\n if self.is_op_cli_v2:\n args = [\n \"account\", \"add\",\n \"--signin\",\n \"--address\", sign_in_address,\n \"--email\", email_address,\n \"--secret-key\", secret_key,\n \"--shorthand\", shorthand,\n \"--raw\"\n ]\n else:\n args = [\n \"signin\",\n sign_in_address,\n email_address,\n secret_key,\n \"--shorthand\", shorthand,\n \"--raw\"\n ]\n\n eof = None\n proc = None\n\n if RoyalUtils.is_macOS():\n eof = pexpect.EOF\n proc = pexpect.spawn(cmd, args)\n else:\n eof = wexpect.EOF\n proc = wexpect.spawn(cmd, args)\n\n exp_str_password = f\"Enter the password for.*:\"\n exp_str_error = \"\\[ERROR\\].*\"\n exp_str_biometric_unlock_enabled = \"Biometric unlock integration with the 1Password app is enabled*\"\n\n err = None\n\n try:\n exp_pw_prompt = proc.expect([ exp_str_password, exp_str_error, exp_str_biometric_unlock_enabled ], timeout=timeout)\n\n if exp_pw_prompt == 0: # Password Prompt\n proc.sendline(master_password)\n\n exp_str_mfa = \"Enter your.*authentication code.*:\"\n\n exp_mfa = proc.expect([ exp_str_mfa, exp_str_error, eof ], timeout=timeout)\n\n if exp_mfa == 0: # MFA Prompt\n mfa_code = RoyalInputPrompt.show(\"1Password Multi-Factor-Authentication\", f\"Enter multi-factor authentication code for {email_address}:\", \"\")\n mfa_code = mfa_code.replace(\"\\n\", \"\").replace(\"\\r\", \"\")\n\n if not mfa_code:\n raise Exception(\"No multi-factor code provided\")\n \n proc.sendline(mfa_code)\n exp_mfa_sent = proc.expect([ exp_str_error, eof ], timeout=timeout)\n\n if exp_mfa_sent == 0: # Error\n err = RoyalUtils.get_last_line(RoyalUtils.decode_to_utf8_string(proc.match.string))\n \n raise Exception(err)\n elif exp_mfa == 1: # Error\n err = RoyalUtils.get_last_line(RoyalUtils.decode_to_utf8_string(proc.match.string))\n \n raise Exception(err)\n\n output = proc.before\n\n if output:\n output = RoyalUtils.decode_to_utf8_string(output)\n \n proc.close()\n exit_code = proc.wait()\n \n success = exit_code == 0\n\n if success:\n self.session_token = RoyalUtils.get_last_line(output)\n self.account_shorthand = shorthand\n else:\n if not err:\n err = self.unknown_error_string\n\n raise Exception(err)\n elif exp_pw_prompt == 1: # Error\n err = RoyalUtils.get_last_line(RoyalUtils.decode_to_utf8_string(proc.match.string))\n\n if not second_try:\n if \"No saved device ID\" in err:\n export_start_str = \"export OP_DEVICE=\"\n device_id = \"\"\n\n if export_start_str in err:\n start_index = err.index(export_start_str) + len(export_start_str)\n device_id = err[start_index:]\n end_index = device_id.index(\"`\")\n device_id = device_id[:end_index]\n\n if not device_id or len(device_id) != 26:\n device_id = RoyalUtils.random_uuid()\n\n self.set_device_id(device_id)\n\n self.sign_in(sign_in_address, email_address, secret_key, master_password, second_try=True)\n\n return\n\n if not err:\n err = self.unknown_error_string\n\n raise Exception(err)\n elif exp_pw_prompt == 2: # Biometric unlock enabled\n err = \"Biometric unlock integration with the 1Password app is enabled. This feature is not supported by the 1Password Dynamic Folder Script. Please disable biometric unlock in the 1Password app.\"\n \n raise Exception(err)\n except Exception as ex:\n err = str(ex)\n\n if not err:\n err = self.unknown_error_string\n\n raise Exception(err)\n\n def get_vaults(self):\n cmd_list_vaults = [ ]\n\n if self.is_op_cli_v2:\n cmd_list_vaults = [\n self.op_path,\n \"vault\", \"list\",\n \"--format=json\",\n \"--account\", self.account_shorthand,\n \"--session\", self.session_token\n ]\n else:\n cmd_list_vaults = [\n self.op_path,\n \"list\", \"vaults\",\n \"--account\", self.account_shorthand,\n \"--session\", self.session_token\n ]\n \n op = subprocess.Popen(cmd_list_vaults, stdout=subprocess.PIPE, stderr=subprocess.PIPE)\n\n (output, err) = op.communicate()\n exit_code = op.wait()\n\n success = exit_code == 0\n\n if success:\n output = RoyalUtils.decode_to_utf8_string(output)\n\n vaults_str = \"\"\n\n if self.is_op_cli_v2:\n vaults_str = output\n else:\n vaults_str = RoyalUtils.get_last_line(output)\n \n vaults = json.loads(vaults_str)\n\n return vaults\n \n if not err:\n err = self.unknown_error_string\n else:\n err = RoyalUtils.decode_to_utf8_string(err)\n \n raise Exception(err)\n\n def get_items(self, vault):\n cmd_list_items = [ ]\n\n if self.is_op_cli_v2:\n cmd_list_items = [\n self.op_path,\n \"item\", \"list\",\n \"--format=json\",\n \"--account\", self.account_shorthand,\n \"--session\", self.session_token\n ]\n else:\n cmd_list_items = [\n self.op_path,\n \"list\", \"items\",\n \"--account\", self.account_shorthand,\n \"--session\", self.session_token\n ]\n \n if not vault:\n \tvault = \"\"\n \n vault = vault.strip()\n \n if vault:\n \tcmd_list_items.append(\"--vault\")\n \tcmd_list_items.append(vault)\n\n op = subprocess.Popen(cmd_list_items, stdout=subprocess.PIPE, stderr=subprocess.PIPE)\n\n (output, err) = op.communicate()\n exit_code = op.wait()\n\n success = exit_code == 0\n\n if success:\n output = RoyalUtils.decode_to_utf8_string(output)\n\n items_str = \"\"\n\n if self.is_op_cli_v2:\n items_str = output\n else:\n items_str = RoyalUtils.get_last_line(output)\n\n items = json.loads(items_str)\n \n return items\n else:\n if not err:\n err = self.unknown_error_string\n else:\n err = RoyalUtils.decode_to_utf8_string(err)\n \n raise Exception(err)\n\n def get_item_details(self, item_id):\n cmd_get_item = [ ]\n\n if self.is_op_cli_v2:\n cmd_get_item = [\n self.op_path,\n \"item\", \"get\", item_id,\n # \"--fields\", \"label=password,label=privatekey\",\n \"--format=JSON\",\n \"--account\", self.account_shorthand,\n \"--session\", self.session_token\n ]\n else:\n cmd_get_item = [\n self.op_path,\n \"get\", \"item\", item_id,\n \"--fields\", \"username,password\",\n \"--format\", \"JSON\",\n \"--account\", self.account_shorthand,\n \"--session\", self.session_token\n ]\n\n op = subprocess.Popen(cmd_get_item, stdout=subprocess.PIPE, stderr=subprocess.PIPE)\n\n (output, err) = op.communicate()\n exit_code = op.wait()\n\n success = exit_code == 0\n\n if success:\n output = RoyalUtils.decode_to_utf8_string(output)\n\n item_str = \"\"\n\n if self.is_op_cli_v2:\n item_str = output\n else:\n item_str = RoyalUtils.get_last_line(output)\n \n item = json.loads(item_str)\n \n return item\n else:\n if not err:\n err = self.unknown_error_string\n else:\n err = RoyalUtils.decode_to_utf8_string(err)\n \n raise Exception(err)\n\nclass Converter:\n @staticmethod\n def get_vault_name(vaults, vault_id, is_op_cli_v2):\n for vault in vaults:\n if is_op_cli_v2:\n if vault.get(\"id\", \"\") == vault_id:\n return vault.get(\"name\", \"\")\n else:\n if vault.get(\"uuid\", \"\") == vault_id:\n return vault.get(\"name\", \"\")\n \n return \"\"\n\n @staticmethod\n def convert_items(vaults, items, is_op_cli_v2):\n objects = []\n\n for item in items:\n overview = None\n\n if is_op_cli_v2:\n overview = item\n else:\n overview = item.get(\"overview\", None)\n\n if overview == None:\n continue\n\n id = \"\"\n\n if is_op_cli_v2:\n id = item.get(\"id\", \"\")\n else:\n id = item.get(\"uuid\", \"\")\n \n title = overview.get(\"title\", \"N/A\")\n url = overview.get(\"url\", \"\")\n\n vault_id = \"\"\n\n if is_op_cli_v2:\n item_vault = item.get(\"vault\", None)\n\n if item_vault:\n vault_id = item_vault.get(\"id\", \"\")\n else:\n vault_id = item.get(\"vaultUuid\", \"\")\n\n vault_name = Converter.get_vault_name(vaults, vault_id, is_op_cli_v2)\n\n cred = {\n \"Type\": \"DynamicCredential\",\n \"ID\": id,\n \"Name\": title,\n \"Path\": vault_name\n }\n\n if url != \"\":\n cred[\"URL\"] = url\n\n objects.append(cred)\n\n objects = sorted(objects, key = lambda i: (i[\"Path\"], i[\"Name\"]))\n\n store = {\n \"Objects\": objects\n }\n\n return store\n \n @staticmethod\n def convert_item(item_details, is_op_cli_v2):\n username = None\n password = None\n private_key = None\n\n if is_op_cli_v2:\n fields = item_details.get(\"fields\")\n\n if not fields:\n fields = item_details\n\n for field in fields:\n field_id = field.get(\"id\", None)\n field_label = field.get(\"label\", None)\n field_value = field.get(\"value\", None)\n\n if field_id == \"username\" or field_label == \"username\":\n username = field_value\n elif field_id == \"password\" or field_label == \"password\":\n password = field_value\n elif field_id == \"private_key\" or field_label == \"private key\":\n private_key = field_value\n else:\n username = item_details.get(\"username\", None)\n password = item_details.get(\"password\", None)\n\n cred = { }\n\n if username is not None:\n cred[\"Username\"] = username\n\n if password is not None:\n cred[\"Password\"] = password\n\n if private_key is not None:\n cred[\"KeyFileContent\"] = private_key\n\n return cred\n\nclass Coordinator:\n op_path = \"\"\n sign_in_address = \"\"\n email_address = \"\"\n secret_key = \"\"\n master_password = \"\"\n\n error_message_sign_in = \"Error while signing in: \"\n error_message_sign_out = \"Error while signing out: \"\n error_message_get_vaults = \"Error while getting vaults: \"\n error_message_get_items = \"Error while getting items: \"\n error_message_get_item_details = \"Error while getting item details: \"\n\n def __init__(self, op_path_windows, op_path_macOS, sign_in_address, email_address, secret_key, master_password):\n self.op_path = op_path_macOS if RoyalUtils.is_macOS() else op_path_windows\n self.sign_in_address = sign_in_address\n self.email_address = email_address\n self.secret_key = secret_key\n self.master_password = master_password\n \n def get_items(self, vault):\n op = OnePassword(self.op_path)\n\n try:\n op.sign_out()\n except Exception:\n pass\n\n try:\n op.sign_in(self.sign_in_address, self.email_address, self.secret_key, self.master_password)\n except Exception as e:\n RoyalUtils.exit_with_error(self.error_message_sign_in, e)\n\n vaults = None\n\n try:\n vaults = op.get_vaults()\n except Exception as e:\n RoyalUtils.exit_with_error(self.error_message_get_vaults, e)\n \n items = None\n\n try:\n items = op.get_items(vault)\n except Exception as e:\n RoyalUtils.exit_with_error(self.error_message_get_items, e)\n\n try:\n op.sign_out()\n except Exception as e:\n RoyalUtils.exit_with_error(self.error_message_sign_out, e)\n\n store = Converter.convert_items(vaults, items, op.is_op_cli_v2)\n store_json = RoyalUtils.to_json(store, True)\n\n print(store_json)\n \n def get_item_details(self, item_id):\n op = OnePassword(self.op_path)\n\n try:\n op.sign_out()\n except Exception:\n pass\n\n try:\n op.sign_in(self.sign_in_address, self.email_address, self.secret_key, self.master_password)\n except Exception as e:\n RoyalUtils.exit_with_error(self.error_message_sign_in, e)\n\n item_details = None\n\n try:\n item_details = op.get_item_details(item_id)\n except Exception as e:\n RoyalUtils.exit_with_error(self.error_message_get_item_details, e)\n\n try:\n op.sign_out()\n except Exception as e:\n RoyalUtils.exit_with_error(self.error_message_sign_out, e)\n\n store = Converter.convert_item(item_details, op.is_op_cli_v2)\n store_json = RoyalUtils.to_json(store, True)\n\n print(store_json)\n\ncoordinator = Coordinator(op_path_windows, op_path_macOS, sign_in_address, email_address, secret_key, master_password)\ncoordinator.get_items(filter_vault)\n# coordinator.get_item_details(item_id)", + "Script": "from __future__ import print_function\nfrom functools import partial\nfrom sys import platform as _platform\nfrom subprocess import Popen, PIPE\n\nimport tempfile\nimport sys\nimport json\nimport subprocess\nimport os\nimport base64\n\nop_path_windows = r\"$CustomProperty.OPPathWindows$\"\nop_path_macOS = r\"$CustomProperty.OPPathmacOS$\"\nsign_in_address = r\"$CustomProperty.SignInAddress$\"\nemail_address = r\"$CustomProperty.EmailAddress$\"\nsecret_key = r\"$CustomProperty.SecretKey$\"\nmaster_password = r\"$CustomProperty.MasterPassword$\"\nfilter_vault = r\"$CustomProperty.Vault$\"\ncreate_dynamic_credentials = r\"$CustomProperty.DynamicCredentials$\".lower() != \"no\"\n# item_id = r\"$DynamicCredential.EffectiveID$\"\n\nclass RoyalUtils:\n\t@staticmethod\n\tdef is_macOS():\n\t\tplat = _platform.lower()\n\n\t\treturn plat.startswith(\"darwin\")\n\n\t@staticmethod\n\tdef get_last_line(the_string: str):\n\t\tstripped_str = the_string.strip()\n\n\t\tif \"\\n\" in stripped_str:\n\t\t\tlines = stripped_str.splitlines()\n\t\t\tstripped_str = lines[len(lines) - 1]\n\n\t\treturn stripped_str\n\n\t@staticmethod\n\tdef random_uuid():\n\t\tuuid = base64.b32encode(os.urandom(16)).decode().lower().rstrip(\"=\")\n\n\t\treturn uuid\n\n\t@staticmethod\n\tdef exit_with_error(message, exception=None):\n\t\tprintError = partial(print, file=sys.stderr) # python2 compatibility\n\n\t\texception_message = str(exception) if exception else \"N/A\"\n\n\t\tfull_message = message + exception_message\n\n\t\tprintError(full_message)\n\t\tsys.exit(1)\n\n\t@staticmethod\n\tdef to_json(obj, pretty=False):\n\t\treturn json.dumps(obj, indent=4) if pretty else json.dumps(obj)\n\n\t@staticmethod\n\tdef decode_to_utf8_string(potential_bytes):\n\t\tif isinstance(potential_bytes, str):\n\t\t\treturn potential_bytes\n\t\telse:\n\t\t\treturn potential_bytes.decode(\"utf-8\")\n\nif RoyalUtils.is_macOS():\n\timport pexpect\nelse:\n\timport wexpect\n\nclass RoyalInputPrompt:\n\t@staticmethod\n\tdef show(title: str, message: str, defaultValue: str):\n\t\tif RoyalUtils.is_macOS(): # macOS\n\t\t\treturn RoyalInputPrompt._show_macOS(title, message, defaultValue)\n\t\telse: # Windows\n\t\t\treturn RoyalInputPrompt._show_windows(title, message, defaultValue)\n\n\t@staticmethod\n\tdef _escape_string_quotes_js(target_string: str):\n\t\tescaped_string = target_string.replace('\"', '\\\\\"')\n\n\t\treturn escaped_string\n\n\t@staticmethod\n\tdef _escape_string_quotes_vbs(target_string: str):\n\t\tescaped_string = target_string.replace('\"', '\\\"\\\"')\n\n\t\treturn escaped_string\n\n\t@staticmethod\n\tdef _show_macOS(title: str, message: str, defaultValue: str):\n\t\tscript = f\"\"\"\n\t\tfunction showInputPrompt(title, message, defaultValue) {{\n\t\t\tlet app = Application.currentApplication();\n\t\t\tapp.includeStandardAdditions = true;\n\t\t\t\n\t\t\tvar value = \"\";\n\t\t\t\n\t\t\ttry {{\n\t\t\t\tlet response = app.displayDialog(message, {{\n\t\t\t\t\twithTitle: title,\n\t\t\t\t\tdefaultAnswer: defaultValue,\n\t\t\t\t\twithIcon: \"note\",\n\t\t\t\t\tbuttons: [ \"Cancel\", \"OK\" ],\n\t\t\t\t\tdefaultButton: \"OK\",\n\t\t\t\t\tcancelButton: \"Cancel\"\n\t\t\t\t}});\n\t\t\t\t\n\t\t\t\tif (response.buttonReturned == \"OK\") {{\n\t\t\t\t\tvalue = response.textReturned;\n\t\t\t\t}}\n\t\t\t}} catch {{ }}\n\t\t\t\n\t\t\treturn value;\n\t\t}}\n\n\t\tfunction run(argv) {{\n\t\t\tlet value = showInputPrompt(\"{RoyalInputPrompt._escape_string_quotes_js(title)}\", \"{RoyalInputPrompt._escape_string_quotes_js(message)}\", \"{RoyalInputPrompt._escape_string_quotes_js(defaultValue)}\");\n\t\t\t\n\t\t\treturn value;\n\t\t}}\"\"\"\n\n\t\tencoding = \"utf-8\"\n\n\t\tcmd = [ \"osascript\", \"-l\", \"JavaScript\", \"-\" ]\n\n\t\tproc = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)\n\t\tstdout, _ = proc.communicate(script.encode(encoding))\n\n\t\treturn_value = RoyalUtils.decode_to_utf8_string(stdout)\n\n\t\treturn return_value\n\n\t@staticmethod\n\tdef _show_windows(title: str, message: str, defaultValue: str):\n\t\treturn_value = \"\"\n\n\t\tscript = f\"\"\"\n\t\tWScript.Echo InputBox(\"{RoyalInputPrompt._escape_string_quotes_vbs(message)}\", \"{RoyalInputPrompt._escape_string_quotes_vbs(title)}\", \"{RoyalInputPrompt._escape_string_quotes_vbs(defaultValue)}\")\n\t\t\"\"\"\n\n\t\tencoding = \"utf-8\"\n\n\t\ttemp_file = tempfile.NamedTemporaryFile(suffix=\".vbs\", mode=\"w\", encoding=encoding, delete=False)\n\t\ttemp_file_path = temp_file.name\n\n\t\ttemp_file.write(script)\n\t\ttemp_file.flush()\n\n\t\ttemp_file.close()\n\n\t\ttry:\n\t\t\tcmd = [ \"cscript.exe\", \"/Nologo\", temp_file_path ]\n\n\t\t\tproc = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)\n\t\t\tstdout, _ = proc.communicate()\n\n\t\t\treturn_value = RoyalUtils.decode_to_utf8_string(stdout)\n\t\texcept:\n\t\t\tpass\n\t\tfinally:\n\t\t\tif os.path.exists(temp_file_path):\n\t\t\t\tos.remove(temp_file_path)\n\n\t\treturn return_value\n\nclass OnePassword:\n\tconfig_path = \"\"\n\tconfig_file_path = \"\"\n\n\top_path = \"\"\n\tsession_token = \"\"\n\taccount_shorthand = \"\"\n\n\top_cli_version = \"\"\n\tis_op_cli_v2 = False\n\n\tunknown_error_string = \"An unknown error occurred.\"\n\n\tdef __init__(self, op_path):\n\t\tself.config_path = os.path.expanduser(\"~/.config/op\")\n\t\tself.config_file_path = os.path.join(self.config_path, \"config\")\n\n\t\tself.op_path = op_path\n\n\t\top_cli_version = \"1\"\n\n\t\ttry:\n\t\t\top_cli_version = self.get_cli_version()\n\t\texcept:\n\t\t\tpass\n\n\t\tif not op_cli_version:\n\t\t\top_cli_version = \"1\"\n\n\t\tself.op_cli_version = op_cli_version\n\t\tself.is_op_cli_v2 = op_cli_version.startswith(\"2\")\n\t\n\tdef get_cli_version(self):\n\t\tcmd_get_version = [ self.op_path, \"--version\" ]\n\t\top = subprocess.Popen(cmd_get_version, stdout=subprocess.PIPE, stderr=subprocess.PIPE)\n\n\t\t(output, err) = op.communicate()\n\t\texit_code = op.wait()\n\n\t\tsuccess = exit_code == 0\n\n\t\top_cli_version = \"\"\n\n\t\tif success:\n\t\t\top_cli_version = RoyalUtils.decode_to_utf8_string(output)\n\t\t\top_cli_version = RoyalUtils.get_last_line(op_cli_version)\n\t\telse:\n\t\t\tif not err:\n\t\t\t\terr = self.unknown_error_string\n\t\t\telse:\n\t\t\t\terr = RoyalUtils.decode_to_utf8_string(err)\n\n\t\t\traise Exception(err)\n\t\t\n\t\treturn op_cli_version\n\t\n\tdef get_device_id(self):\n\t\tdevice_id = os.environ.get(\"OP_DEVICE\")\n\n\t\treturn device_id\n\t\n\tdef set_device_id(self, device_id):\n\t\tos.environ[\"OP_DEVICE\"] = device_id\n\n\t\tif not os.path.exists(self.config_file_path):\n\t\t\tconfig_json = RoyalUtils.to_json({ \"device\": device_id }, True)\n\n\t\t\ttry:\n\t\t\t\tfile = open(self.config_file_path, \"w\")\n\n\t\t\t\tfile.write(config_json)\n\t\t\t\tfile.close()\n\n\t\t\t\tif RoyalUtils.is_macOS:\n\t\t\t\t\tos.chmod(self.config_file_path, 0o600)\n\t\t\t\t\n\t\t\t\treturn True\n\t\t\texcept:\n\t\t\t\treturn False\n\t\t\n\t\treturn True\n\t\n\tdef get_account_shorthand(self, sign_in_address, email_address):\n\t\tshorthand = sign_in_address.replace(\".\", \"_\") + \"__\" + email_address.replace(\".\", \"_\").replace(\"@\", \"_\").replace(\"+\", \"_\")\n\n\t\treturn shorthand\n\n\tdef sign_out(self):\n\t\tcmd_signout = [ ]\n\n\t\tif self.account_shorthand:\n\t\t\tcmd_signout = [\n\t\t\t\tself.op_path,\n\t\t\t\t\"signout\",\n\t\t\t\t\"--account\", self.account_shorthand\n\t\t\t]\n\t\telse:\n\t\t\tcmd_signout = [\n\t\t\t\tself.op_path,\n\t\t\t\t\"signout\"\n\t\t\t]\n\n\t\top = subprocess.Popen(cmd_signout, stdout=subprocess.PIPE, stderr=subprocess.PIPE)\n\t\t\n\t\t(output, err) = op.communicate()\n\t\texit_code = op.wait()\n\n\t\tsuccess = exit_code == 0\n\n\t\tif success:\n\t\t\tself.session_token = \"\"\n\t\t\tself.account_shorthand = \"\"\n\t\telse:\n\t\t\tif not err:\n\t\t\t\terr = self.unknown_error_string\n\t\t\telse:\n\t\t\t\terr = RoyalUtils.decode_to_utf8_string(err)\n\n\t\t\traise Exception(err)\n\t\n\tdef sign_in(self, sign_in_address, email_address, secret_key, master_password, second_try=False):\n\t\tshorthand = self.get_account_shorthand(sign_in_address, email_address)\n\n\t\ttimeout = 10\n\n\t\tcmd = self.op_path\n\n\t\targs = [ ]\n\n\t\tif self.is_op_cli_v2:\n\t\t\targs = [\n\t\t\t\t\"account\", \"add\",\n\t\t\t\t\"--signin\",\n\t\t\t\t\"--address\", sign_in_address,\n\t\t\t\t\"--email\", email_address,\n\t\t\t\t\"--secret-key\", secret_key,\n\t\t\t\t\"--shorthand\", shorthand,\n\t\t\t\t\"--raw\"\n\t\t\t]\n\t\telse:\n\t\t\targs = [\n\t\t\t\t\"signin\",\n\t\t\t\tsign_in_address,\n\t\t\t\temail_address,\n\t\t\t\tsecret_key,\n\t\t\t\t\"--shorthand\", shorthand,\n\t\t\t\t\"--raw\"\n\t\t\t]\n\n\t\teof = None\n\t\tproc = None\n\n\t\tif RoyalUtils.is_macOS():\n\t\t\teof = pexpect.EOF\n\t\t\tproc = pexpect.spawn(cmd, args)\n\t\telse:\n\t\t\teof = wexpect.EOF\n\t\t\tproc = wexpect.spawn(cmd, args)\n\n\t\texp_str_password = f\"Enter the password for.*:\"\n\t\texp_str_error = \"\\[ERROR\\].*\"\n\t\texp_str_biometric_unlock_enabled = \"Biometric unlock integration with the 1Password app is enabled*\"\n\n\t\terr = None\n\n\t\ttry:\n\t\t\texp_pw_prompt = proc.expect([ exp_str_password, exp_str_error, exp_str_biometric_unlock_enabled ], timeout=timeout)\n\n\t\t\tif exp_pw_prompt == 0: # Password Prompt\n\t\t\t\tproc.sendline(master_password)\n\n\t\t\t\texp_str_mfa = \"Enter your.*authentication code.*:\"\n\n\t\t\t\texp_mfa = proc.expect([ exp_str_mfa, exp_str_error, eof ], timeout=timeout)\n\n\t\t\t\tif exp_mfa == 0: # MFA Prompt\n\t\t\t\t\tmfa_code = RoyalInputPrompt.show(\"1Password Multi-Factor-Authentication\", f\"Enter multi-factor authentication code for {email_address}:\", \"\")\n\t\t\t\t\tmfa_code = mfa_code.replace(\"\\n\", \"\").replace(\"\\r\", \"\")\n\n\t\t\t\t\tif not mfa_code:\n\t\t\t\t\t\traise Exception(\"No multi-factor code provided\")\n\t\t\t\t\t\n\t\t\t\t\tproc.sendline(mfa_code)\n\t\t\t\t\texp_mfa_sent = proc.expect([ exp_str_error, eof ], timeout=timeout)\n\n\t\t\t\t\tif exp_mfa_sent == 0: # Error\n\t\t\t\t\t\terr = RoyalUtils.get_last_line(RoyalUtils.decode_to_utf8_string(proc.match.string))\n\t\t\t\t\t\t\n\t\t\t\t\t\traise Exception(err)\n\t\t\t\telif exp_mfa == 1: # Error\n\t\t\t\t\terr = RoyalUtils.get_last_line(RoyalUtils.decode_to_utf8_string(proc.match.string))\n\t\t\t\t\t\n\t\t\t\t\traise Exception(err)\n\n\t\t\t\toutput = proc.before\n\n\t\t\t\tif output:\n\t\t\t\t\toutput = RoyalUtils.decode_to_utf8_string(output)\n\t\t\t\t\n\t\t\t\tproc.close()\n\t\t\t\texit_code = proc.wait()\n\t\t\t\t\n\t\t\t\tsuccess = exit_code == 0\n\n\t\t\t\tif success:\n\t\t\t\t\tself.session_token = RoyalUtils.get_last_line(output)\n\t\t\t\t\tself.account_shorthand = shorthand\n\t\t\t\telse:\n\t\t\t\t\tif not err:\n\t\t\t\t\t\terr = self.unknown_error_string\n\n\t\t\t\t\traise Exception(err)\n\t\t\telif exp_pw_prompt == 1: # Error\n\t\t\t\terr = RoyalUtils.get_last_line(RoyalUtils.decode_to_utf8_string(proc.match.string))\n\n\t\t\t\tif not second_try:\n\t\t\t\t\tif \"No saved device ID\" in err:\n\t\t\t\t\t\texport_start_str = \"export OP_DEVICE=\"\n\t\t\t\t\t\tdevice_id = \"\"\n\n\t\t\t\t\t\tif export_start_str in err:\n\t\t\t\t\t\t\tstart_index = err.index(export_start_str) + len(export_start_str)\n\t\t\t\t\t\t\tdevice_id = err[start_index:]\n\t\t\t\t\t\t\tend_index = device_id.index(\"`\")\n\t\t\t\t\t\t\tdevice_id = device_id[:end_index]\n\n\t\t\t\t\t\tif not device_id or len(device_id) != 26:\n\t\t\t\t\t\t\tdevice_id = RoyalUtils.random_uuid()\n\n\t\t\t\t\t\tself.set_device_id(device_id)\n\n\t\t\t\t\t\tself.sign_in(sign_in_address, email_address, secret_key, master_password, second_try=True)\n\n\t\t\t\t\t\treturn\n\n\t\t\t\tif not err:\n\t\t\t\t\terr = self.unknown_error_string\n\n\t\t\t\traise Exception(err)\n\t\t\telif exp_pw_prompt == 2: # Biometric unlock enabled\n\t\t\t\terr = \"Biometric unlock integration with the 1Password app is enabled. This feature is not supported by the 1Password Dynamic Folder Script. Please disable biometric unlock in the 1Password app.\"\n\t\t\t\t\n\t\t\t\traise Exception(err)\n\t\texcept Exception as ex:\n\t\t\terr = str(ex)\n\n\t\t\tif not err:\n\t\t\t\terr = self.unknown_error_string\n\n\t\t\traise Exception(err)\n\n\tdef get_vaults(self):\n\t\tcmd_list_vaults = [ ]\n\n\t\tif self.is_op_cli_v2:\n\t\t\tcmd_list_vaults = [\n\t\t\t\tself.op_path,\n\t\t\t\t\"vault\", \"list\",\n\t\t\t\t\"--format=json\",\n\t\t\t\t\"--account\", self.account_shorthand,\n\t\t\t\t\"--session\", self.session_token\n\t\t\t]\n\t\telse:\n\t\t\tcmd_list_vaults = [\n\t\t\t\tself.op_path,\n\t\t\t\t\"list\", \"vaults\",\n\t\t\t\t\"--account\", self.account_shorthand,\n\t\t\t\t\"--session\", self.session_token\n\t\t\t]\n\t\t\n\t\top = subprocess.Popen(cmd_list_vaults, stdout=subprocess.PIPE, stderr=subprocess.PIPE)\n\n\t\t(output, err) = op.communicate()\n\t\texit_code = op.wait()\n\n\t\tsuccess = exit_code == 0\n\n\t\tif success:\n\t\t\toutput = RoyalUtils.decode_to_utf8_string(output)\n\n\t\t\tvaults_str = \"\"\n\n\t\t\tif self.is_op_cli_v2:\n\t\t\t\tvaults_str = output\n\t\t\telse:\n\t\t\t\tvaults_str = RoyalUtils.get_last_line(output)\n\t\t\t\n\t\t\tvaults = json.loads(vaults_str)\n\n\t\t\treturn vaults\n\t\t\n\t\tif not err:\n\t\t\terr = self.unknown_error_string\n\t\telse:\n\t\t\terr = RoyalUtils.decode_to_utf8_string(err)\n\t\t\n\t\traise Exception(err)\n\n\tdef get_items(self, vault):\n\t\tcmd_list_items = [ ]\n\n\t\tif self.is_op_cli_v2:\n\t\t\tcmd_list_items = [\n\t\t\t\tself.op_path,\n\t\t\t\t\"item\", \"list\",\n\t\t\t\t\"--format=json\",\n\t\t\t\t\"--account\", self.account_shorthand,\n\t\t\t\t\"--session\", self.session_token\n\t\t\t]\n\t\telse:\n\t\t\tcmd_list_items = [\n\t\t\t\tself.op_path,\n\t\t\t\t\"list\", \"items\",\n\t\t\t\t\"--account\", self.account_shorthand,\n\t\t\t\t\"--session\", self.session_token\n\t\t\t]\n\t\t\n\t\tif not vault:\n\t\t\tvault = \"\"\n\t\t\n\t\tvault = vault.strip()\n\t\t\n\t\tif vault:\n\t\t\tcmd_list_items.append(\"--vault\")\n\t\t\tcmd_list_items.append(vault)\n\n\t\top = subprocess.Popen(cmd_list_items, stdout=subprocess.PIPE, stderr=subprocess.PIPE)\n\n\t\t(output, err) = op.communicate()\n\t\texit_code = op.wait()\n\n\t\tsuccess = exit_code == 0\n\n\t\tif success:\n\t\t\toutput = RoyalUtils.decode_to_utf8_string(output)\n\n\t\t\titems_str = \"\"\n\n\t\t\tif self.is_op_cli_v2:\n\t\t\t\titems_str = output\n\t\t\telse:\n\t\t\t\titems_str = RoyalUtils.get_last_line(output)\n\n\t\t\titems = json.loads(items_str)\n\t\t\n\t\t\treturn items\n\t\telse:\n\t\t\tif not err:\n\t\t\t\terr = self.unknown_error_string\n\t\t\telse:\n\t\t\t\terr = RoyalUtils.decode_to_utf8_string(err)\n\t\t\n\t\t\traise Exception(err)\n\n\tdef get_item_details(self, item_id):\n\t\tcmd_get_item = [ ]\n\n\t\tif self.is_op_cli_v2:\n\t\t\tcmd_get_item = [\n\t\t\t\tself.op_path,\n\t\t\t\t\"item\", \"get\", item_id,\n\t\t\t\t# \"--fields\", \"label=password,label=privatekey\",\n\t\t\t\t\"--format=JSON\",\n\t\t\t\t\"--account\", self.account_shorthand,\n\t\t\t\t\"--session\", self.session_token\n\t\t\t]\n\t\telse:\n\t\t\tcmd_get_item = [\n\t\t\t\tself.op_path,\n\t\t\t\t\"get\", \"item\", item_id,\n\t\t\t\t\"--fields\", \"username,password\",\n\t\t\t\t\"--format\", \"JSON\",\n\t\t\t\t\"--account\", self.account_shorthand,\n\t\t\t\t\"--session\", self.session_token\n\t\t\t]\n\n\t\top = subprocess.Popen(cmd_get_item, stdout=subprocess.PIPE, stderr=subprocess.PIPE)\n\n\t\t(output, err) = op.communicate()\n\t\texit_code = op.wait()\n\n\t\tsuccess = exit_code == 0\n\n\t\tif success:\n\t\t\toutput = RoyalUtils.decode_to_utf8_string(output)\n\n\t\t\titem_str = \"\"\n\n\t\t\tif self.is_op_cli_v2:\n\t\t\t\titem_str = output\n\t\t\telse:\n\t\t\t\titem_str = RoyalUtils.get_last_line(output)\n\t\t\t\n\t\t\titem = json.loads(item_str)\n\t\t\n\t\t\treturn item\n\t\telse:\n\t\t\tif not err:\n\t\t\t\terr = self.unknown_error_string\n\t\t\telse:\n\t\t\t\terr = RoyalUtils.decode_to_utf8_string(err)\n\t\t\t\n\t\t\traise Exception(err)\n\nclass Converter:\n\t@staticmethod\n\tdef get_vault_name(vaults, vault_id, is_op_cli_v2):\n\t\tfor vault in vaults:\n\t\t\tif is_op_cli_v2:\n\t\t\t\tif vault.get(\"id\", \"\") == vault_id:\n\t\t\t\t\treturn vault.get(\"name\", \"\")\n\t\t\telse:\n\t\t\t\tif vault.get(\"uuid\", \"\") == vault_id:\n\t\t\t\t\treturn vault.get(\"name\", \"\")\n\t\t\n\t\treturn \"\"\n\n\t@staticmethod\n\tdef convert_items(vaults, items, is_op_cli_v2):\n\t\tobjects = []\n\n\t\tfor item in items:\n\t\t\toverview = None\n\n\t\t\tif is_op_cli_v2:\n\t\t\t\toverview = item\n\t\t\telse:\n\t\t\t\toverview = item.get(\"overview\", None)\n\n\t\t\tif overview == None:\n\t\t\t\tcontinue\n\n\t\t\tid = \"\"\n\n\t\t\tif is_op_cli_v2:\n\t\t\t\tid = item.get(\"id\", \"\")\n\t\t\telse:\n\t\t\t\tid = item.get(\"uuid\", \"\")\n\t\t\t\n\t\t\ttitle = overview.get(\"title\", \"N/A\")\n\t\t\turl = overview.get(\"url\", \"\")\n\n\t\t\tvault_id = \"\"\n\n\t\t\tif is_op_cli_v2:\n\t\t\t\titem_vault = item.get(\"vault\", None)\n\n\t\t\t\tif item_vault:\n\t\t\t\t\tvault_id = item_vault.get(\"id\", \"\")\n\t\t\telse:\n\t\t\t\tvault_id = item.get(\"vaultUuid\", \"\")\n\n\t\t\tvault_name = Converter.get_vault_name(vaults, vault_id, is_op_cli_v2)\n\n\t\t\titem_details = item.get(\"__item_details\", None)\n\t\t\tconverted_item_details = None\n\n\t\t\tif item_details is not None:\n\t\t\t\tconverted_item_details = Converter.convert_item(item_details, is_op_cli_v2)\n\n\t\t\tcred_type = \"DynamicCredential\" if converted_item_details is None else \"Credential\"\n\n\t\t\tcred = {\n\t\t\t\t\"Type\": cred_type,\n\t\t\t\t\"ID\": id,\n\t\t\t\t\"Name\": title,\n\t\t\t\t\"Path\": vault_name\n\t\t\t}\n\n\t\t\tif url != \"\":\n\t\t\t\tcred[\"URL\"] = url\n\t\t\t\n\t\t\tif converted_item_details is not None:\n\t\t\t\tfor key in converted_item_details:\n\t\t\t\t\tcred[key] = converted_item_details[key]\n\n\t\t\tobjects.append(cred)\n\n\t\tobjects = sorted(objects, key = lambda i: (i[\"Path\"], i[\"Name\"]))\n\n\t\tstore = {\n\t\t\t\"Objects\": objects\n\t\t}\n\n\t\treturn store\n\t\n\t@staticmethod\n\tdef convert_item(item_details, is_op_cli_v2):\n\t\tusername = None\n\t\tpassword = None\n\t\tprivate_key = None\n\n\t\tif is_op_cli_v2:\n\t\t\tfields = item_details.get(\"fields\")\n\n\t\t\tif not fields:\n\t\t\t\tfields = item_details\n\n\t\t\tfor field in fields:\n\t\t\t\tfield_id = field.get(\"id\", None)\n\t\t\t\tfield_label = field.get(\"label\", None)\n\t\t\t\tfield_value = field.get(\"value\", None)\n\n\t\t\t\tif field_id == \"username\" or field_label == \"username\":\n\t\t\t\t\tusername = field_value\n\t\t\t\telif field_id == \"password\" or field_label == \"password\":\n\t\t\t\t\tpassword = field_value\n\t\t\t\telif field_id == \"private_key\" or field_label == \"private key\":\n\t\t\t\t\tprivate_key = field_value\n\t\telse:\n\t\t\tusername = item_details.get(\"username\", None)\n\t\t\tpassword = item_details.get(\"password\", None)\n\n\t\tcred = { }\n\n\t\tif username is not None:\n\t\t\tcred[\"Username\"] = username\n\n\t\tif password is not None:\n\t\t\tcred[\"Password\"] = password\n\n\t\tif private_key is not None:\n\t\t\tcred[\"KeyFileContent\"] = private_key\n\n\t\treturn cred\n\nclass Coordinator:\n\top_path = \"\"\n\tsign_in_address = \"\"\n\temail_address = \"\"\n\tsecret_key = \"\"\n\tmaster_password = \"\"\n\tcreate_dynamic_credentials = True\n\n\terror_message_sign_in = \"Error while signing in: \"\n\terror_message_sign_out = \"Error while signing out: \"\n\terror_message_get_vaults = \"Error while getting vaults: \"\n\terror_message_get_items = \"Error while getting items: \"\n\terror_message_get_item_details = \"Error while getting item details: \"\n\n\tdef __init__(self, op_path_windows, op_path_macOS, sign_in_address, email_address, secret_key, master_password, create_dynamic_credentials):\n\t\tself.op_path = op_path_macOS if RoyalUtils.is_macOS() else op_path_windows\n\t\tself.sign_in_address = sign_in_address\n\t\tself.email_address = email_address\n\t\tself.secret_key = secret_key\n\t\tself.master_password = master_password\n\t\tself.create_dynamic_credentials = create_dynamic_credentials\n\t\t\n\tdef get_items(self, vault):\n\t\top = OnePassword(self.op_path)\n\n\t\ttry:\n\t\t\top.sign_out()\n\t\texcept Exception:\n\t\t\tpass\n\n\t\ttry:\n\t\t\top.sign_in(self.sign_in_address, self.email_address, self.secret_key, self.master_password)\n\t\texcept Exception as e:\n\t\t\tRoyalUtils.exit_with_error(self.error_message_sign_in, e)\n\n\t\tvaults = None\n\n\t\ttry:\n\t\t\tvaults = op.get_vaults()\n\t\texcept Exception as e:\n\t\t\tRoyalUtils.exit_with_error(self.error_message_get_vaults, e)\n\t\t\n\t\titems = None\n\n\t\ttry:\n\t\t\titems = op.get_items(vault)\n\t\texcept Exception as e:\n\t\t\tRoyalUtils.exit_with_error(self.error_message_get_items, e)\n\t\t\n\t\titems_details = [ ]\n\n\t\tif not self.create_dynamic_credentials:\n\t\t\tfor item in items:\n\t\t\t\tid = \"\"\n\n\t\t\t\tif op.is_op_cli_v2:\n\t\t\t\t\tid = item.get(\"id\", \"\")\n\t\t\t\telse:\n\t\t\t\t\tid = item.get(\"uuid\", \"\")\n\t\t\t\t\n\t\t\t\titem_details = op.get_item_details(id)\n\n\t\t\t\titem[\"__item_details\"] = item_details\n\n\t\ttry:\n\t\t\top.sign_out()\n\t\texcept Exception as e:\n\t\t\tRoyalUtils.exit_with_error(self.error_message_sign_out, e)\n\n\t\tstore = Converter.convert_items(vaults, items, op.is_op_cli_v2)\n\t\tstore_json = RoyalUtils.to_json(store, True)\n\n\t\tprint(store_json)\n\t\n\tdef get_item_details(self, item_id):\n\t\top = OnePassword(self.op_path)\n\n\t\ttry:\n\t\t\top.sign_out()\n\t\texcept Exception:\n\t\t\tpass\n\n\t\ttry:\n\t\t\top.sign_in(self.sign_in_address, self.email_address, self.secret_key, self.master_password)\n\t\texcept Exception as e:\n\t\t\tRoyalUtils.exit_with_error(self.error_message_sign_in, e)\n\n\t\titem_details = None\n\n\t\ttry:\n\t\t\titem_details = op.get_item_details(item_id)\n\t\texcept Exception as e:\n\t\t\tRoyalUtils.exit_with_error(self.error_message_get_item_details, e)\n\n\t\ttry:\n\t\t\top.sign_out()\n\t\texcept Exception as e:\n\t\t\tRoyalUtils.exit_with_error(self.error_message_sign_out, e)\n\n\t\tstore = Converter.convert_item(item_details, op.is_op_cli_v2)\n\t\tstore_json = RoyalUtils.to_json(store, True)\n\n\t\tprint(store_json)\n\ncoordinator = Coordinator(op_path_windows, op_path_macOS, sign_in_address, email_address, secret_key, master_password, create_dynamic_credentials)\ncoordinator.get_items(filter_vault)\n# coordinator.get_item_details(item_id)", "ScriptInterpreter": "python", "DynamicCredentialScriptInterpreter": "python", - "DynamicCredentialScript": "from __future__ import print_function\nfrom functools import partial\nfrom sys import platform as _platform\nfrom subprocess import Popen, PIPE\n\nimport tempfile\nimport sys\nimport json\nimport subprocess\nimport os\nimport base64\n\nop_path_windows = r\"$CustomProperty.OPPathWindows$\"\nop_path_macOS = r\"$CustomProperty.OPPathmacOS$\"\nsign_in_address = r\"$CustomProperty.SignInAddress$\"\nemail_address = r\"$CustomProperty.EmailAddress$\"\nsecret_key = r\"$CustomProperty.SecretKey$\"\nmaster_password = r\"$CustomProperty.MasterPassword$\"\nfilter_vault = r\"$CustomProperty.Vault$\"\nitem_id = r\"$DynamicCredential.EffectiveID$\"\n\nclass RoyalUtils:\n @staticmethod\n def is_macOS():\n plat = _platform.lower()\n\n return plat.startswith(\"darwin\")\n\n @staticmethod\n def get_last_line(the_string: str):\n stripped_str = the_string.strip()\n\n if \"\\n\" in stripped_str:\n lines = stripped_str.splitlines()\n stripped_str = lines[len(lines) - 1]\n\n return stripped_str\n\n @staticmethod\n def random_uuid():\n uuid = base64.b32encode(os.urandom(16)).decode().lower().rstrip(\"=\")\n\n return uuid\n\n @staticmethod\n def exit_with_error(message, exception=None):\n printError = partial(print, file=sys.stderr) # python2 compatibility\n\n exception_message = str(exception) if exception else \"N/A\"\n\n full_message = message + exception_message\n\n printError(full_message)\n sys.exit(1)\n\n @staticmethod\n def to_json(obj, pretty=False):\n return json.dumps(obj, indent=4) if pretty else json.dumps(obj)\n\n @staticmethod\n def decode_to_utf8_string(potential_bytes):\n if isinstance(potential_bytes, str):\n return potential_bytes\n else:\n return potential_bytes.decode(\"utf-8\")\n\nif RoyalUtils.is_macOS():\n import pexpect\nelse:\n import wexpect\n\nclass RoyalInputPrompt:\n @staticmethod\n def show(title: str, message: str, defaultValue: str):\n if RoyalUtils.is_macOS(): # macOS\n return RoyalInputPrompt._show_macOS(title, message, defaultValue)\n else: # Windows\n return RoyalInputPrompt._show_windows(title, message, defaultValue)\n\n @staticmethod\n def _escape_string_quotes_js(target_string: str):\n escaped_string = target_string.replace('\"', '\\\\\"')\n\n return escaped_string\n\n @staticmethod\n def _escape_string_quotes_vbs(target_string: str):\n escaped_string = target_string.replace('\"', '\\\"\\\"')\n\n return escaped_string\n\n @staticmethod\n def _show_macOS(title: str, message: str, defaultValue: str):\n script = f\"\"\"\n function showInputPrompt(title, message, defaultValue) {{\n let app = Application.currentApplication();\n app.includeStandardAdditions = true;\n \n var value = \"\";\n \n try {{\n let response = app.displayDialog(message, {{\n withTitle: title,\n defaultAnswer: defaultValue,\n withIcon: \"note\",\n buttons: [ \"Cancel\", \"OK\" ],\n defaultButton: \"OK\",\n cancelButton: \"Cancel\"\n }});\n \n if (response.buttonReturned == \"OK\") {{\n value = response.textReturned;\n }}\n }} catch {{ }}\n \n return value;\n }}\n\n function run(argv) {{\n let value = showInputPrompt(\"{RoyalInputPrompt._escape_string_quotes_js(title)}\", \"{RoyalInputPrompt._escape_string_quotes_js(message)}\", \"{RoyalInputPrompt._escape_string_quotes_js(defaultValue)}\");\n \n return value;\n }}\"\"\"\n\n encoding = \"utf-8\"\n\n cmd = [ \"osascript\", \"-l\", \"JavaScript\", \"-\" ]\n\n proc = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)\n stdout, _ = proc.communicate(script.encode(encoding))\n\n return_value = RoyalUtils.decode_to_utf8_string(stdout)\n\n return return_value\n\n @staticmethod\n def _show_windows(title: str, message: str, defaultValue: str):\n return_value = \"\"\n\n script = f\"\"\"\n WScript.Echo InputBox(\"{RoyalInputPrompt._escape_string_quotes_vbs(message)}\", \"{RoyalInputPrompt._escape_string_quotes_vbs(title)}\", \"{RoyalInputPrompt._escape_string_quotes_vbs(defaultValue)}\")\n \"\"\"\n\n encoding = \"utf-8\"\n\n temp_file = tempfile.NamedTemporaryFile(suffix=\".vbs\", mode=\"w\", encoding=encoding, delete=False)\n temp_file_path = temp_file.name\n\n temp_file.write(script)\n temp_file.flush()\n\n temp_file.close()\n\n try:\n cmd = [ \"cscript.exe\", \"/Nologo\", temp_file_path ]\n\n proc = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)\n stdout, _ = proc.communicate()\n\n return_value = RoyalUtils.decode_to_utf8_string(stdout)\n except:\n pass\n finally:\n if os.path.exists(temp_file_path):\n os.remove(temp_file_path)\n\n return return_value\n\nclass OnePassword:\n config_path = \"\"\n config_file_path = \"\"\n\n op_path = \"\"\n session_token = \"\"\n account_shorthand = \"\"\n\n op_cli_version = \"\"\n is_op_cli_v2 = False\n\n unknown_error_string = \"An unknown error occurred.\"\n\n def __init__(self, op_path):\n self.config_path = os.path.expanduser(\"~/.config/op\")\n self.config_file_path = os.path.join(self.config_path, \"config\")\n\n self.op_path = op_path\n\n op_cli_version = \"1\"\n\n try:\n op_cli_version = self.get_cli_version()\n except:\n pass\n\n if not op_cli_version:\n op_cli_version = \"1\"\n\n self.op_cli_version = op_cli_version\n self.is_op_cli_v2 = op_cli_version.startswith(\"2\")\n \n def get_cli_version(self):\n cmd_get_version = [ self.op_path, \"--version\" ]\n op = subprocess.Popen(cmd_get_version, stdout=subprocess.PIPE, stderr=subprocess.PIPE)\n\n (output, err) = op.communicate()\n exit_code = op.wait()\n\n success = exit_code == 0\n\n op_cli_version = \"\"\n\n if success:\n op_cli_version = RoyalUtils.decode_to_utf8_string(output)\n op_cli_version = RoyalUtils.get_last_line(op_cli_version)\n else:\n if not err:\n err = self.unknown_error_string\n else:\n err = RoyalUtils.decode_to_utf8_string(err)\n\n raise Exception(err)\n \n return op_cli_version\n \n def get_device_id(self):\n device_id = os.environ.get(\"OP_DEVICE\")\n\n return device_id\n \n def set_device_id(self, device_id):\n os.environ[\"OP_DEVICE\"] = device_id\n\n if not os.path.exists(self.config_file_path):\n config_json = RoyalUtils.to_json({ \"device\": device_id }, True)\n\n try:\n file = open(self.config_file_path, \"w\")\n\n file.write(config_json)\n file.close()\n\n if RoyalUtils.is_macOS:\n os.chmod(self.config_file_path, 0o600)\n \n return True\n except:\n return False\n \n return True\n \n def get_account_shorthand(self, sign_in_address, email_address):\n shorthand = sign_in_address.replace(\".\", \"_\") + \"__\" + email_address.replace(\".\", \"_\").replace(\"@\", \"_\").replace(\"+\", \"_\")\n\n return shorthand\n\n def sign_out(self):\n cmd_signout = [ ]\n\n if self.account_shorthand:\n cmd_signout = [\n self.op_path,\n \"signout\",\n \"--account\", self.account_shorthand\n ]\n else:\n cmd_signout = [\n self.op_path,\n \"signout\"\n ]\n\n op = subprocess.Popen(cmd_signout, stdout=subprocess.PIPE, stderr=subprocess.PIPE)\n \n (output, err) = op.communicate()\n exit_code = op.wait()\n\n success = exit_code == 0\n\n if success:\n self.session_token = \"\"\n self.account_shorthand = \"\"\n else:\n if not err:\n err = self.unknown_error_string\n else:\n err = RoyalUtils.decode_to_utf8_string(err)\n\n raise Exception(err)\n \n def sign_in(self, sign_in_address, email_address, secret_key, master_password, second_try=False):\n shorthand = self.get_account_shorthand(sign_in_address, email_address)\n\n timeout = 10\n\n cmd = self.op_path\n\n args = [ ]\n\n if self.is_op_cli_v2:\n args = [\n \"account\", \"add\",\n \"--signin\",\n \"--address\", sign_in_address,\n \"--email\", email_address,\n \"--secret-key\", secret_key,\n \"--shorthand\", shorthand,\n \"--raw\"\n ]\n else:\n args = [\n \"signin\",\n sign_in_address,\n email_address,\n secret_key,\n \"--shorthand\", shorthand,\n \"--raw\"\n ]\n\n eof = None\n proc = None\n\n if RoyalUtils.is_macOS():\n eof = pexpect.EOF\n proc = pexpect.spawn(cmd, args)\n else:\n eof = wexpect.EOF\n proc = wexpect.spawn(cmd, args)\n\n exp_str_password = f\"Enter the password for.*:\"\n exp_str_error = \"\\[ERROR\\].*\"\n exp_str_biometric_unlock_enabled = \"Biometric unlock integration with the 1Password app is enabled*\"\n\n err = None\n\n try:\n exp_pw_prompt = proc.expect([ exp_str_password, exp_str_error, exp_str_biometric_unlock_enabled ], timeout=timeout)\n\n if exp_pw_prompt == 0: # Password Prompt\n proc.sendline(master_password)\n\n exp_str_mfa = \"Enter your.*authentication code.*:\"\n\n exp_mfa = proc.expect([ exp_str_mfa, exp_str_error, eof ], timeout=timeout)\n\n if exp_mfa == 0: # MFA Prompt\n mfa_code = RoyalInputPrompt.show(\"1Password Multi-Factor-Authentication\", f\"Enter multi-factor authentication code for {email_address}:\", \"\")\n mfa_code = mfa_code.replace(\"\\n\", \"\").replace(\"\\r\", \"\")\n\n if not mfa_code:\n raise Exception(\"No multi-factor code provided\")\n \n proc.sendline(mfa_code)\n exp_mfa_sent = proc.expect([ exp_str_error, eof ], timeout=timeout)\n\n if exp_mfa_sent == 0: # Error\n err = RoyalUtils.get_last_line(RoyalUtils.decode_to_utf8_string(proc.match.string))\n \n raise Exception(err)\n elif exp_mfa == 1: # Error\n err = RoyalUtils.get_last_line(RoyalUtils.decode_to_utf8_string(proc.match.string))\n \n raise Exception(err)\n\n output = proc.before\n\n if output:\n output = RoyalUtils.decode_to_utf8_string(output)\n \n proc.close()\n exit_code = proc.wait()\n \n success = exit_code == 0\n\n if success:\n self.session_token = RoyalUtils.get_last_line(output)\n self.account_shorthand = shorthand\n else:\n if not err:\n err = self.unknown_error_string\n\n raise Exception(err)\n elif exp_pw_prompt == 1: # Error\n err = RoyalUtils.get_last_line(RoyalUtils.decode_to_utf8_string(proc.match.string))\n\n if not second_try:\n if \"No saved device ID\" in err:\n export_start_str = \"export OP_DEVICE=\"\n device_id = \"\"\n\n if export_start_str in err:\n start_index = err.index(export_start_str) + len(export_start_str)\n device_id = err[start_index:]\n end_index = device_id.index(\"`\")\n device_id = device_id[:end_index]\n\n if not device_id or len(device_id) != 26:\n device_id = RoyalUtils.random_uuid()\n\n self.set_device_id(device_id)\n\n self.sign_in(sign_in_address, email_address, secret_key, master_password, second_try=True)\n\n return\n\n if not err:\n err = self.unknown_error_string\n\n raise Exception(err)\n elif exp_pw_prompt == 2: # Biometric unlock enabled\n err = \"Biometric unlock integration with the 1Password app is enabled. This feature is not supported by the 1Password Dynamic Folder Script. Please disable biometric unlock in the 1Password app.\"\n \n raise Exception(err)\n except Exception as ex:\n err = str(ex)\n\n if not err:\n err = self.unknown_error_string\n\n raise Exception(err)\n\n def get_vaults(self):\n cmd_list_vaults = [ ]\n\n if self.is_op_cli_v2:\n cmd_list_vaults = [\n self.op_path,\n \"vault\", \"list\",\n \"--format=json\",\n \"--account\", self.account_shorthand,\n \"--session\", self.session_token\n ]\n else:\n cmd_list_vaults = [\n self.op_path,\n \"list\", \"vaults\",\n \"--account\", self.account_shorthand,\n \"--session\", self.session_token\n ]\n \n op = subprocess.Popen(cmd_list_vaults, stdout=subprocess.PIPE, stderr=subprocess.PIPE)\n\n (output, err) = op.communicate()\n exit_code = op.wait()\n\n success = exit_code == 0\n\n if success:\n output = RoyalUtils.decode_to_utf8_string(output)\n\n vaults_str = \"\"\n\n if self.is_op_cli_v2:\n vaults_str = output\n else:\n vaults_str = RoyalUtils.get_last_line(output)\n \n vaults = json.loads(vaults_str)\n\n return vaults\n \n if not err:\n err = self.unknown_error_string\n else:\n err = RoyalUtils.decode_to_utf8_string(err)\n \n raise Exception(err)\n\n def get_items(self, vault):\n cmd_list_items = [ ]\n\n if self.is_op_cli_v2:\n cmd_list_items = [\n self.op_path,\n \"item\", \"list\",\n \"--format=json\",\n \"--account\", self.account_shorthand,\n \"--session\", self.session_token\n ]\n else:\n cmd_list_items = [\n self.op_path,\n \"list\", \"items\",\n \"--account\", self.account_shorthand,\n \"--session\", self.session_token\n ]\n \n if not vault:\n \tvault = \"\"\n \n vault = vault.strip()\n \n if vault:\n \tcmd_list_items.append(\"--vault\")\n \tcmd_list_items.append(vault)\n\n op = subprocess.Popen(cmd_list_items, stdout=subprocess.PIPE, stderr=subprocess.PIPE)\n\n (output, err) = op.communicate()\n exit_code = op.wait()\n\n success = exit_code == 0\n\n if success:\n output = RoyalUtils.decode_to_utf8_string(output)\n\n items_str = \"\"\n\n if self.is_op_cli_v2:\n items_str = output\n else:\n items_str = RoyalUtils.get_last_line(output)\n\n items = json.loads(items_str)\n \n return items\n else:\n if not err:\n err = self.unknown_error_string\n else:\n err = RoyalUtils.decode_to_utf8_string(err)\n \n raise Exception(err)\n\n def get_item_details(self, item_id):\n cmd_get_item = [ ]\n\n if self.is_op_cli_v2:\n cmd_get_item = [\n self.op_path,\n \"item\", \"get\", item_id,\n # \"--fields\", \"label=password,label=privatekey\",\n \"--format=JSON\",\n \"--account\", self.account_shorthand,\n \"--session\", self.session_token\n ]\n else:\n cmd_get_item = [\n self.op_path,\n \"get\", \"item\", item_id,\n \"--fields\", \"username,password\",\n \"--format\", \"JSON\",\n \"--account\", self.account_shorthand,\n \"--session\", self.session_token\n ]\n\n op = subprocess.Popen(cmd_get_item, stdout=subprocess.PIPE, stderr=subprocess.PIPE)\n\n (output, err) = op.communicate()\n exit_code = op.wait()\n\n success = exit_code == 0\n\n if success:\n output = RoyalUtils.decode_to_utf8_string(output)\n\n item_str = \"\"\n\n if self.is_op_cli_v2:\n item_str = output\n else:\n item_str = RoyalUtils.get_last_line(output)\n \n item = json.loads(item_str)\n \n return item\n else:\n if not err:\n err = self.unknown_error_string\n else:\n err = RoyalUtils.decode_to_utf8_string(err)\n \n raise Exception(err)\n\nclass Converter:\n @staticmethod\n def get_vault_name(vaults, vault_id, is_op_cli_v2):\n for vault in vaults:\n if is_op_cli_v2:\n if vault.get(\"id\", \"\") == vault_id:\n return vault.get(\"name\", \"\")\n else:\n if vault.get(\"uuid\", \"\") == vault_id:\n return vault.get(\"name\", \"\")\n \n return \"\"\n\n @staticmethod\n def convert_items(vaults, items, is_op_cli_v2):\n objects = []\n\n for item in items:\n overview = None\n\n if is_op_cli_v2:\n overview = item\n else:\n overview = item.get(\"overview\", None)\n\n if overview == None:\n continue\n\n id = \"\"\n\n if is_op_cli_v2:\n id = item.get(\"id\", \"\")\n else:\n id = item.get(\"uuid\", \"\")\n \n title = overview.get(\"title\", \"N/A\")\n url = overview.get(\"url\", \"\")\n\n vault_id = \"\"\n\n if is_op_cli_v2:\n item_vault = item.get(\"vault\", None)\n\n if item_vault:\n vault_id = item_vault.get(\"id\", \"\")\n else:\n vault_id = item.get(\"vaultUuid\", \"\")\n\n vault_name = Converter.get_vault_name(vaults, vault_id, is_op_cli_v2)\n\n cred = {\n \"Type\": \"DynamicCredential\",\n \"ID\": id,\n \"Name\": title,\n \"Path\": vault_name\n }\n\n if url != \"\":\n cred[\"URL\"] = url\n\n objects.append(cred)\n\n objects = sorted(objects, key = lambda i: (i[\"Path\"], i[\"Name\"]))\n\n store = {\n \"Objects\": objects\n }\n\n return store\n \n @staticmethod\n def convert_item(item_details, is_op_cli_v2):\n username = None\n password = None\n private_key = None\n\n if is_op_cli_v2:\n fields = item_details.get(\"fields\")\n\n if not fields:\n fields = item_details\n\n for field in fields:\n field_id = field.get(\"id\", None)\n field_label = field.get(\"label\", None)\n field_value = field.get(\"value\", None)\n\n if field_id == \"username\" or field_label == \"username\":\n username = field_value\n elif field_id == \"password\" or field_label == \"password\":\n password = field_value\n elif field_id == \"private_key\" or field_label == \"private key\":\n private_key = field_value\n else:\n username = item_details.get(\"username\", None)\n password = item_details.get(\"password\", None)\n\n cred = { }\n\n if username is not None:\n cred[\"Username\"] = username\n\n if password is not None:\n cred[\"Password\"] = password\n\n if private_key is not None:\n cred[\"KeyFileContent\"] = private_key\n\n return cred\n\nclass Coordinator:\n op_path = \"\"\n sign_in_address = \"\"\n email_address = \"\"\n secret_key = \"\"\n master_password = \"\"\n\n error_message_sign_in = \"Error while signing in: \"\n error_message_sign_out = \"Error while signing out: \"\n error_message_get_vaults = \"Error while getting vaults: \"\n error_message_get_items = \"Error while getting items: \"\n error_message_get_item_details = \"Error while getting item details: \"\n\n def __init__(self, op_path_windows, op_path_macOS, sign_in_address, email_address, secret_key, master_password):\n self.op_path = op_path_macOS if RoyalUtils.is_macOS() else op_path_windows\n self.sign_in_address = sign_in_address\n self.email_address = email_address\n self.secret_key = secret_key\n self.master_password = master_password\n \n def get_items(self, vault):\n op = OnePassword(self.op_path)\n\n try:\n op.sign_out()\n except Exception:\n pass\n\n try:\n op.sign_in(self.sign_in_address, self.email_address, self.secret_key, self.master_password)\n except Exception as e:\n RoyalUtils.exit_with_error(self.error_message_sign_in, e)\n\n vaults = None\n\n try:\n vaults = op.get_vaults()\n except Exception as e:\n RoyalUtils.exit_with_error(self.error_message_get_vaults, e)\n \n items = None\n\n try:\n items = op.get_items(vault)\n except Exception as e:\n RoyalUtils.exit_with_error(self.error_message_get_items, e)\n\n try:\n op.sign_out()\n except Exception as e:\n RoyalUtils.exit_with_error(self.error_message_sign_out, e)\n\n store = Converter.convert_items(vaults, items, op.is_op_cli_v2)\n store_json = RoyalUtils.to_json(store, True)\n\n print(store_json)\n \n def get_item_details(self, item_id):\n op = OnePassword(self.op_path)\n\n try:\n op.sign_out()\n except Exception:\n pass\n\n try:\n op.sign_in(self.sign_in_address, self.email_address, self.secret_key, self.master_password)\n except Exception as e:\n RoyalUtils.exit_with_error(self.error_message_sign_in, e)\n\n item_details = None\n\n try:\n item_details = op.get_item_details(item_id)\n except Exception as e:\n RoyalUtils.exit_with_error(self.error_message_get_item_details, e)\n\n try:\n op.sign_out()\n except Exception as e:\n RoyalUtils.exit_with_error(self.error_message_sign_out, e)\n\n store = Converter.convert_item(item_details, op.is_op_cli_v2)\n store_json = RoyalUtils.to_json(store, True)\n\n print(store_json)\n\ncoordinator = Coordinator(op_path_windows, op_path_macOS, sign_in_address, email_address, secret_key, master_password)\n# coordinator.get_items(filter_vault)\ncoordinator.get_item_details(item_id)" + "DynamicCredentialScript": "from __future__ import print_function\nfrom functools import partial\nfrom sys import platform as _platform\nfrom subprocess import Popen, PIPE\n\nimport tempfile\nimport sys\nimport json\nimport subprocess\nimport os\nimport base64\n\nop_path_windows = r\"$CustomProperty.OPPathWindows$\"\nop_path_macOS = r\"$CustomProperty.OPPathmacOS$\"\nsign_in_address = r\"$CustomProperty.SignInAddress$\"\nemail_address = r\"$CustomProperty.EmailAddress$\"\nsecret_key = r\"$CustomProperty.SecretKey$\"\nmaster_password = r\"$CustomProperty.MasterPassword$\"\nfilter_vault = r\"$CustomProperty.Vault$\"\ncreate_dynamic_credentials = r\"$CustomProperty.DynamicCredentials$\".lower() != \"no\"\nitem_id = r\"$DynamicCredential.EffectiveID$\"\n\nclass RoyalUtils:\n\t@staticmethod\n\tdef is_macOS():\n\t\tplat = _platform.lower()\n\n\t\treturn plat.startswith(\"darwin\")\n\n\t@staticmethod\n\tdef get_last_line(the_string: str):\n\t\tstripped_str = the_string.strip()\n\n\t\tif \"\\n\" in stripped_str:\n\t\t\tlines = stripped_str.splitlines()\n\t\t\tstripped_str = lines[len(lines) - 1]\n\n\t\treturn stripped_str\n\n\t@staticmethod\n\tdef random_uuid():\n\t\tuuid = base64.b32encode(os.urandom(16)).decode().lower().rstrip(\"=\")\n\n\t\treturn uuid\n\n\t@staticmethod\n\tdef exit_with_error(message, exception=None):\n\t\tprintError = partial(print, file=sys.stderr) # python2 compatibility\n\n\t\texception_message = str(exception) if exception else \"N/A\"\n\n\t\tfull_message = message + exception_message\n\n\t\tprintError(full_message)\n\t\tsys.exit(1)\n\n\t@staticmethod\n\tdef to_json(obj, pretty=False):\n\t\treturn json.dumps(obj, indent=4) if pretty else json.dumps(obj)\n\n\t@staticmethod\n\tdef decode_to_utf8_string(potential_bytes):\n\t\tif isinstance(potential_bytes, str):\n\t\t\treturn potential_bytes\n\t\telse:\n\t\t\treturn potential_bytes.decode(\"utf-8\")\n\nif RoyalUtils.is_macOS():\n\timport pexpect\nelse:\n\timport wexpect\n\nclass RoyalInputPrompt:\n\t@staticmethod\n\tdef show(title: str, message: str, defaultValue: str):\n\t\tif RoyalUtils.is_macOS(): # macOS\n\t\t\treturn RoyalInputPrompt._show_macOS(title, message, defaultValue)\n\t\telse: # Windows\n\t\t\treturn RoyalInputPrompt._show_windows(title, message, defaultValue)\n\n\t@staticmethod\n\tdef _escape_string_quotes_js(target_string: str):\n\t\tescaped_string = target_string.replace('\"', '\\\\\"')\n\n\t\treturn escaped_string\n\n\t@staticmethod\n\tdef _escape_string_quotes_vbs(target_string: str):\n\t\tescaped_string = target_string.replace('\"', '\\\"\\\"')\n\n\t\treturn escaped_string\n\n\t@staticmethod\n\tdef _show_macOS(title: str, message: str, defaultValue: str):\n\t\tscript = f\"\"\"\n\t\tfunction showInputPrompt(title, message, defaultValue) {{\n\t\t\tlet app = Application.currentApplication();\n\t\t\tapp.includeStandardAdditions = true;\n\t\t\t\n\t\t\tvar value = \"\";\n\t\t\t\n\t\t\ttry {{\n\t\t\t\tlet response = app.displayDialog(message, {{\n\t\t\t\t\twithTitle: title,\n\t\t\t\t\tdefaultAnswer: defaultValue,\n\t\t\t\t\twithIcon: \"note\",\n\t\t\t\t\tbuttons: [ \"Cancel\", \"OK\" ],\n\t\t\t\t\tdefaultButton: \"OK\",\n\t\t\t\t\tcancelButton: \"Cancel\"\n\t\t\t\t}});\n\t\t\t\t\n\t\t\t\tif (response.buttonReturned == \"OK\") {{\n\t\t\t\t\tvalue = response.textReturned;\n\t\t\t\t}}\n\t\t\t}} catch {{ }}\n\t\t\t\n\t\t\treturn value;\n\t\t}}\n\n\t\tfunction run(argv) {{\n\t\t\tlet value = showInputPrompt(\"{RoyalInputPrompt._escape_string_quotes_js(title)}\", \"{RoyalInputPrompt._escape_string_quotes_js(message)}\", \"{RoyalInputPrompt._escape_string_quotes_js(defaultValue)}\");\n\t\t\t\n\t\t\treturn value;\n\t\t}}\"\"\"\n\n\t\tencoding = \"utf-8\"\n\n\t\tcmd = [ \"osascript\", \"-l\", \"JavaScript\", \"-\" ]\n\n\t\tproc = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)\n\t\tstdout, _ = proc.communicate(script.encode(encoding))\n\n\t\treturn_value = RoyalUtils.decode_to_utf8_string(stdout)\n\n\t\treturn return_value\n\n\t@staticmethod\n\tdef _show_windows(title: str, message: str, defaultValue: str):\n\t\treturn_value = \"\"\n\n\t\tscript = f\"\"\"\n\t\tWScript.Echo InputBox(\"{RoyalInputPrompt._escape_string_quotes_vbs(message)}\", \"{RoyalInputPrompt._escape_string_quotes_vbs(title)}\", \"{RoyalInputPrompt._escape_string_quotes_vbs(defaultValue)}\")\n\t\t\"\"\"\n\n\t\tencoding = \"utf-8\"\n\n\t\ttemp_file = tempfile.NamedTemporaryFile(suffix=\".vbs\", mode=\"w\", encoding=encoding, delete=False)\n\t\ttemp_file_path = temp_file.name\n\n\t\ttemp_file.write(script)\n\t\ttemp_file.flush()\n\n\t\ttemp_file.close()\n\n\t\ttry:\n\t\t\tcmd = [ \"cscript.exe\", \"/Nologo\", temp_file_path ]\n\n\t\t\tproc = Popen(cmd, stdin=PIPE, stdout=PIPE, stderr=PIPE)\n\t\t\tstdout, _ = proc.communicate()\n\n\t\t\treturn_value = RoyalUtils.decode_to_utf8_string(stdout)\n\t\texcept:\n\t\t\tpass\n\t\tfinally:\n\t\t\tif os.path.exists(temp_file_path):\n\t\t\t\tos.remove(temp_file_path)\n\n\t\treturn return_value\n\nclass OnePassword:\n\tconfig_path = \"\"\n\tconfig_file_path = \"\"\n\n\top_path = \"\"\n\tsession_token = \"\"\n\taccount_shorthand = \"\"\n\n\top_cli_version = \"\"\n\tis_op_cli_v2 = False\n\n\tunknown_error_string = \"An unknown error occurred.\"\n\n\tdef __init__(self, op_path):\n\t\tself.config_path = os.path.expanduser(\"~/.config/op\")\n\t\tself.config_file_path = os.path.join(self.config_path, \"config\")\n\n\t\tself.op_path = op_path\n\n\t\top_cli_version = \"1\"\n\n\t\ttry:\n\t\t\top_cli_version = self.get_cli_version()\n\t\texcept:\n\t\t\tpass\n\n\t\tif not op_cli_version:\n\t\t\top_cli_version = \"1\"\n\n\t\tself.op_cli_version = op_cli_version\n\t\tself.is_op_cli_v2 = op_cli_version.startswith(\"2\")\n\t\n\tdef get_cli_version(self):\n\t\tcmd_get_version = [ self.op_path, \"--version\" ]\n\t\top = subprocess.Popen(cmd_get_version, stdout=subprocess.PIPE, stderr=subprocess.PIPE)\n\n\t\t(output, err) = op.communicate()\n\t\texit_code = op.wait()\n\n\t\tsuccess = exit_code == 0\n\n\t\top_cli_version = \"\"\n\n\t\tif success:\n\t\t\top_cli_version = RoyalUtils.decode_to_utf8_string(output)\n\t\t\top_cli_version = RoyalUtils.get_last_line(op_cli_version)\n\t\telse:\n\t\t\tif not err:\n\t\t\t\terr = self.unknown_error_string\n\t\t\telse:\n\t\t\t\terr = RoyalUtils.decode_to_utf8_string(err)\n\n\t\t\traise Exception(err)\n\t\t\n\t\treturn op_cli_version\n\t\n\tdef get_device_id(self):\n\t\tdevice_id = os.environ.get(\"OP_DEVICE\")\n\n\t\treturn device_id\n\t\n\tdef set_device_id(self, device_id):\n\t\tos.environ[\"OP_DEVICE\"] = device_id\n\n\t\tif not os.path.exists(self.config_file_path):\n\t\t\tconfig_json = RoyalUtils.to_json({ \"device\": device_id }, True)\n\n\t\t\ttry:\n\t\t\t\tfile = open(self.config_file_path, \"w\")\n\n\t\t\t\tfile.write(config_json)\n\t\t\t\tfile.close()\n\n\t\t\t\tif RoyalUtils.is_macOS:\n\t\t\t\t\tos.chmod(self.config_file_path, 0o600)\n\t\t\t\t\n\t\t\t\treturn True\n\t\t\texcept:\n\t\t\t\treturn False\n\t\t\n\t\treturn True\n\t\n\tdef get_account_shorthand(self, sign_in_address, email_address):\n\t\tshorthand = sign_in_address.replace(\".\", \"_\") + \"__\" + email_address.replace(\".\", \"_\").replace(\"@\", \"_\").replace(\"+\", \"_\")\n\n\t\treturn shorthand\n\n\tdef sign_out(self):\n\t\tcmd_signout = [ ]\n\n\t\tif self.account_shorthand:\n\t\t\tcmd_signout = [\n\t\t\t\tself.op_path,\n\t\t\t\t\"signout\",\n\t\t\t\t\"--account\", self.account_shorthand\n\t\t\t]\n\t\telse:\n\t\t\tcmd_signout = [\n\t\t\t\tself.op_path,\n\t\t\t\t\"signout\"\n\t\t\t]\n\n\t\top = subprocess.Popen(cmd_signout, stdout=subprocess.PIPE, stderr=subprocess.PIPE)\n\t\t\n\t\t(output, err) = op.communicate()\n\t\texit_code = op.wait()\n\n\t\tsuccess = exit_code == 0\n\n\t\tif success:\n\t\t\tself.session_token = \"\"\n\t\t\tself.account_shorthand = \"\"\n\t\telse:\n\t\t\tif not err:\n\t\t\t\terr = self.unknown_error_string\n\t\t\telse:\n\t\t\t\terr = RoyalUtils.decode_to_utf8_string(err)\n\n\t\t\traise Exception(err)\n\t\n\tdef sign_in(self, sign_in_address, email_address, secret_key, master_password, second_try=False):\n\t\tshorthand = self.get_account_shorthand(sign_in_address, email_address)\n\n\t\ttimeout = 10\n\n\t\tcmd = self.op_path\n\n\t\targs = [ ]\n\n\t\tif self.is_op_cli_v2:\n\t\t\targs = [\n\t\t\t\t\"account\", \"add\",\n\t\t\t\t\"--signin\",\n\t\t\t\t\"--address\", sign_in_address,\n\t\t\t\t\"--email\", email_address,\n\t\t\t\t\"--secret-key\", secret_key,\n\t\t\t\t\"--shorthand\", shorthand,\n\t\t\t\t\"--raw\"\n\t\t\t]\n\t\telse:\n\t\t\targs = [\n\t\t\t\t\"signin\",\n\t\t\t\tsign_in_address,\n\t\t\t\temail_address,\n\t\t\t\tsecret_key,\n\t\t\t\t\"--shorthand\", shorthand,\n\t\t\t\t\"--raw\"\n\t\t\t]\n\n\t\teof = None\n\t\tproc = None\n\n\t\tif RoyalUtils.is_macOS():\n\t\t\teof = pexpect.EOF\n\t\t\tproc = pexpect.spawn(cmd, args)\n\t\telse:\n\t\t\teof = wexpect.EOF\n\t\t\tproc = wexpect.spawn(cmd, args)\n\n\t\texp_str_password = f\"Enter the password for.*:\"\n\t\texp_str_error = \"\\[ERROR\\].*\"\n\t\texp_str_biometric_unlock_enabled = \"Biometric unlock integration with the 1Password app is enabled*\"\n\n\t\terr = None\n\n\t\ttry:\n\t\t\texp_pw_prompt = proc.expect([ exp_str_password, exp_str_error, exp_str_biometric_unlock_enabled ], timeout=timeout)\n\n\t\t\tif exp_pw_prompt == 0: # Password Prompt\n\t\t\t\tproc.sendline(master_password)\n\n\t\t\t\texp_str_mfa = \"Enter your.*authentication code.*:\"\n\n\t\t\t\texp_mfa = proc.expect([ exp_str_mfa, exp_str_error, eof ], timeout=timeout)\n\n\t\t\t\tif exp_mfa == 0: # MFA Prompt\n\t\t\t\t\tmfa_code = RoyalInputPrompt.show(\"1Password Multi-Factor-Authentication\", f\"Enter multi-factor authentication code for {email_address}:\", \"\")\n\t\t\t\t\tmfa_code = mfa_code.replace(\"\\n\", \"\").replace(\"\\r\", \"\")\n\n\t\t\t\t\tif not mfa_code:\n\t\t\t\t\t\traise Exception(\"No multi-factor code provided\")\n\t\t\t\t\t\n\t\t\t\t\tproc.sendline(mfa_code)\n\t\t\t\t\texp_mfa_sent = proc.expect([ exp_str_error, eof ], timeout=timeout)\n\n\t\t\t\t\tif exp_mfa_sent == 0: # Error\n\t\t\t\t\t\terr = RoyalUtils.get_last_line(RoyalUtils.decode_to_utf8_string(proc.match.string))\n\t\t\t\t\t\t\n\t\t\t\t\t\traise Exception(err)\n\t\t\t\telif exp_mfa == 1: # Error\n\t\t\t\t\terr = RoyalUtils.get_last_line(RoyalUtils.decode_to_utf8_string(proc.match.string))\n\t\t\t\t\t\n\t\t\t\t\traise Exception(err)\n\n\t\t\t\toutput = proc.before\n\n\t\t\t\tif output:\n\t\t\t\t\toutput = RoyalUtils.decode_to_utf8_string(output)\n\t\t\t\t\n\t\t\t\tproc.close()\n\t\t\t\texit_code = proc.wait()\n\t\t\t\t\n\t\t\t\tsuccess = exit_code == 0\n\n\t\t\t\tif success:\n\t\t\t\t\tself.session_token = RoyalUtils.get_last_line(output)\n\t\t\t\t\tself.account_shorthand = shorthand\n\t\t\t\telse:\n\t\t\t\t\tif not err:\n\t\t\t\t\t\terr = self.unknown_error_string\n\n\t\t\t\t\traise Exception(err)\n\t\t\telif exp_pw_prompt == 1: # Error\n\t\t\t\terr = RoyalUtils.get_last_line(RoyalUtils.decode_to_utf8_string(proc.match.string))\n\n\t\t\t\tif not second_try:\n\t\t\t\t\tif \"No saved device ID\" in err:\n\t\t\t\t\t\texport_start_str = \"export OP_DEVICE=\"\n\t\t\t\t\t\tdevice_id = \"\"\n\n\t\t\t\t\t\tif export_start_str in err:\n\t\t\t\t\t\t\tstart_index = err.index(export_start_str) + len(export_start_str)\n\t\t\t\t\t\t\tdevice_id = err[start_index:]\n\t\t\t\t\t\t\tend_index = device_id.index(\"`\")\n\t\t\t\t\t\t\tdevice_id = device_id[:end_index]\n\n\t\t\t\t\t\tif not device_id or len(device_id) != 26:\n\t\t\t\t\t\t\tdevice_id = RoyalUtils.random_uuid()\n\n\t\t\t\t\t\tself.set_device_id(device_id)\n\n\t\t\t\t\t\tself.sign_in(sign_in_address, email_address, secret_key, master_password, second_try=True)\n\n\t\t\t\t\t\treturn\n\n\t\t\t\tif not err:\n\t\t\t\t\terr = self.unknown_error_string\n\n\t\t\t\traise Exception(err)\n\t\t\telif exp_pw_prompt == 2: # Biometric unlock enabled\n\t\t\t\terr = \"Biometric unlock integration with the 1Password app is enabled. This feature is not supported by the 1Password Dynamic Folder Script. Please disable biometric unlock in the 1Password app.\"\n\t\t\t\t\n\t\t\t\traise Exception(err)\n\t\texcept Exception as ex:\n\t\t\terr = str(ex)\n\n\t\t\tif not err:\n\t\t\t\terr = self.unknown_error_string\n\n\t\t\traise Exception(err)\n\n\tdef get_vaults(self):\n\t\tcmd_list_vaults = [ ]\n\n\t\tif self.is_op_cli_v2:\n\t\t\tcmd_list_vaults = [\n\t\t\t\tself.op_path,\n\t\t\t\t\"vault\", \"list\",\n\t\t\t\t\"--format=json\",\n\t\t\t\t\"--account\", self.account_shorthand,\n\t\t\t\t\"--session\", self.session_token\n\t\t\t]\n\t\telse:\n\t\t\tcmd_list_vaults = [\n\t\t\t\tself.op_path,\n\t\t\t\t\"list\", \"vaults\",\n\t\t\t\t\"--account\", self.account_shorthand,\n\t\t\t\t\"--session\", self.session_token\n\t\t\t]\n\t\t\n\t\top = subprocess.Popen(cmd_list_vaults, stdout=subprocess.PIPE, stderr=subprocess.PIPE)\n\n\t\t(output, err) = op.communicate()\n\t\texit_code = op.wait()\n\n\t\tsuccess = exit_code == 0\n\n\t\tif success:\n\t\t\toutput = RoyalUtils.decode_to_utf8_string(output)\n\n\t\t\tvaults_str = \"\"\n\n\t\t\tif self.is_op_cli_v2:\n\t\t\t\tvaults_str = output\n\t\t\telse:\n\t\t\t\tvaults_str = RoyalUtils.get_last_line(output)\n\t\t\t\n\t\t\tvaults = json.loads(vaults_str)\n\n\t\t\treturn vaults\n\t\t\n\t\tif not err:\n\t\t\terr = self.unknown_error_string\n\t\telse:\n\t\t\terr = RoyalUtils.decode_to_utf8_string(err)\n\t\t\n\t\traise Exception(err)\n\n\tdef get_items(self, vault):\n\t\tcmd_list_items = [ ]\n\n\t\tif self.is_op_cli_v2:\n\t\t\tcmd_list_items = [\n\t\t\t\tself.op_path,\n\t\t\t\t\"item\", \"list\",\n\t\t\t\t\"--format=json\",\n\t\t\t\t\"--account\", self.account_shorthand,\n\t\t\t\t\"--session\", self.session_token\n\t\t\t]\n\t\telse:\n\t\t\tcmd_list_items = [\n\t\t\t\tself.op_path,\n\t\t\t\t\"list\", \"items\",\n\t\t\t\t\"--account\", self.account_shorthand,\n\t\t\t\t\"--session\", self.session_token\n\t\t\t]\n\t\t\n\t\tif not vault:\n\t\t\tvault = \"\"\n\t\t\n\t\tvault = vault.strip()\n\t\t\n\t\tif vault:\n\t\t\tcmd_list_items.append(\"--vault\")\n\t\t\tcmd_list_items.append(vault)\n\n\t\top = subprocess.Popen(cmd_list_items, stdout=subprocess.PIPE, stderr=subprocess.PIPE)\n\n\t\t(output, err) = op.communicate()\n\t\texit_code = op.wait()\n\n\t\tsuccess = exit_code == 0\n\n\t\tif success:\n\t\t\toutput = RoyalUtils.decode_to_utf8_string(output)\n\n\t\t\titems_str = \"\"\n\n\t\t\tif self.is_op_cli_v2:\n\t\t\t\titems_str = output\n\t\t\telse:\n\t\t\t\titems_str = RoyalUtils.get_last_line(output)\n\n\t\t\titems = json.loads(items_str)\n\t\t\n\t\t\treturn items\n\t\telse:\n\t\t\tif not err:\n\t\t\t\terr = self.unknown_error_string\n\t\t\telse:\n\t\t\t\terr = RoyalUtils.decode_to_utf8_string(err)\n\t\t\n\t\t\traise Exception(err)\n\n\tdef get_item_details(self, item_id):\n\t\tcmd_get_item = [ ]\n\n\t\tif self.is_op_cli_v2:\n\t\t\tcmd_get_item = [\n\t\t\t\tself.op_path,\n\t\t\t\t\"item\", \"get\", item_id,\n\t\t\t\t# \"--fields\", \"label=password,label=privatekey\",\n\t\t\t\t\"--format=JSON\",\n\t\t\t\t\"--account\", self.account_shorthand,\n\t\t\t\t\"--session\", self.session_token\n\t\t\t]\n\t\telse:\n\t\t\tcmd_get_item = [\n\t\t\t\tself.op_path,\n\t\t\t\t\"get\", \"item\", item_id,\n\t\t\t\t\"--fields\", \"username,password\",\n\t\t\t\t\"--format\", \"JSON\",\n\t\t\t\t\"--account\", self.account_shorthand,\n\t\t\t\t\"--session\", self.session_token\n\t\t\t]\n\n\t\top = subprocess.Popen(cmd_get_item, stdout=subprocess.PIPE, stderr=subprocess.PIPE)\n\n\t\t(output, err) = op.communicate()\n\t\texit_code = op.wait()\n\n\t\tsuccess = exit_code == 0\n\n\t\tif success:\n\t\t\toutput = RoyalUtils.decode_to_utf8_string(output)\n\n\t\t\titem_str = \"\"\n\n\t\t\tif self.is_op_cli_v2:\n\t\t\t\titem_str = output\n\t\t\telse:\n\t\t\t\titem_str = RoyalUtils.get_last_line(output)\n\t\t\t\n\t\t\titem = json.loads(item_str)\n\t\t\n\t\t\treturn item\n\t\telse:\n\t\t\tif not err:\n\t\t\t\terr = self.unknown_error_string\n\t\t\telse:\n\t\t\t\terr = RoyalUtils.decode_to_utf8_string(err)\n\t\t\t\n\t\t\traise Exception(err)\n\nclass Converter:\n\t@staticmethod\n\tdef get_vault_name(vaults, vault_id, is_op_cli_v2):\n\t\tfor vault in vaults:\n\t\t\tif is_op_cli_v2:\n\t\t\t\tif vault.get(\"id\", \"\") == vault_id:\n\t\t\t\t\treturn vault.get(\"name\", \"\")\n\t\t\telse:\n\t\t\t\tif vault.get(\"uuid\", \"\") == vault_id:\n\t\t\t\t\treturn vault.get(\"name\", \"\")\n\t\t\n\t\treturn \"\"\n\n\t@staticmethod\n\tdef convert_items(vaults, items, is_op_cli_v2):\n\t\tobjects = []\n\n\t\tfor item in items:\n\t\t\toverview = None\n\n\t\t\tif is_op_cli_v2:\n\t\t\t\toverview = item\n\t\t\telse:\n\t\t\t\toverview = item.get(\"overview\", None)\n\n\t\t\tif overview == None:\n\t\t\t\tcontinue\n\n\t\t\tid = \"\"\n\n\t\t\tif is_op_cli_v2:\n\t\t\t\tid = item.get(\"id\", \"\")\n\t\t\telse:\n\t\t\t\tid = item.get(\"uuid\", \"\")\n\t\t\t\n\t\t\ttitle = overview.get(\"title\", \"N/A\")\n\t\t\turl = overview.get(\"url\", \"\")\n\n\t\t\tvault_id = \"\"\n\n\t\t\tif is_op_cli_v2:\n\t\t\t\titem_vault = item.get(\"vault\", None)\n\n\t\t\t\tif item_vault:\n\t\t\t\t\tvault_id = item_vault.get(\"id\", \"\")\n\t\t\telse:\n\t\t\t\tvault_id = item.get(\"vaultUuid\", \"\")\n\n\t\t\tvault_name = Converter.get_vault_name(vaults, vault_id, is_op_cli_v2)\n\n\t\t\titem_details = item.get(\"__item_details\", None)\n\t\t\tconverted_item_details = None\n\n\t\t\tif item_details is not None:\n\t\t\t\tconverted_item_details = Converter.convert_item(item_details, is_op_cli_v2)\n\n\t\t\tcred_type = \"DynamicCredential\" if converted_item_details is None else \"Credential\"\n\n\t\t\tcred = {\n\t\t\t\t\"Type\": cred_type,\n\t\t\t\t\"ID\": id,\n\t\t\t\t\"Name\": title,\n\t\t\t\t\"Path\": vault_name\n\t\t\t}\n\n\t\t\tif url != \"\":\n\t\t\t\tcred[\"URL\"] = url\n\t\t\t\n\t\t\tif converted_item_details is not None:\n\t\t\t\tfor key in converted_item_details:\n\t\t\t\t\tcred[key] = converted_item_details[key]\n\n\t\t\tobjects.append(cred)\n\n\t\tobjects = sorted(objects, key = lambda i: (i[\"Path\"], i[\"Name\"]))\n\n\t\tstore = {\n\t\t\t\"Objects\": objects\n\t\t}\n\n\t\treturn store\n\t\n\t@staticmethod\n\tdef convert_item(item_details, is_op_cli_v2):\n\t\tusername = None\n\t\tpassword = None\n\t\tprivate_key = None\n\n\t\tif is_op_cli_v2:\n\t\t\tfields = item_details.get(\"fields\")\n\n\t\t\tif not fields:\n\t\t\t\tfields = item_details\n\n\t\t\tfor field in fields:\n\t\t\t\tfield_id = field.get(\"id\", None)\n\t\t\t\tfield_label = field.get(\"label\", None)\n\t\t\t\tfield_value = field.get(\"value\", None)\n\n\t\t\t\tif field_id == \"username\" or field_label == \"username\":\n\t\t\t\t\tusername = field_value\n\t\t\t\telif field_id == \"password\" or field_label == \"password\":\n\t\t\t\t\tpassword = field_value\n\t\t\t\telif field_id == \"private_key\" or field_label == \"private key\":\n\t\t\t\t\tprivate_key = field_value\n\t\telse:\n\t\t\tusername = item_details.get(\"username\", None)\n\t\t\tpassword = item_details.get(\"password\", None)\n\n\t\tcred = { }\n\n\t\tif username is not None:\n\t\t\tcred[\"Username\"] = username\n\n\t\tif password is not None:\n\t\t\tcred[\"Password\"] = password\n\n\t\tif private_key is not None:\n\t\t\tcred[\"KeyFileContent\"] = private_key\n\n\t\treturn cred\n\nclass Coordinator:\n\top_path = \"\"\n\tsign_in_address = \"\"\n\temail_address = \"\"\n\tsecret_key = \"\"\n\tmaster_password = \"\"\n\tcreate_dynamic_credentials = True\n\n\terror_message_sign_in = \"Error while signing in: \"\n\terror_message_sign_out = \"Error while signing out: \"\n\terror_message_get_vaults = \"Error while getting vaults: \"\n\terror_message_get_items = \"Error while getting items: \"\n\terror_message_get_item_details = \"Error while getting item details: \"\n\n\tdef __init__(self, op_path_windows, op_path_macOS, sign_in_address, email_address, secret_key, master_password, create_dynamic_credentials):\n\t\tself.op_path = op_path_macOS if RoyalUtils.is_macOS() else op_path_windows\n\t\tself.sign_in_address = sign_in_address\n\t\tself.email_address = email_address\n\t\tself.secret_key = secret_key\n\t\tself.master_password = master_password\n\t\tself.create_dynamic_credentials = create_dynamic_credentials\n\t\t\n\tdef get_items(self, vault):\n\t\top = OnePassword(self.op_path)\n\n\t\ttry:\n\t\t\top.sign_out()\n\t\texcept Exception:\n\t\t\tpass\n\n\t\ttry:\n\t\t\top.sign_in(self.sign_in_address, self.email_address, self.secret_key, self.master_password)\n\t\texcept Exception as e:\n\t\t\tRoyalUtils.exit_with_error(self.error_message_sign_in, e)\n\n\t\tvaults = None\n\n\t\ttry:\n\t\t\tvaults = op.get_vaults()\n\t\texcept Exception as e:\n\t\t\tRoyalUtils.exit_with_error(self.error_message_get_vaults, e)\n\t\t\n\t\titems = None\n\n\t\ttry:\n\t\t\titems = op.get_items(vault)\n\t\texcept Exception as e:\n\t\t\tRoyalUtils.exit_with_error(self.error_message_get_items, e)\n\t\t\n\t\titems_details = [ ]\n\n\t\tif not self.create_dynamic_credentials:\n\t\t\tfor item in items:\n\t\t\t\tid = \"\"\n\n\t\t\t\tif op.is_op_cli_v2:\n\t\t\t\t\tid = item.get(\"id\", \"\")\n\t\t\t\telse:\n\t\t\t\t\tid = item.get(\"uuid\", \"\")\n\t\t\t\t\n\t\t\t\titem_details = op.get_item_details(id)\n\n\t\t\t\titem[\"__item_details\"] = item_details\n\n\t\ttry:\n\t\t\top.sign_out()\n\t\texcept Exception as e:\n\t\t\tRoyalUtils.exit_with_error(self.error_message_sign_out, e)\n\n\t\tstore = Converter.convert_items(vaults, items, op.is_op_cli_v2)\n\t\tstore_json = RoyalUtils.to_json(store, True)\n\n\t\tprint(store_json)\n\t\n\tdef get_item_details(self, item_id):\n\t\top = OnePassword(self.op_path)\n\n\t\ttry:\n\t\t\top.sign_out()\n\t\texcept Exception:\n\t\t\tpass\n\n\t\ttry:\n\t\t\top.sign_in(self.sign_in_address, self.email_address, self.secret_key, self.master_password)\n\t\texcept Exception as e:\n\t\t\tRoyalUtils.exit_with_error(self.error_message_sign_in, e)\n\n\t\titem_details = None\n\n\t\ttry:\n\t\t\titem_details = op.get_item_details(item_id)\n\t\texcept Exception as e:\n\t\t\tRoyalUtils.exit_with_error(self.error_message_get_item_details, e)\n\n\t\ttry:\n\t\t\top.sign_out()\n\t\texcept Exception as e:\n\t\t\tRoyalUtils.exit_with_error(self.error_message_sign_out, e)\n\n\t\tstore = Converter.convert_item(item_details, op.is_op_cli_v2)\n\t\tstore_json = RoyalUtils.to_json(store, True)\n\n\t\tprint(store_json)\n\ncoordinator = Coordinator(op_path_windows, op_path_macOS, sign_in_address, email_address, secret_key, master_password, create_dynamic_credentials)\n# coordinator.get_items(filter_vault)\ncoordinator.get_item_details(item_id)" } ] } \ No newline at end of file