<a href="https://colab.research.google.com/github/larry-tableau/tableau/blob/main/Read_from_BQ_into_Hyper_via_Pantab_v3_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

This script extracts a table from Google BigQuery in chunks and saves it in a specified output format (`hyper`, `parquet`, or `csv`). The sections below document how to run the script, its parameters, and its configuration:

---

#### **1. Setup and Prerequisites**

Before running the script:
1. **Python Environment**: Ensure you have Python 3.8+ installed with all required libraries.
2. **Dependencies**: Install the required Python libraries:
   ```bash
   pip install google-cloud-bigquery google-cloud-bigquery-storage pandas pandas-gbq pyarrow pantab psutil
   ```
3. **Authentication**: Authenticate with Google Cloud in Colab or your local environment:
   - In Colab:
     ```python
     from google.colab import auth
     auth.authenticate_user()
     ```
   - Locally:
     Set up authentication by downloading a service account key from your Google Cloud project and exporting the `GOOGLE_APPLICATION_CREDENTIALS` environment variable:
     ```bash
     export GOOGLE_APPLICATION_CREDENTIALS="/path/to/your-key.json"
     ```
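
For local runs, a quick sanity check of the credentials variable can save a failed first query. The following is a minimal sketch using only the standard library; the helper name `find_service_account_key` is hypothetical and not part of the script, and it only checks that the file exists and looks like a service account key, not that the key is valid.

```python
import json
import os
from pathlib import Path
from typing import Mapping, Optional


def find_service_account_key(env: Mapping[str, str] = os.environ) -> Optional[Path]:
    """Return the key file path if GOOGLE_APPLICATION_CREDENTIALS looks usable, else None."""
    # Hypothetical helper for illustration; the script itself does not define it.
    raw_path = env.get("GOOGLE_APPLICATION_CREDENTIALS")
    if not raw_path:
        return None  # Variable is unset or empty

    key_path = Path(raw_path).expanduser()
    if not key_path.is_file():
        return None  # Path does not point to an existing file

    try:
        key_data = json.loads(key_path.read_text(encoding="utf-8"))
    except (OSError, json.JSONDecodeError):
        return None  # Unreadable or not valid JSON

    # Service account key files carry "type": "service_account"
    if not isinstance(key_data, dict) or key_data.get("type") != "service_account":
        return None
    return key_path


if __name__ == "__main__":
    key = find_service_account_key()
    print(f"Service account key: {key}" if key else "No usable service account key found")
```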

---

#### **2. Configuration Parameters**

The script uses a `BigQueryConfig` class to define configuration parameters. Here's a detailed breakdown:

| **Parameter**          | **Type**           | **Description**                                                                                          | **Default**               |
|-------------------------|--------------------|----------------------------------------------------------------------------------------------------------|---------------------------|
| `project_id`           | `str`             | Your Google Cloud project ID where BigQuery is configured.                                               | N/A                       |
| `source_project`       | `str`             | Source project ID containing the dataset.                                                                | N/A                       |
| `dataset_id`           | `str`             | The dataset ID containing the table to extract data from.                                                | N/A                       |
| `table_id`             | `str`             | The table ID from which to extract data.                                                                 | N/A                       |
| `output_format`        | `str`             | Output format: `hyper`, `parquet`, or `csv`.                                                             | `'hyper'`                 |
| `output_path`          | `str`             | Path to save the output files.                                                                           | `'./data'`                |
| `max_bytes_billed`     | `int`             | Maximum bytes allowed for billing (e.g., 100GB).                                                         | `100GB`                   |
| `initial_chunk_size`   | `int`             | Initial chunk size for data extraction in rows.                                                          | `500,000`                 |
| `max_workers`          | `int`             | Number of workers for parallel data processing.                                                          | `4`                       |
| `chunk_size`           | `int` (Optional)  | Override for dynamically adjusted chunk size.                                                            | `None`                    |
| `columns`              | `List[str]`       | List of columns to extract. If `None`, all columns are fetched.                                           | `None`                    |
| `where_clause`         | `str` (Optional)  | SQL WHERE clause to filter data.                                                                         | `None`                    |
| `max_rows`             | `int` (Optional)  | Maximum number of rows to extract.                                                                       | `None`                    |
| `hyper_batch_size`     | `int`             | Batch size for writing to Tableau Hyper files.                                                           | `100,000`                 |
| `max_memory_gb`        | `float`           | Maximum memory in GB to allocate for extraction.                                                         | `0.8`                     |
| `clean_up_temp_files`  | `bool`            | Whether to delete temporary files after merging.                                                         | `True`                    |

---

#### **3. Execution Steps**

1. **Define Configuration**: Modify the `BigQueryConfig` object with your specific parameters:
   ```python
   config = BigQueryConfig(
       project_id='your-project-id',
       source_project='source-project-id',
       dataset_id='dataset-id',
       table_id='table-id',
       output_format='parquet',
       output_path='./data',
       max_bytes_billed=100 * 1024 * 1024 * 1024,  # 100GB
       initial_chunk_size=200_000,
       max_workers=4,
       clean_up_temp_files=True
   )
   ```

2. **Run the Script**: Execute the extraction using:
   ```python
   total_rows, final_file = extract_bigquery_data(config)
   print(f"Total Rows: {total_rows}")
   print(f"Output File: {final_file}")
   ```

3. **Logging**: Check the logs for progress and errors. Logs are dynamically displayed in the notebook environment.
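
The extraction works chunk by chunk, with each chunk identified by a row offset. As a sketch of how a table might be split into chunks, assuming offset-based pagination as used by `_build_query`, the hypothetical helper `plan_chunks` below (not part of the script) lists the chunk numbers and offsets for a given row count:

```python
import math
from typing import List, Optional, Tuple


def plan_chunks(total_rows: int, chunk_size: int,
                max_rows: Optional[int] = None) -> List[Tuple[int, int]]:
    """Return (chunk_number, offset) pairs covering the rows to extract."""
    # Hypothetical helper for illustration only
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    rows_to_fetch = total_rows if max_rows is None else min(total_rows, max_rows)
    total_chunks = math.ceil(rows_to_fetch / chunk_size)
    return [(chunk_num, chunk_num * chunk_size) for chunk_num in range(total_chunks)]


if __name__ == "__main__":
    for chunk_num, offset in plan_chunks(total_rows=1_200_000, chunk_size=500_000):
        print(f"chunk {chunk_num}: OFFSET {offset}")
```

Note that in this sketch the last chunk can reach past `max_rows`, so a caller that needs an exact row cap would also have to trim the final chunk.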

---

#### **4. Key Functions**

| **Function**                         | **Description**                                                                                         |
|--------------------------------------|---------------------------------------------------------------------------------------------------------|
| `extract_bigquery_data(config)`      | Main entry point. Extracts data from BigQuery and saves it in the desired format.                       |
| `_build_query(offset)`               | Builds paginated SQL queries for chunked data extraction.                                              |
| `_fetch_and_save_chunk(offset, ...)` | Fetches a data chunk and saves it to disk in the configured format.                                     |
| `_merge_to_final_format()`           | Merges all chunks into a single file based on the specified output format.                              |
| `_get_optimal_stream_config()`       | Dynamically calculates streaming and memory configurations based on available resources.                |

---

#### **5. Running from the GitHub Repository**

If your environment supports GitHub integration:
1. Clone the repository:
   ```bash
   git clone <repository-url>
   cd <repository-directory>
   ```
2. Run the script locally or modify it as needed.

---

#### **6. Notes and Best Practices**

- **Chunk Size**: Adjust `chunk_size` for low-memory environments to prevent memory overruns.
- **Output Format**: Use `hyper` format for Tableau, `parquet` for efficient storage, or `csv` for compatibility.
- **Error Handling**: Each chunk is attempted up to 3 times, with exponential backoff between attempts (capped at 30 seconds). Chunks that still fail are recorded in `failed_chunks`.
- **Parallel Processing**: Increase `max_workers` for faster extraction, but ensure sufficient CPU and memory availability.

---

#### **7. Example Use Case**

Extracting GitHub timeline data into a Tableau `.hyper` file:
```python
config = BigQueryConfig(
    project_id='my-project-id',
    source_project='bigquery-public-data',
    dataset_id='samples',
    table_id='github_timeline',
    output_format='hyper',
    output_path='/content/data',
    max_bytes_billed=10 * 1024 * 1024 * 1024,  # 10GB
    chunk_size=200_000,
    max_workers=4
)
total_rows, final_file = extract_bigquery_data(config)
print(f"Total Rows: {total_rows:,}")
print(f"Output File: {final_file}")
```

Output (illustrative values):
```
Total Rows: 500,000
Output File: /content/data/github_timeline_complete_<timestamp>.hyper
```
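
The output file name combines the table ID, the literal `_complete_`, and a timestamp. A minimal sketch of that naming scheme follows, based on the pattern used for the `hyper` output; that the other formats follow the same pattern is an assumption, and the helper `final_output_path` is hypothetical.

```python
from datetime import datetime
from pathlib import Path
from typing import Optional


def final_output_path(output_dir: str, table_id: str, output_format: str,
                      now: Optional[datetime] = None) -> Path:
    """Build <output_dir>/<table_id>_complete_<YYYYmmdd_HHMMSS>.<format>."""
    # Hypothetical helper; "now" is injectable so the result is reproducible
    timestamp = (now or datetime.now()).strftime("%Y%m%d_%H%M%S")
    return Path(output_dir) / f"{table_id}_complete_{timestamp}.{output_format}"


if __name__ == "__main__":
    print(final_output_path("/content/data", "github_timeline", "hyper"))
```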

---

By following this documentation, you can configure and run the script to extract data from Google BigQuery into your desired format.

In [None]:
%%capture
!pip install google-auth google-auth-oauthlib google-auth-httplib2 google-cloud-bigquery \
    google-cloud-bigquery-storage pandas pantab psutil pyarrow pandas-gbq google-cloud-core \
    google-cloud-storage google-api-core google-api-python-client tableauhyperapi

In [None]:
# Import required libraries
import os
import gc
import logging
import time
from datetime import datetime
from google.colab import auth
from google.cloud import bigquery
import pandas as pd
import pandas_gbq
import numpy as np
from typing import List, Optional, Tuple, Dict, Any
from dataclasses import dataclass
from IPython.display import clear_output, display, HTML
from concurrent.futures import ThreadPoolExecutor, as_completed
import psutil
import pantab
import json
import pyarrow as pa
import pyarrow.parquet as pq
from pathlib import Path
from google.cloud import bigquery_storage

# Configure logging
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - [%(filename)s:%(lineno)d] - %(message)s'
)
logger = logging.getLogger(__name__)

class CoLabLogHandler(logging.Handler):
    """Custom log handler to store logs in memory for display."""
    def __init__(self, max_lines=1000):
        super().__init__()
        self.log_buffer = []
        self.max_lines = max_lines

    def emit(self, record):
        """Emit a log record."""
        log_entry = self.format(record)
        self.log_buffer.append(log_entry)
        if len(self.log_buffer) > self.max_lines:
            self.log_buffer = self.log_buffer[-self.max_lines:]

        # Immediately update log display
        self._update_display()

    def get_logs(self):
        """Get all stored logs."""
        return '\n'.join(self.log_buffer)

    def _update_display(self):
        """Update log display in the notebook."""
        clear_output(wait=True)
        logs_html = "<br>".join(self.log_buffer[-20:])  # Display only the last 20 logs for readability
        display(HTML(f"""
        <div style="font-family:monospace; font-size:12px; background-color:#f8f9fa;
                    border:1px solid #ddd; padding:10px; max-height:200px; overflow-y:auto;">
            {logs_html}
        </div>
        """))


@dataclass
class BigQueryConfig:
    """Configuration for BigQuery extraction."""
    project_id: str
    source_project: str
    dataset_id: str
    table_id: str
    output_format: str = 'hyper'
    output_path: str = './data'
    max_bytes_billed: int = 100 * 1024 * 1024 * 1024  # 100GB
    initial_chunk_size: int = 500_000
    max_workers: int = 4
    chunk_size: Optional[int] = None
    columns: Optional[List[str]] = None
    where_clause: Optional[str] = None
    clean_up_temp_files: bool = True
    max_rows: Optional[int] = None
    hyper_batch_size: int = 100_000  # Batch size (rows) for writing to Hyper files
    max_memory_gb: float = 0.8  # Maximum memory in GB to allocate for extraction

    def __post_init__(self):
        """Validate configuration parameters."""
        if self.output_format not in ['hyper', 'parquet', 'csv']:
            raise ValueError("output_format must be one of: hyper, parquet, csv")
        if self.initial_chunk_size <= 0:
            raise ValueError("initial_chunk_size must be positive")
        if self.max_workers <= 0:
            raise ValueError("max_workers must be positive")
        if self.max_rows is not None and self.max_rows <= 0:
            raise ValueError("max_rows must be positive if specified")
        self.output_path = str(Path(self.output_path).resolve())

class BigQueryExtractor:
    """Enhanced BigQuery data extraction utility."""

    def __init__(self, config: BigQueryConfig):
        """Initialize extractor with configuration."""
        self.config = config

        # Initialize chunk_size early to avoid access issues
        self.chunk_size = config.initial_chunk_size

        self._ensure_output_directory()

        # Initialize BigQuery client first
        self.client = self._initialize_client()

        # Fetch schema information
        self.schema = self._get_schema()

        # Initialize BigQuery Storage client
        self.bq_storage_client = bigquery_storage.BigQueryReadClient()

        # Setup logging
        self.log_handler = CoLabLogHandler(max_lines=100)
        self.log_handler.setFormatter(
            logging.Formatter('%(asctime)s - %(levelname)s - %(message)s', datefmt='%Y-%m-%d %H:%M:%S')
        )
        logger.addHandler(self.log_handler)

        logger.info(f"Initial chunk size: {self.chunk_size:,} rows")
        logger.info("BigQuery connection initialized. Fetching schema information completed.")

        # Initialize other attributes
        self.total_rows = 0
        self.processed_chunks = 0
        self.failed_chunks = []
        self.start_time = time.time()
        self._setup_progress_display()

    def _preprocess_dataframe(self, df: pd.DataFrame, column_types: Dict[str, str]) -> pd.DataFrame:
        """Preprocess the DataFrame to handle types and NaNs efficiently."""
        if df.empty:
            return df

        for column in df.columns:
            if df[column].isna().all():  # If column is entirely NaN
                if column_types.get(column, '').upper() in ['INTEGER', 'FLOAT']:
                    df[column] = 0  # Default to 0 for numeric types
                else:
                    df[column] = ''  # Default to empty string for non-numeric types
            elif pd.api.types.is_object_dtype(df[column]):  # Object type columns
                # Fill NaNs before casting; astype(str) would turn them into 'nan'/'None'
                df[column] = df[column].fillna("").astype(str)
            elif pd.api.types.is_float_dtype(df[column]):  # Float columns with NaNs
                df[column] = df[column].fillna(0.0)
            elif pd.api.types.is_integer_dtype(df[column]):  # Integer columns with NaNs
                df[column] = df[column].fillna(0)
            elif column_types.get(column, '').upper() in ['DATE', 'TIMESTAMP']:  # Ensure datetime consistency
                df[column] = pd.to_datetime(df[column], errors='coerce')

        return df


    @staticmethod
    def preprocess_dataframe(df: pd.DataFrame, schema: List[bigquery.SchemaField]) -> pd.DataFrame:
        """Preprocess the DataFrame to handle types and NaNs efficiently."""
        # Reference mapping of BigQuery types to pandas dtypes (not applied below)
        type_mapping = {
            'STRING': str,
            'BYTES': str,
            'INTEGER': 'Int64',
            'INT64': 'Int64',
            'FLOAT': 'float64',
            'FLOAT64': 'float64',
            'NUMERIC': 'float64',
            'BIGNUMERIC': 'float64',
            'BOOLEAN': 'boolean',
            'BOOL': 'boolean',
            'DATE': 'datetime64[ns]',
            'DATETIME': 'datetime64[ns]',
            'TIME': str,
            'TIMESTAMP': 'datetime64[ns]',
            'RECORD': str,
            'STRUCT': str,
            'ARRAY': str,
            'GEOGRAPHY': str
        }

        # Index the schema by column name; the schema itself is a list of SchemaField
        column_types = {field.name: field.field_type.upper() for field in schema}

        for column in df.columns:
            bq_type = column_types.get(column, '')
            if df[column].isna().all():  # If the column is entirely NaN
                if bq_type in ['INTEGER', 'FLOAT']:
                    df[column] = 0  # Default to 0 for numeric types
                else:
                    df[column] = ''  # Default to empty string for non-numeric types
            elif pd.api.types.is_object_dtype(df[column]):  # Object type columns
                # Fill NaNs before casting; astype(str) would turn them into 'nan'/'None'
                df[column] = df[column].fillna("").astype(str)
            elif pd.api.types.is_float_dtype(df[column]):  # Float columns with NaNs
                df[column] = df[column].fillna(0)
            elif pd.api.types.is_integer_dtype(df[column]):  # Integer columns with NaNs
                df[column] = df[column].fillna(0)
            elif bq_type in ['DATE', 'TIMESTAMP']:  # Ensure datetime consistency
                df[column] = pd.to_datetime(df[column], errors='coerce')

        return df

    def _ensure_output_directory(self):
        """Create output directory if it doesn't exist."""
        os.makedirs(self.config.output_path, exist_ok=True)

    def _initialize_client(self) -> bigquery.Client:
        """Initialize BigQuery client with authentication."""
        try:
            auth.authenticate_user()
            return bigquery.Client(project=self.config.project_id)
        except Exception as e:
            logger.error(f"Failed to initialize BigQuery client: {str(e)}")
            raise

    def _get_schema(self) -> List[bigquery.SchemaField]:
        """Get table schema information."""
        try:
            # Client.dataset() is deprecated; get_table() accepts a fully qualified table ID
            table_id = f"{self.config.source_project}.{self.config.dataset_id}.{self.config.table_id}"
            return self.client.get_table(table_id).schema
        except Exception as e:
            logger.error(f"Failed to get schema: {str(e)}")
            raise

    def _get_bq_type_mapping(self) -> Dict[str, Any]:
        """Get mapping of BigQuery data types to Python/Pandas types."""
        return {
            'STRING': str,
            'BYTES': str,
            'INTEGER': 'Int64',
            'INT64': 'Int64',
            'FLOAT': 'float64',
            'FLOAT64': 'float64',
            'NUMERIC': 'float64',
            'BIGNUMERIC': 'float64',
            'BOOLEAN': 'boolean',
            'BOOL': 'boolean',
            'DATE': 'datetime64[ns]',
            'DATETIME': 'datetime64[ns]',
            'TIME': str,
            'TIMESTAMP': 'datetime64[ns]',
            'RECORD': str,
            'STRUCT': str,
            'ARRAY': str,
            'GEOGRAPHY': str
        }

    def _setup_progress_display(self):
        """Initialize progress display styling."""
        display(HTML("""
        <style>
            .bq-progress {
                font-family: monospace;
                padding: 10px;
                border: 1px solid #ccc;
                border-radius: 4px;
                margin: 10px 0;
                background-color: #f8f9fa;
            }
            .progress-bar {
                color: #fff;
                background-color: #28a745;
                height: 20px;
                border-radius: 3px;
                transition: width 0.3s ease;
                text-align: center;
                line-height: 20px;
            }
            .log-container {
                font-family: monospace;
                padding: 10px;
                border: 1px solid #ddd;
                border-radius: 4px;
                margin: 10px 0;
                background-color: #f8f9fa;
                max-height: 200px;
                overflow-y: auto;
                white-space: pre-wrap;
                font-size: 12px;
            }
            .log-entry {
                margin: 2px 0;
            }
            .log-error { color: #dc3545; }
            .log-warning { color: #ffc107; }
            .log-info { color: #17a2b8; }
        </style>
        """))

    @staticmethod
    def adjust_chunk_size(schema: List[bigquery.SchemaField], available_memory: int) -> int:
        """
        Dynamically adjust chunk size based on memory and schema details.
        Adds a cap to avoid overly large chunk sizes.
        """
        # Estimated memory usage per data type in bytes
        type_memory_footprint = {
            'STRING': 100,        # Average string size
            'BYTES': 50,          # Binary data
            'INTEGER': 8,         # 64-bit integers
            'INT64': 8,           # 64-bit integers
            'FLOAT': 8,           # 64-bit floats
            'FLOAT64': 8,         # 64-bit floats
            'NUMERIC': 16,        # Numeric with higher precision
            'BIGNUMERIC': 32,     # Big numeric
            'BOOLEAN': 1,         # Boolean
            'DATE': 4,            # Dates
            'DATETIME': 8,        # Datetime objects
            'TIMESTAMP': 8,       # Timestamp
            'RECORD': 200,        # Nested structure (average)
            'STRUCT': 200,        # Nested structure (average)
            'ARRAY': 150,         # Array (average per element)
            'GEOGRAPHY': 1000     # Geography data
        }

        # Calculate total memory usage per row based on schema
        total_memory_per_row = sum(
            type_memory_footprint.get(field.field_type, 50) for field in schema
        )

        # Account for overhead and concurrency
        row_memory_with_overhead = total_memory_per_row * 1.1  # Add 10% buffer

        # Calculate max rows based on available memory (cast to int: // with a float yields a float)
        max_rows = int(available_memory // row_memory_with_overhead)

        # Clamp the chunk size: at least 50,000 rows to avoid inefficient queries,
        # at most 500,000 rows to avoid excessive memory usage
        capped_chunk_size = max(50_000, min(500_000, max_rows))

        # Log intermediate values for debugging
        logger.info(f"Available memory: {available_memory} bytes")
        logger.info(f"Estimated memory per row: {row_memory_with_overhead:.2f} bytes")
        logger.info(f"Calculated chunk size: {max_rows} rows")
        logger.info(f"Capped chunk size: {capped_chunk_size} rows")

        return capped_chunk_size

    def _count_records(self) -> int:
        """Count total records in table."""
        query = f"""
        SELECT COUNT(*) as total
        FROM `{self.config.source_project}.{self.config.dataset_id}.{self.config.table_id}`
        """
        if self.config.where_clause:
            query += f" WHERE {self.config.where_clause}"

        #df = pandas_gbq.read_gbq(query, project_id=self.config.project_id, use_bqstorage_api=True)
        #total = int(df['total'].iloc[0])
        query_job = self.client.query(query)
        result = query_job.result()  # Wait for the query to complete
        total = next(iter(result)).get('total', 0)

        logger.info(f"Total records to process: {total:,}")
        return total

    def _build_query(self, offset: int) -> str:
        """Build optimized BigQuery query."""
        columns = self.config.columns or [field.name for field in self.schema]

        query = f"""
        SELECT {', '.join(columns)}
        FROM `{self.config.source_project}.{self.config.dataset_id}.{self.config.table_id}`
        """

        if self.config.where_clause:
            query += f" WHERE {self.config.where_clause}"

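        # Simple LIMIT/OFFSET without ORDER BY to avoid memory issues.
        # Caveat: without ORDER BY, BigQuery does not guarantee a stable row order
        # between queries, so chunks can overlap or miss rows.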
        query += f"""
        LIMIT {self.chunk_size}
        OFFSET {offset}
        """

        return query

    def _process_columns(self, df: pd.DataFrame) -> pd.DataFrame:
        """Process the DataFrame to handle types and NaNs efficiently."""
        if df.empty:
            return df

        # Get schema type mappings for BigQuery table
        column_types = {field.name: field.field_type.upper() for field in self.schema}

        # Preprocess DataFrame using class method
        df = self._preprocess_dataframe(df, column_types)

        return df



    def _save_chunk(self, df: pd.DataFrame, chunk_num: int) -> str:
        """Save data chunk with enhanced type handling."""
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        base_filename = f"{self.config.table_id}_chunk_{chunk_num}_{timestamp}"

        try:
            filename = f"{base_filename}.parquet"
            full_path = f"{self.config.output_path}/{filename}"

            # Convert to Arrow Table and write with compression
            table = pa.Table.from_pandas(df)
            pq.write_table(table, full_path, compression='snappy')  # Snappy compression

            logger.debug(f"Saved chunk {chunk_num} to {full_path}")
            return filename

        except Exception as e:
            logger.error(f"Failed to save chunk: {str(e)}")
            raise


    def _update_progress(self, total_chunks: int, throttle: int = 1):
        """Throttle progress updates to reduce I/O overhead."""
        if self.processed_chunks % throttle != 0:
            return
        clear_output(wait=True)

        elapsed_time = time.time() - self.start_time
        # Guard against division by zero when there are no chunks to process
        progress = (self.processed_chunks / total_chunks) * 100 if total_chunks else 0.0

        rows_per_second = self.total_rows / elapsed_time if elapsed_time > 0 else 0
        memory_info = psutil.Process(os.getpid()).memory_info()
        memory_usage_mb = memory_info.rss / 1024 / 1024

        progress_html = f"""
        <div style="font-family:monospace; font-size:12px; padding:10px;">
            <div><strong>Progress:</strong> {progress:.1f}%</div>
            <div><strong>Processed Chunks:</strong> {self.processed_chunks}/{total_chunks}</div>
            <div><strong>Rows Processed:</strong> {self.total_rows:,}</div>
            <div><strong>Processing Speed:</strong> {rows_per_second:.1f} rows/sec</div>
            <div><strong>Memory Usage:</strong> {memory_usage_mb:.1f} MB</div>
            <div><strong>Elapsed Time:</strong> {elapsed_time:.1f} seconds</div>
        </div>
        """
        display(HTML(progress_html))


    def _fetch_and_save_chunk(self, offset: int, chunk_num: int) -> Optional[str]:
        """Fetch and save a single chunk with optimized retry logic and memory management."""
        max_retries = 3
        original_chunk_size = self.chunk_size
        min_chunk_size = 50_000  # Minimum chunk size to maintain efficiency

        def get_memory_usage_mb():
            """Get current memory usage in MB."""
            return psutil.Process(os.getpid()).memory_info().rss / (1024 * 1024)

        def _adjust_chunk_size(error_message: str, attempt: int) -> int:
            """Adjust chunk size based on the error message and retry attempt."""
            reduction_factors = {
                'memory': 2,      # Reduce chunk size more for memory errors
                'timeout': 1.5,   # Moderate reduction for timeout errors
                'quota': 3        # Aggressive reduction for quota issues
            }
            # The caller passes the full error text, so match on keywords
            message = error_message.lower()
            error_type, reduction_factor = next(
                ((key, factor) for key, factor in reduction_factors.items() if key in message),
                ('unclassified', 1.5)  # Default reduction factor
            )

            new_size = max(
                int(original_chunk_size // (reduction_factor * (attempt + 1))),
                min_chunk_size
            )
            logger.info(f"Adjusted chunk size to {new_size:,} rows due to {error_type} error.")
            return new_size

        def preprocess_dataframe(df: pd.DataFrame, schema: Dict[str, str]) -> pd.DataFrame:
            """Preprocess the DataFrame to handle types and NaNs efficiently."""
            for column in df.columns:
                if pd.api.types.is_object_dtype(df[column]):  # Object type columns
                    # Fill NaNs before casting; astype(str) would turn them into 'nan'/'None'
                    df[column] = df[column].fillna("").astype(str)
                elif pd.api.types.is_float_dtype(df[column]):  # Float columns with NaNs
                    df[column] = df[column].fillna(0.0)
                elif pd.api.types.is_integer_dtype(df[column]):  # Integer columns with NaNs
                    df[column] = df[column].fillna(0)
                elif schema.get(column, '').upper() in ['DATE', 'TIMESTAMP']:  # Ensure datetime consistency
                    df[column] = pd.to_datetime(df[column], errors='coerce')
            return df

        initial_memory = get_memory_usage_mb()
        last_error = None

        for attempt in range(max_retries):
            try:
                if attempt > 0:
                    # Retry logic: Adjust chunk size for subsequent attempts
                    logger.info(f"Retrying chunk {chunk_num} (Attempt {attempt + 1}/{max_retries})")
                    self.chunk_size = _adjust_chunk_size(str(last_error), attempt)

                # Build and execute query
                query = self._build_query(offset)
                df_chunk = self.client.query(query).to_dataframe()  # Fetch data as a DataFrame
                if df_chunk.empty:
                    logger.warning(f"Chunk {chunk_num} is empty. Skipping.")
                    return None  # Skip saving empty chunks

                # Preprocess and process columns
                column_schema = {field.name: field.field_type for field in self.schema}
                df_chunk = preprocess_dataframe(df_chunk, column_schema)

                # Save the chunk
                saved_file = self._save_chunk(df_chunk, chunk_num)

                # Count rows only after a successful save so retries are not double-counted
                self.total_rows += len(df_chunk)
                logger.info(f"Successfully processed and saved chunk {chunk_num}")

                # **Release memory after saving**
                del df_chunk  # Delete the chunk DataFrame
                gc.collect()  # Trigger garbage collection

                return saved_file

            except Exception as e:
                # Handle errors and determine backoff strategy
                last_error = e
                logger.error(f"Error processing chunk {chunk_num}: {str(e)}")

                if attempt == max_retries - 1:
                    logger.error(f"Max retries reached for chunk {chunk_num}. Marking as failed.")
                    self.failed_chunks.append(chunk_num)
                    return None  # Return failure after max retries

                # Exponential backoff for retries
                backoff_time = min(2 ** attempt, 30)  # Cap backoff at 30 seconds
                logger.info(f"Backing off for {backoff_time} seconds before retrying...")
                time.sleep(backoff_time)

            finally:
                # Restore the chunk size after every attempt (success, retry, or failure)
                # so a reduced retry size does not leak into later chunks
                self.chunk_size = original_chunk_size

                # Clean up memory if needed
                current_memory = get_memory_usage_mb()
                if current_memory > initial_memory * 1.7:  # Memory grew more than 70% above the initial usage
                    logger.warning("High memory usage detected. Triggering garbage collection.")
                    gc.collect()

        return None





    def _get_optimal_stream_config(self) -> Dict[str, Any]:
        """Calculate optimal streaming configuration based on available resources."""
        try:
            available_memory = psutil.virtual_memory()
            total_memory_gb = available_memory.total / (1024**3)
            available_memory_gb = available_memory.available / (1024**3)

            # Reserve 20% memory for system and overhead
            usable_memory_gb = available_memory_gb * 0.8

            # Calculate optimal stream count and batch size
            optimal_streams = max(1, min(
                int(usable_memory_gb / 1.5),  # 1.5GB per stream
                (os.cpu_count() or 1) * 2,  # Double CPU count (cpu_count() may return None)
                8  # Hard cap at 8 streams
            ))

            # For 12GB Colab machine, adjust batch size accordingly
            rows_per_gb = 250000  # Estimated rows per GB
            optimal_batch_size = int((usable_memory_gb / optimal_streams) * rows_per_gb)

            # Cap batch size based on total memory
            max_safe_batch = min(
                optimal_batch_size,
                500000 if total_memory_gb <= 13 else 1000000  # Lower batch size for Colab
            )

            logger.info("Memory Configuration:")
            logger.info(f"- Total RAM: {total_memory_gb:.1f}GB")
            logger.info(f"- Available: {available_memory_gb:.1f}GB")
            logger.info(f"- Usable: {usable_memory_gb:.1f}GB")
            logger.info(f"- Streams: {optimal_streams}")
            logger.info(f"- Batch Size: {max_safe_batch:,} rows")

            return {
                'stream_count': optimal_streams,
                'batch_size': max_safe_batch,
                'memory_limit': int(usable_memory_gb * 1024 * 1024 * 1024),  # in bytes
                'total_memory_gb': total_memory_gb,
                'available_memory_gb': available_memory_gb
            }
        except Exception as e:
            logger.error(f"Error calculating stream configuration: {str(e)}")
            # Provide safe default values
            return {
                'stream_count': 2,
                'batch_size': 250000,
                'memory_limit': 8 * 1024 * 1024 * 1024,  # 8GB
                'total_memory_gb': 12,
                'available_memory_gb': 8
            }

    def _merge_to_final_format(self, saved_files: List[str]) -> str:
        """Merge chunks into final output format with improved memory handling."""
        final_filename = f"{self.config.table_id}_complete_{datetime.now().strftime('%Y%m%d_%H%M%S')}"

        try:
            if self.config.output_format == 'hyper':
                final_path = f"{self.config.output_path}/{final_filename}.hyper"
                import pantab

                logger.info("Starting merge to Hyper format...")
                first_write = True

                for file in saved_files:
                    if file:
                        try:
                            # Read parquet file in chunks
                            pq_file = pq.ParquetFile(f"{self.config.output_path}/{file}")
                            num_row_groups = pq_file.num_row_groups

                            for i in range(num_row_groups):
                                df = pq_file.read_row_group(i).to_pandas()

                                if not df.empty and not df.isna().all().all():
                                    # Process columns
                                    df = self._process_columns(df)

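                                    # Convert object columns to string while keeping nulls as nulls
                                    # (a plain astype(str) would store them as the text 'None' or 'nan')
                                    for col in df.select_dtypes(include=['object']).columns:
                                        df[col] = df[col].where(df[col].isna(), df[col].astype(str))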

                                    # Write to hyper file
                                    if first_write:
                                        # First write creates the file
                                        pantab.frame_to_hyper(
                                            df,
                                            final_path,
                                            table=self.config.table_id,
                                            table_mode='w'  # Write mode for first chunk
                                        )
                                        first_write = False
                                    else:
                                        # Subsequent writes append to the file
                                        pantab.frame_to_hyper(
                                            df,
                                            final_path,
                                            table=self.config.table_id,
                                            table_mode='a'  # Append mode for subsequent chunks
                                        )

                                    # Force cleanup
                                    del df
                                    gc.collect()

                        except Exception as e:
                            logger.error(f"Error processing file {file}: {str(e)}")
                            continue

                logger.info("Completed Hyper file creation")

            elif self.config.output_format == 'csv':
                final_path = f"{self.config.output_path}/{final_filename}.csv"
                first_chunk = True
                for file in saved_files:
                    if file:
                        df = pd.read_parquet(f"{self.config.output_path}/{file}")
                        if not df.empty and not df.isna().all().all():  # Exclude empty or all-NA DataFrames
                            df.to_csv(final_path, mode='w' if first_chunk else 'a',
                                      header=first_chunk, index=False)
                            first_chunk = False

            elif self.config.output_format == 'parquet':
                final_path = f"{self.config.output_path}/{final_filename}.parquet"
                writer = None

                try:
                    for file in saved_files:
                        if file:
                            df = pd.read_parquet(f"{self.config.output_path}/{file}")
                            if not df.empty and not df.isna().all().all():  # Exclude empty or all-NA DataFrames
                                table = pa.Table.from_pandas(df, preserve_index=False)
                                if writer is None:
                                    # The first chunk's schema defines the output file
                                    writer = pq.ParquetWriter(final_path, table.schema)
                                else:
                                    # Align later chunks with that schema
                                    table = table.cast(writer.schema)
                                # Append each chunk as new row groups; calling
                                # to_parquet() once per batch would overwrite the file
                                writer.write_table(table)
                                del df, table
                finally:
                    if writer is not None:
                        writer.close()

            # Clean up temporary files if requested
            if self.config.clean_up_temp_files:
                for file in saved_files:
                    if file:
                        try:
                            os.remove(f"{self.config.output_path}/{file}")
                            logger.debug(f"Removed temporary file: {file}")
                        except Exception as e:
                            logger.warning(f"Failed to remove temporary file {file}: {str(e)}")

            return f"{final_filename}.{self.config.output_format}"

        except Exception as e:
            logger.error(f"Error merging files: {str(e)}")
            raise


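    def extract_data(self) -> Tuple[int, Optional[str]]:
        """Main extraction method using parallelized chunk-based approach.

        Returns (number of chunks merged, final file name), or (0, None) if nothing was extracted.
        """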
        try:
            total_records = self._count_records()
            logger.info(f"Total records to process: {total_records:,}")
            if total_records == 0:
                logger.warning("No records found to extract")
                return 0, None

            offsets = range(0, total_records, self.chunk_size)
            results = []
            with ThreadPoolExecutor(max_workers=self.config.max_workers) as executor:
                futures = [
                    executor.submit(self._fetch_and_save_chunk, offset, chunk_num)
                    for chunk_num, offset in enumerate(offsets)
                ]
                for future in as_completed(futures):
                    try:
                        result = future.result()
                        if result:
                            results.append(result)
                        self.processed_chunks += 1
                        self._update_progress(len(offsets))  # Update progress dynamically
                    except Exception as e:
                        logger.error(f"Error in chunk processing: {e}")
            gc.collect()

            if not results:
                logger.warning("No data was successfully extracted.")
                return 0, None

            final_path = self._merge_to_final_format(results)
            logger.info(f"Successfully created final file: {final_path}")
            return len(results), final_path
        except Exception as e:
            logger.error(f"Error during extraction: {str(e)}")
            raise


    def _stream_to_parquet(self, output_path: str) -> int:
        """Stream directly to parquet with memory optimization."""
        config = self._get_optimal_stream_config()
        writer = None
        total_rows = 0

        try:
            session = self._create_read_session(config['stream_count'])

            for stream in session.streams:
                reader = self.bq_storage_client.read_rows(stream.name)

                for batch in reader.rows().pages:
                    # Convert RecordBatch to Table
                    record_batch = batch.to_arrow()
                    arrow_table = pa.Table.from_batches([record_batch])  # Convert to Table

                    if writer is None:
                        # Initialize writer with schema from the first Arrow table
                        writer = pq.ParquetWriter(
                            output_path,
                            arrow_table.schema,  # Use schema from the first Arrow table
                            compression='snappy',
                            use_dictionary=True,
                            write_statistics=True
                        )

                    # Write the Arrow Table to Parquet
                    writer.write_table(arrow_table)
                    total_rows += arrow_table.num_rows

                    self._update_progress_streaming(total_rows)

            if writer:
                writer.close()

            return total_rows

        except Exception as e:
            logger.error(f"Error in parquet streaming: {str(e)}")
            if writer:
                writer.close()
            raise

    def _stream_to_hyper(self, output_path: str) -> int:
        """Stream to hyper format with memory optimization and row limit."""
        config = self._get_optimal_stream_config()
        total_rows = 0
        batch_rows = []
        current_size = 0

        try:
            session = self._create_read_session(config['stream_count'])

            for stream in session.streams:
                if self.config.max_rows and total_rows + current_size >= self.config.max_rows:
                    break

                reader = self.bq_storage_client.read_rows(stream.name)

                for batch in reader.rows().pages:
                    # Convert ReadRowsPage to Pandas DataFrame
                    df = batch.to_arrow().to_pandas()

                    # Apply row limit if needed, counting rows still buffered in batch_rows
                    if self.config.max_rows:
                        rows_remaining = self.config.max_rows - (total_rows + current_size)
                        if rows_remaining <= 0:
                            break
                        if len(df) > rows_remaining:
                            df = df.iloc[:rows_remaining]

                    df = self._process_columns(df)
                    batch_rows.append(df)
                    current_size += len(df)

                    if current_size >= config['batch_size']:
                        try:
                            # Filter out empty DataFrames before concatenation
                            combined_df = pd.concat([df for df in batch_rows if not df.empty], ignore_index=True)
                            pantab.frame_to_hyper(
                                combined_df,
                                output_path,
                                table=self.config.table_id,
                                # pantab selects write vs. append via table_mode:
                                # create the table on the first write, append afterwards
                                table_mode='a' if total_rows > 0 else 'w'
                            )
                            total_rows += current_size
                            batch_rows = []
                            current_size = 0
                            self._update_progress_streaming(total_rows)

                            combined_df = None
                            gc.collect()

                        except Exception as e:
                            logger.error(f"Error writing batch: {str(e)}")
                            raise

            # Process remaining rows
            if batch_rows:
                try:
                    combined_df = pd.concat([df for df in batch_rows if not df.empty], ignore_index=True)
                    pantab.frame_to_hyper(
                        combined_df,
                        output_path,
                        table=self.config.table_id,
                        table_mode='a' if total_rows > 0 else 'w'
                    )
                    total_rows += current_size
                except Exception as e:
                    logger.error(f"Error writing final batch: {str(e)}")
                    raise

            logger.info(f"Successfully processed {total_rows:,} rows")
            return total_rows

        except Exception as e:
            logger.error(f"Error in hyper streaming: {str(e)}")
            raise



    def _stream_to_csv(self, output_path: str) -> int:
        """Stream to CSV format with memory optimization."""
        config = self._get_optimal_stream_config()
        total_rows = 0
        first_batch = True

        try:
            session = self._create_read_session(config['stream_count'])

            for stream in session.streams:
                reader = self.bq_storage_client.read_rows(stream.name)

                for batch in reader.rows().pages:
                    # Convert the page via Arrow, as in the other streaming methods
                    df = batch.to_arrow().to_pandas()
                    df = self._process_columns(df)

                    df.to_csv(
                        output_path,
                        mode='w' if first_batch else 'a',
                        header=first_batch,
                        index=False
                    )

                    first_batch = False
                    total_rows += len(df)
                    self._update_progress_streaming(total_rows)

                    # Force garbage collection
                    df = None
                    gc.collect()

            return total_rows

        except Exception as e:
            logger.error(f"Error in CSV streaming: {str(e)}")
            raise

    def _adjust_chunk_size(self, error_type: str, attempt: int) -> int:
        """Adjust chunk size dynamically based on error type and retry attempt."""
        reduction_factor = {
            'memory': 2,  # Divide chunk size by 2 for memory issues
            'timeout': 1.5,  # Divide by 1.5 for timeouts
            'quota': 3  # Divide by 3 for quota errors
        }.get(error_type.lower(), 1.5)  # Default reduction factor

        # Progressive reduction: the divisor grows linearly with each retry attempt
        adjusted_chunk_size = max(
            int(self.chunk_size // (reduction_factor * (attempt + 1))),  # int: the factor may be a float
            50_000  # Minimum chunk size to avoid inefficiency
        )

        logger.info(
            f"Adjusting chunk size to {adjusted_chunk_size:,} rows (attempt {attempt + 1}) "
            f"due to {error_type} error."
        )
        return adjusted_chunk_size

    def _create_read_session(self, stream_count: int):
        """Create optimized read session."""
        try:
            read_session = bigquery_storage.types.ReadSession()
            table_path = f"projects/{self.config.source_project}/datasets/{self.config.dataset_id}/tables/{self.config.table_id}"
            read_session.table = table_path
            read_session.data_format = bigquery_storage.types.DataFormat.ARROW

            # Only apply where clause if specified
            if self.config.where_clause:
                read_session.read_options.row_restriction = self.config.where_clause

            # Create session with snapshot
            session = self.bq_storage_client.create_read_session(
                parent=f"projects/{self.config.project_id}",
                read_session=read_session,
                max_stream_count=stream_count
            )

            logger.info(f"Created read session with {len(session.streams)} streams")
            return session

        except Exception as e:
            logger.error(f"Error creating read session: {str(e)}")
            raise

    def _update_progress_streaming(self, total_rows: int):
        """Update progress display for streaming mode."""
        try:
            # Use clear_output with wait=True to prevent flickering
            clear_output(wait=True)

            elapsed_time = time.time() - self.start_time
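            # Look the total up once and reuse it rather than re-counting on every page
            if getattr(self, '_total_records_cache', None) is None:
                self._total_records_cache = self._count_records()
            total_records = self._total_records_cache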
            progress = (total_rows / total_records) * 100 if total_records > 0 else 0
            rows_per_second = total_rows / elapsed_time if elapsed_time > 0 else 0
            memory_usage_mb = psutil.Process(os.getpid()).memory_info().rss / 1024 / 1024

            # Create simple progress display
            progress_html = f"""
            <div class="bq-progress">
                <div style="font-weight: bold;">Processing: {self.config.source_project}.{self.config.dataset_id}.{self.config.table_id}</div>
                <div style="margin: 10px 0;">
                    <div style="width: 100%; background-color: #eee; border-radius: 3px;">
                        <div class="progress-bar" style="width: {min(100, progress)}%;">
                            {progress:.1f}%
                        </div>
                    </div>
                </div>
                <div style="margin: 10px 0;">
                    <div>Processed Rows: {total_rows:,} of {total_records:,}</div>
                    <div>Processing Rate: {rows_per_second:.1f} rows/sec</div>
                    <div>Memory Usage: {memory_usage_mb:.1f} MB</div>
                    <div>Elapsed Time: {elapsed_time:.1f}s</div>
                </div>
            </div>
            """

            # Display progress first
            display(HTML(progress_html))

            # Force display to flush
            import sys
            sys.stdout.flush()

        except Exception as e:
            logger.error(f"Error updating progress: {str(e)}")
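

# Worked example (illustrative sketch, not part of the extractor): the sizing
# arithmetic from _get_optimal_stream_config, restated as a pure function so it
# can be checked without psutil. The name `estimate_stream_config` and its
# parameters are hypothetical; the constants mirror the method above.
def estimate_stream_config(total_gb, available_gb, cpu_count):
    """Return (stream_count, batch_size) for the given memory and CPU figures."""
    usable_gb = available_gb * 0.8  # keep 20% headroom for the system
    streams = max(1, min(int(usable_gb / 1.5), (cpu_count or 1) * 2, 8))
    batch = int((usable_gb / streams) * 250_000)  # ~250k rows per GB (an estimate)
    cap = 500_000 if total_gb <= 13 else 1_000_000  # lower cap on Colab-sized machines
    return streams, min(batch, cap)


# Assumed figures for a 12 GB machine with 2 CPUs and 10 GB free:
# usable = 8 GB, streams = min(5, 4, 8) = 4, batch = min(500_000, 500_000).
# With only 1 GB free, the stream count bottoms out at 1 and the batch at 200,000 rows.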

def extract_bigquery_data(config: BigQueryConfig) -> Tuple[int, Optional[str]]:
    """Main function to extract data from BigQuery."""
    extractor = BigQueryExtractor(config)
    return extractor.extract_data()
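

# Worked example (illustrative sketch): how _adjust_chunk_size shrinks the chunk
# on successive retries. `reduced_chunk_size` is a hypothetical standalone
# restatement using the same divisors and the same 50,000-row floor; it is not
# called by the extractor.
def reduced_chunk_size(chunk_size, error_type, attempt):
    """Return the chunk size to try after `attempt` failed retries."""
    factor = {'memory': 2, 'timeout': 1.5, 'quota': 3}.get(error_type.lower(), 1.5)
    return max(int(chunk_size // (factor * (attempt + 1))), 50_000)


# With a 200,000-row chunk and repeated memory errors:
# attempt 0 -> 200_000 // 2 = 100_000; attempt 1 -> 200_000 // 4 = 50_000;
# attempt 2 -> 200_000 // 6 = 33_333, which is raised to the 50_000 floor.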

# Example usage
if __name__ == "__main__":
    try:
        # Calculate optimal number of workers based on CPU cores
        cpu_count = os.cpu_count() or 1  # cpu_count() can return None
        recommended_workers = max(1, min(cpu_count * 2, 8))  # 2 workers per CPU, max 8

        # Calculate reasonable max bytes billed (10TB default limit)
        MAX_BYTES_BILLED = 10 * 1024 * 1024 * 1024 * 1024  # 10TB

        # Configure extraction with reasonable defaults
        config = BigQueryConfig(
            project_id='pre-sales-demo',
            source_project='bigquery-public-data',
            dataset_id='samples',
            table_id='github_timeline',
            output_format='hyper',
            output_path='/content/data',
            hyper_batch_size=100000,  # Added for Hyper file creation
            max_memory_gb=0.8,  # Maximum memory usage (80% of available)
            chunk_size=200_000,
            initial_chunk_size=200_000,  # More conservative initial chunk size
            max_workers=recommended_workers,  # Dynamic based on CPU cores
            max_bytes_billed=MAX_BYTES_BILLED,  # 10TB limit
            clean_up_temp_files=True,
            where_clause=None,  # Optional filtering
            columns=None,  # All columns by default
            max_rows=None
        )

        # Log configuration details
        logger.info("Starting extraction with configuration:")
        logger.info(f"- Project ID: {config.project_id}")
        logger.info(f"- Source: {config.source_project}.{config.dataset_id}.{config.table_id}")
        logger.info(f"- Workers: {config.max_workers} (based on {cpu_count} CPU cores)")
        logger.info(f"- Initial chunk size: {config.initial_chunk_size:,} rows")
        logger.info(f"- Max bytes billed: {config.max_bytes_billed / (1024**4):.1f}TB")
        logger.info(f"- Output format: {config.output_format}")
        logger.info(f"- Output path: {config.output_path}")

        # Validate output directory exists
        Path(config.output_path).mkdir(parents=True, exist_ok=True)

        # Check available memory
        memory_info = psutil.virtual_memory()
        available_memory_gb = memory_info.available / (1024**3)
        logger.info(f"Available memory: {available_memory_gb:.1f}GB")

        if available_memory_gb < 2:  # Less than 2GB available
            logger.warning("Low memory detected - reducing chunk size")
            config.initial_chunk_size = 100_000  # Reduce chunk size for low memory

        # Extract data (the first value returned is the number of chunks merged, not a row count)
        chunks_merged, final_file = extract_bigquery_data(config)

        # Log final results
        if final_file:
            logger.info("Extraction completed successfully")
            logger.info(f"Total chunks merged: {chunks_merged:,}")
            logger.info(f"Final output file: {final_file}")
            logger.info(f"Output location: {os.path.join(config.output_path, final_file)}")
        else:
            logger.error("Extraction failed - no output file generated")

    except Exception as e:
        logger.error(f"Extraction failed with error: {str(e)}", exc_info=True)
        raise
    finally:
        logger.info("Process completed")
        # Optional: Display memory usage at end
        final_memory = psutil.Process(os.getpid()).memory_info().rss / (1024 * 1024)
        logger.info(f"Final memory usage: {final_memory:.1f}MB")