In [1]:
!pip install pandas selenium webdriver-manager pyarrow s3fs



In [2]:
import time
import datetime
import pandas as pd
from selenium import webdriver
from selenium.webdriver.chrome.service import Service
from selenium.webdriver.chrome.options import Options
from webdriver_manager.chrome import ChromeDriverManager
import os
import re
import logging

# --- Logging Configuration ---
# Root logger: INFO and above, written to the default stream handler, with the
# emitting file name and line number included in every record.
_LOG_FORMAT = '%(asctime)s - %(levelname)s - %(filename)s:%(lineno)d - %(message)s'
logging.basicConfig(
    level=logging.INFO,
    format=_LOG_FORMAT,
    handlers=[logging.StreamHandler()],
)

# --- EGATRealTimeScraper Class ---
class EGATRealTimeScraper:
    """Read the latest reading from a web page's browser-console output.

    The scraper drives a headless Chrome session, loads ``url``, waits a fixed
    number of seconds, and then searches the browser console log (newest entry
    first) for a message of the form::

        updateMessageArea: <date id>, <H:MM>, <value>, <temperature>

    Attributes:
        url: Page that is loaded on every scrape.
        page_load_wait_seconds: Seconds to sleep after navigation before the
            console log is read.
        driver: The Selenium Chrome driver, or ``None`` when it is not
            available (initialisation failed, or it was closed).
    """

    # Compiled once instead of on every log entry. Groups: date id, time,
    # value (may contain thousands separators), temperature.
    _UPDATE_MESSAGE_RE = re.compile(
        r'updateMessageArea:\s*(\d+)\s*,\s*(\d{1,2}:\d{2})\s*,\s*([\d,]+\.?\d*)\s*,\s*(\d+\.?\d*)'
    )

    # Default pause after navigation; __init__ can override it per instance.
    page_load_wait_seconds = 10

    def __init__(self, url="https://www.sothailand.com/sysgen/egat/", page_load_wait_seconds=10):
        """Store the settings and try to start the browser.

        Args:
            url: Page to load on every scrape.
            page_load_wait_seconds: Seconds to wait after navigation before
                reading the console log.

        A failed browser start does not raise; it leaves ``driver`` as ``None``.
        """
        self.url = url
        self.page_load_wait_seconds = page_load_wait_seconds
        self.driver = None
        self._initialize_driver()

    def _initialize_driver(self):
        """Start headless Chrome; on failure log the error and set ``driver`` to None."""
        logging.info("Initializing WebDriver...")
        chrome_options = Options()
        for argument in (
            "--headless",
            "--no-sandbox",
            "--disable-dev-shm-usage",
            "--disable-gpu",
            "--window-size=1920,1080",
            "--log-level=0",
        ):
            chrome_options.add_argument(argument)
        # Request browser console capture; extract_data_from_console() reads it
        # back through get_log('browser').
        chrome_options.set_capability('goog:loggingPrefs', {'browser': 'ALL'})

        try:
            service = Service(ChromeDriverManager().install())
            self.driver = webdriver.Chrome(service=service, options=chrome_options)
            logging.info("WebDriver initialized successfully.")
        except Exception as e:
            logging.error(f"Error initializing WebDriver: {e}", exc_info=True)
            self.driver = None

    def extract_data_from_console(self):
        """Parse the newest ``updateMessageArea`` console message.

        Returns:
            A dict with keys ``scrape_timestamp_utc`` (UTC, formatted
            ``%Y-%m-%d %H:%M:%S``), ``display_date_id``, ``display_time``,
            ``current_value_MW`` and ``temperature_C`` (floats or ``None``),
            or ``None`` when the driver is missing, the log cannot be read, or
            no parsable message is present.
        """
        if not self.driver:
            logging.error("WebDriver not initialized. Cannot extract data.")
            return None
        try:
            logs = self.driver.get_log('browser')
        except Exception as e:
            logging.error(f"Failed to get browser logs: {e}", exc_info=True)
            return None

        # Walk newest-to-oldest so the most recent reading wins.
        for log_entry in reversed(logs):
            match = self._UPDATE_MESSAGE_RE.search(log_entry.get('message', ''))
            if not match:
                continue
            display_date_id = match.group(1).strip()
            display_time = match.group(2).strip()
            # Drop thousands separators before converting to float.
            current_value_mw_str = match.group(3).replace(',', '').strip()
            temperature_c_str = match.group(4).strip()
            try:
                current_value_mw = float(current_value_mw_str) if current_value_mw_str else None
                temperature_c = float(temperature_c_str) if temperature_c_str else None
            except ValueError as ve:
                # Keep looking at older entries rather than giving up.
                logging.error(f"Error converting extracted data: {ve}. Raw: val='{current_value_mw_str}', temp='{temperature_c_str}'")
                continue
            # utcnow() is deprecated; an aware "now" formats to the same string.
            scraped_at = datetime.datetime.now(datetime.timezone.utc)
            data_dict = {
                'scrape_timestamp_utc': scraped_at.strftime('%Y-%m-%d %H:%M:%S'),
                'display_date_id': display_date_id,
                'display_time': display_time,
                'current_value_MW': current_value_mw,
                'temperature_C': temperature_c,
            }
            logging.info(f"Data extracted: {data_dict}")
            return data_dict

        logging.warning("Relevant 'updateMessageArea' log not found or data parsing failed.")
        return None

    def scrape_once(self):
        """Load the page, wait, and return one parsed reading.

        If no driver is available, one re-initialisation is attempted first.
        When navigation or extraction raises, the driver is closed so that the
        next call starts a fresh browser instead of reusing a possibly dead
        session.

        Returns:
            The dict produced by ``extract_data_from_console``, or ``None`` on
            any failure.
        """
        if not self.driver:
            logging.warning("WebDriver not available. Attempting to re-initialize.")
            self._initialize_driver()
            if not self.driver:
                logging.error("Failed to re-initialize WebDriver. Aborting scrape_once.")
                return None
        try:
            logging.info(f"Navigating to URL: {self.url}")
            self.driver.get(self.url)
            logging.info(f"Waiting for page load and data ({self.page_load_wait_seconds} seconds)...")
            time.sleep(self.page_load_wait_seconds)
            return self.extract_data_from_console()
        except Exception as e:
            logging.error(f"Error during scrape_once: {e}", exc_info=True)
            # Discard the session: leaving a broken driver in place would make
            # every later call fail the same way without ever re-initialising.
            self.close()
            return None

    def close(self):
        """Quit the browser if one is running and always reset ``driver`` to None."""
        if not self.driver:
            return
        logging.info("Closing WebDriver.")
        try:
            self.driver.quit()
        except Exception as e:
            logging.error(f"Error quitting WebDriver: {e}", exc_info=True)
        finally:
            self.driver = None

# --- Function for a single scrape and update cycle (with robust try-except for read_parquet) ---
def perform_scrape_and_update(scraper, lakefs_s3_path, storage_options):
    """Run one cycle: load history, scrape one reading, merge, and save.

    Args:
        scraper: Object exposing ``scrape_once()`` that returns a dict for one
            reading, or a falsy value when nothing was scraped.
        lakefs_s3_path: Path/URL of the Parquet history file.
        storage_options: Passed through to ``pd.read_parquet`` and
            ``DataFrame.to_parquet``.

    Returns:
        None. Failures are logged rather than raised, except for errors raised
        while deduplicating, which propagate to the caller.

    The history file is rewritten in full on every successful cycle. For that
    reason the cycle is skipped, without writing, whenever the existing file
    could not be read for a reason other than "it does not exist yet":
    continuing with an empty frame would replace the whole history with a
    single row.
    """
    logging.info("--- Starting new scrape and update cycle ---")

    existing_df = _load_existing_history(lakefs_s3_path, storage_options)
    if existing_df is None:
        logging.error("Skipping this cycle without writing so the existing history is not overwritten.")
    else:
        new_data_dict = scraper.scrape_once()
        if new_data_dict:
            logging.info(f"Scraped new data: {new_data_dict}")
            new_df = pd.DataFrame([new_data_dict])
            new_df['scrape_timestamp_utc'] = pd.to_datetime(new_df['scrape_timestamp_utc'], errors='coerce')
            _save_history(_merge_history(existing_df, new_df), lakefs_s3_path, storage_options)
        else:
            logging.warning("No new data was scraped in this cycle.")

    logging.info("--- Scrape and update cycle finished ---")


def _load_existing_history(lakefs_s3_path, storage_options):
    """Return the stored history, an empty frame if none exists, or None if unreadable."""
    try:
        logging.info(f"Attempting to read existing Parquet from: {lakefs_s3_path}")
        history = pd.read_parquet(lakefs_s3_path, storage_options=storage_options)
    except FileNotFoundError:
        logging.warning(f"Parquet file not found at {lakefs_s3_path} (FileNotFoundError). This is expected if it's the first run. A new file will be created.")
        return pd.DataFrame()
    except Exception as e:
        error_message = str(e).lower()
        # "Not found" can also surface as a generic error from the S3/PyArrow
        # layers, so recognise it by message. An empty file holds no history
        # to lose, so it is treated the same way.
        not_found_markers = ("no such file or directory", "nosuchkey", "nosuchbucket")
        if isinstance(e, pd.errors.EmptyDataError) or any(marker in error_message for marker in not_found_markers):
            logging.warning(f"Parquet file not found or empty at {lakefs_s3_path} (Error: {type(e).__name__} - {e}). This is expected if it's the first run. A new file will be created.")
            return pd.DataFrame()
        # Anything else (network, credentials, corrupt file, missing engine)
        # means history may exist but could not be loaded.
        logging.error(f"Unexpected error reading Parquet file from {lakefs_s3_path}: {type(e).__name__} - {e}", exc_info=True)
        return None

    logging.info(f"Successfully read {len(history)} rows from existing Parquet file.")
    if 'scrape_timestamp_utc' in history.columns:
        history['scrape_timestamp_utc'] = pd.to_datetime(history['scrape_timestamp_utc'], errors='coerce')
    return history


def _merge_history(existing_df, new_df):
    """Append new rows, keep the newest scrape per displayed date/time, and sort."""
    combined_df = pd.concat([existing_df, new_df], ignore_index=True) if not existing_df.empty else new_df

    key_cols_for_dedup = ['display_date_id', 'display_time']
    has_key_cols = all(col in combined_df.columns for col in key_cols_for_dedup)

    if has_key_cols and 'scrape_timestamp_utc' in combined_df.columns:
        logging.info("Deduplicating data...")
        # Newest scrape first, so keep='first' retains the latest row per key.
        merged_df = (
            combined_df
            .sort_values('scrape_timestamp_utc', ascending=False, na_position='last')
            .drop_duplicates(subset=key_cols_for_dedup, keep='first')
        )
        logging.info(f"Rows after deduplication: {len(merged_df)}")
    else:
        merged_df = combined_df
        logging.warning("Skipping deduplication due to missing key columns.")

    # Non-inplace operations throughout: assigning a column onto the result of
    # drop_duplicates() can trigger pandas' SettingWithCopyWarning.
    try:
        if has_key_cols:
            displayed_at = pd.to_datetime(
                merged_df['display_date_id'] + ' ' + merged_df['display_time'],
                format='%Y%m%d %H:%M', errors='coerce'
            )
            merged_df = (
                merged_df
                .assign(temp_datetime_sort=displayed_at)
                .sort_values(
                    by=['temp_datetime_sort', 'scrape_timestamp_utc'],
                    ascending=[True, True], na_position='last'
                )
                .drop(columns=['temp_datetime_sort'])
            )
        elif 'scrape_timestamp_utc' in merged_df.columns:
            merged_df = merged_df.sort_values('scrape_timestamp_utc', ascending=True, na_position='last')
        logging.info("Sorted final DataFrame.")
    except Exception as sort_e:
        # Ordering is best-effort; fall back to scrape time only.
        logging.warning(f"Could not perform full sort: {sort_e}. Using basic sort if possible.")
        if 'scrape_timestamp_utc' in merged_df.columns:
            merged_df = merged_df.sort_values('scrape_timestamp_utc', ascending=True, na_position='last')

    return merged_df


def _save_history(history_df, lakefs_s3_path, storage_options):
    """Write the full history as snappy-compressed Parquet; log instead of raising."""
    try:
        logging.info(f"Saving {len(history_df)} rows to LakeFS: {lakefs_s3_path}")
        history_df.to_parquet(
            lakefs_s3_path,
            storage_options=storage_options,
            index=False, engine='pyarrow', compression='snappy'
        )
        logging.info("Successfully saved data to LakeFS.")
    except Exception as e:
        logging.error(f"Failed to save DataFrame to LakeFS: {e}", exc_info=True)

def run_scraper_periodically(interval_minutes=5):
    """Scrape and persist one reading every ``interval_minutes`` until stopped.

    Connection settings come from the environment variables
    ``LAKEFS_ACCESS_KEY_ID``, ``LAKEFS_SECRET_ACCESS_KEY`` and
    ``LAKEFS_ENDPOINT_URL``, each with a built-in fallback value. The loop ends
    on ``KeyboardInterrupt`` or on any unexpected exception; in every case the
    scraper is closed before returning.

    Args:
        interval_minutes: Pause between the end of one cycle and the start of
            the next.
    """
    access_key = os.getenv("LAKEFS_ACCESS_KEY_ID", "access_key")
    secret_key = os.getenv("LAKEFS_SECRET_ACCESS_KEY", "secret_key")
    endpoint_url = os.getenv("LAKEFS_ENDPOINT_URL", "http://lakefs-dev:8000/")

    # Target object path is built as <repository>/<branch>/<object key>.
    repo_name = "dataset"
    branch_name = "main"
    object_key = "egat_datascraping/egat_realtime_power_history.parquet"
    lakefs_s3_path = "s3a://" + "/".join((repo_name, branch_name, object_key))

    storage_options = dict(
        key=access_key,
        secret=secret_key,
        client_kwargs=dict(endpoint_url=endpoint_url),
    )
    logging.info(f"Configured LakeFS Target Path: {lakefs_s3_path}")
    logging.info(f"Scraping interval set to {interval_minutes} minutes.")

    scraper = None
    try:
        scraper = EGATRealTimeScraper()
        if not scraper.driver:
            logging.error("WebDriver could not be initialized. Terminating process.")
            return

        while True:
            perform_scrape_and_update(scraper, lakefs_s3_path, storage_options)

            wait_seconds = 60 * interval_minutes
            logging.info(f"Waiting for {interval_minutes} minutes ({wait_seconds} seconds) before next cycle...")
            time.sleep(wait_seconds)

    except KeyboardInterrupt:
        logging.info("Process interrupted by user (KeyboardInterrupt).")
    except Exception as e:
        logging.error(f"An unexpected error occurred in the main loop: {e}", exc_info=True)
    finally:
        # Runs on every exit path, including the early return above.
        if scraper:
            scraper.close()
        logging.info("Scraping process terminated.")


# Entry point: start the periodic scraper with a 5-minute interval when this
# code is executed directly rather than imported.
if __name__ == "__main__":
    run_scraper_periodically(interval_minutes=5)

2025-05-18 20:21:56,867 - INFO - 3296181972.py:204 - Configured LakeFS Target Path: s3a://dataset/main/egat_datascraping/egat_realtime_power_history.parquet
2025-05-18 20:21:56,867 - INFO - 3296181972.py:205 - Scraping interval set to 5 minutes.
2025-05-18 20:21:56,868 - INFO - 3296181972.py:27 - Initializing WebDriver...
2025-05-18 20:21:57,002 - INFO - logger.py:11 - Get LATEST chromedriver version for google-chrome
2025-05-18 20:21:57,216 - INFO - logger.py:11 - Get LATEST chromedriver version for google-chrome
2025-05-18 20:21:57,395 - INFO - logger.py:11 - Driver [/home/jovyan/.wdm/drivers/chromedriver/linux64/136.0.7103.94/chromedriver-linux64/chromedriver] found in cache
2025-05-18 20:21:57,893 - INFO - 3296181972.py:40 - WebDriver initialized successfully.
2025-05-18 20:21:57,895 - INFO - 3296181972.py:113 - --- Starting new scrape and update cycle ---
2025-05-18 20:21:57,901 - INFO - 3296181972.py:117 - Attempting to read existing Parquet from: s3a://dataset/main/egat_datascra