# Data Lineage Pipeline with PySpark and Airflow

Welcome to the comprehensive Data Lineage Pipeline notebook! This notebook demonstrates:

- 🔄 **PySpark Data Transformations** - Multiple transformation options with interactive selection
- 📊 **Data Lineage Tracking** - Complete tracking from source to destination
- 🌊 **Airflow Integration** - Generate production-ready DAGs
- 📈 **Interactive Visualizations** - Beautiful flowcharts and pipeline diagrams
- 🎛️ **Interactive UI** - Choose transformations using widgets

## Table of Contents
1. [Setup Environment and Dependencies](#setup)
2. [Initialize Spark Session and Airflow DAG](#init)
3. [Load Sample E-commerce Dataset](#load)
4. [Data Quality Assessment](#quality)
5. [Interactive Transformation Selection](#transformations)
6. [Execute Selected Transformations Pipeline](#execute)
7. [Capture Data Lineage Metadata](#lineage)
8. [Create Interactive Lineage Visualization](#visualization)
9. [Generate Airflow DAG Code](#airflow)
10. [Export Pipeline Results](#export)

## 1. Setup Environment and Dependencies <a id="setup"></a>

First, let's install and import all the required libraries for our data lineage pipeline.

In [None]:
# Install required packages (uncomment if needed)
# !pip install pyspark pandas plotly networkx ipywidgets faker seaborn matplotlib

# Import core libraries
import sys
import os
from datetime import datetime, timedelta
import json
import warnings
warnings.filterwarnings('ignore')

# Add the project root (the parent of the current working directory) to the
# import path so that the project-local `src` and `data` packages can be imported.
project_root = os.path.abspath(os.path.join(os.getcwd(), '..'))
# Notebook cells are routinely re-run; an unconditional append would add a
# duplicate sys.path entry on every execution, so only append when missing.
if project_root not in sys.path:
    sys.path.append(project_root)

print(f"Project root: {project_root}")
print("✅ Environment setup complete!")

In [None]:
# Data processing libraries
import pandas as pd
import numpy as np
from faker import Faker

# PySpark libraries
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window

# Visualization libraries
import plotly.graph_objects as go
import plotly.express as px
from plotly.subplots import make_subplots
import matplotlib.pyplot as plt
import seaborn as sns

# Interactive widgets
import ipywidgets as widgets
from IPython.display import display, HTML, clear_output

# Network analysis for lineage
import networkx as nx

# Our custom modules
try:
    from src.transformations import (
        SparkTransformationEngine, 
        AVAILABLE_TRANSFORMATIONS,
        DataLineageTracker
    )
    from src.lineage import create_lineage_visualization
    print("✅ Custom modules imported successfully!")
except ImportError as e:
    print(f"⚠️ Custom modules not found: {e}")
    print("Make sure you're running from the project root or generate sample data first")

print("✅ All libraries imported successfully!")

## 2. Initialize Spark Session and Airflow DAG <a id="init"></a>

Let's initialize our Spark session with optimized configurations and set up the data lineage tracking system.

In [None]:
# Initialize Spark session with optimized configuration
def create_spark_session(app_name="DataLineageNotebook"):
    """Build (or reuse) a SparkSession configured for this notebook.

    Args:
        app_name: Application name handed to the session builder.

    Returns:
        The session returned by ``getOrCreate()``, with its log level set
        to WARN.
    """
    session_options = (
        ("spark.sql.adaptive.enabled", "true"),
        ("spark.sql.adaptive.coalescePartitions.enabled", "true"),
        ("spark.sql.execution.arrow.pyspark.enabled", "true"),
        ("spark.serializer", "org.apache.spark.serializer.KryoSerializer"),
        ("spark.sql.adaptive.advisoryPartitionSizeInBytes", "128MB"),
    )

    # Apply the options in declaration order on top of the named builder.
    builder = SparkSession.builder.appName(app_name)
    for option_key, option_value in session_options:
        builder = builder.config(option_key, option_value)
    session = builder.getOrCreate()

    # Reduce Spark's console verbosity to warnings and errors.
    session.sparkContext.setLogLevel("WARN")
    return session

# Initialize the transformation engine.
# Preferred path: the project's SparkTransformationEngine, which owns both the
# Spark session and the lineage tracker.
try:
    engine = SparkTransformationEngine("DataLineageNotebook")
    spark = engine.spark
    print("✅ Spark session initialized successfully!")
    print(f"🔧 Spark version: {spark.version}")
    print(f"📊 Available cores: {spark.sparkContext.defaultParallelism}")
    
    # Initialize lineage tracker
    lineage_tracker = engine.lineage_tracker
    print("✅ Lineage tracker initialized!")
    
except Exception as e:
    # Also reached when the optional custom-module import failed, because
    # SparkTransformationEngine is then an undefined name (NameError).
    print(f"❌ Failed to initialize Spark: {e}")
    print("Creating basic Spark session...")
    spark = create_spark_session()
    # DataLineageTracker comes from the same optional import; without this
    # guard the fallback itself would crash with NameError.
    try:
        lineage_tracker = DataLineageTracker()
    except NameError:
        lineage_tracker = None
        print("⚠️ DataLineageTracker is not available; lineage tracking is disabled")

# Global variables for tracking
transformation_history = []
loaded_datasets = {}
current_lineage = None

## 3. Load Sample E-commerce Dataset <a id="load"></a>

Let's generate (if it does not exist yet) and load the e-commerce sample data: customers, products, orders, order items, and inventory.

In [None]:
# Function to generate sample data if it doesn't exist
def generate_sample_data():
    """Generate the sample e-commerce data files.

    Tries the project's generator (``data.generate_sample_data.save_sample_data``)
    first. If it cannot be imported or raises, falls back to
    ``generate_inline_sample_data`` so that every failure path gets the same
    chance to produce data.

    Returns:
        bool: True when the project generator succeeded; otherwise the
        result of the inline fallback.
    """
    try:
        from data.generate_sample_data import save_sample_data
        save_sample_data()
    except ImportError as e:
        print(f"❌ Could not import data generator: {e}")
        print("🔧 Creating sample data using inline generation...")
    except TypeError as e:
        if "weights" in str(e):
            print(f"❌ Data generation failed due to weights parameter: {e}")
            print("🔧 This is likely due to an older Python version.")
        else:
            print(f"❌ Failed to generate sample data: {e}")
        # Re-running the same script would fail the same way, so use the
        # inline generator instead of asking the user to try again.
        print("🔧 Creating minimal sample data...")
    except Exception as e:
        print(f"❌ Failed to generate sample data: {e}")
        print("🔧 Creating minimal sample data...")
    else:
        print("✅ Sample data generated successfully!")
        return True

    return generate_inline_sample_data()

def generate_inline_sample_data():
    """Generate minimal sample data inline as a fallback.

    Writes five CSV files (customers, products, orders, order_items,
    inventory) into ``<project_root>/data``, creating the directory if needed.
    All random sources are seeded with 42.

    Returns:
        bool: True when all files were written, False if anything failed
        (the error is printed).
    """
    try:
        import pandas as pd
        import numpy as np
        from faker import Faker
        import random
        
        fake = Faker()
        # Seeding must go through the class method: calling ``seed()`` on a
        # Faker instance raises TypeError in current Faker releases, which the
        # broad handler below would turn into a silent ``False``.
        Faker.seed(42)
        np.random.seed(42)
        random.seed(42)
        
        print("📊 Generating minimal sample datasets...")
        
        # Create data directory
        data_dir = os.path.join(project_root, 'data')
        os.makedirs(data_dir, exist_ok=True)
        
        # Generate customers (simplified)
        customers_data = []
        for i in range(1000):
            customer = {
                'customer_id': f"CUST_{i+1:06d}",
                'first_name': fake.first_name(),
                'last_name': fake.last_name(),
                'email': fake.email(),
                'city': fake.city(),
                'state': fake.state(),
                'age': random.randint(18, 80),
                'registration_date': fake.date_between(start_date='-2y', end_date='today'),
                'is_active': np.random.choice([True, False], p=[0.8, 0.2])
            }
            customers_data.append(customer)
        
        customers_df = pd.DataFrame(customers_data)
        customers_df.to_csv(os.path.join(data_dir, 'customers.csv'), index=False)
        print("✅ Generated customers.csv")
        
        # Generate products (simplified)
        products_data = []
        categories = ['Electronics', 'Clothing', 'Books', 'Home']
        
        for i in range(100):
            product = {
                'product_id': f"PROD_{i+1:06d}",
                'product_name': fake.catch_phrase(),
                'category': random.choice(categories),
                'price': round(random.uniform(10, 500), 2),
                'stock_quantity': random.randint(0, 100),
                'is_discontinued': np.random.choice([True, False], p=[0.1, 0.9])
            }
            products_data.append(product)
        
        products_df = pd.DataFrame(products_data)
        products_df.to_csv(os.path.join(data_dir, 'products.csv'), index=False)
        print("✅ Generated products.csv")
        
        # Generate orders (simplified); customer ids fall in the range
        # generated above (CUST_000001 .. CUST_001000)
        orders_data = []
        for i in range(500):
            order = {
                'order_id': f"ORDER_{i+1:08d}",
                'customer_id': f"CUST_{random.randint(1, 1000):06d}",
                'order_date': fake.date_between(start_date='-1y', end_date='today'),
                'total_amount': round(random.uniform(20, 1000), 2),
                'status': random.choice(['Completed', 'Pending', 'Cancelled'])
            }
            orders_data.append(order)
        
        orders_df = pd.DataFrame(orders_data)
        orders_df.to_csv(os.path.join(data_dir, 'orders.csv'), index=False)
        print("✅ Generated orders.csv")
        
        # Generate order_items (simplified): 1-5 line items per order
        order_items_data = []
        for i, order in enumerate(orders_data):
            num_items = random.randint(1, 5)
            for j in range(num_items):
                item = {
                    'order_item_id': f"ITEM_{i+1:08d}_{j+1}",
                    'order_id': order['order_id'],
                    'product_id': f"PROD_{random.randint(1, 100):06d}",
                    'quantity': random.randint(1, 5),
                    'unit_price': round(random.uniform(10, 200), 2)
                }
                order_items_data.append(item)
        
        order_items_df = pd.DataFrame(order_items_data)
        order_items_df.to_csv(os.path.join(data_dir, 'order_items.csv'), index=False)
        print("✅ Generated order_items.csv")
        
        # Generate inventory (simplified): one row per product
        inventory_data = []
        for i in range(100):
            inventory = {
                'product_id': f"PROD_{i+1:06d}",
                'warehouse_location': random.choice(['Warehouse_A', 'Warehouse_B', 'Warehouse_C']),
                'quantity_available': random.randint(0, 200),
                'last_updated': fake.date_between(start_date='-30d', end_date='today')
            }
            inventory_data.append(inventory)
        
        inventory_df = pd.DataFrame(inventory_data)
        inventory_df.to_csv(os.path.join(data_dir, 'inventory.csv'), index=False)
        print("✅ Generated inventory.csv")
        
        print("✅ Minimal sample data generated successfully!")
        return True
        
    except Exception as e:
        # Best-effort fallback: report the problem and let the caller decide.
        print(f"❌ Failed to generate inline sample data: {e}")
        return False

# Function to load datasets
def load_datasets():
    """Read every known sample CSV from ``<project_root>/data``.

    Files are loaded through the global ``engine`` when one is defined,
    otherwise directly through ``spark`` with header and schema inference.
    Missing files and load failures are reported and skipped.

    Returns:
        dict: dataset name -> loaded DataFrame, for the datasets that loaded.
    """
    source_dir = os.path.join(project_root, 'data')
    loaded = {}
    
    # Dataset name -> CSV file name
    csv_by_name = {
        'customers': 'customers.csv',
        'products': 'products.csv', 
        'orders': 'orders.csv',
        'order_items': 'order_items.csv',
        'inventory': 'inventory.csv'
    }
    
    print("📁 Loading datasets...")
    
    for name, csv_name in csv_by_name.items():
        csv_path = os.path.join(source_dir, csv_name)
        
        if not os.path.exists(csv_path):
            print(f"⚠️ {csv_name} not found")
            continue
        
        try:
            if 'engine' in globals():
                frame = engine.load_data(csv_path, name, 'csv')
            else:
                reader = spark.read.option("header", "true").option("inferSchema", "true")
                frame = reader.csv(csv_path)
            
            # Register the frame before counting, so it is kept even if the
            # count itself fails.
            loaded[name] = frame
            n_rows = frame.count()
            n_cols = len(frame.columns)
            print(f"✅ {name}: {n_rows:,} rows, {n_cols} columns")
        except Exception as e:
            print(f"❌ Failed to load {name}: {e}")
    
    return loaded

# Check if data exists, generate if needed.
# customers.csv alone is used as the marker for "sample data is present".
data_dir = os.path.join(project_root, 'data')
customers_file = os.path.join(data_dir, 'customers.csv')

if not os.path.exists(customers_file):
    print("📊 Sample data not found. Generating...")
    if generate_sample_data():
        print("✅ Sample data generation completed!")
    else:
        print("❌ Could not generate sample data. Please check the error messages above.")
        # Nothing is repaired at runtime, so give an actionable hint rather
        # than claiming the generator "has been fixed".
        print("💡 Check that the data directory is writable and that pandas, numpy and faker are installed, then re-run this cell.")

# Load all datasets
loaded_datasets = load_datasets()
print(f"\n📈 Summary: Loaded {len(loaded_datasets)} datasets")

In [None]:
# Preview the datasets
def show_dataset_info(dataset_name, df, sample_size=5):
    """Print the shape and schema of a dataset and display its first rows.

    Args:
        dataset_name: Label printed in the header.
        df: DataFrame to describe.
        sample_size: Number of leading rows to display.
    """
    print(f"\n📊 Dataset: {dataset_name}")
    n_rows = df.count()
    n_cols = len(df.columns)
    print(f"📏 Shape: {n_rows:,} rows × {n_cols} columns")
    
    print("\n🏗️ Schema:")
    for column_field in df.schema.fields:
        print(f"  • {column_field.name}: {column_field.dataType}")
    
    print(f"\n👀 Sample Data (first {sample_size} rows):")
    # Only the requested rows are converted to pandas for display.
    preview = df.limit(sample_size).toPandas()
    display(preview)

# Show information for each dataset: print shape and schema, and display a
# short sample, for every entry in loaded_datasets.
for name, df in loaded_datasets.items():
    show_dataset_info(name, df)

## 4. Data Quality Assessment <a id="quality"></a>

Before applying transformations, let's assess the quality of our data by checking for missing values, duplicates, and other data quality issues.

In [None]:
# Data quality assessment functions
def assess_data_quality(df, dataset_name):
    """Print a data-quality report for one DataFrame and return it as a dict.

    Args:
        df: DataFrame to inspect.
        dataset_name: Label printed in the report header.

    Returns:
        dict with the keys:
            total_rows: row count.
            total_cols: column count.
            null_analysis: list of ``{'Column', 'Null_Count',
                'Null_Percentage'}`` records for columns that contain nulls,
                sorted by percentage (descending).
            duplicate_count: total rows minus distinct rows.
            data_types: data type string -> number of columns of that type.
    """
    print(f"\n🔍 Data Quality Assessment for {dataset_name}")
    print("=" * 50)
    
    # Basic statistics
    total_rows = df.count()
    total_cols = len(df.columns)
    print(f"📊 Total Records: {total_rows:,}")
    print(f"📊 Total Columns: {total_cols}")
    
    def percent_of_rows(part):
        # An empty dataset has no meaningful ratio; report 0 instead of
        # raising ZeroDivisionError.
        return (part / total_rows) * 100 if total_rows else 0.0
    
    # Check for null values (one filter + count per column)
    print(f"\n🔍 Null Value Analysis:")
    null_counts = []
    for column in df.columns:
        null_count = df.filter(col(column).isNull()).count()
        null_counts.append({
            'Column': column,
            'Null_Count': null_count,
            'Null_Percentage': round(percent_of_rows(null_count), 2)
        })
    
    # Explicit columns keep the filter below valid even when the dataset has
    # no columns at all (an empty record list would otherwise yield a frame
    # without a 'Null_Count' column).
    null_df = pd.DataFrame(null_counts, columns=['Column', 'Null_Count', 'Null_Percentage'])
    null_df = null_df[null_df['Null_Count'] > 0].sort_values('Null_Percentage', ascending=False)
    
    if len(null_df) > 0:
        print("Columns with null values:")
        display(null_df)
    else:
        print("✅ No null values found!")
    
    # Check for duplicates; reuse total_rows rather than counting again
    duplicate_count = total_rows - df.distinct().count()
    print(f"\n🔍 Duplicate Analysis:")
    print(f"Duplicate Records: {duplicate_count:,} ({percent_of_rows(duplicate_count):.2f}%)")
    
    # Data type analysis
    print(f"\n🔍 Data Type Distribution:")
    type_counts = {}
    for field in df.schema.fields:
        data_type = str(field.dataType)
        type_counts[data_type] = type_counts.get(data_type, 0) + 1
    
    for data_type, count in type_counts.items():
        print(f"  • {data_type}: {count} columns")
    
    return {
        'total_rows': total_rows,
        'total_cols': total_cols,
        'null_analysis': null_df.to_dict('records'),
        'duplicate_count': duplicate_count,
        'data_types': type_counts
    }

# Assess quality for all datasets.
# quality_reports maps dataset name -> the report dict returned by
# assess_data_quality; create_quality_dashboard reads this module-level dict.
quality_reports = {}
for name, df in loaded_datasets.items():
    quality_reports[name] = assess_data_quality(df, name)

In [None]:
# Create interactive data quality dashboard
def create_quality_dashboard():
    """Render a 2x2 Plotly dashboard from the module-level ``quality_reports``.

    Panels: row counts, column counts and duplicate counts per dataset as bar
    charts, plus a heatmap of the first five null percentages recorded per
    dataset. The heatmap is added only when at least one value is non-zero.
    """
    names = list(quality_reports.keys())
    
    # Collect the bar-chart series up front, one list per report key.
    metrics = {
        report_key: [quality_reports[ds][report_key] for ds in names]
        for report_key in ('total_rows', 'total_cols', 'duplicate_count')
    }
    
    figure = make_subplots(
        rows=2, cols=2,
        subplot_titles=('Dataset Sizes', 'Column Counts', 'Duplicate Records', 'Null Value Heatmap'),
        specs=[[{"type": "bar"}, {"type": "bar"}],
               [{"type": "bar"}, {"type": "heatmap"}]]
    )
    
    # (report key, trace name, colour, grid row, grid col) for each bar panel
    bar_panels = (
        ('total_rows', "Row Count", 'lightblue', 1, 1),
        ('total_cols', "Column Count", 'lightgreen', 1, 2),
        ('duplicate_count', "Duplicate Count", 'lightcoral', 2, 1),
    )
    for report_key, trace_name, color, grid_row, grid_col in bar_panels:
        figure.add_trace(
            go.Bar(x=names, y=metrics[report_key], name=trace_name, marker_color=color),
            row=grid_row, col=grid_col
        )
    
    # Null value heatmap (simplified): each dataset contributes a fixed-width
    # row, zero-padded when fewer than five columns contain nulls.
    width = 5
    heat_rows = []
    for ds in names:
        top_nulls = (quality_reports[ds]['null_analysis'] or [])[:width]
        percentages = [entry['Null_Percentage'] for entry in top_nulls]
        heat_rows.append(percentages + [0] * (width - len(percentages)))
    
    has_nulls = any(any(row) for row in heat_rows)
    if has_nulls:
        figure.add_trace(
            go.Heatmap(z=heat_rows, y=list(names), x=[f'Col_{i+1}' for i in range(width)],
                      colorscale='Reds', showscale=True),
            row=2, col=2
        )
    
    figure.update_layout(height=800, title_text="Data Quality Dashboard", showlegend=False)
    figure.show()

# Display the dashboard (reads the module-level quality_reports dict, so the
# quality assessment cell must have been run first)
create_quality_dashboard()

## 5. Interactive Transformation Selection <a id="transformations"></a>

Now comes the exciting part! Use the interactive widgets below to select and configure various PySpark transformations. Choose from data cleaning, aggregations, joins, and advanced analytics operations.

In [None]:
# Interactive transformation selection system
class TransformationSelector:
    """Widget-based UI for assembling an ordered list of transformations.

    ``datasets`` maps dataset names to DataFrames. ``transformations`` is a
    nested mapping, read as ``transformations[category][key]``, where each
    entry provides ``'name'``, ``'description'`` and an optional
    ``'parameters'`` mapping of ``param_name -> {'type', 'description',
    'options', 'default'}``.

    The chosen steps are collected in ``selected_transformations`` as dicts
    with the keys ``dataset``, ``category``, ``transform_key``,
    ``transform_name`` and ``parameters``.
    """

    def __init__(self, datasets, transformations):
        self.datasets = datasets
        self.transformations = transformations
        self.selected_transformations = []
        self.setup_widgets()

    def setup_widgets(self):
        """Setup interactive widgets for transformation selection"""

        # Dataset selection
        self.dataset_dropdown = widgets.Dropdown(
            options=list(self.datasets.keys()),
            description='Dataset:',
            style={'description_width': 'initial'}
        )

        # Transformation category
        self.category_dropdown = widgets.Dropdown(
            options=list(self.transformations.keys()),
            description='Category:',
            style={'description_width': 'initial'}
        )

        # Transformation type (updated based on category)
        self.transform_dropdown = widgets.Dropdown(
            options=[],
            description='Transform:',
            style={'description_width': 'initial'}
        )

        # Parameter configuration area
        self.param_area = widgets.VBox([])

        # Buttons
        self.add_button = widgets.Button(
            description='Add Transformation',
            button_style='success',
            icon='plus'
        )

        self.execute_button = widgets.Button(
            description='Execute Pipeline',
            button_style='primary',
            icon='play'
        )

        self.clear_button = widgets.Button(
            description='Clear Pipeline',
            button_style='warning',
            icon='trash'
        )

        # Output area
        self.output_area = widgets.Output()

        # Pipeline display
        self.pipeline_display = widgets.HTML(value="<h4>📋 Current Pipeline: Empty</h4>")

        # Setup event handlers
        self.category_dropdown.observe(self.update_transformations, names='value')
        self.transform_dropdown.observe(self.update_parameters, names='value')
        self.add_button.on_click(self.add_transformation)
        self.execute_button.on_click(self.execute_pipeline)
        self.clear_button.on_click(self.clear_pipeline)

        # Observers only fire when a value changes, so the transform dropdown
        # would stay empty for the initially selected category. Populate it
        # once explicitly.
        self.update_transformations({'new': self.category_dropdown.value})

    def update_transformations(self, change):
        """Update transformation options based on selected category"""
        category = change['new']
        if category and category in self.transformations:
            transforms = self.transformations[category]
            # (label, value) pairs: show the display name, keep the key as value
            options = [(config['name'], key) for key, config in transforms.items()]
            self.transform_dropdown.options = options

    def _column_options(self, param_config):
        """Return the explicit 'options' of a parameter, else the selected dataset's columns."""
        dataset_name = self.dataset_dropdown.value
        if dataset_name in self.datasets:
            return param_config.get('options', self.datasets[dataset_name].columns)
        return param_config.get('options', [])

    def update_parameters(self, change):
        """Update parameter widgets based on selected transformation"""
        transform_key = change['new']
        category = self.category_dropdown.value

        if not transform_key or not category:
            return

        # Ignore a key that does not belong to the current category instead
        # of raising KeyError inside the widget callback.
        transform_config = self.transformations.get(category, {}).get(transform_key)
        if transform_config is None:
            return

        param_widgets = []

        # Create description
        desc_widget = widgets.HTML(f"<b>Description:</b> {transform_config['description']}")
        param_widgets.append(desc_widget)

        # Create parameter widgets
        for param_name, param_config in transform_config.get('parameters', {}).items():
            param_type = param_config['type']
            description = param_config.get('description', param_name)

            if param_type == 'select':
                widget = widgets.Dropdown(
                    options=self._column_options(param_config),
                    description=f'{param_name}:',
                    style={'description_width': 'initial'}
                )

            elif param_type == 'multiselect':
                widget = widgets.SelectMultiple(
                    options=self._column_options(param_config),
                    description=f'{param_name}:',
                    style={'description_width': 'initial'}
                )

            elif param_type == 'text':
                widget = widgets.Text(
                    value=str(param_config.get('default', '')),
                    description=f'{param_name}:',
                    style={'description_width': 'initial'}
                )

            elif param_type == 'number':
                widget = widgets.FloatText(
                    value=float(param_config.get('default', 0)),
                    description=f'{param_name}:',
                    style={'description_width': 'initial'}
                )

            else:
                # Unknown parameter types fall back to a free-text field
                widget = widgets.Text(
                    description=f'{param_name}:',
                    style={'description_width': 'initial'}
                )

            # Add help text
            help_widget = widgets.HTML(f"<small><i>{description}</i></small>")
            param_widgets.extend([widget, help_widget])

            # Store widget reference; add_transformation uses this attribute
            # to tell parameter inputs apart from description/help widgets
            widget.param_name = param_name

        self.param_area.children = param_widgets

    def add_transformation(self, button):
        """Add transformation to pipeline"""
        with self.output_area:
            clear_output(wait=True)

            # Get current selections
            dataset = self.dataset_dropdown.value
            category = self.category_dropdown.value
            transform_key = self.transform_dropdown.value

            if not all([dataset, category, transform_key]):
                print("❌ Please select dataset, category, and transformation")
                return

            # Get parameters from the widgets tagged in update_parameters
            parameters = {}
            for widget in self.param_area.children:
                if hasattr(widget, 'param_name') and hasattr(widget, 'value'):
                    parameters[widget.param_name] = widget.value

            # Add to pipeline
            transform_info = {
                'dataset': dataset,
                'category': category,
                'transform_key': transform_key,
                'transform_name': self.transformations[category][transform_key]['name'],
                'parameters': parameters
            }

            self.selected_transformations.append(transform_info)

            print(f"✅ Added: {transform_info['transform_name']} on {dataset}")
            self.update_pipeline_display()

    def update_pipeline_display(self):
        """Update the pipeline display"""
        if not self.selected_transformations:
            self.pipeline_display.value = "<h4>📋 Current Pipeline: Empty</h4>"
            return

        items = "".join(
            f"<li><b>{transform['transform_name']}</b> on <i>{transform['dataset']}</i></li>"
            for transform in self.selected_transformations
        )
        self.pipeline_display.value = f"<h4>📋 Current Pipeline:</h4><ol>{items}</ol>"

    def execute_pipeline(self, button):
        """Execute the transformation pipeline"""
        with self.output_area:
            clear_output(wait=True)

            if not self.selected_transformations:
                print("❌ No transformations in pipeline")
                return

            print("🚀 Executing transformation pipeline...")

            # This will be implemented in the next section
            print("✅ Pipeline configuration ready for execution!")

    def clear_pipeline(self, button):
        """Clear the transformation pipeline"""
        self.selected_transformations = []
        self.update_pipeline_display()
        with self.output_area:
            clear_output(wait=True)
            print("🗑️ Pipeline cleared")

    def display(self):
        """Display the transformation selector interface"""

        # Layout the interface
        selection_box = widgets.VBox([
            widgets.HTML("<h3>🔧 Transformation Configuration</h3>"),
            self.dataset_dropdown,
            self.category_dropdown,
            self.transform_dropdown,
            widgets.HTML("<h4>Parameters:</h4>"),
            self.param_area,
            widgets.HBox([self.add_button, self.execute_button, self.clear_button])
        ])

        pipeline_box = widgets.VBox([
            self.pipeline_display,
            self.output_area
        ])

        main_interface = widgets.HBox([selection_box, pipeline_box])

        # Inside the method body, `display` resolves to the module-level
        # display function, not to this method
        display(main_interface)

# Initialize transformation selector
try:
    selector = TransformationSelector(loaded_datasets, AVAILABLE_TRANSFORMATIONS)
    selector.display()
except NameError:
    print("⚠️ Transformation definitions not available. Please run the setup cells first.")

## 6. Execute Selected Transformations Pipeline <a id="execute"></a>

Now let's implement the pipeline execution engine that will run the selected transformations and track the complete lineage.

In [None]:
# Pipeline execution engine
class PipelineExecutor:
    """Runs a list of transformation steps and records their results.

    Each step is a dict with the keys ``dataset``, ``category``,
    ``transform_key``, ``transform_name`` and ``parameters``. The output of a
    step is stored in ``self.datasets`` under the name
    ``<input dataset>_<transform_key>_<step number>``, so later steps can use
    it as their input.
    """

    def __init__(self, datasets, transformations, engine=None):
        self.datasets = datasets.copy()  # Working copy of datasets
        self.transformations = transformations
        self.engine = engine
        self.execution_results = []
        self.lineage_data = None

    def execute_pipeline(self, selected_transformations):
        """Execute the complete transformation pipeline.

        Stops at the first failing step. Results of the steps that succeeded
        are appended to ``self.execution_results`` and returned.
        """
        print("🚀 Starting pipeline execution...")
        print("=" * 50)

        for i, transform_config in enumerate(selected_transformations):
            print(f"\n📊 Step {i+1}: {transform_config['transform_name']}")
            print(f"📁 Dataset: {transform_config['dataset']}")
            print(f"🔧 Parameters: {transform_config['parameters']}")

            try:
                result = self.execute_single_transformation(transform_config)
                self.execution_results.append(result)
                print(f"✅ Completed: {result['output_name']} ({result['output_rows']:,} rows)")

            except Exception as e:
                print(f"❌ Failed: {str(e)}")
                print(f"\n⚠️ Pipeline stopped at step {i+1}; remaining steps were skipped")
                break
        else:
            # Only announce completion when no step failed
            print(f"\n🎉 Pipeline execution completed!")

        print(f"📊 Generated {len(self.execution_results)} new datasets")

        # Update lineage
        if self.engine:
            self.lineage_data = self.engine.get_lineage_info()

        return self.execution_results

    def execute_single_transformation(self, transform_config):
        """Execute a single transformation and return its result record.

        Raises:
            KeyError: if the category, transformation key or input dataset
                named in ``transform_config`` is unknown.
        """

        # Get the transformation class
        category = transform_config['category']
        transform_key = transform_config['transform_key']
        transform_class = self.transformations[category][transform_key]['class']

        # Get input dataset
        input_dataset = transform_config['dataset']
        input_df = self.datasets[input_dataset]

        # Create transformation instance
        if self.engine:
            transformation = transform_class(self.engine.lineage_tracker)
        else:
            transformation = transform_class()

        # Execute transformation
        parameters = transform_config['parameters']

        # Handle special cases for different transformation types
        if transform_key in ['join', 'union', 'lookup']:
            # These require multiple datasets - simplified for demo
            output_df = input_df  # Placeholder
        elif self.engine:
            output_df = self.engine.execute_transformation(transformation, input_df, **parameters)
        else:
            output_df = transformation.transform(input_df, **parameters)

        # Generate output name; the step number counts results across all
        # runs of this executor
        step = len(self.execution_results) + 1
        output_name = f"{input_dataset}_{transform_key}_{step}"

        # Store result
        self.datasets[output_name] = output_df

        return {
            'step': step,
            'transformation': transform_config['transform_name'],
            'input_dataset': input_dataset,
            'output_name': output_name,
            'output_rows': output_df.count(),
            'output_cols': len(output_df.columns),
            'parameters': parameters
        }

    def show_results_summary(self):
        """Display execution results summary"""
        if not self.execution_results:
            print("No execution results available")
            return

        # Create summary DataFrame
        summary_data = [
            {
                'Step': result['step'],
                'Transformation': result['transformation'],
                'Input': result['input_dataset'],
                'Output': result['output_name'],
                'Rows': f"{result['output_rows']:,}",
                'Columns': result['output_cols']
            }
            for result in self.execution_results
        ]

        summary_df = pd.DataFrame(summary_data)
        display(summary_df)

        # Show sample of final dataset (results are non-empty here)
        final_dataset = self.execution_results[-1]['output_name']
        final_df = self.datasets[final_dataset]

        print(f"\n👀 Sample from final dataset '{final_dataset}':")
        display(final_df.limit(5).toPandas())

# Create pipeline executor.
# `engine` is only defined when the transformation engine was initialized
# successfully, so look it up instead of referencing a possibly missing name.
executor = PipelineExecutor(loaded_datasets, AVAILABLE_TRANSFORMATIONS, globals().get('engine'))

# Demo: Execute some sample transformations
def run_sample_pipeline():
    """Run a sample transformation pipeline on the customers dataset.

    The input names of steps 2 and 3 follow the executor's
    ``<input>_<transform_key>_<step>`` naming, so they only resolve when the
    executor has no earlier results.

    Returns:
        The list of result records returned by ``executor.execute_pipeline``.
    """

    print("🎯 Running Sample Pipeline")

    # Sample transformations
    sample_transformations = [
        {
            'dataset': 'customers',
            'category': 'Data Cleaning',
            'transform_key': 'remove_nulls',
            'transform_name': 'Remove Null Values',
            'parameters': {'columns': ['email'], 'how': 'any'}
        },
        {
            'dataset': 'customers_remove_nulls_1',
            'category': 'Data Cleaning', 
            'transform_key': 'standardize_text',
            'transform_name': 'Standardize Text',
            'parameters': {'columns': ['first_name', 'last_name'], 'operations': ['lower', 'trim']}
        },
        {
            # NOTE(review): the inline fallback generator in this notebook does
            # not write a customer_tier column; this step presumably relies on
            # the project's full data generator - confirm.
            'dataset': 'customers_standardize_text_2',
            'category': 'Aggregations',
            'transform_key': 'group_by',
            'transform_name': 'Group By Aggregation', 
            'parameters': {
                'group_by_columns': ['customer_tier', 'state'],
                'aggregations': {'age': 'avg', 'customer_id': 'count'}
            }
        }
    ]

    # Execute pipeline
    results = executor.execute_pipeline(sample_transformations)

    # Show results
    executor.show_results_summary()

    return results

# Run sample pipeline
sample_results = run_sample_pipeline()

## 7. Capture Data Lineage Metadata <a id="lineage"></a>

Let's capture and analyze the complete data lineage metadata from our transformation pipeline.

In [None]:
# Lineage metadata analysis
def analyze_lineage_metadata():
    """Print a summary of the lineage metadata held by the pipeline executor.

    Reads ``executor.lineage_data`` and prints an overview (number of data
    sources, sinks, transformations and dependencies) followed by the details
    of every source, transformation and dependency.

    Returns:
        The lineage dictionary that was analysed, or ``None`` when
        ``executor.lineage_data`` is ``None``.
    """
    lineage = executor.lineage_data
    if lineage is None:
        print("❌ No lineage data available. Run some transformations first.")
        return None

    data_sources = lineage.get('data_sources', [])
    transformations = lineage.get('transformations', [])
    dependencies = lineage.get('dependencies', [])

    print("📊 LINEAGE METADATA ANALYSIS")
    print("=" * 50)

    # Basic statistics
    print("\n📈 Overview:")
    print(f"  • Data Sources: {len(data_sources)}")
    print(f"  • Data Sinks: {len(lineage.get('data_sinks', []))}")
    print(f"  • Transformations: {len(transformations)}")
    print(f"  • Dependencies: {len(dependencies)}")

    # Data sources detail
    print("\n📁 Data Sources:")
    for source in data_sources:
        print(f"  • {source['name']} ({source['type']}) - {source['location']}")

    # Transformations detail
    print("\n🔧 Transformations:")
    for position, transform in enumerate(transformations, 1):
        logic = transform['transformation_logic']
        # Only show an ellipsis when the logic text was actually cut off.
        logic_preview = f"{logic[:100]}..." if len(logic) > 100 else logic

        print(f"  {position}. {transform['name']}")
        print(f"     Input: {', '.join(transform['input_tables'])}")
        print(f"     Output: {transform['output_table']}")
        print(f"     Logic: {logic_preview}")

        # Row counts are optional metadata; show them only when recorded.
        metadata = transform.get('metadata')
        if metadata and 'input_rows' in metadata:
            print(f"     Rows: {metadata.get('input_rows', 'N/A')} → {metadata.get('output_rows', 'N/A')}")
        print()

    # Dependencies analysis: group the targets of every source
    print("\n🔗 Data Dependencies:")
    dependency_graph = {}
    for dependency in dependencies:
        dependency_graph.setdefault(dependency['source'], []).append(dependency['target'])

    for source, targets in dependency_graph.items():
        print(f"  • {source} → {', '.join(targets)}")

    return lineage


# Schema evolution tracking
def track_schema_evolution():
    """Print how the column set changes across each executed transformation.

    For every entry in ``executor.execution_results`` whose input and output
    datasets are both present in ``executor.datasets``, prints the column
    counts plus the added, removed and preserved columns.
    """
    print("\n🔄 SCHEMA EVOLUTION TRACKING")
    print("=" * 50)

    for result in executor.execution_results:
        input_name = result['input_dataset']
        output_name = result['output_name']

        if input_name not in executor.datasets or output_name not in executor.datasets:
            continue

        input_columns = executor.datasets[input_name].columns
        output_columns = executor.datasets[output_name].columns

        print(f"\n📊 {result['transformation']}:")
        print(f"  Input ({input_name}): {len(input_columns)} columns")
        print(f"  Output ({output_name}): {len(output_columns)} columns")

        # Schema comparison. Names are sorted so the report is stable across
        # runs (set iteration order is not).
        before, after = set(input_columns), set(output_columns)
        added_cols = sorted(after - before)
        removed_cols = sorted(before - after)
        common_cols = before & after

        if added_cols:
            print(f"  ➕ Added columns: {', '.join(added_cols)}")
        if removed_cols:
            print(f"  ➖ Removed columns: {', '.join(removed_cols)}")
        if common_cols:
            print(f"  🔄 Preserved columns: {len(common_cols)}")


# Data quality impact analysis
def analyze_quality_impact():
    """Print the record-count change caused by each executed transformation.

    Only transformations whose input and output datasets are both present in
    ``executor.datasets`` are reported. Each report line costs one ``count()``
    call on the input and one on the output DataFrame.
    """
    print("\n🎯 DATA QUALITY IMPACT ANALYSIS")
    print("=" * 50)

    for result in executor.execution_results:
        input_name = result['input_dataset']
        output_name = result['output_name']

        if input_name not in executor.datasets or output_name not in executor.datasets:
            continue

        input_count = executor.datasets[input_name].count()
        output_count = executor.datasets[output_name].count()

        print(f"\n📊 {result['transformation']}:")
        print(f"  Record Impact: {input_count:,} → {output_count:,}")

        # A percentage is undefined for an empty input, so skip it then.
        if input_count > 0:
            change_pct = ((output_count - input_count) / input_count) * 100
            if change_pct > 0:
                print(f"  📈 +{change_pct:.1f}% increase in records")
            elif change_pct < 0:
                print(f"  📉 {abs(change_pct):.1f}% decrease in records")
            else:
                print("  ➡️ No change in record count")


# Performance metrics
def analyze_performance_metrics():
    """Print pipeline-wide row totals and the size progression of each step.

    Input row counts are obtained with ``count()`` once per distinct input
    dataset and reused for both the totals and the progression listing.
    Output row counts are taken from the ``output_rows`` value recorded in
    ``executor.execution_results``.
    """
    print("\n⚡ PERFORMANCE METRICS")
    print("=" * 50)

    results = executor.execution_results

    # Count every distinct input dataset exactly once; each count() is a
    # separate call on the DataFrame, so repeating it per loop is wasteful.
    input_counts = {}
    for result in results:
        input_name = result['input_dataset']
        if input_name in executor.datasets and input_name not in input_counts:
            input_counts[input_name] = executor.datasets[input_name].count()

    # A dataset consumed by several steps contributes once per step.
    total_input_rows = sum(
        input_counts[result['input_dataset']]
        for result in results
        if result['input_dataset'] in input_counts
    )
    total_output_rows = sum(result['output_rows'] for result in results)

    print("📊 Pipeline Summary:")
    print(f"  • Total Transformations: {len(results)}")
    print(f"  • Total Input Rows Processed: {total_input_rows:,}")
    print(f"  • Total Output Rows Generated: {total_output_rows:,}")

    if total_input_rows > 0:
        efficiency = (total_output_rows / total_input_rows) * 100
        print(f"  • Data Processing Efficiency: {efficiency:.1f}%")

    # Dataset size progression
    print("\n📈 Dataset Size Progression:")
    for result in results:
        input_name = result['input_dataset']
        if input_name not in input_counts:
            continue
        input_size = input_counts[input_name]
        output_size = result['output_rows']
        print(f"  {result['step']}. {input_name} ({input_size:,}) → {result['output_name']} ({output_size:,})")


# Run all analyses
if executor.execution_results:
    lineage_metadata = analyze_lineage_metadata()
    track_schema_evolution()
    analyze_quality_impact()
    analyze_performance_metrics()
else:
    print("⚠️ No execution results available. Run the sample pipeline first.")

## 8. Create Interactive Lineage Visualization <a id="visualization"></a>

Now for the exciting part - let's create beautiful, interactive visualizations of our data lineage using network graphs and flowcharts!

In [None]:
# Interactive lineage visualization system
def create_lineage_flowchart():
    """Render the lineage visualizations built from ``executor.lineage_data``.

    Shows, in order: the main lineage graph, the dependency matrix, the
    transformation summary and a three-panel statistics figure.

    Returns:
        The visualizer returned by ``create_lineage_visualization``, or
        ``None`` when there is no lineage data or when any step raised (the
        error is printed rather than propagated so the rest of the notebook
        can continue).
    """
    if executor.lineage_data is None:
        print("❌ No lineage data available")
        return None

    try:
        # Use our custom visualizer
        visualizer = create_lineage_visualization(executor.lineage_data)

        print("🎨 Creating Interactive Lineage Visualizations")
        print("=" * 50)

        # 1. Main lineage graph
        print("\n📊 1. Main Data Lineage Graph")
        main_fig = visualizer.create_interactive_graph(layout='spring')
        main_fig.update_layout(
            title="🔄 Data Transformation Pipeline Lineage",
            height=600,
            showlegend=True,
        )
        main_fig.show()

        # 2. Dependency matrix
        print("\n📈 2. Dependency Matrix Heatmap")
        matrix_fig = visualizer.create_dependency_matrix()
        matrix_fig.update_layout(
            title="🔗 Data Dependency Matrix",
            height=500,
        )
        matrix_fig.show()

        # 3. Transformation summary
        print("\n📋 3. Transformation Types Summary")
        summary_fig = visualizer.create_transformation_summary()
        summary_fig.update_layout(
            title="🔧 Transformation Types Distribution",
            height=400,
        )
        summary_fig.show()

        # 4. Pipeline statistics
        stats = visualizer.get_lineage_statistics()
        print("\n📊 4. Pipeline Statistics")

        fig_stats = make_subplots(
            rows=1, cols=3,
            subplot_titles=('Pipeline Overview', 'Node Types', 'Transformation Types'),
            specs=[[{"type": "pie"}, {"type": "bar"}, {"type": "pie"}]],
        )

        # The overview pie and the node-type bar chart show the same three
        # counts with the same colours, so build them once.
        category_counts = [stats['data_sources'], stats['transformations'], stats['data_sinks']]
        category_colors = ['lightblue', 'lightgreen', 'lightcoral']

        # Pipeline overview pie chart
        fig_stats.add_trace(
            go.Pie(
                labels=['Data Sources', 'Transformations', 'Data Sinks'],
                values=category_counts,
                marker_colors=category_colors,
                showlegend=False,
            ),
            row=1, col=1,
        )

        # Node types bar chart
        fig_stats.add_trace(
            go.Bar(
                x=['Sources', 'Transforms', 'Sinks'],
                y=category_counts,
                marker_color=category_colors,
                showlegend=False,
            ),
            row=1, col=2,
        )

        # Transformation types pie chart (skipped when nothing was recorded)
        transformation_types = stats['transformation_types']
        if transformation_types:
            fig_stats.add_trace(
                go.Pie(
                    labels=list(transformation_types.keys()),
                    values=list(transformation_types.values()),
                    showlegend=False,
                ),
                row=1, col=3,
            )

        fig_stats.update_layout(
            title="📈 Comprehensive Pipeline Statistics",
            height=500,
        )
        fig_stats.show()

        return visualizer

    except Exception as e:
        # Deliberate best-effort: a rendering failure is reported, not raised.
        print(f"❌ Visualization error: {e}")
        return None


def create_custom_network_graph():
    """Draw the executed pipeline as a directed dataset graph.

    Nodes are datasets: originally loaded datasets that take part in the
    pipeline are styled as sources, every transformation output is styled as a
    transformation result. Each edge links a transformation's input dataset to
    its output dataset.

    Returns:
        The Plotly figure that was shown, or ``None`` when
        ``executor.execution_results`` is empty.
    """
    results = executor.execution_results
    if not results:
        print("❌ No execution results available")
        return None

    graph = nx.DiGraph()

    # Every dataset name that appears as an input or an output of a step
    referenced_datasets = {
        name
        for result in results
        for name in (result['input_dataset'], result['output_name'])
    }

    # Add original datasets as source nodes
    for dataset in loaded_datasets:
        if dataset in referenced_datasets:
            graph.add_node(dataset, node_type='source', color='lightblue', size=30)

    # Add transformation results as transformation nodes
    for result in results:
        graph.add_node(
            result['output_name'],
            node_type='transformation',
            color='lightgreen',
            size=25,
            transformation=result['transformation'],
        )

    # Edges are added after all nodes so the node order above is preserved.
    for result in results:
        graph.add_edge(
            result['input_dataset'],
            result['output_name'],
            transformation=result['transformation'],
        )

    pos = nx.spring_layout(graph, k=3, iterations=50)

    # Per-node plotting properties, all in graph.nodes() order
    node_names = list(graph.nodes())
    node_x = [pos[name][0] for name in node_names]
    node_y = [pos[name][1] for name in node_names]
    node_colors = [graph.nodes[name].get('color', 'lightgray') for name in node_names]
    node_sizes = [graph.nodes[name].get('size', 20) for name in node_names]

    # Hover text
    node_text = []
    for name in node_names:
        attributes = graph.nodes[name]
        if attributes.get('node_type') == 'source':
            node_text.append(f"<b>Source Dataset</b><br>{name}<br>Original Data")
        else:
            transform = attributes.get('transformation', 'Unknown')
            node_text.append(f"<b>Transformation Result</b><br>{name}<br>Type: {transform}")

    # Edge coordinates; the trailing None breaks the line between edges
    edge_x = []
    edge_y = []
    for start, end in graph.edges():
        x0, y0 = pos[start]
        x1, y1 = pos[end]
        edge_x.extend([x0, x1, None])
        edge_y.extend([y0, y1, None])

    fig = go.Figure()

    # Add edges
    fig.add_trace(go.Scatter(
        x=edge_x, y=edge_y,
        mode='lines',
        line=dict(width=3, color='rgba(125,125,125,0.8)'),
        hoverinfo='none',
        showlegend=False,
    ))

    # Add nodes
    fig.add_trace(go.Scatter(
        x=node_x, y=node_y,
        mode='markers+text',
        marker=dict(
            size=node_sizes,
            color=node_colors,
            line=dict(width=2, color='black'),
        ),
        text=node_names,
        textposition="middle center",
        hovertext=node_text,
        hoverinfo='text',
        showlegend=False,
    ))

    fig.update_layout(
        title="🎯 Custom Pipeline Flow Network",
        showlegend=False,
        hovermode='closest',
        margin=dict(b=20, l=5, r=5, t=40),
        annotations=[
            dict(
                text="Interactive Pipeline Flow - Hover for details",
                showarrow=False,
                xref="paper", yref="paper",
                x=0.005, y=-0.002,
                xanchor="left", yanchor="bottom",
                font=dict(color="gray", size=12),
            )
        ],
        xaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
        yaxis=dict(showgrid=False, zeroline=False, showticklabels=False),
        height=600,
    )

    fig.show()
    return fig


def create_sankey_diagram():
    """Draw a Sankey diagram of the rows flowing between datasets.

    Each executed transformation becomes one link from its input dataset to
    its output dataset, weighted by the recorded ``output_rows`` and labelled
    with the transformation name.

    Returns:
        The Plotly figure that was shown, or ``None`` when
        ``executor.execution_results`` is empty.
    """
    results = executor.execution_results
    if not results:
        print("❌ No execution results available")
        return None

    # Sankey nodes are addressed by index, so fix a sorted order of all
    # dataset names and remember the index of each.
    nodes = sorted({
        name
        for result in results
        for name in (result['input_dataset'], result['output_name'])
    })
    node_indices = {name: index for index, name in enumerate(nodes)}

    fig = go.Figure(data=[go.Sankey(
        node=dict(
            pad=15,
            thickness=20,
            line=dict(color="black", width=0.5),
            label=nodes,
            color="lightblue",
        ),
        link=dict(
            source=[node_indices[result['input_dataset']] for result in results],
            target=[node_indices[result['output_name']] for result in results],
            value=[result['output_rows'] for result in results],
            label=[result['transformation'] for result in results],
        ),
    )])

    fig.update_layout(
        title="🌊 Data Flow Sankey Diagram",
        font_size=12,
        height=500,
    )

    fig.show()
    return fig


# Create all visualizations
print("🎨 CREATING INTERACTIVE LINEAGE VISUALIZATIONS")
print("=" * 60)

if executor.execution_results:
    # Create main lineage visualization
    visualizer = create_lineage_flowchart()

    # Create custom network graph
    print("\n🔗 Creating Custom Network Graph...")
    network_fig = create_custom_network_graph()

    # Create Sankey diagram
    print("\n🌊 Creating Sankey Flow Diagram...")
    sankey_fig = create_sankey_diagram()

    # Each creator returns None on failure; only claim success when none did.
    if all(created is not None for created in (visualizer, network_fig, sankey_fig)):
        print("\n✅ All visualizations created successfully!")
    else:
        print("\n⚠️ Some visualizations could not be created. See the messages above.")
else:
    print("⚠️ No execution results available. Please run the sample pipeline first.")

## 9. Generate Airflow DAG Code <a id="airflow"></a>

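Let's automatically generate an Airflow DAG skeleton from our executed pipeline. Each generated task loads its input and saves its output, but the transformation logic itself is left as a `TODO` placeholder that must be filled in (along with real data paths) before the DAG is deployed to production.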

In [None]:
# Airflow DAG code generator
def generate_airflow_dag():
    """Generate the source code of an Airflow DAG mirroring the executed pipeline.

    One ``PythonOperator`` task is generated per entry in
    ``executor.execution_results`` and the tasks are chained in execution
    order. The generated task bodies are a skeleton: they load the input,
    pass it through unchanged (``TODO`` placeholder) and save the output.

    Returns:
        The DAG module source as a string, or ``None`` when
        ``executor.execution_results`` is empty.
    """
    import re  # local import: only needed to sanitise task names

    if not executor.execution_results:
        print("❌ No execution results available")
        return None

    dag_header = '''from datetime import datetime, timedelta
from airflow import DAG
from airflow.operators.python import PythonOperator
import sys
import os

# Add project root to Python path
project_root = os.path.dirname(os.path.dirname(__file__))
sys.path.append(project_root)

# Import transformation modules
from src.transformations import SparkTransformationEngine

# Default arguments
default_args = {
    'owner': 'data_team',
    'depends_on_past': False,
    'start_date': datetime(2024, 1, 1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Create DAG
dag = DAG(
    'generated_lineage_pipeline',
    default_args=default_args,
    description='Auto-generated data transformation pipeline with lineage',
    schedule_interval=timedelta(days=1),
    catchup=False,
    tags=['data_engineering', 'lineage', 'auto_generated'],
)

'''

    task_names = []
    task_functions = []
    task_definitions = []
    produced_outputs = set()  # datasets written by an earlier generated task

    for step, result in enumerate(executor.execution_results, 1):
        # The task name is used as a Python function name in the generated
        # module, so every non-word character (not just spaces) becomes "_".
        slug = re.sub(r'\W', '_', result['transformation'].lower())
        task_name = f"transform_{step}_{slug}"

        # A dataset produced by an earlier task was saved as Parquet under
        # "path/to/<name>", so it must be read back from there; only original
        # sources are read from CSV.
        # NOTE(review): assumes engine.load_data accepts "parquet" as a
        # format, mirroring engine.save_data - confirm against the engine.
        input_name = result['input_dataset']
        if input_name in produced_outputs:
            input_path, input_format = f"path/to/{input_name}", "parquet"
        else:
            input_path, input_format = f"path/to/{input_name}.csv", "csv"
        produced_outputs.add(result['output_name'])

        task_names.append(task_name)
        task_functions.append(f'''
def {task_name}(**context):
    """Execute {result['transformation']} transformation"""

    engine = SparkTransformationEngine("AirflowPipeline_{task_name}")

    try:
        # Load input data
        input_df = engine.load_data(
            "{input_path}",
            "{input_name}",
            "{input_format}"
        )

        # Apply transformation
        # Note: Customize transformation parameters based on your requirements
        # Parameters used: {result['parameters']}

        # TODO: Implement specific transformation logic here
        result_df = input_df  # Placeholder

        # Save result
        engine.save_data(
            result_df,
            "path/to/{result['output_name']}",
            "{result['output_name']}",
            "parquet"
        )

        # Export lineage
        engine.export_lineage(f"lineage/{task_name}_lineage.json")

        return "Task completed successfully"

    finally:
        engine.stop()

''')
        task_definitions.append(f'''{task_name}_task = PythonOperator(
    task_id='{task_name}',
    python_callable={task_name},
    dag=dag,
)
''')

    # Chain the tasks in execution order: first >> second >> third ...
    dependencies = [
        f"{upstream}_task >> {downstream}_task\n"
        for upstream, downstream in zip(task_names, task_names[1:])
    ]

    return "".join([
        dag_header,
        *task_functions,
        "\n# Task definitions\n",
        *task_definitions,
        "\n# Task dependencies\n",
        *dependencies,
    ])


def generate_production_config():
    """Build the configuration dictionary that accompanies the generated DAG.

    Returns:
        A dict with pipeline metadata, one entry per executed transformation
        (taken from ``executor.execution_results``), one CSV data-source entry
        per dataset in ``loaded_datasets``, and static lineage and monitoring
        settings. ``data_sinks`` is left empty.
    """
    results = executor.execution_results

    return {
        "pipeline": {
            "name": "generated_lineage_pipeline",
            "version": "1.0.0",
            "generated_date": datetime.now().isoformat(),
            "transformations": len(results),
        },
        "transformations": [
            {
                "name": result['transformation'],
                "input_dataset": result['input_dataset'],
                "output_dataset": result['output_name'],
                "parameters": result['parameters'],
                "expected_output_rows": result['output_rows'],
                "expected_output_cols": result['output_cols'],
            }
            for result in results
        ],
        "lineage": {
            "tracking_enabled": True,
            "export_format": "json",
            "visualization_enabled": True,
        },
        "data_sources": {
            dataset_name: {
                "type": "csv",
                "location": f"data/{dataset_name}.csv",
                "format_options": {
                    "header": True,
                    "inferSchema": True,
                },
            }
            for dataset_name in loaded_datasets
        },
        "data_sinks": {},
        "monitoring": {
            "alerts_enabled": True,
            "metrics_collection": True,
            "lineage_validation": True,
        },
    }


def create_deployment_package():
    """Bundle everything needed to deploy the generated pipeline.

    Returns:
        A dict holding the generated DAG source, the production config, the
        executor's lineage metadata and execution results, and static
        deployment instructions.
    """
    return {
        "dag_code": generate_airflow_dag(),
        "config": generate_production_config(),
        "lineage_metadata": executor.lineage_data,
        "execution_results": executor.execution_results,
        "deployment_instructions": {
            "steps": [
                "1. Install required dependencies from requirements.txt",
                "2. Copy DAG file to Airflow dags folder",
                "3. Update data source paths in configuration",
                "4. Configure Airflow connections and variables",
                "5. Enable the DAG in Airflow UI",
                "6. Monitor execution and lineage tracking",
            ],
            "requirements": [
                "apache-airflow",
                "pyspark",
                "pandas",
                "plotly",
                "networkx",
            ],
        },
    }


# Generate the DAG and deployment package
print("🚀 GENERATING AIRFLOW DAG CODE")
print("=" * 50)

if executor.execution_results:
    # Generate DAG code
    dag_code = generate_airflow_dag()

    print("✅ Airflow DAG code generated!")
    print("\n📋 Generated DAG Preview:")
    print("=" * 30)
    # Show first 50 lines
    dag_lines = dag_code.split('\n')
    for line_number, line in enumerate(dag_lines[:50], 1):
        print(f"{line_number:2d}: {line}")

    if len(dag_lines) > 50:
        print(f"... ({len(dag_lines) - 50} more lines)")

    # Generate complete deployment package
    print("\n📦 Generating deployment package...")
    deployment_package = create_deployment_package()

    # executor.lineage_data may be None; treat that as "nothing tracked"
    # instead of failing while printing the summary.
    tracked_lineage = deployment_package['lineage_metadata'] or {}

    print("✅ Deployment package created!")
    print("\n📊 Package Contents:")
    print(f"  • DAG Code: {len(dag_code)} characters")
    print(f"  • Configuration: {len(deployment_package['config']['transformations'])} transformations")
    print(f"  • Lineage Metadata: {len(tracked_lineage.get('transformations', []))} tracked transformations")
    print(f"  • Execution Results: {len(deployment_package['execution_results'])} results")

    # Save to files (optional)
    save_to_files = True  # Set to False to skip writing the generated files

    if save_to_files:
        output_dir = os.path.join(project_root, 'generated_output')
        os.makedirs(output_dir, exist_ok=True)

        # Save DAG file (UTF-8 so non-ASCII transformation names survive)
        dag_file = os.path.join(output_dir, 'generated_lineage_dag.py')
        with open(dag_file, 'w', encoding='utf-8') as f:
            f.write(dag_code)

        # Save configuration; default=str matches the package dump below so
        # both files serialise transformation parameters the same way.
        config_file = os.path.join(output_dir, 'pipeline_config.json')
        with open(config_file, 'w', encoding='utf-8') as f:
            json.dump(deployment_package['config'], f, indent=2, default=str)

        # Save deployment package
        package_file = os.path.join(output_dir, 'deployment_package.json')
        with open(package_file, 'w', encoding='utf-8') as f:
            json.dump(deployment_package, f, indent=2, default=str)

        print(f"\n💾 Files saved to: {output_dir}")
        print(f"  • {dag_file}")
        print(f"  • {config_file}")
        print(f"  • {package_file}")

else:
    print("⚠️ No execution results available. Please run the sample pipeline first.")

## 10. Export Pipeline Results <a id="export"></a>

Let's export all our pipeline results, lineage data, and visualizations for future use and analysis.

In [None]:
# Comprehensive export system\ndef export_all_results():\n    \"\"\"Export all pipeline results and artifacts\"\"\"\n    \n    print(\"📦 EXPORTING ALL PIPELINE RESULTS\")\n    print(\"=\" * 50)\n    \n    # Create output directory\n    timestamp = datetime.now().strftime('%Y%m%d_%H%M%S')\n    export_dir = os.path.join(project_root, 'exports', f'pipeline_export_{timestamp}')\n    os.makedirs(export_dir, exist_ok=True)\n    \n    exported_files = []\n    \n    # 1. Export transformed datasets\n    print(\"\\n📊 1. Exporting transformed datasets...\")\n    datasets_dir = os.path.join(export_dir, 'datasets')\n    os.makedirs(datasets_dir, exist_ok=True)\n    \n    for dataset_name, df in executor.datasets.items():\n        try:\n            # Export as Parquet for efficiency\n            parquet_path = os.path.join(datasets_dir, f'{dataset_name}.parquet')\n            df.write.mode('overwrite').parquet(parquet_path)\n            \n            # Also export sample as CSV for easy viewing\n            sample_path = os.path.join(datasets_dir, f'{dataset_name}_sample.csv')\n            df.limit(100).toPandas().to_csv(sample_path, index=False)\n            \n            exported_files.extend([parquet_path, sample_path])\n            print(f\"  ✅ {dataset_name}: {df.count():,} rows exported\")\n            \n        except Exception as e:\n            print(f\"  ❌ Failed to export {dataset_name}: {e}\")\n    \n    # 2. Export lineage metadata\n    print(\"\\n🔗 2. Exporting lineage metadata...\")\n    lineage_dir = os.path.join(export_dir, 'lineage')\n    os.makedirs(lineage_dir, exist_ok=True)\n    \n    if executor.lineage_data:\n        lineage_file = os.path.join(lineage_dir, 'complete_lineage.json')\n        with open(lineage_file, 'w') as f:\n            json.dump(executor.lineage_data, f, indent=2, default=str)\n        exported_files.append(lineage_file)\n        print(f\"  ✅ Complete lineage metadata exported\")\n    \n    # 3. 
Export execution results\n    print(\"\\n📋 3. Exporting execution results...\")\n    if executor.execution_results:\n        results_file = os.path.join(export_dir, 'execution_results.json')\n        with open(results_file, 'w') as f:\n            json.dump(executor.execution_results, f, indent=2, default=str)\n        exported_files.append(results_file)\n        \n        # Create execution summary\n        summary_data = []\n        for result in executor.execution_results:\n            summary_data.append({\n                'Step': result['step'],\n                'Transformation': result['transformation'],\n                'Input_Dataset': result['input_dataset'],\n                'Output_Dataset': result['output_name'],\n                'Input_Rows': executor.datasets[result['input_dataset']].count() if result['input_dataset'] in executor.datasets else 'N/A',\n                'Output_Rows': result['output_rows'],\n                'Output_Columns': result['output_cols'],\n                'Parameters': str(result['parameters'])\n            })\n        \n        summary_df = pd.DataFrame(summary_data)\n        summary_file = os.path.join(export_dir, 'execution_summary.csv')\n        summary_df.to_csv(summary_file, index=False)\n        exported_files.append(summary_file)\n        \n        print(f\"  ✅ Execution results and summary exported\")\n    \n    # 4. Export data quality reports\n    print(\"\\n🎯 4. Exporting data quality reports...\")\n    quality_dir = os.path.join(export_dir, 'quality_reports')\n    os.makedirs(quality_dir, exist_ok=True)\n    \n    if quality_reports:\n        quality_file = os.path.join(quality_dir, 'quality_assessment.json')\n        with open(quality_file, 'w') as f:\n            json.dump(quality_reports, f, indent=2, default=str)\n        exported_files.append(quality_file)\n        print(f\"  ✅ Data quality reports exported\")\n    \n    # 5. Export visualizations (as HTML)\n    print(\"\\n📈 5. 
Exporting visualizations...\")\n    viz_dir = os.path.join(export_dir, 'visualizations')\n    os.makedirs(viz_dir, exist_ok=True)\n    \n    try:\n        if 'visualizer' in globals() and visualizer:\n            # Main lineage graph\n            main_fig = visualizer.create_interactive_graph()\n            main_fig.write_html(os.path.join(viz_dir, 'lineage_graph.html'))\n            \n            # Dependency matrix\n            matrix_fig = visualizer.create_dependency_matrix()\n            matrix_fig.write_html(os.path.join(viz_dir, 'dependency_matrix.html'))\n            \n            # Transformation summary\n            summary_fig = visualizer.create_transformation_summary()\n            summary_fig.write_html(os.path.join(viz_dir, 'transformation_summary.html'))\n            \n            print(f\"  ✅ Interactive visualizations exported as HTML\")\n        \n        # Export custom network graph if available\n        if 'network_fig' in globals():\n            network_fig.write_html(os.path.join(viz_dir, 'network_graph.html'))\n        \n        # Export Sankey diagram if available\n        if 'sankey_fig' in globals():\n            sankey_fig.write_html(os.path.join(viz_dir, 'sankey_diagram.html'))\n            \n    except Exception as e:\n        print(f\"  ⚠️ Some visualizations could not be exported: {e}\")\n    \n    # 6. Export generated Airflow DAG\n    print(\"\\n🚀 6. 
Exporting Airflow DAG...\")\n    airflow_dir = os.path.join(export_dir, 'airflow')\n    os.makedirs(airflow_dir, exist_ok=True)\n    \n    if 'dag_code' in globals() and dag_code:\n        dag_file = os.path.join(airflow_dir, 'generated_dag.py')\n        with open(dag_file, 'w') as f:\n            f.write(dag_code)\n        exported_files.append(dag_file)\n        print(f\"  ✅ Airflow DAG code exported\")\n    \n    if 'deployment_package' in globals() and deployment_package:\n        config_file = os.path.join(airflow_dir, 'deployment_config.json')\n        with open(config_file, 'w') as f:\n            json.dump(deployment_package['config'], f, indent=2)\n        exported_files.append(config_file)\n        print(f\"  ✅ Deployment configuration exported\")\n    \n    # 7. Create export manifest\n    print(\"\\n📋 7. Creating export manifest...\")\n    manifest = {\n        'export_info': {\n            'timestamp': timestamp,\n            'export_date': datetime.now().isoformat(),\n            'notebook_version': '1.0.0',\n            'total_files': len(exported_files)\n        },\n        'pipeline_summary': {\n            'datasets_loaded': len(loaded_datasets),\n            'transformations_executed': len(executor.execution_results),\n            'total_datasets_generated': len(executor.datasets),\n            'lineage_tracked': executor.lineage_data is not None\n        },\n        'exported_files': exported_files,\n        'file_structure': {\n            'datasets/': 'Transformed datasets in Parquet and CSV format',\n            'lineage/': 'Complete data lineage metadata',\n            'quality_reports/': 'Data quality assessment reports',\n            'visualizations/': 'Interactive HTML visualizations',\n            'airflow/': 'Generated Airflow DAG and configuration'\n        }\n    }\n    \n    manifest_file = os.path.join(export_dir, 'export_manifest.json')\n    with open(manifest_file, 'w') as f:\n        json.dump(manifest, f, indent=2)\n    \n    # 
Create README\n    readme_content = f'''\n# Data Lineage Pipeline Export\n\nGenerated on: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n## Contents\n\n### 📊 Datasets\n- **Location**: `datasets/`\n- **Format**: Parquet (full data) + CSV (samples)\n- **Count**: {len(executor.datasets)} datasets\n\n### 🔗 Lineage\n- **Location**: `lineage/`\n- **Format**: JSON metadata\n- **Transformations tracked**: {len(executor.execution_results)}\n\n### 🎯 Quality Reports\n- **Location**: `quality_reports/`\n- **Content**: Data quality assessments for all datasets\n\n### 📈 Visualizations\n- **Location**: `visualizations/`\n- **Format**: Interactive HTML files\n- **Types**: Lineage graphs, dependency matrices, transformation summaries\n\n### 🚀 Airflow DAG\n- **Location**: `airflow/`\n- **Content**: Production-ready DAG code and configuration\n\n## Usage\n\n1. **View Visualizations**: Open HTML files in `visualizations/` with a web browser\n2. **Deploy Pipeline**: Use files in `airflow/` for production deployment\n3. **Analyze Data**: Load Parquet files from `datasets/` for further analysis\n4. **Review Lineage**: Examine JSON files in `lineage/` for complete data flow\n\n## Next Steps\n\n1. Review the generated Airflow DAG in `airflow/generated_dag.py`\n2. Customize data source and sink paths for your environment\n3. Deploy to your Airflow instance\n4. 
Monitor pipeline execution and lineage tracking\n\nFor questions or issues, refer to the project documentation.\n'''\n    \n    readme_file = os.path.join(export_dir, 'README.md')\n    with open(readme_file, 'w') as f:\n        f.write(readme_content)\n    \n    print(f\"\\n✅ EXPORT COMPLETED SUCCESSFULLY!\")\n    print(f\"📁 Export location: {export_dir}\")\n    print(f\"📊 Total files exported: {len(exported_files) + 2}\")\n    print(f\"💾 Total datasets: {len(executor.datasets)}\")\n    print(f\"🔧 Transformations: {len(executor.execution_results)}\")\n    \n    return export_dir, manifest\n\n# Execute the complete export\nif executor.execution_results:\n    export_location, export_manifest = export_all_results()\n    \n    # Final summary\n    print(\"\\n\" + \"=\" * 60)\n    print(\"🎉 DATA LINEAGE PIPELINE NOTEBOOK COMPLETED!\")\n    print(\"=\" * 60)\n    \n    print(f\"\\n📈 Pipeline Summary:\")\n    print(f\"  • Original datasets loaded: {len(loaded_datasets)}\")\n    print(f\"  • Transformations executed: {len(executor.execution_results)}\")\n    print(f\"  • Final datasets generated: {len(executor.datasets)}\")\n    print(f\"  • Lineage relationships tracked: {len(executor.lineage_data.get('dependencies', []))}\")\n    print(f\"  • Export completed: {export_location}\")\n    \n    print(f\"\\n🎯 What you've accomplished:\")\n    print(f\"  ✅ Built an interactive data transformation pipeline\")\n    print(f\"  ✅ Tracked complete data lineage from source to destination\")\n    print(f\"  ✅ Created beautiful interactive visualizations\")\n    print(f\"  ✅ Generated production-ready Airflow DAG code\")\n    print(f\"  ✅ Exported everything for deployment and future use\")\n    \n    print(f\"\\n🚀 Next steps:\")\n    print(f\"  1. Review exported files in: {export_location}\")\n    print(f\"  2. Customize the Airflow DAG for your environment\")\n    print(f\"  3. Deploy to production and monitor lineage\")\n    print(f\"  4. 
Use the Streamlit UI for ongoing pipeline management\")\n    \nelse:\n    print(\"⚠️ No execution results to export. Please run the sample pipeline first.\")\n\n# Clean up Spark session\nif 'spark' in globals():\n    print(f\"\\n🔄 Cleaning up Spark session...\")\n    # Uncomment the line below to stop Spark session\n    # spark.stop()\n    print(f\"✅ Cleanup completed!\")