Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Chat: Add tracing #3168

Merged
merged 10 commits into from
Feb 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 16 additions & 5 deletions lib/shared/src/sourcegraph-api/completions/client.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import type { Span } from '@opentelemetry/api'
import type { ConfigurationWithAccessToken } from '../../configuration'

import type {
Expand All @@ -7,6 +8,7 @@ import type {
CompletionResponse,
Event,
} from './types'
import { recordErrorToSpan } from '../../tracing'

export interface CompletionLogger {
startCompletion(
Expand Down Expand Up @@ -50,23 +52,32 @@ export abstract class SourcegraphCompletionsClient {
return new URL('/.api/completions/stream', this.config.serverEndpoint).href
}

protected sendEvents(events: Event[], cb: CompletionCallbacks): void {
protected sendEvents(events: Event[], cb: CompletionCallbacks, span?: Span): void {
for (const event of events) {
switch (event.type) {
case 'completion':
case 'completion': {
span?.addEvent('yield', { stopReason: event.stopReason })
cb.onChange(event.completion)
break
case 'error':
}
case 'error': {
const error = new Error(event.error)
if (span) {
recordErrorToSpan(span, error)
}
this.errorEncountered = true
cb.onError(new Error(event.error))
cb.onError(error)
break
case 'done':
}
case 'done': {
if (!this.errorEncountered) {
cb.onComplete()
}
// reset errorEncountered for next request
this.errorEncountered = false
span?.end()
break
}
}
}
}
Expand Down
302 changes: 162 additions & 140 deletions lib/shared/src/sourcegraph-api/completions/nodeClient.ts
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import { toPartialUtf8String } from '../utils'
import { SourcegraphCompletionsClient } from './client'
import { parseEvents } from './parse'
import type { CompletionCallbacks, CompletionParameters } from './types'
import { getTraceparentHeaders, recordErrorToSpan, tracer } from '../../tracing'

const isTemperatureZero = process.env.CODY_TEMPERATURE_ZERO === 'true'

Expand All @@ -20,171 +21,192 @@ export class SourcegraphNodeCompletionsClient extends SourcegraphCompletionsClie
cb: CompletionCallbacks,
signal?: AbortSignal
): void {
if (isTemperatureZero) {
params = {
...params,
temperature: 0,
tracer.startActiveSpan(`POST ${this.completionsEndpoint}`, span => {
span.setAttributes({
fast: params.fast,
maxTokensToSample: params.maxTokensToSample,
temperature: params.temperature,
topK: params.topK,
topP: params.topP,
model: params.model,
})

if (isTemperatureZero) {
params = {
...params,
temperature: 0,
}
}
}

const log = this.logger?.startCompletion(params, this.completionsEndpoint)
const log = this.logger?.startCompletion(params, this.completionsEndpoint)

const requestFn = this.completionsEndpoint.startsWith('https://') ? https.request : http.request
const requestFn = this.completionsEndpoint.startsWith('https://')
? https.request
: http.request

// Keep track if we have send any message to the completion callbacks
let didSendMessage = false
let didSendError = false
// Keep track if we have send any message to the completion callbacks
let didSendMessage = false
let didSendError = false

// Call the error callback only once per request.
const onErrorOnce = (error: Error, statusCode?: number | undefined): void => {
if (!didSendError) {
cb.onError(error, statusCode)
didSendMessage = true
didSendError = true
// Call the error callback only once per request.
const onErrorOnce = (error: Error, statusCode?: number | undefined): void => {
if (!didSendError) {
recordErrorToSpan(span, error)
cb.onError(error, statusCode)
didSendMessage = true
didSendError = true
}
}
}

const request = requestFn(
this.completionsEndpoint,
{
method: 'POST',
headers: {
'Content-Type': 'application/json',
// Disable gzip compression since the sg instance will start to batch
// responses afterwards.
'Accept-Encoding': 'gzip;q=0',
...(this.config.accessToken
? { Authorization: `token ${this.config.accessToken}` }
: null),
...(customUserAgent ? { 'User-Agent': customUserAgent } : null),
...this.config.customHeaders,

const request = requestFn(
this.completionsEndpoint,
{
method: 'POST',
headers: {
'Content-Type': 'application/json',
// Disable gzip compression since the sg instance will start to batch
// responses afterwards.
'Accept-Encoding': 'gzip;q=0',
...(this.config.accessToken
? { Authorization: `token ${this.config.accessToken}` }
: null),
...(customUserAgent ? { 'User-Agent': customUserAgent } : null),
...this.config.customHeaders,
...getTraceparentHeaders(),
},
// So we can send requests to the Sourcegraph local development instance, which has an incompatible cert.
rejectUnauthorized:
process.env.NODE_TLS_REJECT_UNAUTHORIZED !== '0' && !this.config.debugEnable,
},
// So we can send requests to the Sourcegraph local development instance, which has an incompatible cert.
rejectUnauthorized:
process.env.NODE_TLS_REJECT_UNAUTHORIZED !== '0' && !this.config.debugEnable,
},
(res: http.IncomingMessage) => {
if (res.statusCode === undefined) {
throw new Error('no status code present')
}
(res: http.IncomingMessage) => {
const { 'set-cookie': _setCookie, ...safeHeaders } = res.headers
span.addEvent('response', {
...safeHeaders,
status: res.statusCode,
})

// Calls the error callback handler for an error.
//
// If the request failed with a rate limit error, wraps the
// error in RateLimitError.
function handleError(e: Error): void {
log?.onError(e.message, e)

if (res.statusCode === 429) {
// Check for explicit false, because if the header is not set, there
// is no upgrade available.
const upgradeIsAvailable =
typeof res.headers['x-is-cody-pro-user'] !== 'undefined' &&
res.headers['x-is-cody-pro-user'] === 'false'
const retryAfter = res.headers['retry-after']

const limit = res.headers['x-ratelimit-limit']
? getHeader(res.headers['x-ratelimit-limit'])
: undefined

const error = new RateLimitError(
'chat messages and commands',
e.message,
upgradeIsAvailable,
limit ? parseInt(limit, 10) : undefined,
retryAfter
)
onErrorOnce(error, res.statusCode)
} else {
onErrorOnce(e, res.statusCode)
if (res.statusCode === undefined) {
throw new Error('no status code present')
}

// Calls the error callback handler for an error.
//
// If the request failed with a rate limit error, wraps the
// error in RateLimitError.
function handleError(e: Error): void {
log?.onError(e.message, e)

if (res.statusCode === 429) {
// Check for explicit false, because if the header is not set, there
// is no upgrade available.
const upgradeIsAvailable =
typeof res.headers['x-is-cody-pro-user'] !== 'undefined' &&
res.headers['x-is-cody-pro-user'] === 'false'
const retryAfter = res.headers['retry-after']

const limit = res.headers['x-ratelimit-limit']
? getHeader(res.headers['x-ratelimit-limit'])
: undefined

const error = new RateLimitError(
'chat messages and commands',
e.message,
upgradeIsAvailable,
limit ? parseInt(limit, 10) : undefined,
retryAfter
)
onErrorOnce(error, res.statusCode)
} else {
onErrorOnce(e, res.statusCode)
}
}

// For failed requests, we just want to read the entire body and
// ultimately return it to the error callback.
if (res.statusCode >= 400) {
// Bytes which have not been decoded as UTF-8 text
let bufferBin = Buffer.of()
// Text which has not been decoded as a server-sent event (SSE)
let errorMessage = ''
res.on('data', chunk => {
if (!(chunk instanceof Buffer)) {
throw new TypeError('expected chunk to be a Buffer')
}
// Messages are expected to be UTF-8, but a chunk can terminate
// in the middle of a character
const { str, buf } = toPartialUtf8String(Buffer.concat([bufferBin, chunk]))
errorMessage += str
bufferBin = buf
})

res.on('error', e => handleError(e))
res.on('end', () => handleError(new Error(errorMessage)))
return
}
}

// For failed requests, we just want to read the entire body and
// ultimately return it to the error callback.
if (res.statusCode >= 400) {
// Bytes which have not been decoded as UTF-8 text
// By tes which have not been decoded as UTF-8 text
let bufferBin = Buffer.of()
// Text which has not been decoded as a server-sent event (SSE)
let errorMessage = ''
let bufferText = ''

res.on('data', chunk => {
if (!(chunk instanceof Buffer)) {
throw new TypeError('expected chunk to be a Buffer')
}
// Messages are expected to be UTF-8, but a chunk can terminate
// in the middle of a character
// text/event-stream messages are always UTF-8, but a chunk
// may terminate in the middle of a character
const { str, buf } = toPartialUtf8String(Buffer.concat([bufferBin, chunk]))
errorMessage += str
bufferText += str
bufferBin = buf

const parseResult = parseEvents(bufferText)
if (isError(parseResult)) {
logError(
'SourcegraphNodeCompletionsClient',
'isError(parseEvents(bufferText))',
parseResult
)
return
}

didSendMessage = true
log?.onEvents(parseResult.events)
this.sendEvents(parseResult.events, cb, span)
bufferText = parseResult.remainingBuffer
})

res.on('error', e => handleError(e))
res.on('end', () => handleError(new Error(errorMessage)))
return
}
)

request.on('error', e => {
let error = e
if (error.message.includes('ECONNREFUSED')) {
error = new Error(
'Could not connect to Cody. Please ensure that you are connected to the Sourcegraph server.'
)
}
log?.onError(error.message, e)
onErrorOnce(error)
})

// If the connection is closed and we did neither:
//
// - Receive an error HTTP code
// - Or any request body
//
// We still want to close the request.
request.on('close', () => {
if (!didSendMessage) {
onErrorOnce(new Error('Connection unexpectedly closed'))
}
})

// By tes which have not been decoded as UTF-8 text
let bufferBin = Buffer.of()
// Text which has not been decoded as a server-sent event (SSE)
let bufferText = ''

res.on('data', chunk => {
if (!(chunk instanceof Buffer)) {
throw new TypeError('expected chunk to be a Buffer')
}
// text/event-stream messages are always UTF-8, but a chunk
// may terminate in the middle of a character
const { str, buf } = toPartialUtf8String(Buffer.concat([bufferBin, chunk]))
bufferText += str
bufferBin = buf

const parseResult = parseEvents(bufferText)
if (isError(parseResult)) {
logError(
'SourcegraphNodeCompletionsClient',
'isError(parseEvents(bufferText))',
parseResult
)
return
}

didSendMessage = true
log?.onEvents(parseResult.events)
this.sendEvents(parseResult.events, cb)
bufferText = parseResult.remainingBuffer
})

res.on('error', e => handleError(e))
}
)

request.on('error', e => {
let error = e
if (error.message.includes('ECONNREFUSED')) {
error = new Error(
'Could not connect to Cody. Please ensure that you are connected to the Sourcegraph server.'
)
}
log?.onError(error.message, e)
onErrorOnce(error)
})
request.write(JSON.stringify(params))
request.end()

// If the connection is closed and we did neither:
//
// - Receive an error HTTP code
// - Or any request body
//
// We still want to close the request.
request.on('close', () => {
if (!didSendMessage) {
onErrorOnce(new Error('Connection unexpectedly closed'))
}
onAbort(signal, () => request.destroy())
})

request.write(JSON.stringify(params))
request.end()

onAbort(signal, () => request.destroy())
}
}

Expand Down
Loading
Loading