In [38]:
import sys
import logging
from pathlib import Path
from datetime import datetime
from typing import Any, Dict, List, Optional, Tuple

import numpy as np
import pandas as pd
import s3fs
import xarray as xr
import dask
from dask.distributed import Client

from opera_tropo import download


class WeatherDataAnalyzer:
    """A class for analyzing weather data from NetCDF files and generating statistical reports."""

    def __init__(
        self,
        output_dir: str = "./weather_stats",
        n_workers: int = 4,
        s3_profile: str = "saml-pub",
        verbose: bool = True,
    ):
        """Initialize the WeatherDataAnalyzer.

        Parameters
        ----------
        output_dir : str
            Directory to save output files
        n_workers : int
            Number of Dask workers
        s3_profile : str
            S3 profile name for authentication
        verbose : bool
            Whether to print progress messages

        """
        self.output_dir = Path(output_dir)
        self.n_workers = n_workers
        self.s3_profile = s3_profile
        self.verbose = verbose
        self.logger = self._setup_logging()

        # Variable configurations
        self.variable_configs = {
            "z": {
                "levels": [0],
                "desc": "Geopotential (surface)",
                "check_negative": False,
            },
            "t": {
                "levels": "all",
                "desc": "Temperature (all levels)",
                "check_negative": False,
            },
            "q": {
                "levels": "all",
                "desc": "Specific humidity (all levels)",
                "check_negative": True,
            },
            "lnsp": {
                "levels": [0],
                "desc": "Log surface pressure",
                "check_negative": False,
            },
        }

    def _setup_logging(self) -> logging.Logger:
        """Set up logging configuration."""
        logger = logging.getLogger(__name__)
        if not logger.handlers:
            handler = logging.StreamHandler()
            formatter = logging.Formatter("%(asctime)s - %(levelname)s - %(message)s")
            handler.setFormatter(formatter)
            logger.addHandler(handler)
            logger.setLevel(logging.INFO if self.verbose else logging.WARNING)
        return logger

    def _extract_time_info(self, dataset: xr.Dataset) -> Tuple[str, str]:
        """Extract model date and time information from dataset."""
        try:
            if "time" in dataset.coords:
                model_time = pd.to_datetime(dataset.time.values[0])
                return (
                    model_time.strftime("%Y-%m-%d"),
                    model_time.strftime("%H:%M:%S UTC"),
                )
            else:
                self.logger.warning("No time coordinate found in dataset")
                return "Unknown", "Unknown"
        except Exception as e:
            self.logger.warning(f"Error extracting time info: {e}")
            return "Unknown", "Unknown"

    def _get_variable_data(
        self, dataset: xr.Dataset, var: str, config: Dict[str, Any]
    ) -> Optional[xr.DataArray]:
        """Extract variable data based on configuration."""
        if var not in dataset.data_vars:
            self.logger.warning(f"Variable '{var}' not found in dataset")
            return None

        data = dataset[var]

        # Apply level selection if specified
        if config["levels"] != "all" and "level" in data.dims:
            try:
                data = data.isel(level=config["levels"])
            except (IndexError, KeyError) as e:
                self.logger.warning(f"Error selecting levels for {var}: {e}")

        return data

    def _compute_statistics(
        self, data: xr.DataArray, var: str, config: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Compute comprehensive statistics for a variable."""
        stats = {
            f"{var}_min": data.min(),
            f"{var}_max": data.max(),
            f"{var}_mean": data.mean(),
            f"{var}_std": data.std(),
            f"{var}_total_size": data.size,
            f"{var}_nan_count": data.isnull().sum(),
            f"{var}_zero_count": (data == 0.0).sum(),
            f"{var}_inf_count": np.isinf(data).sum(),
            f"{var}_finite_count": np.isfinite(data).sum(),
        }

        # Add negative count only for variables where it's relevant
        if config["check_negative"]:
            stats[f"{var}_negative_count"] = (data < 0).sum()
        else:
            stats[f"{var}_negative_count"] = xr.DataArray(0)

        return stats

    def _create_quality_flags(
        self, var: str, results: Dict[str, Any], config: Dict[str, Any]
    ) -> List[str]:
        """Create quality flags based on computed statistics."""
        flags = []

        nan_count = results[f"{var}_nan_count"].item()
        inf_count = results[f"{var}_inf_count"].item()
        zero_count = results[f"{var}_zero_count"].item()
        negative_count = results[f"{var}_negative_count"].item()

        if nan_count > 0:
            flags.append(f"NaN({nan_count})")
        if inf_count > 0:
            flags.append(f"Inf({inf_count})")
        if zero_count > 0:
            flags.append(f"Zero({zero_count})")
        if negative_count > 0 and config["check_negative"]:
            flags.append(f"Neg({negative_count})")

        return flags

    def _create_summary_dataframe(self, all_results: Dict[str, Any]) -> pd.DataFrame:
        """Create summary DataFrame from computed results."""
        summary_data = []

        for var, config in self.variable_configs.items():
            if f"{var}_min" not in all_results:
                continue

            total_size = all_results[f"{var}_total_size"]
            nan_count = all_results[f"{var}_nan_count"].item()
            zero_count = all_results[f"{var}_zero_count"].item()
            negative_count = all_results[f"{var}_negative_count"].item()
            inf_count = all_results[f"{var}_inf_count"].item()
            finite_count = all_results[f"{var}_finite_count"].item()

            quality_flags = self._create_quality_flags(var, all_results, config)

            summary_data.append(
                {
                    "Variable": var,
                    "Description": config["desc"],
                    "Min": all_results[f"{var}_min"].item(),
                    "Max": all_results[f"{var}_max"].item(),
                    "Mean": all_results[f"{var}_mean"].item(),
                    "Std": all_results[f"{var}_std"].item(),
                    "Total_Size": total_size,
                    "Finite_Count": finite_count,
                    "NaN_Count": nan_count,
                    "Inf_Count": inf_count,
                    "Zero_Count": zero_count,
                    "Negative_Count": negative_count,
                    "NaN_%": f"{nan_count/total_size*100:.3f}%",
                    "Zero_%": f"{zero_count/total_size*100:.3f}%",
                    "Completeness_%": f"{finite_count/total_size*100:.2f}%",
                    "Data_Quality": "; ".join(quality_flags) if quality_flags else "OK",
                }
            )

        return pd.DataFrame(summary_data)

    def _generate_output_filenames(
        self, file_url: str, model_date_str: str, model_time_str: str
    ) -> Tuple[Path, Path]:
        """Generate expected output filenames for a given input file."""
        analysis_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        date_part = (
            model_date_str.replace("-", "")
            if model_date_str != "Unknown"
            else "unknown"
        )

        try:
            hour_part = pd.to_datetime(f"{model_date_str} {model_time_str}").strftime(
                "%H"
            )
        except:
            hour_part = "unknown"

        base_filename = f"weather_stats_{date_part}_{hour_part}_{analysis_timestamp}"

        txt_path = self.output_dir / f"{base_filename}.txt"
        csv_path = self.output_dir / f"{base_filename}.csv"

        return txt_path, csv_path

    def _check_existing_outputs(
        self, file_url: str, overwrite: bool = False
    ) -> Tuple[bool, List[str]]:
        """Check if output files already exist for this input file.

        Parameters
        ----------
        file_url : str
            URL of the input file
        overwrite : bool
            If True, existing files will be overwritten

        Returns
        -------
        Tuple[bool, List[str]]
            (should_skip, existing_files_list)

        """
        if overwrite:
            return False, []

        try:
            # We need to extract time info to predict the output filename pattern
            # This requires opening the file briefly
            fs = s3fs.S3FileSystem(
                profile=self.s3_profile, config_kwargs={"max_pool_connections": 50}
            )

            with fs.open(file_url, mode="rb") as f:
                dataset = xr.open_dataset(f, engine="h5netcdf")
                model_date_str, model_time_str = self._extract_time_info(dataset)
                dataset.close()

            # Generate the expected filename pattern
            date_part = (
                model_date_str.replace("-", "")
                if model_date_str != "Unknown"
                else "unknown"
            )
            try:
                hour_part = pd.to_datetime(
                    f"{model_date_str} {model_time_str}"
                ).strftime("%H")
            except:
                hour_part = "unknown"

            # Look for existing files with this date_hour pattern
            pattern = f"weather_stats_{date_part}_{hour_part}_*"
            existing_files = list(self.output_dir.glob(pattern + ".txt"))
            existing_csv_files = list(self.output_dir.glob(pattern + ".csv"))

            # If we have both txt and csv files, consider it already processed
            if existing_files and existing_csv_files:
                return True, [str(f) for f in existing_files + existing_csv_files]

            return False, []

        except Exception as e:
            self.logger.warning(f"Error checking existing outputs for {file_url}: {e}")
            # If we can't check, assume it doesn't exist and process it
            return False, []

    def _export_results(
        self,
        summary_df: pd.DataFrame,
        file_url: str,
        model_date_str: str,
        model_time_str: str,
        dataset: xr.Dataset,
    ) -> Tuple[str, str]:
        """Export results to text and CSV files."""
        # Generate output filenames using the new method
        txt_path, csv_path = self._generate_output_filenames(
            file_url, model_date_str, model_time_str
        )

        # Export text report
        self._export_text_report(
            txt_path, summary_df, file_url, model_date_str, model_time_str, dataset
        )

        # Export CSV
        analysis_timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        self._export_csv_report(
            csv_path,
            summary_df,
            file_url,
            model_date_str,
            model_time_str,
            analysis_timestamp,
        )

        return str(txt_path), str(csv_path)

    def _export_text_report(
        self,
        file_path: Path,
        summary_df: pd.DataFrame,
        file_url: str,
        model_date_str: str,
        model_time_str: str,
        dataset: xr.Dataset,
    ):
        """Export detailed text report."""
        header = f"""
{'='*80}
WEATHER DATA STATISTICAL ANALYSIS REPORT
{'='*80}
Analysis Timestamp: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}
Source File: {file_url}
Model Date: {model_date_str}
Model Time: {model_time_str}

Dataset Information:
- Dimensions: {dict(dataset.sizes)}
- Coordinates: {list(dataset.coords.keys())}
- Data Variables: {list(dataset.data_vars.keys())}
- Global Attributes: {len(dataset.attrs)} attributes

Variable Processing Configuration:
{self._format_variable_configs()}
{'='*80}

STATISTICAL SUMMARY:
"""

        with open(file_path, "w") as f:
            f.write(header)

            # Write summary table
            f.write("\nDETAILED STATISTICS TABLE:\n")
            f.write("=" * 140 + "\n")

            display_df = summary_df.copy()
            numerical_cols = ["Min", "Max", "Mean", "Std"]
            for col in numerical_cols:
                display_df[col] = display_df[col].round(6)

            f.write(display_df.to_string(index=False, max_colwidth=30))

            # Write data quality summary
            f.write(f"\n\n{'='*50}\n")
            f.write("DATA QUALITY SUMMARY:\n")
            f.write(f"{'='*50}\n")

            for _, row in summary_df.iterrows():
                status = (
                    "✅ No issues detected"
                    if row["Data_Quality"] == "OK"
                    else row["Data_Quality"]
                )
                f.write(f"{row['Variable']}: {status}\n")

    def _format_variable_configs(self) -> str:
        """Format variable configurations for display."""
        config_lines = []
        for var, config in self.variable_configs.items():
            levels_str = (
                "all levels"
                if config["levels"] == "all"
                else f"levels {config['levels']}"
            )
            config_lines.append(f"- {var}: {levels_str}")
        return "\n".join(config_lines)

    def _export_csv_report(
        self,
        file_path: Path,
        summary_df: pd.DataFrame,
        file_url: str,
        model_date_str: str,
        model_time_str: str,
        analysis_timestamp: str,
    ):
        """Export CSV report with metadata."""
        csv_df = summary_df.copy()
        csv_df.insert(0, "Source_File", file_url)
        csv_df.insert(1, "Model_Date", model_date_str)
        csv_df.insert(2, "Model_Time", model_time_str)
        csv_df.insert(3, "Analysis_Timestamp", analysis_timestamp)

        csv_df.to_csv(file_path, index=False)

    def analyze_file(
        self, file_url: str, client: Optional[Client] = None, overwrite: bool = False
    ) -> Tuple[pd.DataFrame, str, str]:
        """Analyze a single weather data file and export statistics.

        Parameters
        ----------
        file_url : str
            URL or path to the file to analyze
        client : Optional[Client]
            Existing Dask client to use. If None, creates a new one.
        overwrite : bool
            If False, skip files that have already been processed

        Returns
        -------
        Tuple[pd.DataFrame, str, str]
            Summary DataFrame, text report path, and CSV report path

        """
        # Create output directory
        self.output_dir.mkdir(parents=True, exist_ok=True)

        # Check if file has already been processed
        should_skip, existing_files = self._check_existing_outputs(file_url, overwrite)
        if should_skip:
            self.logger.info(f"Skipping already processed file: {file_url}")
            self.logger.info(f"Existing files: {', '.join(existing_files)}")
            # Return the most recent existing files
            txt_files = [f for f in existing_files if f.endswith(".txt")]
            csv_files = [f for f in existing_files if f.endswith(".csv")]
            if txt_files and csv_files:
                # Load summary from existing CSV for consistency
                try:
                    summary_df = pd.read_csv(csv_files[0])
                    # Remove metadata columns to match analyze_file output format
                    metadata_cols = [
                        "Source_File",
                        "Model_Date",
                        "Model_Time",
                        "Analysis_Timestamp",
                    ]
                    for col in metadata_cols:
                        if col in summary_df.columns:
                            summary_df = summary_df.drop(columns=[col])
                    return summary_df, txt_files[0], csv_files[0]
                except Exception as e:
                    self.logger.warning(f"Could not load existing summary: {e}")
            # Return empty DataFrame if can't load existing results
            return (
                pd.DataFrame(),
                txt_files[0] if txt_files else "",
                csv_files[0] if csv_files else "",
            )

        self.logger.info(f"Processing: {file_url}")

        def _analyze_with_client(dask_client):
            """Internal function to perform analysis with a given client."""
            try:
                # Initialize S3 filesystem
                if self.verbose:
                    print("  → Connecting to S3...")
                    sys.stdout.flush()

                fs = s3fs.S3FileSystem(
                    profile=self.s3_profile, config_kwargs={"max_pool_connections": 50}
                )

                # Open dataset
                if self.verbose:
                    print("  → Opening dataset...")
                    sys.stdout.flush()

                with fs.open(file_url, mode="rb") as f:
                    dataset = xr.open_dataset(f, engine="h5netcdf", chunks="auto")

                # Extract time information
                model_date_str, model_time_str = self._extract_time_info(dataset)

                # Compute statistics for all variables
                all_operations = {}
                processed_vars = []

                if self.verbose:
                    print("  → Preparing computations...")
                    sys.stdout.flush()

                for var, config in self.variable_configs.items():
                    data = self._get_variable_data(dataset, var, config)
                    if data is not None:
                        stats = self._compute_statistics(data, var, config)
                        all_operations.update(stats)
                        processed_vars.append(var)

                if not all_operations:
                    raise ValueError("No valid variables found in dataset")

                if self.verbose:
                    print("  → Computing statistics...")
                    sys.stdout.flush()
                else:
                    self.logger.info("Computing statistics...")

                results = dask.compute(all_operations)[0]

                if self.verbose:
                    print("  → Generating reports...")
                    sys.stdout.flush()

                # Create summary DataFrame
                summary_df = self._create_summary_dataframe(results)

                # Export results
                txt_path, csv_path = self._export_results(
                    summary_df, file_url, model_date_str, model_time_str, dataset
                )

                if self.verbose:
                    print("  → Analysis completed successfully")
                    sys.stdout.flush()
                else:
                    self.logger.info("Analysis completed successfully")

                return summary_df, txt_path, csv_path

            except Exception as e:
                if self.verbose:
                    print(f"  → Error: {e}")
                    sys.stdout.flush()
                raise

        try:
            if client is not None:
                # Use provided client
                return _analyze_with_client(client)
            else:
                # Create new client for single file analysis
                with Client(
                    n_workers=self.n_workers,
                    threads_per_worker=2,
                    silence_logs=not self.verbose,
                ) as new_client:
                    return _analyze_with_client(new_client)

        except Exception as e:
            self.logger.error(f"Error during analysis: {e}")
            raise

In [22]:
downloader = download.HRESDownloader()
hres_dates = downloader.list_matching_keys(
            start_date='20240909', end_date='20240911'
        )

df = pd.DataFrame(hres_dates, columns=["s3_key", "url"])
df["dates"] = df["s3_key"].apply(lambda x: x.split("/")[0])
df["hour"] = df["s3_key"].apply(lambda x: x.split("/")[1].split('_')[2][8:10])
df["filename"] = df["s3_key"].apply(lambda x: x.split("/")[1])
df = df[["dates", "hour","filename", "s3_key", "url"]]

print(f"Found {len(df)} files")

Found 12 files


In [23]:
df

Unnamed: 0,dates,hour,filename,s3_key,url
0,20240909,0,ECMWF_TROP_202409090000_202409090000_1.nc,20240909/ECMWF_TROP_202409090000_202409090000_...,s3://opera-ecmwf/20240909/ECMWF_TROP_202409090...
1,20240909,6,ECMWF_TROP_202409090600_202409090600_1.nc,20240909/ECMWF_TROP_202409090600_202409090600_...,s3://opera-ecmwf/20240909/ECMWF_TROP_202409090...
2,20240909,12,ECMWF_TROP_202409091200_202409091200_1.nc,20240909/ECMWF_TROP_202409091200_202409091200_...,s3://opera-ecmwf/20240909/ECMWF_TROP_202409091...
3,20240909,18,ECMWF_TROP_202409091800_202409091800_1.nc,20240909/ECMWF_TROP_202409091800_202409091800_...,s3://opera-ecmwf/20240909/ECMWF_TROP_202409091...
4,20240910,0,ECMWF_TROP_202409100000_202409100000_1.nc,20240910/ECMWF_TROP_202409100000_202409100000_...,s3://opera-ecmwf/20240910/ECMWF_TROP_202409100...
5,20240910,6,ECMWF_TROP_202409100600_202409100600_1.nc,20240910/ECMWF_TROP_202409100600_202409100600_...,s3://opera-ecmwf/20240910/ECMWF_TROP_202409100...
6,20240910,12,ECMWF_TROP_202409101200_202409101200_1.nc,20240910/ECMWF_TROP_202409101200_202409101200_...,s3://opera-ecmwf/20240910/ECMWF_TROP_202409101...
7,20240910,18,ECMWF_TROP_202409101800_202409101800_1.nc,20240910/ECMWF_TROP_202409101800_202409101800_...,s3://opera-ecmwf/20240910/ECMWF_TROP_202409101...
8,20240911,0,ECMWF_TROP_202409110000_202409110000_1.nc,20240911/ECMWF_TROP_202409110000_202409110000_...,s3://opera-ecmwf/20240911/ECMWF_TROP_202409110...
9,20240911,6,ECMWF_TROP_202409110600_202409110600_1.nc,20240911/ECMWF_TROP_202409110600_202409110600_...,s3://opera-ecmwf/20240911/ECMWF_TROP_202409110...


In [35]:
file_url = df[(df.dates == '20240910') & (df.hour == '06')].url.values[0]

In [39]:
verbose = True
analyzer = WeatherDataAnalyzer(
    output_dir=Path('download_test'),
    n_workers=3,
    s3_profile="saml-pub",
    verbose=verbose,
)


with Client(
            n_workers=4,
            threads_per_worker=2,
            memory_limit=f"{2}GB",
            silence_logs=not verbose,
        ) as client:
    
    should_skip, existing_files = (
                            analyzer._check_existing_outputs(file_url, overwrite=False)
                            )
    summary_df, txt_file, csv_file = analyzer.analyze_file(
                            file_url, client=client, overwrite=False
                            )

Perhaps you already have a cluster running?
Hosting the HTTP server on port 43869 instead
2025-06-26 16:24:53,290 - distributed.scheduler - INFO - State start
2025-06-26 16:24:53,305 - distributed.scheduler - INFO -   Scheduler at:     tcp://127.0.0.1:32905
2025-06-26 16:24:53,306 - distributed.scheduler - INFO -   dashboard at:  http://127.0.0.1:43869/status
2025-06-26 16:24:53,306 - distributed.scheduler - INFO - Registering Worker plugin shuffle
2025-06-26 16:24:53,340 - distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:42461'
2025-06-26 16:24:53,342 - distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:33160'
2025-06-26 16:24:53,345 - distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:44157'
2025-06-26 16:24:53,349 - distributed.nanny - INFO -         Start Nanny at: 'tcp://127.0.0.1:46055'
2025-06-26 16:24:53,763 - distributed.worker - INFO -       Start worker at:      tcp://127.0.0.1:38845
2025-06-26 16:24:53,763 - distributed.w

  → Connecting to S3...
  → Opening dataset...
  → Preparing computations...
  → Computing statistics...
  → Generating reports...
  → Analysis completed successfully


2025-06-26 16:27:56,502 - distributed.scheduler - INFO - Remove client Client-c3604ef5-52e4-11f0-99b1-e43d1adad770
2025-06-26 16:27:56,503 - distributed.core - INFO - Received 'close-stream' from tcp://127.0.0.1:44258; closing.
2025-06-26 16:27:56,504 - distributed.scheduler - INFO - Remove client Client-c3604ef5-52e4-11f0-99b1-e43d1adad770
2025-06-26 16:27:56,505 - distributed.scheduler - INFO - Close client connection: Client-c3604ef5-52e4-11f0-99b1-e43d1adad770
2025-06-26 16:27:56,506 - distributed.scheduler - INFO - Retire worker addresses (stimulus_id='retire-workers-1750980476.5067892') (0, 1, 2, 3)
2025-06-26 16:27:56,507 - distributed.nanny - INFO - Closing Nanny at 'tcp://127.0.0.1:42461'. Reason: nanny-close
2025-06-26 16:27:56,508 - distributed.nanny - INFO - Nanny asking worker to close. Reason: nanny-close
2025-06-26 16:27:56,508 - distributed.nanny - INFO - Closing Nanny at 'tcp://127.0.0.1:33160'. Reason: nanny-close
2025-06-26 16:27:56,510 - distributed.nanny - INFO - N

In [40]:
summary_df

Unnamed: 0,Variable,Description,Min,Max,Mean,Std,Total_Size,Finite_Count,NaN_Count,Inf_Count,Zero_Count,Negative_Count,NaN_%,Zero_%,Completeness_%,Data_Quality
0,z,Geopotential (surface),-4563.282,62700.71875,3712.363525,8367.480469,13107200,13107200,0,0,0,0,0.000%,0.000%,100.00%,OK
1,t,Temperature (all levels),182.1989,317.13504,240.07666,29.328047,1795686400,1795686400,0,0,0,0,0.000%,0.000%,100.00%,OK
2,q,Specific humidity (all levels),2.282539e-08,0.028652,0.001634,0.003674,1795686400,1795686400,0,0,0,0,0.000%,0.000%,100.00%,OK
3,lnsp,Log surface pressure,10.74795,11.574851,11.472398,0.117171,13107200,13107200,0,0,0,0,0.000%,0.000%,100.00%,OK


In [42]:
client.close()