# NASA NEO Data Processing Pipeline 🚀

This notebook demonstrates a complete ETL (extract, transform, load) pipeline for NASA Near-Earth Object (NEO) data:

1. **Data Extraction** from NASA API
2. **Data Processing** with PySpark 
3. **Data Storage** to MySQL database
4. **Automated Scheduling** with Apache Airflow

## Features:
- ✅ Robust error handling
- ✅ Environment-based configuration 
- ✅ Data quality validation
- ✅ Production-ready code
- ✅ Weekly automation via Airflow

In [None]:
# Step 1: Environment Setup and Configuration 🔧
import sys
import os
import logging
from datetime import datetime, timedelta

# Configure root logging for the notebook: INFO level and above, with each
# record formatted as "<timestamp> - <level> - <message>".
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
# Logger used by the code cells that follow.
logger = logging.getLogger(__name__)

# PySpark imports
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType
from pyspark.sql.functions import *

# Point both the PySpark driver and its workers at the interpreter running
# this notebook (avoids worker communication issues), export the current
# import path through PYTHONPATH, and pin Spark's local IP to loopback.
os.environ.update({
    'PYSPARK_PYTHON': sys.executable,
    'PYSPARK_DRIVER_PYTHON': sys.executable,
    'PYTHONPATH': os.pathsep.join(sys.path),
    'SPARK_LOCAL_IP': '127.0.0.1',
})

# Configuration settings
class Config:
    """Pipeline settings, each read from an environment variable.

    Every attribute is evaluated once, when the class body runs, and falls
    back to the default shown when the variable is unset.
    """

    # Never commit a personal API key as the fallback. 'DEMO_KEY' is NASA's
    # public, rate-limited demo key; set NASA_API_KEY for real workloads.
    NASA_API_KEY = os.getenv('NASA_API_KEY', 'DEMO_KEY')

    # MySQL connection settings. The defaults are local development values;
    # override them through the environment anywhere else.
    MYSQL_HOST = os.getenv('MYSQL_HOST', 'localhost')
    MYSQL_PORT = int(os.getenv('MYSQL_PORT', '3306'))
    MYSQL_USER = os.getenv('MYSQL_USER', 'airflow')
    MYSQL_PASSWORD = os.getenv('MYSQL_PASSWORD', 'airflow')
    MYSQL_DATABASE = os.getenv('MYSQL_DATABASE', 'airflow')

# Report the interpreter and network settings exported for Spark
logger.info("✅ Environment variables configured successfully")
for _env_name in ('PYSPARK_PYTHON', 'PYSPARK_DRIVER_PYTHON', 'SPARK_LOCAL_IP'):
    logger.info(f"{_env_name}: {os.environ[_env_name]}")

✅ Environment variables configured:
PYSPARK_PYTHON: c:\Users\mouad\AppData\Local\Programs\Python\Python310\python.exe
PYSPARK_DRIVER_PYTHON: c:\Users\mouad\AppData\Local\Programs\Python\Python310\python.exe
SPARK_LOCAL_IP: 127.0.0.1


In [None]:
# Step 2: Initialize Spark Session with Production Settings ⚡
def create_spark_session():
    """Build (or reuse) the local SparkSession used by the pipeline.

    Returns:
        SparkSession: session named "nasa_neo_etl_production" on the
        ``local[*]`` master, configured with the settings listed below.

    Raises:
        Exception: whatever session creation raised, re-raised after being
        logged.
    """
    # Applied to the builder in this order.
    spark_settings = {
        "spark.executor.memory": "4g",
        "spark.driver.memory": "2g",
        "spark.sql.adaptive.enabled": "true",
        "spark.sql.adaptive.coalescePartitions.enabled": "true",
        "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
        "spark.sql.execution.arrow.pyspark.enabled": "false",
        "spark.sql.adaptive.advisoryPartitionSizeInBytes": "128MB",
    }

    try:
        builder = SparkSession.builder.appName("nasa_neo_etl_production").master("local[*]")
        for option, value in spark_settings.items():
            builder = builder.config(option, value)
        session = builder.getOrCreate()

        context = session.sparkContext
        logger.info(f"🎉 SUCCESS! SparkSession created - Version: {session.version}")
        logger.info(f"✅ App name: {context.appName}")
        logger.info(f"✅ Master: {context.master}")
        logger.info(f"✅ Available cores: {context.defaultParallelism}")

        return session
    except Exception as e:
        logger.error(f"❌ Failed to create Spark session: {e}")
        raise

# Create the Spark session once at notebook level; later cells use this
# `spark` handle to build the DataFrame, and the final cell stops it.
spark = create_spark_session()

🎉 SUCCESS! SparkSession created without errors!
✅ Spark version: 3.5.7
✅ App name: worker_fix_test
✅ Master: local[1]


In [None]:
# Step 3: Enhanced NASA API Data Extraction 🛰️
import json
import requests
from datetime import datetime, timedelta

def fetch_nasa_neo_data(start_date=None, end_date=None, api_key=None):
    """
    Fetch NASA NEO feed data and flatten it into a single list of objects.

    A missing bound is derived from the one that was supplied, so the
    requested range is never inverted and never longer than 7 days:

    * neither given  -> the last 7 days, ending today
    * only end_date  -> the 7 days leading up to end_date
    * only start_date -> start_date up to today, capped at start_date + 7 days

    Args:
        start_date (str): Start date in YYYY-MM-DD format
        end_date (str): End date in YYYY-MM-DD format
        api_key (str): NASA API key; defaults to Config.NASA_API_KEY

    Returns:
        list: Flattened list of NEO objects. Each object gains an
        "observation_date" key holding the feed date it was listed under.

    Raises:
        requests.RequestException: the HTTP request failed or returned an
            error status.
        ValueError: the response has no 'near_earth_objects' field, or the
            single supplied date is not in YYYY-MM-DD format.

    Note:
        Log messages have the API key masked, but the re-raised exception
        still carries the original request URL.
    """
    date_format = '%Y-%m-%d'
    window = timedelta(days=7)
    api_key = api_key or Config.NASA_API_KEY

    def _redact(error):
        # requests puts the full URL, query string included, in its error
        # messages; keep the API key out of the logs.
        message = str(error)
        return message.replace(api_key, '***') if api_key else message

    try:
        today = datetime.now()
        if not start_date and not end_date:
            end_date = today.strftime(date_format)
            start_date = (today - window).strftime(date_format)
        elif not start_date:
            # Anchor the window to the requested end, not to "now".
            range_end = datetime.strptime(end_date, date_format)
            start_date = (range_end - window).strftime(date_format)
        elif not end_date:
            range_start = datetime.strptime(start_date, date_format)
            range_end = range_start + window
            # Keep "up to today" when that stays inside the 7-day window.
            if range_start <= today < range_end:
                range_end = today
            end_date = range_end.strftime(date_format)

        logger.info(f"🔍 Fetching NASA NEO data from {start_date} to {end_date}")

        # Let requests build and encode the query string
        response = requests.get(
            "https://api.nasa.gov/neo/rest/v1/feed",
            params={
                "start_date": start_date,
                "end_date": end_date,
                "api_key": api_key,
            },
            timeout=30,
        )
        response.raise_for_status()

        data = response.json()

        # Validate API response
        if 'near_earth_objects' not in data:
            raise ValueError("Invalid API response: missing 'near_earth_objects' field")

        # Flatten the {date: [objects]} mapping, tagging each object with its date
        neo_list = []
        for date, objects in data["near_earth_objects"].items():
            logger.info(f"📅 {date}: {len(objects)} objects")
            for obj in objects:
                obj["observation_date"] = date
                neo_list.append(obj)

        total_objects = len(neo_list)
        logger.info(f"✅ Successfully extracted {total_objects} NEO records")

        # Data quality check
        if total_objects == 0:
            logger.warning("⚠️ No NEO data found for the specified date range")

        return neo_list

    except requests.RequestException as e:
        logger.error(f"❌ API request failed: {_redact(e)}")
        raise
    except Exception as e:
        logger.error(f"❌ Data extraction failed: {_redact(e)}")
        raise

# Fetch one fixed week of feed data; later cells reuse start_date/end_date
# when printing the date range.
start_date, end_date = "2025-10-01", "2025-10-07"
neo_data = fetch_nasa_neo_data(start_date=start_date, end_date=end_date)

logger.info(f"📊 Data extraction complete: {len(neo_data)} total records")

In [None]:
# Step 4: Data Export and Spark DataFrame Creation 💾
import pandas as pd

# Write the raw records to a timestamped JSON file for backup/debugging
backup_file = "neo_data_backup_{}.json".format(datetime.now().strftime('%Y%m%d_%H%M%S'))
with open(backup_file, 'w') as backup_handle:
    json.dump(neo_data, backup_handle, indent=2)

logger.info(f"📁 Backup saved to: {backup_file}")

# Load the records into pandas first for a quick look at shape and columns
pandas_df = pd.DataFrame(neo_data)
logger.info(f"📋 Pandas DataFrame shape: {pandas_df.shape}")
logger.info(f"🔍 Columns: {list(pandas_df.columns)}")

# Hand the pandas frame to Spark
df = spark.createDataFrame(pandas_df)
logger.info("⚡ Spark DataFrame created successfully")

# Overview; each line is printed before the next value is computed
print("\n📊 Dataset Overview:")
print(f"   • Total Records: {df.count()}")
print(f"   • Total Columns: {len(df.columns)}")
print(f"   • Date Range: {start_date} to {end_date}")

df.show(5, truncate=False)

In [None]:
# Step 5: Schema Analysis and Data Exploration 🔍
def analyze_dataframe_schema(df):
    """Print the schema, row count and null counts for key columns of ``df``.

    Args:
        df: Spark DataFrame to inspect.

    Returns:
        The same DataFrame, unchanged.
    """
    logger.info("🔍 Analyzing DataFrame schema...")

    print("📋 DataFrame Schema:")
    df.printSchema()

    print(f"\n📊 Row Count: {df.count()}")

    # Only the key columns that actually exist in this DataFrame are checked
    key_columns = ('id', 'neo_reference_id', 'name', 'estimated_diameter', 'close_approach_data')
    null_counts = {
        name: df.filter(col(name).isNull()).count()
        for name in key_columns
        if name in df.columns
    }

    print("\n🔍 Null Value Analysis:")
    for name, missing in null_counts.items():
        print(f"   • {name}: {missing} nulls")

    return df

# Analyze the DataFrame. analyze_dataframe_schema returns its input, so
# df_analyzed refers to the same DataFrame as df.
df_analyzed = analyze_dataframe_schema(df)

# Sample data exploration: show four identifying columns for the first 10 rows
print(f"\n🎯 Sample Records:")
df.select("id", "name", "observation_date", "is_potentially_hazardous_asteroid").show(10)

+--------------------+--------+----------------+------------+--------------------+--------------------+--------------------+---------------------------------+--------------------+----------------+----------+--------------------+
|               links|      id|neo_reference_id|        name|        nasa_jpl_url|absolute_magnitude_h|  estimated_diameter|is_potentially_hazardous_asteroid| close_approach_data|is_sentry_object|      date|         sentry_data|
+--------------------+--------+----------------+------------+--------------------+--------------------+--------------------+---------------------------------+--------------------+----------------+----------+--------------------+
|{self -> http://a...| 3427459|         3427459|   (2008 SS)|https://ssd.jpl.n...|               22.18|{feet -> {estimat...|                            false|[{relative_veloci...|           false|2025-10-03|                 NaN|
|{self -> http://a...| 3716631|         3716631|  (2015 HN9)|https://ssd.jpl.n...|  

In [None]:
# Step 6: Advanced Data Processing - Extract Nested Fields 🛠️
def process_neo_dataframe(df):
    """
    Flatten the nested diameter map and keep the columns later steps use.

    Args:
        df: Input Spark DataFrame holding the raw NEO records

    Returns:
        Spark DataFrame with four ``estimated_diameter_*`` columns added and
        only the base columns selected. ``sentry_data`` is always present in
        the result: when the input has no such column it is added as an
        all-null string column instead of making the select fail.

    Raises:
        Exception: any processing error is logged and re-raised.
    """
    from pyspark.sql.functions import col, lit

    try:
        logger.info("🛠️ Processing NEO data - extracting nested fields...")

        # estimated_diameter is a map of unit -> {estimated_diameter_min/max}
        diameter = col("estimated_diameter")
        df_with_diameter = df
        for unit, suffix in (("kilometers", "km"), ("meters", "m")):
            for bound in ("max", "min"):
                df_with_diameter = df_with_diameter.withColumn(
                    f"estimated_diameter_{suffix}_{bound}",
                    diameter.getItem(unit).getItem(f"estimated_diameter_{bound}"),
                )

        # The sentry_data column only exists when at least one fetched object
        # carried that key, so guarantee it before selecting it.
        if "sentry_data" not in df_with_diameter.columns:
            logger.info("ℹ️ No sentry_data column in input - adding it as NULL")
            df_with_diameter = df_with_diameter.withColumn(
                "sentry_data", lit(None).cast("string")
            )

        # Select relevant columns for the base dataset
        df_processed = df_with_diameter.select(
            "id",
            "neo_reference_id",
            "name",
            "absolute_magnitude_h",
            "is_potentially_hazardous_asteroid",
            "estimated_diameter_km_max",
            "estimated_diameter_km_min",
            "estimated_diameter_m_max",
            "estimated_diameter_m_min",
            "close_approach_data",
            "observation_date",
            "is_sentry_object",
            "sentry_data",
        )

        logger.info("✅ Basic field extraction completed")

        return df_processed

    except Exception as e:
        logger.error(f"❌ Data processing failed: {e}")
        raise

# Process the DataFrame: flatten the diameter fields and keep the base columns
df_processed = process_neo_dataframe(df)

# Preview the first 10 processed rows without truncating cell contents
print("📊 Processed DataFrame Preview:")
df_processed.show(10, truncate=False)

+--------+----------------+------------+--------------------+---------------------------------+-------------------------+-------------------------+--------------------+----------+----------------+--------------------+
|      id|neo_reference_id|        name|absolute_magnitude_h|is_potentially_hazardous_asteroid|estimated_diameter_km_max|estimated_diameter_km_min| close_approach_data|      date|is_sentry_object|         sentry_data|
+--------+----------------+------------+--------------------+---------------------------------+-------------------------+-------------------------+--------------------+----------+----------------+--------------------+
| 3427459|         3427459|   (2008 SS)|               22.18|                            false|              0.217791025|             0.0973991073|[{relative_veloci...|2025-10-03|           false|                 NaN|
| 3716631|         3716631|  (2015 HN9)|                22.6|                            false|             0.1794898848|       

In [62]:
# Print the schema of the raw DataFrame (df, not df_processed) for reference
df.printSchema()


root
 |-- links: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- id: string (nullable = true)
 |-- neo_reference_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- nasa_jpl_url: string (nullable = true)
 |-- absolute_magnitude_h: double (nullable = true)
 |-- estimated_diameter: map (nullable = true)
 |    |-- key: string
 |    |-- value: map (valueContainsNull = true)
 |    |    |-- key: string
 |    |    |-- value: double (valueContainsNull = true)
 |-- is_potentially_hazardous_asteroid: boolean (nullable = true)
 |-- close_approach_data: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)
 |-- is_sentry_object: boolean (nullable = true)
 |-- date: string (nullable = true)
 |-- sentry_data: string (nullable = true)



In [None]:
# Step 7: Complex Nested Data Processing - Close Approach Data 🎯
def process_close_approach_data(df):
    """
    Pull selected fields out of the first close-approach record of each row.

    Args:
        df: Input DataFrame with close_approach_data column

    Returns:
        DataFrame with the extracted columns added. If anything fails, the
        error is logged and the input DataFrame is returned unchanged.
    """
    # (new column name, key inside the close-approach record), in output order
    approach_fields = (
        ("close_approach_date_full", "close_approach_date_full"),
        ("close_approach_date", "close_approach_date"),
        ("velocity_data_str", "relative_velocity"),
        ("miss_distance_str", "miss_distance"),
        ("orbiting_body", "orbiting_body"),
    )
    preview_columns = [
        "id", "name", "close_approach_date_full",
        "velocity_data_str", "miss_distance_str", "orbiting_body",
    ]

    try:
        logger.info("🎯 Processing close approach data...")

        # Only element 0 of the close_approach_data array is used
        first_approach = col("close_approach_data").getItem(0)
        enriched = df
        for target, key in approach_fields:
            enriched = enriched.withColumn(target, first_approach.getItem(key))

        logger.info("✅ Close approach data extraction completed")

        # Show intermediate results for validation
        print("🔍 Intermediate Results - Close Approach Data:")
        enriched.select(*preview_columns).show(5, truncate=False)

        return enriched

    except Exception as e:
        logger.error(f"❌ Close approach data processing failed: {e}")
        logger.info("ℹ️ This might be due to missing close approach data for some records")
        # Best effort: fall back to the unmodified input
        return df

# Process close approach data. On failure the function returns df_processed
# unchanged, so the extracted columns may be absent from df_with_approach.
df_with_approach = process_close_approach_data(df_processed)

+--------+----------------+------------+--------------------+---------------------------------+-------------------------+-------------------------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+----------------+-----------+------------+------------------------+------------------------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------------------------+-------------+
|id      |neo_reference_id|name        |absolute_magnitude_h|is_potentially_hazardous_asteroid|estimated_diameter_km_max|estimated_dia

In [None]:
# Step 8: Final Data Processing - Numerical Extraction and Cleanup 🧹
from pyspark.sql.functions import regexp_extract, col, when, coalesce, lit
from pyspark.sql.types import DoubleType

def extract_numerical_values(df):
    """
    Parse velocity and miss-distance numbers out of their string columns and
    assemble the final, null-free dataset.

    Args:
        df: DataFrame with string-formatted velocity and distance data
            (velocity_data_str, miss_distance_str)

    Returns:
        Final cleaned DataFrame ready for database insertion. Missing numeric
        values are replaced with 0.0 and a missing orbiting_body with
        'Unknown'.

    Raises:
        Exception: any processing error is logged and re-raised.
    """
    # output column -> (source string column, regex whose group 1 is the number)
    extractions = {
        "velocity_kps": ("velocity_data_str", r"kilometers_per_second=([0-9.]+)"),
        "miss_distance_km": ("miss_distance_str", r"kilometers=([0-9.]+)"),
        "velocity_kmh": ("velocity_data_str", r"kilometers_per_hour=([0-9.]+)"),
        "miss_distance_au": ("miss_distance_str", r"astronomical=([0-9.]+)"),
    }
    zero_filled = (
        'velocity_kps',
        'velocity_kmh',
        'miss_distance_km',
        'miss_distance_au',
        'estimated_diameter_km_max',
        'estimated_diameter_km_min',
        'absolute_magnitude_h',
    )

    def as_double(name):
        # Cast a column to double while keeping its name
        return col(name).cast(DoubleType()).alias(name)

    try:
        logger.info("🧹 Extracting numerical values from nested string data...")

        parsed = df
        for target, (source, pattern) in extractions.items():
            parsed = parsed.withColumn(
                target,
                regexp_extract(col(source), pattern, 1).cast(DoubleType()),
            )

        # Create final cleaned dataset
        df_final = parsed.select(
            "id",
            "neo_reference_id",
            "name",
            as_double("absolute_magnitude_h"),
            "is_potentially_hazardous_asteroid",
            as_double("estimated_diameter_km_max"),
            as_double("estimated_diameter_km_min"),
            "close_approach_date_full",
            "close_approach_date",
            "velocity_kps",
            "velocity_kmh",
            "miss_distance_km",
            "miss_distance_au",
            "orbiting_body",
            col("observation_date").alias("observation_date"),
            "is_sentry_object",
        )

        # Replace nulls with the defaults described in the docstring
        defaults = {name: 0.0 for name in zero_filled}
        defaults['orbiting_body'] = 'Unknown'
        df_clean = df_final.fillna(defaults)

        logger.info("✅ Numerical extraction and cleanup completed")

        return df_clean

    except Exception as e:
        logger.error(f"❌ Numerical extraction failed: {e}")
        raise

# Process final data: numeric extraction, final column selection, null filling
df_final_clean = extract_numerical_values(df_with_approach)

# Display final results (count() triggers a Spark job each time it is called)
print("🎉 Final Processed Dataset:")
print(f"   • Total Records: {df_final_clean.count()}")
print(f"   • Total Columns: {len(df_final_clean.columns)}")

# Show the first 15 rows without truncating cell contents
df_final_clean.show(15, truncate=False)

# Display summary statistics
print("\n📊 Summary Statistics:")
df_final_clean.describe().show()

+--------+----------------+------------+--------------------+---------------------------------+-------------------------+-------------------------+------------------------+-------------+--------------------+-------------+
|      id|neo_reference_id|        name|absolute_magnitude_h|is_potentially_hazardous_asteroid|estimated_diameter_km_max|estimated_diameter_km_min|close_approach_date_full| velocity_kps|    miss_distance_km|orbiting_body|
+--------+----------------+------------+--------------------+---------------------------------+-------------------------+-------------------------+------------------------+-------------+--------------------+-------------+
| 3427459|         3427459|   (2008 SS)|               22.18|                            false|              0.217791025|             0.0973991073|       2025-Oct-03 22:04|14.5278596145|1.7860588568532284E7|        Earth|
| 3716631|         3716631|  (2015 HN9)|                22.6|                            false|             0.17

In [None]:
# Step 9: MySQL Database Integration 🗄️
import pandas as pd
from sqlalchemy import create_engine, text
import pymysql

def create_mysql_connection():
    """Create a SQLAlchemy engine for the configured MySQL database.

    The URL is assembled with ``URL.create`` so that special characters in
    the user name or password (``@``, ``/``, ``:`` ...) are escaped rather
    than corrupting the connection string. Because ``create_engine`` is lazy,
    one connection is opened and closed here so that an unreachable server or
    bad credentials fail in this function, not at the first query.

    Returns:
        sqlalchemy.engine.Engine: engine bound to Config.MYSQL_* settings.

    Raises:
        Exception: any error from building the engine or opening the probe
        connection, re-raised after being logged.
    """
    from sqlalchemy.engine import URL

    try:
        connection_url = URL.create(
            drivername="mysql+pymysql",
            username=Config.MYSQL_USER,
            password=Config.MYSQL_PASSWORD,
            host=Config.MYSQL_HOST,
            port=Config.MYSQL_PORT,
            database=Config.MYSQL_DATABASE,
        )
        engine = create_engine(connection_url)

        # Probe connection: makes the log line below true
        with engine.connect():
            pass

        logger.info("✅ MySQL connection established")
        return engine
    except Exception as e:
        logger.error(f"❌ MySQL connection failed: {e}")
        raise

def create_neo_table(engine):
    """Create the ``nasa_neo_data`` table if it does not already exist.

    Args:
        engine: SQLAlchemy engine used to run the DDL statement.

    Raises:
        Exception: any error while executing or committing the statement,
        re-raised after being logged.
    """
    try:
        ddl_statement = text("""
        CREATE TABLE IF NOT EXISTS nasa_neo_data (
            id VARCHAR(20) PRIMARY KEY,
            neo_reference_id VARCHAR(20) NOT NULL,
            name VARCHAR(255),
            absolute_magnitude_h DECIMAL(10,6),
            is_potentially_hazardous_asteroid BOOLEAN,
            estimated_diameter_km_max DECIMAL(15,6),
            estimated_diameter_km_min DECIMAL(15,6),
            close_approach_date_full VARCHAR(50),
            close_approach_date VARCHAR(20),
            velocity_kps DECIMAL(15,6),
            velocity_kmh DECIMAL(15,6),
            miss_distance_km DECIMAL(20,6),
            miss_distance_au DECIMAL(15,10),
            orbiting_body VARCHAR(50),
            observation_date DATE,
            is_sentry_object BOOLEAN,
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
            INDEX idx_observation_date (observation_date),
            INDEX idx_hazardous (is_potentially_hazardous_asteroid),
            INDEX idx_orbiting_body (orbiting_body)
        ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
        """)

        with engine.connect() as connection:
            connection.execute(ddl_statement)
            connection.commit()

        logger.info("✅ NEO data table created/verified")

    except Exception as e:
        logger.error(f"❌ Table creation failed: {e}")
        raise

def load_data_to_mysql(df_spark, engine):
    """Load processed data into the existing ``nasa_neo_data`` table.

    Rows are upserted by ``id``: inside a single transaction, rows whose id
    is in the incoming batch are deleted and the batch is then appended.
    The table itself is never dropped, so its primary key, indexes and
    timestamp columns, and the rows loaded by earlier runs, are preserved.

    Args:
        df_spark: Spark DataFrame with the final cleaned records.
        engine: SQLAlchemy engine for the target database.

    Returns:
        bool: True when the load completed.

    Raises:
        Exception: any conversion or database error, re-raised after being
        logged; the delete and insert are rolled back together.
    """
    from sqlalchemy import bindparam

    try:
        # Convert Spark DataFrame to Pandas for easier MySQL insertion
        df_pandas = df_spark.toPandas()

        # id is the table's primary key, so a batch may hold each id only once
        duplicate_rows = int(df_pandas.duplicated(subset='id', keep='last').sum())
        if duplicate_rows:
            logger.warning(
                f"⚠️ Dropping {duplicate_rows} rows with a repeated id (keeping the last one)"
            )
            df_pandas = df_pandas.drop_duplicates(subset='id', keep='last')

        logger.info(f"🔄 Loading {len(df_pandas)} records to MySQL...")

        # Data type conversions for MySQL compatibility
        df_pandas['is_potentially_hazardous_asteroid'] = df_pandas['is_potentially_hazardous_asteroid'].astype(bool)
        df_pandas['is_sentry_object'] = df_pandas['is_sentry_object'].astype(bool)
        df_pandas['observation_date'] = pd.to_datetime(df_pandas['observation_date']).dt.date

        incoming_ids = df_pandas['id'].tolist()
        delete_existing = text(
            "DELETE FROM nasa_neo_data WHERE id IN :ids"
        ).bindparams(bindparam("ids", expanding=True))

        # Replace existing records for these ids atomically
        with engine.begin() as conn:
            if incoming_ids:
                conn.execute(delete_existing, {"ids": incoming_ids})
            df_pandas.to_sql(
                name='nasa_neo_data',
                con=conn,
                if_exists='append',
                index=False,
                method='multi',
                chunksize=1000
            )

        logger.info("✅ Data successfully loaded to MySQL database")

        # Verify the data (total rows in the table, including earlier loads)
        with engine.connect() as conn:
            result = conn.execute(text("SELECT COUNT(*) as count FROM nasa_neo_data"))
            count = result.fetchone()[0]
            logger.info(f"🔍 Verification: {count} records in database")

        return True

    except Exception as e:
        logger.error(f"❌ MySQL data loading failed: {e}")
        raise

# Execute MySQL integration. Every step, including the success report, runs
# inside one try block so that any failure is logged and the notebook goes on.
try:
    # Create database connection
    mysql_engine = create_mysql_connection()
    
    # Create table
    create_neo_table(mysql_engine)
    
    # Load data
    success = load_data_to_mysql(df_final_clean, mysql_engine)
    
    # load_data_to_mysql returns True on success and raises otherwise
    if success:
        print("🎉 MySQL Integration Complete!")
        print(f"   • Database: {Config.MYSQL_DATABASE}")
        print(f"   • Table: nasa_neo_data") 
        print(f"   • Records: {df_final_clean.count()}")
        
except Exception as e:
    # Best effort: report the failure instead of aborting the notebook
    logger.error(f"❌ MySQL integration failed: {e}")
    print("⚠️ MySQL integration failed. Check your database configuration.")

In [None]:
# Step 10: Data Quality Checks and Pipeline Summary 📊
def run_data_quality_checks(engine):
    """Run a fixed set of SQL checks on ``nasa_neo_data`` and print a report.

    Args:
        engine: SQLAlchemy engine for the database holding the table.

    Any error is logged and swallowed; nothing is returned or re-raised.
    """
    # (report label, query), executed in this order
    checks = [
        ("Total Records", "SELECT COUNT(*) FROM nasa_neo_data"),
        ("Unique Objects", "SELECT COUNT(DISTINCT id) FROM nasa_neo_data"),
        ("Hazardous Objects", "SELECT COUNT(*) FROM nasa_neo_data WHERE is_potentially_hazardous_asteroid = 1"),
        ("Objects per Date", "SELECT observation_date, COUNT(*) as count FROM nasa_neo_data GROUP BY observation_date ORDER BY observation_date"),
        ("Average Diameter", "SELECT AVG(estimated_diameter_km_max) as avg_diameter FROM nasa_neo_data"),
        ("Speed Statistics", "SELECT MIN(velocity_kps) as min_speed, MAX(velocity_kps) as max_speed, AVG(velocity_kps) as avg_speed FROM nasa_neo_data"),
        ("Missing Values", "SELECT SUM(CASE WHEN velocity_kps = 0 THEN 1 ELSE 0 END) as missing_velocity, SUM(CASE WHEN miss_distance_km = 0 THEN 1 ELSE 0 END) as missing_distance FROM nasa_neo_data"),
    ]

    try:
        logger.info("🔍 Running data quality checks...")

        print("📊 DATA QUALITY REPORT")
        print("=" * 50)

        with engine.connect() as connection:
            for label, sql in checks:
                result = connection.execute(text(sql))

                # The per-date breakdown is the only multi-row result
                if label == "Objects per Date":
                    print(f"\n{label}:")
                    for record in result:
                        print(f"   {record[0]}: {record[1]} objects")
                    continue

                record = result.fetchone()
                if len(record) != 1:
                    # Several aggregates: print them keyed by column name
                    print(f"{label}: {dict(zip(result.keys(), record))}")
                else:
                    print(f"{label}: {record[0]}")

        logger.info("✅ Data quality checks completed")

    except Exception as e:
        logger.error(f"❌ Data quality checks failed: {e}")

def print_pipeline_summary():
    """Print a run summary followed by static stage, next-step and Airflow notes.

    Reads the notebook-level names start_date, end_date, df_final_clean,
    spark and Config. The record count is computed when its line is printed.
    """
    banner = "=" * 60

    static_sections = (
        "\n🔄 PIPELINE STAGES:",
        "   1. ✅ Environment Setup",
        "   2. ✅ Spark Session Creation",
        "   3. ✅ NASA API Data Extraction",
        "   4. ✅ Data Processing & Transformation",
        "   5. ✅ Nested Field Extraction",
        "   6. ✅ Numerical Data Cleanup",
        "   7. ✅ MySQL Database Integration",
        "   8. ✅ Data Quality Validation",
        "\n🎯 NEXT STEPS:",
        "   • Run Airflow DAG: nasa_neo_pipeline",
        "   • Schedule: Every 7 days",
        "   • Monitor: Check Airflow UI for execution status",
        "   • Query: Access data via MySQL nasa_neo_data table",
        "\n📝 AIRFLOW COMMAND:",
        "   docker-compose up -d",
        "   # Access Airflow UI at http://localhost:8080",
        "   # Username: airflow, Password: airflow",
    )

    print("\n" + banner)
    print("🚀 NASA NEO DATA PIPELINE SUMMARY")
    print(banner)

    # Run-specific facts, evaluated and printed one line at a time
    print(f"📅 Processing Date: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"🗓️ Data Range: {start_date} to {end_date}")
    print(f"📊 Total Records Processed: {df_final_clean.count()}")
    print(f"🔧 Spark Version: {spark.version}")
    print(f"🗄️ Database: MySQL ({Config.MYSQL_HOST}:{Config.MYSQL_PORT})")
    print("📋 Table: nasa_neo_data")

    for line in static_sections:
        print(line)

    print(banner)

# Run data quality checks, but only if the MySQL cell got far enough to
# bind mysql_engine
if 'mysql_engine' in locals():
    run_data_quality_checks(mysql_engine)

# Print pipeline summary
print_pipeline_summary()

# Cleanup Spark session. Shutdown stays best-effort, but a failure is now
# recorded instead of silently discarded, and KeyboardInterrupt/SystemExit
# are no longer trapped.
try:
    spark.stop()
    logger.info("🔄 Spark session stopped cleanly")
except Exception as e:
    logger.warning(f"⚠️ Spark session did not stop cleanly: {e}")

print("🎉 NASA NEO Data Pipeline execution completed successfully!")