Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
84 changes: 84 additions & 0 deletions tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,3 +194,87 @@ def client(token: Optional[str]):
async def async_client(token: Optional[str]):
async with AsyncClient({"auth": token}) as client:
yield client


@pytest.fixture(scope="function")
def multiple_test_pages(client, parent_page_id):
page_ids = []
for i in range(5):
response = client.pages.create(
parent={"page_id": parent_page_id},
properties={
"title": [
{
"text": {
"content": f"Test Page (test_iterate_paginated_api iteration {i})"
}
}
]
},
children=[],
)
page_ids.append(response["id"])

yield page_ids

for page_id in page_ids:
try:
client.blocks.delete(block_id=page_id)
except Exception:
pass


@pytest.fixture(scope="function")
async def async_multiple_test_pages(async_client, parent_page_id):
page_ids = []
for i in range(5):
response = await async_client.pages.create(
parent={"page_id": parent_page_id},
properties={
"title": [
{
"text": {
"content": f"Test Page (test_async_iterate_paginated_api iteration {i})"
}
}
]
},
children=[],
)
page_ids.append(response["id"])

yield page_ids

for page_id in page_ids:
try:
await async_client.blocks.delete(block_id=page_id)
except Exception:
pass


@pytest.fixture(scope="function")
async def async_test_data_source(async_client, parent_page_id):
database = await async_client.databases.create(
parent={"type": "page_id", "page_id": parent_page_id},
title=[{"type": "text", "text": {"content": "Test DB"}}],
properties={"Name": {"type": "title", "title": {}}},
)
database_id = database["id"]

data_source = await async_client.data_sources.create(
parent={"type": "database_id", "database_id": database_id},
properties={"Name": {"type": "title", "title": {}}},
title=[{"type": "text", "text": {"content": "Test Data Source"}}],
)
data_source_id = data_source["id"]

yield data_source_id

try:
await async_client.data_sources.update(data_source_id, archived=True)
except Exception:
pass
try:
await async_client.blocks.delete(block_id=database_id)
except Exception:
pass
78 changes: 14 additions & 64 deletions tests/test_helpers.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,21 +77,9 @@ def test_get_url():

@pytest.mark.vcr()
@pytest.mark.timeout(90)
def test_iterate_paginated_api(client, parent_page_id):
def create_page(page_name):
page = client.pages.create(
parent={"page_id": parent_page_id},
properties={"title": [{"text": {"content": page_name}}]},
children=[],
)
return page["id"]

page_ids = []
for i in range(0, 5):
page_id = create_page(f"Test Page (test_iterate_paginated_api iteration {i})")
page_ids.append(page_id)

# give time to Notion to index these pages
def test_iterate_paginated_api(client, multiple_test_pages):
page_ids = multiple_test_pages

time.sleep(20)

generator = iterate_paginated_api(
Expand Down Expand Up @@ -132,23 +120,9 @@ def test_collect_paginated_api(client):

@pytest.mark.vcr()
@pytest.mark.timeout(90)
async def test_async_iterate_paginated_api(async_client, parent_page_id):
async def create_page(page_name):
page = await async_client.pages.create(
parent={"page_id": parent_page_id},
properties={"title": [{"text": {"content": page_name}}]},
children=[],
)
return page["id"]

page_ids = []
for i in range(0, 5):
page_id = await create_page(
f"Test Page (test_async_iterate_paginated_api iteration {i})"
)
page_ids.append(page_id)

# give time to Notion to index these pages
async def test_async_iterate_paginated_api(async_client, async_multiple_test_pages):
page_ids = async_multiple_test_pages

await asyncio.sleep(20)

generator = async_iterate_paginated_api(
Expand Down Expand Up @@ -549,19 +523,10 @@ def test_collect_data_source_templates(client, data_source_id):


@pytest.mark.vcr()
async def test_async_iterate_data_source_templates(async_client, parent_page_id):
database = await async_client.databases.create(
parent={"type": "page_id", "page_id": parent_page_id},
title=[{"type": "text", "text": {"content": "Async Test DB"}}],
)
database_id = database["id"]

data_source_response = await async_client.data_sources.create(
parent={"type": "database_id", "database_id": database_id},
properties={"Name": {"type": "title", "title": {}}},
title=[{"type": "text", "text": {"content": "Async Test Data Source"}}],
)
data_source_id = data_source_response["id"]
async def test_async_iterate_data_source_templates(
async_client, async_test_data_source
):
data_source_id = async_test_data_source

generator = async_iterate_data_source_templates(
async_client.data_sources.list_templates,
Expand All @@ -571,31 +536,16 @@ async def test_async_iterate_data_source_templates(async_client, parent_page_id)
templates = [template async for template in generator]
assert isinstance(templates, list)

await async_client.data_sources.update(data_source_id, archived=True)
await async_client.blocks.delete(block_id=database_id)


@pytest.mark.vcr()
async def test_async_collect_data_source_templates(async_client, parent_page_id):
database = await async_client.databases.create(
parent={"type": "page_id", "page_id": parent_page_id},
title=[{"type": "text", "text": {"content": "Async Test DB 2"}}],
)
database_id = database["id"]

data_source_response = await async_client.data_sources.create(
parent={"type": "database_id", "database_id": database_id},
properties={"Name": {"type": "title", "title": {}}},
title=[{"type": "text", "text": {"content": "Async Test Data Source 2"}}],
)
data_source_id = data_source_response["id"]
async def test_async_collect_data_source_templates(
async_client, async_test_data_source
):
data_source_id = async_test_data_source

templates = await async_collect_data_source_templates(
async_client.data_sources.list_templates,
data_source_id=data_source_id,
)

assert isinstance(templates, list)

await async_client.data_sources.update(data_source_id, archived=True)
await async_client.blocks.delete(block_id=database_id)