Skip to content

Commit

Permalink
Merge pull request #557 from phospho-app/dev
Browse files Browse the repository at this point in the history
feat: import from langfuse
  • Loading branch information
oulianov committed Jun 24, 2024
2 parents 8fde400 + 2caebc6 commit 5c5ebdc
Show file tree
Hide file tree
Showing 13 changed files with 695 additions and 35 deletions.
55 changes: 53 additions & 2 deletions backend/app/api/platform/endpoints/projects.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@
search_sessions_in_project,
)

from app.services.mongo.extractor import collect_langsmith_data
from app.services.mongo.extractor import collect_langsmith_data, collect_langfuse_data
from app.security.authorization import get_quota
from app.core import config

Expand Down Expand Up @@ -503,7 +503,7 @@ async def connect_langsmith(
_ = [run for run in runs]
except Exception as e:
raise HTTPException(
status_code=400, detail=f"Error: Could not connect to Langsmith. {e}"
status_code=400, detail=f"Error: Could not connect to Langsmith: {e}"
)

org_plan = await get_quota(project_id)
Expand All @@ -519,3 +519,54 @@ async def connect_langsmith(
max_usage=max_usage,
)
return {"status": "ok", "message": "Langsmith connected successfully."}


@router.post(
"/projects/{project_id}/connect-langfuse",
response_model=dict,
)
async def connect_langfuse(
project_id: str,
credentials: dict,
background_tasks: BackgroundTasks,
user: User = Depends(propelauth.require_user),
) -> dict:
"""
Import data from Langfuse to a Phospho project
"""
project = await get_project_by_id(project_id)
propelauth.require_org_member(user, project.org_id)

logger.debug(f"Connecting LangFuse to project {project_id}")

try:
# This snippet is used to test the connection with Langsmith and verify the API key/project name
from langfuse import Langfuse

langfuse = Langfuse(
public_key=credentials["langfuse_public_key"],
secret_key=credentials["langfuse_secret_key"],
host="https://cloud.langfuse.com",
)
langfuse.auth_check()
langfuse.shutdown()
logger.debug("LangFuse connected successfully.")

except Exception as e:
raise HTTPException(
status_code=400, detail=f"Error: Could not connect to LangFuse: {e}"
)

org_plan = await get_quota(project_id)
current_usage = org_plan.get("current_usage", 0)
max_usage = org_plan.get("max_usage", config.PLAN_HOBBY_MAX_NB_DETECTIONS)

background_tasks.add_task(
collect_langfuse_data,
project_id=project_id,
org_id=project.org_id,
langfuse_credentials=credentials,
current_usage=current_usage,
max_usage=max_usage,
)
return {"status": "ok", "message": "LangFuse connected successfully."}
35 changes: 32 additions & 3 deletions backend/app/api/v2/endpoints/cron.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,22 @@
from fastapi import APIRouter, BackgroundTasks

from loguru import logger
from app.services.mongo.extractor import collect_langsmith_data
from app.services.mongo.extractor import collect_langsmith_data, collect_langfuse_data
from app.security.authorization import get_quota
from app.core import config
from app.services.mongo.cron import fetch_projects_to_sync
from app.services.mongo.cron import fetch_and_decrypt_langsmith_credentials
from app.services.mongo.cron import (
fetch_and_decrypt_langsmith_credentials,
fetch_and_decrypt_langfuse_credentials,
)

router = APIRouter(tags=["cron"])


async def run_langsmith_sync_pipeline():
logger.debug("Running Langsmith synchronisation pipeline")

projects_ids = await fetch_projects_to_sync()
projects_ids = await fetch_projects_to_sync(type="langsmith")

for project_id in projects_ids:
langsmith_credentials = await fetch_and_decrypt_langsmith_credentials(
Expand All @@ -36,3 +39,29 @@ async def run_langsmith_sync_pipeline():
)

return {"status": "ok", "message": "Pipeline ran successfully"}


async def run_langfuse_sync_pipeline():
logger.debug("Running Langfuse synchronisation pipeline")

projects_ids = await fetch_projects_to_sync(type="langfuse")

for project_id in projects_ids:
langfuse_credentials = await fetch_and_decrypt_langfuse_credentials(project_id)

org_plan = await get_quota(project_id)
current_usage = org_plan.get("current_usage", 0)
max_usage = org_plan.get("max_usage", config.PLAN_HOBBY_MAX_NB_DETECTIONS)

background_tasks = BackgroundTasks()

background_tasks.add_task(
collect_langfuse_data,
project_id,
org_plan["org_id"],
langfuse_credentials,
current_usage,
max_usage,
)

return {"status": "ok", "message": "Pipeline ran successfully"}
6 changes: 5 additions & 1 deletion backend/app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,10 @@
from app.db.qdrant import close_qdrant, init_qdrant
from app.services.mongo.extractor import check_health
from app.services.mongo.ai_hub import check_health_ai_hub
from app.api.v2.endpoints.cron import run_langsmith_sync_pipeline
from app.api.v2.endpoints.cron import (
run_langsmith_sync_pipeline,
run_langfuse_sync_pipeline,
)

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.jobstores.memory import MemoryJobStore
Expand Down Expand Up @@ -213,3 +216,4 @@ def check_health():
@scheduler.scheduled_job("interval", seconds=3600)
async def run_cron_job():
await run_langsmith_sync_pipeline()
await run_langfuse_sync_pipeline()
82 changes: 80 additions & 2 deletions backend/app/services/mongo/cron.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,12 +7,12 @@
from Crypto import Random


async def fetch_projects_to_sync():
async def fetch_projects_to_sync(type: str = "langsmith"):
"""
Fetch the project ids to sync
"""
mongo_db = await get_mongo_db()
project_ids = await mongo_db["keys"].distinct("project_id")
project_ids = await mongo_db["keys"].distinct("project_id", {"type": type})
return project_ids


Expand Down Expand Up @@ -84,9 +84,87 @@ async def encrypt_and_store_langsmith_credentials(
{"project_id": project_id},
{
"$set": {
"type": "langsmith",
"langsmith_api_key": base64.b64encode(data).decode("latin-1"),
"langsmith_project_name": langsmith_project_name,
},
},
upsert=True,
)


async def fetch_and_decrypt_langfuse_credentials(
project_id: str,
):
"""
Fetch and decrypt the Langfuse credentials from the database
"""

mongo_db = await get_mongo_db()
encryption_key = os.getenv("EXTRACTOR_SECRET_KEY")

# Fetch the encrypted credentials from the database
credentials = await mongo_db["keys"].find_one(
{"project_id": project_id},
)

# Decrypt the credentials
source = base64.b64decode(credentials["langfuse_secret_key"].encode("latin-1"))

key = SHA256.new(
encryption_key.encode("utf-8")
).digest() # use SHA-256 over our key to get a proper-sized AES key
IV = source[: AES.block_size] # extract the IV from the beginning
decryptor = AES.new(key, AES.MODE_CBC, IV)
data = decryptor.decrypt(source[AES.block_size :]) # decrypt the data
padding = data[-1] # extract the padding length
logger.debug(
f"Decrypted Langfuse credentials for project : {data[:-padding].decode('utf-8')}"
)
return {
"langfuse_secret_key": data[:-padding].decode("utf-8"),
"langfuse_public_key": credentials["langfuse_public_key"],
}


async def encrypt_and_store_langfuse_credentials(
project_id: str,
langfuse_secret_key: str,
langfuse_public_key: str,
):
"""
Store the encrypted Langsmith credentials in the database
"""

mongo_db = await get_mongo_db()

encryption_key = os.getenv("EXTRACTOR_SECRET_KEY")
api_key_as_bytes = langfuse_secret_key.encode("utf-8")

# Encrypt the credentials
key = SHA256.new(
encryption_key.encode("utf-8")
).digest() # use SHA-256 over our key to get a proper-sized AES key

IV = Random.new().read(AES.block_size) # generate IV
encryptor = AES.new(key, AES.MODE_CBC, IV)
padding = (
AES.block_size - len(api_key_as_bytes) % AES.block_size
) # calculate needed padding
api_key_as_bytes += bytes([padding]) * padding
data = IV + encryptor.encrypt(
api_key_as_bytes
) # store the IV at the beginning and encrypt

# Store the encrypted credentials in the database
await mongo_db["keys"].update_one(
{"project_id": project_id},
{
"$set": {
"type": "langfuse",
"langfuse_secret_key": base64.b64encode(data).decode("latin-1"),
"langfuse_public_key": langfuse_public_key,
},
},
upsert=True,
)
40 changes: 38 additions & 2 deletions backend/app/services/mongo/extractor.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,42 @@ async def collect_langsmith_data(
)

except Exception as e:
error_id = generate_uuid()
error_message = f"Caught error while collecting langsmith data (error_id: {error_id}): {e}\n{traceback.format_exception(e)}"
error_message = f"Caught error while collecting langsmith data: {e}\n{traceback.format_exception(e)}"
logger.error(error_message)


async def collect_langfuse_data(
project_id: str,
org_id: str,
langfuse_credentials: dict,
current_usage: int,
max_usage: int,
):
async with httpx.AsyncClient() as client:
logger.debug(
f"Calling the extractor API for collecting langfuse data: {config.EXTRACTOR_URL}/v1/pipelines/langfuse"
)
try:
response = await client.post(
f"{config.EXTRACTOR_URL}/v1/pipelines/langfuse", # WARNING: hardcoded API version
json={
"langfuse_credentials": langfuse_credentials,
"project_id": project_id,
"org_id": org_id,
"current_usage": current_usage,
"max_usage": max_usage,
},
headers={
"Authorization": f"Bear {config.EXTRACTOR_SECRET_KEY}",
"Content-Type": "application/json",
},
timeout=60,
)
if response.status_code != 200:
logger.error(
f"Error returned when collecting langfuse data (status code: {response.status_code}): {response.text}"
)

except Exception as e:
error_message = f"Caught error while collecting langfuse data: {e}\n{traceback.format_exception(e)}"
logger.error(error_message)
34 changes: 29 additions & 5 deletions backend/poetry.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions backend/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ langsmith = "^0.1.63"
apscheduler = "^3.10.4"
pycryptodome = "^3.20.0"
xlsxwriter = "^3.2.0"
langfuse = "^2.36.2"

[tool.poetry.group.dev]
optional = true
Expand Down
Loading

0 comments on commit 5c5ebdc

Please sign in to comment.