@@ -394,12 +394,19 @@ pub async fn run_agent(
394394 vec ! [ ]
395395 } ;
396396
397- // Check if explicit messages were provided
398- let use_explicit_messages = args
399- . messages
400- . as_ref ( )
401- . map ( |m| !m. is_empty ( ) )
402- . unwrap_or ( false ) ;
397+ // Fetch flow context for input transforms context, chat and memory
398+ let mut flow_context = get_flow_context ( db, job) . await ;
399+
400+ // Determine history mode with backward compatibility
401+ let history = args. history . clone ( ) . or_else ( || {
402+ // Backward compatibility: if messages_context_length is set, use auto mode
403+ args. messages_context_length . map ( |context_length| {
404+ History :: Auto { context_length }
405+ } )
406+ } ) ;
407+
408+ // Determine if we're using manual messages (which bypasses memory)
409+ let use_manual_messages = matches ! ( history, Some ( History :: Manual { .. } ) ) ;
403410
404411 // Check if user_message is provided and non-empty
405412 let has_user_message = args
@@ -408,55 +415,57 @@ pub async fn run_agent(
408415 . map ( |m| !m. is_empty ( ) )
409416 . unwrap_or ( false ) ;
410417
411- // Validate: at least one of messages or user_message must be provided
412- if !use_explicit_messages && !has_user_message {
418+ // Validate: at least one of history with manual messages or user_message must be provided
419+ if !use_manual_messages && !has_user_message {
413420 return Err ( Error :: internal_err (
414- "Either 'messages' or 'user_message' must be provided" . to_string ( ) ,
421+ "Either 'history' with manual messages or 'user_message' must be provided" . to_string ( ) ,
415422 ) ) ;
416423 }
417424
418- // Fetch flow context for input transforms context, chat and memory
419- let mut flow_context = get_flow_context ( db, job) . await ;
420-
421- // Load messages if given, this will bypass memory
422- if let Some ( ref explicit_messages) = args. messages {
423- if !explicit_messages. is_empty ( ) {
424- // Use explicitly provided messages (bypass memory)
425- messages. extend ( explicit_messages. clone ( ) ) ;
426- }
427- }
425+ // Load messages based on history mode
426+ if matches ! ( output_type, OutputType :: Text ) {
427+ match history {
428+ Some ( History :: Manual { messages : manual_messages } ) => {
429+ // Use explicitly provided messages (bypass memory)
430+ if !manual_messages. is_empty ( ) {
431+ messages. extend ( manual_messages) ;
432+ }
433+ }
434+ Some ( History :: Auto { context_length } ) if context_length > 0 => {
435+ // Auto mode: load from memory
436+ if let Some ( step_id) = job. flow_step_id . as_deref ( ) {
437+ if let Some ( memory_id) = flow_context
438+ . flow_status
439+ . as_ref ( )
440+ . and_then ( |fs| fs. memory_id )
441+ {
442+ // Read messages from memory
443+ match read_from_memory ( db, & job. workspace_id , memory_id, step_id) . await {
444+ Ok ( Some ( loaded_messages) ) => {
445+ // Take the last n messages
446+ let start_idx = loaded_messages. len ( ) . saturating_sub ( context_length) ;
447+ let mut messages_to_load = loaded_messages[ start_idx..] . to_vec ( ) ;
448+ let first_non_tool_message_index =
449+ messages_to_load. iter ( ) . position ( |m| m. role != "tool" ) ;
450+
451+ // Remove the first messages if their role is "tool" to avoid OpenAI API error
452+ if let Some ( index) = first_non_tool_message_index {
453+ messages_to_load = messages_to_load[ index..] . to_vec ( ) ;
454+ }
428455
429- if !use_explicit_messages && matches ! ( output_type, OutputType :: Text ) {
430- if let Some ( context_length) = args. messages_context_length . filter ( |& n| n > 0 ) {
431- if let Some ( step_id) = job. flow_step_id . as_deref ( ) {
432- if let Some ( memory_id) = flow_context
433- . flow_status
434- . as_ref ( )
435- . and_then ( |fs| fs. memory_id )
436- {
437- // Read messages from memory
438- match read_from_memory ( db, & job. workspace_id , memory_id, step_id) . await {
439- Ok ( Some ( loaded_messages) ) => {
440- // Take the last n messages
441- let start_idx = loaded_messages. len ( ) . saturating_sub ( context_length) ;
442- let mut messages_to_load = loaded_messages[ start_idx..] . to_vec ( ) ;
443- let first_non_tool_message_index =
444- messages_to_load. iter ( ) . position ( |m| m. role != "tool" ) ;
445-
446- // Remove the first messages if their role is "tool" to avoid OpenAI API error
447- if let Some ( index) = first_non_tool_message_index {
448- messages_to_load = messages_to_load[ index..] . to_vec ( ) ;
456+ messages. extend ( messages_to_load) ;
457+ }
458+ Ok ( None ) => { }
459+ Err ( e) => {
460+ tracing:: error!( "Failed to read memory for step {}: {}" , step_id, e) ;
449461 }
450-
451- messages. extend ( messages_to_load) ;
452- }
453- Ok ( None ) => { }
454- Err ( e) => {
455- tracing:: error!( "Failed to read memory for step {}: {}" , step_id, e) ;
456462 }
457463 }
458464 }
459465 }
466+ _ => {
467+ // No history or context_length is 0 - don't load any messages
468+ }
460469 }
461470 }
462471
@@ -921,37 +930,39 @@ pub async fn run_agent(
921930 }
922931 }
923932
924- // Persist complete conversation to memory at the end (only if context length is set )
925- // Skip memory persistence if explicit messages were provided (bypass memory entirely)
933+ // Persist complete conversation to memory at the end (only if in auto mode with context length )
934+ // Skip memory persistence if using manual messages (bypass memory entirely)
926935 // final_messages contains the complete history (old messages + new ones)
927- if matches ! ( output_type, OutputType :: Text ) && !use_explicit_messages {
928- if let Some ( context_length) = args. messages_context_length . filter ( |& n| n > 0 ) {
929- if let Some ( step_id) = job. flow_step_id . as_deref ( ) {
930- // Extract OpenAIMessages from final_messages
931- let all_messages: Vec < OpenAIMessage > =
932- final_messages. iter ( ) . map ( |m| m. message . clone ( ) ) . collect ( ) ;
933-
934- if !all_messages. is_empty ( ) {
935- // Keep only the last n messages
936- let start_idx = all_messages. len ( ) . saturating_sub ( context_length) ;
937- let messages_to_persist = all_messages[ start_idx..] . to_vec ( ) ;
938-
939- if let Some ( memory_id) = flow_context. flow_status . and_then ( |fs| fs. memory_id ) {
940- if let Err ( e) = write_to_memory (
941- db,
942- & job. workspace_id ,
943- memory_id,
944- step_id,
945- & messages_to_persist,
946- )
947- . await
948- {
949- tracing:: error!(
950- "Failed to persist {} messages to memory for step {}: {}" ,
951- messages_to_persist. len( ) ,
936+ if matches ! ( output_type, OutputType :: Text ) && !use_manual_messages {
937+ if let Some ( History :: Auto { context_length } ) = history {
938+ if context_length > 0 {
939+ if let Some ( step_id) = job. flow_step_id . as_deref ( ) {
940+ // Extract OpenAIMessages from final_messages
941+ let all_messages: Vec < OpenAIMessage > =
942+ final_messages. iter ( ) . map ( |m| m. message . clone ( ) ) . collect ( ) ;
943+
944+ if !all_messages. is_empty ( ) {
945+ // Keep only the last n messages
946+ let start_idx = all_messages. len ( ) . saturating_sub ( context_length) ;
947+ let messages_to_persist = all_messages[ start_idx..] . to_vec ( ) ;
948+
949+ if let Some ( memory_id) = flow_context. flow_status . and_then ( |fs| fs. memory_id ) {
950+ if let Err ( e) = write_to_memory (
951+ db,
952+ & job. workspace_id ,
953+ memory_id,
952954 step_id,
953- e
954- ) ;
955+ & messages_to_persist,
956+ )
957+ . await
958+ {
959+ tracing:: error!(
960+ "Failed to persist {} messages to memory for step {}: {}" ,
961+ messages_to_persist. len( ) ,
962+ step_id,
963+ e
964+ ) ;
965+ }
955966 }
956967 }
957968 }
0 commit comments