# Local Data Engineering Workflow

This notebook demonstrates a complete data engineering pipeline using:
- **dlt (data load tool)**: For data ingestion and schema management
- **DuckDB**: As a high-performance analytical database
- **Pandas**: For data manipulation and analysis

## Workflow Overview
1. Load data from CSV using dlt
2. Perform data quality checks
3. Run analytical queries
4. Export results to CSV files

## 1. Setup and Imports

In [7]:
# Import required libraries
import dlt
import duckdb
import pandas as pd
from pathlib import Path
from datetime import datetime

print("✓ All libraries imported successfully")
print(f"dlt version: {dlt.__version__}")
print(f"duckdb version: {duckdb.__version__}")
print(f"pandas version: {pd.__version__}")

✓ All libraries imported successfully
dlt version: 1.20.0
duckdb version: 1.4.3
pandas version: 2.3.3


## 2. Data Loading with dlt

We'll use dlt to load our sample CSV data into DuckDB. dlt automatically:
- Infers schema from the data
- Creates tables in DuckDB
- Handles data type conversions
- Manages schema evolution

In [8]:
@dlt.resource(name="sales_data", write_disposition="replace")
def load_sales_data():
    """
    Load sales data from CSV file.
    
    The @dlt.resource decorator turns this function into a dlt resource,
    i.e. one table's worth of data.
    - name: the resource name, which dlt also uses as the table name
    - write_disposition="replace": replaces the table's existing data on each run
    """
    # Read CSV file
    csv_path = Path("../data/sample.csv")
    
    if not csv_path.exists():
        raise FileNotFoundError(f"Sample data not found at {csv_path}")
    
    # Load data with pandas for better control
    df = pd.read_csv(csv_path)
    
    # Convert to records (list of dicts) for dlt
    records = df.to_dict('records')
    
    print(f"✓ Loaded {len(records)} records from {csv_path}")
    
    return records

print("✓ Data source function defined")

✓ Data source function defined
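Returning a list means the whole CSV is held in memory before dlt sees any of it. dlt resources can also be generators that `yield` batches, which matters once files get large. The sketch below shows only the chunked-reading half, using the `chunksize` parameter of `pd.read_csv`. The function name `iter_sales_records` and the inline rows are hypothetical stand-ins for `../data/sample.csv`. Plugging it in would presumably mean decorating the generator with `@dlt.resource(...)` as above.

```python
import io
from typing import Iterator

import pandas as pd

# Hypothetical inline rows standing in for ../data/sample.csv
SAMPLE_CSV = """date,product_category,quantity,price,region,customer_id
2024-01-15,Electronics,2,199.99,North,C001
2024-01-16,Clothing,1,49.5,South,C002
2024-01-17,Electronics,3,19.99,East,C003
"""


def iter_sales_records(source, chunk_size: int = 2) -> Iterator[list[dict]]:
    """Yield the CSV as lists of row dicts, chunk_size rows at a time."""
    # With chunksize set, read_csv returns an iterator of DataFrames
    # instead of reading the entire file in one go.
    for chunk in pd.read_csv(source, chunksize=chunk_size):
        yield chunk.to_dict("records")


batches = list(iter_sales_records(io.StringIO(SAMPLE_CSV), chunk_size=2))
print([len(b) for b in batches])
```

Each yielded list is one batch. Memory use is then bounded by `chunk_size` rather than by the file size.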


In [9]:
# Configure and run the dlt pipeline
pipeline = dlt.pipeline(
    pipeline_name="local_data_pipeline",
    destination="duckdb",
    dataset_name="sales_analytics"
)

print("✓ Pipeline configured")
print(f"  Pipeline name: {pipeline.pipeline_name}")
print("  Destination: DuckDB")
print(f"  Dataset: {pipeline.dataset_name}")
print()

# Execute the pipeline
print("Loading data into DuckDB...")
info = pipeline.run(load_sales_data())

print("\n" + "="*60)
print("✓ DATA LOADED SUCCESSFULLY")
print("="*60)
print(f"Loaded at: {info.started_at}")
print(f"Dataset: {info.dataset_name}")
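# schema_update lists only tables whose schema changed in this load, so it can
# be empty on a re-run even though sales_data exists (as in the output below)
print(f"Tables created: {list(info.load_packages[0].schema_update.keys()) if info.load_packages else 'N/A'}")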

✓ Pipeline configured
  Pipeline name: local_data_pipeline
  Destination: DuckDB
  Dataset: sales_analytics

Loading data into DuckDB...
✓ Loaded 10 records from ..\data\sample.csv

✓ DATA LOADED SUCCESSFULLY
Loaded at: 2025-12-25 09:51:47.410778+00:00
Dataset: sales_analytics
Tables created: []
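Here is why `write_disposition="replace"` matters in a notebook that gets re-run. The sketch below is a plain-Python simulation of two runs against an in-memory "table", not dlt's implementation. It contrasts `replace` with dlt's `append` disposition, which adds each run's rows to whatever is already stored. The `load` helper is hypothetical.

```python
def load(table: list[dict], records: list[dict], disposition: str) -> list[dict]:
    """Simulate one pipeline run and return the table contents afterwards."""
    if disposition == "replace":
        return list(records)          # drop old rows, keep only this run's
    if disposition == "append":
        return table + list(records)  # keep old rows and add this run's
    raise ValueError(f"unknown disposition: {disposition}")


records = [{"customer_id": "C001", "quantity": 2}, {"customer_id": "C002", "quantity": 1}]

replaced: list[dict] = []
appended: list[dict] = []
for _ in range(2):  # simulate running the notebook twice
    replaced = load(replaced, records, "replace")
    appended = load(appended, records, "append")

print(len(replaced), len(appended))
```

Under `append`, the second run would double every row. The duplicate check in section 3 compares only the business columns and ignores the `_dlt_*` columns, so it would flag every one of those repeated rows.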


In [10]:
# Inspect the schema created by dlt
print("Schema Information:")
print("="*60)

# Get the DuckDB connection from dlt
with pipeline.sql_client() as client:
    # Query to show table structure
    with client.execute_query("DESCRIBE sales_analytics.sales_data") as cursor:
        schema_info = cursor.fetchall()
        
        print(f"\nTable: sales_data")
        print(f"{'Column':<20} {'Type':<15} {'Nullable':<10}")
        print("-"*50)
        for row in schema_info:
            print(f"{row[0]:<20} {row[1]:<15} {str(row[2]):<10}")

Schema Information:

Table: sales_data
Column               Type            Nullable  
--------------------------------------------------
date                 VARCHAR         YES       
product_category     VARCHAR         YES       
quantity             BIGINT          YES       
price                DOUBLE          YES       
region               VARCHAR         YES       
customer_id          VARCHAR         YES       
_dlt_load_id         VARCHAR         NO        
_dlt_id              VARCHAR         NO        
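Note that `date` arrived as `VARCHAR`. `pd.read_csv` leaves date columns as strings unless told otherwise, so dlt receives plain text. `MIN(date)` and `MAX(date)` still order correctly for ISO `YYYY-MM-DD` strings, but date arithmetic on the column would not work. The minimal sketch below covers only the pandas side and uses hypothetical inline rows. Passing `parse_dates` produces datetime values, which dlt would then presumably infer as a timestamp type. That is an assumption; confirm it with the `DESCRIBE` query above.

```python
import io

import pandas as pd

# Hypothetical rows in the same date format as the sample data
csv_text = "date,quantity\n2024-01-15,2\n2024-01-24,1\n"

as_text = pd.read_csv(io.StringIO(csv_text))
as_dates = pd.read_csv(io.StringIO(csv_text), parse_dates=["date"])

# Without parse_dates the column holds strings; with it, real datetimes
print(pd.api.types.is_datetime64_any_dtype(as_text["date"]))   # False
print(pd.api.types.is_datetime64_any_dtype(as_dates["date"]))  # True

# ISO strings sort the same way as the dates they represent,
# which is why MIN/MAX on the VARCHAR column still give the right range
print(as_text["date"].min())                                    # 2024-01-15
print((as_dates["date"].max() - as_dates["date"].min()).days)  # 9
```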


## 3. Data Quality Checks

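Before analysis, we validate the loaded data by checking for:
- NULL values in each business column
- Business-rule violations (quantity or price less than or equal to zero)
- Duplicate rows (compared on the business columns, ignoring dlt's `_dlt_*` columns)

We also print basic statistics (record, customer, category, and region counts, plus the date range) as a sanity check.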

In [11]:
def run_quality_checks():
    """
    Execute comprehensive data quality checks on the loaded data.
    """
    print("\n" + "="*60)
    print("DATA QUALITY CHECKS")
    print("="*60 + "\n")
    
    with pipeline.sql_client() as client:
        
        # Check 1: Null Values
        print("1. Checking for NULL values...")
        null_check_query = """
            SELECT 
                COUNT(*) as total_records,
                SUM(CASE WHEN date IS NULL THEN 1 ELSE 0 END) as null_dates,
                SUM(CASE WHEN product_category IS NULL THEN 1 ELSE 0 END) as null_categories,
                SUM(CASE WHEN quantity IS NULL THEN 1 ELSE 0 END) as null_quantities,
                SUM(CASE WHEN price IS NULL THEN 1 ELSE 0 END) as null_prices,
                SUM(CASE WHEN region IS NULL THEN 1 ELSE 0 END) as null_regions,
                SUM(CASE WHEN customer_id IS NULL THEN 1 ELSE 0 END) as null_customer_ids
            FROM sales_analytics.sales_data
        """
        
        with client.execute_query(null_check_query) as cursor:
            result = cursor.fetchone()
            has_nulls = any(result[1:])  # Check if any null counts > 0
            
            if has_nulls:
                print("   ⚠ WARNING: NULL values detected")
                for i, col in enumerate(['dates', 'categories', 'quantities', 'prices', 'regions', 'customer_ids']):
                    if result[i+1] > 0:
                        print(f"     - {col}: {result[i+1]} null values")
            else:
                print("   ✓ PASS: No NULL values found")
        
        # Check 2: Business Rule Validation
        print("\n2. Validating business rules...")
        business_rule_query = """
            SELECT 
                SUM(CASE WHEN quantity <= 0 THEN 1 ELSE 0 END) as invalid_quantity,
                SUM(CASE WHEN price <= 0 THEN 1 ELSE 0 END) as invalid_price
            FROM sales_analytics.sales_data
        """
        
        with client.execute_query(business_rule_query) as cursor:
            result = cursor.fetchone()
            if result[0] > 0 or result[1] > 0:
                print("   ⚠ WARNING: Business rule violations detected")
                if result[0] > 0:
                    print(f"     - {result[0]} records with quantity <= 0")
                if result[1] > 0:
                    print(f"     - {result[1]} records with price <= 0")
            else:
                print("   ✓ PASS: All business rules satisfied")
        
        # Check 3: Duplicates
        print("\n3. Checking for duplicates...")
        duplicate_query = """
            SELECT 
                date, product_category, quantity, price, region, customer_id,
                COUNT(*) as duplicate_count
            FROM sales_analytics.sales_data
            GROUP BY date, product_category, quantity, price, region, customer_id
            HAVING COUNT(*) > 1
        """
        
        with client.execute_query(duplicate_query) as cursor:
            duplicates = cursor.fetchall()
            if duplicates:
                print(f"   ⚠ WARNING: {len(duplicates)} duplicate record groups found")
            else:
                print("   ✓ PASS: No duplicates found")
        
        # Check 4: Data Statistics
        print("\n4. Data statistics...")
        stats_query = """
            SELECT 
                COUNT(*) as total_records,
                COUNT(DISTINCT customer_id) as unique_customers,
                COUNT(DISTINCT product_category) as unique_categories,
                COUNT(DISTINCT region) as unique_regions,
                MIN(date) as earliest_date,
                MAX(date) as latest_date
            FROM sales_analytics.sales_data
        """
        
        with client.execute_query(stats_query) as cursor:
            stats = cursor.fetchone()
            print(f"   Total records: {stats[0]}")
            print(f"   Unique customers: {stats[1]}")
            print(f"   Product categories: {stats[2]}")
            print(f"   Regions: {stats[3]}")
            print(f"   Date range: {stats[4]} to {stats[5]}")
    
    print("\n" + "="*60)
    print("✓ QUALITY CHECKS COMPLETED")
    print("="*60)

# Run the quality checks
run_quality_checks()


DATA QUALITY CHECKS

1. Checking for NULL values...
   ✓ PASS: No NULL values found

2. Validating business rules...
   ✓ PASS: All business rules satisfied

3. Checking for duplicates...
   ✓ PASS: No duplicates found

4. Data statistics...
   Total records: 10
   Unique customers: 7
   Product categories: 3
   Regions: 4
   Date range: 2024-01-15 to 2024-01-24

✓ QUALITY CHECKS COMPLETED
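The same checks can be written in pandas, which is handy for validating a DataFrame before it is handed to dlt. This is a minimal sketch that assumes the column names of `sales_data`. The function name `pandas_quality_report` and the inline rows are hypothetical.

```python
import pandas as pd

BUSINESS_COLUMNS = ["date", "product_category", "quantity", "price", "region", "customer_id"]


def pandas_quality_report(df: pd.DataFrame) -> dict:
    """Mirror the SQL checks: NULLs, business rules, duplicates, a basic stat."""
    nulls = df[BUSINESS_COLUMNS].isna().sum()
    return {
        # Only the columns that actually contain NULLs, as {column: count}
        "nulls": {col: int(n) for col, n in nulls.items() if n > 0},
        "invalid_quantity": int((df["quantity"] <= 0).sum()),
        "invalid_price": int((df["price"] <= 0).sum()),
        # Rows that repeat an earlier row across all business columns
        "duplicate_rows": int(df.duplicated(subset=BUSINESS_COLUMNS).sum()),
        "unique_customers": int(df["customer_id"].nunique()),
    }


sample = pd.DataFrame({
    "date": ["2024-01-15", "2024-01-15", "2024-01-16"],
    "product_category": ["Electronics", "Electronics", None],
    "quantity": [2, 2, 0],
    "price": [199.99, 199.99, 10.0],
    "region": ["North", "North", "South"],
    "customer_id": ["C001", "C001", "C002"],
})
print(pandas_quality_report(sample))
```

The duplicate counts differ slightly from the SQL version. `duplicated` counts every repeated row after the first, while the SQL query counts duplicate groups. Three identical rows therefore report 2 here and 1 group in the SQL check.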


## 4. Analytics Queries

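Next, we run three aggregate queries: an overall summary, sales by product category, and sales by region. Each result is pulled into a pandas DataFrame with `cursor.df()` so it can be exported in section 5.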

In [None]:
# Summary Statistics
print("\n" + "="*60)
print("SUMMARY STATISTICS")
print("="*60 + "\n")

summary_query = """
    SELECT 
        COUNT(*) as total_transactions,
        SUM(quantity * price) as total_revenue,
        AVG(price) as avg_price,
        AVG(quantity) as avg_quantity,
        MIN(date) as earliest_date,
        MAX(date) as latest_date
    FROM sales_analytics.sales_data
"""

with pipeline.sql_client() as client:
    with client.execute_query(summary_query) as cursor:
        summary_df = cursor.df()
    
print(summary_df.to_string(index=False))
print("\n" + "="*60)

# Store for export
summary_stats = summary_df
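`AVG(price)` in the summary query is an unweighted mean over transactions, so a one-unit sale counts as much as a fifty-unit sale. To answer "what did a unit sell for on average", use the revenue-weighted figure `SUM(quantity * price) / SUM(quantity)` instead. Here is a small worked example with made-up numbers:

```python
# Hypothetical transactions as (quantity, price) pairs
transactions = [(1, 100.0), (9, 10.0)]

# What AVG(price) computes: every transaction weighs the same
unweighted_avg_price = sum(p for _, p in transactions) / len(transactions)

# Revenue per unit sold: SUM(quantity * price) / SUM(quantity)
total_revenue = sum(q * p for q, p in transactions)
total_units = sum(q for q, _ in transactions)
weighted_avg_price = total_revenue / total_units

print(unweighted_avg_price)  # 55.0
print(weighted_avg_price)    # 19.0
```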

In [None]:
# Category Analysis
print("\n" + "="*60)
print("SALES BY PRODUCT CATEGORY")
print("="*60 + "\n")

category_query = """
    SELECT 
        product_category,
        COUNT(*) as transactions,
        SUM(quantity) as total_units_sold,
        SUM(quantity * price) as total_revenue,
        ROUND(AVG(price), 2) as avg_price,
        ROUND(AVG(quantity), 2) as avg_quantity
    FROM sales_analytics.sales_data
    GROUP BY product_category
    ORDER BY total_revenue DESC
"""

with pipeline.sql_client() as client:
    with client.execute_query(category_query) as cursor:
        category_df = cursor.df()

print(category_df.to_string(index=False))
print("\n" + "="*60)

# Store for export
category_analysis = category_df
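For readers more at home in pandas, the category query corresponds roughly to a `groupby` with named aggregations. The sketch below runs on hypothetical rows and assumes the same column names. The `"size"` aggregation counts rows the way `COUNT(*)` does, including rows with NULLs.

```python
import pandas as pd

# Hypothetical rows using the sales_data column names
sales = pd.DataFrame({
    "product_category": ["Electronics", "Clothing", "Electronics"],
    "quantity": [2, 4, 1],
    "price": [100.0, 20.0, 50.0],
})

category = (
    sales.assign(revenue=sales["quantity"] * sales["price"])
    .groupby("product_category", as_index=False)
    .agg(
        transactions=("price", "size"),          # COUNT(*)
        total_units_sold=("quantity", "sum"),    # SUM(quantity)
        total_revenue=("revenue", "sum"),        # SUM(quantity * price)
        avg_price=("price", "mean"),             # AVG(price)
        avg_quantity=("quantity", "mean"),       # AVG(quantity)
    )
    .round({"avg_price": 2, "avg_quantity": 2})
    .sort_values("total_revenue", ascending=False)  # ORDER BY total_revenue DESC
)
print(category.to_string(index=False))
```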

In [None]:
# Regional Analysis
print("\n" + "="*60)
print("SALES BY REGION")
print("="*60 + "\n")

regional_query = """
    SELECT 
        region,
        COUNT(*) as transactions,
        SUM(quantity) as total_units_sold,
        SUM(quantity * price) as total_revenue,
        ROUND(AVG(quantity * price), 2) as avg_transaction_value
    FROM sales_analytics.sales_data
    GROUP BY region
    ORDER BY total_revenue DESC
"""

with pipeline.sql_client() as client:
    with client.execute_query(regional_query) as cursor:
        regional_df = cursor.df()

print(regional_df.to_string(index=False))
print("\n" + "="*60)

# Store for export
regional_analysis = regional_df

## 5. Export Results to CSV

Finally, we'll export all analysis results to CSV files for further use.

In [None]:
# Create output directory if it doesn't exist
output_dir = Path("../output")
output_dir.mkdir(exist_ok=True)

print("\n" + "="*60)
print("EXPORTING RESULTS")
print("="*60 + "\n")

# Add timestamp to filenames for versioning
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

# Export summary statistics
summary_file = output_dir / f"summary_stats_{timestamp}.csv"
summary_stats.to_csv(summary_file, index=False)
print(f"✓ Exported: {summary_file}")

# Export category analysis
category_file = output_dir / f"category_analysis_{timestamp}.csv"
category_analysis.to_csv(category_file, index=False)
print(f"✓ Exported: {category_file}")

# Export regional analysis
regional_file = output_dir / f"regional_analysis_{timestamp}.csv"
regional_analysis.to_csv(regional_file, index=False)
print(f"✓ Exported: {regional_file}")

print("\n" + "="*60)
print("✓ ALL RESULTS EXPORTED SUCCESSFULLY")
print("="*60)

## Summary

This notebook demonstrated a complete data engineering workflow:

1. ✓ Loaded data from CSV using dlt with automatic schema inference
2. ✓ Stored data in DuckDB for high-performance analytics
3. ✓ Performed data quality checks (NULLs, business rules, duplicates)
4. ✓ Executed analytical queries for business insights
5. ✓ Exported results to CSV files with timestamps

### Key Learnings

- **dlt** simplifies data ingestion with automatic schema management
- **DuckDB** provides SQL analytics without a separate database server
- **Data quality checks** are essential before analysis
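- **Re-runnable pipelines** keep results reproducible: with `write_disposition="replace"`, each run rebuilds `sales_data` from the CSV instead of appending duplicate rows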

### Next Steps

- Add more data sources (APIs, databases, etc.)
- Implement incremental loading strategies
- Create data visualizations with matplotlib or plotly
- Schedule the pipeline for regular execution
- Add data transformation logic between loading and analysis