Skip to content

Commit

Permalink
[ISSUE apache#4047] impl Parse request
Browse files Browse the repository at this point in the history
  • Loading branch information
jevinjiang committed Apr 10, 2024
1 parent 608d169 commit e2dcf44
Show file tree
Hide file tree
Showing 6 changed files with 217 additions and 58 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.apache.eventmesh.common.utils.AssertUtils;
import org.apache.eventmesh.connector.chatgpt.source.config.ChatGPTSourceConfig;
import org.apache.eventmesh.connector.chatgpt.source.dto.ChatGPTRequestDTO;
import org.apache.eventmesh.connector.chatgpt.source.enums.ChatGPTRequestType;
import org.apache.eventmesh.connector.chatgpt.source.handlers.ChatHandler;
import org.apache.eventmesh.connector.chatgpt.source.handlers.ParseHandler;
import org.apache.eventmesh.connector.chatgpt.source.managers.OpenaiManager;
Expand Down Expand Up @@ -53,6 +54,7 @@
import io.vertx.core.http.HttpServerOptions;
import io.vertx.ext.web.RequestBody;
import io.vertx.ext.web.Router;
import io.vertx.ext.web.RoutingContext;
import io.vertx.ext.web.handler.BodyHandler;

import lombok.extern.slf4j.Slf4j;
Expand Down Expand Up @@ -121,39 +123,60 @@ private void doInit() {
try {
RequestBody body = ctx.body();
ChatGPTRequestDTO bodyObject = body.asPojo(ChatGPTRequestDTO.class);
if (bodyObject.getSubject() == null || bodyObject.getDataContentType() == null || bodyObject.getText() == null) {
throw new IllegalStateException("Attributes 'subject', 'datacontenttype', and 'prompt' cannot be null");
}
chatgptSourceExecutorService.execute(() -> {
try {
CloudEvent cloudEvent;
switch (bodyObject.getRequestType()) {
case CHAT:
cloudEvent = chatHandler.invoke(bodyObject);
break;
case PARSE:
cloudEvent = parseHandler.invoke(bodyObject);
break;
default:
throw new IllegalStateException("the request type is illegal");
}
queue.add(cloudEvent);
log.info("[ChatGPTSourceConnector] Succeed to convert payload into CloudEvent.");
ctx.response().setStatusCode(HttpResponseStatus.OK.code()).end();
} catch (Exception e) {
log.error("[ChatGPTSourceConnector] Error processing request: {}", e.getMessage(), e);
ctx.response().setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code()).end();
}
});
validateRequestDTO(bodyObject);
handleRequest(bodyObject, ctx);
} catch (Exception e) {
log.error("[ChatGPTSourceConnector] Malformed request. StatusCode={}", HttpResponseStatus.BAD_REQUEST.code(), e);
ctx.response().setStatusCode(HttpResponseStatus.BAD_REQUEST.code()).end();
handleError(e, ctx);
}
});
this.server = vertx.createHttpServer(new HttpServerOptions().setPort(this.sourceConfig.connectorConfig.getPort())
.setIdleTimeout(this.sourceConfig.connectorConfig.getIdleTimeout())).requestHandler(router);
}


private void validateRequestDTO(ChatGPTRequestDTO bodyObject) {
if (bodyObject.getSubject() == null || bodyObject.getDataContentType() == null || bodyObject.getText() == null) {
throw new IllegalArgumentException("Attributes 'subject', 'datacontenttype', and 'prompt' cannot be null");
}
}

private void handleRequest(ChatGPTRequestDTO bodyObject, RoutingContext ctx) {
chatgptSourceExecutorService.execute(() -> {
try {
ChatGPTRequestType chatgptRequestType = ChatGPTRequestType.valueOf(bodyObject.getRequestType());
CloudEvent cloudEvent = invokeHandler(chatgptRequestType, bodyObject);
queue.add(cloudEvent);
log.info("[ChatGPTSourceConnector] Succeed to convert payload into CloudEvent.");
ctx.response().setStatusCode(HttpResponseStatus.OK.code()).end();
} catch (IllegalArgumentException e) {
log.error("[ChatGPTSourceConnector] the request type is illegal: {}", e.getMessage(), e);
ctx.response()
.setStatusCode(HttpResponseStatus.BAD_REQUEST.code())
.setStatusMessage(String.format("request type '%s' is not supported", bodyObject.getRequestType()))
.end();
} catch (Exception e) {
log.error("[ChatGPTSourceConnector] Error processing request: {}", e.getMessage(), e);
ctx.response().setStatusCode(HttpResponseStatus.INTERNAL_SERVER_ERROR.code()).end();
}
});
}

private CloudEvent invokeHandler(ChatGPTRequestType chatgptRequestType, ChatGPTRequestDTO bodyObject) {
switch (chatgptRequestType) {
case CHAT:
return chatHandler.invoke(bodyObject);
case PARSE:
return parseHandler.invoke(bodyObject);
default:
throw new IllegalStateException("the request type is illegal");
}
}

private void handleError(Exception e, RoutingContext ctx) {
log.error("[ChatGPTSourceConnector] Malformed request.", e);
ctx.response().setStatusCode(HttpResponseStatus.BAD_REQUEST.code()).end();
}

@Override
public void start() {
Throwable t = this.server.listen().cause();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@

import org.apache.eventmesh.connector.chatgpt.source.enums.ChatGPTRequestType;

import java.time.ZonedDateTime;
import java.util.List;
import java.util.UUID;

import com.fasterxml.jackson.annotation.JsonInclude;
import com.fasterxml.jackson.annotation.JsonProperty;

import lombok.AllArgsConstructor;
Expand All @@ -30,7 +35,7 @@
@NoArgsConstructor
public class ChatGPTRequestDTO {

private ChatGPTRequestType requestType;
private String requestType = ChatGPTRequestType.CHAT.name();

private String source;

Expand All @@ -43,4 +48,15 @@ public class ChatGPTRequestDTO {

private String text;

private String fields;

@JsonInclude
private String id = UUID.randomUUID().toString();

@JsonInclude
private String time = ZonedDateTime.now().toOffsetDateTime().toString();

public String getFields() {
return fields.replace(";", "\n");
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,21 +53,14 @@ private CloudEvent genGptConnectRecord(ChatGPTRequestDTO event) {
List<ChatMessage> chatMessages = new ArrayList<>();
chatMessages.add(new ChatMessage(ChatMessageRole.USER.value(), event.getText()));
ChatCompletionRequest req = openaiManager.newChatCompletionRequest(chatMessages);
StringBuilder gptData = new StringBuilder();

try {
openaiManager.getOpenAiService().createChatCompletion(req).getChoices()
.forEach(chatCompletionChoice -> gptData.append(chatCompletionChoice.getMessage().getContent()));
} catch (Exception e) {
log.error("Failed to generate GPT connection record: {}", e.getMessage());
}
String chatResult = openaiManager.getResult(req);

return CloudEventBuilder.v1()
.withId(UUID.randomUUID().toString())
.withSource(URI.create(event.getSource()))
.withType(event.getType())
.withTime(ZonedDateTime.now().toOffsetDateTime())
.withData(gptData.toString().getBytes())
.withData(chatResult.getBytes())
.withSubject(event.getSubject())
.withDataContentType(event.getDataContentType())
.build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,26 +18,120 @@
package org.apache.eventmesh.connector.chatgpt.source.handlers;


import org.apache.eventmesh.common.Constants;
import org.apache.eventmesh.connector.chatgpt.source.dto.ChatGPTRequestDTO;
import org.apache.eventmesh.connector.chatgpt.source.managers.OpenaiManager;

import org.apache.commons.lang3.StringUtils;
import org.apache.commons.text.StringSubstitutor;

import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import io.cloudevents.CloudEvent;
import io.cloudevents.jackson.JsonFormat;

import com.fasterxml.jackson.annotation.JsonProperty;
import com.theokanning.openai.completion.chat.ChatCompletionRequest;
import com.theokanning.openai.completion.chat.ChatMessage;
import com.theokanning.openai.completion.chat.ChatMessageRole;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ParseHandler {

private final OpenaiManager openaiManager;

private final String promptTemplate;

private static final JsonFormat jsonFormat = new JsonFormat(false, true);


public ParseHandler(OpenaiManager openaiManager, String promptTemplate) {
this.openaiManager = openaiManager;
this.promptTemplate = promptTemplate;
}

@SuppressWarnings("checkstyle:WhitespaceAfter")
public CloudEvent invoke(ChatGPTRequestDTO event) {
// todo use StringSubstitutor event and promptTemplate translate to final prompt
Map<String, String> map = convertToMap(event);

StringSubstitutor substitute = new StringSubstitutor(map);
String finalPrompt = substitute.replace(promptTemplate);
List<ChatMessage> chatMessages = new ArrayList<>();
chatMessages.add(new ChatMessage(ChatMessageRole.USER.value(), finalPrompt));
ChatCompletionRequest req = openaiManager.newChatCompletionRequest(chatMessages);
String chatResult = openaiManager.getResult(req);
chatResult = StringUtils.removeFirst(chatResult, "```json");
chatResult = StringUtils.removeEnd(chatResult, "```");
CloudEvent cloudEvent;
try {
cloudEvent = jsonFormat.deserialize(chatResult.getBytes(Constants.DEFAULT_CHARSET));
} catch (Exception e) {
throw new IllegalStateException("cloudEvent parse fail, please check your parse prompt file content", e);
}
return cloudEvent;
}

public Map<String, String> convertToMap(Object obj) {
Map<String, String> map = new HashMap<>();
Class<?> clazz = obj.getClass();
Field[] fields = clazz.getDeclaredFields();
for (Field field : fields) {
if (field.isSynthetic()) {
continue;
}
if (Map.class.isAssignableFrom(field.getType()) || List.class.isAssignableFrom(field.getType())) {
continue;
}
try {
String key = field.getName();
if (field.isAnnotationPresent(JsonProperty.class)) {
JsonProperty annotation = field.getAnnotation(JsonProperty.class);
key = annotation.value();
}
Method getter = getGetter(field, clazz);
map.put(key, String.valueOf(getter.invoke(obj)));
} catch (IllegalAccessException | NoSuchMethodException | InvocationTargetException e) {
throw new IllegalStateException("convert to Map is fail", e);
}
}

return map;
}

public Method getGetter(Field field, Class<?> clazz) throws NoSuchMethodException {
boolean isBooleanField = false;
if (boolean.class.isAssignableFrom(field.getType()) || Boolean.class.isAssignableFrom(field.getType())) {
isBooleanField = true;
}
String handledFieldName = upperFirst(field.getName());
String methodName;
if (isBooleanField) {
methodName = "is" + handledFieldName;
} else {
methodName = "get" + handledFieldName;
}
return clazz.getDeclaredMethod(methodName);
}

return null;
public String upperFirst(String str) {
if (null == str) {
return null;
}
if (!str.isEmpty()) {
char firstChar = str.charAt(0);
if (Character.isLowerCase(firstChar)) {
return Character.toUpperCase(firstChar) + StringUtils.substring(str, 1);
}
}
return str;
}

}
Original file line number Diff line number Diff line change
@@ -1,21 +1,27 @@
You are an AI assistant named CloudEventsConverter. Your task is to convert input text provided by the user into a CloudEvents-formatted JSON object, avoid escape characters .
You are an AI assistant named CloudEventsConverter. avoid escape characters .
Your task is to construct a JSON object in CloudEvents format. Based on the field name and field description in the 'data' field of the CloudEvents formatted JSON object, convert the input text provided by the user into the content of the 'data' field, which must comply with the specifications of the content of the 'datacontenttype' field.
The role is :
- If the 'datacontenttype' field content is 'application/json', then the' data 'field content should be a JSON object,
- else If the 'datacontenttype' field content is not 'application/json' and is 'application/xml', then the' data 'field content should be a string in XML format and the outermost of XML format is <data> </data>, inside is the XML generated by you based on field info;
- else the 'datacontenttype' field content is not 'application/json' and 'application/xml', then the' data 'field content is string of the 'text' field content;
Except for the content of the data field, all other values should be set to and cannot be modified. Finally, return to me the JSON object in CloudEvents format that you constructed

For the following text, extract the following information:
The following text is the field name and field description in the 'data' field of the CloudEvents-formatted JSON object, extract the following information:
<BEGIN FIELD INFO>
${fields}
<END FIELD INFO>

Create a CloudEvents-formatted JSON object with the following fields:
- specversion: Set to "1.0" (the current CloudEvents specification version)
- type: Set to \\\ ${type} \\\
- source: Set to \\\ ${source} \\\
- id: Set to \\\ ${id} \\\ (Generate a unique identifier for the event.)
- time: Set to \\\ ${time} \\\
- datacontenttype: Set to \\\ ${datacontenttype} \\\ (e.g.,application/json)
- data: Set to the input text provided by the user
\\\
${fields}
\\\
text: ${text}

text: \\\ ${text} \\\

If any of the fields marked as \\\ {} \\\ are null or empty, use a default value.

Return the CloudEvents-formatted JSON object to the user,The format of the data field matches the datacontenttype,Just need to return the JSON object, nothing else needs to be returned。
The output should be a markdown code snippet formatted in the following schema, including the leading and trailing "```json" and "```":
```json
{
"specversion": string, Set to "1.0"
"type": string, Set to ${type}
"source": string, Set to ${source}
"subject": string, Set to ${subject}
"id": string, Set to ${id}
"time": string, Set to ${time}
"datacontenttype": string, Set to ${datacontenttype}
"data": object or string
}
Loading

0 comments on commit e2dcf44

Please sign in to comment.