In [None]:
import os
import pandas as pd
import datetime as dt
from typing import List, Dict
from langfuse import Langfuse
import requests
from requests.auth import HTTPBasicAuth

from dateutil import parser
import pytz
from src.blob_storage import BlobStorageHandler
import tempfile
import glob
import logging

from dotenv import load_dotenv
load_dotenv()


# Root logger: let every record from DEBUG upwards through; the handler
# attached below decides what is actually shown in the notebook.
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)

# Re-running this cell must not stack handlers (which would duplicate each log line).
for stale_handler in list(logger.handlers):
    logger.removeHandler(stale_handler)

# Single stream handler that prints INFO and above directly under the cell.
formatter = logging.Formatter('%(asctime)s - %(name)s - %(levelname)s - %(message)s')
handler = logging.StreamHandler()
handler.setLevel(logging.INFO)
handler.setFormatter(formatter)

logger.addHandler(handler)

# helper function
def pydantic_list_to_dataframe(pydantic_list):
    """
    Build a pandas DataFrame with one row per object in `pydantic_list`.

    Each object must expose a `.dict()` method; the mapping it returns
    becomes the row. An empty input yields an empty DataFrame.
    """
    return pd.DataFrame([entry.dict() for entry in pydantic_list])


def convert_to_utc(date_str: str, date_type: str, local_tz=dt.timezone(dt.timedelta(hours=9))):
    """
    Turn a local calendar date into a UTC boundary timestamp.

    Args:
        date_str: Date in "%Y-%m-%d" format, interpreted in `local_tz`.
        date_type: 'to' returns the last microsecond *before* local midnight of
            `date_str`; any other value returns local midnight itself.
        local_tz: Timezone the date is expressed in (default UTC+9).

    Returns:
        A timezone-aware datetime in UTC.
    """
    parsed = dt.datetime.strptime(date_str, "%Y-%m-%d")
    # Local midnight of the requested day.
    local_midnight = parsed.replace(hour=0, minute=0, second=0, microsecond=0, tzinfo=local_tz)
    # For an upper bound, step back one microsecond so the bound is the end of the previous day.
    step_back = dt.timedelta(microseconds=1) if date_type == 'to' else dt.timedelta(0)
    return (local_midnight - step_back).astimezone(dt.timezone.utc)

def get_timestamps(from_timestamp=None, to_timestamp=None, local_tz=dt.timezone(dt.timedelta(hours=9))):
    """
    Resolve an inclusive local-date range into a pair of UTC datetimes.

    Args:
        from_timestamp: First day of the range as "%Y-%m-%d" in `local_tz`.
            Defaults to yesterday in `local_tz`.
        to_timestamp: Last day of the range (inclusive) as "%Y-%m-%d" in
            `local_tz`. Defaults to yesterday in `local_tz`.
        local_tz: Timezone the dates are expressed in (default UTC+9).

    Returns:
        (from_utc, to_utc): start of the first day and the last microsecond of
        the last day, both as timezone-aware UTC datetimes.
    """
    # "Today" must be evaluated in local_tz, not in the timezone of the machine
    # running the notebook, otherwise the default window shifts by a day on
    # hosts that are not configured for local_tz.
    today_local = dt.datetime.now(local_tz).date()

    if not from_timestamp:
        from_timestamp = (today_local - dt.timedelta(days=1)).strftime("%Y-%m-%d")
    from_utc = convert_to_utc(from_timestamp, date_type='from', local_tz=local_tz)

    # convert_to_utc(..., 'to') returns the instant just before midnight of the
    # given day, so pass the day *after* the last wanted day.
    if not to_timestamp:
        day_after_end = today_local.strftime("%Y-%m-%d")
    else:
        day_after_end = (
            dt.datetime.strptime(to_timestamp, "%Y-%m-%d") + dt.timedelta(days=1)
        ).strftime("%Y-%m-%d")
    to_utc = convert_to_utc(day_after_end, date_type='to', local_tz=local_tz)

    return from_utc, to_utc

def expend_metadata(df, metadata_keys: List[str] | None = None):
        # Select specific keys to extract
        # metadata_keys = ['channel', 'user_email', 'department_name', 'azure_user_id', 'question_uuid', 'costs']

        if metadata_keys is None or metadata_keys == []:
            # Convert None/NaN to an empty dictionary before expanding
            df_expanded = df['metadata'].apply(lambda x: pd.Series(x) if isinstance(x, dict) else pd.Series(dtype='object'))

            # Concatenate original DataFrame (excluding 'metadata') with expanded columns
            df = pd.concat([df, df_expanded], axis=1)
        else:
            # Create new columns with selected keys
            # df['metadata'].values[0]
            for key in metadata_keys:
                # if key not in 
                df[key] = df['metadata'].apply(lambda x: x.get(key, None) if isinstance(x, dict) else None)  # Check if x is a dict

        # Drop the original 'metadata' column if not needed
        df = df.drop(columns=['metadata'])
        return df

def convert_to_jst(utc_dt):
    """
    Return `utc_dt` expressed in a fixed UTC+9 offset (JST).

    The instant is unchanged; only the timezone of the returned datetime differs.
    """
    jst = dt.timezone(dt.timedelta(hours=9))
    return utc_dt.astimezone(jst)

def datetime_to_iso8601(org_dt: dt.datetime) -> str:
    """
    Format `org_dt` as an ISO 8601 string with second precision.

    Microseconds are discarded and a "+00:00" offset is written as "Z";
    any other offset (or none) is left exactly as `isoformat()` produces it.
    """
    whole_seconds = org_dt.replace(microsecond=0)
    text = whole_seconds.isoformat()
    return text.replace("+00:00", "Z")



class LangfuseAPI(object):
    """
    Helper that pages through Langfuse traces, observations and scores and
    returns each result set as a pandas DataFrame.

    Traces and observations are fetched through the `Langfuse` client; scores
    are fetched from `{host}/api/public/scores` with HTTP basic auth.
    """

    def __init__(self, public_key: str, secret_key: str, host: str, request_timeout: float = 30.0):
        """
        Args:
            public_key: Langfuse public key (also the basic-auth user name).
            secret_key: Langfuse secret key (also the basic-auth password).
            host: Base URL of the Langfuse server.
            request_timeout: Seconds to wait on each scores HTTP request before
                `requests` raises a timeout instead of blocking forever.
        """
        self.host = host
        self.public_key = public_key
        self.secret_key = secret_key
        self.request_timeout = request_timeout
        self.langfuse = Langfuse(public_key = public_key, secret_key = secret_key, host = host)

    def _collect_pages(self, fetch_page, time_attr: str, noun: str, limit: int, page: int, max_traces: int | None):
        """
        Shared paging loop for the client-backed endpoints.

        Args:
            fetch_page: Callable taking a page number and returning a response
                object whose `.data` is the list of records on that page.
            time_attr: Attribute of a record printed in progress logs.
            noun: Word used for the records in log messages.
            limit: Page size; a shorter page means the last page was reached.
            page: First page to request.
            max_traces: Soft cap. Paging stops after the page on which the
                running total reaches it, so the result can exceed the cap.

        Returns:
            List of all records collected.
        """
        collected = []
        while True:
            batch = fetch_page(page).data
            collected.extend(batch)

            if len(batch) == 0:
                logger.info(f"No more {noun} found.")
                break

            # Progress line every tenth page.
            if page % 10 == 0:
                logger.info(f"Page#{page}, {getattr(batch[0], time_attr)} ~ {getattr(batch[-1], time_attr)}")

            if len(batch) < limit:
                logger.info(f"All {noun} have been collected as the number of records is less than limit {limit} per page.")
                logger.info(f"Page#{page}, {getattr(batch[0], time_attr)} ~ {getattr(batch[-1], time_attr)}")
                break

            if max_traces is not None and len(collected) >= max_traces:
                logger.info(f"The number of {noun} has been collected exceeds the total limit {max_traces}")
                break

            page += 1

        logger.info(f"Retrieved {len(collected)} {noun}.")
        return collected

    def fetch_traces(
        self, 
        limit: int = 100, 
        page: int = 1, 
        user_id: str | None = None, 
        from_timestamp: str | None = None, 
        to_timestamp: str | None =None, 
        max_traces: int | None = None,
        metadata_keys: List[str] | None = None  # default keys to extract
    ):
        """
        Fetch all traces in a local-date range.

        Args:
            limit: Page size.
            page: First page to request.
            user_id: Optional user filter passed to the client.
            from_timestamp / to_timestamp: Inclusive "%Y-%m-%d" dates resolved
                by `get_timestamps` (which supplies the defaults).
            max_traces: Soft cap on the number of traces (see `_collect_pages`).
            metadata_keys: Keys to pull out of each trace's metadata; None or
                an empty list expands every key.

        Returns:
            DataFrame with one row per trace; the 'metadata' column, when
            present, is replaced by the expanded columns.
        """
        from_timestamp, to_timestamp = get_timestamps(from_timestamp, to_timestamp)
        logger.info(f"Collect data from JST {convert_to_jst(from_timestamp)} to {convert_to_jst(to_timestamp)}")

        all_traces = self._collect_pages(
            lambda page_no: self.langfuse.fetch_traces(
                limit=limit, page=page_no, user_id=user_id,
                from_timestamp=from_timestamp, to_timestamp=to_timestamp,
            ),
            time_attr="timestamp",
            noun="traces",
            limit=limit,
            page=page,
            max_traces=max_traces,
        )
        all_traces_df = pydantic_list_to_dataframe(all_traces)

        if len(all_traces) > 0 and 'metadata' in all_traces_df.columns:
            all_traces_df = expend_metadata(all_traces_df, metadata_keys)
        return all_traces_df

    def fetch_observations(
        self, 
        limit: int = 100, 
        page: int = 1, 
        user_id: str | None = None, 
        from_timestamp: str | None = None, 
        to_timestamp: str | None =None, 
        max_traces: int | None = None,
        metadata_keys: List[str] | None = None  # default keys to extract
    ):
        """
        Fetch all observations whose start time falls in a local-date range.

        Parameters mirror `fetch_traces`; the date range is applied to the
        observation start time and `max_traces` caps the number of observations.

        Returns:
            DataFrame with one row per observation; the 'metadata' column, when
            present, is replaced by the expanded columns.
        """
        from_timestamp, to_timestamp = get_timestamps(from_timestamp, to_timestamp)
        logger.info(f"Collect data from JST {convert_to_jst(from_timestamp)} to {convert_to_jst(to_timestamp)}")

        all_data = self._collect_pages(
            lambda page_no: self.langfuse.fetch_observations(
                limit=limit, page=page_no, user_id=user_id,
                from_start_time=from_timestamp, to_start_time=to_timestamp,
            ),
            time_attr="start_time",
            noun="data",
            limit=limit,
            page=page,
            max_traces=max_traces,
        )
        all_df = pydantic_list_to_dataframe(all_data)

        if len(all_data) > 0 and 'metadata' in all_df.columns:
            all_df = expend_metadata(all_df, metadata_keys)
        return all_df

    def fetch_scores(
        self, 
        limit: int = 100, 
        page: int = 1, 
        user_id: str | None = None, 
        from_timestamp: str | None = None, 
        to_timestamp: str | None =None, 
        max_traces: int | None = None,
    ):
        """
        Fetch all scores in a local-date range from the public REST endpoint.

        Parameters mirror `fetch_traces`; `max_traces` is a soft cap on the
        number of scores. On a non-200 response the error is logged and the
        scores collected so far are returned. Network errors and timeouts
        raised by `requests` propagate to the caller.

        Returns:
            DataFrame built from the raw score dicts (empty if none were found).
        """
        from_timestamp, to_timestamp = get_timestamps(from_timestamp, to_timestamp)
        logger.info(f"Collect data from JST {convert_to_jst(from_timestamp)} to {convert_to_jst(to_timestamp)}")

        # These do not change between pages, so format them once.
        from_iso = datetime_to_iso8601(from_timestamp)  # ISO 8601 format
        to_iso = datetime_to_iso8601(to_timestamp)
        url = f"{self.host}/api/public/scores"
        auth = HTTPBasicAuth(self.public_key, self.secret_key)

        all_scores = []
        while True:
            logger.info(f"Page#{page}")

            params = {
                "page": page,
                "limit": limit,
                "userId": user_id,
                "fromTimestamp": from_iso,
                "toTimestamp": to_iso
            }

            # Without a timeout a stalled connection would block the notebook indefinitely.
            response = requests.get(url, auth=auth, params=params, timeout=self.request_timeout)

            if response.status_code != 200:
                logger.error(f"Error fetching scores: {response.status_code} - {response.text}")
                break

            data = response.json().get("data", [])
            all_scores.extend(data)

            if len(data) < limit:
                logger.info("All data have been collected as the number of records is less than limit per page.")
                break

            if max_traces is not None and len(all_scores) >= max_traces:
                logger.info("The number of data has been collected exceeds the total limit")
                break

            page += 1

        score_df = pd.DataFrame(all_scores)
        return score_df
    



# Maps an agent name (key) to the original function names that identify it (values).
# rename_funtion_name() strips the leading "<channel>_" prefix from each trace name
# and assigns the first key, in the order listed here, whose value list contains
# one of the stripped names.
function_names_mapping = {
    "meeting_minutes": ["minutes"],
    "jera_knowledge": ["jera_knowledge"],
    "file_difference": ["file_diff"],
    "presentation_generator": ["ppt_generator"],
    "summarize": ["summarize"],
    "chat_with_image": ["image_to_text"],
    "docx_translator": ["docx_file_translator"],
    "pptx_translator": ["pptx_file_translator"],
    "pdf_translator": ["pdf_file_translator"],
    "txt_translator": ["txt_file_translator"],
    "chat_with_file": ["rag"],
    "speech_generator": ["speech_draft_generator"],
    "meeting_setup": ["meeting_setup"],
    "text_translator": ["text_translator", "transText_extractor"],
    "simple_web_search":["simple_web_searcher"],
    "research_report":["research_report"],
    "outlook_redirect": ["outlook_redirect"],
    "outlook_triggered": ["outlook_adaptive_card_triggered"],
}


def upload_to_blob(project:str, container_name, fpath: str, overwrite: bool = False):
    """
    Upload a local file to the root of a blob container under its base name.

    Args:
        project: Prefix of the environment variable
            `{project}_CONNECTION_STRING_PROD` holding the connection string.
        container_name: Target container.
        fpath: Path of the local file to upload.
        overwrite: When False, an existing blob of the same name is left alone.
    """
    storage = BlobStorageHandler(
        conn_str=os.environ[f"{project}_CONNECTION_STRING_PROD"],
        container_name=container_name,
    )

    # The blob keeps only the file's base name (no directory part).
    blob_name = os.path.basename(fpath)
    already_present = storage.blob_exists(blob_name)

    if already_present and not overwrite:
        print(f"`{blob_name}` already exists in {container_name}.")
        return

    storage.upload_file(fpath, blob_name, overwrite)
    print(f"`{blob_name}` uploaded to blob {container_name}.")

def convert_to_local(utc_time: str, formatter: str = "%Y-%m-%d %H:%M:%S"):
    """
    Convert a UTC timestamp string to a formatted Asia/Tokyo time string.

    Args:
        utc_time: Timestamp whose first 19 characters are
            "YYYY-MM-DDTHH:MM:SS" or "YYYY-MM-DD HH:MM:SS". Anything after the
            seconds (fraction, offset) is ignored and the value is taken to be
            UTC. Non-string values are converted with `str()` first.
        formatter: `strftime` pattern for the result.

    Returns:
        The time in Asia/Tokyo rendered with `formatter`.

    Raises:
        ValueError: if the leading 19 characters are not a valid timestamp.
    """
    # Keep the seconds-resolution prefix and accept either date/time separator,
    # so values written by pandas ("2025-04-14 01:58:44...") parse as well.
    stamp = str(utc_time)[:19].replace(" ", "T")
    naive_utc = dt.datetime.strptime(stamp, "%Y-%m-%dT%H:%M:%S")
    # Mark the parsed value as UTC, then shift it to Japan Standard Time.
    aware_utc = naive_utc.replace(tzinfo=pytz.UTC)
    jst_time = aware_utc.astimezone(pytz.timezone("Asia/Tokyo"))

    return jst_time.strftime(formatter)

def format_datetime(val):
    """
    Normalise an ISO 8601 timestamp string to "YYYY-MM-DDTHH:MM:SS.mmmZ".

    Offset-aware inputs are converted to UTC before formatting so the trailing
    "Z" is truthful; inputs without an offset are formatted as-is (i.e. treated
    as already being UTC). Values that cannot be parsed are returned unchanged,
    which keeps this safe to apply over a column containing blanks or NaN.
    """
    try:
        parsed = parser.isoparse(val)
        if parsed.utcoffset() is not None:
            # e.g. 09:00+09:00 must become 00:00Z, not 09:00Z.
            parsed = parsed.astimezone(dt.timezone.utc)
        # %f yields microseconds; drop the last three digits for milliseconds.
        return parsed.strftime('%Y-%m-%dT%H:%M:%S.%f')[:-3] + 'Z'
    except Exception:
        # Deliberate best effort: leave unparseable values untouched.
        return val

def add_new_name_by_quid(qid_to_new_names: dict, qid: str, new_name: str):
    """
    Record `new_name` for `qid` unless that question ID already has a name.

    The mapping is updated in place and also returned. An existing entry is
    never overwritten; the name already stored is logged instead.
    """
    if qid in qid_to_new_names:
        logger.info(f"Question ID: {qid} already exists, the new function name is {qid_to_new_names[qid]}")
        return qid_to_new_names

    qid_to_new_names[qid] = new_name
    return qid_to_new_names

def rename_funtion_name(df):
    """
    Add an 'agent_name' column that classifies every question by agent.

    All rows sharing a 'question_uuid' get the same agent name, chosen from
    the set of 'name' values seen for that question after the leading
    "<channel>_" prefix is removed:

    1. the first key of `function_names_mapping` (in its definition order)
       whose list contains one of the stripped names;
    2. otherwise 'other' when the only name is 'language_detection';
    3. otherwise 'sob'.

    Args:
        df: DataFrame with 'name' and 'question_uuid' columns.

    Returns:
        A copy of `df` with the 'agent_name' column added. Rows whose
        'question_uuid' was not grouped (e.g. missing) get NaN.
    """
    df = df.copy()
    qid_col_name = "question_uuid"

    # Sorted unique trace names per question.
    names_by_qid = df.groupby(qid_col_name)['name'].apply(lambda names: sorted(set(names)))

    qid_to_new_fname = {}
    for qid, full_names in names_by_qid.items():
        # Drop the first "_"-separated token (the channel) from every name.
        stripped_names = ["_".join(full_name.split("_")[1:]) for full_name in full_names]

        agent_name = next(
            (
                new_f_name
                for new_f_name, mapped_names in function_names_mapping.items()
                if any(name in mapped_names for name in stripped_names)
            ),
            None,
        )
        if agent_name is None:
            agent_name = 'other' if stripped_names == ['language_detection'] else 'sob'

        # Keys are stringified because the lookup below stringifies the column.
        add_new_name_by_quid(qid_to_new_fname, str(qid), agent_name)

    df['agent_name'] = df[qid_col_name].astype(str).map(qid_to_new_fname)
    return df

def add_whisper_cost(trace_df, obs_df):
    """
    Append one synthetic "whisper" observation per meeting-minutes question.

    For every trace with agent_name == "meeting_minutes" (de-duplicated on
    question_uuid + audio_length, rows with missing values dropped) an
    observation with id "<trace id>-tts" is added, unless `obs_df` already
    contains that id.

    Args:
        trace_df: Traces with 'agent_name', 'id', 'question_uuid',
            'audio_length' and 'timestamp' columns.
        obs_df: Observations with at least 'id' and 'startTime' columns.

    Returns:
        A new DataFrame: `obs_df` plus the synthetic rows, sorted by
        'startTime' descending. Neither input is modified.
    """
    trace_mm = trace_df[trace_df["agent_name"] =="meeting_minutes"][[
        "id", "question_uuid", "audio_length", "timestamp"]].dropna().drop_duplicates(["question_uuid", "audio_length"])

    # Build the lookup once instead of once per trace.
    existing_obs_ids = set(obs_df["id"])

    data = []
    for trace_id, audio_length, timestamp in zip(
        trace_mm["id"].values, trace_mm["audio_length"].values, trace_mm["timestamp"].values
    ):
        obs_id = trace_id + "-tts"
        if obs_id in existing_obs_ids:
            print(f"{obs_id} already exists")
            continue

        whisper_row = {
            "id": obs_id,
            "name": "AgentExecutor",
            "startTime": timestamp,
            "endTime": timestamp,
            "parentObservationId": None,
            "type": "TTS",
            "model": "whisper",
            "completionTokens": 0,
            "promptTokens": 0,
            "totalTokens": 0,
            "version": None,
            "traceId": trace_id,
            # 0.006 per 60000 units of audio_length — presumably a per-minute
            # rate with audio_length in milliseconds; TODO confirm.
            "totalCost": 0.006*audio_length/(60000),
            "input": None,
            "date": convert_to_local(timestamp)[:10]
        }
        data.append(whisper_row)
        logger.debug(whisper_row)

    # Only concatenate a second frame when there is something to add; passing
    # an empty frame to concat is deprecated and can disturb column dtypes.
    frames = [obs_df, pd.DataFrame(data)] if data else [obs_df]
    return pd.concat(frames, ignore_index=True).sort_values("startTime", ascending=False)


def add_specific_datetime_col(df, datetime_col:str, format: str = '%Y-%m', new_col: str = 'year_month'):
    """
    Localise a timestamp column to Asia/Tokyo and derive a formatted column.

    Args:
        df: Source DataFrame (left unmodified).
        datetime_col: Column holding timestamps in any format pandas can parse;
            values are interpreted as / converted to UTC first.
        format: `strftime` pattern used for the derived column.
        new_col: Name of the derived column.

    Returns:
        A copy of `df` where `new_col` holds the Tokyo-local time rendered with
        `format`, and `datetime_col` is rewritten as a Tokyo-local string in
        '%Y-%m-%dT%H:%M:%S.%f%z' form.
    """
    result = df.copy()

    # Parse (mixed formats allowed) as UTC, then shift to Japan time.
    result[datetime_col] = (
        pd.to_datetime(result[datetime_col], format='mixed', utc=True)
        .dt.tz_convert('Asia/Tokyo')
    )

    # Derived column, e.g. year-month with the default pattern.
    result[new_col] = result[datetime_col].dt.strftime(format)

    # Finally store the localised timestamp itself as an ISO-style string.
    result[datetime_col] = result[datetime_col].dt.strftime('%Y-%m-%dT%H:%M:%S.%f%z')

    return result


def load_from_blob(project:str, container_name, from_file_path: str):
    """
    Download a CSV blob and load it into a DataFrame.

    Args:
        project: Prefix of the environment variable
            `{project}_CONNECTION_STRING_PROD` holding the connection string.
        container_name: Container to read from.
        from_file_path: Path of the blob inside the container.

    Returns:
        The parsed DataFrame, or None when the blob does not exist.
    """
    storage = BlobStorageHandler(
        conn_str=os.environ[f"{project}_CONNECTION_STRING_PROD"],
        container_name=container_name,
    )

    if storage.blob_exists(from_file_path):
        # Download into a throw-away directory that is removed once the CSV is read.
        with tempfile.TemporaryDirectory() as scratch_dir:
            local_path = os.path.join(scratch_dir, from_file_path.split("/")[-1])
            storage.download_file(from_file_path, local_path)
            logger.info(f"save data to temporary place`{local_path}`.")
            return pd.read_csv(local_path)

    logger.info(f"`{from_file_path}` does not exist in {container_name}.")
    return None

In [2]:
# Credentials are read from the environment (populated from `.env` by the
# load_dotenv() call in the first cell) so they are never stored in the notebook.
# NOTE(review): the key pair that used to be hardcoded here has been exposed in
# the notebook file and should be rotated.
LANGFUSE_PUBLIC_KEY = os.environ["LANGFUSE_PUBLIC_KEY"]
LANGFUSE_SECRET_KEY = os.environ["LANGFUSE_SECRET_KEY"]
# The host is not a secret; keep the previous value as the default.
LANGFUSE_HOSTNAME = os.environ.get("LANGFUSE_HOSTNAME", "https://langfuse.dev.jera-stg.com")
langfuseapi = LangfuseAPI(public_key=LANGFUSE_PUBLIC_KEY, secret_key=LANGFUSE_SECRET_KEY, host=LANGFUSE_HOSTNAME)

## Observation data collection

In [15]:
# Paging and filter settings passed to LangfuseAPI.fetch_observations.
limit = 100
page = 1
max_traces = None
user_id = None
metadata_keys = None

# Days to export ("YYYY-MM-DD"); each day is written to its own CSV.
dts = [
    "2025-04-14"
]

# Columns kept in the exported CSV.
OBS_COLUMNS = ["id","name","startTime","endTime","parentObservationId","type","model","completionTokens","promptTokens","version","traceId","input",'calculatedInputCost', 'calculatedOutputCost', 'calculatedTotalCost']
OBS_OUTPUT_DIR = "data/yui/obs"
# Make the cell runnable on a fresh checkout.
os.makedirs(OBS_OUTPUT_DIR, exist_ok=True)

for target_day in dts:
    logger.info(f"Fetch data on {target_day}")
    obs_df = langfuseapi.fetch_observations(
        limit=limit, 
        page=page, 
        user_id=user_id, 
        from_timestamp=target_day, 
        to_timestamp=target_day,
        metadata_keys=metadata_keys
    )
    # An empty result has no columns, so the selection below would raise KeyError.
    if obs_df.empty:
        logger.info(f"No observations found on {target_day}; nothing written.")
        continue

    obs_df = obs_df[OBS_COLUMNS]
    # Keep only observations that actually incurred a cost.
    obs_df = obs_df[obs_df["calculatedTotalCost"] > 0.0]
    # quoting=1 is csv.QUOTE_ALL: every field is quoted.
    obs_df.to_csv(f"{OBS_OUTPUT_DIR}/{target_day}.csv", index=False, quoting=1)

2025-04-18 10:36:07,601 - root - INFO - Fetch data on 2025-04-14
2025-04-18 10:36:07,603 - root - INFO - Collect data from JST 2025-04-14 00:00:00+09:00 to 2025-04-14 23:59:59.999999+09:00
2025-04-18 10:36:08,574 - root - INFO - Page#10, 2025-04-14 12:01:33.996000+00:00 ~ 2025-04-14 12:01:23.220000+00:00
2025-04-18 10:36:09,332 - root - INFO - Page#20, 2025-04-14 10:27:28.706000+00:00 ~ 2025-04-14 09:56:33.709000+00:00
2025-04-18 10:36:10,158 - root - INFO - Page#30, 2025-04-14 08:44:44.338000+00:00 ~ 2025-04-14 08:34:16.379000+00:00
2025-04-18 10:36:11,175 - root - INFO - Page#40, 2025-04-14 07:41:47.543000+00:00 ~ 2025-04-14 07:34:56.561000+00:00
2025-04-18 10:36:12,117 - root - INFO - Page#50, 2025-04-14 06:38:11.667000+00:00 ~ 2025-04-14 06:35:20.058000+00:00
2025-04-18 10:36:13,158 - root - INFO - Page#60, 2025-04-14 05:52:08.667000+00:00 ~ 2025-04-14 05:51:16.415000+00:00
2025-04-18 10:36:14,094 - root - INFO - Page#70, 2025-04-14 05:09:51.585000+00:00 ~ 2025-04-14 05:09:38.74400

## Score data collection

In [14]:
# Paging and filter settings passed to LangfuseAPI.fetch_scores.
limit = 100
page = 1
max_traces = None
user_id = None

# Days to export ("YYYY-MM-DD"); each day is written to its own CSV.
dts = ["2025-04-14"]

for target_day in dts:
    logger.info(f"Fetch data on {target_day}")
    score_df = langfuseapi.fetch_scores(
        limit=limit,
        page=page,
        user_id=user_id,
        from_timestamp=target_day,
        to_timestamp=target_day,
    )

    # Nothing to select or write when no scores came back for this day.
    if score_df.empty:
        continue

    score_df = score_df[["id","timestamp","name","value","comment","traceId","observationId","trace"]]
    # quoting=1 is csv.QUOTE_ALL: every field is quoted.
    score_df.to_csv(f"data/yui/score/{target_day}.csv", index=False, quoting=1)

2025-04-18 10:35:30,851 - root - INFO - Fetch data on 2025-04-14
2025-04-18 10:35:30,853 - root - INFO - Collect data from JST 2025-04-14 00:00:00+09:00 to 2025-04-14 23:59:59.999999+09:00
2025-04-18 10:35:30,855 - root - INFO - Page#1
2025-04-18 10:35:31,032 - root - INFO - All data have been collected as the number of records is less than limit per page.


In [9]:
# Display the scores currently held in `score_df`.
# NOTE(review): the rendered table contains user names and free-text comments —
# clear this output before sharing or committing the notebook.
score_df

Unnamed: 0,id,timestamp,name,value,comment,traceId,observationId,trace
0,36b9fa41-f35d-4631-ab99-b8b7b07054a2,2025-04-15T08:27:59.669Z,小山 舞(Mai Koyama),-1,Type: その他 - PCの故障交換をお願いいたします。,5229a702-228b-4cc4-b8da-74e9cb009e78,,"{'userId': '小山 舞(Mai Koyama)', 'tags': []}"
1,bbe83222-17a8-4ec1-af20-f28cf60024b9,2025-04-15T03:25:43.938Z,伊藤 和也(Kazuya Ito),1,LIKE,18199780-0cbb-46b4-96e8-e073f307a216,,"{'userId': '伊藤 和也(Kazuya Ito)', 'tags': []}"
2,b2f2f6e2-b328-4c8a-afb8-edb7d7538f18,2025-04-14T20:55:55.751Z,小室 幸大(Kodai Komuro),1,LIKE,4a9b5ae1-efd8-4b82-a966-e136d5f9862b,,"{'userId': '小室 幸大(Kodai Komuro)', 'tags': []}"


# Trace data collection

In [16]:
# Paging and filter settings passed to LangfuseAPI.fetch_traces.
limit = 100
page = 1
max_traces = None
user_id = None
metadata_keys = None

# Days to export ("YYYY-MM-DD"); each day is written to its own CSV.
dts = [
    "2025-04-14"
]

# Columns kept in the exported CSV. Several of them come from the expanded trace
# metadata and therefore only exist when some trace of the day carried that key.
TRACE_COLUMNS = ["id","name","timestamp",'input', 'output', "userId","scores","version","azure_user_id","question_uuid","department_name","using", "channel",'costs', 'audio_length','user_email', 'totalCost', 'observations']
TRACE_OUTPUT_DIR = "data/yui/trace"
# Make the cell runnable on a fresh checkout.
os.makedirs(TRACE_OUTPUT_DIR, exist_ok=True)

for from_timestamp in dts:

    trace_df = langfuseapi.fetch_traces(
        limit=limit, 
        page=page, 
        user_id=user_id, 
        from_timestamp=from_timestamp, 
        to_timestamp=from_timestamp,
        metadata_keys=metadata_keys
    )
    # An empty result has no columns, so the selection below would raise KeyError.
    if trace_df.empty:
        logger.info(f"No traces found on {from_timestamp}; nothing written.")
        continue

    # Fill in columns absent for this day so every CSV has the same schema
    # instead of the export failing with KeyError.
    missing_columns = [col for col in TRACE_COLUMNS if col not in trace_df.columns]
    if missing_columns:
        logger.warning(f"Columns missing on {from_timestamp}, filled with None: {missing_columns}")
        trace_df = trace_df.assign(**{col: None for col in missing_columns})

    trace_df = trace_df[TRACE_COLUMNS]
    # quoting=1 is csv.QUOTE_ALL: every field is quoted.
    trace_df.to_csv(f"{TRACE_OUTPUT_DIR}/{from_timestamp}.csv", index=False, quoting=1)

2025-04-18 10:36:46,360 - root - INFO - Collect data from JST 2025-04-14 00:00:00+09:00 to 2025-04-14 23:59:59.999999+09:00
2025-04-18 10:36:47,128 - root - INFO - Page#10, 2025-04-14 01:58:44.824000+00:00 ~ 2025-04-14 01:27:10.884000+00:00
2025-04-18 10:36:47,297 - root - INFO - All traces have been collected as the number of records is less than limit 100 per page.
2025-04-18 10:36:47,298 - root - INFO - Page#13, 2025-04-13 22:15:51.258000+00:00 ~ 2025-04-13 15:00:15.616000+00:00
2025-04-18 10:36:47,299 - root - INFO - Retrieved 1212 traces.
