forked from intake/intake
-
Notifications
You must be signed in to change notification settings - Fork 0
/
secret.py
66 lines (53 loc) · 1.84 KB
/
secret.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
#-----------------------------------------------------------------------------
# Copyright (c) 2012 - 2018, Anaconda, Inc. and Intake contributors
# All rights reserved.
#
# The full license is in the LICENSE file, distributed with this software.
#-----------------------------------------------------------------------------
import hmac
import logging
import uuid

from .base import BaseAuth, BaseClientAuth
logger = logging.getLogger('intake')
class SecretAuth(BaseAuth):
    """A very simple auth mechanism using a shared secret

    Every request must carry the shared secret in one header entry; the
    same check is applied when connecting and when accessing a source.

    Parameters
    ----------
    secret: str
        The string that must be matched in the requests. If None, a random
        UUID is generated and logged.
    key: str
        Header entry in which to seek the secret
    """

    def __init__(self, secret=None, key='intake-secret'):
        if secret is None:
            # uuid4 is built from random bits, whereas uuid1 is derived from
            # the host address and the current time and is thus guessable.
            secret = uuid.uuid4().hex
            # Lazy %-args: same message text, formatted only if emitted.
            logger.info('Random server secret: %s', secret)
        self.secret = secret
        self.key = key

    def _secret_matches(self, header):
        """Tell whether ``header`` carries the expected shared secret.

        The value is looked up through ``self.get_case_insensitive``, which
        is not defined in this class (presumably inherited from BaseAuth),
        using ``''`` as the default when the key is absent.

        Returns
        -------
        True if the supplied value equals ``self.secret``; False if it does
        not, or if the lookup or comparison raises.
        """
        try:
            supplied = self.get_case_insensitive(header, self.key, '')
            expected = self.secret
            if isinstance(supplied, str) and isinstance(expected, str):
                # Constant-time comparison so response timing does not leak
                # how many leading characters of a guess were right.
                # Encoding to bytes lets non-ASCII secrets through, which
                # compare_digest rejects for str; 'surrogatepass' keeps the
                # encoding lossless so the result agrees with ``==``.
                return hmac.compare_digest(
                    supplied.encode('utf-8', 'surrogatepass'),
                    expected.encode('utf-8', 'surrogatepass'))
            # Non-text values: fall back to ordinary equality.
            return supplied == expected
        except Exception:
            # Best effort: a malformed header denies access instead of
            # raising. KeyboardInterrupt/SystemExit still propagate.
            return False

    def allow_connect(self, header):
        """Return True if ``header`` contains the shared secret."""
        return self._secret_matches(header)

    def allow_access(self, header, source, catalog):
        """Return True if ``header`` contains the shared secret.

        ``source`` and ``catalog`` are accepted for interface compatibility
        but are not consulted by this implementation.
        """
        return self._secret_matches(header)
class SecretClientAuth(BaseClientAuth):
    """Client-side counterpart of SecretAuth

    Supplies the shared secret as a single header entry.

    Parameters
    ----------
    secret: str
        The string that must be included in requests.
    key: str
        HTTP header key under which the shared secret is placed
    """

    def __init__(self, secret, key='intake-secret'):
        # Attributes are stored before the base initializer runs, as in the
        # original ordering.
        self.secret, self.key = secret, key
        super(SecretClientAuth, self).__init__()

    def get_headers(self):
        """Return a one-entry dict mapping the header key to the secret."""
        headers = {}
        headers[self.key] = self.secret
        return headers