In [1]:
import requests

In [2]:
# Base URL of the API on the loopback interface; signup() builds its routes from it.
endpoint = "http://127.0.0.1:5001"


In [3]:
def signup():
    """Register the fixed test account, then log in with the same credentials.

    The decoded signup response and the access token are both printed.

    Returns:
        tuple: ``(signup_body, access_token)`` -- the decoded JSON body of the
        signup response and the ``"access_token"`` value from the login response.
    """
    credentials = {"email": "test@test.nl", "password": "tester123"}

    # The signup status code is not checked; the body is only decoded and shown.
    signup_body = requests.post(f"{endpoint}/auth/signup", json=credentials).json()
    print(signup_body)

    login_body = requests.post(f"{endpoint}/auth/login", json=credentials).json()
    token = login_body["access_token"]
    print("access_token = ", token)
    return signup_body, token

In [1]:
import requests
from typing import Dict, Optional

class ApplicationTester:
    """Authenticated HTTP client for manually exercising the API at ``endpoint``.

    Logs in on construction with a fixed test account, stores the access and
    refresh tokens, and refreshes them once when a request comes back with a
    401 that looks token-related (see ``_request``).
    """

    endpoint = "http://127.0.0.1:5001"

    # Lower-cased fragments of the 401 "msg" field that trigger a token refresh.
    _REFRESHABLE_401_MARKERS = (
        "token has expired",
        "signature verification failed",
        "not fresh",
    )

    def __init__(self):
        """Create the HTTP session and log in immediately."""
        self.access_token: Optional[str] = None
        self.refresh_token: Optional[str] = None
        self.session = requests.Session()
        self.login()

    # ---------- auth ----------
    def login(self):
        """Log in with the fixed test account and store both tokens.

        Raises:
            requests.HTTPError: if the login route answers with an error status
                (raised by ``raise_for_status``).
            KeyError: if the response body lacks ``access_token`` or
                ``refresh_token``.
        """
        login_endpoint = f"{self.endpoint}/auth/login"
        data = {"email": "test@test.nl", "password": "tester123"}
        r = self.session.post(login_endpoint, json=data)
        r.raise_for_status()
        payload = r.json()
        # Keep both tokens
        self.access_token = payload["access_token"]
        self.refresh_token = payload["refresh_token"]

    def _auth_header(self, use_refresh: bool = False) -> Dict[str, str]:
        """Return a Bearer ``Authorization`` header, or ``{}`` if no token is set.

        Args:
            use_refresh: send the refresh token instead of the access token.
        """
        token = self.refresh_token if use_refresh else self.access_token
        return {"Authorization": f"Bearer {token}"} if token else {}

    def refresh(self) -> bool:
        """Use the refresh token to get a new pair of tokens. Returns True on success."""
        refresh_endpoint = f"{self.endpoint}/auth/refresh"
        r = self.session.post(refresh_endpoint, headers=self._auth_header(use_refresh=True))
        if r.status_code == 200:
            payload = r.json()
            self.access_token = payload["access_token"]
            self.refresh_token = payload["refresh_token"]
            return True
        return False

    @staticmethod
    def _error_message(resp) -> str:
        """Return the lower-cased ``msg`` field of a response body, or ``""``.

        Best effort: a non-JSON or unexpectedly shaped body yields an empty string.
        """
        try:
            return resp.json().get("msg", "").lower()
        except Exception:
            return ""

    # ---------- request wrapper that auto-refreshes ----------
    def _request(self, method: str, path: str, **kwargs):
        """Send an authenticated request, refreshing the tokens once on a 401.

        Args:
            method: HTTP method name passed to ``session.request``.
            path: route path; must start with ``/``.
            **kwargs: forwarded to ``session.request``. An optional ``headers``
                mapping is merged with the auth header; the caller's mapping is
                not modified.

        Returns:
            The response of the retried request if a refresh happened, otherwise
            the response of the first attempt.

        Raises:
            ValueError: if ``path`` does not start with ``/``.
        """
        if not path.startswith("/"):
            raise ValueError("Path should start with /")
        url = f"{self.endpoint}{path}"

        # Copy the caller's headers once: this keeps their dict untouched and
        # keeps the custom headers available for the retry below.
        base_headers = dict(kwargs.pop("headers", None) or {})

        # 1st attempt with access token
        resp = self.session.request(
            method, url, headers={**base_headers, **self._auth_header()}, **kwargs
        )

        # If access token expired, try once to refresh and retry
        if resp.status_code == 401:
            # Check msg to ensure it's a token problem, not another auth error;
            # an empty or unreadable msg is treated as refreshable too.
            msg = self._error_message(resp)
            token_related = not msg or any(
                marker in msg for marker in self._REFRESHABLE_401_MARKERS
            )
            if token_related and self.refresh():
                # Rebuild the headers so the retry carries the new access token.
                return self.session.request(
                    method, url, headers={**base_headers, **self._auth_header()}, **kwargs
                )
        return resp

    # ---------- public HTTP helpers ----------
    def post(self, path: str, json_data: Dict):
        """POST ``json_data`` as a JSON body to ``path``."""
        return self._request("POST", path, json=json_data)

    def get(self, path: str, search_params=None):
        """GET ``path`` with ``search_params`` as the query string."""
        return self._request("GET", path, params=search_params)

    def put(self, path: str, json_data: Dict):
        """PUT ``json_data`` as a JSON body to ``path``."""
        return self._request("PUT", path, json=json_data)

# Instantiating the client logs in right away (__init__ calls login()).
app = ApplicationTester()

### View all previously created scraper cluster instances

In [5]:
# GET /scraper_cluster and display the decoded JSON response.
app.get("/scraper_cluster").json()

[{'cluster_entity_id': '68e10e7f9f0e01b7def2e413',
  'created_at': '2025-10-04T12:09:12.453000+00:00',
  'deleted_at': None,
  'id': '68e10e689f0e01b7def2e411',
  'scraper_entity_id': '68e10e699f0e01b7def2e412',
  'stages': {'cluster_enrich': 'initialized',
   'cluster_prep': 'completed',
   'clustering': 'initialized',
   'initialized': 'completed',
   'scraping': 'completed'},
  'updated_at': '2025-10-04T12:10:05.342000+00:00',
  'user_id': '68ac7022e24c87692ba648f4'},
 {'cluster_entity_id': '68e13064c04b5f3e7ed60c9d',
  'created_at': '2025-10-04T14:32:43.391000+00:00',
  'deleted_at': None,
  'id': '68e1300bc04b5f3e7ed60c93',
  'scraper_entity_id': '68e1300fc04b5f3e7ed60c94',
  'stages': {'cluster_enrich': 'initialized',
   'cluster_prep': 'completed',
   'clustering': 'initialized',
   'initialized': 'completed',
   'scraping': 'completed'},
  'updated_at': '2025-10-04T14:34:41.389000+00:00',
  'user_id': '68ac7022e24c87692ba648f4'}]

### Create a new instance of the scraper cluster

In [None]:
# POST an empty JSON body to /scraper_cluster and keep the decoded response.
scraper_cluster_instance = app.post("/scraper_cluster", json_data={}).json()
scraper_cluster_instance

{'scraper_cluster_id': '68e1300bc04b5f3e7ed60c93'}

In [4]:
# Hard-coded cluster id; overwrites whatever the creation cell assigned.
# NOTE(review): presumably pinned to reuse an existing cluster between runs -- confirm.
scraper_cluster_instance = {'scraper_cluster_id': '68e1300bc04b5f3e7ed60c93'}

### Create the scraping stage, where you define the keywords and the subreddits to look for

In [None]:
# Payload for the scraper: search terms, target subreddits and the owning cluster.
scraper_request = {
    "keywords": ["subtitles", "video", "grammar"],
    "subreddits": ["deaf", "asl", "bsl"],
    "scraper_cluster_id": scraper_cluster_instance["scraper_cluster_id"],
}
print(scraper_request)

In [None]:


# POST the scraper request to /scraper and keep the decoded response.
scraper_instance = app.post("/scraper", json_data=scraper_request).json()
scraper_instance

{'keywords': ['subtitles', 'video', 'grammar'], 'subreddits': ['deaf', 'asl', 'bsl'], 'scraper_cluster_id': '68e1300bc04b5f3e7ed60c93'}


In [19]:
# Display the stored /scraper response again.
scraper_instance

{'scraper_id': '68e1300fc04b5f3e7ed60c94'}

### Start the scraper

In [11]:
# PUT /scraper/start for the current cluster and show the decoded response.
response_scraper = app.put(
    "/scraper/start",
    json_data={"scraper_cluster_id": scraper_request["scraper_cluster_id"]},
).json()
response_scraper

{'message': 'successfully scraped the scraper instance on reddit',
 'paused': False,
 'processed': 9,
 'total': 9}

### Flatten the posts and comments into a single, un-nested collection of cluster units

# TODO: Right now we don't do anything with user-provided media. So if the message was actually a picture or video, we disregard it, which is bad.

In [7]:
# POST /clustering/prepare_cluster for the current cluster and show the decoded response.
response_cluster_prep = app.post(
    "/clustering/prepare_cluster",
    json_data={"scraper_cluster_id": scraper_request["scraper_cluster_id"]},
).json()
response_cluster_prep

{'message': 'preparing the cluster is successful, a total of 5819 cluster units are created'}

### The step that calls the route to create LLM summaries of the comment threads should go here

In [None]:
# TODO: do this later; first make the baseline

{'message': 'preparing the cluster is successful, a total of 2532 cluster units are created'}

### Call the clustering route to cluster all the documents that are found

In [22]:
# Keeps the raw response object; later cells call .json() on it themselves.
cluster_unit_entities = app.post(
    "/clustering/get_cluster_units",
    json_data={"scraper_cluster_id": scraper_request["scraper_cluster_id"]},
)

In [23]:
# Count the entries under the "cluster_unit_entities" key of the decoded response.
len(cluster_unit_entities.json()["cluster_unit_entities"])

2536

### We now save the data locally and continue testing in a different notebook.

The size of the JSON file is only 6 MB for 2532 cluster units.

In [107]:
import json

# Write the decoded response to testfile.json, pretty-printed with a 4-space indent.
with open("testfile.json", "w") as f:
    json.dump(cluster_unit_entities.json(), f, indent=4)

### Call the Q&A route to get to the insights of the scraper