In [1]:
import json
import requests
from pprint import pprint

# base_url = "http://localhost:8000" # local
base_url = "http://localhost:8002"  # docker

### Check API Health


In [2]:
path = "/healthz"
endpoint = f"{base_url}{path}"
print(endpoint)
response = requests.get(endpoint)
print(f"Health: {response.ok}")

http://localhost:8002/healthz
Health: True


## Get Data


### All Events


In [21]:
path = "/api/events"
endpoint = f"{base_url}{path}"
print(endpoint)
response = requests.get(endpoint)
print(f"status code {response.status_code}")
if response.ok:
    data = response.json()
    pprint(data)
    # for row in data.get("results"):
    #     print(row)

http://localhost:8002/api/events
status code 200
{'count': 3,
 'results': [{'created_at': '2025-06-15T14:08:49.027936Z',
              'description': 'inline event test',
              'id': 3,
              'page': '/my_third_page',
              'updated_at': '2025-06-15T14:08:55.719603Z'},
             {'created_at': '2025-06-15T14:08:45.086905Z',
              'description': 'my default description',
              'id': 2,
              'page': '/my_second_page',
              'updated_at': '2025-06-15T14:08:45.086926Z'},
             {'created_at': '2025-06-15T14:08:32.470390Z',
              'description': 'my default description',
              'id': 1,
              'page': '/my_new_webpage',
              'updated_at': '2025-06-15T14:08:32.470411Z'}]}


### Simple Data Types: Single Event


In [None]:
# path = "/api/events/1"
path = "/api/events/5"
endpoint = f"{base_url}{path}"


print(endpoint)
r = requests.get(endpoint)
print(r.ok, r.status_code)
if r.ok:
    data = r.json()
    print(data)

### List Data Types: Multiple Events


In [None]:
path = "/api/events"
endpoint = f"{base_url}{path}"
response = requests.get(endpoint)
print(endpoint, response.status_code)
if response.ok:
    data = response.json()
    print(data)

### Simple Data Types: Post Event

Not a valid json data


In [None]:
path = "/api/events"
endpoint = f"{base_url}{path}"
# print(endpoint)
# payload = {"page": "/test+"}
payload = {"id": 27}
response = requests.post(endpoint, data=payload)
print(response.headers)
print(response.text)
if response.ok:
    data = response.json()
    print(data)

Valid json data


In [None]:
# import json

payload = {"page": "/test+"}
headers = {"Content-Type": "application/json"}

response = requests.post(endpoint, data=json.dumps(payload), headers=headers)

# Print the response
print(response.status_code)
# print(response.headers) -> still receiving 'content-type': 'application/json', even if the headers changed to xml type
print(response.headers)
print(response.text)

In [None]:
# import requests
# import xml.etree.ElementTree as ET

# # API endpoint
# url = "http://localhost:8000/api/events"

# # Create the XML payload
# root = ET.Element("event")
# page = ET.SubElement(root, "page")
# page.text = "/test+"

# # Convert the XML to a string
# xml_data = ET.tostring(root, encoding="utf8").decode("utf8")

# # Set the headers
# headers = {"Content-Type": "application/xml"}

# # Send the POST request
# response = requests.post(url, data=xml_data, headers=headers)

# # Print the response
# print(response.status_code)
# print(response.headers)
# print(response.text)

## Send Data to the API


### Create Event Data


In [12]:
path = "/api/events/"
create_endpoint = f"{base_url}{path}"

# response = requests.post(create_endpoint, json={"page": "/my_new_webpage"})
# response = requests.post(create_endpoint, json={"page": "/my_second_page"})
response = requests.post(create_endpoint, json={"page": "/my_third_page"})

# print(response.status_code)
# print(response.headers)
# print(response.text)
if response.ok:
    data = response.json()
    print(data, type(data))
else:
    print(response.text)

{'page': '/my_third_page', 'created_at': '2025-06-15T14:08:49.027936Z', 'updated_at': '2025-06-15T14:08:49.027942Z', 'description': 'my default description', 'id': 3} <class 'dict'>


### Update Event Data


In [13]:
detail_path = "/api/events/3"
detail_endpoint = f"{base_url}{detail_path}"
r = requests.put(detail_endpoint, json={"description": "inline event test", "id": 555})
print(r.ok, r.status_code)
# print(r.headers)
if r.ok:
    data = r.json()
    print(type(data), data)
else:
    print(response.text)

True 200
<class 'dict'> {'page': '/my_third_page', 'created_at': '2025-06-15T14:08:49.027936Z', 'updated_at': '2025-06-15T14:08:55.719603Z', 'description': 'inline event test', 'id': 3}


## Delete Event Data


In [None]:
detail_path = "/api/events/6"
detail_endpoint = f"{base_url}{detail_path}"
r = requests.delete(detail_endpoint)
print(r.ok, r.status_code)
if r.ok:
    data = r.json()
    print(type(data), data)
else:
    print(response.text)
    print(r.headers)

False 404
{"status":"ok"}
{'date': 'Sun, 15 Jun 2025 11:31:34 GMT', 'server': 'uvicorn', 'content-length': '29', 'content-type': 'application/json'}


## Send multiple datasets to the API


In [3]:
import random

path = "/api/events/"
create_endpoint = f"{base_url}{path}"

event_count = 10_000
pages = ["/about", "/contact","/pages","/pricing"]

for i in range(event_count):
    page = random.choice(pages)
    response = requests.post(create_endpoint, json={"page":page})
    if response.ok:
        if i % 100==0:
            data = response.json()
            print(data, type(data), data.get("items"))
    else:
        print(response.text)

{'page': '/pricing', 'updated_at': '2025-06-17T16:53:56.731502', 'time': '2025-06-17T16:53:56.730638Z', 'id': 1, 'description': 'my default description'} <class 'dict'> None
{'page': '/pages', 'updated_at': '2025-06-17T16:53:58.851274', 'time': '2025-06-17T16:53:58.850912Z', 'id': 101, 'description': 'my default description'} <class 'dict'> None
{'page': '/pages', 'updated_at': '2025-06-17T16:54:00.889706', 'time': '2025-06-17T16:54:00.889154Z', 'id': 201, 'description': 'my default description'} <class 'dict'> None
{'page': '/contact', 'updated_at': '2025-06-17T16:54:02.745170', 'time': '2025-06-17T16:54:02.744837Z', 'id': 301, 'description': 'my default description'} <class 'dict'> None
{'page': '/contact', 'updated_at': '2025-06-17T16:54:04.886163', 'time': '2025-06-17T16:54:04.885608Z', 'id': 401, 'description': 'my default description'} <class 'dict'> None
{'page': '/contact', 'updated_at': '2025-06-17T16:54:07.350607', 'time': '2025-06-17T16:54:07.350145Z', 'id': 501, 'descriptio

## SQLModel Queries

Outside Fastapi


In [4]:
import sys
from pathlib import Path

In [5]:
src_path = Path("../src/").resolve()
# print(src_path)
sys.path.append(str(src_path))

In [7]:
from sqlmodel import Session, select
from pprint  import pprint
from api.db.session import engine
from api.events.models import EventModel

In [8]:
with Session(engine) as session:
    query = select(EventModel).order_by(EventModel.updated_at.desc()).limit(10)
    compiled_query = query.compile(compile_kwargs={"literal_binds":True})
    print(compiled_query, end="\n\n")
    print(str(query))
    results = session.exec(query).all()
    pprint(results)

SELECT eventmodel.id, eventmodel.time, eventmodel.page, eventmodel.description, eventmodel.updated_at 
FROM eventmodel ORDER BY eventmodel.updated_at DESC
 LIMIT 10

SELECT eventmodel.id, eventmodel.time, eventmodel.page, eventmodel.description, eventmodel.updated_at 
FROM eventmodel ORDER BY eventmodel.updated_at DESC
 LIMIT :param_1
[EventModel(page='/pages', updated_at=datetime.datetime(2025, 6, 17, 16, 58, 28, 111490), id=10000, time=datetime.datetime(2025, 6, 17, 16, 58, 28, 111220, tzinfo=datetime.timezone.utc), description='my default description'),
 EventModel(page='/pages', updated_at=datetime.datetime(2025, 6, 17, 16, 58, 28, 91531), id=9999, time=datetime.datetime(2025, 6, 17, 16, 58, 28, 91185, tzinfo=datetime.timezone.utc), description='my default description'),
 EventModel(page='/pricing', updated_at=datetime.datetime(2025, 6, 17, 16, 58, 28, 70763), id=9998, time=datetime.datetime(2025, 6, 17, 16, 58, 28, 70158, tzinfo=datetime.timezone.utc), description='my default desc

## Aggregate Data With Time Buckets


In [22]:
from sqlalchemy import func
from timescaledb.hyperfunctions import time_bucket
# from timescaledb.hyperfunctions import  time_bucket_gapfill 
from datetime import datetime, timedelta, timezone

with Session(engine) as session:
    # bucket = time_bucket("1 day", EventModel.time)  # bucket width and time field in the db table
    # bucket = time_bucket("1 hour", EventModel.time)  
    bucket = time_bucket("1 minute", EventModel.time)  
    # pages = ["/about", "/contact","/pages","/pricing"]
    pages = ["/about", "/contact","/pages"]
    start = datetime.now(timezone.utc) - timedelta(hours=1)
    query = (
        select(bucket, EventModel.page, func.count().label("event_count"))
        .where(EventModel.page.in_(pages))
        .group_by(bucket, EventModel.page)
        .order_by(bucket, EventModel.page)
        
        )
    # compiled_query = query.compile(compile_kwargs={"literal_binds":True})
    # print(compiled_query)    
    results = session.exec(query).fetchall()
    pprint(results)

[(datetime.datetime(2025, 6, 17, 16, 53, tzinfo=datetime.timezone.utc), '/about', 43),
 (datetime.datetime(2025, 6, 17, 16, 53, tzinfo=datetime.timezone.utc), '/contact', 41),
 (datetime.datetime(2025, 6, 17, 16, 53, tzinfo=datetime.timezone.utc), '/pages', 43),
 (datetime.datetime(2025, 6, 17, 16, 54, tzinfo=datetime.timezone.utc), '/about', 562),
 (datetime.datetime(2025, 6, 17, 16, 54, tzinfo=datetime.timezone.utc), '/contact', 577),
 (datetime.datetime(2025, 6, 17, 16, 54, tzinfo=datetime.timezone.utc), '/pages', 518),
 (datetime.datetime(2025, 6, 17, 16, 55, tzinfo=datetime.timezone.utc), '/about', 547),
 (datetime.datetime(2025, 6, 17, 16, 55, tzinfo=datetime.timezone.utc), '/contact', 563),
 (datetime.datetime(2025, 6, 17, 16, 55, tzinfo=datetime.timezone.utc), '/pages', 582),
 (datetime.datetime(2025, 6, 17, 16, 56, tzinfo=datetime.timezone.utc), '/about', 536),
 (datetime.datetime(2025, 6, 17, 16, 56, tzinfo=datetime.timezone.utc), '/contact', 530),
 (datetime.datetime(2025, 6