Skip to content

Commit

Permalink
Merge pull request #501 from phospho-app/dev
Browse files Browse the repository at this point in the history
Fix onboarding flow when uploading data
  • Loading branch information
oulianov committed Jun 11, 2024
2 parents 41ee28b + d38d435 commit 8ee66b2
Show file tree
Hide file tree
Showing 34 changed files with 1,318 additions and 695 deletions.
31 changes: 27 additions & 4 deletions backend/app/api/platform/endpoints/events.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,17 @@
from app.services.mongo.recipes import (
get_recipe_from_event_id,
run_recipe_on_tasks_batched,
)
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException
from phospho.models import ProjectDataFilters
from propelauth_fastapi import User

from app.api.platform.models import EventBackfillRequest, Event
from app.security.authentification import (
propelauth,
verify_if_propelauth_user_can_access_project,
)
from app.services.mongo.events import confirm_event, run_event_detection_on_timeframe
from app.services.mongo.events import confirm_event
from app.core import config

router = APIRouter(tags=["Events"])
Expand Down Expand Up @@ -40,11 +45,29 @@ async def post_backfill_event(
detail="You need to add a payment method to access this service. Please update your payment details: https://platform.phospho.ai/org/settings/billing",
)

if event_backfill_request.created_at_end is not None:
event_backfill_request.created_at_end = round(
event_backfill_request.created_at_end
)
if event_backfill_request.created_at_start is not None:
event_backfill_request.created_at_start = round(
event_backfill_request.created_at_start
)
filters = ProjectDataFilters(
created_at_start=event_backfill_request.created_at_start,
created_at_end=event_backfill_request.created_at_end,
)
recipe = await get_recipe_from_event_id(
project_id=project_id, event_id=event_backfill_request.event_id
)

background_tasks.add_task(
run_event_detection_on_timeframe,
org_id=org_id,
run_recipe_on_tasks_batched,
project_id=project_id,
event_backfill_request=event_backfill_request,
recipe=recipe,
org_id=org_id,
filters=filters,
sample_rate=event_backfill_request.sample_rate,
)
return {"status": "ok"}

Expand Down
15 changes: 11 additions & 4 deletions backend/app/api/platform/endpoints/organizations.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,24 @@
import stripe
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException, Header, Request
from fastapi import APIRouter, BackgroundTasks, Depends, Header, HTTPException, Request
from loguru import logger
from propelauth_fastapi import User

from app.api.platform.models import (
CreateCheckoutRequest,
Project,
ProjectCreationRequest,
Projects,
)
from app.core import config
from app.security.authentification import propelauth
from app.services.mongo.emails import email_user_onboarding, send_payment_issue_email
from app.services.mongo.projects import populate_default
from app.services.mongo.organizations import (
change_organization_plan,
create_project_by_org,
get_projects_from_org_id,
get_usage_quota,
change_organization_plan,
)
from app.services.mongo.projects import populate_default
from app.services.slack import slack_notification

router = APIRouter(tags=["Organizations"])
Expand Down Expand Up @@ -202,12 +203,14 @@ async def get_org_metadata(
)
async def post_create_checkout_session(
org_id: str,
create_checkout_request: CreateCheckoutRequest,
user: User = Depends(propelauth.require_user),
):
_ = propelauth.require_org_member(user, org_id)
org = propelauth.fetch_org(org_id)
org_metadata = org.get("metadata", {})
org_plan = org_metadata.get("plan", "hobby")

if org_plan == "pro":
# Organization already has a pro plan
return {"error": "Organization already has a pro plan"}
Expand All @@ -218,6 +221,10 @@ async def post_create_checkout_session(
f"Creating checkout session for org {org_id} and user {user.email}"
)

success_url = f"{config.PHOSPHO_FRONTEND_URL}/checkout/thank-you"
if create_checkout_request.project_id:
success_url += f"?project_id={create_checkout_request.project_id}"

checkout_session = stripe.checkout.Session.create(
line_items=[
{
Expand All @@ -227,7 +234,7 @@ async def post_create_checkout_session(
},
],
mode="subscription",
success_url=f"{config.PHOSPHO_FRONTEND_URL}/checkout/thank-you",
success_url=success_url,
cancel_url=f"{config.PHOSPHO_FRONTEND_URL}/checkout/cancel",
customer_email=user.email,
metadata={"org_id": org_id},
Expand Down
50 changes: 50 additions & 0 deletions backend/app/api/platform/endpoints/recipes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
from app.services.mongo.recipes import run_recipe_types_on_tasks
from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException
from propelauth_fastapi import User

from app.api.platform.models import RunRecipeRequest
from app.security.authentification import (
propelauth,
verify_if_propelauth_user_can_access_project,
)
from app.core import config

router = APIRouter(tags=["Recipes"])


@router.post(
"/recipes/{project_id}/run",
response_model=dict,
description="Run multiple recipes on tasks of a project",
)
async def post_run_recipes(
project_id: str,
run_recipe_request: RunRecipeRequest,
background_tasks: BackgroundTasks,
user: User = Depends(propelauth.require_user),
) -> dict:
"""
Run multiple recipes of different types on tasks of a project
"""
org_id = await verify_if_propelauth_user_can_access_project(user, project_id)
org = propelauth.fetch_org(org_id)
org_metadata = org.get("metadata", {})
customer_id = None

if "customer_id" in org_metadata.keys():
customer_id = org_metadata.get("customer_id", None)

if not customer_id and org_id != config.PHOSPHO_ORG_ID:
raise HTTPException(
status_code=402,
detail="You need to add a payment method to access this service. Please update your payment details: https://platform.phospho.ai/org/settings/billing",
)

background_tasks.add_task(
run_recipe_types_on_tasks,
org_id=org_id,
project_id=project_id,
recipe_types=run_recipe_request.recipe_type_list,
filters=run_recipe_request.filters,
)
return {"status": "ok"}
10 changes: 6 additions & 4 deletions backend/app/api/platform/models/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,20 @@
)

from .abtests import ABTest, ABTests
from .clusters import Cluster, Clustering, ClusteringRequest, Clusterings, Clusters
from .events import EventBackfillRequest
from .explore import (
AggregateMetricsRequest,
DashboardMetricsFilter,
DetectClustersRequest,
EventsMetricsFilter,
ProjectDataFilters,
FetchClustersRequest,
Pagination,
ProjectDataFilters,
QuerySessionsTasksRequest,
DetectClustersRequest,
FetchClustersRequest,
)
from .metadata import MetadataPivotQuery, MetadataPivotResponse, MetadataValueResponse
from .organizations import CreateCheckoutRequest
from .projects import AddEventsQuery, OnboardingSurvey, UploadTasksRequest
from .recipes import RunRecipeRequest
from .tasks import AddEventRequest, RemoveEventRequest
from .clusters import Cluster, Clusters, Clustering, Clusterings, ClusteringRequest
6 changes: 6 additions & 0 deletions backend/app/api/platform/models/organizations.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from typing import Optional
from pydantic import BaseModel


class CreateCheckoutRequest(BaseModel):
project_id: Optional[str] = None
8 changes: 8 additions & 0 deletions backend/app/api/platform/models/recipes.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
from typing import List, Optional
from phospho.models import ProjectDataFilters
from pydantic import BaseModel


class RunRecipeRequest(BaseModel):
recipe_type_list: List[str]
filters: Optional[ProjectDataFilters] = None
2 changes: 2 additions & 0 deletions backend/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ def check_health():
projects,
sessions,
tasks,
recipes,
onboarding,
)

Expand All @@ -193,6 +194,7 @@ def check_health():
api_platform.include_router(events.router)
api_platform.include_router(explore.router)
api_platform.include_router(metadata.router)
api_platform.include_router(recipes.router)
api_platform.include_router(onboarding.router)

app.mount("/api", api_platform)
Expand Down
72 changes: 1 addition & 71 deletions backend/app/services/mongo/events.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,7 @@
from typing import Dict, List, Optional

from app.api.platform.models import EventBackfillRequest
from app.api.platform.models.explore import Pagination
from app.db.models import EventDefinition, Recipe
from app.db.models import EventDefinition
from app.db.mongo import get_mongo_db
from app.services.mongo.extractor import run_recipe_on_tasks
from app.services.mongo.tasks import get_all_tasks
from app.services.mongo.tasks import get_total_nb_of_tasks
from app.utils import cast_datetime_or_timestamp_to_timestamp
from fastapi import HTTPException
from loguru import logger
Expand Down Expand Up @@ -75,70 +70,6 @@ async def get_event_from_name_and_project_id(
return validated_event


async def get_recipe_by_id(recipe_id: str) -> Recipe:
"""
Get a recipe by its id
"""
mongo_db = await get_mongo_db()
recipe = await mongo_db["recipes"].find_one({"id": recipe_id})
if not recipe:
raise HTTPException(status_code=404, detail="Recipe not found")
validated_recipe = Recipe.model_validate(recipe)
return validated_recipe


async def run_event_detection_on_timeframe(
org_id: str, project_id: str, event_backfill_request: EventBackfillRequest
) -> None:
"""
Run event detection on a given event_id and event_data
"""
event_definition = await get_event_definition_from_event_id(
project_id, event_backfill_request.event_id
)
if event_definition.recipe_id is None:
logger.error(
f"Event {event_definition.event_name} has no recipe_id for project {project_id}. Canceling."
)
return
recipe = await get_recipe_by_id(recipe_id=event_definition.recipe_id)
if event_backfill_request.created_at_end is not None:
event_backfill_request.created_at_end = round(
event_backfill_request.created_at_end
)
if event_backfill_request.created_at_start is not None:
event_backfill_request.created_at_start = round(
event_backfill_request.created_at_start
)

filters = ProjectDataFilters(
created_at_start=event_backfill_request.created_at_start,
created_at_end=event_backfill_request.created_at_end,
)
total_nb_tasks = await get_total_nb_of_tasks(
project_id=project_id,
filters=filters,
)
if event_backfill_request.sample_rate is not None:
sample_size = int(total_nb_tasks * event_backfill_request.sample_rate)
else:
sample_size = total_nb_tasks

# Batch the tasks to avoid memory issues
batch_size = 256
nb_batches = sample_size // batch_size

for i in range(nb_batches + 1):
tasks = await get_all_tasks(
project_id=project_id,
filters=filters,
pagination=Pagination(page=i, per_page=batch_size),
)
await run_recipe_on_tasks(tasks=tasks, recipe=recipe, org_id=org_id)

return None


async def get_all_events(
project_id: str,
limit: Optional[int] = None,
Expand Down Expand Up @@ -202,7 +133,6 @@ async def get_all_events(
async def confirm_event(
project_id: str,
event_id: str,
event_source: str = "owner",
) -> Event:
mongo_db = await get_mongo_db()
# Get the event
Expand Down
Loading

0 comments on commit 8ee66b2

Please sign in to comment.