140 changes: 109 additions & 31 deletions polyapi/generate.py
@@ -2,6 +2,8 @@
import requests
import os
import shutil
import logging
import tempfile
from typing import Any, List, Optional, Tuple, cast

from .auth import render_auth_function
@@ -426,48 +428,124 @@ def add_function_file(
function_name: str,
spec: SpecificationDto,
):
# first, let's add the import to the __init__
init_the_init(full_path)
"""
Atomically add a function file to prevent partial corruption during generation failures.

This function generates all content first, then writes files atomically using temporary files
to ensure that either the entire operation succeeds or no changes are made to the filesystem.
"""
try:
# first, let's add the import to the __init__
init_the_init(full_path)

func_str, func_type_defs = render_spec(spec)
func_str, func_type_defs = render_spec(spec)

if func_str:
# add function to init
init_path = os.path.join(full_path, "__init__.py")
with open(init_path, "a") as f:
f.write(f"\n\nfrom . import {to_func_namespace(function_name)}\n\n{func_str}")
if not func_str:
# If render_spec failed and returned empty string, don't create any files
raise Exception("Function rendering failed - empty function string returned")

# add type_defs to underscore file
file_path = os.path.join(full_path, f"{to_func_namespace(function_name)}.py")
with open(file_path, "w") as f:
f.write(func_type_defs)
# Prepare all content first before writing any files
func_namespace = to_func_namespace(function_name)
init_path = os.path.join(full_path, "__init__.py")
func_file_path = os.path.join(full_path, f"{func_namespace}.py")

# Read current __init__.py content if it exists
init_content = ""
if os.path.exists(init_path):
with open(init_path, "r") as f:
init_content = f.read()

# Prepare new content to append to __init__.py
new_init_content = init_content + f"\n\nfrom . import {func_namespace}\n\n{func_str}"

# Use temporary files for atomic writes
# Write to __init__.py atomically
with tempfile.NamedTemporaryFile(mode="w", delete=False, dir=full_path, suffix=".tmp") as temp_init:
temp_init.write(new_init_content)
temp_init_path = temp_init.name

# Write to function file atomically
with tempfile.NamedTemporaryFile(mode="w", delete=False, dir=full_path, suffix=".tmp") as temp_func:
temp_func.write(func_type_defs)
temp_func_path = temp_func.name

# Atomic operations: move temp files to final locations
shutil.move(temp_init_path, init_path)
shutil.move(temp_func_path, func_file_path)

except Exception as e:
# Clean up any temporary files that might have been created
try:
if 'temp_init_path' in locals() and os.path.exists(temp_init_path):
os.unlink(temp_init_path)
if 'temp_func_path' in locals() and os.path.exists(temp_func_path):
os.unlink(temp_func_path)
except:
pass # Best effort cleanup

# Re-raise the original exception
raise e


def create_function(
spec: SpecificationDto
) -> None:
"""
Create a function with atomic directory and file operations.

Tracks directory creation to enable cleanup on failure.
"""
full_path = os.path.dirname(os.path.abspath(__file__))
folders = f"poly.{spec['context']}.{spec['name']}".split(".")
for idx, folder in enumerate(folders):
if idx + 1 == len(folders):
# special handling for final level
add_function_file(
full_path,
folder,
spec,
)
else:
full_path = os.path.join(full_path, folder)
if not os.path.exists(full_path):
os.makedirs(full_path)

# append to __init__.py file if nested folders
next = folders[idx + 1] if idx + 2 < len(folders) else ""
if next:
init_the_init(full_path)
add_import_to_init(full_path, next)
created_dirs = [] # Track directories we create for cleanup on failure

try:
for idx, folder in enumerate(folders):
if idx + 1 == len(folders):
# special handling for final level
add_function_file(
full_path,
folder,
spec,
)
else:
full_path = os.path.join(full_path, folder)
if not os.path.exists(full_path):
os.makedirs(full_path)
created_dirs.append(full_path) # Track for cleanup

# append to __init__.py file if nested folders
next = folders[idx + 1] if idx + 2 < len(folders) else ""
if next:
init_the_init(full_path)
add_import_to_init(full_path, next)

except Exception as e:
# Clean up directories we created (in reverse order)
for dir_path in reversed(created_dirs):
try:
if os.path.exists(dir_path) and not os.listdir(dir_path): # Only remove if empty
os.rmdir(dir_path)
except:
pass # Best effort cleanup

# Re-raise the original exception
raise e


def generate_functions(functions: List[SpecificationDto]) -> None:
failed_functions = []
for func in functions:
create_function(func)
try:
create_function(func)
except Exception as e:
function_path = f"{func.get('context', 'unknown')}.{func.get('name', 'unknown')}"
function_id = func.get('id', 'unknown')
failed_functions.append(f"{function_path} (id: {function_id})")
logging.warning(f"WARNING: Failed to generate function {function_path} (id: {function_id}): {str(e)}")
continue

if failed_functions:
logging.warning(f"WARNING: {len(failed_functions)} function(s) failed to generate:")
for failed_func in failed_functions:
logging.warning(f" - {failed_func}")
164 changes: 125 additions & 39 deletions polyapi/poly_schemas.py
@@ -1,4 +1,7 @@
import os
import logging
import tempfile
import shutil
from typing import Any, Dict, List, Tuple

from polyapi.schema import wrapped_generate_schema_types
@@ -21,65 +24,148 @@


def generate_schemas(specs: List[SchemaSpecDto], limit_ids: List[str] = None):
failed_schemas = []
if limit_ids:
for spec in specs:
if spec["id"] in limit_ids:
create_schema(spec)
try:
create_schema(spec)
except Exception as e:
schema_path = f"{spec.get('context', 'unknown')}.{spec.get('name', 'unknown')}"
schema_id = spec.get('id', 'unknown')
failed_schemas.append(f"{schema_path} (id: {schema_id})")
logging.warning(f"WARNING: Failed to generate schema {schema_path} (id: {schema_id}): {str(e)}")
continue
else:
for spec in specs:
create_schema(spec)
try:
create_schema(spec)
except Exception as e:
schema_path = f"{spec.get('context', 'unknown')}.{spec.get('name', 'unknown')}"
schema_id = spec.get('id', 'unknown')
failed_schemas.append(f"{schema_path} (id: {schema_id})")
logging.warning(f"WARNING: Failed to generate schema {schema_path} (id: {schema_id}): {str(e)}")
continue

if failed_schemas:
logging.warning(f"WARNING: {len(failed_schemas)} schema(s) failed to generate:")
for failed_schema in failed_schemas:
logging.warning(f" - {failed_schema}")


def add_schema_file(
full_path: str,
schema_name: str,
spec: SchemaSpecDto,
):
# first, let's add the import to the __init__
init_the_init(full_path, SCHEMA_CODE_IMPORTS)

if not spec["definition"].get("title"):
# very empty schemas like mews.Unit are possible
# add a title here to be sure they render
spec["definition"]["title"] = schema_name

schema_defs = render_poly_schema(spec)

if schema_defs:
# add function to init
"""
Atomically add a schema file to prevent partial corruption during generation failures.

This function generates all content first, then writes files atomically using temporary files
to ensure that either the entire operation succeeds or no changes are made to the filesystem.
"""
try:
# first, let's add the import to the __init__
init_the_init(full_path, SCHEMA_CODE_IMPORTS)

if not spec["definition"].get("title"):
# very empty schemas like mews.Unit are possible
# add a title here to be sure they render
spec["definition"]["title"] = schema_name

schema_defs = render_poly_schema(spec)

if not schema_defs:
# If render_poly_schema failed and returned empty string, don't create any files
raise Exception("Schema rendering failed - empty schema content returned")

# Prepare all content first before writing any files
schema_namespace = to_func_namespace(schema_name)
init_path = os.path.join(full_path, "__init__.py")
with open(init_path, "a") as f:
f.write(f"\n\nfrom ._{to_func_namespace(schema_name)} import {schema_name}\n__all__.append('{schema_name}')\n")

# add type_defs to underscore file
file_path = os.path.join(full_path, f"_{to_func_namespace(schema_name)}.py")
with open(file_path, "w") as f:
f.write(schema_defs)
schema_file_path = os.path.join(full_path, f"_{schema_namespace}.py")

# Read current __init__.py content if it exists
init_content = ""
if os.path.exists(init_path):
with open(init_path, "r") as f:
init_content = f.read()

# Prepare new content to append to __init__.py
new_init_content = init_content + f"\n\nfrom ._{schema_namespace} import {schema_name}\n__all__.append('{schema_name}')\n"

# Use temporary files for atomic writes
# Write to __init__.py atomically
with tempfile.NamedTemporaryFile(mode="w", delete=False, dir=full_path, suffix=".tmp") as temp_init:
temp_init.write(new_init_content)
temp_init_path = temp_init.name

# Write to schema file atomically
with tempfile.NamedTemporaryFile(mode="w", delete=False, dir=full_path, suffix=".tmp") as temp_schema:
temp_schema.write(schema_defs)
temp_schema_path = temp_schema.name

# Atomic operations: move temp files to final locations
shutil.move(temp_init_path, init_path)
shutil.move(temp_schema_path, schema_file_path)

except Exception as e:
# Clean up any temporary files that might have been created
try:
if 'temp_init_path' in locals() and os.path.exists(temp_init_path):
os.unlink(temp_init_path)
if 'temp_schema_path' in locals() and os.path.exists(temp_schema_path):
os.unlink(temp_schema_path)
except:
pass # Best effort cleanup

# Re-raise the original exception
raise e


def create_schema(
spec: SchemaSpecDto
) -> None:
"""
Create a schema with atomic directory and file operations.

Tracks directory creation to enable cleanup on failure.
"""
full_path = os.path.dirname(os.path.abspath(__file__))
folders = f"schemas.{spec['context']}.{spec['name']}".split(".")
for idx, folder in enumerate(folders):
if idx + 1 == len(folders):
# special handling for final level
add_schema_file(
full_path,
folder,
spec,
)
else:
full_path = os.path.join(full_path, folder)
if not os.path.exists(full_path):
os.makedirs(full_path)

# append to __init__.py file if nested folders
next = folders[idx + 1] if idx + 2 < len(folders) else ""
if next:
init_the_init(full_path, SCHEMA_CODE_IMPORTS)
add_import_to_init(full_path, next)
created_dirs = [] # Track directories we create for cleanup on failure

try:
for idx, folder in enumerate(folders):
if idx + 1 == len(folders):
# special handling for final level
add_schema_file(
full_path,
folder,
spec,
)
else:
full_path = os.path.join(full_path, folder)
if not os.path.exists(full_path):
os.makedirs(full_path)
created_dirs.append(full_path) # Track for cleanup

# append to __init__.py file if nested folders
next = folders[idx + 1] if idx + 2 < len(folders) else ""
if next:
init_the_init(full_path, SCHEMA_CODE_IMPORTS)
add_import_to_init(full_path, next)

except Exception as e:
# Clean up directories we created (in reverse order)
for dir_path in reversed(created_dirs):
try:
if os.path.exists(dir_path) and not os.listdir(dir_path): # Only remove if empty
os.rmdir(dir_path)
except:
pass # Best effort cleanup

# Re-raise the original exception
raise e


def add_schema_to_init(full_path: str, spec: SchemaSpecDto):
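
On the caller side, generate_functions and generate_schemas now collect per-item failures and log warnings instead of letting one bad spec abort the whole run. The condensed sketch below restates that loop outside the diff; generate_all and the plain dict specs are illustrative stand-ins for the SpecificationDto/SchemaSpecDto handling in the real code.

import logging
from typing import Callable, Dict, List


def generate_all(specs: List[Dict], create: Callable[[Dict], None]) -> List[str]:
    # Hypothetical condensed form of the loops added to generate_functions and
    # generate_schemas: a failing spec is logged and skipped, never fatal.
    failed: List[str] = []
    for spec in specs:
        try:
            create(spec)
        except Exception as e:
            label = f"{spec.get('context', 'unknown')}.{spec.get('name', 'unknown')}"
            failed.append(f"{label} (id: {spec.get('id', 'unknown')})")
            logging.warning("Failed to generate %s: %s", label, e)
    if failed:
        logging.warning("%d item(s) failed to generate: %s", len(failed), ", ".join(failed))
    return failed
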