diff --git a/.gitignore b/.gitignore index 6e8ad97..58e6a70 100644 --- a/.gitignore +++ b/.gitignore @@ -42,3 +42,6 @@ Network Trash Folder Temporary Items .apdisk .jenkins-external + +# Local virtual environment +.venv/ \ No newline at end of file diff --git a/root/app/ldap-backend-app.py b/root/app/ldap-backend-app.py index 6b72732..9f53123 100644 --- a/root/app/ldap-backend-app.py +++ b/root/app/ldap-backend-app.py @@ -9,7 +9,7 @@ # 1) accepts GET requests on /login and /ldaplogin and responds with a login form # 2) accepts POST requests on /login and /ldaplogin, sets a cookie, and responds with redirect -import sys, os, signal, base64, cgi +import sys, os, signal, base64, cgi, datetime if sys.version_info.major == 2: from urlparse import urlparse from Cookie import BaseCookie @@ -141,7 +141,9 @@ def do_POST(self): cipher_suite = Fernet(fernetkey) enc = cipher_suite.encrypt(ensure_bytes(user + ':' + passwd)) enc = enc.decode() - self.send_header('Set-Cookie', 'nginxauth=' + enc + '; httponly') + expires = datetime.datetime.now(datetime.UTC) + datetime.timedelta(weeks=1) + expires_str = expires.strftime('%a, %d-%b-%Y %H:%M:%S GMT') + self.send_header('Set-Cookie', 'nginxauth=' + enc + f'; Expires={expires_str}; httponly') self.send_header('Location', target) self.end_headers() @@ -171,4 +173,4 @@ def exit_handler(signal, frame): if __name__ == '__main__': server = AuthHTTPServer(Listen, AppHandler) signal.signal(signal.SIGINT, exit_handler) - server.serve_forever() + server.serve_forever() \ No newline at end of file diff --git a/root/app/nginx-ldap-auth-daemon.py b/root/app/nginx-ldap-auth-daemon.py index 0c5f1c6..78ee0b9 100644 --- a/root/app/nginx-ldap-auth-daemon.py +++ b/root/app/nginx-ldap-auth-daemon.py @@ -82,31 +82,69 @@ def do_GET(self): return True - ctx['action'] = 'decoding credentials' + ctx['action'] = 'decoding credentials with fernet key' + user: str | None + passwd: str | None + error: str | None + user, passwd, error = self.decode_credentials_with_fernet_key(ctx, auth_header) + + if error == "InvalidToken": + self.log_message('Incorrect token.') + ctx['action'] = 'decoding credentials with base64' + user, passwd = self.decode_credentials_with_base64(ctx, auth_header) + + if user is None and passwd is None: + return True + + ctx['user'] = ldap.filter.escape_filter_chars(user) + ctx['pass'] = passwd + + # Continue request processing + return False + + + def decode_credentials_with_fernet_key( + self, + ctx: dict, + auth_header: str + ) -> tuple[str, str, str]: + user: str | None = None + passwd: str | None = None + error: str | None = None try: fernetkey = os.getenv("FERNET_KEY").encode() cipher_suite = Fernet(fernetkey) - self.log_message('Trying to dechipher credentials...') + self.log_message('Trying to dechipher credentials with Fernet Key...') auth_decoded = auth_header[6:].encode() auth_decoded = cipher_suite.decrypt(auth_decoded) auth_decoded = auth_decoded.decode("utf-8") user, passwd = auth_decoded.split(':', 1) - except InvalidToken: - self.log_message('Incorrect token. Trying to decode credentials from BASE64...') + except (InvalidToken, TypeError, UnicodeDecodeError): + error = "InvalidToken" + except Exception as e: + self.auth_failed(ctx) + self.log_error(str(e)) + return user, passwd, error + + + def decode_credentials_with_base64( + self, + ctx: dict, + auth_header: str + ) -> tuple[str, str]: + user: str | None = None + passwd: str | None = None + try: + self.log_message('Trying to decode credentials from BASE64...') auth_decoded = base64.b64decode(auth_header[6:]) auth_decoded = auth_decoded.decode("utf-8") user, passwd = auth_decoded.split(':', 1) except Exception as e: self.auth_failed(ctx) - self.log_error(e) - return True - - ctx['user'] = ldap.filter.escape_filter_chars(user) - ctx['pass'] = passwd + self.log_error(str(e)) + return user, passwd - # Continue request processing - return False def get_cookie(self, name): cookies = self.headers.get('Cookie') @@ -217,7 +255,7 @@ def do_GET(self): return ldap.set_option(ldap.OPT_X_TLS_REQUIRE_CERT, ldap.OPT_X_TLS_ALLOW) - + ctx['action'] = 'initializing LDAP connection' ldap_obj = ldap.initialize(ctx['url']); @@ -254,7 +292,7 @@ def do_GET(self): searchfilter, ['objectclass'], 1) ctx['action'] = 'verifying search query results' - + nres = len(results) if nres < 1: @@ -363,4 +401,3 @@ def exit_handler(signal, frame): sys.stdout.write("Start listening on %s:%d...\n" % Listen) sys.stdout.flush() server.serve_forever() -