/
_py_op_cli.py
181 lines (149 loc) · 7.06 KB
/
_py_op_cli.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
import os
import pathlib
import json
import logging
from json.decoder import JSONDecodeError
import subprocess
from os import environ as env
from .py_op_exceptions import (
OPConfigNotFoundException,
OPSigninException,
OPNotFoundException
)
"""
Module to hold stuff that interacts directly with 'op' or its config
TODO: Move other code that closely touches 'op' here
"""
class OPCLIConfig(dict):
    """
    Dictionary populated from the JSON configuration file of the 'op' command.

    On construction the config file is read, JSON-decoded, and its top-level
    keys/values are copied into this dict.
    """

    # Config file location relative to $XDG_CONFIG_HOME or the home directory.
    OP_CONFIG_RELPATH = os.path.join(".op", "config")

    def __init__(self, configpath=None):
        """
        Load the op configuration.

        Arguments:
            - 'configpath': Path to the config file. If None, the path is
                            determined by _get_config_path().
        Raises:
            - OPConfigNotFoundException if no path could be determined, the file
              does not exist, permission to read it is denied, its content is
              not valid JSON, or the JSON is not an object.
        """
        super().__init__()
        if configpath is None:
            configpath = self._get_config_path()
        # The built-in lookup always yields a path; this guard only matters if
        # _get_config_path() is overridden to return None.
        if configpath is None:
            raise OPConfigNotFoundException("No op configuration found")

        try:
            # Context manager so the file handle is closed deterministically.
            with open(configpath, "r", encoding="utf-8") as config_file:
                config_json = config_file.read()
        except FileNotFoundError as e:
            raise OPConfigNotFoundException(
                "op config not found at path: {}".format(configpath)) from e
        except PermissionError as e:
            raise OPConfigNotFoundException(
                "Permission denied accessing op config at path: {}".format(configpath)) from e

        try:
            config = json.loads(config_json)
        except JSONDecodeError as e:
            raise OPConfigNotFoundException(
                "Unable to json decode config at path: {}".format(configpath)) from e

        # Valid JSON that is not an object (list, string, number, ...) cannot
        # populate a dict; report it the same way as other unusable configs
        # instead of leaking a TypeError/ValueError from dict.update().
        if not isinstance(config, dict):
            raise OPConfigNotFoundException(
                "Config at path is not a json object: {}".format(configpath))
        self.update(config)

    def _get_config_path(self):
        """
        Return the expected path of the op config file.

        Uses $XDG_CONFIG_HOME as the base directory when it is set to a
        non-empty value; otherwise falls back to the user's home directory.
        The returned path is not checked for existence.
        """
        # An empty XDG_CONFIG_HOME is treated as unset; joining "" with the
        # relative path would otherwise yield a path relative to the CWD.
        xdg_path = os.environ.get('XDG_CONFIG_HOME')
        base_dir = xdg_path if xdg_path else pathlib.Path.home()
        return os.path.join(base_dir, self.OP_CONFIG_RELPATH)
class _OPCLIExecute:
    """
    Class for logging into and querying a 1Password account via the 'op' cli command.
    """

    def __init__(self, account_shorthand=None, signin_address=None, email_address=None,
                 secret_key=None, password=None, logger=None, op_path='op'):
        """
        Create an OP object. The 1Password sign-in happens during object instantiation.
        If 'password' is not provided, the 'op' command will prompt on the console for a password.

        If all components of a 1Password account are provided, an initial sign-in is performed,
        otherwise, a normal sign-in is performed. See `op --help` for further explanation.

        On success the session token is stored in 'self.token' (raw bytes) and exported,
        decoded, in the environment variable OP_SESSION_<account_shorthand>.

        Arguments:
            - 'account_shorthand': The shorthand name for the account on this device.
                                   You may choose this during initial signin, otherwise
                                   1Password converts it from your account address.
                                   See 'op signin --help' for more information.
                                   If omitted, 'latest_signin' from the op config is used.
            - 'signin_address': Fully qualified address of the 1Password account.
                                E.g., 'my-account.1password.com'
            - 'email_address': Email of the address for the user of the account
            - 'secret_key': Secret key for the account
            - 'password': The user's master password
            - 'logger': A logging object. If not provided a basic logger is created and used.
            - 'op_path': optional path to the `op` command, if it's not at the default location

        Raises:
            - OPSigninException if 1Password sign-in fails for any reason.
            - OPNotFoundException if the 1Password command can't be found.
            - OPConfigNotFoundException (propagated from OPCLIConfig) if no
              'account_shorthand' is given and the op config cannot be loaded.
        """
        if not logger:
            # NOTE: this configures the root logger at DEBUG level.
            logging.basicConfig(format="%(message)s", level=logging.DEBUG)
            logger = logging.getLogger()
        self.logger = logger

        if account_shorthand is None:
            # Fall back to the account most recently signed in per the op config.
            config = OPCLIConfig()
            try:
                account_shorthand = config['latest_signin']
                self.logger.debug(
                    "Using account shorthand found in op config: {}".format(account_shorthand))
            except KeyError:
                account_shorthand = None

        if account_shorthand is None:
            raise OPSigninException(
                "Account shorthand not provided and not found in 'op' config")

        self.account_shorthand = account_shorthand
        self.op_path = op_path

        initial_signin_args = [account_shorthand,
                               signin_address,
                               email_address,
                               secret_key,
                               password]
        # An initial sign-in is only possible when every account component is known.
        initial_signin = (None not in initial_signin_args)

        if initial_signin:
            self.token = self._do_initial_signin(*initial_signin_args)
        else:
            self.token = self._do_normal_signin(password)

        # Export the session token as OP_SESSION_<account_shorthand>.
        sess_var_name = 'OP_SESSION_{}'.format(self.account_shorthand)
        # TODO: return already-decoded token from sign-in
        env[sess_var_name] = self.token.decode()

    def _do_normal_signin(self, password):
        """
        Run 'op signin --output=raw' and return the captured output with
        trailing whitespace stripped.

        NOTE(review): self.account_shorthand is not passed to 'op' here --
        confirm that 'op' selects the intended account.
        """
        self.logger.info("Doing normal (non-initial) 1Password sign-in")
        signin_argv = [self.op_path, "signin", "--output=raw"]
        print("")
        token = self._run_signin(signin_argv, password=password).rstrip()
        return token

    def _do_initial_signin(self, account_shorthand, signin_address, email_address, secret_key, password):
        """
        Run 'op signin <address> <email> <secret key> --output=raw' and return
        the captured output with trailing whitespace stripped.

        NOTE(review): 'account_shorthand' is accepted but not forwarded to 'op' --
        confirm whether it should be passed (e.g. via a shorthand option).
        """
        self.logger.info(
            "Performing initial 1Password sign-in to {} as {}".format(signin_address, email_address))
        signin_argv = [self.op_path, "signin", signin_address,
                       email_address, secret_key, "--output=raw"]
        print("")
        token = self._run_signin(signin_argv, password=password).rstrip()
        return token

    def _run_signin(self, argv, password=None):
        """
        Run a sign-in command, feeding 'password' on stdin, and return its raw
        stdout. Failures are raised as OPSigninException.
        """
        return self._run(argv, OPSigninException, capture_stdout=True, input_string=password)

    def _run(self, argv, op_exception_class, capture_stdout=False, input_string=None, decode=None):
        """
        Execute 'argv' as a subprocess using the current process environment.

        Arguments:
            - 'argv': Command and arguments as a list; argv[0] is the executable.
            - 'op_exception_class': Exception class raised, as
                                    op_exception_class(stderr_text, returncode),
                                    when the command exits non-zero.
            - 'capture_stdout': If True, capture and return the command's stdout.
            - 'input_string': Optional str or bytes written to the command's stdin.
                              A str is encoded as UTF-8.
            - 'decode': Optional encoding name; if given, captured stdout is
                        decoded with it, otherwise bytes are returned.

        Returns:
            Captured stdout (bytes, or str when 'decode' is given) if
            'capture_stdout' is True, otherwise None.

        Raises:
            - OPNotFoundException if the executable can't be found.
            - 'op_exception_class' if the command exits with a non-zero status.
        """
        stdout = subprocess.PIPE if capture_stdout else None
        if isinstance(input_string, str):
            input_string = input_string.encode("utf-8")

        try:
            _ran = subprocess.run(argv, input=input_string,
                                  stderr=subprocess.PIPE, stdout=stdout, env=env)
        except FileNotFoundError as err:
            self.logger.error(
                "1Password 'op' command not found at: {}".format(argv[0]))
            self.logger.error(
                "See https://support.1password.com/command-line-getting-started/ for more information,")
            self.logger.error(
                "or install from Homebrew with: 'brew install 1password-cli'")
            raise OPNotFoundException(argv[0], err.errno) from err

        try:
            _ran.check_returncode()
        except subprocess.CalledProcessError as err:
            # Decode leniently: an undecodable byte in the error output must not
            # mask the actual command failure with a UnicodeDecodeError.
            stderr_output = _ran.stderr.decode("utf-8", errors="replace").rstrip()
            raise op_exception_class(stderr_output, _ran.returncode) from err

        if not capture_stdout:
            return None
        return _ran.stdout.decode(decode) if decode else _ran.stdout