In [None]:
!pip install pyspark==3.4.0
!pip install delta-spark==2.4.0
!pip install faker==19.6.2
!pip install schedule==1.2.0

# Install Java (required for Spark)
!apt-get update -qq
!apt-get install -y openjdk-8-jdk-headless -qq > /dev/null
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"

print("‚úÖ All dependencies installed successfully!")

Collecting pyspark==3.4.0
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m310.8/310.8 MB[0m [31m5.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hcanceled
[31mERROR: Operation cancelled by user[0m[31m
[0mCollecting delta-spark==2.4.0
  Downloading delta_spark-2.4.0-py3-none-any.whl.metadata (1.9 kB)
Collecting pyspark<3.5.0,>=3.4.0 (from delta-spark==2.4.0)
  Downloading pyspark-3.4.4.tar.gz (311.4 MB)
[2K     [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m311.4/311.4 MB[0m [31m4.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Downloading delta_spark-2.4.0-py3-none-any.w

In [None]:
import os
import time
import threading
from datetime import datetime, timezone
from faker import Faker
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import current_timestamp, col
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from IPython.display import display, HTML, clear_output
import json
import schedule

# Email imports - handled separately to avoid conflicts
try:
    import smtplib
    from email.mime.text import MIMEText
    from email.mime.multipart import MIMEMultipart
    EMAIL_AVAILABLE = True
    print("‚úÖ Email libraries imported successfully")
except ImportError as e:
    print(f"‚ö†Ô∏è Email libraries not available: {e}")
    print("üìß Email notifications will be disabled")
    EMAIL_AVAILABLE = False

# Schedule import - optional
try:
    import schedule
    SCHEDULE_AVAILABLE = True
    print("‚úÖ Schedule library imported successfully")
except ImportError:
    print("‚ö†Ô∏è Schedule library not found, installing...")
    !pip install schedule==1.2.0
    import schedule
    SCHEDULE_AVAILABLE = True

# For Colab, we need to handle Delta differently
try:
    from delta import *
    from delta.tables import DeltaTable
    print("‚úÖ Delta Lake libraries imported successfully")
except ImportError:
    print("‚ö†Ô∏è Delta libraries not found, installing...")
    !pip install delta-spark==2.4.0
    from delta import *
    from delta.tables import DeltaTable
    print("‚úÖ Delta Lake libraries installed and imported")

print("üéâ All libraries imported successfully!")

‚úÖ Email libraries imported successfully
‚úÖ Schedule library imported successfully
‚úÖ Delta Lake libraries imported successfully
üéâ All libraries imported successfully!


In [2]:
import json
# Configuration for Google Colab
config = {
    'delta_table_path': '/content/delta-table/customer_data',  # Colab local storage
    'timezone': 'Asia/Kolkata',
    'records_per_iteration': 5,
    'schedule_interval_minutes': 2,  # Shorter interval for demo

    # Email configs (‚úÖ enable this section properly)
    'email': {
        'enabled': True,  # Set to True and configure if you want email notifications
        'smtp_server': 'smtp.gmail.com',
        'smtp_port': 587,
        'sender_email': 'jainsanskrati88@gmail.com',
        'sender_password': 'zijwxacswhujeifm',  # Use app password for Gmail
        'recipient_emails': ['sanskratijain88@gmail.com']
    }
}

print("Configuration set:")

print(json.dumps(config, indent=2))

Configuration set:
{
  "delta_table_path": "/content/delta-table/customer_data",
  "timezone": "Asia/Kolkata",
  "records_per_iteration": 5,
  "schedule_interval_minutes": 2,
  "email": {
    "enabled": true,
    "smtp_server": "smtp.gmail.com",
    "smtp_port": 587,
    "sender_email": "jainsanskrati88@gmail.com",
    "sender_password": "zijwxacswhujeifm",
    "recipient_emails": [
      "sanskratijain88@gmail.com"
    ]
  }
}


In [None]:
class ColabDeltaDataPipeline:
    def __init__(self, config):
        """Initialize the Delta Data Pipeline for Google Colab"""
        self.config = config
        self.fake = Faker()
        self.spark = None
        self.delta_table_path = config['delta_table_path']
        self.setup_spark()
        self.running = False
        self.iteration_count = 0;

    def setup_spark(self):
        """Setup Spark session with Delta Lake configuration for Colab"""
        try:
            # Create directory if it doesn't exist
            os.makedirs(os.path.dirname(self.delta_table_path), exist_ok=True)

            builder = SparkSession.builder \
                .appName("ColabDeltaDataPipeline") \
                .master("local[*]") \
                .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
                .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
                .config("spark.sql.adaptive.enabled", "true") \
                .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
                .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
                .config("spark.sql.session.timeZone", self.config.get('timezone', 'UTC')) \
                .config("spark.driver.memory", "2g") \
                .config("spark.driver.maxResultSize", "1g")

            # Configure for Delta Lake
            self.spark = configure_spark_with_delta_pip(builder).getOrCreate()
            self.spark.sparkContext.setLogLevel("WARN")
            print("‚úÖ Spark session initialized successfully")

        except Exception as e:
            print(f"‚ùå Failed to initialize Spark session: {str(e)}")
            raise

    def generate_fake_data(self, num_records=10):
        """Generate fake data for Name, Address, Email"""
        fake_data = []

        for _ in range(num_records):
            record = {
                'Name': self.fake.name(),
                'Address': self.fake.address().replace('\n', ', '),
                'Email': self.fake.email(),
                'created_timestamp': datetime.now(timezone.utc).isoformat()
            }
            fake_data.append(record)

        print(f"üìä Generated {num_records} fake records")
        return fake_data

    def create_delta_table_if_not_exists(self):
        """Create Delta table if it doesn't exist"""
        try:
            if not os.path.exists(self.delta_table_path):
                # Create initial empty DataFrame with schema
                initial_data = self.generate_fake_data(1)
                df = self.spark.createDataFrame(initial_data)
                df = df.withColumn("ingestion_timestamp", current_timestamp())

                # Write initial data to create the table
                df.write \
                  .format("delta") \
                  .mode("overwrite") \
                  .save(self.delta_table_path)

                print(f"‚úÖ Delta table created at {self.delta_table_path}")
            else:
                print("‚ÑπÔ∏è Delta table already exists")

        except Exception as e:
            print(f"‚ùå Failed to create Delta table: {str(e)}")
            raise

    def append_data_to_delta_table(self, data):
        """Append data to Delta table"""
        try:
            # Convert data to Spark DataFrame
            df = self.spark.createDataFrame(data)
            df = df.withColumn("ingestion_timestamp", current_timestamp())

            # Append to Delta table
            df.write \
              .format("delta") \
              .mode("append") \
              .save(self.delta_table_path)

            # Get version info
            delta_table = DeltaTable.forPath(self.spark, self.delta_table_path)
            history = delta_table.history(1).collect()
            current_version = history[0].version

            summary = {
                'records_appended': len(data),
                'table_version': current_version,
                'timestamp': datetime.now(timezone.utc).isoformat(),
                'appended_data': data
            }

            print(f"‚úÖ Appended {len(data)} records to Delta table. New version: {current_version}")
            return summary

        except Exception as e:
            print(f"‚ùå Failed to append data to Delta table: {str(e)}")
            raise

    def get_latest_table_contents(self, limit=None):
        """Retrieve contents of the latest version of Delta table"""
        try:
            df = self.spark.read.format("delta").load(self.delta_table_path)

            # Order by ingestion timestamp to get latest data first
            df = df.orderBy(col("ingestion_timestamp").desc())

            if limit:
                df = df.limit(limit)

            records = df.collect()
            print(f"üìã Retrieved {len(records)} records from Delta table")

            return [row.asDict() for row in records]

        except Exception as e:
            print(f"‚ùå Failed to retrieve table contents: {str(e)}")
            raise

    def get_table_version_info(self):
        """Get version information of the Delta table"""
        try:
            delta_table = DeltaTable.forPath(self.spark, self.delta_table_path)
            history = delta_table.history().collect()

            version_info = {
                'current_version': history[0].version,
                'total_versions': len(history),
                'latest_operation': history[0].operation,
                'latest_timestamp': history[0].timestamp.isoformat(),
                'version_history': [
                    {
                        'version': row.version,
                        'timestamp': row.timestamp.isoformat(),
                        'operation': row.operation
                    }
                    for row in history[:5]  # Last 5 versions
                ]
            }

            print(f"üîÑ Current Delta table version: {version_info['current_version']}")
            return version_info

        except Exception as e:
            print(f"‚ùå Failed to get version info: {str(e)}")
            raise

    def create_email_html_content(self, summary, version_info, latest_data):
        """Create HTML email content with data summary"""
        html_content = f"""
        <!DOCTYPE html>
        <html>
        <head>
            <style>
                body {{ font-family: Arial, sans-serif; margin: 20px; background-color: #f5f5f5; }}
                .container {{ background-color: white; padding: 20px; border-radius: 10px; box-shadow: 0 2px 5px rgba(0,0,0,0.1); }}
                .header {{ background-color: #4CAF50; color: white; padding: 15px; border-radius: 5px; text-align: center; }}
                .summary {{ background-color: #e8f5e8; padding: 15px; border-radius: 5px; margin: 15px 0; }}
                .data-table {{ border-collapse: collapse; width: 100%; margin: 15px 0; }}
                .data-table th, .data-table td {{ border: 1px solid #ddd; padding: 8px; text-align: left; }}
                .data-table th {{ background-color: #f2f2f2; }}
                .highlight {{ background-color: #fff3cd; padding: 10px; border-radius: 5px; margin: 10px 0; }}
                .footer {{ text-align: center; color: #666; margin-top: 20px; font-size: 12px; }}
            </style>
        </head>
        <body>
            <div class="container">
                <div class="header">
                    <h1>üöÄ Delta Pipeline Execution Report</h1>
                    <p>Iteration #{self.iteration_count} - {datetime.now().strftime('%Y-%m-%d %H:%M:%S UTC')}</p>
                </div>

                <div class="summary">
                    <h3>üìä Execution Summary</h3>
                    <ul>
                        <li><strong>Records Added:</strong> {summary['records_appended']}</li>
                        <li><strong>Table Version:</strong> {summary['table_version']}</li>
                        <li><strong>Total Versions:</strong> {version_info['total_versions']}</li>
                        <li><strong>Execution Time:</strong> {summary['timestamp']}</li>
                    </ul>
                </div>

                <div class="highlight">
                    <h3>üÜï Newly Added Data</h3>
                    <table class="data-table">
                        <tr>
                            <th>Name</th>
                            <th>Address</th>
                            <th>Email</th>
                            <th>Created Timestamp</th>
                        </tr>
        """

        # Add newly appended data to the table
        for record in summary['appended_data']:
            html_content += f"""
                        <tr>
                            <td>{record['Name']}</td>
                            <td>{record['Address'][:50]}{'...' if len(record['Address']) > 50 else ''}</td>
                            <td>{record['Email']}</td>
                            <td>{record['created_timestamp']}</td>
                        </tr>
            """

        html_content += """
                    </table>
                </div>

                <div class="summary">
                    <h3>üìã Latest Table Contents (Top 10)</h3>
                    <table class="data-table">
                        <tr>
                            <th>Name</th>
                            <th>Address</th>
                            <th>Email</th>
                            <th>Ingestion Timestamp</th>
                        </tr>
        """

        # Add latest data to the table
        for record in latest_data[:10]:
            timestamp = record.get('ingestion_timestamp', record.get('created_timestamp', 'N/A'))
            html_content += f"""
                        <tr>
                            <td>{record.get('Name', 'N/A')}</td>
                            <td>{str(record.get('Address', 'N/A'))[:50]}{'...' if len(str(record.get('Address', ''))) > 50 else ''}</td>
                            <td>{record.get('Email', 'N/A')}</td>
                            <td>{timestamp}</td>
                        </tr>
            """

        html_content += f"""
                    </table>
                </div>

                <div class="footer">
                    <p>ü§ñ Automated Delta Pipeline Report | Generated by ColabDeltaDataPipeline</p>
                    <p>Next execution scheduled in {self.config.get('schedule_interval_minutes', 5)} minutes</p>
                </div>
            </div>
        </body>
        </html>
        """

        return html_content

    def send_email_notification(self, summary, version_info, latest_data):
        """Send email notification with execution summary"""
        try:
            if not self.config['email']['enabled']:
                print("üìß Email notifications disabled")
                return

            # Create HTML content
            html_content = self.create_email_html_content(summary, version_info, latest_data)

            # Create message
            msg = MIMEMultipart('alternative')
            msg['Subject'] = f"Delta Pipeline Report - Iteration #{self.iteration_count} - {summary['records_appended']} Records Added"
            msg['From'] = self.config['email']['sender_email']
            msg['To'] = ', '.join(self.config['email']['recipient_emails'])

            # Attach HTML content
            html_part = MIMEText(html_content, 'html')
            msg.attach(html_part)

            # Send email
            with smtplib.SMTP(self.config['email']['smtp_server'], self.config['email']['smtp_port']) as server:
                server.starttls()
                server.login(self.config['email']['sender_email'], self.config['email']['sender_password'])
                server.send_message(msg)

            print(f"üìß Email notification sent successfully to {len(self.config['email']['recipient_emails'])} recipients")

        except Exception as e:
            print(f"‚ùå Failed to send email notification: {str(e)}")
            print(f"üí° Make sure to configure email settings in the config dictionary")

    def display_data_html(self, data, title="Data"):
        """Display data in HTML table format for Colab"""
        if not data:
            return

        html = f"""
        <div style="margin: 10px 0;">
            <h3 style="color: #1f77b4;">{title}</h3>
            <table style="border-collapse: collapse; width: 100%; font-family: Arial, sans-serif;">
                <tr style="background-color: #f2f2f2;">
                    <th style="border: 1px solid #ddd; padding: 8px; text-align: left;">Name</th>
                    <th style="border: 1px solid #ddd; padding: 8px; text-align: left;">Address</th>
                    <th style="border: 1px solid #ddd; padding: 8px; text-align: left;">Email</th>
                    <th style="border: 1px solid #ddd; padding: 8px; text-align: left;">Timestamp</th>
                </tr>
        """

        for record in data[:10]:  # Show only first 10 records
            html += f"""
                <tr>
                    <td style="border: 1px solid #ddd; padding: 8px;">{record.get('Name', 'N/A')}</td>
                    <td style="border: 1px solid #ddd; padding: 8px; max-width: 200px; overflow: hidden;">{record.get('Address', 'N/A')}</td>
                    <td style="border: 1px solid #ddd; padding: 8px;">{record.get('Email', 'N/A')}</td>
                    <td style="border: 1px solid #ddd; padding: 8px; font-size: 12px;">{record.get('created_timestamp', record.get('ingestion_timestamp', 'N/A'))}</td>
                </tr>
            """

        html += "</table></div>"
        display(HTML(html))

    def run_pipeline_iteration(self):
        """Run a single iteration of the data pipeline"""
        try:
            self.iteration_count += 1
            print(f"\n{'='*50}")
            print(f"üöÄ Starting pipeline iteration #{self.iteration_count} at {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
            print(f"{'='*50}")

            # Ensure Delta table exists
            self.create_delta_table_if_not_exists()

            # Generate and append new data
            num_records = self.config.get('records_per_iteration', 5)
            new_data = self.generate_fake_data(num_records)

            # Display newly generated data
            self.display_data_html(new_data, "üÜï Newly Generated Data")

            summary = self.append_data_to_delta_table(new_data)

            # Get version information
            version_info = self.get_table_version_info()

            # Display current table contents (limited)
            latest_contents = self.get_latest_table_contents(limit=10)
            self.display_data_html(latest_contents, f"üìä Latest Table Contents (Top 10) - Version {version_info['current_version']}")

            # Display version history
            self.display_version_info(version_info)

            summary_html = f"""
            <div style="background-color:#f0f8ff;padding:15px;border-radius:8px;">
              <h3 style="color:#1f77b4;">‚úÖ Delta Pipeline Summary</h3>
              <p><strong>Records Appended:</strong> {summary['records_appended']}</p>
              <p><strong>Table Version:</strong> {version_info['current_version']}</p>
              <p><strong>Time:</strong> {summary['timestamp']}</p>
            </div>
            """

            if summary['appended_data']:
              df_summary = pd.DataFrame(summary['appended_data'][:5])
              summary_html += df_summary.to_html(index=False, border=0)
              # Send email notification with summary
              self.send_email_notification(summary, version_info, latest_contents)

            print("‚úÖ Pipeline iteration completed successfully")
            return summary, version_info

        except Exception as e:
            print(f"‚ùå Pipeline iteration failed: {str(e)}")
            raise

    def display_version_info(self, version_info):
        """Display version information in HTML format"""
        html = f"""
        <div style="margin: 15px 0; padding: 10px; background-color: #f8f9fa; border-radius: 5px;">
            <h3 style="color: #28a745; margin-top: 0;">üîÑ Version Information</h3>
            <p><strong>Current Version:</strong> {version_info['current_version']}</p>
            <p><strong>Total Versions:</strong> {version_info['total_versions']}</p>
            <p><strong>Latest Operation:</strong> {version_info['latest_operation']}</p>
            <p><strong>Latest Timestamp:</strong> {version_info['latest_timestamp']}</p>
        </div>
        """
        display(HTML(html))

    def start_scheduled_pipeline(self, duration_minutes=10):
        """Start scheduled pipeline for a specific duration (Colab-friendly)"""
        self.running = True
        interval_minutes = self.config.get('schedule_interval_minutes', 2)

        print(f"üîÑ Starting scheduled pipeline for {duration_minutes} minutes")
        print(f"‚è∞ Running every {interval_minutes} minutes")
        print("‚ö†Ô∏è Note: This will run for the specified duration, then stop automatically")

        def run_scheduled():
            start_time = time.time()
            end_time = start_time + (duration_minutes * 60)


            while self.running and time.time() < end_time:
                try:
                    self.iteration_count += 1
                    print(f"\nüîÑ Scheduled Run #{self.iteration_count}")
                    self.run_pipeline_iteration() # Removed extra self

                    # Wait for next iteration
                    time.sleep(interval_minutes * 60)

                except Exception as e:
                    print(f"‚ùå Error in scheduled run: {str(e)}")
                    break

            print(f"\nüõë Scheduled pipeline completed after {duration_minutes} minutes")
            self.running = False

        # Run initial iteration
        print("\nüéØ Running initial iteration...")
        self.run_pipeline_iteration() # Removed extra self

        # Start scheduled runs in background
        thread = threading.Thread(target=run_scheduled)
        thread.daemon = True
        thread.start()

        return thread

    def stop_pipeline(self):
        """Stop the scheduled pipeline"""
        self.running = False
        print("üõë Pipeline stop requested")

    def cleanup(self):
        """Cleanup Spark session"""
        if self.spark:
            self.spark.stop()
            print("üßπ Spark session stopped")

pipeline = ColabDeltaDataPipeline(config)
print("üéâ Pipeline initialized successfully!")

‚úÖ Spark session initialized successfully
üéâ Pipeline initialized successfully!


In [None]:

# Test with a single iteration first
print("üß™ Testing with single iteration...")
try:
    summary, version_info = pipeline.run_pipeline_iteration()
    print("\n‚úÖ Single iteration test completed successfully!")
except Exception as e:
    print(f"‚ùå Test failed: {str(e)}")

üß™ Testing with single iteration...

üöÄ Starting pipeline iteration #1 at 2025-06-15 14:26:35
üìä Generated 1 fake records
‚úÖ Delta table created at /content/delta-table/customer_data
üìä Generated 5 fake records


Name,Address,Email,Timestamp
April Smith,"450 Sandra Street, Kingberg, CT 84146",holmestammy@example.net,2025-06-15T14:27:14.884155+00:00
Sarah Meyer,"2525 Delgado Courts, South Samantha, GA 07481",edward08@example.org,2025-06-15T14:27:14.885005+00:00
Sarah Mcknight,"83273 Kimberly Coves, Gonzalezville, FL 78747",adambradley@example.com,2025-06-15T14:27:14.885971+00:00
Melinda Green,"2569 Edwards Neck, Marissaview, OH 86338",alison84@example.org,2025-06-15T14:27:14.886765+00:00
Bryan Whitney,"356 Cynthia Pine, Cooperside, MP 76630",bethany87@example.net,2025-06-15T14:27:14.887588+00:00


‚úÖ Appended 5 records to Delta table. New version: 1
üîÑ Current Delta table version: 1
üìã Retrieved 6 records from Delta table


Name,Address,Email,Timestamp
April Smith,"450 Sandra Street, Kingberg, CT 84146",holmestammy@example.net,2025-06-15T14:27:14.884155+00:00
Sarah Mcknight,"83273 Kimberly Coves, Gonzalezville, FL 78747",adambradley@example.com,2025-06-15T14:27:14.885971+00:00
Sarah Meyer,"2525 Delgado Courts, South Samantha, GA 07481",edward08@example.org,2025-06-15T14:27:14.885005+00:00
Melinda Green,"2569 Edwards Neck, Marissaview, OH 86338",alison84@example.org,2025-06-15T14:27:14.886765+00:00
Bryan Whitney,"356 Cynthia Pine, Cooperside, MP 76630",bethany87@example.net,2025-06-15T14:27:14.887588+00:00
Brenda Buckley,"078 Bonnie Mills, Katherinechester, PA 41227",wrightsarah@example.com,2025-06-15T14:26:35.016057+00:00


üìß Email notification sent successfully to 1 recipients
‚úÖ Pipeline iteration completed successfully

‚úÖ Single iteration test completed successfully!


In [None]:
try:
    print("üìä Current table contents:")
    all_contents = pipeline.get_latest_table_contents(limit=20)
    pipeline.display_data_html(all_contents, f"üìã All Table Contents ({len(all_contents)} records)")

    # Show version information
    version_info = pipeline.get_table_version_info()
    pipeline.display_version_info(version_info)

except Exception as e:
    print(f"‚ùå Error displaying contents: {str(e)}")

üìä Current table contents:
üìã Retrieved 6 records from Delta table


Name,Address,Email,Timestamp
April Smith,"450 Sandra Street, Kingberg, CT 84146",holmestammy@example.net,2025-06-15T14:27:14.884155+00:00
Sarah Mcknight,"83273 Kimberly Coves, Gonzalezville, FL 78747",adambradley@example.com,2025-06-15T14:27:14.885971+00:00
Sarah Meyer,"2525 Delgado Courts, South Samantha, GA 07481",edward08@example.org,2025-06-15T14:27:14.885005+00:00
Melinda Green,"2569 Edwards Neck, Marissaview, OH 86338",alison84@example.org,2025-06-15T14:27:14.886765+00:00
Bryan Whitney,"356 Cynthia Pine, Cooperside, MP 76630",bethany87@example.net,2025-06-15T14:27:14.887588+00:00
Brenda Buckley,"078 Bonnie Mills, Katherinechester, PA 41227",wrightsarah@example.com,2025-06-15T14:26:35.016057+00:00


üîÑ Current Delta table version: 1


In [None]:
# Start scheduled pipeline for 10 minutes (adjust as needed)
duration = 10  # minutes
print(f"‚ö†Ô∏è Starting scheduled pipeline for {duration} minutes...")
print("üí° You can stop it early by running: pipeline.stop_pipeline()")

thread = pipeline.start_scheduled_pipeline(duration_minutes=duration)

# Optional: Wait for completion (blocks the cell)
# thread.join()

‚ö†Ô∏è Starting scheduled pipeline for 10 minutes...
üí° You can stop it early by running: pipeline.stop_pipeline()
üîÑ Starting scheduled pipeline for 10 minutes
‚è∞ Running every 2 minutes
‚ö†Ô∏è Note: This will run for the specified duration, then stop automatically

üéØ Running initial iteration...

üöÄ Starting pipeline iteration #2 at 2025-06-15 14:27:27
‚ÑπÔ∏è Delta table already exists
üìä Generated 5 fake records


Name,Address,Email,Timestamp
Matthew Schaefer,"7690 Nixon Glens Suite 304, New Michaelview, MP 32684",vwillis@example.org,2025-06-15T14:27:27.915281+00:00
Deanna Thomas,"51634 Stewart Overpass Suite 626, Campbelltown, MS 64702",wernerjason@example.net,2025-06-15T14:27:27.916542+00:00
Michael Taylor,"99665 Amy Ridges, Theresafurt, VA 28698",denise10@example.net,2025-06-15T14:27:27.917456+00:00
Kimberly Mccarthy,"2761 Reeves Shores, East Danielle, TN 65831",lejennifer@example.com,2025-06-15T14:27:27.918541+00:00
Katelyn Mcneil,"032 Mary Ridges Apt. 306, Wallfurt, MH 87398",hicksalexander@example.com,2025-06-15T14:27:27.919634+00:00


‚úÖ Appended 5 records to Delta table. New version: 2
üîÑ Current Delta table version: 2
üìã Retrieved 10 records from Delta table


Name,Address,Email,Timestamp
Matthew Schaefer,"7690 Nixon Glens Suite 304, New Michaelview, MP 32684",vwillis@example.org,2025-06-15T14:27:27.915281+00:00
Michael Taylor,"99665 Amy Ridges, Theresafurt, VA 28698",denise10@example.net,2025-06-15T14:27:27.917456+00:00
Deanna Thomas,"51634 Stewart Overpass Suite 626, Campbelltown, MS 64702",wernerjason@example.net,2025-06-15T14:27:27.916542+00:00
Kimberly Mccarthy,"2761 Reeves Shores, East Danielle, TN 65831",lejennifer@example.com,2025-06-15T14:27:27.918541+00:00
Katelyn Mcneil,"032 Mary Ridges Apt. 306, Wallfurt, MH 87398",hicksalexander@example.com,2025-06-15T14:27:27.919634+00:00
April Smith,"450 Sandra Street, Kingberg, CT 84146",holmestammy@example.net,2025-06-15T14:27:14.884155+00:00
Sarah Mcknight,"83273 Kimberly Coves, Gonzalezville, FL 78747",adambradley@example.com,2025-06-15T14:27:14.885971+00:00
Sarah Meyer,"2525 Delgado Courts, South Samantha, GA 07481",edward08@example.org,2025-06-15T14:27:14.885005+00:00
Melinda Green,"2569 Edwards Neck, Marissaview, OH 86338",alison84@example.org,2025-06-15T14:27:14.886765+00:00
Bryan Whitney,"356 Cynthia Pine, Cooperside, MP 76630",bethany87@example.net,2025-06-15T14:27:14.887588+00:00


üìß Email notification sent successfully to 1 recipients
‚úÖ Pipeline iteration completed successfully

üîÑ Scheduled Run #3

üöÄ Starting pipeline iteration #4 at 2025-06-15 14:27:36
‚ÑπÔ∏è Delta table already exists
üìä Generated 5 fake records


Name,Address,Email,Timestamp
Katherine Jordan,"898 Green Forest Suite 460, Tylermouth, NV 82378",amyhughes@example.net,2025-06-15T14:27:36.238036+00:00
Christy Blankenship,"710 Brent Walk Suite 973, North Johnhaven, ID 43725",jennifer32@example.net,2025-06-15T14:27:36.238865+00:00
Carrie Ritter,"9689 Yoder Underpass Suite 947, North Franciscoburgh, AL 79238",aprilcardenas@example.org,2025-06-15T14:27:36.239897+00:00
James Stone,"390 Rivers Shores Apt. 027, Kevinland, NH 37537",lewisshirley@example.net,2025-06-15T14:27:36.240851+00:00
Ann Collier,"Unit 3614 Box 0718, DPO AA 37982",simmonsandrea@example.org,2025-06-15T14:27:36.241486+00:00


In [None]:
print("üéõÔ∏è Manual Controls:")
print("‚Ä¢ To run another iteration: pipeline.run_pipeline_iteration()")
print("‚Ä¢ To stop scheduled pipeline: pipeline.stop_pipeline()")
print("‚Ä¢ To cleanup: pipeline.cleanup()")
print("‚Ä¢ To view table: pipeline.get_latest_table_contents()")

# Cleanup function (run when done)
def cleanup_pipeline():
    pipeline.cleanup()
    print("üßπ Cleanup completed")

üéõÔ∏è Manual Controls:
‚Ä¢ To run another iteration: pipeline.run_pipeline_iteration()
‚Ä¢ To stop scheduled pipeline: pipeline.stop_pipeline()
‚Ä¢ To cleanup: pipeline.cleanup()
‚Ä¢ To view table: pipeline.get_latest_table_contents()


In [None]:
try:
    # Get all data
    all_data = pipeline.get_latest_table_contents()

    if all_data:
        # Convert to pandas for analysis
        df_pandas = pd.DataFrame(all_data)

        print("üìà Quick Data Analysis:")
        print(f"Total records: {len(df_pandas)}")

        if 'ingestion_timestamp' in df_pandas.columns:
            df_pandas['ingestion_timestamp'] = pd.to_datetime(df_pandas['ingestion_timestamp'])
            print(f"Date range: {df_pandas['ingestion_timestamp'].min()} to {df_pandas['ingestion_timestamp'].max()}")

        # Display basic info
        display(HTML(f"""
        <div style="background-color: #e8f5e8; padding: 15px; border-radius: 5px; margin: 10px 0;">
            <h3 style="color: #2d5a2d; margin-top: 0;">üìä Data Summary</h3>
            <p><strong>Total Records:</strong> {len(df_pandas)}</p>
            <p><strong>Unique Names:</strong> {df_pandas['Name'].nunique() if 'Name' in df_pandas.columns else 'N/A'}</p>
            <p><strong>Unique Emails:</strong> {df_pandas['Email'].nunique() if 'Email' in df_pandas.columns else 'N/A'}</p>
        </div>
        """))

        # Show sample data
        if len(df_pandas) > 0:
            print("üìã Sample Data (Pandas DataFrame):")
            display(df_pandas.head())

except Exception as e:
    print(f"‚ùå Error in analysis: {str(e)}")

print("\nüéâ Colab Delta Pipeline Setup Complete!")
print("üí° Run cells 6-8 to operate the pipeline")
print("üõë Don't forget to run cleanup when done!")

# ============================
# Additional Colab-specific utilities
# ============================

def show_delta_table_files():
    """Show the actual Delta table files created"""
    try:
        import glob
        files = glob.glob(f"{config['delta_table_path']}/**/*", recursive=True)
        print("üìÅ Delta Table Files:")
        for file in files[:10]:  # Show first 10 files
            print(f"  {file}")
        if len(files) > 10:
            print(f"  ... and {len(files) - 10} more files")
    except Exception as e:
        print(f"‚ùå Error listing files: {str(e)}")


üìã Retrieved 11 records from Delta table
üìà Quick Data Analysis:
Total records: 11
Date range: 2025-06-15 14:26:49.262000 to 2025-06-15 14:27:28.036000


üìã Sample Data (Pandas DataFrame):


Unnamed: 0,Address,Email,Name,created_timestamp,ingestion_timestamp
0,"99665 Amy Ridges, Theresafurt, VA 28698",denise10@example.net,Michael Taylor,2025-06-15T14:27:27.917456+00:00,2025-06-15 14:27:28.036
1,"2761 Reeves Shores, East Danielle, TN 65831",lejennifer@example.com,Kimberly Mccarthy,2025-06-15T14:27:27.918541+00:00,2025-06-15 14:27:28.036
2,"032 Mary Ridges Apt. 306, Wallfurt, MH 87398",hicksalexander@example.com,Katelyn Mcneil,2025-06-15T14:27:27.919634+00:00,2025-06-15 14:27:28.036
3,"7690 Nixon Glens Suite 304, New Michaelview, M...",vwillis@example.org,Matthew Schaefer,2025-06-15T14:27:27.915281+00:00,2025-06-15 14:27:28.036
4,"51634 Stewart Overpass Suite 626, Campbelltown...",wernerjason@example.net,Deanna Thomas,2025-06-15T14:27:27.916542+00:00,2025-06-15 14:27:28.036



üéâ Colab Delta Pipeline Setup Complete!
üí° Run cells 6-8 to operate the pipeline
üõë Don't forget to run cleanup when done!
