diff --git a/.gitignore b/.gitignore index b7faf40..7baf72d 100644 --- a/.gitignore +++ b/.gitignore @@ -143,6 +143,7 @@ venv/ ENV/ env.bak/ venv.bak/ +my_env/ # Spyder project settings .spyderproject @@ -205,3 +206,6 @@ cython_debug/ marimo/_static/ marimo/_lsp/ __marimo__/ + +#kfp +pipelines/github_rag_pipeline.yaml \ No newline at end of file diff --git a/README.md b/README.md index 5d3a8e7..c1e1637 100644 --- a/README.md +++ b/README.md @@ -1,2 +1,2 @@ # docs-agent -Kubeflow Documentation AI Agent to power the Kubeflow Website +Kubeflow Documentation AI Agent to power the Kubeflow Website \ No newline at end of file diff --git a/docs_scripts/chatbot.js b/docs_scripts/chatbot.js new file mode 100644 index 0000000..0dc8878 --- /dev/null +++ b/docs_scripts/chatbot.js @@ -0,0 +1,625 @@ +// Function to create chatbot HTML elements dynamically +function createChatbotElements() { + try { + // Check if elements already exist (prevent duplicates) + if (document.getElementById('chatbot-container')) { + console.log('Chatbot elements already exist, skipping creation'); + return true; + } + + // Create chatbot backdrop + const chatbotBackdrop = document.createElement('div'); + chatbotBackdrop.id = 'chatbot-backdrop'; + chatbotBackdrop.className = 'chatbot-backdrop'; + document.body.appendChild(chatbotBackdrop); + + // Create chatbot container + const chatbotContainer = document.createElement('div'); + chatbotContainer.id = 'chatbot-container'; + chatbotContainer.className = 'chatbot-container'; + + // Create chatbot header + const chatbotHeader = document.createElement('div'); + chatbotHeader.className = 'chatbot-header'; + + const chatbotTitle = document.createElement('div'); + chatbotTitle.className = 'chatbot-title'; + chatbotTitle.innerHTML = 'Docs Bot'; + + const toggleButton = document.createElement('button'); + toggleButton.id = 'toggle-chatbot'; + toggleButton.className = 'toggle-chatbot'; + toggleButton.innerHTML = '×'; + + chatbotHeader.appendChild(chatbotTitle); + chatbotHeader.appendChild(toggleButton); + + // Create chat messages area + const chatMessages = document.createElement('div'); + chatMessages.id = 'chat-messages'; + chatMessages.className = 'chat-messages'; + + // Create welcome message + const welcomeMessage = document.createElement('div'); + welcomeMessage.className = 'message bot-message welcome-message'; + const welcomeContent = document.createElement('div'); + welcomeContent.className = 'message-content'; + const welcomeText = document.createElement('p'); + welcomeText.textContent = "Hello! I'm your documentation assistant. 
How can I help you today?"; + welcomeContent.appendChild(welcomeText); + welcomeMessage.appendChild(welcomeContent); + chatMessages.appendChild(welcomeMessage); + + // Create input container + const chatInputContainer = document.createElement('div'); + chatInputContainer.className = 'chat-input-container'; + + const inputWrapper = document.createElement('div'); + inputWrapper.className = 'input-wrapper'; + + const userInput = document.createElement('textarea'); + userInput.id = 'user-input'; + userInput.className = 'chat-input'; + userInput.placeholder = 'Message Docs Bot...'; + userInput.rows = 1; + + const sendButton = document.createElement('button'); + sendButton.id = 'send-message'; + sendButton.className = 'send-button'; + sendButton.innerHTML = ''; + + inputWrapper.appendChild(userInput); + inputWrapper.appendChild(sendButton); + + const inputFooter = document.createElement('div'); + inputFooter.className = 'input-footer'; + const inputHint = document.createElement('span'); + inputHint.className = 'input-hint'; + inputHint.textContent = 'Press Enter to send, Shift + Enter for new line'; + inputFooter.appendChild(inputHint); + + chatInputContainer.appendChild(inputWrapper); + chatInputContainer.appendChild(inputFooter); + + // Assemble chatbot container + chatbotContainer.appendChild(chatbotHeader); + chatbotContainer.appendChild(chatMessages); + chatbotContainer.appendChild(chatInputContainer); + document.body.appendChild(chatbotContainer); + + // Create chatbot toggle button + const chatbotToggle = document.createElement('button'); + chatbotToggle.id = 'chatbot-toggle'; + chatbotToggle.className = 'chatbot-toggle'; + chatbotToggle.innerHTML = '💬Docs Bot'; + document.body.appendChild(chatbotToggle); + + // Force a small delay to ensure DOM is updated + return new Promise(resolve => { + setTimeout(() => { + // Verify all elements were created successfully + const requiredElements = [ + 'chatbot-container', 'chatbot-backdrop', 'chat-messages', + 'user-input', 'send-message', 'toggle-chatbot', 'chatbot-toggle' + ]; + + const missingElements = requiredElements.filter(id => !document.getElementById(id)); + if (missingElements.length > 0) { + console.error('Failed to create chatbot elements:', missingElements); + resolve(false); + } else { + console.log('All chatbot elements created successfully'); + resolve(true); + } + }, 10); + }); + + } catch (error) { + console.error('Error creating chatbot elements:', error); + return false; + } +} + +document.addEventListener('DOMContentLoaded', async function() { + // Create chatbot HTML structure dynamically and wait for completion + console.log("Newer Version Loaded"); + const elementsCreated = await createChatbotElements(); + + if (!elementsCreated) { + console.error('Failed to create chatbot elements, aborting initialization'); + return; + } + + // DOM Elements - with null checks + const chatbotContainer = document.getElementById('chatbot-container'); + const chatbotBackdrop = document.getElementById('chatbot-backdrop'); + const chatMessages = document.getElementById('chat-messages'); + const userInput = document.getElementById('user-input'); + const sendButton = document.getElementById('send-message'); + const toggleButton = document.getElementById('toggle-chatbot'); + const chatbotToggle = document.getElementById('chatbot-toggle'); + + // Validate all required elements exist + if (!chatbotContainer || !chatbotBackdrop || !chatMessages || !userInput || !sendButton || !toggleButton || !chatbotToggle) { + console.error('Some chatbot elements 
are missing:', { + chatbotContainer: !!chatbotContainer, + chatbotBackdrop: !!chatbotBackdrop, + chatMessages: !!chatMessages, + userInput: !!userInput, + sendButton: !!sendButton, + toggleButton: !!toggleButton, + chatbotToggle: !!chatbotToggle + }); + return; + } + + // State + let isTyping = false; + let socket = null; + let currentMessageDiv = null; + let currentMessageContent = ''; + + // Initialize WebSocket connection + function connectWebSocket() { + try { + console.log('Attempting to connect to WebSocket...'); + socket = new WebSocket('wss://websocket-server-production-9b44.up.railway.app'); + + socket.onopen = function(e) { + console.log('WebSocket connection established successfully'); + }; + + socket.onmessage = function(event) { + console.log('Received message from server:', event.data); + try { + const response = JSON.parse(event.data); + + if (response.type === 'system') { + addSystemMessage(response.content); + return; + } + + if (response.type === 'citations') { + addCitations(response.citations); + return; + } + + if (response.type === 'content') { + if (!currentMessageDiv) { + const messagesContainer = chatMessages || document.getElementById('chat-messages'); + if (!messagesContainer) { + console.error('Cannot display message: chat messages container not found'); + return; + } + + // Create new message div for the first token + currentMessageDiv = document.createElement('div'); + currentMessageDiv.className = 'message bot-message'; + + const contentDiv = document.createElement('div'); + contentDiv.className = 'message-content'; + + const paragraph = document.createElement('p'); + currentMessageDiv.appendChild(contentDiv); + contentDiv.appendChild(paragraph); + + messagesContainer.appendChild(currentMessageDiv); + removeTypingIndicator(); + } + + // Append new content to the end (tokens come in reverse order) + currentMessageContent += response.content; + const paragraph = currentMessageDiv.querySelector('p'); + + // Format streaming content + const formattedText = formatMarkdown(currentMessageContent, true); + + paragraph.innerHTML = formattedText; + + // Apply syntax highlighting to any new code blocks + if (window.Prism) { + const codeBlocks = currentMessageDiv.querySelectorAll('pre code'); + codeBlocks.forEach(block => { + if (!block.classList.contains('prism-highlighted')) { + block.classList.add('prism-highlighted'); + window.Prism.highlightElement(block); + } + }); + } + + scrollToBottom(); + } + + // Handle end of message or errors + if (response.type === 'end') { + currentMessageDiv = null; + currentMessageContent = ''; + } else if (response.type === 'error') { + removeTypingIndicator(); + addSystemMessage('Error: ' + response.content); + currentMessageDiv = null; + currentMessageContent = ''; + } + + } catch (error) { + console.error('Error parsing WebSocket message:', error); + removeTypingIndicator(); + addMessage('Sorry, there was an error processing your request.', 'bot'); + } + }; + + socket.onclose = function(event) { + console.log('WebSocket connection closed, code:', event.code, 'reason:', event.reason); + addSystemMessage('Disconnected from server'); + setTimeout(connectWebSocket, 3000); + }; + + socket.onerror = function(error) { + console.error('WebSocket error:', error); + addSystemMessage('Connection error'); + }; + } catch (error) { + console.error('Error creating WebSocket connection:', error); + addSystemMessage('Failed to connect'); + } + } + + // Connect to WebSocket when page loads + connectWebSocket(); + + // Event Listeners + 
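+    // The socket.onmessage handler above assumes the server streams JSON frames
+    // with the following shapes (inferred from the branches handled there; the
+    // example values are illustrative, not taken from the server implementation):
+    //   {"type": "system",    "content": "status text"}        -> addSystemMessage()
+    //   {"type": "content",   "content": "next answer chunk"}  -> appended to the streamed bot message
+    //   {"type": "citations", "citations": ["https://..."]}    -> addCitations() dropdown
+    //   {"type": "end"}                                        -> finalizes the current streamed message
+    //   {"type": "error",     "content": "description"}        -> shown via addSystemMessage()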
sendButton.addEventListener('click', handleSendMessage); + userInput.addEventListener('keypress', function(e) { + if (e.key === 'Enter' && !e.shiftKey) { + e.preventDefault(); + handleSendMessage(); + } + }); + + toggleButton.addEventListener('click', hideChatbot); + chatbotToggle.addEventListener('click', showChatbot); + chatbotBackdrop.addEventListener('click', hideChatbot); + + // Auto-resize textarea + userInput.addEventListener('input', function() { + this.style.height = 'auto'; + this.style.height = Math.min(this.scrollHeight, 150) + 'px'; + }); + + // Unified markdown formatting function + function formatMarkdown(text, isStreaming = false) { + if (!text) return ''; + + let formatted = text; + const tempCodeBlocks = []; + const tempInlineCode = []; + + // Handle code blocks first (triple backticks) + formatted = formatted.replace(/```(\w+)?\n?([\s\S]*?)```/g, (match, lang, code) => { + const language = lang || 'text'; + const escapedCode = code.replace(/&/g, '&').replace(//g, '>'); + const block = `
<pre><code class="language-${language}">${escapedCode.trim()}</code></pre>
`; + tempCodeBlocks.push(block); + return `__CODEBLOCK_${tempCodeBlocks.length - 1}__`; + }); + + // Handle inline code (single backticks) + formatted = formatted.replace(/`([^`]+)`/g, (match, code) => { + const escapedCode = code.replace(/&/g, '&').replace(//g, '>'); + const inlineCode = `${escapedCode}`; + tempInlineCode.push(inlineCode); + return `__INLINECODE_${tempInlineCode.length - 1}__`; + }); + + // Escape remaining HTML + formatted = formatted.replace(/&/g, '&').replace(//g, '>'); + + // Restore code blocks and inline code + tempCodeBlocks.forEach((block, index) => { + formatted = formatted.replace(`__CODEBLOCK_${index}__`, block); + }); + + tempInlineCode.forEach((code, index) => { + formatted = formatted.replace(`__INLINECODE_${index}__`, code); + }); + + // Bold text + formatted = formatted.replace(/\*\*(.*?)\*\*/g, '$1'); + + return formatted; + } + + function handleSendMessage() { + const message = userInput.value.trim(); + if (!message || isTyping) return; + + // Reset the current message state + currentMessageDiv = null; + currentMessageContent = ''; + + // Add user message + addMessage(message, 'user'); + userInput.value = ''; + userInput.style.height = 'auto'; + + // Show typing indicator + showTypingIndicator(); + + // Send message to WebSocket server + if (socket && socket.readyState === WebSocket.OPEN) { + try { + // Send the raw message - the server expects plain text + console.log('Sending message to server:', message); + socket.send(message); + } catch (error) { + console.error('Error sending message:', error); + removeTypingIndicator(); + addSystemMessage('Failed to send message'); + addMessage('Error sending your message. Please try again.', 'bot'); + } + } else { + // If WebSocket is not connected, show an error message + console.error('WebSocket not connected. Current state:', socket ? socket.readyState : 'No socket'); + removeTypingIndicator(); + addSystemMessage('Not connected to server'); + addMessage('Unable to connect to the server. 
Please try again later.', 'bot'); + // Try to reconnect + connectWebSocket(); + } + } + + function addMessage(text, sender) { + const messagesContainer = chatMessages || document.getElementById('chat-messages'); + if (!messagesContainer) { + console.error('Cannot add message: chat messages container not found'); + return; + } + + const messageDiv = document.createElement('div'); + messageDiv.className = `message ${sender}-message`; + + const contentDiv = document.createElement('div'); + contentDiv.className = 'message-content'; + + const paragraph = document.createElement('p'); + + // Use enhanced parsing for bot messages to handle markdown + if (sender === 'bot') { + const formattedText = formatMarkdown(text); + paragraph.innerHTML = formattedText; + + // Apply syntax highlighting + if (window.Prism) { + const codeBlocks = messageDiv.querySelectorAll('pre code'); + codeBlocks.forEach(block => { + window.Prism.highlightElement(block); + }); + } + } else { + paragraph.textContent = text; + } + + contentDiv.appendChild(paragraph); + messageDiv.appendChild(contentDiv); + + messagesContainer.appendChild(messageDiv); + scrollToBottom(); + } + + function addSystemMessage(text) { + const messagesContainer = chatMessages || document.getElementById('chat-messages'); + if (!messagesContainer) { + console.error('Cannot add system message: chat messages container not found'); + return; + } + + const messageDiv = document.createElement('div'); + messageDiv.className = 'message system-message'; + + const contentDiv = document.createElement('div'); + contentDiv.className = 'message-content'; + + const paragraph = document.createElement('p'); + paragraph.textContent = text; + + contentDiv.appendChild(paragraph); + messageDiv.appendChild(contentDiv); + + messagesContainer.appendChild(messageDiv); + scrollToBottom(); + } + + function addCitations(citations) { + if (!citations || citations.length === 0) return; + + const messagesContainer = chatMessages || document.getElementById('chat-messages'); + if (!messagesContainer) { + console.error('Cannot add citations: chat messages container not found'); + return; + } + + // Find the last bot message to attach citations to + const messages = messagesContainer.querySelectorAll('.bot-message:not(.typing-indicator)'); + const lastBotMessage = messages[messages.length - 1]; + + if (!lastBotMessage) { + // If no bot message exists, create a standalone citations message + const citationDiv = document.createElement('div'); + citationDiv.className = 'message bot-message citations-message'; + + const contentDiv = document.createElement('div'); + contentDiv.className = 'message-content'; + + const citationsContainer = createCitationsDropdown(citations); + contentDiv.appendChild(citationsContainer); + citationDiv.appendChild(contentDiv); + + messagesContainer.appendChild(citationDiv); + } else { + // Attach citations to the last bot message + const contentDiv = lastBotMessage.querySelector('.message-content'); + const existingCitations = contentDiv.querySelector('.citations-container'); + + // Remove existing citations if any + if (existingCitations) { + existingCitations.remove(); + } + + const citationsContainer = createCitationsDropdown(citations); + contentDiv.appendChild(citationsContainer); + } + + scrollToBottom(); + } + + function createCitationsDropdown(citations) { + const container = document.createElement('div'); + container.className = 'citations-container'; + + const toggleButton = document.createElement('button'); + toggleButton.className = 
'citations-toggle'; + toggleButton.innerHTML = ` + 📚 + Sources (${citations.length}) + + `; + + const dropdown = document.createElement('div'); + dropdown.className = 'citations-dropdown'; + + citations.forEach((citation, index) => { + const link = document.createElement('a'); + link.href = citation; + link.target = '_blank'; + link.rel = 'noopener noreferrer'; + link.className = 'citation-link'; + + // Extract the page title from URL for better display + const urlParts = citation.split('/'); + const pageName = urlParts[urlParts.length - 1] || urlParts[urlParts.length - 2]; + const displayName = pageName.replace(/-/g, ' ').replace(/_/g, ' '); + + link.innerHTML = ` + ${index + 1} + ${displayName} + ${citation} + `; + + dropdown.appendChild(link); + }); + + // Toggle functionality + let isOpen = false; + toggleButton.addEventListener('click', function() { + isOpen = !isOpen; + dropdown.classList.toggle('open', isOpen); + toggleButton.querySelector('.citations-arrow').textContent = isOpen ? '▲' : '▼'; + toggleButton.classList.toggle('open', isOpen); + }); + + container.appendChild(toggleButton); + container.appendChild(dropdown); + + return container; + } + + function showTypingIndicator() { + const messagesContainer = chatMessages || document.getElementById('chat-messages'); + if (!messagesContainer) { + console.error('Cannot show typing indicator: chat messages container not found'); + return; + } + + isTyping = true; + const typingDiv = document.createElement('div'); + typingDiv.className = 'message bot-message typing-indicator'; + typingDiv.id = 'typing-indicator'; + + const contentDiv = document.createElement('div'); + contentDiv.className = 'message-content'; + + const dots = document.createElement('div'); + dots.className = 'typing-dots'; + dots.innerHTML = ''; + + contentDiv.appendChild(dots); + typingDiv.appendChild(contentDiv); + + messagesContainer.appendChild(typingDiv); + scrollToBottom(); + } + + function removeTypingIndicator() { + isTyping = false; + const typingIndicator = document.getElementById('typing-indicator'); + if (typingIndicator) { + typingIndicator.remove(); + } + } + + function scrollToBottom() { + const messagesContainer = chatMessages || document.getElementById('chat-messages'); + if (messagesContainer) { + messagesContainer.scrollTop = messagesContainer.scrollHeight; + } + } + + function showChatbot() { + if (chatbotContainer) { + chatbotContainer.classList.remove('minimized'); + chatbotContainer.classList.add('active'); + } + if (chatbotBackdrop) { + chatbotBackdrop.classList.add('active'); + } + if (chatbotToggle) { + chatbotToggle.classList.add('hidden'); + } + if (toggleButton) { + const minimizeIcon = toggleButton.querySelector('.minimize-icon'); + if (minimizeIcon) { + minimizeIcon.textContent = '×'; + } + } + scrollToBottom(); + document.body.style.overflow = 'hidden'; + } + + function hideChatbot() { + if (chatbotContainer) { + chatbotContainer.classList.remove('active'); + } + if (chatbotBackdrop) { + chatbotBackdrop.classList.remove('active'); + } + if (chatbotToggle) { + chatbotToggle.classList.remove('hidden'); + } + + // Restore body scroll + document.body.style.overflow = ''; + + setTimeout(() => { + if (chatbotContainer) { + chatbotContainer.classList.add('minimized'); + } + if (toggleButton) { + const minimizeIcon = toggleButton.querySelector('.minimize-icon'); + if (minimizeIcon) { + minimizeIcon.textContent = '+'; + } + } + }, 300); + } + + // Initialize chatbot state + if (chatbotContainer) { + chatbotContainer.classList.add('minimized'); + 
} + if (toggleButton) { + const minimizeIcon = toggleButton.querySelector('.minimize-icon'); + if (minimizeIcon) { + minimizeIcon.textContent = '+'; + } + } +}); diff --git a/docs_styles/chatbot.css b/docs_styles/chatbot.css new file mode 100644 index 0000000..447a8a2 --- /dev/null +++ b/docs_styles/chatbot.css @@ -0,0 +1,465 @@ +.chatbot-container { + position: fixed; + top: 50%; + left: 50%; + width: 900px; + height: 700px; + background: #ffffff; + border-radius: 16px; + box-shadow: 0 20px 60px rgba(0, 0, 0, 0.25); + display: flex; + flex-direction: column; + z-index: 1001; + transition: all 0.4s cubic-bezier(0.4, 0, 0.2, 1); + font-family: -apple-system, BlinkMacSystemFont, "Segoe UI", Roboto, "Helvetica Neue", Arial, sans-serif; + opacity: 0; + transform: translate(-50%, -50%) scale(0.8); + pointer-events: none; + transform-origin: center center; + backdrop-filter: blur(10px); + border: 1px solid rgba(255, 255, 255, 0.1); + overflow: hidden; +} +.chatbot-container.active { + opacity: 1; + transform: translate(-50%, -50%) scale(1); + pointer-events: all; +} +.chatbot-container.minimized { + opacity: 0; + transform: translate(-50%, -50%) scale(0.8); + pointer-events: none; +} +.chatbot-header { + padding: 16px 20px; + background: linear-gradient(135deg, #4a6bdf 0%, #2854d8 100%); + color: white; + border-radius: 16px 16px 0 0; + display: flex; + justify-content: space-between; + align-items: center; + box-shadow: 0 2px 8px rgba(0, 0, 0, 0.05); +} +.chatbot-title { + font-size: 16px; + font-weight: 600; + letter-spacing: 0.3px; + display: flex; + align-items: center; + gap: 8px; +} +.chatbot-title::before { + content: ''; + width: 8px; + height: 8px; + background: #4CAF50; + border-radius: 50%; + display: inline-block; +} +.toggle-chatbot { + background: none; + border: none; + color: white; + cursor: pointer; + font-size: 20px; + padding: 4px 8px; + opacity: 0.8; + transition: all 0.2s ease; + line-height: 1; + border-radius: 6px; +} +.toggle-chatbot:hover { + opacity: 1; + background: rgba(255, 255, 255, 0.1); +} +.chat-messages { + flex: 1; + overflow-y: auto; + padding: 24px; + display: flex; + flex-direction: column; + gap: 16px; + background: #ffffff; + word-wrap: break-word; + overflow-x: hidden; +} +.message { + max-width: 85%; + width: fit-content; + padding: 12px 16px; + border-radius: 14px; + line-height: 1.5; + font-size: 14px; + position: relative; + opacity: 0; + transform: translateY(10px); + animation: messageAppear 0.3s ease forwards; + box-shadow: 0 2px 6px rgba(0, 0, 0, 0.04); +} +@keyframes messageAppear { + to { + opacity: 1; + transform: translateY(0); + } +} +.message-content { + position: relative; + z-index: 1; + max-width: 100%; +} +.message-content p { + margin: 0; + white-space: pre-wrap; + word-wrap: break-word; + overflow-wrap: break-word; + line-height: 1.5; +} +.message-content strong { + font-weight: 600; +} +.message-content code { + background: #f1f3f4; + padding: 2px 4px; + border-radius: 4px; + font-family: 'Monaco', 'Consolas', 'Courier New', monospace; + font-size: 13px; + color: #d73a49; +} +.message-content pre { + background: #f6f8fa; + border: 1px solid #e1e4e8; + border-radius: 6px; + padding: 16px; + margin: 8px 0; + overflow-x: auto; + font-family: 'Monaco', 'Consolas', 'Courier New', monospace; + font-size: 13px; + line-height: 1.45; + white-space: pre; + word-wrap: normal; +} +.message-content pre code { + background: none; + padding: 0; + color: #24292e; + border-radius: 0; +} +/* Removed unused list styles since we simplified markdown */ 
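+/*
+ * Reference sketch of the markup the rules below style, as generated by chatbot.js
+ * (addMessage, addSystemMessage, addCitations/createCitationsDropdown). Only the
+ * class names are guaranteed to match; the inner details live in the script:
+ *
+ *   <div class="message bot-message">
+ *     <div class="message-content">
+ *       <p>formatted text with inline <code> and <pre><code> blocks</p>
+ *       <div class="citations-container">
+ *         <button class="citations-toggle">Sources (n)</button>
+ *         <div class="citations-dropdown">
+ *           <a class="citation-link">...</a>
+ *         </div>
+ *       </div>
+ *     </div>
+ *   </div>
+ */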
+.user-message { + background: linear-gradient(135deg, #4a6bdf 0%, #2854d8 100%); + color: white; + align-self: flex-end; + border-radius: 16px 16px 0 16px; +} +.bot-message { + background: #f5f7fa; + color: #333; + align-self: flex-start; + border-radius: 16px 16px 16px 0; +} +/* Removed unused welcome-message style */ +.system-message { + background: #f0f2f5; + color: #666; + align-self: center; + border-radius: 12px; + max-width: 80%; + padding: 6px 12px; + font-size: 12px; + opacity: 0.8; + margin: 8px 0; +} +.chat-input-container { + padding: 16px 20px; + border-top: 1px solid rgba(0, 0, 0, 0.05); + background: #ffffff; + border-radius: 0 0 16px 16px; +} +.input-wrapper { + display: flex; + gap: 12px; + align-items: flex-end; +} +.chat-input { + flex: 1; + padding: 12px 16px; + border: 1px solid rgba(0, 0, 0, 0.1); + border-radius: 12px; + resize: none; + font-family: inherit; + font-size: 14px; + line-height: 1.5; + max-height: 150px; + transition: all 0.2s ease; + background: #f8f9fa; +} +.chat-input:focus { + outline: none; + border-color: #4a6bdf; + box-shadow: 0 0 0 2px rgba(74, 107, 223, 0.2); + background: #ffffff; +} +.send-button { + background: linear-gradient(135deg, #4a6bdf 0%, #2854d8 100%); + color: white; + border: none; + border-radius: 10px; + width: 36px; + height: 36px; + display: flex; + align-items: center; + justify-content: center; + cursor: pointer; + transition: all 0.2s ease; + flex-shrink: 0; + box-shadow: 0 2px 8px rgba(0, 0, 0, 0.1); +} +.send-button:hover { + transform: translateY(-2px); + box-shadow: 0 4px 12px rgba(74, 107, 223, 0.3); +} +/* Removed unused input footer styles */ +.chatbot-toggle { + position: fixed; + bottom: 20px; + right: 20px; + background: linear-gradient(135deg, #4a6bdf 0%, #2854d8 100%); + color: white; + border: none; + border-radius: 24px; + cursor: pointer; + display: flex; + align-items: center; + gap: 8px; + padding: 12px 20px; + box-shadow: 0 4px 16px rgba(74, 107, 223, 0.3); + z-index: 1000; + transition: all 0.3s ease; + opacity: 1; +} +.chatbot-toggle.hidden { + opacity: 0; + pointer-events: none; +} +.chatbot-toggle:hover { + transform: translateY(-2px); + box-shadow: 0 6px 20px rgba(74, 107, 223, 0.4); +} +.chat-icon { + font-size: 18px; +} +.chat-text { + font-size: 14px; + font-weight: 500; +} +.typing-indicator { + background: #f5f7fa; + padding: 12px 16px; + border-radius: 12px; + max-width: 80px; + align-self: flex-start; +} +.typing-dots { + display: flex; + gap: 4px; + align-items: center; + justify-content: center; +} +.typing-dots span { + width: 6px; + height: 6px; + background: #4a6bdf; + border-radius: 50%; + animation: typing 1s infinite ease-in-out; +} +.typing-dots span:nth-child(2) { + animation-delay: 0.2s; +} +.typing-dots span:nth-child(3) { + animation-delay: 0.4s; +} +@keyframes typing { + 0%, 100% { + transform: translateY(0); + } + 50% { + transform: translateY(-4px); + } +} +/* Modal backdrop */ +.chatbot-backdrop { + position: fixed; + top: 0; + left: 0; + width: 100%; + height: 100%; + background: rgba(0, 0, 0, 0.5); + z-index: 1000; + opacity: 0; + pointer-events: none; + transition: opacity 0.3s ease; +} +.chatbot-backdrop.active { + opacity: 1; + pointer-events: all; +} + +@media (max-width: 1024px) { + .chatbot-container { + width: 90%; + height: 80%; + max-width: 800px; + max-height: 600px; + } +} + +/* Citations styling */ +.citations-container { + margin-top: 12px; + border-top: 1px solid rgba(0, 0, 0, 0.1); + padding-top: 12px; +} + +.citations-toggle { + background: #f8f9fa; + border: 
1px solid #e9ecef; + border-radius: 8px; + padding: 8px 12px; + cursor: pointer; + display: flex; + align-items: center; + gap: 8px; + width: 100%; + font-size: 13px; + color: #495057; + transition: all 0.2s ease; + font-family: inherit; +} + +.citations-toggle:hover { + background: #e9ecef; + border-color: #dee2e6; +} + +.citations-toggle.open { + background: #e3f2fd; + border-color: #2196f3; + color: #1976d2; +} + +.citations-icon { + font-size: 14px; +} + +.citations-text { + flex: 1; + text-align: left; + font-weight: 500; +} + +.citations-arrow { + font-size: 10px; + transition: transform 0.2s ease; +} + +.citations-dropdown { + max-height: 0; + overflow: hidden; + transition: max-height 0.3s ease; + background: #ffffff; + border: 1px solid #e9ecef; + border-top: none; + border-radius: 0 0 8px 8px; +} + +.citations-dropdown.open { + max-height: 300px; + overflow-y: auto; +} + +.citation-link { + display: block; + padding: 10px 12px; + text-decoration: none; + color: #495057; + border-bottom: 1px solid #f1f3f4; + transition: background-color 0.2s ease; + font-size: 12px; +} + +.citation-link:hover { + background: #f8f9fa; + color: #2196f3; +} + +.citation-link:last-child { + border-bottom: none; +} + +.citation-number { + display: inline-block; + background: #2196f3; + color: white; + width: 18px; + height: 18px; + border-radius: 50%; + text-align: center; + line-height: 18px; + font-size: 10px; + font-weight: bold; + margin-right: 8px; + flex-shrink: 0; +} + +.citation-title { + display: block; + font-weight: 500; + color: #212529; + margin-bottom: 2px; + text-transform: capitalize; +} + +.citation-url { + display: block; + color: #6c757d; + font-size: 11px; + word-break: break-all; + opacity: 0.8; +} + +.citations-message { + background: #f8f9fa; + border: 1px solid #e9ecef; +} + +@media (max-width: 768px) { + .chatbot-container { + width: 95%; + height: 90%; + max-width: none; + max-height: none; + border-radius: 12px; + } + .chatbot-container.minimized { + opacity: 0; + transform: translate(-50%, -50%) scale(0.8); + pointer-events: none; + } + .chatbot-toggle { + bottom: 15px; + right: 15px; + padding: 10px 16px; + } + .message { + max-width: 90%; + } + .chat-messages { + padding: 16px; + } + .citations-dropdown.open { + max-height: 200px; + } + .citation-link { + padding: 8px 10px; + } +} \ No newline at end of file diff --git a/manifests/inference-service.yaml b/manifests/inference-service.yaml new file mode 100644 index 0000000..ca61feb --- /dev/null +++ b/manifests/inference-service.yaml @@ -0,0 +1,38 @@ +apiVersion: serving.kserve.io/v1beta1 +kind: InferenceService +metadata: + name: llama + namespace: santhosh +spec: + predictor: + model: + modelFormat: + name: huggingface + version: "1" + runtime: llm-runtime + args: + - --model_name=llama3.1-8B + - --model_id=RedHatAI/Llama-3.1-8B-Instruct + - --backend=vllm + - --max-model-len=32768 + - --gpu-memory-utilization=0.90 + - --enable-auto-tool-choice + - --tool-call-parser=llama3_json + - --enable-tool-call-parser + env: + - name: HF_TOKEN + valueFrom: + secretKeyRef: + name: huggingface-secret + key: token + - name: CUDA_VISIBLE_DEVICES + value: "0" + resources: + requests: + cpu: "4" + memory: "16Gi" + nvidia.com/gpu: "1" + limits: + cpu: "6" + memory: "24Gi" + nvidia.com/gpu: "1" diff --git a/manifests/milvus-deployment.yaml b/manifests/milvus-deployment.yaml new file mode 100644 index 0000000..38c5dd9 --- /dev/null +++ b/manifests/milvus-deployment.yaml @@ -0,0 +1,256 @@ +apiVersion: v1 +kind: Pod +metadata: + annotations: 
+ sidecar.istio.io/inject: "false" + creationTimestamp: "2025-08-03T16:17:18Z" + generateName: milvus-standalone-final-5cb655b8d6- + generation: 1 + labels: + app: milvus-standalone-final + pod-template-hash: 5cb655b8d6 + name: milvus-standalone-final-5cb655b8d6-6ngrn + namespace: santhosh + ownerReferences: + - apiVersion: apps/v1 + blockOwnerDeletion: true + controller: true + kind: ReplicaSet + name: milvus-standalone-final-5cb655b8d6 + uid: 7859a7a3-5ff8-41af-8e07-a533a298b141 + resourceVersion: "14225268" + uid: 804d95ff-f4e4-47a4-a7b3-d5c6edaa5042 +spec: + containers: + - command: + - milvus + - run + - standalone + env: + - name: ETCD_ENDPOINTS + value: localhost:2379 + - name: MINIO_ADDRESS + value: localhost:9000 + - name: MINIO_ACCESS_KEY_ID + value: minioadmin + - name: MINIO_SECRET_ACCESS_KEY + value: minioadmin + image: milvusdb/milvus:v2.3.4 + imagePullPolicy: IfNotPresent + name: milvus + ports: + - containerPort: 19530 + protocol: TCP + - containerPort: 9091 + protocol: TCP + readinessProbe: + failureThreshold: 3 + httpGet: + path: /healthz + port: 9091 + scheme: HTTP + initialDelaySeconds: 30 + periodSeconds: 10 + successThreshold: 1 + timeoutSeconds: 5 + resources: {} + terminationMessagePath: /dev/termination-log + terminationMessagePolicy: File + volumeMounts: + - mountPath: /var/run/secrets/kubernetes.io/serviceaccount + name: kube-api-access-2hjlp + readOnly: true + - command: + - etcd + - --advertise-client-urls=http://127.0.0.1:2379 + - --listen-client-urls=http://0.0.0.0:2379 + - --data-dir=/etcd-data + image: quay.io/coreos/etcd:v3.5.0 + imagePullPolicy: IfNotPresent + name: etcd + ports: + - containerPort: 2379 + protocol: TCP + resources: {} + terminationMessagePath: /dev/termination-log + terminationMessagePolicy: File + volumeMounts: + - mountPath: /etcd-data + name: etcd-data + - mountPath: /var/run/secrets/kubernetes.io/serviceaccount + name: kube-api-access-2hjlp + readOnly: true + - command: + - minio + - server + - /minio-data + - --console-address + - :9001 + env: + - name: MINIO_ROOT_USER + value: minioadmin + - name: MINIO_ROOT_PASSWORD + value: minioadmin + image: minio/minio:RELEASE.2023-03-20T20-16-18Z + imagePullPolicy: IfNotPresent + name: minio + ports: + - containerPort: 9000 + protocol: TCP + resources: {} + terminationMessagePath: /dev/termination-log + terminationMessagePolicy: File + volumeMounts: + - mountPath: /minio-data + name: minio-data + - mountPath: /var/run/secrets/kubernetes.io/serviceaccount + name: kube-api-access-2hjlp + readOnly: true + dnsPolicy: ClusterFirst + enableServiceLinks: true + nodeName: 10.0.10.183 + preemptionPolicy: PreemptLowerPriority + priority: 0 + restartPolicy: Always + schedulerName: default-scheduler + securityContext: {} + serviceAccount: default + serviceAccountName: default + terminationGracePeriodSeconds: 30 + tolerations: + - effect: NoExecute + key: node.kubernetes.io/not-ready + operator: Exists + tolerationSeconds: 300 + - effect: NoExecute + key: node.kubernetes.io/unreachable + operator: Exists + tolerationSeconds: 300 + volumes: + - emptyDir: {} + name: etcd-data + - emptyDir: {} + name: minio-data + - name: kube-api-access-2hjlp + projected: + defaultMode: 420 + sources: + - serviceAccountToken: + expirationSeconds: 3607 + path: token + - configMap: + items: + - key: ca.crt + path: ca.crt + name: kube-root-ca.crt + - downwardAPI: + items: + - fieldRef: + apiVersion: v1 + fieldPath: metadata.namespace + path: namespace +status: + conditions: + - lastProbeTime: null + lastTransitionTime: 
"2025-08-03T16:17:30Z" + status: "True" + type: PodReadyToStartContainers + - lastProbeTime: null + lastTransitionTime: "2025-08-03T16:17:18Z" + status: "True" + type: Initialized + - lastProbeTime: null + lastTransitionTime: "2025-08-03T16:18:01Z" + status: "True" + type: Ready + - lastProbeTime: null + lastTransitionTime: "2025-08-03T16:18:01Z" + status: "True" + type: ContainersReady + - lastProbeTime: null + lastTransitionTime: "2025-08-03T16:17:18Z" + status: "True" + type: PodScheduled + containerStatuses: + - containerID: cri-o://974cb9b3a881ab0f7965bd5e31621686474764d8d01550ef3710e9b2058e48a7 + image: quay.io/coreos/etcd:v3.5.0 + imageID: quay.io/coreos/etcd@sha256:28759af54acd6924b2191dc1a1d096e2fa2e219717a21b9d8edf89717db3631b + lastState: {} + name: etcd + ready: true + resources: {} + restartCount: 0 + started: true + state: + running: + startedAt: "2025-08-03T16:17:27Z" + user: + linux: + gid: 0 + supplementalGroups: + - 0 + uid: 0 + volumeMounts: + - mountPath: /etcd-data + name: etcd-data + - mountPath: /var/run/secrets/kubernetes.io/serviceaccount + name: kube-api-access-2hjlp + readOnly: true + recursiveReadOnly: Disabled + - containerID: cri-o://b864b918e771fcb9e1fdb41baeb8f46f5024ecfd08f88679210603506682463b + image: docker.io/milvusdb/milvus:v2.3.4 + imageID: docker.io/milvusdb/milvus@sha256:efd6ef720b6ad0de62d006319996ba18504842ffaa543e3b072aeb5963305907 + lastState: {} + name: milvus + ready: true + resources: {} + restartCount: 0 + started: true + state: + running: + startedAt: "2025-08-03T16:17:25Z" + user: + linux: + gid: 0 + supplementalGroups: + - 0 + uid: 0 + volumeMounts: + - mountPath: /var/run/secrets/kubernetes.io/serviceaccount + name: kube-api-access-2hjlp + readOnly: true + recursiveReadOnly: Disabled + - containerID: cri-o://4a19f6821fa31b1d3f5db68a0b49f0df03cba496770ba4d46922cb3308ff781e + image: docker.io/minio/minio:RELEASE.2023-03-20T20-16-18Z + imageID: docker.io/minio/minio@sha256:6d770d7f255cda1f18d841ffc4365cb7e0d237f6af6a15fcdb587480cd7c3b93 + lastState: {} + name: minio + ready: true + resources: {} + restartCount: 0 + started: true + state: + running: + startedAt: "2025-08-03T16:17:29Z" + user: + linux: + gid: 0 + supplementalGroups: + - 0 + uid: 0 + volumeMounts: + - mountPath: /minio-data + name: minio-data + - mountPath: /var/run/secrets/kubernetes.io/serviceaccount + name: kube-api-access-2hjlp + readOnly: true + recursiveReadOnly: Disabled + hostIP: 10.0.10.183 + hostIPs: + - ip: 10.0.10.183 + phase: Running + podIP: 10.0.10.93 + podIPs: + - ip: 10.0.10.93 + qosClass: BestEffort + startTime: "2025-08-03T16:17:18Z" diff --git a/manifests/serving-runtime.yaml b/manifests/serving-runtime.yaml new file mode 100644 index 0000000..5f45810 --- /dev/null +++ b/manifests/serving-runtime.yaml @@ -0,0 +1,23 @@ +apiVersion: serving.kserve.io/v1alpha1 +kind: ServingRuntime +metadata: + name: llm-runtime + namespace: santhosh +spec: + supportedModelFormats: + - name: huggingface + version: "1" + autoSelect: true + containers: + - name: kserve-container + image: kserve/huggingfaceserver:latest-gpu + command: ["python", "-m", "huggingfaceserver"] + resources: + requests: + cpu: "4" + memory: "16Gi" + nvidia.com/gpu: "1" + limits: + cpu: "6" + memory: "24Gi" + nvidia.com/gpu: "1" \ No newline at end of file diff --git a/pipelines/README_incremental.md b/pipelines/README_incremental.md new file mode 100644 index 0000000..ed2ca0d --- /dev/null +++ b/pipelines/README_incremental.md @@ -0,0 +1,271 @@ +# Incremental RAG Pipeline + +This directory contains an 
incremental version of the RAG (Retrieval-Augmented Generation) pipeline that processes only changed files instead of rebuilding the entire documentation corpus. + +## 📁 Files Overview + +- **`kubeflow-pipeline.py`** - Original full rebuild pipeline +- **`incremental-pipeline.py`** - New incremental pipeline (processes only changed files) +- **`example_incremental_usage.py`** - Usage examples and integration patterns +- **`README_incremental.md`** - This documentation + +## 🔄 How Incremental Updates Work + +The incremental pipeline follows this process: + +1. **Detect Changed Files** - Receive a list of changed file paths +2. **Delete Old Vectors** - Remove existing embeddings for changed files from Milvus +3. **Download Specific Files** - Fetch only the changed files from GitHub +4. **Process & Embed** - Chunk and create embeddings for the new content +5. **Insert New Vectors** - Store updated embeddings in Milvus + +## 🚀 Quick Start + +### 1. Basic Usage + +```python +import json +from incremental_pipeline import github_rag_incremental_pipeline + +# List of changed files +changed_files = [ + "content/en/docs/started/getting-started.md", + "content/en/docs/components/pipelines/overview.md" +] + +# Convert to JSON string (required by pipeline) +changed_files_json = json.dumps(changed_files) + +# Compile pipeline +kfp.compiler.Compiler().compile( + pipeline_func=github_rag_incremental_pipeline, + package_path="github_rag_incremental_pipeline.yaml" +) +``` + +### 2. Pipeline Parameters + +| Parameter | Default | Description | +|-----------|---------|-------------| +| `repo_owner` | "kubeflow" | GitHub repository owner | +| `repo_name` | "website" | GitHub repository name | +| `changed_files` | "[]" | JSON string of changed file paths | +| `github_token` | "" | GitHub API token (optional but recommended) | +| `base_url` | "https://www.kubeflow.org/docs" | Base URL for citations | +| `chunk_size` | 1000 | Text chunk size for embeddings | +| `chunk_overlap` | 100 | Overlap between chunks | +| `milvus_host` | "milvus-standalone-final.santhosh.svc.cluster.local" | Milvus server host | +| `milvus_port` | "19530" | Milvus server port | +| `collection_name` | "docs_rag" | Milvus collection name | + +## 🔧 Integration Patterns + +### Git Integration + +Automatically detect changed files from git: + +```python +import subprocess +import json + +def get_changed_files_from_git(since_commit="HEAD~1"): + cmd = ["git", "diff", "--name-only", since_commit, "HEAD"] + result = subprocess.run(cmd, capture_output=True, text=True) + + if result.returncode == 0: + files = [f.strip() for f in result.stdout.split('\n') if f.strip()] + # Filter for documentation files + doc_files = [f for f in files if f.endswith(('.md', '.html'))] + return json.dumps(doc_files) + return "[]" + +# Use in pipeline +changed_files_json = get_changed_files_from_git() +``` + +### Webhook Integration + +Process GitHub webhook payloads: + +```python +def process_github_webhook(webhook_payload): + all_changed_files = [] + + for commit in webhook_payload.get("commits", []): + all_changed_files.extend(commit.get("modified", [])) + all_changed_files.extend(commit.get("added", [])) + # Handle removed files separately if needed + + # Filter for documentation files + doc_files = [f for f in set(all_changed_files) if f.endswith(('.md', '.html'))] + return json.dumps(doc_files) +``` + +### CI/CD Integration + +Example GitHub Actions workflow: + +```yaml +name: Incremental RAG Update +on: + push: + paths: + - 'content/en/docs/**/*.md' + - 
'content/en/docs/**/*.html' + +jobs: + update-rag: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + with: + fetch-depth: 2 # Need previous commit for diff + + - name: Get changed files + id: changed-files + run: | + CHANGED=$(git diff --name-only HEAD~1 HEAD | grep -E '\.(md|html)$' | jq -R -s -c 'split("\n")[:-1]') + echo "files=$CHANGED" >> $GITHUB_OUTPUT + + - name: Trigger Kubeflow Pipeline + run: | + # Submit pipeline with changed files + # Implementation depends on your KFP setup +``` + +## 📊 Component Details + +### 1. `download_specific_files` + +Downloads only the specified files from GitHub instead of the entire directory tree. + +**Key Features:** +- Accepts JSON array of file paths +- Skips non-documentation files +- Handles API rate limits gracefully +- Supports both public and private repositories + +### 2. `delete_old_vectors` + +Removes existing vectors for changed files from Milvus before inserting new ones. + +**Key Features:** +- Deletes by `file_unique_id` (format: `{repo_name}:{file_path}`) +- Handles missing collections gracefully +- Provides detailed logging of deletion counts +- Ensures clean updates without duplicates + +### 3. `chunk_and_embed_incremental` + +Same processing logic as the original pipeline but optimized for smaller batches. + +**Key Features:** +- Identical text cleaning and chunking logic +- GPU acceleration when available +- Efficient processing of small file sets +- Detailed chunk statistics + +### 4. `store_milvus_incremental` + +Inserts new vectors without dropping the entire collection. + +**Key Features:** +- Creates collection if it doesn't exist +- Preserves existing data +- Handles indexing efficiently +- Provides collection size statistics + +## 🔍 Monitoring & Debugging + +### Check Pipeline Status + +```python +# After submitting the pipeline +run = client.get_run(run_id) +print(f"Status: {run.run.status}") + +# Get component logs +for component in run.run.pipeline_spec.components: + logs = client.get_run_logs(run_id, component.name) + print(f"{component.name}: {logs}") +``` + +### Verify Milvus Updates + +```python +from pymilvus import connections, Collection + +connections.connect("default", host="your-milvus-host", port="19530") +collection = Collection("docs_rag") + +# Check total count +print(f"Total vectors: {collection.num_entities}") + +# Check specific file +file_id = "website:content/en/docs/your-file.md" +results = collection.query( + expr=f'file_unique_id == "{file_id}"', + output_fields=["chunk_index", "last_updated"] +) +print(f"Chunks for {file_id}: {len(results)}") +``` + +## ⚡ Performance Comparison + +| Metric | Full Pipeline | Incremental Pipeline | +|--------|---------------|---------------------| +| **Typical Files Processed** | ~500-1000 files | 1-10 files | +| **Processing Time** | 15-30 minutes | 1-3 minutes | +| **Resource Usage** | High (full rebuild) | Low (targeted update) | +| **Milvus Operations** | Drop + Recreate | Delete + Insert | +| **Suitable For** | Initial setup, major changes | Regular updates, CI/CD | + +## 🛠️ Troubleshooting + +### Common Issues + +1. **"Collection doesn't exist"** - The incremental pipeline creates the collection if needed +2. **"No files to process"** - Check that file paths are correct and files exist +3. **"GitHub API rate limit"** - Use a GitHub token for higher limits +4. 
**"Milvus connection failed"** - Verify host, port, and network connectivity + +### Debug Mode + +Add debug logging to components: + +```python +# In any component +import logging +logging.basicConfig(level=logging.DEBUG) +print(f"Debug: Processing {len(file_paths_list)} files") +``` + +## 🔄 Migration from Full Pipeline + +1. **First Time Setup**: Run the full pipeline once to create the initial collection +2. **Switch to Incremental**: Use the incremental pipeline for subsequent updates +3. **Periodic Full Rebuilds**: Optionally run full pipeline monthly/quarterly for cleanup + +## 📈 Future Enhancements + +- **Batch Processing**: Handle large sets of changed files efficiently +- **Retry Logic**: Automatic retry for failed file downloads +- **Metrics Collection**: Detailed performance and success metrics +- **File Deletion Handling**: Remove vectors when files are deleted from repo +- **Multi-Repository Support**: Process changes from multiple repositories + +## 🤝 Contributing + +To improve the incremental pipeline: + +1. Test with your specific repository structure +2. Add error handling for edge cases +3. Optimize for your Milvus configuration +4. Share performance improvements + +## 📞 Support + +For issues or questions: +- Check the troubleshooting section above +- Review component logs in Kubeflow Pipelines UI +- Verify Milvus collection status and connectivity diff --git a/pipelines/example_incremental_usage.py b/pipelines/example_incremental_usage.py new file mode 100644 index 0000000..935eba0 --- /dev/null +++ b/pipelines/example_incremental_usage.py @@ -0,0 +1,188 @@ +#!/usr/bin/env python3 +""" +Example script showing how to trigger the incremental RAG pipeline +with a list of changed files. + +This script demonstrates: +1. How to format the changed files list +2. How to compile and run the incremental pipeline +3. Example integration with git to detect changed files +""" + +import json +import subprocess +import kfp +from incremental_pipeline import github_rag_incremental_pipeline + +def get_changed_files_from_git(repo_path=".", since_commit="HEAD~1"): + """ + Get list of changed files from git diff. + + Args: + repo_path: Path to git repository + since_commit: Compare changes since this commit (default: last commit) + + Returns: + List of changed file paths + """ + try: + # Get changed files between commits + cmd = ["git", "diff", "--name-only", since_commit, "HEAD"] + result = subprocess.run(cmd, cwd=repo_path, capture_output=True, text=True) + + if result.returncode == 0: + files = [f.strip() for f in result.stdout.split('\n') if f.strip()] + # Filter for documentation files only + doc_files = [f for f in files if f.endswith(('.md', '.html'))] + return doc_files + else: + print(f"Git command failed: {result.stderr}") + return [] + except Exception as e: + print(f"Error getting changed files: {e}") + return [] + + +def run_incremental_pipeline_example(): + """ + Example of running the incremental pipeline with specific changed files. 
+ """ + + # Example 1: Manually specified changed files + changed_files = [ + "content/en/docs/started/getting-started.md", + "content/en/docs/components/pipelines/overview.md", + "content/en/docs/external-add-ons/kustomize.md" + ] + + print("=== Manual File List Example ===") + print(f"Changed files: {changed_files}") + + # Convert to JSON string (required by the pipeline) + changed_files_json = json.dumps(changed_files) + print(f"JSON format: {changed_files_json}") + + # Compile the pipeline + kfp.compiler.Compiler().compile( + pipeline_func=github_rag_incremental_pipeline, + package_path="github_rag_incremental_pipeline.yaml" + ) + print("✅ Pipeline compiled to: github_rag_incremental_pipeline.yaml") + + # Example pipeline parameters + pipeline_params = { + 'repo_owner': 'kubeflow', + 'repo_name': 'website', + 'changed_files': changed_files_json, + 'github_token': '', # Add your GitHub token here + 'base_url': 'https://www.kubeflow.org/docs', + 'chunk_size': 1000, + 'chunk_overlap': 100, + 'milvus_host': 'milvus-standalone-final.santhosh.svc.cluster.local', + 'milvus_port': '19530', + 'collection_name': 'docs_rag' + } + + print("\n=== Pipeline Parameters ===") + for key, value in pipeline_params.items(): + if key == 'github_token' and value: + print(f"{key}: {'*' * len(value)}") # Hide token + else: + print(f"{key}: {value}") + + +def run_git_integration_example(): + """ + Example of integrating with git to automatically detect changed files. + """ + print("\n=== Git Integration Example ===") + + # Get changed files from git + changed_files = get_changed_files_from_git() + + if not changed_files: + print("No documentation files changed since last commit.") + return + + print(f"Detected {len(changed_files)} changed documentation files:") + for file in changed_files: + print(f" - {file}") + + # Convert to JSON for pipeline + changed_files_json = json.dumps(changed_files) + + print(f"\nJSON format for pipeline: {changed_files_json}") + + # You would trigger the pipeline here with changed_files_json + print("\n📝 To run the pipeline with these files:") + print("1. Set up your Kubeflow Pipelines client") + print("2. Submit the pipeline with the changed_files parameter") + print("3. Monitor the pipeline execution") + + +def webhook_trigger_example(): + """ + Example of how this could be triggered by a webhook from GitHub. + """ + print("\n=== Webhook Integration Example ===") + + # Simulate webhook payload (GitHub push event) + webhook_payload = { + "commits": [ + { + "modified": [ + "content/en/docs/started/getting-started.md", + "content/en/docs/components/pipelines/overview.md" + ], + "added": [ + "content/en/docs/new-feature.md" + ], + "removed": [ + "content/en/docs/deprecated.md" + ] + } + ] + } + + # Extract all changed files + all_changed_files = [] + for commit in webhook_payload["commits"]: + all_changed_files.extend(commit.get("modified", [])) + all_changed_files.extend(commit.get("added", [])) + # Note: removed files would need special handling to delete from Milvus + + # Remove duplicates and filter for documentation files + changed_files = list(set([f for f in all_changed_files if f.endswith(('.md', '.html'))])) + + print(f"Files to process from webhook: {changed_files}") + + if changed_files: + changed_files_json = json.dumps(changed_files) + print(f"Would trigger pipeline with: {changed_files_json}") + else: + print("No documentation files to process.") + + +def main(): + """ + Run all examples. 
+ """ + print("🚀 Incremental RAG Pipeline Examples") + print("=" * 50) + + # Run examples + run_incremental_pipeline_example() + run_git_integration_example() + webhook_trigger_example() + + print("\n" + "=" * 50) + print("✅ Examples completed!") + print("\n📚 Next steps:") + print("1. Set up your GitHub token") + print("2. Configure your Kubeflow Pipelines environment") + print("3. Set up webhooks or CI/CD integration") + print("4. Test with a small set of changed files") + + +if __name__ == "__main__": + main() diff --git a/pipelines/github_rag_pipeline.yaml b/pipelines/github_rag_pipeline.yaml new file mode 100644 index 0000000..e6c8bc8 --- /dev/null +++ b/pipelines/github_rag_pipeline.yaml @@ -0,0 +1,393 @@ +# PIPELINE DEFINITION +# Name: github-rag-full-build +# Description: RAG pipeline for processing GitHub documentation +# Inputs: +# base_url: str [Default: 'https://www.kubeflow.org/docs'] +# chunk_overlap: int [Default: 100.0] +# chunk_size: int [Default: 1000.0] +# collection_name: str [Default: 'docs_rag'] +# directory_path: str [Default: 'content/en'] +# github_token: str [Default: ''] +# milvus_host: str [Default: 'milvus-standalone-final.santhosh.svc.cluster.local'] +# milvus_port: str [Default: '19530'] +# repo_name: str [Default: 'website'] +# repo_owner: str [Default: 'kubeflow'] +components: + comp-chunk-and-embed: + executorLabel: exec-chunk-and-embed + inputDefinitions: + artifacts: + github_data: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + parameters: + base_url: + parameterType: STRING + chunk_overlap: + parameterType: NUMBER_INTEGER + chunk_size: + parameterType: NUMBER_INTEGER + repo_name: + parameterType: STRING + outputDefinitions: + artifacts: + embedded_data: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + comp-download-github-directory: + executorLabel: exec-download-github-directory + inputDefinitions: + parameters: + directory_path: + parameterType: STRING + github_token: + parameterType: STRING + repo_name: + parameterType: STRING + repo_owner: + parameterType: STRING + outputDefinitions: + artifacts: + github_data: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + comp-store-milvus: + executorLabel: exec-store-milvus + inputDefinitions: + artifacts: + embedded_data: + artifactType: + schemaTitle: system.Dataset + schemaVersion: 0.0.1 + parameters: + collection_name: + parameterType: STRING + milvus_host: + parameterType: STRING + milvus_port: + parameterType: STRING +deploymentSpec: + executors: + exec-chunk-and-embed: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - chunk_and_embed + command: + - sh + - -c + - "\nif ! 
[ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'sentence-transformers'\ + \ 'langchain' && python3 -m pip install --quiet --no-warn-script-location\ + \ 'kfp==2.14.2' '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"\ + 3.9\"' && \"$0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef chunk_and_embed(\n github_data: dsl.Input[dsl.Dataset],\n\ + \ repo_name: str,\n base_url: str,\n chunk_size: int,\n chunk_overlap:\ + \ int,\n embedded_data: dsl.Output[dsl.Dataset]\n):\n import json\n\ + \ import os\n import re\n import torch\n from sentence_transformers\ + \ import SentenceTransformer\n from langchain.text_splitter import RecursiveCharacterTextSplitter\n\ + \n device = 'cuda' if torch.cuda.is_available() else 'cpu'\n model\ + \ = SentenceTransformer('sentence-transformers/all-mpnet-base-v2', device=device)\n\ + \ print(f\"Model loaded on {device}\")\n\n records = []\n\n with\ + \ open(github_data.path, 'r', encoding='utf-8') as f:\n for line\ + \ in f:\n file_data = json.loads(line)\n content =\ + \ file_data['content']\n\n # AGGRESSIVE CLEANING FOR BETTER EMBEDDINGS\n\ + \n # Remove Hugo frontmatter (both --- and +++ styles)\n \ + \ content = re.sub(r'^\\s*[+\\-]{3,}.*?[+\\-]{3,}\\s*', '', content,\ + \ flags=re.DOTALL | re.MULTILINE)\n\n # Remove Hugo template\ + \ syntax\n content = re.sub(r'\\{\\{.*?\\}\\}', '', content,\ + \ flags=re.DOTALL)\n\n # Remove HTML comments and tags\n \ + \ content = re.sub(r'', '', content, flags=re.DOTALL)\n\ + \ content = re.sub(r'<[^>]+>', ' ', content)\n\n #\ + \ Remove navigation/menu artifacts\n content = re.sub(r'\\b(Get\ + \ Started|Contribute|GenAI|Home|Menu|Navigation)\\b', '', content, flags=re.IGNORECASE)\n\ + \n # Clean up URLs and links\n content = re.sub(r'https?://[^\\\ + s]+', '', content)\n content = re.sub(r'\\[([^\\]]+)\\]\\([^\\\ + )]+\\)', r'\\1', content) # Convert [text](url) to text\n\n \ + \ # Remove excessive whitespace and normalize\n content = re.sub(r'\\\ + s+', ' ', content) # Multiple spaces to single\n content = re.sub(r'\\\ + n\\s*\\n\\s*\\n+', '\\n\\n', content) # Multiple newlines to double\n \ + \ content = content.strip()\n\n # Skip files that are\ + \ too short after cleaning\n if len(content) < 50:\n \ + \ print(f\"Skipping file after cleaning: {file_data['path']} ({len(content)}\ + \ chars)\")\n continue\n\n # Build citation URL\ + \ (same as before)\n path_parts = file_data['path'].split('/')\n\ + \ if 'content/en/docs' in file_data['path']:\n \ + \ docs_index = path_parts.index('docs')\n url_path = '/'.join(path_parts[docs_index+1:])\n\ + \ url_path = os.path.splitext(url_path)[0]\n \ + \ citation_url = f\"{base_url}/{url_path}\"\n else:\n \ + \ citation_url = f\"{base_url}/{file_data['path']}\"\n\n \ + \ file_unique_id = f\"{repo_name}:{file_data['path']}\"\n\n \ + \ # Create splitter\n text_splitter = RecursiveCharacterTextSplitter(\n\ + \ chunk_size=chunk_size,\n chunk_overlap=chunk_overlap,\n\ + \ length_function=len,\n separators=[\"\\\ + n\\n\", \"\\n\", \". 
\", \" \", \"\"]\n )\n\n # Split\ + \ into chunks\n chunks = text_splitter.split_text(content)\n\n\ + \ print(f\"File: {file_data['path']} -> {len(chunks)} chunks\ + \ (avg: {sum(len(c) for c in chunks)/len(chunks):.0f} chars)\")\n\n \ + \ # Create embeddings\n for chunk_idx, chunk in enumerate(chunks):\n\ + \ embedding = model.encode(chunk).tolist()\n \ + \ records.append({\n 'file_unique_id': file_unique_id,\n\ + \ 'repo_name': repo_name,\n 'file_path':\ + \ file_data['path'],\n 'file_name': file_data['file_name'],\n\ + \ 'citation_url': citation_url[:1024],\n \ + \ 'chunk_index': chunk_idx,\n 'content_text':\ + \ chunk[:2000],\n 'embedding': embedding\n \ + \ })\n\n print(f\"Created {len(records)} total chunks\")\n\n \ + \ with open(embedded_data.path, 'w', encoding='utf-8') as f:\n for\ + \ record in records:\n f.write(json.dumps(record, ensure_ascii=False)\ + \ + '\\n')\n\n" + image: pytorch/pytorch:2.3.0-cuda12.1-cudnn8-runtime + exec-download-github-directory: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - download_github_directory + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'requests' 'beautifulsoup4'\ + \ && python3 -m pip install --quiet --no-warn-script-location 'kfp==2.14.2'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef download_github_directory(\n repo_owner: str,\n repo_name:\ + \ str,\n directory_path: str,\n github_token: str,\n github_data:\ + \ dsl.Output[dsl.Dataset]\n):\n import requests\n import json\n \ + \ import base64\n from bs4 import BeautifulSoup\n\n headers = {\"\ + Authorization\": f\"token {github_token}\"} if github_token else {}\n \ + \ api_url = f\"https://api.github.com/repos/{repo_owner}/{repo_name}/contents/{directory_path}\"\ + \n\n def get_files_recursive(url):\n files = []\n try:\n\ + \ response = requests.get(url, headers=headers)\n \ + \ response.raise_for_status()\n items = response.json()\n\n \ + \ for item in items:\n if item['type'] == 'file'\ + \ and (item['name'].endswith('.md') or item['name'].endswith('.html')):\n\ + \ file_response = requests.get(item['url'], headers=headers)\n\ + \ file_response.raise_for_status()\n \ + \ file_data = file_response.json()\n content = base64.b64decode(file_data['content']).decode('utf-8')\n\ + \n # Extract text from HTML files\n \ + \ if item['name'].endswith('.html'):\n soup = BeautifulSoup(content,\ + \ 'html.parser')\n content = soup.get_text(separator='\ + \ ', strip=True)\n\n files.append({\n \ + \ 'path': item['path'],\n 'content': content,\n\ + \ 'file_name': item['name']\n \ + \ })\n elif item['type'] == 'dir':\n files.extend(get_files_recursive(item['url']))\n\ + \ except Exception as e:\n print(f\"Error fetching {url}:\ + \ {e}\")\n return files\n\n files = get_files_recursive(api_url)\n\ + \ print(f\"Downloaded {len(files)} files\")\n\n with open(github_data.path,\ + \ 'w', encoding='utf-8') as f:\n for file_data in files:\n \ + \ f.write(json.dumps(file_data, 
ensure_ascii=False) + '\\n')\n\n" + image: python:3.9 + exec-store-milvus: + container: + args: + - --executor_input + - '{{$}}' + - --function_to_execute + - store_milvus + command: + - sh + - -c + - "\nif ! [ -x \"$(command -v pip)\" ]; then\n python3 -m ensurepip ||\ + \ python3 -m ensurepip --user || apt-get install python3-pip\nfi\n\nPIP_DISABLE_PIP_VERSION_CHECK=1\ + \ python3 -m pip install --quiet --no-warn-script-location 'pymilvus' 'numpy'\ + \ && python3 -m pip install --quiet --no-warn-script-location 'kfp==2.14.2'\ + \ '--no-deps' 'typing-extensions>=3.7.4,<5; python_version<\"3.9\"' && \"\ + $0\" \"$@\"\n" + - sh + - -ec + - 'program_path=$(mktemp -d) + + + printf "%s" "$0" > "$program_path/ephemeral_component.py" + + _KFP_RUNTIME=true python3 -m kfp.dsl.executor_main --component_module_path "$program_path/ephemeral_component.py" "$@" + + ' + - "\nimport kfp\nfrom kfp import dsl\nfrom kfp.dsl import *\nfrom typing import\ + \ *\n\ndef store_milvus(\n embedded_data: dsl.Input[dsl.Dataset],\n \ + \ milvus_host: str,\n milvus_port: str,\n collection_name: str\n\ + ):\n from pymilvus import connections, utility, FieldSchema, CollectionSchema,\ + \ DataType, Collection\n import json\n from datetime import datetime\n\ + \n connections.connect(\"default\", host=milvus_host, port=milvus_port)\n\ + \n # DROP existing collection to fix schema mismatch\n if utility.has_collection(collection_name):\n\ + \ utility.drop_collection(collection_name)\n print(f\"Dropped\ + \ existing collection: {collection_name}\")\n\n # Enhanced schema with\ + \ 768 dimensions\n fields = [\n FieldSchema(name=\"id\", dtype=DataType.INT64,\ + \ is_primary=True, auto_id=True),\n FieldSchema(name=\"file_unique_id\"\ + , dtype=DataType.VARCHAR, max_length=512),\n FieldSchema(name=\"\ + repo_name\", dtype=DataType.VARCHAR, max_length=256),\n FieldSchema(name=\"\ + file_path\", dtype=DataType.VARCHAR, max_length=512),\n FieldSchema(name=\"\ + file_name\", dtype=DataType.VARCHAR, max_length=256),\n FieldSchema(name=\"\ + citation_url\", dtype=DataType.VARCHAR, max_length=1024),\n FieldSchema(name=\"\ + chunk_index\", dtype=DataType.INT64),\n FieldSchema(name=\"content_text\"\ + , dtype=DataType.VARCHAR, max_length=2000),\n FieldSchema(name=\"\ + vector\", dtype=DataType.FLOAT_VECTOR, dim=768), # Updated for all-mpnet-base-v2\n\ + \ FieldSchema(name=\"last_updated\", dtype=DataType.INT64)\n ]\n\ + \n # Create new collection with correct schema\n schema = CollectionSchema(fields,\ + \ \"RAG collection for documentation\")\n collection = Collection(collection_name,\ + \ schema)\n print(f\"Created new collection: {collection_name}\")\n\n\ + \ # Rest of your existing code remains the same...\n records = []\n\ + \ timestamp = int(datetime.now().timestamp())\n\n with open(embedded_data.path,\ + \ 'r', encoding='utf-8') as f:\n for line in f:\n record\ + \ = json.loads(line)\n records.append({\n \"file_unique_id\"\ + : record[\"file_unique_id\"],\n \"repo_name\": record[\"\ + repo_name\"],\n \"file_path\": record[\"file_path\"],\n \ + \ \"file_name\": record[\"file_name\"],\n \"\ + citation_url\": record[\"citation_url\"],\n \"chunk_index\"\ + : record[\"chunk_index\"],\n \"content_text\": record[\"\ + content_text\"],\n \"vector\": record[\"embedding\"],\n \ + \ \"last_updated\": timestamp\n })\n\n if records:\n\ + \ batch_size = 1000\n for i in range(0, len(records), batch_size):\n\ + \ batch = records[i:i + batch_size]\n collection.insert(batch)\n\ + \n collection.flush()\n\n # Create index\n index_params\ + \ = {\n 
\"metric_type\": \"COSINE\",\n \"index_type\"\ + : \"IVF_FLAT\", \n \"params\": {\"nlist\": min(1024, len(records))}\n\ + \ }\n collection.create_index(\"vector\", index_params)\n\ + \ collection.load()\n print(f\"\u2705 Inserted {len(records)}\ + \ records. Total: {collection.num_entities}\")\n\n" + image: python:3.9 +pipelineInfo: + description: RAG pipeline for processing GitHub documentation + name: github-rag-full-build +root: + dag: + tasks: + chunk-and-embed: + cachingOptions: + enableCache: true + componentRef: + name: comp-chunk-and-embed + dependentTasks: + - download-github-directory + inputs: + artifacts: + github_data: + taskOutputArtifact: + outputArtifactKey: github_data + producerTask: download-github-directory + parameters: + base_url: + componentInputParameter: base_url + chunk_overlap: + componentInputParameter: chunk_overlap + chunk_size: + componentInputParameter: chunk_size + repo_name: + componentInputParameter: repo_name + taskInfo: + name: chunk-and-embed + download-github-directory: + cachingOptions: + enableCache: true + componentRef: + name: comp-download-github-directory + inputs: + parameters: + directory_path: + componentInputParameter: directory_path + github_token: + componentInputParameter: github_token + repo_name: + componentInputParameter: repo_name + repo_owner: + componentInputParameter: repo_owner + taskInfo: + name: download-github-directory + store-milvus: + cachingOptions: + enableCache: true + componentRef: + name: comp-store-milvus + dependentTasks: + - chunk-and-embed + inputs: + artifacts: + embedded_data: + taskOutputArtifact: + outputArtifactKey: embedded_data + producerTask: chunk-and-embed + parameters: + collection_name: + componentInputParameter: collection_name + milvus_host: + componentInputParameter: milvus_host + milvus_port: + componentInputParameter: milvus_port + taskInfo: + name: store-milvus + inputDefinitions: + parameters: + base_url: + defaultValue: https://www.kubeflow.org/docs + isOptional: true + parameterType: STRING + chunk_overlap: + defaultValue: 100.0 + isOptional: true + parameterType: NUMBER_INTEGER + chunk_size: + defaultValue: 1000.0 + isOptional: true + parameterType: NUMBER_INTEGER + collection_name: + defaultValue: docs_rag + isOptional: true + parameterType: STRING + directory_path: + defaultValue: content/en + isOptional: true + parameterType: STRING + github_token: + defaultValue: '' + isOptional: true + parameterType: STRING + milvus_host: + defaultValue: milvus-standalone-final.santhosh.svc.cluster.local + isOptional: true + parameterType: STRING + milvus_port: + defaultValue: '19530' + isOptional: true + parameterType: STRING + repo_name: + defaultValue: website + isOptional: true + parameterType: STRING + repo_owner: + defaultValue: kubeflow + isOptional: true + parameterType: STRING +schemaVersion: 2.1.0 +sdkVersion: kfp-2.14.2 diff --git a/pipelines/incremental-pipeline.py b/pipelines/incremental-pipeline.py new file mode 100644 index 0000000..a545f6c --- /dev/null +++ b/pipelines/incremental-pipeline.py @@ -0,0 +1,401 @@ +import kfp +from kfp import dsl +from kfp.dsl import * +from typing import * + +@dsl.component( + base_image="python:3.9", + packages_to_install=["requests", "beautifulsoup4"] +) +def download_specific_files( + repo_owner: str, + repo_name: str, + file_paths: str, # JSON string of file paths list + github_token: str, + github_data: dsl.Output[dsl.Dataset] +): + import requests + import json + import base64 + from bs4 import BeautifulSoup + + headers = {"Authorization": f"token 
{github_token}"} if github_token else {} + + # Parse the file paths from JSON string + try: + file_paths_list = json.loads(file_paths) + except json.JSONDecodeError: + print(f"Error: Invalid JSON in file_paths: {file_paths}") + file_paths_list = [] + + print(f"Processing {len(file_paths_list)} changed files") + + files = [] + + for file_path in file_paths_list: + # Skip non-documentation files + if not (file_path.endswith('.md') or file_path.endswith('.html')): + print(f"Skipping non-doc file: {file_path}") + continue + + try: + # Get file content from GitHub API + api_url = f"https://api.github.com/repos/{repo_owner}/{repo_name}/contents/{file_path}" + response = requests.get(api_url, headers=headers) + response.raise_for_status() + file_data = response.json() + + # Decode content + content = base64.b64decode(file_data['content']).decode('utf-8') + + # Extract text from HTML files + if file_path.endswith('.html'): + soup = BeautifulSoup(content, 'html.parser') + content = soup.get_text(separator=' ', strip=True) + + files.append({ + 'path': file_path, + 'content': content, + 'file_name': file_data['name'] + }) + print(f"Downloaded: {file_path}") + + except Exception as e: + print(f"Error downloading {file_path}: {e}") + continue + + print(f"Successfully downloaded {len(files)} files") + + # Save to output dataset + with open(github_data.path, 'w', encoding='utf-8') as f: + for file_data in files: + f.write(json.dumps(file_data, ensure_ascii=False) + '\n') + + +@dsl.component( + base_image="python:3.9", + packages_to_install=["pymilvus"] +) +def delete_old_vectors( + file_paths: str, # JSON string of file paths list + repo_name: str, + milvus_host: str, + milvus_port: str, + collection_name: str +): + from pymilvus import connections, Collection + import json + + # Connect to Milvus + connections.connect("default", host=milvus_host, port=milvus_port) + + # Parse file paths + try: + file_paths_list = json.loads(file_paths) + except json.JSONDecodeError: + print(f"Error: Invalid JSON in file_paths: {file_paths}") + return + + # Check if collection exists + try: + collection = Collection(collection_name) + collection.load() + print(f"Connected to collection: {collection_name}") + + # Delete old vectors for each changed file + deleted_count = 0 + for file_path in file_paths_list: + file_unique_id = f"{repo_name}:{file_path}" + + # Delete vectors with matching file_unique_id + expr = f'file_unique_id == "{file_unique_id}"' + try: + # Get count before deletion for logging + query_result = collection.query( + expr=expr, + output_fields=["id"], + limit=10000 + ) + count_before = len(query_result) + + if count_before > 0: + # Delete the vectors + collection.delete(expr) + collection.flush() + deleted_count += count_before + print(f"Deleted {count_before} vectors for file: {file_path}") + else: + print(f"No existing vectors found for file: {file_path}") + + except Exception as e: + print(f"Error deleting vectors for {file_path}: {e}") + continue + + print(f"✅ Total deleted vectors: {deleted_count}") + + except Exception as e: + print(f"Error connecting to collection {collection_name}: {e}") + print("Collection might not exist yet - this is okay for first run") + + +@dsl.component( + base_image="pytorch/pytorch:2.3.0-cuda12.1-cudnn8-runtime", + packages_to_install=["sentence-transformers", "langchain"] +) +def chunk_and_embed_incremental( + github_data: dsl.Input[dsl.Dataset], + repo_name: str, + base_url: str, + chunk_size: int, + chunk_overlap: int, + embedded_data: dsl.Output[dsl.Dataset] +): + 
import json
+    import os
+    import re
+    import torch
+    from sentence_transformers import SentenceTransformer
+    from langchain.text_splitter import RecursiveCharacterTextSplitter
+
+    device = 'cuda' if torch.cuda.is_available() else 'cpu'
+    model = SentenceTransformer('sentence-transformers/all-mpnet-base-v2', device=device)
+    print(f"Model loaded on {device}")
+
+    records = []
+
+    with open(github_data.path, 'r', encoding='utf-8') as f:
+        for line in f:
+            file_data = json.loads(line)
+            content = file_data['content']
+
+            # AGGRESSIVE CLEANING FOR BETTER EMBEDDINGS (same as original)
+
+            # Remove Hugo frontmatter (both --- and +++ styles)
+            content = re.sub(r'^\s*[+\-]{3,}.*?[+\-]{3,}\s*', '', content, flags=re.DOTALL | re.MULTILINE)
+
+            # Remove Hugo template syntax
+            content = re.sub(r'\{\{.*?\}\}', '', content, flags=re.DOTALL)
+
+            # Remove HTML comments and tags
+            content = re.sub(r'<!--.*?-->', '', content, flags=re.DOTALL)
+            content = re.sub(r'<[^>]+>', ' ', content)
+
+            # Remove navigation/menu artifacts
+            content = re.sub(r'\b(Get Started|Contribute|GenAI|Home|Menu|Navigation)\b', '', content, flags=re.IGNORECASE)
+
+            # Clean up URLs and links
+            content = re.sub(r'https?://[^\s]+', '', content)
+            content = re.sub(r'\[([^\]]+)\]\([^\)]+\)', r'\1', content)  # Convert [text](url) to text
+
+            # Remove excessive whitespace and normalize
+            content = re.sub(r'\s+', ' ', content)  # Multiple spaces to single
+            content = re.sub(r'\n\s*\n\s*\n+', '\n\n', content)  # Multiple newlines to double
+            content = content.strip()
+
+            # Skip files that are too short after cleaning
+            if len(content) < 50:
+                print(f"Skipping file after cleaning: {file_data['path']} ({len(content)} chars)")
+                continue
+
+            # Build citation URL
+            path_parts = file_data['path'].split('/')
+            if 'content/en/docs' in file_data['path']:
+                docs_index = path_parts.index('docs')
+                url_path = '/'.join(path_parts[docs_index+1:])
+                url_path = os.path.splitext(url_path)[0]
+                citation_url = f"{base_url}/{url_path}"
+            else:
+                citation_url = f"{base_url}/{file_data['path']}"
+
+            file_unique_id = f"{repo_name}:{file_data['path']}"
+
+            # Create splitter
+            text_splitter = RecursiveCharacterTextSplitter(
+                chunk_size=chunk_size,
+                chunk_overlap=chunk_overlap,
+                length_function=len,
+                separators=["\n\n", "\n", ". 
", " ", ""] + ) + + # Split into chunks + chunks = text_splitter.split_text(content) + + print(f"File: {file_data['path']} -> {len(chunks)} chunks (avg: {sum(len(c) for c in chunks)/len(chunks):.0f} chars)") + + # Create embeddings + for chunk_idx, chunk in enumerate(chunks): + embedding = model.encode(chunk).tolist() + records.append({ + 'file_unique_id': file_unique_id, + 'repo_name': repo_name, + 'file_path': file_data['path'], + 'file_name': file_data['file_name'], + 'citation_url': citation_url[:1024], + 'chunk_index': chunk_idx, + 'content_text': chunk[:2000], + 'embedding': embedding + }) + + print(f"Created {len(records)} total chunks for incremental update") + + with open(embedded_data.path, 'w', encoding='utf-8') as f: + for record in records: + f.write(json.dumps(record, ensure_ascii=False) + '\n') + + +@dsl.component( + base_image="python:3.9", + packages_to_install=["pymilvus", "numpy"] +) +def store_milvus_incremental( + embedded_data: dsl.Input[dsl.Dataset], + milvus_host: str, + milvus_port: str, + collection_name: str +): + from pymilvus import connections, utility, FieldSchema, CollectionSchema, DataType, Collection + import json + from datetime import datetime + + connections.connect("default", host=milvus_host, port=milvus_port) + + # Check if collection exists, if not create it + if not utility.has_collection(collection_name): + print(f"Collection {collection_name} doesn't exist, creating it...") + + # Enhanced schema with 768 dimensions + fields = [ + FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True), + FieldSchema(name="file_unique_id", dtype=DataType.VARCHAR, max_length=512), + FieldSchema(name="repo_name", dtype=DataType.VARCHAR, max_length=256), + FieldSchema(name="file_path", dtype=DataType.VARCHAR, max_length=512), + FieldSchema(name="file_name", dtype=DataType.VARCHAR, max_length=256), + FieldSchema(name="citation_url", dtype=DataType.VARCHAR, max_length=1024), + FieldSchema(name="chunk_index", dtype=DataType.INT64), + FieldSchema(name="content_text", dtype=DataType.VARCHAR, max_length=2000), + FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=768), + FieldSchema(name="last_updated", dtype=DataType.INT64) + ] + + schema = CollectionSchema(fields, "RAG collection for documentation") + collection = Collection(collection_name, schema) + print(f"Created new collection: {collection_name}") + else: + collection = Collection(collection_name) + print(f"Using existing collection: {collection_name}") + + # Load collection + collection.load() + + # Prepare records for insertion + records = [] + timestamp = int(datetime.now().timestamp()) + + with open(embedded_data.path, 'r', encoding='utf-8') as f: + for line in f: + record = json.loads(line) + records.append({ + "file_unique_id": record["file_unique_id"], + "repo_name": record["repo_name"], + "file_path": record["file_path"], + "file_name": record["file_name"], + "citation_url": record["citation_url"], + "chunk_index": record["chunk_index"], + "content_text": record["content_text"], + "vector": record["embedding"], + "last_updated": timestamp + }) + + if records: + # Insert new records + batch_size = 1000 + for i in range(0, len(records), batch_size): + batch = records[i:i + batch_size] + collection.insert(batch) + + collection.flush() + + # Create/update index if needed + try: + # Check if index exists + index_info = collection.index() + if not index_info: + print("Creating index...") + index_params = { + "metric_type": "COSINE", + "index_type": "IVF_FLAT", + "params": {"nlist": 
min(1024, max(100, len(records)))} + } + collection.create_index("vector", index_params) + collection.load() + print("Index created successfully") + else: + print("Index already exists") + except Exception as e: + print(f"Index operation result: {e}") + + print(f"✅ Inserted {len(records)} new records. Total collection size: {collection.num_entities}") + else: + print("No records to insert") + + +@dsl.pipeline( + name="github-rag-incremental-build", + description="Incremental RAG pipeline for processing only changed GitHub files" +) +def github_rag_incremental_pipeline( + repo_owner: str = "kubeflow", + repo_name: str = "website", + changed_files: str = '[]', # JSON string of changed file paths + github_token: str = "", + base_url: str = "https://www.kubeflow.org/docs", + chunk_size: int = 1000, + chunk_overlap: int = 100, + milvus_host: str = "milvus-standalone-final.santhosh.svc.cluster.local", + milvus_port: str = "19530", + collection_name: str = "docs_rag" +): + # Step 1: Delete old vectors for changed files + delete_task = delete_old_vectors( + file_paths=changed_files, + repo_name=repo_name, + milvus_host=milvus_host, + milvus_port=milvus_port, + collection_name=collection_name + ) + + # Step 2: Download only the changed files + download_task = download_specific_files( + repo_owner=repo_owner, + repo_name=repo_name, + file_paths=changed_files, + github_token=github_token + ) + + # Step 3: Chunk and embed the changed files + chunk_task = chunk_and_embed_incremental( + github_data=download_task.outputs["github_data"], + repo_name=repo_name, + base_url=base_url, + chunk_size=chunk_size, + chunk_overlap=chunk_overlap + ) + + # Step 4: Store new vectors in Milvus (after deletion is complete) + store_task = store_milvus_incremental( + embedded_data=chunk_task.outputs["embedded_data"], + milvus_host=milvus_host, + milvus_port=milvus_port, + collection_name=collection_name + ) + + # Ensure deletion happens before insertion + store_task.after(delete_task) + + +if __name__ == "__main__": + # Compile the pipeline + kfp.compiler.Compiler().compile( + pipeline_func=github_rag_incremental_pipeline, + package_path="github_rag_incremental_pipeline.yaml" + ) diff --git a/pipelines/kubeflow-pipeline.py b/pipelines/kubeflow-pipeline.py new file mode 100644 index 0000000..96fd83f --- /dev/null +++ b/pipelines/kubeflow-pipeline.py @@ -0,0 +1,292 @@ +import kfp +from kfp import dsl +from kfp.dsl import * +from typing import * + +@dsl.component( + base_image="python:3.9", + packages_to_install=["requests", "beautifulsoup4"] +) +def download_github_directory( + repo_owner: str, + repo_name: str, + directory_path: str, + github_token: str, + github_data: dsl.Output[dsl.Dataset] +): + import requests + import json + import base64 + from bs4 import BeautifulSoup + + headers = {"Authorization": f"token {github_token}"} if github_token else {} + api_url = f"https://api.github.com/repos/{repo_owner}/{repo_name}/contents/{directory_path}" + + def get_files_recursive(url): + files = [] + try: + response = requests.get(url, headers=headers) + response.raise_for_status() + items = response.json() + + for item in items: + if item['type'] == 'file' and (item['name'].endswith('.md') or item['name'].endswith('.html')): + file_response = requests.get(item['url'], headers=headers) + file_response.raise_for_status() + file_data = file_response.json() + content = base64.b64decode(file_data['content']).decode('utf-8') + + # Extract text from HTML files + if item['name'].endswith('.html'): + soup = BeautifulSoup(content, 
'html.parser')
+                        content = soup.get_text(separator=' ', strip=True)
+
+                    files.append({
+                        'path': item['path'],
+                        'content': content,
+                        'file_name': item['name']
+                    })
+                elif item['type'] == 'dir':
+                    files.extend(get_files_recursive(item['url']))
+        except Exception as e:
+            print(f"Error fetching {url}: {e}")
+        return files
+
+    files = get_files_recursive(api_url)
+    print(f"Downloaded {len(files)} files")
+
+    with open(github_data.path, 'w', encoding='utf-8') as f:
+        for file_data in files:
+            f.write(json.dumps(file_data, ensure_ascii=False) + '\n')
+
+
+@dsl.component(
+    base_image="pytorch/pytorch:2.3.0-cuda12.1-cudnn8-runtime",
+    packages_to_install=["sentence-transformers", "langchain"]
+)
+def chunk_and_embed(
+    github_data: dsl.Input[dsl.Dataset],
+    repo_name: str,
+    base_url: str,
+    chunk_size: int,
+    chunk_overlap: int,
+    embedded_data: dsl.Output[dsl.Dataset]
+):
+    import json
+    import os
+    import re
+    import torch
+    from sentence_transformers import SentenceTransformer
+    from langchain.text_splitter import RecursiveCharacterTextSplitter
+
+    device = 'cuda' if torch.cuda.is_available() else 'cpu'
+    model = SentenceTransformer('sentence-transformers/all-mpnet-base-v2', device=device)
+    print(f"Model loaded on {device}")
+
+    records = []
+
+    with open(github_data.path, 'r', encoding='utf-8') as f:
+        for line in f:
+            file_data = json.loads(line)
+            content = file_data['content']
+
+            # AGGRESSIVE CLEANING FOR BETTER EMBEDDINGS
+
+            # Remove Hugo frontmatter (both --- and +++ styles)
+            content = re.sub(r'^\s*[+\-]{3,}.*?[+\-]{3,}\s*', '', content, flags=re.DOTALL | re.MULTILINE)
+
+            # Remove Hugo template syntax
+            content = re.sub(r'\{\{.*?\}\}', '', content, flags=re.DOTALL)
+
+            # Remove HTML comments and tags
+            content = re.sub(r'<!--.*?-->', '', content, flags=re.DOTALL)
+            content = re.sub(r'<[^>]+>', ' ', content)
+
+            # Remove navigation/menu artifacts
+            content = re.sub(r'\b(Get Started|Contribute|GenAI|Home|Menu|Navigation)\b', '', content, flags=re.IGNORECASE)
+
+            # Clean up URLs and links
+            content = re.sub(r'https?://[^\s]+', '', content)
+            content = re.sub(r'\[([^\]]+)\]\([^\)]+\)', r'\1', content)  # Convert [text](url) to text
+
+            # Remove excessive whitespace and normalize
+            content = re.sub(r'\s+', ' ', content)  # Multiple spaces to single
+            content = re.sub(r'\n\s*\n\s*\n+', '\n\n', content)  # Multiple newlines to double
+            content = content.strip()
+
+            # Skip files that are too short after cleaning
+            if len(content) < 50:
+                print(f"Skipping file after cleaning: {file_data['path']} ({len(content)} chars)")
+                continue
+
+            # Build citation URL (same as before)
+            path_parts = file_data['path'].split('/')
+            if 'content/en/docs' in file_data['path']:
+                docs_index = path_parts.index('docs')
+                url_path = '/'.join(path_parts[docs_index+1:])
+                url_path = os.path.splitext(url_path)[0]
+                citation_url = f"{base_url}/{url_path}"
+            else:
+                citation_url = f"{base_url}/{file_data['path']}"
+
+            file_unique_id = f"{repo_name}:{file_data['path']}"
+
+            # Create splitter
+            text_splitter = RecursiveCharacterTextSplitter(
+                chunk_size=chunk_size,
+                chunk_overlap=chunk_overlap,
+                length_function=len,
+                separators=["\n\n", "\n", ". 
", " ", ""] + ) + + # Split into chunks + chunks = text_splitter.split_text(content) + + print(f"File: {file_data['path']} -> {len(chunks)} chunks (avg: {sum(len(c) for c in chunks)/len(chunks):.0f} chars)") + + # Create embeddings + for chunk_idx, chunk in enumerate(chunks): + embedding = model.encode(chunk).tolist() + records.append({ + 'file_unique_id': file_unique_id, + 'repo_name': repo_name, + 'file_path': file_data['path'], + 'file_name': file_data['file_name'], + 'citation_url': citation_url[:1024], + 'chunk_index': chunk_idx, + 'content_text': chunk[:2000], + 'embedding': embedding + }) + + print(f"Created {len(records)} total chunks") + + with open(embedded_data.path, 'w', encoding='utf-8') as f: + for record in records: + f.write(json.dumps(record, ensure_ascii=False) + '\n') + + +@dsl.component( + base_image="python:3.9", + packages_to_install=["pymilvus", "numpy"] +) +def store_milvus( + embedded_data: dsl.Input[dsl.Dataset], + milvus_host: str, + milvus_port: str, + collection_name: str +): + from pymilvus import connections, utility, FieldSchema, CollectionSchema, DataType, Collection + import json + from datetime import datetime + + connections.connect("default", host=milvus_host, port=milvus_port) + + # DROP existing collection to fix schema mismatch + if utility.has_collection(collection_name): + utility.drop_collection(collection_name) + print(f"Dropped existing collection: {collection_name}") + + # Enhanced schema with 768 dimensions + fields = [ + FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=True), + FieldSchema(name="file_unique_id", dtype=DataType.VARCHAR, max_length=512), + FieldSchema(name="repo_name", dtype=DataType.VARCHAR, max_length=256), + FieldSchema(name="file_path", dtype=DataType.VARCHAR, max_length=512), + FieldSchema(name="file_name", dtype=DataType.VARCHAR, max_length=256), + FieldSchema(name="citation_url", dtype=DataType.VARCHAR, max_length=1024), + FieldSchema(name="chunk_index", dtype=DataType.INT64), + FieldSchema(name="content_text", dtype=DataType.VARCHAR, max_length=2000), + FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=768), # Updated for all-mpnet-base-v2 + FieldSchema(name="last_updated", dtype=DataType.INT64) + ] + + # Create new collection with correct schema + schema = CollectionSchema(fields, "RAG collection for documentation") + collection = Collection(collection_name, schema) + print(f"Created new collection: {collection_name}") + + # Rest of your existing code remains the same... + records = [] + timestamp = int(datetime.now().timestamp()) + + with open(embedded_data.path, 'r', encoding='utf-8') as f: + for line in f: + record = json.loads(line) + records.append({ + "file_unique_id": record["file_unique_id"], + "repo_name": record["repo_name"], + "file_path": record["file_path"], + "file_name": record["file_name"], + "citation_url": record["citation_url"], + "chunk_index": record["chunk_index"], + "content_text": record["content_text"], + "vector": record["embedding"], + "last_updated": timestamp + }) + + if records: + batch_size = 1000 + for i in range(0, len(records), batch_size): + batch = records[i:i + batch_size] + collection.insert(batch) + + collection.flush() + + # Create index + index_params = { + "metric_type": "COSINE", + "index_type": "IVF_FLAT", + "params": {"nlist": min(1024, len(records))} + } + collection.create_index("vector", index_params) + collection.load() + print(f"✅ Inserted {len(records)} records. 
Total: {collection.num_entities}") + + +@dsl.pipeline( + name="github-rag-full-build", + description="RAG pipeline for processing GitHub documentation" +) +def github_rag_pipeline( + repo_owner: str = "kubeflow", + repo_name: str = "website", + directory_path: str = "content/en", + github_token: str = "", + base_url: str = "https://www.kubeflow.org/docs", + chunk_size: int = 1000, + chunk_overlap: int = 100, + milvus_host: str = "milvus-standalone-final.santhosh.svc.cluster.local", + milvus_port: str = "19530", + collection_name: str = "docs_rag" +): + # Download GitHub directory + download_task = download_github_directory( + repo_owner=repo_owner, + repo_name=repo_name, + directory_path=directory_path, + github_token=github_token + ) + + # Chunk and embed the content + chunk_task = chunk_and_embed( + github_data=download_task.outputs["github_data"], + repo_name=repo_name, + base_url=base_url, + chunk_size=chunk_size, + chunk_overlap=chunk_overlap + ) + + # Store in Milvus + store_task = store_milvus( + embedded_data=chunk_task.outputs["embedded_data"], + milvus_host=milvus_host, + milvus_port=milvus_port, + collection_name=collection_name + ) + +if __name__ == "__main__": + # Compile the pipeline + kfp.compiler.Compiler().compile( + pipeline_func=github_rag_pipeline, + package_path="github_rag_pipeline.yaml" + ) \ No newline at end of file diff --git a/server/Dockerfile b/server/Dockerfile new file mode 100644 index 0000000..0eb7d80 --- /dev/null +++ b/server/Dockerfile @@ -0,0 +1,29 @@ +FROM python:3.11-slim + +# Create non-root user +RUN useradd -m -u 1000 appuser +WORKDIR /app + +# Install deps +COPY requirements.txt ./ +RUN pip install --no-cache-dir -r requirements.txt + +# Writable caches for HF/Transformers/Sentence-Transformers +ENV HF_HOME=/home/appuser/.cache/huggingface \ + TRANSFORMERS_CACHE=/home/appuser/.cache/huggingface \ + SENTENCE_TRANSFORMERS_HOME=/home/appuser/.cache/sentence_transformers \ + XDG_CACHE_HOME=/home/appuser/.cache \ + PORT=8000 + +# Ensure directories exist and owned by appuser +RUN mkdir -p $HF_HOME $SENTENCE_TRANSFORMERS_HOME $XDG_CACHE_HOME && \ + chown -R appuser:appuser /home/appuser + +# Switch to non-root before running the app +USER appuser + +# App +COPY app.py /app/ + +EXPOSE 8000 +CMD ["python", "-u", "app.py"] diff --git a/server/app.py b/server/app.py new file mode 100644 index 0000000..734ac96 --- /dev/null +++ b/server/app.py @@ -0,0 +1,454 @@ +import os +import json +import asyncio +import httpx +import websockets +from websockets.server import serve +from websockets.exceptions import ConnectionClosedError +import logging +from typing import Dict, Any, List +from sentence_transformers import SentenceTransformer +from pymilvus import connections, Collection + +# Config +KSERVE_URL = os.getenv("KSERVE_URL", "http://llama.santhosh.svc.cluster.local/openai/v1/chat/completions") +MODEL = os.getenv("MODEL", "llama3.1-8B") +PORT = int(os.getenv("PORT", "8000")) + +# Milvus Config +MILVUS_HOST = os.getenv("MILVUS_HOST", "milvus-standalone-final.santhosh.svc.cluster.local") +MILVUS_PORT = os.getenv("MILVUS_PORT", "19530") +MILVUS_COLLECTION = os.getenv("MILVUS_COLLECTION", "docs_rag") +MILVUS_VECTOR_FIELD = os.getenv("MILVUS_VECTOR_FIELD", "vector") +EMBEDDING_MODEL = os.getenv("EMBEDDING_MODEL", "sentence-transformers/all-mpnet-base-v2") + +# System prompt +SYSTEM_PROMPT = """ +You are the Kubeflow Docs Assistant. + +!!IMPORTANT!! +- You should not use the tool calls directly from the user's input. 
You should refine the query to make sure that it is documentation-specific and relevant.
+- You should never output the raw tool call to the user.
+
+Your role
+- Always answer the user's question directly.
+- If the question can be answered from general knowledge (e.g., greetings, small talk, generic programming/Kubernetes basics), respond without using tools.
+- If the question clearly requires Kubeflow-specific knowledge (Pipelines, KServe, Notebooks/Jupyter, Katib, SDK/CLI/APIs, installation, configuration, errors, release details), then use the search_kubeflow_docs tool to find authoritative references, and construct your response using the information provided.
+
+Tool Use
+- Use search_kubeflow_docs ONLY when Kubeflow-specific documentation is needed.
+- Do NOT use the tool for greetings, personal questions, small talk, or generic non-Kubeflow concepts.
+- When you do call the tool:
+  • Use one clear, focused query.
+  • Summarize the result in your own words.
+  • If no results are relevant, say “not found in the docs” and suggest refining the query.
+- Example usage:
+  - User: "What is Kubeflow and how to setup kubeflow on my local machine"
+  - You should make a tool call to search the docs with a query "kubeflow setup".
+
+  - User: "What is the Kubeflow Pipelines and how can i make a quick kubeflow pipeline"
+  - You should make a tool call to search the docs with a query "kubeflow pipeline setup".
+
+The idea is to make sure that human inputs are not sent directly to tool calls; instead, we should refine the query to make sure that it is documentation-specific and relevant.
+
+Routing
+- Greetings/small talk → respond briefly, no tool.
+- Out-of-scope (sports, unrelated topics) → politely say you only help with Kubeflow.
+- Kubeflow-specific → answer and call the tool if documentation is needed.
+
+Style
+- Be concise (2–5 sentences). Use bullet points or steps when helpful.
+- Provide examples only when asked.
+- Never invent features. If unsure, say so.
+- Reply in clean Markdown.
+"""
+
+
+
+def milvus_search(query: str, top_k: int = 5) -> Dict[str, Any]:
+    """Execute a semantic search in Milvus and return structured, JSON-serializable results."""
+    try:
+        # Connect to Milvus
+        connections.connect(alias="default", host=MILVUS_HOST, port=MILVUS_PORT)
+        collection = Collection(MILVUS_COLLECTION)
+        collection.load()
+
+        # Encoder (same model as pipeline)
+        encoder = SentenceTransformer(EMBEDDING_MODEL)
+        query_vec = encoder.encode(query).tolist()
+
+        search_params = {"metric_type": "COSINE", "params": {"nprobe": 32}}
+        results = collection.search(
+            data=[query_vec],
+            anns_field=MILVUS_VECTOR_FIELD,
+            param=search_params,
+            limit=int(top_k),
+            output_fields=["file_path", "content_text", "citation_url"],
+        )
+
+        hits = []
+        for hit in results[0]:
+            # similarity = 1 - distance for COSINE in Milvus
+            similarity = 1.0 - float(hit.distance)
+            entity = hit.entity
+            content_text = entity.get("content_text") or ""
+            if isinstance(content_text, str) and len(content_text) > 400:
+                content_text = content_text[:400] + "..."
+ hits.append({ + "similarity": similarity, + "file_path": entity.get("file_path"), + "citation_url": entity.get("citation_url"), + "content_text": content_text, + }) + return {"results": hits} + except Exception as e: + print(f"[ERROR] Milvus search failed: {e}") + return {"results": []} + finally: + try: + connections.disconnect(alias="default") + except Exception: + pass +TOOLS = [ + { + "type": "function", + "function": { + "name": "search_kubeflow_docs", + "description": ( + "Search the official Kubeflow docs when the user asks Kubeflow-specific questions " + "about Pipelines, KServe, Notebooks/Jupyter, Katib, or the SDK/CLI/APIs.\n" + "Call ONLY for Kubeflow features, setup, usage, errors, or version differences that need citations.\n" + ), + "parameters": { + "type": "object", + "properties": { + "query": { + "type": "string", + "description": "Short, focused search string (e.g., 'KServe inferenceService canary', 'Pipelines v2 disable cache').", + "minLength": 1 + }, + "top_k": { + "type": "integer", + "description": "Number of hits to retrieve (the assistant will read up to this many).", + "default": 5, + "minimum": 1, + "maximum": 10 + } + }, + "required": ["query"], + "additionalProperties": False + } + } + } +] + + +async def execute_tool(tool_call: Dict[str, Any]) -> tuple[str, List[str]]: + """Execute a tool call and return the result and citations""" + try: + function_name = tool_call.get("function", {}).get("name") + arguments = json.loads(tool_call.get("function", {}).get("arguments", "{}")) + + if function_name == "search_kubeflow_docs": + query = arguments.get("query", "") + top_k = arguments.get("top_k", 5) + + print(f"[TOOL] Executing Milvus search for: '{query}' (top_k={top_k})") + result = milvus_search(query, 15) + + # Collect citations + citations = [] + formatted_results = [] + + for hit in result.get("results", []): + citation_url = hit.get('citation_url', '') + if citation_url and citation_url not in citations: + citations.append(citation_url) + + formatted_results.append( + f"File: {hit.get('file_path', 'Unknown')}\n" + f"Content: {hit.get('content_text', '')}\n" + f"URL: {citation_url}\n" + f"Similarity: {hit.get('similarity', 0):.3f}\n" + ) + + formatted_text = "\n".join(formatted_results) if formatted_results else "No relevant results found." 
+ return formatted_text, citations + + return f"Unknown tool: {function_name}", [] + + except Exception as e: + print(f"[ERROR] Tool execution failed: {e}") + return f"Tool execution failed: {e}", [] + +async def stream_llm_response(payload: Dict[str, Any], websocket, citations_collector: List[str] = None) -> None: + """Stream response from LLM to websocket, handling tool calls""" + if citations_collector is None: + citations_collector = [] + try: + async with httpx.AsyncClient(timeout=120) as client: + async with client.stream("POST", KSERVE_URL, json=payload) as response: + if response.status_code != 200: + error_msg = f"LLM service error: HTTP {response.status_code}" + print(f"[ERROR] {error_msg}") + await websocket.send(json.dumps({"type": "error", "content": error_msg})) + return + + # Buffer for accumulating tool calls + tool_calls_buffer = {} + + async for line in response.aiter_lines(): + if not line.startswith("data: "): + continue + + data = line[6:] # Remove "data: " prefix + if data == "[DONE]": + break + + try: + chunk = json.loads(data) + choices = chunk.get("choices", []) + if not choices: + continue + + delta = choices[0].get("delta", {}) + finish_reason = choices[0].get("finish_reason") + + # Handle tool calls in streaming + if "tool_calls" in delta: + tool_calls = delta["tool_calls"] + for tool_call in tool_calls: + index = tool_call.get("index", 0) + + # Initialize tool call buffer if needed + if index not in tool_calls_buffer: + tool_calls_buffer[index] = { + "id": tool_call.get("id", ""), + "type": tool_call.get("type", "function"), + "function": { + "name": tool_call.get("function", {}).get("name", ""), + "arguments": "" + } + } + + # Update tool call data + if tool_call.get("id"): + tool_calls_buffer[index]["id"] = tool_call["id"] + if tool_call.get("type"): + tool_calls_buffer[index]["type"] = tool_call["type"] + + function_data = tool_call.get("function", {}) + if function_data.get("name"): + tool_calls_buffer[index]["function"]["name"] = function_data["name"] + if "arguments" in function_data: + tool_calls_buffer[index]["function"]["arguments"] += function_data["arguments"] + + # Handle regular content + elif "content" in delta and delta["content"]: + await websocket.send(json.dumps({ + "type": "content", + "content": delta["content"] + })) + + # Handle finish reason - execute tools if needed + if finish_reason == "tool_calls": + print(f"[TOOL] Finish reason: tool_calls, executing {len(tool_calls_buffer)} tools") + + # Execute all accumulated tool calls + for tool_call in tool_calls_buffer.values(): + if tool_call["function"]["name"] and tool_call["function"]["arguments"]: + try: + print(f"[TOOL] Executing: {tool_call['function']['name']}") + print(f"[TOOL] Arguments: {tool_call['function']['arguments']}") + + result, tool_citations = await execute_tool(tool_call) + + # Collect citations + citations_collector.extend(tool_citations) + + # Send tool execution result to client + await websocket.send(json.dumps({ + "type": "tool_result", + "tool_name": tool_call["function"]["name"], + "content": result + })) + + # Make follow-up request with tool results + await handle_tool_follow_up(payload, tool_call, result, websocket, citations_collector) + + except Exception as e: + print(f"[ERROR] Tool execution error: {e}") + await websocket.send(json.dumps({ + "type": "error", + "content": f"Tool execution failed: {e}" + })) + + tool_calls_buffer.clear() + break # Tool execution complete, exit streaming loop + + except json.JSONDecodeError as e: + print(f"[ERROR] JSON decode 
error: {e}, line: {line}") + continue + + except Exception as e: + print(f"[ERROR] Streaming failed: {e}") + await websocket.send(json.dumps({"type": "error", "content": f"Streaming failed: {e}"})) + +async def handle_tool_follow_up(original_payload: Dict[str, Any], tool_call: Dict[str, Any], tool_result: str, websocket, citations_collector: List[str] = None) -> None: + """Handle follow-up request after tool execution""" + if citations_collector is None: + citations_collector = [] + try: + print("[TOOL] Handling follow-up request with tool results") + + # Create messages with tool call and result + messages = original_payload["messages"].copy() + + # Add assistant's tool call message + messages.append({ + "role": "assistant", + "tool_calls": [tool_call] + }) + + # Add tool result message + messages.append({ + "role": "tool", + "tool_call_id": tool_call["id"], + "content": tool_result + }) + + # Create follow-up payload - remove tools to get final response + follow_up_payload = { + "model": original_payload["model"], + "messages": messages, + "stream": True, + "max_tokens": 1000 + } + + # Stream the follow-up response + await stream_llm_response(follow_up_payload, websocket, citations_collector) + + except Exception as e: + print(f"[ERROR] Tool follow-up failed: {e}") + await websocket.send(json.dumps({"type": "error", "content": f"Tool follow-up failed: {e}"})) + +async def handle_chat(message: str, websocket) -> None: + """Handle chat with tool calling support""" + try: + print(f"[CHAT] Processing message: {message[:100]}...") + + # Create initial payload + payload = { + "model": MODEL, + "messages": [ + {"role": "system", "content": SYSTEM_PROMPT}, + {"role": "user", "content": message} + ], + "tools": TOOLS, + "tool_choice": "auto", + "stream": True, + "max_tokens": 1500 + } + + # Collect citations throughout the conversation + citations_collector = [] + + # Start streaming response + await stream_llm_response(payload, websocket, citations_collector) + + # Send citations if any were collected + if citations_collector: + # Remove duplicates while preserving order + unique_citations = [] + for citation in citations_collector: + if citation not in unique_citations: + unique_citations.append(citation) + + await websocket.send(json.dumps({ + "type": "citations", + "citations": unique_citations + })) + + # Send completion signal + await websocket.send(json.dumps({"type": "done"})) + + except Exception as e: + print(f"[ERROR] Chat handling failed: {e}") + await websocket.send(json.dumps({"type": "error", "content": f"Request failed: {e}"})) + +async def handle_websocket(websocket, path): + """Handle WebSocket connections""" + print(f"[WS] New connection from {websocket.remote_address}") + + try: + # Send welcome message + await websocket.send(json.dumps({ + "type": "system", + "content": "Connected to Kubeflow Documentation Assistant" + })) + + async for message in websocket: + try: + # Ensure we always deal with string, not bytes + if isinstance(message, (bytes, bytearray)): + message = message.decode("utf-8", errors="ignore") + + # Try to parse as JSON first + try: + msg_data = json.loads(message) + if isinstance(msg_data, dict) and "message" in msg_data: + message = msg_data["message"] + except json.JSONDecodeError: + # Treat as plain text message + pass + + print(f"[WS] Received: {message[:100]}...") + await handle_chat(message, websocket) + + except Exception as e: + print(f"[ERROR] Message processing error: {e}") + await websocket.send(json.dumps({ + "type": "error", + "content": 
f"Message processing failed: {e}" + })) + + except ConnectionClosedError: + print("[WS] Connection closed") + except Exception as e: + print(f"[ERROR] WebSocket error: {e}") + +async def health_check(path, request_headers): + """Handle HTTP health checks""" + if path == "/health": + return 200, [("Content-Type", "text/plain")], b"OK" + return None + +async def main(): + """Start the WebSocket server""" + print("🚀 Starting Kubeflow Docs WebSocket Server") + print(f" Port: {PORT}") + print(f" LLM Service: {KSERVE_URL}") + print(f" Milvus: {MILVUS_HOST}:{MILVUS_PORT}") + print(f" Collection: {MILVUS_COLLECTION}") + + # Configure logging + logging.getLogger("websockets").setLevel(logging.WARNING) + + # Start server + async with serve( + handle_websocket, + "0.0.0.0", + PORT, + process_request=health_check, + ping_interval=30, + ping_timeout=10 + ): + print("✅ WebSocket server is running...") + print(f" WebSocket: ws://localhost:{PORT}") + print(f" Health: http://localhost:{PORT}/health") + + # Keep server running + await asyncio.Future() + +if __name__ == "__main__": + asyncio.run(main()) diff --git a/server/deployment.yaml b/server/deployment.yaml new file mode 100644 index 0000000..dcdec4c --- /dev/null +++ b/server/deployment.yaml @@ -0,0 +1,96 @@ +apiVersion: apps/v1 +kind: Deployment +metadata: + name: ws-proxy + labels: + app: ws-proxy + version: v1 +spec: + replicas: 1 + strategy: + type: RollingUpdate + rollingUpdate: + maxUnavailable: 0 + maxSurge: 1 + selector: + matchLabels: + app: ws-proxy + template: + metadata: + labels: + app: ws-proxy + version: v1 + spec: + containers: + - name: ws-proxy + image: santhoshtoorpu/ws-proxy:latest + imagePullPolicy: IfNotPresent # actually reduce pulls + resources: + requests: + cpu: "300m" + memory: "1Gi" + ephemeral-storage: "4Gi" + limits: + cpu: "1500m" + memory: "2Gi" + ephemeral-storage: "8Gi" + env: + # Match your app.py variable names + - name: KSERVE_URL + value: "http://llama.santhosh.svc.cluster.local/openai/v1/chat/completions" + - name: MODEL + value: "llama3.1-8B" + - name: PORT + value: "8000" + - name: PYTHONUNBUFFERED + value: "1" + + # Make cache paths explicit and writable (align with Dockerfile) + - name: HF_HOME + value: "/home/appuser/.cache/huggingface" + - name: TRANSFORMERS_CACHE + value: "/home/appuser/.cache/huggingface" + - name: SENTENCE_TRANSFORMERS_HOME + value: "/home/appuser/.cache/sentence_transformers" + - name: XDG_CACHE_HOME + value: "/home/appuser/.cache" + + ports: + - containerPort: 8000 + name: http + protocol: TCP + + livenessProbe: + httpGet: + path: /health + port: 8000 + initialDelaySeconds: 30 + periodSeconds: 10 + timeoutSeconds: 5 + failureThreshold: 3 + + readinessProbe: + httpGet: + path: /health + port: 8000 + initialDelaySeconds: 10 + periodSeconds: 5 + timeoutSeconds: 3 + failureThreshold: 3 + + securityContext: + allowPrivilegeEscalation: false + runAsNonRoot: true + runAsUser: 1000 + capabilities: + drop: ["ALL"] + + # Optional: mount an emptyDir to persist cache across container restarts + volumeMounts: + - name: hf-cache + mountPath: /home/appuser/.cache + + volumes: + - name: hf-cache + emptyDir: + sizeLimit: 5Gi diff --git a/server/requirements.txt b/server/requirements.txt new file mode 100644 index 0000000..fba0111 --- /dev/null +++ b/server/requirements.txt @@ -0,0 +1,13 @@ +# Core dependencies +websockets +httpx + +# Milvus client +pymilvus + +# Optimized ML dependencies - use lighter alternatives +sentence-transformers +torch --extra-index-url https://download.pytorch.org/whl/cpu + +# 
Essential only +numpy diff --git a/server/service.yaml b/server/service.yaml new file mode 100644 index 0000000..f7a058d --- /dev/null +++ b/server/service.yaml @@ -0,0 +1,28 @@ +apiVersion: v1 +kind: Service +metadata: + name: ws-proxy + labels: + app: ws-proxy + version: v1 + annotations: + # Add service annotations for better observability + prometheus.io/scrape: "false" # Disable if no metrics endpoint +spec: + type: ClusterIP # Explicit service type + selector: + app: ws-proxy + ports: + - name: http + port: 80 + targetPort: 8000 + protocol: TCP + - name: websocket + port: 8000 + targetPort: 8000 + protocol: TCP + # Session affinity for websocket connections + sessionAffinity: ClientIP + sessionAffinityConfig: + clientIP: + timeoutSeconds: 3600 # 1 hour timeout for websocket sessions \ No newline at end of file
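
Two usage sketches follow; they are illustrative notes only and not part of the diff above. First, a minimal sketch of how the compiled incremental pipeline might be triggered from a CI job or webhook handler, once pipelines/incremental-pipeline.py has been compiled to github_rag_incremental_pipeline.yaml. It assumes kfp 2.x and a reachable Kubeflow Pipelines endpoint; KFP_HOST, the run name, and the example changed-file path are placeholders, not values taken from this PR.

# Hypothetical trigger script for the incremental pipeline (not part of this PR).
# Assumes kfp 2.x, the compiled github_rag_incremental_pipeline.yaml in the working
# directory, and KFP_HOST / GITHUB_TOKEN set in the environment.
import json
import os

import kfp

# Example only: in practice this list would come from a webhook payload or `git diff`.
changed_files = [
    "content/en/docs/components/pipelines/overview.md",
]

client = kfp.Client(host=os.environ.get("KFP_HOST", "http://localhost:8080"))
client.create_run_from_pipeline_package(
    "github_rag_incremental_pipeline.yaml",
    arguments={
        # The component expects a JSON string, not a Python list.
        "changed_files": json.dumps(changed_files),
        "github_token": os.environ.get("GITHUB_TOKEN", ""),
    },
    run_name="docs-rag-incremental-update",
)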
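Second, a small client sketch that could be used to smoke-test server/app.py locally (for example after port-forwarding the ws-proxy Service). It only assumes the websockets package and the event format emitted by the server above ("content", "citations", "done", "error"); the localhost URL and the sample question are placeholders.

# Hypothetical smoke-test client for server/app.py (not part of this PR).
# Assumes the `websockets` package and a server reachable at ws://localhost:8000.
import asyncio
import json

import websockets

async def ask(question: str) -> None:
    async with websockets.connect("ws://localhost:8000") as ws:
        await ws.send(json.dumps({"message": question}))
        async for raw in ws:
            event = json.loads(raw)
            if event["type"] == "content":
                # Streamed answer tokens
                print(event["content"], end="", flush=True)
            elif event["type"] == "citations":
                print("\nCitations:", ", ".join(event["citations"]))
            elif event["type"] in ("done", "error"):
                print("\n[{}] {}".format(event["type"], event.get("content", "")))
                break

if __name__ == "__main__":
    asyncio.run(ask("How do I disable caching in Kubeflow Pipelines v2?"))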