In [None]:
import os
import sys
from datetime import datetime
import logging
from pathlib import Path
from pybufrkit.decoder import Decoder
import json


# Put the local ``src`` directory (under the current working directory) at
# the front of the import search path, unless it is already listed there.
SRC_PATH = Path.cwd() / 'src'
_src_entry = str(SRC_PATH)
if _src_entry not in sys.path:
    sys.path.insert(0, _src_entry)

from climada.hazard import Centroids, TropCyclone,TCTracks
from climada.hazard.tc_tracks_forecast import TCForecast
from typhoonmodel.utility_fun import track_data_clean, Check_for_active_typhoon, Sendemail, ucl_data, plot_intensity, initialize

from typhoonmodel.utility_fun import Rainfall_data

from typhoonmodel.utility_fun.forecast_process import Forecast

# Instantiate the pybufrkit BUFR decoder.
# NOTE(review): `decoder` is not referenced by any other cell visible in this
# notebook — confirm whether it is still needed.
decoder = Decoder()
from climada.util import coordinates  
# Star import: brings the project's configuration names into the notebook
# namespace unqualified (ecmwf_remote_directory is used right below;
# presumably countryCodes, SETTINGS_SECRET and mock_data_path, used in later
# cells, come from here as well — verify against the settings module).
from typhoonmodel.utility_fun.settings import *
# Echo the configured value so it is visible in the cell output.
print(ecmwf_remote_directory)
from typhoonmodel.utility_fun.dynamicDataDb import DatabaseManager

# NOTE(review): repeats the `coordinates` import a few lines above; redundant
# but harmless.
from climada.util import coordinates 

# Set up logging through the project's helper, then obtain the logger that the
# remaining cells write to.
initialize.setup_logger()
logger = logging.getLogger(__name__)



In [None]:
# Make sure the directory layout used for forecast data exists on disk.

# Root of the forecast working tree (relative to the current directory)
base_path = Path("forecast")

# Sub-directories expected beneath the root, spelled as path components
subfolders = [
    base_path.joinpath(*parts)
    for parts in (("Input", "ECMWF"), ("Output",), ("rainfall",))
]

# exist_ok=True means directories that are already present raise no error
for folder in subfolders:
    folder.mkdir(parents=True, exist_ok=True)

# Report the layout, one directory per line
print("Folder structure created or already exists:")
print("\n".join(f" - {folder}" for folder in subfolders))


In [None]:
# Run the project's cartopy setup helper and record when this run started.
initialize.setup_cartopy()
start_time = datetime.now()

############## Default variables which will be updated if a typhoon is active
print('---------------------AUTOMATION SCRIPT STARTED---------------------------------')
print(str(start_time))

for countryCodeISO3 in countryCodes:
    logger.info(f"running piepline for {countryCodeISO3}")

    # Pull this country's run configuration out of the settings mapping.
    country_settings = SETTINGS_SECRET[countryCodeISO3]
    admin_level = country_settings["admin_level"]
    mock = country_settings["mock"]
    mock_nontrigger_typhoon_event = country_settings["mock_nontrigger_typhoon_event"]
    mock_trigger_typhoon_event = country_settings["mock_trigger_typhoon_event"]
    mock_trigger = country_settings["if_mock_trigger"]

    # Nothing to fetch when the local data folder is already in place.
    if os.path.exists("./data"):
        logger.info("Data folder already exists. Skipping download.")
        continue

    logger.info("Data folder not found. Downloading...")

    # Fetch the data archive through the database manager
    # (presumably this also unpacks it, going by the log message below).
    db = DatabaseManager(countryCodeISO3, admin_level)
    filename = 'data.zip'
    db.getDataFromDatalake(filename)
    logger.info("Finished data download and extraction.")
    
    

In [None]:
###############################################################
####  Run the upload pipeline for each configured country
#
# The country settings are read inside this loop so that every entry of
# countryCodes is processed, rather than only the country whose values happen
# to be left in the notebook namespace by an earlier cell.
for countryCodeISO3 in countryCodes:
    country_settings = SETTINGS_SECRET[countryCodeISO3]
    admin_level = country_settings["admin_level"]
    mock = country_settings["mock"]
    mock_nontrigger_typhoon_event = country_settings["mock_nontrigger_typhoon_event"]
    mock_trigger_typhoon_event = country_settings["mock_trigger_typhoon_event"]
    mock_trigger = country_settings["if_mock_trigger"]

    ####  check setting for mock data
    if mock:
        # Mock run: pick the stored event according to the trigger flag and
        # upload its data from the mock data location.
        if mock_trigger:
            typhoon_names = mock_trigger_typhoon_event
        else:
            typhoon_names = mock_nontrigger_typhoon_event
        logger.info(f"mock piepline for typhoon{typhoon_names}")
        db = DatabaseManager(countryCodeISO3, admin_level)
        json_path = mock_data_path + typhoon_names
        db.uploadTrackData(json_path)
        db.uploadTyphoonData(json_path)
        #db.sendNotificationTyphoon()
        db.processEvents()

    else:
        fc = Forecast(countryCodeISO3, admin_level)

        if fc.Activetyphoon:  # at least one active typhoon
            for typhoon_names in fc.Activetyphoon:
                # upload data
                json_path = fc.Output_folder + typhoon_names
                # Trigger status for this event; looked up here but not used
                # further in this cell.
                EAP_TRIGGERED_bool = fc.eap_status_bool[typhoon_names]
                EAP_TRIGGERED = fc.eap_status[typhoon_names]
                fc.db.uploadTrackData(json_path)
                fc.db.uploadTyphoonData(json_path)
                fc.db.postResulToDatalake()
                fc.db.processEvents()
                #fc.db.sendNotificationTyphoon()

        # if there is no active typhoon
        else:
            logger.info('no active Typhoon')
            # NOTE(review): no copy is taken, so the zero columns below are
            # added to the object returned by fc.pcode itself — confirm that
            # this is intended.
            df_total_upload = fc.pcode  # data frame with pcodes
            typhoon_names = 'null'
            df_total_upload['forecast_severity'] = 0
            df_total_upload['forecast_trigger'] = 0
            df_total_upload['affected_population'] = 0
            df_total_upload['houses_affected'] = 0

            # Write one all-zero exposure file per indicator layer.
            for layer in ["affected_population", "houses_affected", "forecast_severity", "forecast_trigger"]:
                # prepare layer
                logger.info(f"preparing data for {layer}")

                #### change the data frame here to include impact
                exposure_place_codes = [
                    {"placeCode": row["adm3_pcode"], "amount": row[layer]}
                    for _, row in df_total_upload.iterrows()
                ]

                exposure_data = {
                    'countryCodeISO3': countryCodeISO3,
                    "exposurePlaceCodes": exposure_place_codes,
                    "adminLevel": admin_level,
                    "leadTime": '72-hour',
                    "dynamicIndicator": layer,
                    "disasterType": "typhoon",
                    "eventName": typhoon_names,
                }

                json_file_path = fc.Output_folder + typhoon_names + f'_{layer}' + '.json'
                with open(json_file_path, 'w') as fp:
                    json.dump(exposure_data, fp)

            # upload typhoon data
            json_path = fc.Output_folder
            fc.db.uploadTyphoonData_no_event(json_path)
            fc.db.processEvents()


print('---------------------AUTOMATION SCRIPT FINISHED---------------------------------')
print(str(datetime.now()))