In [4]:
import aiohttp
import asyncio
import json
from datetime import datetime, timedelta
import pandas as pd
import os
import logging
from typing import Optional, Dict, List
from aiohttp import ClientTimeout
from asyncio import Semaphore
import calendar

# ================ Configuration ================
# Basic Configuration
DATA_DOMAIN = "https://solarcentre.spinifexvalley.com.au"  # Base URL for every API request
# NOTE(review): LOCATION_WEATHER is not referenced by the visible code; presumably it is
# the station id that also appears as the key inside WEATHER_SOURCE - confirm.
LOCATION_WEATHER = "101"
MAX_CONCURRENT_REQUESTS = 10  # Size of the semaphore limiting simultaneous HTTP requests
RETRY_ATTEMPTS = 3            # Number of attempts made per request
REQUEST_TIMEOUT = 30          # Total timeout per request (seconds)

# Cache Configuration
CACHE_DIR = "data_cache"                # Raw JSON responses, one file per day and data type
TEMP_DATA_DIR = "temp_data"             # Per-month intermediate CSV files
PROCESSED_DATA_DIR = "processed_data"   # Final combined CSV file
CACHE_EXPIRY_HOURS = 1  # Cache expiration time (hours)

# Date Range Configuration
START_YEAR = 2023
START_MONTH = 11
END_YEAR = 2024  # None means current year
END_MONTH = 4  # None means current month

# Data Source Configuration (JSON strings sent verbatim as the 'sources' query parameter)
POWER_SOURCE = '{"78":[193]}'
WEATHER_SOURCE = '{"101":[10021,9000,9304,506]}'

# Data Validation Configuration
# 576 = 24 hours * 12 five-minute samples per hour * 2.
# NOTE(review): the origin of the factor 2 is not evident from this file - confirm.
EXPECTED_POINTS_PER_DAY = 576
# Tolerance shared by all validation checks: missing rows, irregular intervals,
# null values per required column, and power outliers.
ALLOWED_MISSING_POINTS = 100
TIME_INTERVAL_MINUTES = 5      # Expected time interval (minutes)

# HTTP Headers
# Origin and Referer are derived from DATA_DOMAIN so they cannot drift out of sync
# with the host the requests are actually sent to.
HEADERS = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36',
    'Accept': 'application/json, text/javascript, */*; q=0.01',
    'Accept-Language': 'en-US,en;q=0.9',
    'Accept-Encoding': 'gzip, deflate, br',
    'Connection': 'keep-alive',
    'Origin': DATA_DOMAIN,
    'Referer': f"{DATA_DOMAIN}/graphs"
}
# ================ End of Configuration ================

# Send every log record (INFO and above) to both solar_data.log and the console
_log_handlers = [
    logging.FileHandler('solar_data.log'),
    logging.StreamHandler(),
]
logging.basicConfig(
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=_log_handlers,
    level=logging.INFO,
)

# Make sure the cache, temporary and output folders exist before anything is written
for directory in (CACHE_DIR, TEMP_DATA_DIR, PROCESSED_DATA_DIR):
    os.makedirs(directory, exist_ok=True)

class DataCache:
    """File-based JSON cache for raw API responses.

    Entries are stored under ``CACHE_DIR`` and are treated as stale once their
    modification time is more than ``CACHE_EXPIRY_HOURS`` hours old.
    """

    @staticmethod
    def _is_expired(file_path: str, now: Optional[datetime] = None) -> bool:
        """Return True when ``file_path`` was last modified more than
        ``CACHE_EXPIRY_HOURS`` hours before ``now`` (default: the current time).

        Raises:
            OSError: if the file's modification time cannot be read.
        """
        reference = now or datetime.now()
        modified = datetime.fromtimestamp(os.path.getmtime(file_path))
        return (reference - modified).total_seconds() > CACHE_EXPIRY_HOURS * 3600

    @staticmethod
    def get_cache_path(date: datetime, data_type: str) -> str:
        """Return the cache file path for ``data_type`` on ``date``
        (``<CACHE_DIR>/<data_type>_<YYYYMMDD>.json``)."""
        return os.path.join(CACHE_DIR, f"{data_type}_{date.strftime('%Y%m%d')}.json")

    @staticmethod
    def save_cache(data: dict, cache_path: str) -> None:
        """Serialise ``data`` as JSON into ``cache_path``, overwriting any existing file."""
        with open(cache_path, 'w') as f:
            json.dump(data, f)

    @staticmethod
    def load_cache(cache_path: str) -> Optional[dict]:
        """Return the cached payload, or None when there is no usable entry.

        An expired entry is deleted and reported as a miss. Any error while
        reading (for example a corrupt JSON file) is logged and also reported
        as a miss, so callers can simply fall back to fetching.
        """
        if not os.path.exists(cache_path):
            return None
        try:
            if DataCache._is_expired(cache_path):
                os.remove(cache_path)
                return None

            with open(cache_path, 'r') as f:
                return json.load(f)
        except Exception as e:
            logging.error(f"Error loading cache: {str(e)}")
        return None

    @staticmethod
    def clean_expired_cache() -> None:
        """Delete every expired file in ``CACHE_DIR``.

        Errors are handled per entry: a file that cannot be inspected or
        removed is logged and skipped, and the sweep continues with the
        remaining entries. Anything that is not a regular file (for example a
        subdirectory) is left untouched.
        """
        try:
            filenames = os.listdir(CACHE_DIR)
        except Exception as e:
            logging.error(f"Error cleaning cache: {str(e)}")
            return

        # One reference time for the whole sweep so all files are judged consistently
        current_time = datetime.now()
        for filename in filenames:
            file_path = os.path.join(CACHE_DIR, filename)
            try:
                if not os.path.isfile(file_path):
                    continue
                if DataCache._is_expired(file_path, current_time):
                    os.remove(file_path)
                    logging.info(f"Removed expired cache file: {filename}")
            except Exception as e:
                # Keep going: one bad entry must not block the rest of the cleanup
                logging.error(f"Error cleaning cache: {str(e)}")

class DataValidator:
    """Sanity checks for one day's merged power/weather frame."""

    @staticmethod
    def validate_day_data(df: pd.DataFrame, date: datetime) -> bool:
        """Return True when ``df`` passes every integrity check for ``date``.

        The checks run in order and the first failure logs a warning and
        returns False: non-empty frame, minimum row count, regular sampling
        interval, null counts in the required columns, and power outliers.
        ``ALLOWED_MISSING_POINTS`` is the tolerance for each check.

        Args:
            df: Frame indexed by timestamp; may be None.
            date: Day being validated (used only in log messages).
        """
        if df is None or df.empty:
            logging.warning(f"No data for {date.date()}")
            return False

        # 1) Enough rows
        minimum_points = EXPECTED_POINTS_PER_DAY - ALLOWED_MISSING_POINTS
        row_total = len(df)
        if row_total < minimum_points:
            logging.warning(
                f"Insufficient data points for {date.date()}: got {row_total}, expected at least {minimum_points}"
            )
            return False

        # 2) Regular sampling: count gaps between consecutive timestamps that
        #    differ from the expected interval (the first row has no gap)
        gap_minutes = df.index.to_series().diff().dt.total_seconds() / 60
        irregular_gaps = (gap_minutes.notna() & gap_minutes.ne(TIME_INTERVAL_MINUTES)).sum()
        if irregular_gaps > ALLOWED_MISSING_POINTS:
            logging.warning(
                f"Too many irregular time intervals for {date.date()}: {irregular_gaps} irregular intervals found"
            )
            return False

        # 3) Null values in the required columns (wind speed is deliberately excluded)
        must_have = ['power', 'rainfall', 'temperature', 'solar_radiation']
        null_counts = df[must_have].isna().sum()
        if (null_counts > ALLOWED_MISSING_POINTS).any():
            logging.warning(f"Too many null values for {date.date()} in required columns: \n{null_counts}")
            return False

        # 4) Power readings below -1 or above 50 are treated as outliers
        power = df['power']
        power_outliers = ((power < -1) | (power > 50)).sum()
        if power_outliers > ALLOWED_MISSING_POINTS:
            logging.warning(f"Too many power outliers for {date.date()}: {power_outliers}")
            return False

        return True

async def fetch_data(session: aiohttp.ClientSession, url: str, params: dict, 
                    semaphore: Semaphore, cache_path: str) -> Optional[dict]:
    """Fetch a JSON payload, using the file cache and retrying on failure.

    A non-empty cached payload at ``cache_path`` is returned without any
    network access. Otherwise up to ``RETRY_ATTEMPTS`` requests are made.
    Both raised exceptions and non-200 responses count as a failed attempt
    and are retried after an exponential back-off (1s, 2s, 4s, ...).
    A successful payload is written to the cache before being returned.

    Args:
        session: Open aiohttp session used for the request.
        url: Endpoint to query.
        params: Query-string parameters.
        semaphore: Limits how many requests are in flight at once.
        cache_path: File used to read/write the cached payload.

    Returns:
        The decoded JSON payload, or None when every attempt failed.
    """
    # Try to load from cache
    cached_data = DataCache.load_cache(cache_path)
    if cached_data:
        return cached_data

    for attempt in range(RETRY_ATTEMPTS):
        is_last_attempt = attempt >= RETRY_ATTEMPTS - 1
        try:
            async with semaphore:
                async with session.get(url, params=params, headers=HEADERS, 
                                     timeout=ClientTimeout(total=REQUEST_TIMEOUT)) as response:
                    if response.status == 200:
                        data = await response.json()
                        # A message starting with '0' is blanked before caching
                        if data.get('message', '').startswith('0'):
                            data['message'] = ''
                        # Save to cache
                        DataCache.save_cache(data, cache_path)
                        return data
                    logging.error(f"Request failed with status {response.status}")
        except Exception as e:
            if is_last_attempt:
                logging.error(f"Error after {RETRY_ATTEMPTS} attempts: {str(e)}")

        # Back off outside the semaphore so a waiting retry does not hold a request slot.
        # Reached for both exceptions and non-200 responses.
        if not is_last_attempt:
            await asyncio.sleep(2 ** attempt)

    # Only reached once every attempt has failed (or RETRY_ATTEMPTS is 0)
    return None

async def get_day_data(session: aiohttp.ClientSession, date: datetime, 
                      semaphore: Semaphore) -> Optional[pd.DataFrame]:
    """Download, merge and validate the power and weather series for one day.

    Both endpoints are queried concurrently for the range ``date`` to
    ``date + 1 day``. The two series are outer-joined on their timestamp
    index and the result is checked with ``DataValidator``.

    Returns:
        The merged frame, or None when a fetch failed, a payload has no
        ``measures`` list, processing raised, or validation failed.
    """
    range_start = date.strftime('%Y-%m-%d')
    range_end = (date + timedelta(days=1)).strftime('%Y-%m-%d')

    def _request_params(sources: str) -> dict:
        # Query parameters shared by both endpoints; only 'sources' differs
        return {
            'interval': '',
            'start_date': range_start,
            'end_date': range_end,
            'sources': sources,
        }

    def _indexed_frame(rows, value_columns) -> pd.DataFrame:
        # Turn raw measure rows into a frame indexed by parsed timestamp
        frame = pd.DataFrame(rows, columns=['timestamp', *value_columns])
        frame['timestamp'] = pd.to_datetime(frame['timestamp'])
        return frame.set_index('timestamp')

    power_data, weather_data = await asyncio.gather(
        fetch_data(session, f"{DATA_DOMAIN}/power/average",
                   _request_params(POWER_SOURCE), semaphore,
                   DataCache.get_cache_path(date, 'power')),
        fetch_data(session, f"{DATA_DOMAIN}/weather/average",
                   _request_params(WEATHER_SOURCE), semaphore,
                   DataCache.get_cache_path(date, 'weather')),
    )

    # Either payload missing or empty: nothing to build
    if not (power_data and weather_data):
        return None

    try:
        if not (isinstance(power_data.get('measures'), list)
                and isinstance(weather_data.get('measures'), list)):
            return None

        power_df = _indexed_frame(power_data['measures'], ['power'])
        weather_df = _indexed_frame(
            weather_data['measures'],
            ['rainfall', 'temperature', 'solar_radiation', 'wind_speed'],
        )

        # Outer join keeps timestamps present in only one of the two series
        combined_df = power_df.merge(weather_df, left_index=True,
                                     right_index=True, how='outer')

        # Validate data integrity
        if not DataValidator.validate_day_data(combined_df, date):
            logging.error(f"Data validation failed for {date.date()}")
            return None
        return combined_df

    except Exception as e:
        logging.error(f"Error processing data for {date}: {str(e)}")
    return None

async def get_month_data(year: int, month: int) -> Optional[pd.DataFrame]:
    """Download every available day of a month and combine the results.

    One HTTP session and one semaphore (``MAX_CONCURRENT_REQUESTS`` slots)
    are shared by all day downloads, which run concurrently. Days that start
    at or after the current time are not requested. The combined frame is
    de-duplicated on its index (last occurrence wins), sorted, and written to
    ``TEMP_DATA_DIR`` as ``temp_data_<YYYYMM>.csv``.

    Returns:
        The combined frame, or None when no day produced valid data.
    """
    async with aiohttp.ClientSession() as session:
        _, days_in_month = calendar.monthrange(year, month)
        first_day = datetime(year, month, 1)
        limiter = Semaphore(MAX_CONCURRENT_REQUESTS)

        # Queue one download per day, stopping at the first day that has not started yet
        day_jobs = []
        for offset in range(days_in_month):
            day = first_day + timedelta(days=offset)
            if day >= datetime.now():
                break
            day_jobs.append(get_day_data(session, day, limiter))

        # Run all downloads concurrently and drop the days that failed
        day_frames = [frame for frame in await asyncio.gather(*day_jobs)
                      if frame is not None]
        if not day_frames:
            return None

        month_frame = pd.concat(day_frames)
        month_frame = month_frame[~month_frame.index.duplicated(keep='last')]
        month_frame = month_frame.sort_index()

        # Save temporary file
        temp_file = os.path.join(TEMP_DATA_DIR, f"temp_data_{year}{month:02d}.csv")
        month_frame.to_csv(temp_file)

        logging.info(f"Month {year}-{month} data saved to {temp_file}")
        return month_frame

async def process_all_months(start_year: int = START_YEAR, start_month: int = START_MONTH):
    """Download every month in the configured range and write the combined CSV.

    Months are processed one after another from ``start_year``/``start_month``
    up to ``END_YEAR``/``END_MONTH`` (the current year/month when those are
    unset), never going past the current month. The combined frame is
    de-duplicated on its index (last occurrence wins), sorted, and saved to
    ``PROCESSED_DATA_DIR`` as ``solar_weather_data_5min.csv``.

    Returns:
        The combined frame, or None when no month produced data.
    """
    now = datetime.now()
    last_year = END_YEAR or now.year
    last_month = END_MONTH or now.month

    monthly_frames = []
    year, month = start_year, start_month
    # Continue while the month is within the configured range and has already begun
    while (year, month) <= (last_year, last_month) and datetime(year, month, 1) <= now:
        logging.info(f"Processing data for {year}-{month}")
        month_frame = await get_month_data(year, month)
        if month_frame is not None:
            monthly_frames.append(month_frame)

        # Advance to the next calendar month, rolling over into the next year
        month += 1
        if month > 12:
            year, month = year + 1, 1

    if not monthly_frames:
        return None

    final_data = pd.concat(monthly_frames)
    final_data = final_data[~final_data.index.duplicated(keep='last')]
    final_data = final_data.sort_index()

    output_file = os.path.join(PROCESSED_DATA_DIR, 'solar_weather_data_5min.csv')
    final_data.to_csv(output_file)
    logging.info(f"All data processed and saved to {output_file}")

    return final_data

async def main():
    """Run the whole pipeline: clean the cache, download the configured date
    range, log summary statistics, then clean the cache once more."""
    range_end = f"{END_YEAR or 'current'}-{END_MONTH or 'current'}"
    logging.info(f"Start processing data at {datetime.now()}")
    logging.info(f"Processing data from {START_YEAR}-{START_MONTH} to {range_end}")

    # Drop stale cache entries before any request is made
    DataCache.clean_expired_cache()

    data = await process_all_months()  # Use default values from configuration
    if data is None:
        logging.error("Failed to process data")
        return

    logging.info(f"\nFinal data shape: {data.shape}")
    logging.info(f"Data points: {len(data)}")
    logging.info("\nData statistics:")
    logging.info(f"\n{data.describe()}")

    # Entries may have expired while the download was running
    DataCache.clean_expired_cache()
await main()
if __name__ == "__main__":
    asyncio.run(main())

2024-12-13 00:31:39,092 - INFO - Start processing data at 2024-12-13 00:31:39.092586
2024-12-13 00:31:39,092 - INFO - Processing data from 2023-11 to 2024-4
2024-12-13 00:31:39,138 - INFO - Processing data for 2023-11
  obj, end = self.scan_once(s, idx)
2024-12-13 00:31:39,425 - INFO - Month 2023-11 data saved to temp_data\temp_data_202311.csv
2024-12-13 00:31:39,425 - INFO - Processing data for 2023-12
2024-12-13 00:31:39,550 - ERROR - Data validation failed for 2023-12-19
2024-12-13 00:31:39,559 - ERROR - Data validation failed for 2023-12-20
2024-12-13 00:31:39,559 - ERROR - Data validation failed for 2023-12-21
2024-12-13 00:31:39,567 - ERROR - Data validation failed for 2023-12-22
2024-12-13 00:31:39,571 - ERROR - Data validation failed for 2023-12-23
2024-12-13 00:31:39,650 - INFO - Month 2023-12 data saved to temp_data\temp_data_202312.csv
2024-12-13 00:31:39,650 - INFO - Processing data for 2024-1
2024-12-13 00:31:39,884 - INFO - Month 2024-1 data saved to temp_data\temp_data_2

RuntimeError: asyncio.run() cannot be called from a running event loop

In [None]:
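# The RuntimeError above comes from the trailing `asyncio.run(main())` call: Jupyter already
# runs an asyncio event loop, and asyncio.run() cannot be called from inside a running loop.
# The preceding `await main()` runs the pipeline first, so the data files can still be generated.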