In [2]:
import os
from urllib.parse import quote

import httpx

In [3]:
# Langfuse API credentials are read from the environment so that secrets are
# never stored in (or committed with) the notebook. Export both variables
# before starting the kernel; a missing variable raises KeyError here, at the
# top of the notebook, rather than failing later inside a request.
# NOTE(review): keys that were previously hardcoded in this cell should be
# treated as compromised and rotated in the Langfuse project settings.
LANGFUSE_PUBLIC_KEY = os.environ["LANGFUSE_PUBLIC_KEY"]
LANGFUSE_SECRET_KEY = os.environ["LANGFUSE_SECRET_KEY"]

## Get Specific Item by ID

In [4]:
# Function to get a specific item from an annotation queue
async def get_queue_item(queue_id, item_id):
    """
    Get a specific item from an annotation queue.

    Sends an authenticated GET request to the Langfuse public API
    (US cloud host) and returns the decoded JSON body.

    Args:
        queue_id (str): The unique identifier of the annotation queue
        item_id (str): The unique identifier of the annotation queue item

    Returns:
        dict: Item details including:
            - id: Item ID
            - queueId: Queue ID
            - objectId: ID of the object being annotated
            - objectType: Type of object (e.g., "TRACE")
            - status: Status (e.g., "PENDING", "COMPLETED")
            - completedAt: Completion timestamp (if completed)
            - createdAt: Creation timestamp
            - updatedAt: Last update timestamp

    Raises:
        httpx.HTTPStatusError: If the API answers with a 4xx/5xx status
            (e.g. bad credentials or unknown IDs), so callers never mistake
            an error payload for an item.
    """
    # Percent-encode the identifiers so characters such as "/" or "?" cannot
    # change which endpoint is addressed.
    safe_queue_id = quote(str(queue_id), safe="")
    safe_item_id = quote(str(item_id), safe="")
    url = (
        "https://us.cloud.langfuse.com/api/public/annotation-queues/"
        f"{safe_queue_id}/items/{safe_item_id}"
    )

    # A (username, password) tuple makes httpx send HTTP Basic credentials,
    # replacing the hand-built base64 "Authorization" header.
    credentials = (LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY)
    async with httpx.AsyncClient(auth=credentials) as client:
        response = await client.get(url)
        # Fail loudly on HTTP errors instead of returning the error body.
        response.raise_for_status()
        return response.json()

# Example: Get a specific item from the queue
queue_id = "cmccb2ym402tpad07asml1mb7"  # test - vishal queue
item_id = "cmel01iqn08szad08a89nek6f"

item_details = await get_queue_item(queue_id, item_id)

print("Specific Item Details:")
print(f"  Item ID: {item_details['id']}")
print(f"  Queue ID: {item_details['queueId']}")
print(f"  Object ID: {item_details['objectId']}")
print(f"  Object Type: {item_details['objectType']}")
print(f"  Status: {item_details['status']}")
print(f"  Created: {item_details['createdAt']}")
print(f"  Updated: {item_details['updatedAt']}")
if item_details.get('completedAt'):
    print(f"  Completed: {item_details['completedAt']}")


Specific Item Details:
  Item ID: cmel01iqn08szad08a89nek6f
  Queue ID: cmccb2ym402tpad07asml1mb7
  Object ID: 3fbb955f-94d3-43ec-80a9-d893e761c6ab
  Object Type: SESSION
  Status: PENDING
  Created: 2025-08-21T06:07:03.454Z
  Updated: 2025-08-21T06:07:03.454Z


In [7]:
async def get_session_by_id(session_id):
    """
    Get a session by its ID from Langfuse API.

    Sends an authenticated GET request to the Langfuse public API
    (US cloud host).

    Args:
        session_id (str): The unique identifier of the session

    Returns:
        dict: On HTTP 200, the decoded JSON session (details including traces
        and metadata). On any other status, a dict with two keys:
            - error: Message containing the HTTP status code
            - details: Raw response body text
    """
    # Percent-encode the ID so that characters such as "/", "?", "#" or
    # spaces cannot alter the request path.
    safe_session_id = quote(str(session_id), safe="")
    url = f"https://us.cloud.langfuse.com/api/public/sessions/{safe_session_id}"

    # A (username, password) tuple makes httpx send HTTP Basic credentials,
    # replacing the hand-built base64 "Authorization" header.
    credentials = (LANGFUSE_PUBLIC_KEY, LANGFUSE_SECRET_KEY)
    async with httpx.AsyncClient(auth=credentials) as client:
        response = await client.get(url)

    if response.status_code == 200:
        return response.json()
    # Callers detect failure via the "error" key rather than an exception.
    return {"error": f"Failed to fetch session: {response.status_code}", "details": response.text}

# Example: Fetch session data
session_id = "090cc1ff-a8c0-482d-af7a-64109ae09d57"  # From the queue item above
session_data = await get_session_by_id(session_id)

print("Session Data:")
if "error" in session_data:
    print(f"Error: {session_data['error']}")
    print(f"Details: {session_data['details']}")
else:
    print(f"  Session ID: {session_data['id']}")
    print(f"  Created At: {session_data['createdAt']}")
    print(f"  Project ID: {session_data['projectId']}")
    print(f"  Environment: {session_data.get('environment', 'None')}")
    print(f"  Number of traces: {len(session_data.get('traces', []))}")
    
    if session_data.get('traces'):
        print("\n  Traces:")
        for i, trace in enumerate(session_data['traces']):  # Show first 3 traces
            print(f"    Trace {i+1}:")
            print(f"      ID: {trace['id']}")
            print(f"      Timestamp: {trace['timestamp']}")
            print(f"      Name: {trace.get('name', 'None')}")
            print(f"      Input: {trace.get('input', 'None')}")
            print(f"      Output: {trace.get('output', 'None')}")
        
        if len(session_data['traces']) > 3:
            print(f"    ... and {len(session_data['traces']) - 3} more traces")

Session Data:
  Session ID: 090cc1ff-a8c0-482d-af7a-64109ae09d57
  Created At: 2025-08-28T21:13:55.124Z
  Project ID: cmaxr5f6h002bad07s9drmhxz
  Environment: default
  Number of traces: 8

  Traces:
    Trace 1:
      ID: 16b0a4cc-2082-4479-8fd0-8f4caa0d1ea3
      Timestamp: 2025-09-04T01:53:59.075Z
      Name: LangGraph
      Input: {'messages': [{'content': 'Can you rephrase it so it looks like it was written by a lawyer?', 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'human', 'name': None, 'id': 'c88cc65a-dbfe-4c6d-a5dd-f694b9780f5e', 'example': False}]}
      Output: {'messages': [{'content': 'my neighbor is bothering me', 'additional_kwargs': {}, 'response_metadata': {}, 'type': 'human', 'name': None, 'id': '717d4a2d-a027-4dae-a56c-259f11303b3e', 'example': False}, {'content': [{'text': "I'm sorry to hear your neighbor is bothering you. That can be really stressful when it's happening where you live.\n\nCan you tell me what specifically your neighbor is doing that's 

In [None]:
# Sample Trace data
# Select the trace explicitly instead of relying on a loop variable left over
# from an earlier cell; the last element of the session's trace list is used.
trace = session_data["traces"][-1]
trace

{'id': '14999254-f3a0-4904-878b-2e7825c79bf7',
 'projectId': 'cmaxr5f6h002bad07s9drmhxz',
 'name': 'LangGraph',
 'timestamp': '2025-08-28T21:13:46.707Z',
 'environment': 'default',
 'tags': [],
 'bookmarked': False,
 'release': None,
 'version': None,
 'userId': '1e62abdf-37e9-4808-897c-ff6fb7fe5413',
 'sessionId': '090cc1ff-a8c0-482d-af7a-64109ae09d57',
 'public': False,
 'input': {'messages': [{'content': 'my neighbor is bothering me',
    'additional_kwargs': {},
    'response_metadata': {},
    'type': 'human',
    'name': None,
    'id': '717d4a2d-a027-4dae-a56c-259f11303b3e',
    'example': False}]},
 'output': {'messages': [{'content': 'my neighbor is bothering me',
    'additional_kwargs': {},
    'response_metadata': {},
    'type': 'human',
    'name': None,
    'id': '717d4a2d-a027-4dae-a56c-259f11303b3e',
    'example': False},
   {'content': [{'text': "I'm sorry to hear your neighbor is bothering you. That can be really stressful when it's happening where you live.\n\nCan 