# Data Analyst Assistant Agent

## 🎯 Project Goal
Build an intelligent agent that supports data analysts by:
* **Answering data requests** with 100% accurate results
* **Querying source files** directly from storage containers
* **Breaking down queries** into clear, understandable steps
* **Generating KPIs and metrics** for management reports
* **Validating results** to ensure accuracy and reliability

---

## 👥 Target Users
* **Data Analysts** - Daily data queries and analysis
* **Upper Management** - KPIs, metrics, and executive reports
* **Data Engineers** - Supporting infrastructure and data quality

---

## 🏗️ Architecture Components

### 1. **File Discovery Engine**
   * Scans storage containers (Volumes, DBFS, cloud storage)
   * Detects file formats (CSV, Parquet, JSON, Delta)
   * Builds searchable data catalog
   * Infers schemas automatically

### 2. **Natural Language Query Parser**
   * Understands analyst requests in plain English
   * Extracts intent (metrics, filters, time ranges)
   * Maps to available data sources
   * Handles ambiguity with clarifying questions

### 3. **SQL Generation Engine**
   * Converts requests to optimized SQL
   * Supports complex aggregations and joins
   * Applies best practices (predicate pushdown, column pruning)
   * Generates efficient queries for large datasets

### 4. **Query Explanation System**
   * Breaks down SQL into plain English steps
   * Shows data sources and transformations
   * Visualizes query logic flow
   * Explains calculations and business logic

### 5. **Execution & Validation Layer**
   * Executes queries with error handling
   * Validates results (null checks, range validation)
   * Detects anomalies and outliers
   * Provides confidence scores

### 6. **Result Formatting & Reporting**
   * Formats output for readability
   * Generates visualizations
   * Creates exportable reports
   * Supports multiple output formats

In [0]:
# Connect to best_selling_books volume in Unity Catalog
volume_path = "/Volumes/prep_databricks/default/best_selling_books"

print("📚 Best Selling Books Data Analysis")
print("="*80)
print(f"\n📂 Volume Path: {volume_path}\n")

# List all files
files = dbutils.fs.ls(volume_path)
print(f"📄 Available files ({len(files)}):")
for file in files:
    size_mb = file.size / (1024*1024)
    print(f"   • {file.name} ({size_mb:.2f} MB)")

print("\n✅ Connected to Unity Catalog volume successfully!")

In [0]:
# Read best selling books data from Unity Catalog volume
print("📊 Reading data files...\n")

def read_books_csv(file_name):
    """Read one CSV from the volume with a header row and inferred column types."""
    return (spark.read.format("csv")
            .option("header", "true")
            .option("inferSchema", "true")
            .load(f"{volume_path}/{file_name}"))

# Read individual year files (file names are used exactly as they appear in the volume)
df_2023 = read_books_csv("best sellin books 2023.csv")
df_2024 = read_books_csv("best sellin books 2024.csv")
df_2025 = read_books_csv("best sellin books 2025.csv")

# Read total/combined file
df_total = read_books_csv("best sellin books total.csv")

print("✅ Data loaded successfully!\n")
print("Available DataFrames:")
print(f"   • df_2023: {df_2023.count():,} rows, {len(df_2023.columns)} columns")
print(f"   • df_2024: {df_2024.count():,} rows, {len(df_2024.columns)} columns")
print(f"   • df_2025: {df_2025.count():,} rows, {len(df_2025.columns)} columns")
print(f"   • df_total: {df_total.count():,} rows, {len(df_total.columns)} columns")

print("\n🔍 Schema (df_total):")
df_total.printSchema()

print("\n📝 Sample data (first 5 rows):")
display(df_total.limit(5))

In [0]:
# Simple Data Analysis Agent for Best Selling Books
from pyspark.sql.functions import col, count, avg, sum as spark_sum, desc, round as spark_round, when

class BooksAnalysisAgent:
    def __init__(self, df):
        self.df = df
        self.df_name = "Best Selling Books"
        
    def analyze_request(self, request):
        """
        Analyze natural language requests about the books data.
        """
        request_lower = request.lower()
        
        print(f"\n🤖 Agent analyzing: '{request}'")
        print("="*80)
        
        # Top rated books
        if "top rated" in request_lower or "highest rated" in request_lower:
            return self._top_rated_books()
        
        # Most reviewed
        elif "most reviewed" in request_lower or "popular" in request_lower:
            return self._most_reviewed_books()
        
        # Genre analysis
        elif "genre" in request_lower:
            return self._genre_analysis()
        
        # Price analysis
        elif "price" in request_lower or "expensive" in request_lower or "cheap" in request_lower:
            return self._price_analysis()
        
        # Author analysis
        elif "author" in request_lower:
            return self._author_analysis()
        
        # Year trends
        elif "year" in request_lower or "trend" in request_lower:
            return self._year_trends()
        
        # Summary
        elif "summary" in request_lower or "overview" in request_lower:
            return self._data_summary()
        
        else:
            print("\n❓ I can help you analyze:")
            print("  • Top rated books")
            print("  • Most reviewed books")
            print("  • Genre analysis")
            print("  • Price analysis")
            print("  • Author analysis")
            print("  • Year trends")
            print("  • Data summary")
            return None
    
    def _top_rated_books(self, limit=10):
        print("\n📊 Analysis: Top Rated Books")
        print("-"*80)
        
        # Clean rating column and convert to numeric
        result = (self.df
                 .withColumn("rating_numeric", 
                            col("Rating").substr(1, 3).cast("double"))
                 .orderBy(desc("rating_numeric"), desc("reviews count"))
                 .select("Book name", "Author", "Rating", "reviews count", "Genre")
                 .limit(limit))
        
        print(f"\n✅ Found top {limit} highest rated books:")
        display(result)
        return result
    
    def _most_reviewed_books(self, limit=10):
        print("\n📊 Analysis: Most Reviewed Books")
        print("-"*80)
        
        result = (self.df
                 .orderBy(desc("reviews count"))
                 .select("Book name", "Author", "Rating", "reviews count", "Genre")
                 .limit(limit))
        
        print(f"\n✅ Found top {limit} most reviewed books:")
        display(result)
        return result
    
    def _genre_analysis(self):
        print("\n📊 Analysis: Books by Genre")
        print("-"*80)
        
        result = (self.df
                 .groupBy("Genre")
                 .agg(
                     count("*").alias("book_count"),
                     avg("reviews count").alias("avg_reviews")
                 )
                 .orderBy(desc("book_count")))
        
        print("\n✅ Genre breakdown:")
        display(result)
        return result
    
    def _price_analysis(self):
        print("\n📊 Analysis: Price Distribution")
        print("-"*80)
        
        # Clean price column
        result = (self.df
                 .withColumn("price_numeric", 
                            col("price").substr(2, 10).cast("double"))
                 .filter(col("price_numeric").isNotNull())
                 .groupBy("form")
                 .agg(
                     count("*").alias("count"),
                     spark_round(avg("price_numeric"), 2).alias("avg_price"),
                     spark_round(spark_sum("price_numeric") / spark_sum("reviews count") * 1000, 4).alias("price_per_1k_reviews")
                 )
                 .orderBy(desc("count")))
        
        print("\n✅ Price analysis by format:")
        display(result)
        return result
    
    def _author_analysis(self, limit=10):
        print("\n📊 Analysis: Top Authors")
        print("-"*80)
        
        result = (self.df
                 .groupBy("Author")
                 .agg(
                     count("*").alias("book_count"),
                     spark_sum("reviews count").alias("total_reviews")
                 )
                 .orderBy(desc("book_count"), desc("total_reviews"))
                 .limit(limit))
        
        print(f"\n✅ Top {limit} authors by number of books:")
        display(result)
        return result
    
    def _year_trends(self):
        print("\n📊 Analysis: Year-over-Year Trends")
        print("-"*80)
        
        # Count books that appeared in each year
        result = self.df.select(
            spark_sum(when(col("id_2023").isNotNull(), 1).otherwise(0)).alias("books_in_2023"),
            spark_sum(when(col("id_2024").isNotNull(), 1).otherwise(0)).alias("books_in_2024"),
            spark_sum(when(col("id_2025").isNotNull(), 1).otherwise(0)).alias("books_in_2025")
        )
        
        print("\n✅ Books appearing in each year's top 100:")
        display(result)
        
        # Books that appeared in all years
        all_years = (self.df
                    .filter(col("id_2023").isNotNull() & 
                           col("id_2024").isNotNull() & 
                           col("id_2025").isNotNull())
                    .select("Book name", "Author", "id_2023", "id_2024", "id_2025"))
        
        print("\n✅ Books that appeared in ALL three years:")
        display(all_years)
        return result
    
    def _data_summary(self):
        print("\n📊 Analysis: Data Summary")
        print("-"*80)
        
        total_books = self.df.count()
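        # SUM returns NULL when every value is NULL; fall back to 0 so the formatted print below cannot fail
        total_reviews = self.df.agg(spark_sum("reviews count")).collect()[0][0] or 0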
        unique_authors = self.df.select("Author").distinct().count()
        unique_genres = self.df.select("Genre").distinct().count()
        
        print("\n✅ Dataset Overview:")
        print(f"  📚 Total unique books: {total_books:,}")
        print(f"  ⭐ Total reviews: {total_reviews:,}")
        print(f"  ✍️  Unique authors: {unique_authors:,}")
        print(f"  🎭 Unique genres: {unique_genres:,}")
        
        # Show sample
        print("\n📖 Sample data:")
        display(self.df.limit(5))
        
        return self.df

# Initialize the agent with our data
agent = BooksAnalysisAgent(df_total)

print("\n✅ Books Analysis Agent initialized!")
print("\nTry asking questions like:")
print("  • agent.analyze_request('Show me the top rated books')")
print("  • agent.analyze_request('What are the most reviewed books?')")
print("  • agent.analyze_request('Analyze by genre')")
print("  • agent.analyze_request('Show me price analysis')")
print("  • agent.analyze_request('Who are the top authors?')")
print("  • agent.analyze_request('Show year trends')")
print("  • agent.analyze_request('Give me a summary')")


In [0]:
# Ask the agent a question
agent.analyze_request("Show me the top genre in books")

In [0]:
# Let's run the agent with different analysis requests

# Analysis 1: Data Summary
agent.analyze_request("Give me a summary of the data")

In [0]:
# Analysis 2: Top Rated Books
agent.analyze_request("Show me the top rated books")

In [0]:
# Analysis 3: Genre Breakdown
agent.analyze_request("Analyze by genre")

In [0]:
# Analysis 4: Year-over-Year Trends
agent.analyze_request("Show year trends")

## 🔄 Agent Workflow

```
┌─────────────────────────────────────────────────────────────────┐
│                    DATA ANALYST REQUEST                         │
│  "Show me total revenue by region for last quarter"             │
└────────────────────────┬────────────────────────────────────────┘
                         │
                         ▼
┌─────────────────────────────────────────────────────────────────┐
│  STEP 1: UNDERSTAND REQUEST                                     │
│  • Parse natural language                                       │
│  • Extract: metric=revenue, dimension=region, time=last_quarter │
│  • Identify required data sources                               │
└────────────────────────┬────────────────────────────────────────┘
                         │
                         ▼
┌─────────────────────────────────────────────────────────────────┐
│  STEP 2: DISCOVER DATA SOURCES                                  │
│  • Scan storage containers                                      │
│  • Find relevant files: sales_data.parquet, regions.csv         │
│  • Load schemas and metadata                                    │
└────────────────────────┬────────────────────────────────────────┘
                         │
                         ▼
┌─────────────────────────────────────────────────────────────────┐
│  STEP 3: GENERATE SQL QUERY                                     │
│  • Build optimized SQL                                          │
│  • Apply filters (date range)                                   │
│  • Add aggregations (SUM, GROUP BY)                             │
│  • Optimize for performance                                     │
└────────────────────────┬────────────────────────────────────────┘
                         │
                         ▼
┌─────────────────────────────────────────────────────────────────┐
│  STEP 4: EXPLAIN QUERY (Before Execution)                       │
│  ✓ Data Source: /volumes/sales/data/sales_2024.parquet          │
│  ✓ Filter: date >= '2024-10-01' AND date <= '2024-12-31'        │
│  ✓ Calculation: SUM(amount) AS total_revenue                    │
│  ✓ Grouping: BY region                                          │
│  ✓ Expected rows: ~4 regions                                    │
└────────────────────────┬────────────────────────────────────────┘
                         │
                         ▼
┌─────────────────────────────────────────────────────────────────┐
│  STEP 5: EXECUTE QUERY                                          │
│  • Run SQL against data files                                   │
│  • Monitor execution time                                       │
│  • Capture any errors                                           │
└────────────────────────┬────────────────────────────────────────┘
                         │
                         ▼
┌─────────────────────────────────────────────────────────────────┐
│  STEP 6: VALIDATE RESULTS                                       │
│  ✓ Row count check: 4 rows (expected ~4) ✓                      │
│  ✓ Null check: No nulls in revenue ✓                            │
│  ✓ Range check: Revenue values reasonable ✓                     │
│  ✓ Confidence score: 98%                                        │
└────────────────────────┬────────────────────────────────────────┘
                         │
                         ▼
┌─────────────────────────────────────────────────────────────────┐
│  STEP 7: FORMAT & PRESENT RESULTS                               │
│  📊 Total Revenue by Region (Q4 2024)                           │
│  ┌──────────────┬─────────────────┐                             │
│  │ Region       │ Total Revenue   │                             │
│  ├──────────────┼─────────────────┤                             │
│  │ North        │ $1,234,567      │                             │
│  │ South        │ $987,654        │                             │
│  │ East         │ $1,456,789      │                             │
│  │ West         │ $1,098,765      │                             │
│  └──────────────┴─────────────────┘                             │
│  ✅ Results validated with 98% confidence                       │
└─────────────────────────────────────────────────────────────────┘
```
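
Steps 1, 3 and 4 of the workflow can be sketched in plain Python. This is a minimal illustration, not the agent's actual SQL generator: `last_quarter_range` and `build_revenue_sql` are hypothetical helper names, and the `sales` table with `amount` and `date` columns is an assumption taken from the example in the diagram.

```python
from datetime import date, timedelta
from typing import Tuple


def last_quarter_range(today: date) -> Tuple[date, date]:
    """Return the first and last day of the calendar quarter before `today`."""
    current_q = (today.month - 1) // 3            # 0-based quarter index of `today`
    if current_q == 0:
        year, q = today.year - 1, 3               # Q1 -> previous year's Q4
    else:
        year, q = today.year, current_q - 1
    first_month = 3 * q + 1
    start = date(year, first_month, 1)
    # The quarter ends the day before the next quarter starts
    next_start = date(year + 1, 1, 1) if q == 3 else date(year, first_month + 3, 1)
    return start, next_start - timedelta(days=1)


def build_revenue_sql(table: str, dimension: str, start: date, end: date) -> str:
    """Build the aggregate query shown in STEP 4 (hypothetical helper)."""
    # Identifiers are interpolated into the SQL text, so accept only plain names
    for name in (table, dimension):
        if not name.isidentifier():
            raise ValueError(f"unsafe identifier: {name!r}")
    return (
        f"SELECT {dimension}, SUM(amount) AS total_revenue\n"
        f"FROM {table}\n"
        f"WHERE date >= '{start.isoformat()}' AND date <= '{end.isoformat()}'\n"
        f"GROUP BY {dimension}"
    )


start, end = last_quarter_range(date(2025, 1, 15))
print(build_revenue_sql("sales", "region", start, end))
```

Called in January 2025, the helper resolves "last quarter" to 2024-10-01 through 2024-12-31, the same filter the diagram shows in STEP 4.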

## ✨ Key Features

### 🎯 Accuracy Guarantees
* **Schema validation** - Verify data types and structure
* **Result validation** - Check for nulls, outliers, anomalies
* **Confidence scoring** - Rate result reliability (0-100%)
* **Audit trail** - Log all queries and results
* **Data lineage** - Track data source to result
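
One way the result validation and confidence scoring could fit together is sketched below. The scoring rule (share of checks passed, as a percentage) and the name `validate_rows` are assumptions made for illustration; the source does not define how the confidence score is computed.

```python
from typing import Any, Dict, List, Optional, Tuple


def validate_rows(
    rows: List[Dict[str, Any]],
    column: str,
    expected_rows: Optional[int] = None,
    min_value: Optional[float] = None,
    max_value: Optional[float] = None,
) -> Tuple[float, List[str]]:
    """Run simple checks on one numeric column; return (score 0-100, warnings)."""
    checks = []  # list of (passed, message shown when the check fails)

    # Null check
    values = [row.get(column) for row in rows]
    nulls = sum(1 for v in values if v is None)
    checks.append((nulls == 0, f"{nulls} null value(s) in '{column}'"))

    # Range check (only when a bound was supplied)
    if min_value is not None or max_value is not None:
        out_of_range = [
            v for v in values
            if v is not None
            and ((min_value is not None and v < min_value)
                 or (max_value is not None and v > max_value))
        ]
        checks.append((not out_of_range,
                       f"{len(out_of_range)} value(s) outside the allowed range"))

    # Row count check (only when an expectation was supplied)
    if expected_rows is not None:
        checks.append((len(rows) == expected_rows,
                       f"expected {expected_rows} rows, got {len(rows)}"))

    passed = sum(1 for ok, _ in checks if ok)
    warnings = [msg for ok, msg in checks if not ok]
    return 100.0 * passed / len(checks), warnings


rows = [
    {"region": "North", "revenue": 1234567.0},
    {"region": "South", "revenue": 987654.0},
    {"region": "East", "revenue": 1456789.0},
    {"region": "West", "revenue": None},
]
score, warnings = validate_rows(rows, "revenue", expected_rows=4, min_value=0)
print(f"confidence: {score:.1f}%", warnings)
```

A weighted score, or one that accounts for how many rows fail a check, would be a reasonable refinement; the equal-weight rule here is only the simplest choice.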

### 📝 Query Explanation
* **Plain English breakdown** - No SQL jargon
* **Step-by-step logic** - Show transformation flow
* **Data source transparency** - Which files are used
* **Calculation details** - How metrics are computed
* **Visual query plans** - Diagram query execution

### 🗂️ Data Source Management
* **Auto-discovery** - Find files in storage containers
* **Format detection** - CSV, Parquet, JSON, Delta, Avro
* **Schema inference** - Automatic column type detection
* **Metadata catalog** - Searchable data inventory
* **Version tracking** - Handle schema evolution

### 📊 KPI & Metrics Library
* **Pre-built metrics** - Revenue, growth, conversion, churn
* **Custom definitions** - Define new KPIs
* **Time intelligence** - YoY, MoM, QoQ comparisons
* **Dimensional analysis** - Slice by region, product, customer
* **Trend analysis** - Historical patterns

### 🚀 Performance Optimization
* **Query optimization** - Predicate pushdown, column pruning
* **Caching** - Reuse results for similar queries
* **Incremental processing** - Only query new data
* **Parallel execution** - Leverage Spark parallelism
* **Resource management** - Efficient memory usage

### 🔒 Security & Governance
* **Access control** - Respect Unity Catalog permissions
* **Data masking** - Hide sensitive fields
* **Audit logging** - Track all data access
* **Compliance** - GDPR, HIPAA support
* **Data quality** - Flag issues and anomalies

## 💼 Example Use Cases

### Use Case 1: Daily Sales Report
**Analyst Request:**
> "Show me yesterday's sales by product category"

**Agent Actions:**
1. Identifies date filter: yesterday
2. Finds sales data files
3. Generates SQL with date filter and GROUP BY
4. Explains: "Querying sales_2024.parquet, filtering for 2024-02-09, summing sales by category"
5. Validates: 12 categories found, no nulls
6. Returns formatted table with totals

---

### Use Case 2: Executive KPI Dashboard
**Management Request:**
> "What's our month-over-month revenue growth?"

**Agent Actions:**
1. Recognizes KPI: MoM revenue growth
2. Calculates current month and previous month revenue
3. Computes growth percentage
4. Explains: "Comparing Jan 2026 ($X) vs Dec 2025 ($Y), growth = Z%"
5. Validates: Both months have complete data
6. Returns growth metric with confidence score
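
The growth computation in step 3 is the usual percentage change, `(current - previous) / previous * 100`. A minimal sketch follows; `mom_growth_pct` is a hypothetical helper name, and returning `None` for a zero baseline is an assumption about how the agent might treat that case.

```python
from typing import Optional


def mom_growth_pct(current: float, previous: float) -> Optional[float]:
    """Month-over-month growth in percent; None when the previous month is zero."""
    if previous == 0:
        return None  # growth is undefined without a non-zero baseline
    return (current - previous) * 100.0 / previous


print(mom_growth_pct(110_000.0, 100_000.0))  # revenue grew from 100k to 110k
print(mom_growth_pct(90_000.0, 100_000.0))   # revenue fell from 100k to 90k
```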

---

### Use Case 3: Ad-hoc Analysis
**Analyst Request:**
> "Which customers in the West region spent more than $10,000 last quarter?"

**Agent Actions:**
1. Parses filters: region=West, amount>10000, time=last_quarter
2. Finds customer and transaction files
3. Generates JOIN query
4. Explains: "Joining customers.csv with transactions.parquet, filtering by region and amount"
5. Validates: 47 customers found, all amounts > $10,000
6. Returns customer list with spend amounts
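
The JOIN from step 3 might look like the query below. SQLite from the Python standard library stands in for Spark SQL so the sketch runs anywhere. The table layout, column names and sample rows are invented for illustration, and "spent more than $10,000" is read here as total spend per customer over the quarter, which is an assumption.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.executescript("""
    CREATE TABLE customers (customer_id INTEGER PRIMARY KEY, name TEXT, region TEXT);
    CREATE TABLE transactions (customer_id INTEGER, amount REAL, txn_date TEXT);
    INSERT INTO customers VALUES (1, 'Acme', 'West'), (2, 'Birch', 'West'), (3, 'Cedar', 'East');
    INSERT INTO transactions VALUES
        (1, 6000.0, '2024-10-15'), (1, 5500.0, '2024-12-01'),
        (2, 9000.0, '2024-11-20'),
        (3, 20000.0, '2024-11-05'),
        (1, 3000.0, '2024-07-01');
""")

# Filter by region and date range first, then keep customers whose summed spend
# exceeds the threshold. Values are passed as parameters, not formatted into the SQL.
query = """
    SELECT c.customer_id, c.name, SUM(t.amount) AS total_spend
    FROM customers AS c
    JOIN transactions AS t ON t.customer_id = c.customer_id
    WHERE c.region = ?
      AND t.txn_date BETWEEN ? AND ?
    GROUP BY c.customer_id, c.name
    HAVING SUM(t.amount) > ?
    ORDER BY total_spend DESC
"""
big_spenders = conn.execute(query, ("West", "2024-10-01", "2024-12-31", 10000)).fetchall()
print(big_spenders)
```

If the request instead means single transactions above $10,000, the threshold belongs in the `WHERE` clause on `t.amount` rather than in `HAVING`; this is the kind of ambiguity the parser would need to resolve with a clarifying question.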

---

### Use Case 4: Data Quality Check
**Analyst Request:**
> "Are there any missing values in our customer email addresses?"

**Agent Actions:**
1. Identifies data quality query
2. Finds customer data file
3. Generates null check query
4. Explains: "Counting NULL and empty strings in email column"
5. Validates: Found 23 missing emails out of 10,000 records
6. Returns count and percentage, flags data quality issue
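
The check described in step 4 counts both `NULL` values and empty strings. A small sketch in plain Python, assuming whitespace-only strings should also count as missing (the source does not say); `missing_value_report` is a hypothetical name.

```python
from typing import Any, Dict, Iterable


def missing_value_report(values: Iterable[Any]) -> Dict[str, Any]:
    """Count None and blank-string entries and report their share in percent."""
    items = list(values)
    total = len(items)
    missing = sum(1 for v in items if v is None or str(v).strip() == "")
    pct = (missing / total * 100.0) if total else 0.0
    return {"missing": missing, "total": total, "pct_missing": round(pct, 2)}


emails = ["a@example.com", None, "", "   ", "b@example.com"]
print(missing_value_report(emails))
```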

---

### Use Case 5: Trend Analysis
**Management Request:**
> "Show me weekly active users for the past 6 months"

**Agent Actions:**
1. Recognizes time-series analysis
2. Finds user activity logs
3. Generates weekly aggregation query
4. Explains: "Counting distinct users per week from Aug 2025 to Jan 2026"
5. Validates: 26 weeks of data, no gaps
6. Returns time-series data with trend visualization
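
The weekly aggregation and the "no gaps" validation can be sketched as follows. Weeks are assumed to start on Monday, and the `(user_id, date)` event shape and both function names are illustrative assumptions rather than the agent's actual log schema.

```python
from collections import defaultdict
from datetime import date, timedelta
from typing import Dict, Iterable, List, Tuple


def weekly_active_users(events: Iterable[Tuple[str, date]]) -> Dict[date, int]:
    """Map each week's Monday to the number of distinct users active that week."""
    users_by_week = defaultdict(set)
    for user_id, day in events:
        week_start = day - timedelta(days=day.weekday())  # Monday of that week
        users_by_week[week_start].add(user_id)
    return {week: len(users) for week, users in sorted(users_by_week.items())}


def missing_weeks(weekly: Dict[date, int]) -> List[date]:
    """List the week starts between the first and last week that have no data."""
    weeks = sorted(weekly)
    gaps = []
    for prev, nxt in zip(weeks, weeks[1:]):
        cursor = prev + timedelta(days=7)
        while cursor < nxt:
            gaps.append(cursor)
            cursor += timedelta(days=7)
    return gaps


events = [
    ("u1", date(2025, 8, 4)),
    ("u2", date(2025, 8, 5)),
    ("u1", date(2025, 8, 6)),   # same user, same week: counted once
    ("u1", date(2025, 8, 18)),
]
weekly = weekly_active_users(events)
print(weekly)
print(missing_weeks(weekly))
```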

In [0]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import col, count, sum as spark_sum, avg, min as spark_min, max as spark_max
from pyspark.sql.functions import current_timestamp, lit, when, trim, lower, upper, to_date, datediff
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, DateType
from typing import Dict, List, Optional, Tuple, Any
from datetime import datetime, timedelta
from dataclasses import dataclass, field
import json
import re
import os

@dataclass
class DataSource:
    """Represents a discovered data source."""
    path: str
    format: str
    schema: StructType
    row_count: Optional[int] = None
    size_bytes: Optional[int] = None
    last_modified: Optional[str] = None
    columns: List[str] = field(default_factory=list)
    
@dataclass
class QueryRequest:
    """Represents an analyst's data request."""
    original_text: str
    intent: str
    metrics: List[str] = field(default_factory=list)
    dimensions: List[str] = field(default_factory=list)
    filters: Dict[str, Any] = field(default_factory=dict)
    time_range: Optional[Dict[str, str]] = None
    
@dataclass
class QueryResult:
    """Represents query execution results with metadata."""
    data: DataFrame
    sql_query: str
    explanation: List[str]
    execution_time_ms: float
    row_count: int
    validation_score: float
    warnings: List[str] = field(default_factory=list)
    data_sources: List[str] = field(default_factory=list)

print("✅ Data classes defined")

In [0]:
class DataAnalystAgent:
    """
    AI Agent for supporting data analysts with accurate, explainable query results.
    """
    
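    def __init__(self, spark: SparkSession, storage_paths: Optional[List[str]] = None):
        self.spark = spark
        # Copy the list so add_storage_path() never mutates the caller's list
        self.storage_paths = list(storage_paths) if storage_paths else []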
        self.data_catalog: Dict[str, DataSource] = {}
        self.query_history: List[QueryResult] = []
        self.kpi_definitions: Dict[str, Dict] = {}
        
        # Initialize KPI library
        self._initialize_kpi_library()
        
        print("🤖 Data Analyst Agent initialized")
        print(f"   Spark version: {spark.version}")
        print(f"   Storage paths: {len(self.storage_paths)} configured")
    
    def _initialize_kpi_library(self):
        """Initialize common KPI definitions."""
        self.kpi_definitions = {
            "revenue": {
                "metric": "SUM(amount)",
                "description": "Total revenue",
                "required_columns": ["amount"]
            },
            "average_order_value": {
                "metric": "AVG(amount)",
                "description": "Average order value",
                "required_columns": ["amount"]
            },
            "customer_count": {
                "metric": "COUNT(DISTINCT customer_id)",
                "description": "Unique customer count",
                "required_columns": ["customer_id"]
            }
        }
    
    def add_storage_path(self, path: str):
        """Add a storage path to scan for data files."""
        if path not in self.storage_paths:
            self.storage_paths.append(path)
            print(f"✅ Added storage path: {path}")
    
    def get_catalog_summary(self) -> Dict:
        """Get summary of discovered data sources."""
        return {
            "total_sources": len(self.data_catalog),
            "formats": list(set(ds.format for ds in self.data_catalog.values())),
            "total_columns": sum(len(ds.columns) for ds in self.data_catalog.values()),
            "sources": list(self.data_catalog.keys())
        }

print("✅ DataAnalystAgent class initialized")

In [0]:
def discover_files(self, path: str = None, pattern: str = "*") -> List[DataSource]:
    """
    Discover data files in storage containers.
    
    Args:
        path: Specific path to scan (uses configured paths if None)
        pattern: File pattern to match (e.g., '*.parquet', 'sales_*')
        
    Returns:
        List of discovered DataSource objects
    """
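    from fnmatch import fnmatch  # stdlib glob-style matching for the `pattern` argument

    print("🔍 Discovering data files...")
    
    paths_to_scan = [path] if path else self.storage_paths
    discovered = []
    
    for scan_path in paths_to_scan:
        try:
            # List files using dbutils
            try:
                files = dbutils.fs.ls(scan_path)
            except Exception as e:
                print(f"   ⚠️  Could not list directory: {scan_path} ({e})")
                continue
            
            for file_info in files:
                file_path = file_info.path
                # dbutils reports directory names with a trailing "/"
                file_name = file_info.name.rstrip('/')
                
                # Skip hidden entries and names that do not match the requested pattern
                if file_name.startswith('.') or not fnmatch(file_name, pattern):
                    continue
                
                # Detect file format
                file_format = self._detect_format(file_name)
                if not file_format:
                    continue
                
                # Skip directories unless they look like Delta tables, which are directories
                if file_info.isDir() and file_format != "delta":
                    continue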
                
                # Try to read schema
                try:
                    if file_format == "delta":
                        df = self.spark.read.format("delta").load(file_path)
                    elif file_format == "parquet":
                        df = self.spark.read.parquet(file_path)
                    elif file_format == "csv":
                        df = self.spark.read.option("header", "true").option("inferSchema", "true").csv(file_path)
                    elif file_format == "json":
                        df = self.spark.read.json(file_path)
                    else:
                        continue
                    
                    # Create DataSource object
                    data_source = DataSource(
                        path=file_path,
                        format=file_format,
                        schema=df.schema,
                        # `f` avoids shadowing the imported dataclasses.field helper
                        columns=[f.name for f in df.schema.fields],
                        size_bytes=file_info.size,
                        last_modified=str(file_info.modificationTime)
                    )
                    
                    # Add to catalog
                    self.data_catalog[file_name] = data_source
                    discovered.append(data_source)
                    
                    print(f"   ✅ Found: {file_name} ({file_format}, {len(data_source.columns)} columns)")
                    
                except Exception as e:
                    print(f"   ❌ Error reading {file_name}: {str(e)}")
                    continue
        
        except Exception as e:
            print(f"   ❌ Error scanning {scan_path}: {str(e)}")
            continue
    
    print(f"\n✅ Discovery complete: {len(discovered)} files found")
    return discovered

def _detect_format(self, filename: str) -> Optional[str]:
    """Detect file format from filename."""
    filename_lower = filename.lower()
    if filename_lower.endswith('.parquet'):
        return 'parquet'
    elif filename_lower.endswith('.csv'):
        return 'csv'
    elif filename_lower.endswith('.json'):
        return 'json'
    elif filename_lower.endswith('.delta') or '_delta_log' in filename_lower:
        return 'delta'
    return None

# Add methods to class
DataAnalystAgent.discover_files = discover_files
DataAnalystAgent._detect_format = _detect_format

print("✅ File discovery methods added to DataAnalystAgent")

In [0]:
def understand_request(self, request_text: str) -> QueryRequest:
    """
    Parse natural language request into structured QueryRequest.
    
    Args:
        request_text: Analyst's request in natural language
        
    Returns:
        QueryRequest object with parsed intent and parameters
    """
    print(f"🧠 Understanding request: '{request_text}'")
    
    request_lower = request_text.lower()
    
    # Detect intent
    intent = self._detect_intent(request_lower)
    
    # Extract metrics
    metrics = self._extract_metrics(request_lower)
    
    # Extract dimensions
    dimensions = self._extract_dimensions(request_lower)
    
    # Extract filters
    filters = self._extract_filters(request_lower)
    
    # Extract time range
    time_range = self._extract_time_range(request_lower)
    
    query_request = QueryRequest(
        original_text=request_text,
        intent=intent,
        metrics=metrics,
        dimensions=dimensions,
        filters=filters,
        time_range=time_range
    )
    
    print(f"   Intent: {intent}")
    print(f"   Metrics: {metrics}")
    print(f"   Dimensions: {dimensions}")
    if filters:
        print(f"   Filters: {filters}")
    if time_range:
        print(f"   Time range: {time_range}")
    
    return query_request

def _detect_intent(self, text: str) -> str:
    """Detect query intent from text."""
    if any(word in text for word in ['total', 'sum', 'count', 'average', 'avg']):
        return 'aggregate'
    elif any(word in text for word in ['growth', 'change', 'trend', 'over time']):
        return 'trend'
    elif any(word in text for word in ['kpi', 'metric', 'performance']):
        return 'kpi'
    elif any(word in text for word in ['where', 'filter', 'only', 'specific']):
        return 'filter'
    elif any(word in text for word in ['join', 'combine', 'merge', 'with']):
        return 'join'
    else:
        return 'general'

def _extract_metrics(self, text: str) -> List[str]:
    """Extract metrics from text."""
    metrics = []
    metric_keywords = {
        'revenue': ['revenue', 'sales', 'income'],
        'count': ['count', 'number of', 'how many'],
        'average': ['average', 'avg', 'mean'],
        'total': ['total', 'sum'],
        'max': ['maximum', 'max', 'highest'],
        'min': ['minimum', 'min', 'lowest']
    }
    
    for metric, keywords in metric_keywords.items():
        if any(kw in text for kw in keywords):
            metrics.append(metric)
    
    return metrics if metrics else ['count']

def _extract_dimensions(self, text: str) -> List[str]:
    """Extract dimensions (group by columns) from text."""
    dimensions = []
    
    # Common dimensions
    common_dims = ['region', 'category', 'product', 'customer', 'date', 'month', 'year', 'quarter']
    
    for dim in common_dims:
        if dim in text:
            dimensions.append(dim)
    
    return dimensions

def _extract_filters(self, text: str) -> Dict[str, Any]:
    """Extract filter conditions from text."""
    filters = {}
    
    # Extract comparison filters (first match only)
    amount_pattern = r'(more than|greater than|less than|at least|over|under)\s+(\$?\d[\d,]*(?:\.\d+)?)'
    operator_map = {
        'more than': '>', 'greater than': '>', 'over': '>',
        'at least': '>=',
        'less than': '<', 'under': '<',
    }
    match = re.search(amount_pattern, text)
    if match:
        phrase, value = match.groups()
        value_clean = value.replace('$', '').replace(',', '')
        filters['amount'] = {'operator': operator_map[phrase], 'value': float(value_clean)}
    
    # Extract region filters; tokenize on letters so trailing punctuation
    # (e.g. "west?") does not prevent a match
    if 'in the' in text or 'from' in text:
        for word in re.findall(r'[A-Za-z]+', text):
            if word.capitalize() in ['North', 'South', 'East', 'West']:
                filters['region'] = word.capitalize()
    
    return filters

def _extract_time_range(self, text: str) -> Optional[Dict[str, str]]:
    """Extract a time range from text as a rolling window ending today (e.g. 'last month' = past 30 days)."""
    today = datetime.now()
    
    if 'yesterday' in text:
        date = today - timedelta(days=1)
        return {'start': date.strftime('%Y-%m-%d'), 'end': date.strftime('%Y-%m-%d')}
    elif 'last week' in text:
        start = today - timedelta(days=7)
        return {'start': start.strftime('%Y-%m-%d'), 'end': today.strftime('%Y-%m-%d')}
    elif 'last month' in text:
        start = today - timedelta(days=30)
        return {'start': start.strftime('%Y-%m-%d'), 'end': today.strftime('%Y-%m-%d')}
    elif 'last quarter' in text:
        # Match only the explicit phrase: a bare 'quarter' (as in "by quarter")
        # or 'q4' does not mean "the past 90 days"
        start = today - timedelta(days=90)
        return {'start': start.strftime('%Y-%m-%d'), 'end': today.strftime('%Y-%m-%d')}
    elif 'last year' in text:
        start = today - timedelta(days=365)
        return {'start': start.strftime('%Y-%m-%d'), 'end': today.strftime('%Y-%m-%d')}
    
    return None

# Add methods to class
DataAnalystAgent.understand_request = understand_request
DataAnalystAgent._detect_intent = _detect_intent
DataAnalystAgent._extract_metrics = _extract_metrics
DataAnalystAgent._extract_dimensions = _extract_dimensions
DataAnalystAgent._extract_filters = _extract_filters
DataAnalystAgent._extract_time_range = _extract_time_range

print("✅ Request understanding methods added to DataAnalystAgent")
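
Keyword lookups such as those in `_extract_metrics` and `_extract_dimensions` use plain substring tests, so a short keyword can match inside a longer word. The sketch below illustrates the difference with a hypothetical helper, `contains_keyword`, which is not part of `DataAnalystAgent`; it assumes that whole-word matching is the desired behavior.

```python
import re

def contains_keyword(text: str, keyword: str) -> bool:
    """Return True if `keyword` occurs in `text` as a whole word or phrase."""
    return re.search(r'\b' + re.escape(keyword) + r'\b', text) is not None

request = "show me sales by country"

# Plain substring test: 'count' is found inside 'country'
substring_hit = 'count' in request

# Whole-word test: 'count' is not a separate word in this request
whole_word_hit = contains_keyword(request, 'count')

print(substring_hit, whole_word_hit)
```

With the substring test, this request would pick up a `count` metric that the analyst never asked for; the whole-word test avoids that at the cost of needing explicit handling for plurals and other word forms.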

In [0]:
def generate_query(self, request: QueryRequest, data_source: DataSource) -> Tuple[str, List[str]]:
    """
    Generate SQL query from QueryRequest.
    
    Args:
        request: Parsed query request
        data_source: Data source to query
        
    Returns:
        Tuple of (SQL query string, explanation steps)
    """
    print("🛠️ Generating SQL query...")
    
    explanation = []
    explanation.append(f"Data source: {data_source.path} ({data_source.format} format)")
    
    # Build SELECT clause
    select_parts = []
    
    # Add dimensions
    for dim in request.dimensions:
        if dim in data_source.columns:
            select_parts.append(dim)
            explanation.append(f"Group results by: {dim}")
    
    # Add metrics. Duplicate expressions are skipped: "total revenue" yields
    # both 'revenue' and 'total', which map to the same SUM expression.
    def add_metric(expression: str, description: str) -> None:
        if expression not in select_parts:
            select_parts.append(expression)
            explanation.append(description)
    
    for metric in request.metrics:
        if metric in ('revenue', 'total'):
            if 'amount' in data_source.columns:
                add_metric("SUM(amount) AS total_amount", "Calculate: Total sum of amount column")
            elif 'revenue' in data_source.columns:
                add_metric("SUM(revenue) AS total_revenue", "Calculate: Total sum of revenue column")
        elif metric == 'count':
            add_metric("COUNT(*) AS record_count", "Calculate: Count of all records")
        elif metric == 'average':
            if 'amount' in data_source.columns:
                add_metric("AVG(amount) AS average_amount", "Calculate: Average of amount column")
    
    # Default to COUNT(*) if no metric expression was added
    if not any(' AS ' in part for part in select_parts):
        select_parts.append("COUNT(*) AS record_count")
        explanation.append("Calculate: Count of all records")
    
    select_clause = "SELECT " + ", ".join(select_parts)
    
    # Build FROM clause using read_files()
    from_clause = f"FROM read_files('{data_source.path}')"
    
    # Build WHERE clause
    where_conditions = []
    
    # Add time range filter
    if request.time_range:
        date_col = next((col for col in data_source.columns if 'date' in col.lower()), None)
        if date_col:
            where_conditions.append(f"{date_col} >= '{request.time_range['start']}'")
            where_conditions.append(f"{date_col} <= '{request.time_range['end']}'")
            explanation.append(f"Filter: Date range from {request.time_range['start']} to {request.time_range['end']}")
    
    # Add other filters
    for col_name, filter_spec in request.filters.items():
        if col_name in data_source.columns:
            if isinstance(filter_spec, dict):
                operator = filter_spec.get('operator', '=')
                value = filter_spec.get('value')
                where_conditions.append(f"{col_name} {operator} {value}")
                explanation.append(f"Filter: {col_name} {operator} {value}")
            else:
                where_conditions.append(f"{col_name} = '{filter_spec}'")
                explanation.append(f"Filter: {col_name} = {filter_spec}")
    
    where_clause = "WHERE " + " AND ".join(where_conditions) if where_conditions else ""
    
    # Build GROUP BY clause
    group_by_clause = ""
    if request.dimensions:
        group_by_cols = [dim for dim in request.dimensions if dim in data_source.columns]
        if group_by_cols:
            group_by_clause = "GROUP BY " + ", ".join(group_by_cols)
    
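    # Build ORDER BY clause (only on dimensions that exist in the data source)
    order_by_clause = ""
    order_by_cols = [dim for dim in request.dimensions if dim in data_source.columns]
    if order_by_cols:
        order_by_clause = f"ORDER BY {order_by_cols[0]}"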
    
    # Combine all parts
    query_parts = [select_clause, from_clause]
    if where_clause:
        query_parts.append(where_clause)
    if group_by_clause:
        query_parts.append(group_by_clause)
    if order_by_clause:
        query_parts.append(order_by_clause)
    
    sql_query = "\n".join(query_parts)
    
    print(f"   ✅ Query generated ({len(explanation)} explanation steps)")
    return sql_query, explanation

# Add method to class
DataAnalystAgent.generate_query = generate_query

print("✅ Query generation methods added to DataAnalystAgent")
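
A minimal sketch of the clause assembly at the end of `generate_query`, written as a standalone helper so it can run without Spark. The helper name `assemble_sql` and its parameters are hypothetical, and ordering by the first grouped column is an assumption of this sketch; it only builds the SQL string and does not execute it.

```python
from typing import List

def assemble_sql(select_parts: List[str], path: str,
                 where_conditions: List[str], group_by_cols: List[str]) -> str:
    """Join the non-empty SQL clauses with newlines, in SELECT/FROM/WHERE/GROUP BY/ORDER BY order."""
    clauses = [
        "SELECT " + ", ".join(select_parts),
        f"FROM read_files('{path}')",
    ]
    if where_conditions:
        clauses.append("WHERE " + " AND ".join(where_conditions))
    if group_by_cols:
        clauses.append("GROUP BY " + ", ".join(group_by_cols))
        # Assumption: sort by the first grouped column
        clauses.append(f"ORDER BY {group_by_cols[0]}")
    return "\n".join(clauses)

sql = assemble_sql(
    select_parts=["region", "SUM(amount) AS total_amount"],
    path="/tmp/analyst_agent_demo/sales_data.parquet",
    where_conditions=["region = 'West'"],
    group_by_cols=["region"],
)
print(sql)
```

Keeping the clauses in a list and joining them at the end means optional clauses can be skipped without leaving blank lines in the generated query.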

In [0]:
def execute_query(self, sql_query: str, explanation: List[str], data_sources: List[str]) -> QueryResult:
    """
    Execute SQL query and validate results.
    
    Args:
        sql_query: SQL query to execute
        explanation: Query explanation steps
        data_sources: List of data source paths used
        
    Returns:
        QueryResult with data and validation metadata
    """
    print("\n▶️ Executing query...")
    
    start_time = datetime.now()
    warnings = []
    
    try:
        # Execute query
        result_df = self.spark.sql(sql_query)
        
        # Get row count
        row_count = result_df.count()
        
        execution_time = (datetime.now() - start_time).total_seconds() * 1000
        
        print(f"   ✅ Query executed in {execution_time:.0f}ms")
        print(f"   ✅ Returned {row_count} rows")
        
        # Validate results
        validation_score, validation_warnings = self._validate_results(result_df, row_count)
        warnings.extend(validation_warnings)
        
        return QueryResult(
            data=result_df,
            sql_query=sql_query,
            explanation=explanation,
            execution_time_ms=execution_time,
            row_count=row_count,
            validation_score=validation_score,
            warnings=warnings,
            data_sources=data_sources
        )
        
    except Exception as e:
        print(f"   ❌ Query execution failed: {str(e)}")
        raise

def _validate_results(self, df: DataFrame, row_count: int) -> Tuple[float, List[str]]:
    """
    Validate query results and return confidence score.
    
    Args:
        df: Result DataFrame
        row_count: Number of rows returned
        
    Returns:
        Tuple of (validation score 0-100, list of warnings)
    """
    score = 100.0
    warnings = []
    
    # Check 1: Empty results
    if row_count == 0:
        score -= 50
        warnings.append("⚠️  Query returned no results")
        return score, warnings
    
    # Check 2: Null values in results
    for col_name in df.columns:
        null_count = df.filter(df[col_name].isNull()).count()
        if null_count > 0:
            null_pct = (null_count / row_count) * 100
            if null_pct > 50:
                score -= 20
                warnings.append(f"⚠️  Column '{col_name}' has {null_pct:.1f}% null values")
            elif null_pct > 10:
                score -= 5
                warnings.append(f"⚠️  Column '{col_name}' has {null_pct:.1f}% null values")
    
    # Check 3: Reasonable row count
    if row_count > 1000000:
        score -= 10
        warnings.append(f"⚠️  Large result set ({row_count:,} rows) - consider adding filters")
    
    # Check 4: Negative values in amount/revenue columns
    for field in df.schema.fields:
        if 'amount' in field.name.lower() or 'revenue' in field.name.lower():
            try:
                negative_count = df.filter(df[field.name] < 0).count()
                if negative_count > 0:
                    score -= 10
                    warnings.append(f"⚠️  Column '{field.name}' has {negative_count} negative values")
            except Exception:
                pass  # Skip if the comparison is not supported for this column type
    
    return max(score, 0), warnings

# Add methods to class
DataAnalystAgent.execute_query = execute_query
DataAnalystAgent._validate_results = _validate_results

print("✅ Query execution and validation methods added to DataAnalystAgent")
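
To make the scoring arithmetic in `_validate_results` concrete, here is a minimal sketch that applies the same penalties to plain Python dictionaries instead of a Spark DataFrame. The function `score_rows` and the sample rows are hypothetical, and the large-result-set check is left out for brevity.

```python
from typing import Dict, List, Optional, Tuple

Row = Dict[str, Optional[float]]

def score_rows(rows: List[Row]) -> Tuple[float, List[str]]:
    """Score result rows from 0 to 100 using the null and negative-value penalties."""
    score = 100.0
    warnings: List[str] = []
    if not rows:
        return score - 50, ["Query returned no results"]
    for col in rows[0]:
        values = [row[col] for row in rows]
        null_pct = 100 * sum(v is None for v in values) / len(rows)
        if null_pct > 50:
            score -= 20  # mostly-null column
            warnings.append(f"Column '{col}' has {null_pct:.1f}% null values")
        elif null_pct > 10:
            score -= 5   # noticeable share of nulls
            warnings.append(f"Column '{col}' has {null_pct:.1f}% null values")
        is_money = 'amount' in col.lower() or 'revenue' in col.lower()
        if is_money and any(v is not None and v < 0 for v in values):
            score -= 10  # negative values in an amount/revenue column
            warnings.append(f"Column '{col}' has negative values")
    return max(score, 0), warnings

rows = [
    {"total_amount": 120.0, "discount": None},
    {"total_amount": -15.0, "discount": None},
    {"total_amount": 80.0, "discount": 5.0},
]
score, warnings = score_rows(rows)
print(score, warnings)
```

In this example the `discount` column is two-thirds null (a 20-point penalty) and `total_amount` contains a negative value (a 10-point penalty), so the score drops from 100 to 70. The score is a heuristic signal about data quality, not a measure of whether the query answered the request correctly.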

In [0]:
def explain_query(self, result: QueryResult) -> None:
    """
    Print detailed query explanation in plain English.
    
    Args:
        result: QueryResult object
    """
    print("\n" + "="*80)
    print("📝 QUERY EXPLANATION")
    print("="*80)
    
    print("\n📄 What this query does:")
    for i, step in enumerate(result.explanation, 1):
        print(f"   {i}. {step}")
    
    print("\n💾 Data Sources:")
    for source in result.data_sources:
        print(f"   • {source}")
    
    print(f"\n⏱️  Execution Time: {result.execution_time_ms:.0f}ms")
    print(f"📈 Rows Returned: {result.row_count:,}")
    
    print(f"\n✅ Validation Score: {result.validation_score:.0f}%")
    if result.warnings:
        print("\n⚠️  Warnings:")
        for warning in result.warnings:
            print(f"   {warning}")
    else:
        print("   No issues detected - results are reliable")
    
    print("\n" + "="*80)
    print("📊 SQL QUERY")
    print("="*80)
    print(result.sql_query)
    print("="*80 + "\n")

def format_results(self, result: QueryResult, limit: int = 100) -> None:
    """
    Display formatted query results.
    
    Args:
        result: QueryResult object
        limit: Maximum rows to display
    """
    print(f"\n📊 QUERY RESULTS ({result.row_count:,} total rows)")
    print("="*80)
    
    if result.row_count == 0:
        print("⚠️  No results found")
        return
    
    # Display results
    display(result.data.limit(limit))
    
    if result.row_count > limit:
        print(f"\n📌 Showing first {limit} of {result.row_count:,} rows")

# Add methods to class
DataAnalystAgent.explain_query = explain_query
DataAnalystAgent.format_results = format_results

print("✅ Result formatting methods added to DataAnalystAgent")

In [0]:
def answer_request(self, request_text: str, data_source_name: Optional[str] = None) -> QueryResult:
    """
    Complete workflow: understand request, generate query, execute, and explain.
    
    This is the main method analysts will use.
    
    Args:
        request_text: Analyst's request in natural language
        data_source_name: Specific data source to use (auto-detect if None)
        
    Returns:
        QueryResult object with data and metadata
    """
    print("\n" + "🚀 "*30)
    print("🚀 PROCESSING ANALYST REQUEST")
    print("🚀 "*30 + "\n")
    print(f"💬 Request: '{request_text}'\n")
    
    # Step 1: Understand request
    request = self.understand_request(request_text)
    
    # Step 2: Find appropriate data source
    if data_source_name:
        if data_source_name not in self.data_catalog:
            raise ValueError(f"Data source '{data_source_name}' not found in catalog")
        data_source = self.data_catalog[data_source_name]
    else:
        # Auto-select first available data source
        if not self.data_catalog:
            raise ValueError("No data sources available. Run discover_files() first.")
        data_source = list(self.data_catalog.values())[0]
        print(f"\n💾 Auto-selected data source: {list(self.data_catalog.keys())[0]}")
    
    # Step 3: Generate query
    sql_query, explanation = self.generate_query(request, data_source)
    
    # Step 4: Execute query
    result = self.execute_query(sql_query, explanation, [data_source.path])
    
    # Step 5: Explain and display
    self.explain_query(result)
    self.format_results(result)
    
    # Save to history
    self.query_history.append(result)
    
    print("\n" + "✅ "*30)
    print("✅ REQUEST COMPLETED SUCCESSFULLY")
    print("✅ "*30 + "\n")
    
    return result

# Add method to class
DataAnalystAgent.answer_request = answer_request

print("✅ Main workflow method added to DataAnalystAgent")
print("\n" + "="*80)
print("✅ DATA ANALYST AGENT MODULE COMPLETE")
print("="*80)
print("\n🎉 The agent is ready to use!")
print("\nQuick Start:")
print("  1. agent = DataAnalystAgent(spark, ['/path/to/data'])")
print("  2. agent.discover_files()")
print("  3. result = agent.answer_request('Show me total revenue by region')")
print("\n" + "="*80)

In [0]:
# Generate sample sales data for testing
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, DateType, IntegerType
from datetime import datetime, timedelta
import random

print("📊 Generating sample sales data...\n")

# Set seed for reproducibility
random.seed(42)

# Generate dates from 2024-01-01 through 2025-02-09 (end_date itself is excluded)
start_date = datetime(2024, 1, 1)
end_date = datetime(2025, 2, 10)
date_range = [(start_date + timedelta(days=x)).strftime('%Y-%m-%d') 
              for x in range((end_date - start_date).days)]

# Sample data parameters
regions = ['North', 'South', 'East', 'West']
categories = ['Electronics', 'Clothing', 'Food', 'Home', 'Sports']
products = {
    'Electronics': ['Laptop', 'Phone', 'Tablet', 'Headphones'],
    'Clothing': ['Shirt', 'Pants', 'Jacket', 'Shoes'],
    'Food': ['Snacks', 'Beverages', 'Frozen', 'Fresh'],
    'Home': ['Furniture', 'Decor', 'Kitchen', 'Bedding'],
    'Sports': ['Equipment', 'Apparel', 'Footwear', 'Accessories']
}

# Generate 1000 sales records
sales_data = []
for i in range(1000):
    date = random.choice(date_range)
    region = random.choice(regions)
    category = random.choice(categories)
    product = random.choice(products[category])
    amount = round(random.uniform(10, 5000), 2)
    quantity = random.randint(1, 10)
    customer_id = f"CUST{random.randint(1000, 9999)}"
    
    sales_data.append((
        i + 1,
        date,
        region,
        category,
        product,
        amount,
        quantity,
        customer_id
    ))

# Create DataFrame
schema = StructType([
    StructField("transaction_id", IntegerType(), False),
    StructField("date", StringType(), False),
    StructField("region", StringType(), False),
    StructField("category", StringType(), False),
    StructField("product", StringType(), False),
    StructField("amount", DoubleType(), False),
    StructField("quantity", IntegerType(), False),
    StructField("customer_id", StringType(), False)
])

sales_df = spark.createDataFrame(sales_data, schema)

print(f"✅ Generated {sales_df.count()} sales records")
print(f"   Date range: {min(date_range)} to {max(date_range)}")
print(f"   Regions: {', '.join(regions)}")
print(f"   Categories: {', '.join(categories)}")
print("\n📊 Sample data preview:")
display(sales_df.limit(10))

In [0]:
# Save sample data to DBFS for testing

# Define storage path
storage_path = "/tmp/analyst_agent_demo/sales_data.parquet"

print(f"💾 Saving sample data to: {storage_path}\n")

# Save as Parquet
sales_df.write.mode("overwrite").parquet(storage_path)

print("✅ Sample data saved successfully")
print(f"\n📁 File location: {storage_path}")
print(f"   Format: Parquet")
print(f"   Rows: {sales_df.count()}")
print(f"   Columns: {len(sales_df.columns)}")

# Verify file exists
try:
    files = dbutils.fs.ls("/tmp/analyst_agent_demo/")
    print(f"\n✅ Verification: Found {len(files)} file(s) in directory")
    for f in files:
        print(f"   • {f.name} ({f.size} bytes)")
except Exception as e:
    print(f"\n⚠️  Could not verify: {str(e)}")

print("\n" + "="*80)
print("✅ SAMPLE DATA READY FOR TESTING")
print("="*80)

# 🎯 Demo: Data Analyst Assistant Agent

This section demonstrates the complete workflow of the Data Analyst Assistant Agent.

## What You'll See:
1. **Agent Initialization** - Set up the agent with storage paths
2. **File Discovery** - Automatically find and catalog data files
3. **Natural Language Queries** - Ask questions in plain English
4. **Query Explanation** - See exactly what the SQL does
5. **Result Validation** - Get confidence scores on accuracy
6. **Formatted Results** - View clean, readable output

---

## Demo Scenarios:
* 📊 **Simple Aggregation** - "Show me total revenue"
* 🗺️ **Dimensional Analysis** - "Show me revenue by region"
* 📅 **Time-Based Query** - "Show me sales from last quarter"
* 🔍 **Filtered Query** - "Show me sales in the West region"
* 📊 **KPI Calculation** - "What's the average order value?"
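
The time-based scenario depends on how "last quarter" is resolved. `_extract_time_range` uses a rolling window that ends today, while the sample data covers fixed dates (2024-01-01 through 2025-02-09), so a window computed from the current date may not overlap the data at all. As an illustration only, the sketch below resolves the previous calendar quarter from an explicit reference date; the function `previous_quarter` and the chosen reference date are hypothetical and not part of the agent.

```python
from datetime import date, timedelta
from typing import Dict

def previous_quarter(reference: date) -> Dict[str, str]:
    """Return the inclusive start/end (ISO format) of the calendar quarter before `reference`."""
    # First month of the quarter containing `reference`: 1, 4, 7, or 10
    current_start_month = 3 * ((reference.month - 1) // 3) + 1
    current_start = date(reference.year, current_start_month, 1)
    # The previous quarter ends the day before the current quarter starts
    end = current_start - timedelta(days=1)
    start = date(end.year, 3 * ((end.month - 1) // 3) + 1, 1)
    return {'start': start.isoformat(), 'end': end.isoformat()}

window = previous_quarter(date(2025, 2, 10))
print(window)
```

Passing the reference date explicitly keeps the demo reproducible: the same request resolves to the same window regardless of when the notebook is run.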

In [0]:
# Demo 1: Initialize the agent and discover data files

print("🚀 DEMO 1: Agent Initialization & File Discovery")
print("="*80 + "\n")

# Step 1: Initialize the agent
print("Step 1: Initializing Data Analyst Agent...\n")
agent = DataAnalystAgent(spark, storage_paths=["/tmp/analyst_agent_demo/"])

print("\n" + "-"*80 + "\n")

# Step 2: Discover files
print("Step 2: Discovering data files...\n")
discovered_files = agent.discover_files()

print("\n" + "-"*80 + "\n")

# Step 3: Show catalog summary
print("Step 3: Data Catalog Summary\n")
catalog = agent.get_catalog_summary()
print(f"📊 Total data sources: {catalog['total_sources']}")
print(f"📁 File formats: {', '.join(catalog['formats'])}")
print(f"📊 Total columns: {catalog['total_columns']}")
print("\n💾 Available sources:")
for source in catalog['sources']:
    print(f"   • {source}")

print("\n" + "="*80)
print("✅ Agent is ready to answer requests!")
print("="*80)