# Usecase Delivery Planning Agent

This notebook implements a RAG-based delivery planning agent using DSPy that helps users generate comprehensive project plans for data migration and delivery projects. The agent leverages indexed documents and follows a structured approach to gather requirements and generate actionable project plans.

## Features:
- **Intelligent Question Generation**: Automatically generates relevant questions based on project context
- **Document Retrieval**: Uses vector search to find relevant information from indexed documents
- **Structured Planning**: Generates comprehensive project plans with timelines, resources, and milestones
- **Risk Assessment**: Identifies potential risks and mitigation strategies
- **DSPy Modules**: Builds each step as a DSPy signature and module, so prompts can later be tuned with a DSPy optimizer


## 1. Setup and Configuration


In [None]:
# Install required packages
%pip install dspy-ai databricks-vectorsearch pydantic


In [None]:
# Restart Python to ensure packages are loaded
dbutils.library.restartPython()


In [None]:
# Configuration
VECTOR_SEARCH_ENDPOINT_NAME = "use-case-planning-agent"  # Your vector search endpoint
VECTOR_SEARCH_INDEX_NAME = "usecase-planning-agent-index"  # Your vector search index
CATALOG_NAME = "vbdemos"
SCHEMA_NAME = "usecase_agent"
DOCUMENTS_TABLE = f"{CATALOG_NAME}.{SCHEMA_NAME}.usecase_planning_agent_pdf_parsed"

# Project planning categories and questions
PLANNING_CATEGORIES = {
    "Resource": [
        "How many team members are there?",
        "Are they using PS skills?",
        "Are they using an SI?",
        "Are the teams sufficiently skilled/trained?",
        "Are resources shared with other projects?",
        "Have resources done this work before?",
        "Is there a product owner?",
        "Are the DevOps/SecOps/Infra teams under this team's control/purview?",
        "Is there a program manager?",
        "Are the BAU teams that will ultimately manage the new system sufficiently trained?",
        "Has an end-user adoption plan been created?"
    ],
    "Current Process Maturity": [
        "Do they have a history of delays?",
        "Do they have change management authority/equivalent?",
        "Do they have an identified way of working - agile/waterfall?"
    ],
    "Customer Background": [
        "Does the customer have a specific deadline/reason for that deadline?",
        "Is the customer already using cloud?",
        "Does the customer have Databricks elsewhere?",
        "Does the customer have security approval for this migration?",
        "Are there any key connectors that are needed?",
        "What are the key drivers of the migration?",
        "Are there any legal or compliance requirements to consider?"
    ],
    "Scope": [
        "Has a pilot or POC been conducted?",
        "Does the customer have visibility of all the data and pipelines that need migration?",
        "Is the customer aware of where the data is used and by whom?",
        "Is lift and shift or redesign preferred?",
        "How many pipelines are to be migrated?",
        "Relative complexity of pipelines?",
        "Volume of data to be migrated?",
        "How frequently is the data updated?",
        "Is there a proposed UC design/infrastructure design?",
        "Is PII handling included?",
        "Does the migration include monitoring?",
        "Does the migration include optimization?",
        "Will it be run in parallel or phased move over?",
        "Is there awareness of business-critical pipelines/pipelines that cannot be down?",
        "Do they have control over how they receive data?",
        "Do additional data quality checks need to be implemented?",
        "Are there any key connectors that need to be migrated?",
        "What level of testing is required and who will be doing this?",
        "Are data consumers/systems that use the data known?",
        "Does the customer already have a plan?",
        "What is the quality of the data?",
        "Are the data pathways known?",
        "Has a permissions model been agreed?",
        "Is there a new data layout/has the UC catalog/schema structure been designed and agreed?",
        "Is disaster recovery included in this migration?"
    ]
}

print("Configuration loaded successfully!")
print(f"Vector Search Endpoint: {VECTOR_SEARCH_ENDPOINT_NAME}")
print(f"Documents Table: {DOCUMENTS_TABLE}")
print(f"Planning Categories: {list(PLANNING_CATEGORIES.keys())}")
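
The category dictionary is the only place the checklist lives, so a quick sanity check can catch duplicated or empty entries before the agent relies on it. The following is a minimal sketch on a shortened sample dictionary; `summarize_categories` and `find_duplicate_questions` are hypothetical helpers, not part of this notebook.

```python
from collections import Counter
from typing import Dict, List


def summarize_categories(categories: Dict[str, List[str]]) -> Dict[str, int]:
    """Return the number of questions per category."""
    return {name: len(questions) for name, questions in categories.items()}


def find_duplicate_questions(categories: Dict[str, List[str]]) -> List[str]:
    """Return questions that appear more than once, compared case-insensitively."""
    counts = Counter(
        question.strip().lower()
        for questions in categories.values()
        for question in questions
    )
    return sorted(question for question, count in counts.items() if count > 1)


# Shortened sample for illustration only (not the full checklist)
sample_categories = {
    "Resource": ["How many team members are there?", "Is there a product owner?"],
    "Scope": ["How many pipelines are to be migrated?", "Is there a product owner?"],
}

print(summarize_categories(sample_categories))
print(find_duplicate_questions(sample_categories))
```

Running the same two helpers on `PLANNING_CATEGORIES` would show how many questions each category carries and whether any question is listed twice.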


## 2. DSPy Setup and Model Configuration


In [None]:
import dspy
import json
from typing import List, Dict, Any
from databricks.vector_search.client import VectorSearchClient
from pyspark.sql import SparkSession

# Initialize Spark
spark = SparkSession.builder.appName("DeliveryPlanningAgent").getOrCreate()

# Configure DSPy with Databricks model
token = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiToken().get()
url = dbutils.notebook.entry_point.getDbutils().notebook().getContext().apiUrl().get() + '/serving-endpoints'

lm = dspy.LM(
    model="databricks/databricks-claude-sonnet-4",
    api_key=token,
    api_base=url,
)
dspy.configure(lm=lm)

# Initialize Vector Search client
vsc = VectorSearchClient(disable_notice=True)

print("DSPy and Vector Search configured successfully!")
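
The cell above reads the API token and URL from the notebook context, which is only available inside a Databricks notebook. When the same code runs elsewhere, one option is to fall back to environment variables. The sketch below assumes the `DATABRICKS_HOST` and `DATABRICKS_TOKEN` variables are set; `resolve_serving_credentials` is a hypothetical helper and the host and token values are placeholders.

```python
import os
from typing import Mapping, Optional, Tuple


def resolve_serving_credentials(env: Optional[Mapping[str, str]] = None) -> Tuple[str, str]:
    """Build the serving-endpoints base URL and token from environment-style settings."""
    env = os.environ if env is None else env
    host = env.get("DATABRICKS_HOST", "").rstrip("/")
    token = env.get("DATABRICKS_TOKEN", "")
    if not host or not token:
        raise ValueError("DATABRICKS_HOST and DATABRICKS_TOKEN must both be set")
    return f"{host}/serving-endpoints", token


# Placeholder values for illustration; pass no argument to read os.environ instead
api_base, api_key = resolve_serving_credentials(
    {
        "DATABRICKS_HOST": "https://example.cloud.databricks.com/",
        "DATABRICKS_TOKEN": "dapi-placeholder",
    }
)
print(api_base)
```

The returned pair can then be passed as `api_base` and `api_key` when constructing `dspy.LM`, in the same way `url` and `token` are used above.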


## 3. DSPy Signatures and Modules


In [None]:
# DSPy Signatures for the delivery planning agent

class QuestionGenerator(dspy.Signature):
    """Generate relevant questions based on project context and category."""
    project_context: str = dspy.InputField(desc="Description of the project and current context")
    category: str = dspy.InputField(desc="Planning category (Resource, Scope, Customer Background, etc.)")
    existing_answers: str = dspy.InputField(desc="Previously answered questions and responses")
    questions: str = dspy.OutputField(desc="List of 3-5 most relevant questions for this category")

class DocumentRetriever(dspy.Signature):
    """Retrieve relevant documents based on questions and context."""
    question: str = dspy.InputField(desc="Specific question to find relevant information for")
    project_context: str = dspy.InputField(desc="Project context and background")
    retrieved_docs: str = dspy.OutputField(desc="Relevant document excerpts and information")

class AnswerAnalyzer(dspy.Signature):
    """Analyze answers and extract key insights for project planning."""
    question: str = dspy.InputField(desc="The question that was asked")
    answer: str = dspy.InputField(desc="The answer provided by the user")
    relevant_docs: str = dspy.InputField(desc="Relevant documents retrieved from knowledge base")
    insights: str = dspy.OutputField(desc="Key insights, risks, and implications for project planning")

class ProjectPlanGenerator(dspy.Signature):
    """Generate comprehensive project plan based on gathered information."""
    project_context: str = dspy.InputField(desc="Overall project context and objectives")
    gathered_insights: str = dspy.InputField(desc="All insights gathered from questions and documents")
    timeline_requirements: str = dspy.InputField(desc="Timeline constraints and requirements")
    project_plan: str = dspy.OutputField(desc="Comprehensive project plan with phases, milestones, resources, and risks")

class RiskAssessor(dspy.Signature):
    """Assess risks and provide mitigation strategies."""
    project_plan: str = dspy.InputField(desc="The proposed project plan")
    project_context: str = dspy.InputField(desc="Project context and constraints")
    risk_assessment: str = dspy.OutputField(desc="Identified risks, their likelihood, impact, and mitigation strategies")

print("DSPy signatures defined successfully!")
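
`QuestionGenerator` declares `questions` as a plain string, so the model may return a numbered or bulleted list with blank lines in between. A minimal sketch of normalizing that text into a clean list follows; `parse_questions` is a hypothetical helper and the sample string is hand-written, not real model output.

```python
import re
from typing import List

# Matches leading list markers such as "1.", "2)", "-", "*", or "•"
_LIST_MARKER = re.compile(r"^\s*(?:\d+[.)]|[-*•])\s+")


def parse_questions(raw: str) -> List[str]:
    """Split model output into questions, dropping list markers and blank lines."""
    questions = []
    for line in raw.splitlines():
        cleaned = _LIST_MARKER.sub("", line).strip()
        if cleaned:
            questions.append(cleaned)
    return questions


# Hand-written sample mixing numbering styles and a blank line
sample_output = "1. How large is the team?\n\n- Is there a product owner?\n  3) Who owns testing?"
print(parse_questions(sample_output))
```

Stripping the markers up front avoids doubled numbering when the questions are printed with their own index later.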


In [None]:
# DSPy Modules for the delivery planning agent

class QuestionGenerationModule(dspy.Module):
    """Module to generate relevant questions for each planning category."""
    
    def __init__(self):
        super().__init__()
        self.generate = dspy.ChainOfThought(QuestionGenerator)
    
    def forward(self, project_context: str, category: str, existing_answers: str = ""):
        return self.generate(
            project_context=project_context,
            category=category,
            existing_answers=existing_answers
        )

class DocumentRetrievalModule(dspy.Module):
    """Module to retrieve relevant documents using vector search."""
    
    def __init__(self, vector_search_endpoint: str, vector_search_index: str):
        super().__init__()
        self.vector_search_endpoint = vector_search_endpoint
        self.vector_search_index = vector_search_index
        self.retrieve = dspy.ChainOfThought(DocumentRetriever)
    
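    def forward(self, question: str, project_context: str):
        # Perform vector search
        search_results = vsc.get_index(
            endpoint_name=self.vector_search_endpoint,
            index_name=self.vector_search_index
        ).similarity_search(
            query_text=question,
            columns=["chunk_text", "file_name", "chunk_type"],
            num_results=5
        )
        
        # Each row in data_array is a list of values; pair it with the column
        # names from the manifest so fields can be looked up by name
        column_names = [col["name"] for col in search_results.get("manifest", {}).get("columns", [])]
        rows = search_results.get("result", {}).get("data_array") or []
        docs = [dict(zip(column_names, row)) for row in rows]
        
        # Format retrieved documents
        retrieved_docs = "\n\n".join([
            f"Source: {doc.get('file_name', 'Unknown')}\nContent: {doc.get('chunk_text', '')}"
            for doc in docs
        ]) or "No documents retrieved."
        
        # retrieved_docs is an output field of DocumentRetriever, so the raw search
        # results are passed to the LM as part of the project_context input instead
        return self.retrieve(
            question=question,
            project_context=f"{project_context}\n\nRetrieved documents:\n{retrieved_docs}"
        )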

class AnswerAnalysisModule(dspy.Module):
    """Module to analyze answers and extract insights."""
    
    def __init__(self):
        super().__init__()
        self.analyze = dspy.ChainOfThought(AnswerAnalyzer)
    
    def forward(self, question: str, answer: str, relevant_docs: str):
        return self.analyze(
            question=question,
            answer=answer,
            relevant_docs=relevant_docs
        )

class ProjectPlanGenerationModule(dspy.Module):
    """Module to generate comprehensive project plans."""
    
    def __init__(self):
        super().__init__()
        self.generate_plan = dspy.ChainOfThought(ProjectPlanGenerator)
    
    def forward(self, project_context: str, gathered_insights: str, timeline_requirements: str):
        return self.generate_plan(
            project_context=project_context,
            gathered_insights=gathered_insights,
            timeline_requirements=timeline_requirements
        )

class RiskAssessmentModule(dspy.Module):
    """Module to assess risks and provide mitigation strategies."""
    
    def __init__(self):
        super().__init__()
        self.assess_risks = dspy.ChainOfThought(RiskAssessor)
    
    def forward(self, project_plan: str, project_context: str):
        return self.assess_risks(
            project_plan=project_plan,
            project_context=project_context
        )

print("DSPy modules defined successfully!")
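
`DocumentRetrievalModule` turns each search hit into a `Source`/`Content` text block. The sketch below shows that formatting step in isolation, assuming the search response carries column names under `manifest.columns` and rows as plain lists under `result.data_array`. `rows_to_docs` and `format_docs` are hypothetical helpers, and the sample response is hand-written rather than real search output.

```python
from typing import Any, Dict, List


def rows_to_docs(search_results: Dict[str, Any]) -> List[Dict[str, Any]]:
    """Pair each result row with the column names listed in the manifest."""
    columns = [col["name"] for col in search_results.get("manifest", {}).get("columns", [])]
    rows = search_results.get("result", {}).get("data_array") or []
    return [dict(zip(columns, row)) for row in rows]


def format_docs(docs: List[Dict[str, Any]]) -> str:
    """Render documents as Source/Content blocks separated by blank lines."""
    return "\n\n".join(
        f"Source: {doc.get('file_name', 'Unknown')}\nContent: {doc.get('chunk_text', '')}"
        for doc in docs
    )


# Hand-written sample response (assumed shape, not real search output)
sample_response = {
    "manifest": {"columns": [{"name": "chunk_text"}, {"name": "file_name"}, {"name": "score"}]},
    "result": {"data_array": [["Plan the cutover early.", "migration_guide.pdf", 0.87]]},
}

docs = rows_to_docs(sample_response)
print(format_docs(docs))
```

Keeping the conversion separate from the search call makes it easy to test the formatting without a live vector search endpoint.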


## 4. Main Delivery Planning Agent


In [None]:
class DeliveryPlanningAgent(dspy.Module):
    """Main delivery planning agent that orchestrates the entire planning process."""
    
    def __init__(self, vector_search_endpoint: str, vector_search_index: str):
        super().__init__()
        self.question_generator = QuestionGenerationModule()
        self.document_retriever = DocumentRetrievalModule(vector_search_endpoint, vector_search_index)
        self.answer_analyzer = AnswerAnalysisModule()
        self.plan_generator = ProjectPlanGenerationModule()
        self.risk_assessor = RiskAssessmentModule()
        
        # Store conversation state
        self.conversation_history = []
        self.gathered_insights = []
        self.project_context = ""
    
    def start_planning_session(self, project_context: str, timeline_requirements: str = ""):
        """Start a new planning session."""
        self.project_context = project_context
        self.conversation_history = []
        self.gathered_insights = []
        
        print("🚀 Starting Delivery Planning Session")
        print(f"📋 Project Context: {project_context}")
        if timeline_requirements:
            print(f"⏰ Timeline Requirements: {timeline_requirements}")
        print("\n" + "="*50)
        
        return self._generate_questions_for_category("Resource")
    
    def _generate_questions_for_category(self, category: str):
        """Generate questions for a specific category."""
        existing_answers = self._format_existing_answers()
        
        result = self.question_generator(
            project_context=self.project_context,
            category=category,
            existing_answers=existing_answers
        )
        
        print(f"\n📝 {category} Questions:")
        print("-" * 30)
        # Drop blank lines so the printed numbering and the returned list stay aligned
        questions = [q.strip() for q in result.questions.split('\n') if q.strip()]
        for i, question in enumerate(questions, 1):
            print(f"{i}. {question}")
        
        return questions
    
    def answer_question(self, question: str, answer: str, category: str = ""):
        """Process a user's answer to a question."""
        print(f"\n💬 Processing Answer:")
        print(f"Q: {question}")
        print(f"A: {answer}")
        
        # Retrieve relevant documents
        doc_result = self.document_retriever(question, self.project_context)
        relevant_docs = doc_result.retrieved_docs
        
        # Analyze the answer
        analysis_result = self.answer_analyzer(question, answer, relevant_docs)
        insights = analysis_result.insights
        
        # Store in conversation history
        self.conversation_history.append({
            "question": question,
            "answer": answer,
            "category": category,
            "insights": insights,
            "relevant_docs": relevant_docs
        })
        
        self.gathered_insights.append(insights)
        
        print(f"\n🔍 Key Insights:")
        print(insights)
        
        return insights
    
    def generate_project_plan(self, timeline_requirements: str = ""):
        """Generate the final project plan based on all gathered information."""
        print("\n📊 Generating Project Plan...")
        print("="*50)
        
        # Combine all insights
        all_insights = "\n\n".join(self.gathered_insights)
        
        # Generate project plan
        plan_result = self.plan_generator(
            project_context=self.project_context,
            gathered_insights=all_insights,
            timeline_requirements=timeline_requirements
        )
        
        project_plan = plan_result.project_plan
        
        print("📋 PROJECT PLAN:")
        print("="*50)
        print(project_plan)
        
        # Assess risks
        risk_result = self.risk_assessor(project_plan, self.project_context)
        risk_assessment = risk_result.risk_assessment
        
        print("\n⚠️  RISK ASSESSMENT:")
        print("="*50)
        print(risk_assessment)
        
        return {
            "project_plan": project_plan,
            "risk_assessment": risk_assessment,
            "conversation_history": self.conversation_history
        }
    
    def _format_existing_answers(self):
        """Format existing answers for context."""
        if not self.conversation_history:
            return "No previous answers yet."
        
        formatted = []
        for entry in self.conversation_history:
            formatted.append(f"Q: {entry['question']}\nA: {entry['answer']}")
        
        return "\n\n".join(formatted)
    
    def get_next_category_questions(self):
        """Get questions for the next planning category."""
        categories = list(PLANNING_CATEGORIES.keys())
        completed_categories = set(entry.get('category', '') for entry in self.conversation_history)
        
        for category in categories:
            if category not in completed_categories:
                return self._generate_questions_for_category(category)
        
        return None  # All categories completed

print("Delivery Planning Agent defined successfully!")
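
`get_next_category_questions` picks the first category that has no entry in the conversation history. The selection rule can be sketched as a standalone function; `next_open_category` is a hypothetical name and the history entry is sample data.

```python
from typing import Dict, List, Optional


def next_open_category(categories: List[str], history: List[Dict[str, str]]) -> Optional[str]:
    """Return the first category with no recorded answer, or None if all are covered."""
    completed = {entry.get("category", "") for entry in history}
    for category in categories:
        if category not in completed:
            return category
    return None


categories = ["Resource", "Current Process Maturity", "Customer Background", "Scope"]

# Sample history with a single answer recorded under "Resource"
history = [{"question": "Is there a product owner?", "answer": "Yes", "category": "Resource"}]

print(next_open_category(categories, history))
```

Under this rule a single recorded answer marks a category as complete, and a category with no recorded answers is offered again on every call.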


## 5. Example Usage and Testing


In [None]:
# Initialize the delivery planning agent
agent = DeliveryPlanningAgent(VECTOR_SEARCH_ENDPOINT_NAME, VECTOR_SEARCH_INDEX_NAME)

# Example: Start a planning session
project_context = """
We are planning a data migration project for a large financial services company. 
They want to migrate their existing Oracle data warehouse to Databricks on Azure. 
The migration involves 50+ data pipelines, 2TB of data, and needs to be completed 
within 6 months. The company has some cloud experience but limited Databricks knowledge.
"""

timeline_requirements = "6 months deadline, must be completed by end of Q2 2024"

# Start the planning session
questions = agent.start_planning_session(project_context, timeline_requirements)

print("\n" + "="*50)
print("Example: Answer some questions to see the agent in action")
print("="*50)


In [None]:
# Example: Simulate answering questions
# In a real scenario, these would be user inputs

# Answer Resource questions
resource_answers = [
    ("How many team members are there?", "We have 8 team members including 2 data engineers, 3 analysts, 2 developers, and 1 project manager"),
    ("Are the teams sufficiently skilled/trained?", "The data engineers have some cloud experience but no Databricks experience. The analysts are familiar with SQL but not Spark. We need training."),
    ("Is there a product owner?", "Yes, we have a product owner from the business side who understands the data requirements"),
    ("Are the DevOps/SecOps/Infra teams under this team's control/purview?", "No, they are separate teams with their own priorities and reporting lines")
]

print("📝 Processing Resource Questions...")
for question, answer in resource_answers:
    agent.answer_question(question, answer, "Resource")

print("\n" + "="*50)
print("Moving to next category...")
print("="*50)

# Get next category questions
next_questions = agent.get_next_category_questions()


In [None]:
# Answer Scope questions
scope_answers = [
    ("How many pipelines are to be migrated?", "We have approximately 50 data pipelines that need to be migrated"),
    ("Volume of data to be migrated?", "The total data volume is about 2TB across multiple tables and schemas"),
    ("Is lift and shift or redesign preferred?", "We prefer a hybrid approach - lift and shift for simple pipelines, redesign for complex ones"),
    ("Does the migration include monitoring?", "Yes, we need comprehensive monitoring and alerting for the new system"),
    ("Will it be run in parallel or phased move over?", "We plan to do a phased migration over 6 months, starting with non-critical pipelines")
]

print("📝 Processing Scope Questions...")
for question, answer in scope_answers:
    agent.answer_question(question, answer, "Scope")

print("\n" + "="*50)
print("Generating Project Plan...")
print("="*50)

# Generate the final project plan
final_plan = agent.generate_project_plan(timeline_requirements)

print("\n✅ Planning session completed!")
print("📊 Summary:")
print(f"- Total questions answered: {len(agent.conversation_history)}")
print(f"- Categories covered: {set(entry['category'] for entry in agent.conversation_history)}")
print(f"- Key insights gathered: {len(agent.gathered_insights)}")
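
The dictionary returned by `generate_project_plan` holds only strings and lists of dictionaries, so it can be serialized for later review. A minimal sketch using the standard `json` module follows; `plan_to_json` is a hypothetical helper and the sample result is hand-written, not agent output.

```python
import json
from typing import Any, Dict


def plan_to_json(result: Dict[str, Any]) -> str:
    """Serialize a planning result; ensure_ascii=False keeps emoji and accents readable."""
    return json.dumps(result, indent=2, ensure_ascii=False)


# Hand-written sample with the same keys as the agent's return value
sample_result = {
    "project_plan": "Phase 1: Assessment (4 weeks)",
    "risk_assessment": "Skills gap: high likelihood, mitigate with training",
    "conversation_history": [
        {"question": "Is there a product owner?", "answer": "Yes", "category": "Resource"}
    ],
}

serialized = plan_to_json(sample_result)
restored = json.loads(serialized)
print(restored["conversation_history"][0]["category"])
```

The serialized string could be written to a file or a table column; where to store it is left open here.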


## 6. Interactive Planning Interface


In [None]:
def interactive_planning_session():
    """Interactive function for users to run their own planning sessions."""
    
    print("🚀 Welcome to the Delivery Planning Agent!")
    print("="*50)
    
    # Get project context from user
    project_context = input("Please describe your project context: ")
    timeline_requirements = input("Any specific timeline requirements? (press Enter to skip): ")
    
    # Initialize agent
    agent = DeliveryPlanningAgent(VECTOR_SEARCH_ENDPOINT_NAME, VECTOR_SEARCH_INDEX_NAME)
    
    # Track the active category and question locally so that 'next' always advances,
    # even when no answer was recorded for the current category
    categories = list(PLANNING_CATEGORIES.keys())
    category_index = 0
    question_index = 0
    
    # Start planning session (generates questions for the first category, "Resource")
    questions = [q.strip() for q in agent.start_planning_session(project_context, timeline_requirements) if q.strip()]
    
    print("\n📝 Answer each question in turn. Type 'next' to move to the next category, or 'plan' to generate the project plan.")
    
    while True:
        has_question = question_index < len(questions)
        if has_question:
            print(f"\nQ: {questions[question_index]}")
        user_input = input("\nYour response (or 'next'/'plan'): ").strip()
        
        if user_input.lower() == 'next':
            category_index += 1
            question_index = 0
            if category_index < len(categories):
                questions = [q.strip() for q in agent._generate_questions_for_category(categories[category_index]) if q.strip()]
            else:
                questions = []
                print("✅ All categories completed! Type 'plan' to generate your project plan.")
        
        elif user_input.lower() == 'plan':
            final_plan = agent.generate_project_plan(timeline_requirements)
            break
        
        elif user_input and has_question:
            # Record the response against the question that was just shown
            agent.answer_question(questions[question_index], user_input, categories[category_index])
            question_index += 1
        
        else:
            print("Please enter an answer, or use the 'next'/'plan' commands.")
    
    return final_plan

# Uncomment the line below to run an interactive session
# interactive_planning_session()

print("Interactive planning interface ready!")
print("To start an interactive session, uncomment and run: interactive_planning_session()")
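
The interactive loop distinguishes between the `next` and `plan` commands and free-text answers. That classification can be pulled into a small function and tested without calling `input()`. This is a sketch only; `parse_command` and its return labels are hypothetical.

```python
from typing import Tuple


def parse_command(user_input: str) -> Tuple[str, str]:
    """Classify raw input as ('next' | 'plan' | 'empty' | 'answer', answer_text)."""
    text = user_input.strip()
    lowered = text.lower()
    if lowered in ("next", "plan"):
        return lowered, ""
    if not text:
        return "empty", ""
    return "answer", text


# Sample inputs covering each branch
for raw in ["  NEXT ", "plan", "", "We have 8 team members"]:
    print(parse_command(raw))
```

Separating the parsing from the loop keeps the session logic readable and lets the command handling be checked with scripted inputs.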
