# Auth Log reader
This will read server logs saved by the pickleAuth addin

To get test data either visit a page or use:

In [1]:
import os
from datetime import datetime
from tqdm.auto import tqdm

def extract_timestamp(filename: str) -> str:
    # Extract the base filename without the directory
    base_name = os.path.basename(filename)
    
    # Split the filename to extract the timestamp part (assumes it's after 'auth_data_')
    timestamp_str = base_name.split('_')[2].replace('.pkl', '')
    
    # Convert to float (to handle sub-second precision)
    timestamp = float(timestamp_str)
    
    # Convert the timestamp to a human-readable datetime format
    readable_time = datetime.fromtimestamp(timestamp).strftime('%Y-%m-%d %H:%M:%S') 
    
    return readable_time

In [2]:
from pathlib import Path
import pickle 

authlogs = list(Path("data/auth_logs/").glob("*.pkl"))
testlog = authlogs[0]#"data/auth_logs/auth_data_1728130272.249.pkl"
with open(testlog, 'rb') as f:
    data = pickle.load(f)

# Inspect the data
print(f"File: {testlog} Time Saved: {extract_timestamp(testlog)}")
print(data)

File: data/auth_logs/auth_data_1728639980.461.pkl Time Saved: 2024-10-11 10:46:20
{'auth_payload': "'eyJraWQiOiJ3bzQ3dmNHcHFpNklZWk56M2FwM1FRb1p4NG1KbDJqZVJTaHZieG16VkdFPSIsImFsZyI6IlJTMjU2In0.eyJzdWIiOiJlNDc4NTQyOC00MDQxLTcwNjAtMWZiYS0wOGZjY2RjOWVjOWUiLCJpc3MiOiJodHRwczpcL1wvY29nbml0by1pZHAudXMtZWFzdC0xLmFtYXpvbmF3cy5jb21cL3VzLWVhc3QtMV94NTU0VXFVbjAiLCJ2ZXJzaW9uIjoyLCJjbGllbnRfaWQiOiJyZjM5ODh0ZjZqb2poYmZpOWF2NnNwaTNnIiwib3JpZ2luX2p0aSI6IjA3N2Y2Y2RhLWI3ZTYtNDQwOC1iOWM2LWM5NjAzZWJmNjIyYSIsInRva2VuX3VzZSI6ImFjY2VzcyIsInNjb3BlIjoib3BlbmlkIGVtYWlsIiwiYXV0aF90aW1lIjoxNzI4NjM5OTcyLCJleHAiOjE3Mjg2NDM1NzIsImlhdCI6MTcyODYzOTk3MywianRpIjoiYTA2MzIzYTMtYjE0OS00OGE0LThjM2EtOTM1NGIyMzJhZmIwIiwidXNlcm5hbWUiOiJlNDc4NTQyOC00MDQxLTcwNjAtMWZiYS0wOGZjY2RjOWVjOWUifQ.exiKA3cSCz8QatqGKZDBTytGHL9NyY-zPLtplbEB4wGhfasXjSNE2iEiVW-SBFlkvD_F0wiMCxNSSqB4OGET6sftzP8NhUI5OlVXWivNWX0lYDPj2S7-7I8CBFUPXeQQT_OzcQwJdGUWkUp4fkTUkIy3MI4rsWTHStGME7YXwUK-rV59UShCmj0ydc-JQ1I8k0gVgxXDCLdNUXt6ODIf_9hq3-KPGQbjvpY5RpzJAnDpR-GEp0Yf

In [3]:
for log in tqdm(authlogs):
    with open(log, 'rb') as f:
        data = pickle.load(f)
    auth_payload_str  = data['auth_payload']
    if auth_payload_str != "None":
        print("found one with a payload!")
        break

  0%|          | 0/1062 [00:00<?, ?it/s]

found one with a payload!


# Validating Header
Now that we have the Auth payload, let's validate it.

## Extra Imports

In [4]:
import time
import requests
import jwt
from botocore.exceptions import BotoCoreError, ClientError
from jwt.exceptions import DecodeError, ExpiredSignatureError
from pprint import pprint


## Settings

In [5]:
from dotenv import load_dotenv
load_dotenv("wsi_service/api/v3/integrations/.env")
idp_url = os.environ["idp_url"]
client_id = os.environ["client_id"]
jwks_url = os.environ["jwks_url"]
cognito_user_pool_id = os.environ["cognito_user_pool_id"]  # Add this to settings
aws_region = os.environ["aws_region"] # Add this to settings



## Special Function
Need to get the token from its' string representation due to the way it was saved:

In [6]:
import ast
auth_payload_str  = data['auth_payload']
auth_payload = auth_payload_str.strip("'")

token = auth_payload

## Functions

In [7]:
def validate_cognito_token(token):
    headers = jwt.get_unverified_header(token)
    kid = headers["kid"]

   
    response = requests.get(jwks_url)
    keys = response.json()["keys"]

    # Find the key that matches the kid in the JWT header
    key = next(k for k in keys if k["kid"] == kid)

    # Use the key to validate the token (you can use PyJWT or any other library here)
    public_key = jwt.algorithms.RSAAlgorithm.from_jwk(key)
    decoded_token = jwt.decode(token, public_key, algorithms=["RS256"],options={"verify_exp": False})

    return decoded_token

## Processing

In [8]:


try:
    # Validate the token against AWS Cognito
    decoded_token = validate_cognito_token( token)
    pprint(decoded_token)
    if decoded_token.get("client_id") != client_id:
        raise PermissionError("Invalid client ID")

    # Optionally, check custom claims or other parts of the token
    

except (DecodeError, ExpiredSignatureError) as e:
    raise PermissionError(f"Invalid token: {str(e)}")
except (BotoCoreError, ClientError) as e:
    raise PermissionError(f"Error validating token with Cognito: {str(e)}")

{'auth_time': 1728639972,
 'client_id': 'rf3988tf6jojhbfi9av6spi3g',
 'exp': 1728643572,
 'iat': 1728639973,
 'iss': 'https://cognito-idp.us-east-1.amazonaws.com/us-east-1_x554UqUn0',
 'jti': 'a06323a3-b149-48a4-8c3a-9354b232afb0',
 'origin_jti': '077f6cda-b7e6-4408-b9c6-c9603ebf622a',
 'scope': 'openid email',
 'sub': 'e4785428-4041-7060-1fba-08fccdc9ec9e',
 'token_use': 'access',
 'username': 'e4785428-4041-7060-1fba-08fccdc9ec9e',
 'version': 2}


# Check all saved tokens
Let's make sure it's being sent with every request.

In [9]:
fails = 0
goodlogs = []
for log in tqdm(authlogs):
    with open(log, 'rb') as f:
        data = pickle.load(f)
    auth_payload_str  = data['auth_payload']
    tokens = auth_payload_str.strip("'").split(" ") # split takes into account any "bearer= " style mess
    token=None
    for testtoken in tokens:
        if len(testtoken) > 20: # ignores anything too short in the token string
            token = testtoken
    try:
        decoded_token = validate_cognito_token( token)
        if decoded_token.get("client_id") != client_id:
            fails +=1
        else:
            goodlogs.append(log)
    except:
        #print("exception")
        #print(f"File: {log} Time Saved: {extract_timestamp(log)}")
        #pprint(data)
        #print(f"{data['auth_payload']=}")
        fails +=1
print(f"There were {fails} failed authorisation(s) out of {len(authlogs)} tokens")

  0%|          | 0/1062 [00:00<?, ?it/s]

There were 399 failed authorisation(s) out of 1062 tokens


In [13]:
log1 = goodlogs[1]
log2 = goodlogs[2]
with open(log1, 'rb') as f:
    data1 = pickle.load(f)
with open(log2, 'rb') as f:
    data2 = pickle.load(f)

auth_payload_str1  = data1['auth_payload']
auth_payload_str2  = data2['auth_payload']



print(auth_payload_str1==auth_payload_str2)

True


In [None]:
Subsequent tokens might be the same, as expiry time is only down to the second - if we can cache even the last request we might speed up response times substantially