-
Notifications
You must be signed in to change notification settings - Fork 3
/
chatrecord.py
185 lines (159 loc) · 6.41 KB
/
chatrecord.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
# AUTOGENERATED! DO NOT EDIT! File to edit: ../nbs/03_chatrecord.ipynb.
# %% auto 0
__all__ = ['NoChatOpenAI', 'get_nested_child_run', 'get_child_chat_run', 'ChatRecord', 'ChatRecordSet']
# %% ../nbs/03_chatrecord.ipynb 3
from typing import List, Iterable, Union
from collections import Counter
from pathlib import Path
import pickle
import pandas as pd
from pydantic import BaseModel
import langsmith
from fastcore.foundation import first, L
from fastcore.test import test_eq
from .runs import (get_runs_by_commit,
get_params, get_functions,
get_feedback)
from .transform import RunData
from langsmith import Client
# %% ../nbs/03_chatrecord.ipynb 5
class NoChatOpenAI(Exception):
    "Raised when no `ChatOpenAI` child run can be found for a run."
    def __init__(self,
                 message, # human readable description of the failure
                 extra_data=None # optional payload describing the failure context
                ):
        super().__init__(message)
        # Keep the optional payload so handlers can inspect it instead of silently dropping it.
        self.extra_data = extra_data
# %% ../nbs/03_chatrecord.ipynb 6
def get_nested_child_run(run):
    """Get the last nested `ChatOpenAI` run inside a Runnable Agent.

    The run is re-read from LangSmith with its child runs loaded.  Candidates are
    collected in iteration order: every direct child named `ChatOpenAI`, plus every
    `ChatOpenAI` child of a direct child named `RunnableAgent`.  The last candidate
    is returned.

    Raises `NoChatOpenAI` when there is no candidate, which includes the case
    where the run (or a `RunnableAgent` child) has no child runs at all.
    """
    client = Client()
    run = client.read_run(run_id=run.id, load_child_runs=True)
    oai_children = []
    # `child_runs` can be None when a run has no loaded children (it is optional on the
    # LangSmith `Run` schema); treat that as "no children" so the caller sees
    # `NoChatOpenAI` rather than a `TypeError` from iterating None.
    for r in run.child_runs or []:
        if r.name == 'RunnableAgent':
            oai_children.extend(c for c in (r.child_runs or []) if c.name == 'ChatOpenAI')
        if r.name == 'ChatOpenAI':
            oai_children.append(r)
    if not oai_children:
        raise NoChatOpenAI(f'Not able to find ChatOpenAI child run from root run {run.id}')
    return oai_children[-1]
def get_child_chat_run(run):
    """Return `(start_run, chat_run)` for `run`.

    `start_run` is the parent of `run` when `run.parent_run_id` is set, otherwise `run`
    itself; `chat_run` is whatever `get_nested_child_run(start_run)` yields.
    """
    ls_client = Client()
    parent_id = run.parent_run_id
    # A run with a parent id is a child run, so the search starts from its parent instead.
    start_run = run if parent_id is None else ls_client.read_run(parent_id)
    return start_run, get_nested_child_run(start_run)
# %% ../nbs/03_chatrecord.ipynb 9
class ChatRecord(BaseModel):
    "A parsed run from LangSmith, focused on the `ChatOpenAI` run type."
    child_run_id:str # id of the `ChatOpenAI` child run
    child_run:RunData # parsed child run (built with `RunData.from_run_id`)
    child_url:Union[str,None] = None # url of the child run
    parent_run_id:Union[str,None] = None # id of the run the child was searched from
    parent_url: Union[str,None] = None # url of that run
    total_tokens:Union[int, None] # token counts are read from the child run
    prompt_tokens:Union[int, None]
    completion_tokens:Union[int, None]
    feedback: Union[List,None] = None # feedback fetched for the parent run
    feedback_keys: Union[List,None] = None # non-empty `key` attributes of `feedback`
    tags: Union[List,None] = [] # tags of the parent run
    start_dt: Union[str, None] = None # parent run start time formatted as mm/dd/YYYY
    function_defs: Union[List,None] = None # result of `get_functions` on the child run
    param_model_name: Union[str,None]= None
    param_n: Union[int, None] = None
    # NOTE(review): the four sampling parameters below are fractional numbers in the OpenAI
    # API (e.g. a temperature of 0.7). An `int` annotation makes pydantic truncate or reject
    # such values, hence `float`. Presumably `get_params` forwards them unchanged - confirm.
    param_top_p: Union[float, None] = None
    param_temp: Union[float, None] = None
    param_presence_penalty: Union[float, None] = None
    param_freq_penalty: Union[float, None] = None

    @property
    def flat_input(self):
        "The `flat_input` of the underlying child run."
        return self.child_run.flat_input

    @property
    def flat_output(self):
        "The `flat_output` of the underlying child run."
        return self.child_run.flat_output

    @classmethod
    def from_run_id(cls,
                    run_id:str # the run id to fetch and parse.
                   ):
        "Fetch the run with `run_id` from LangSmith and parse it into a `ChatRecord`."
        client = Client()
        return cls.from_run(client.read_run(run_id=run_id))

    @classmethod
    def from_run(cls,
                 run:langsmith.schemas.Run # the run object to parse.
                ):
        """Collect information about a run into a `ChatRecord`.

        The run is first resolved with `get_child_chat_run` into the run to report on and
        its `ChatOpenAI` child.  Returns None when no child run is available; errors raised
        by `get_child_chat_run` are not caught here.
        """
        run, crun = get_child_chat_run(run)
        if not crun:
            return None
        params = get_params(crun)
        _feedback = get_feedback(run) # you must get feedback from the root
        return cls(child_run_id=str(crun.id),
                   child_run=RunData.from_run_id(str(crun.id)),
                   child_url=crun.url,
                   parent_run_id=str(run.id) if run else None,
                   parent_url=run.url if run else None,
                   total_tokens=crun.total_tokens,
                   prompt_tokens=crun.prompt_tokens,
                   completion_tokens=crun.completion_tokens,
                   feedback=_feedback,
                   feedback_keys=list(L(_feedback).attrgot('key').filter()),
                   tags=run.tags,
                   start_dt=run.start_time.strftime('%m/%d/%Y'),
                   function_defs=get_functions(crun),
                   **params)
# %% ../nbs/03_chatrecord.ipynb 19
class ChatRecordSet(BaseModel):
    "A List of `ChatRecord`."
    records: List[ChatRecord]

    @classmethod
    def _collect(cls,
                 items, # iterable of inputs accepted by `parse`
                 parse # callable turning one item into a `ChatRecord`
                ):
        "Build a `ChatRecordSet` by applying `parse` to each item, skipping items that raise `NoChatOpenAI`."
        _records = []
        for item in items:
            try:
                _records.append(parse(item))
            except NoChatOpenAI as e:
                # Best effort: report the item without a `ChatOpenAI` child and keep going.
                print(e)
        return cls(records=_records)

    @classmethod
    def from_commit(cls,
                    commit_id:str, # the commit id passed to `get_runs_by_commit`
                    limit:Union[int,None]=None # forwarded to `get_runs_by_commit`
                   ):
        "Create a `ChatRecordSet` from a commit id"
        _runs = get_runs_by_commit(commit_id=commit_id, limit=limit)
        return cls.from_runs(_runs)

    @classmethod
    def from_runs(cls,
                  runs:List[langsmith.schemas.Run] # the run objects to parse
                 ):
        "Load ChatRecordSet from runs; runs that raise `NoChatOpenAI` are printed and skipped."
        return cls._collect(runs, ChatRecord.from_run)

    @classmethod
    def from_run_ids(cls,
                     runs:List[str] # the run ids to fetch and parse
                    ):
        "Load ChatRecordSet from run ids; ids that raise `NoChatOpenAI` are printed and skipped."
        return cls._collect(runs, ChatRecord.from_run_id)

    def __len__(self):
        return len(self.records)

    def __getitem__(self, index: int) -> ChatRecord:
        return self.records[index]

    def __repr__(self):
        return f'`List[ChatRecord]` of size {len(self.records)}.'

    def save(self,
             path:str # destination file; missing parent directories are created
            ):
        "Pickle this `ChatRecordSet` to `path` and return the destination as a `Path`."
        dest_path = Path(path)
        # `parents=True` so a nested destination directory that does not exist yet is created
        # as well; `exist_ok=True` makes a separate existence check unnecessary.
        dest_path.parent.mkdir(parents=True, exist_ok=True)
        with open(dest_path, 'wb') as f:
            pickle.dump(self, f)
        return dest_path

    def __iter__(self):
        # Iterate over the records themselves.
        yield from self.records

    @classmethod
    def load(cls,
             path:str # file previously written by `save`
            ):
        """Load data from disk.

        Raises `TypeError` when the unpickled object is not an instance of this class.
        """
        src_path = Path(path)
        # NOTE(security): `pickle.load` can execute arbitrary code while deserializing;
        # only load files that come from a trusted source.
        with open(src_path, 'rb') as f:
            obj = pickle.load(f)
        if not isinstance(obj, cls):
            raise TypeError(f"The loaded object is not of type {cls.__name__}")
        return obj

    def to_pandas(self):
        "Convert the `ChatRecordSet` to a pandas.DataFrame."
        records = L(self.records).map(dict)
        return pd.DataFrame(records)

    def to_dicts(self):
        "Convert the ChatRecordSet to a list of dicts, which you can convert to jsonl."
        return [r.child_run.to_msg_dict() for r in self.records]