![title](title-card.png)

### Prepare Workspace

In [1]:
# Load system libraries
import os
import time
import requests
import shutil

# Load web-scraping libraries
from selenium import webdriver
from selenium.webdriver.chrome.service import Service as ChromeService
from webdriver_manager.chrome import ChromeDriverManager
from selenium.webdriver.support.ui import WebDriverWait
from selenium.webdriver.support import expected_conditions as EC
from selenium.webdriver.chrome.options import Options
from selenium.webdriver.common.by import By
from selenium.common.exceptions import TimeoutException, NoSuchElementException

# Load data manipulation libraries
import numpy as np
import pandas as pd

# Load image manipulation libraries
from PIL import Image
from scipy.ndimage import label
from scipy.spatial.distance import cdist
import rasterio

# Load geospatial libraries
import geopandas as gpd
from shapely.geometry import Point, Polygon, MultiPoint

# Load machine learning libraries
from sklearn.cluster import DBSCAN

# Load email libraries
import yagmail

# Set up Chrome options
chrome_options = Options()
chrome_options.add_argument("--headless")

### Scrape ECMWF Tropical Storm Forecasts

In [2]:
# Define function to download images from ECMWF website
def download_images(storm_type):

    # Path to the directory where you want to save the images
    save_dir = "temp/" + storm_type
    
    # Create the directory if it doesn't exist
    if not os.path.exists(save_dir):
        os.makedirs(save_dir)
    
    # URL of the website
    url = "https://charts.ecmwf.int/products/medium-tc-genesis"
    
    # XPaths for the relevant interactions
    dimensions_xpath = '//span[contains(text(), "Select dimensions")]'
    dropdown_xpath = '//div[@id="mui-component-select-layer_name"]'
    storm_type_xpath_template = '//li[@class="MuiButtonBase-root MuiListItem-root MuiMenuItem-root MuiMenuItem-gutters MuiListItem-gutters MuiListItem-button" and @data-value="{}"]'
    close_xpath = '//span[@class="MuiButton-label" and text()="Close"]'
    image_xpath_template = '//*[@id="root"]/div/div/div/div[3]/div/div[2]/div[1]/div/div/div/div[2]/img[{}]'
    next_button_xpath = '//*[@id="root"]/div/div/div/div[3]/div/div[2]/div[1]/div/div/div/div[3]/div[2]/div/button[4]'
    
    # Initialize the webdriver
    driver = webdriver.Chrome(service=ChromeService(ChromeDriverManager().install()), options=chrome_options)
    driver.get(url)
    
    # Wait for the page to load
    time.sleep(1)

    if storm_type != 'genesis_ts':
        try:

            # Wait for the dimensions button to be clickable and click it
            select_dimensions_button = WebDriverWait(driver, 10).until(EC.element_to_be_clickable((By.XPATH, dimensions_xpath)))
            select_dimensions_button.click()
        
            # Wait for the dropdown to be clickable and click it
            dropdown = WebDriverWait(driver, 10).until(EC.element_to_be_clickable((By.XPATH, dropdown_xpath)))
            dropdown.click()
        
            # Find and click on the storm type option
            storm_type_xpath = storm_type_xpath_template.format(storm_type)
            storm_type_element = WebDriverWait(driver, 10).until(EC.element_to_be_clickable((By.XPATH, storm_type_xpath)))
            storm_type_element.click()
        
            # Wait for the close button to be clickable and click it
            close_button = WebDriverWait(driver, 10).until(EC.element_to_be_clickable((By.XPATH, close_xpath)))
            close_button.click()
    
            # Wait for the page to load after selecting options
            WebDriverWait(driver, 10).until(EC.visibility_of_element_located((By.XPATH, image_xpath_template.format(1))))
    
        except (TimeoutException, NoSuchElementException) as e:
            print(f"An error occurred: {e}")
            driver.quit()
            raise
    
    # Loop to download each image
    for i in range(1, 10):  # Assuming there are 9 images
        try:
            
            # Construct the XPath for the current image
            image_xpath = image_xpath_template.format(i)
    
            # Wait for the image element to be visible
            image_element = WebDriverWait(driver, 10).until(EC.visibility_of_element_located((By.XPATH, image_xpath)))
            
            # Get the image source URL
            image_url = image_element.get_attribute("src")
            
            # Get the alt attribute of the image
            alt_text = image_element.get_attribute("alt")
            
            # Create a filename using the alt text
            image_filename = os.path.join(save_dir, f"{alt_text}.png")
            
            # Download the image
            image_path = os.path.join(save_dir, f"{alt_text}.png")
            with open(image_path, "wb") as f:
                f.write(requests.get(image_url).content)
            
            # Click the next button
            next_button = driver.find_element(By.XPATH, next_button_xpath)
            next_button.click()
            
            # Wait for the page to load
            time.sleep(1)

        except (TimeoutException, NoSuchElementException) as e:
            print(f"An error occurred while processing image {i}: {e}")
            break
        
    # Close the webdriver
    driver.quit()

# Define function to get image paths
def get_image_paths(storm_type):
    folder_path = 'temp/' + storm_type

    # Get list of all files and directories in the folder
    all_items = os.listdir(folder_path)
    
    # Filter out directories, only keeping files
    files = [f for f in all_items if os.path.isfile(os.path.join(folder_path, f))]
    
    return files

### Detect Strike Probabilities

In [3]:
# Define function to convert pixel indices to geospatial coordinates
def pixel_to_geospatial(row, col, transform):
    x, y = rasterio.transform.xy(transform, row, col)
    return x, y

# Define function to check colour similarity
def is_similar_color(color1, color2, tolerance):
    return np.linalg.norm(np.array(color1) - np.array(color2)) < tolerance

# Define function to cluster points
def cluster_points(geo_df):
    if len(geo_df) > 0:
        
        # Convert spatial points into list
        points = geo_df.geometry.apply(lambda geom: (geom.x, geom.y)).tolist()
        
        # Set the minimum distance
        min_distance = 33392.607
        
        # Convert the GeoDataFrame points to a numpy array
        points = np.array(geo_df.geometry.apply(lambda geom: (geom.x, geom.y)).tolist())
        
        # Define the epsilon parameter as ten times the minimum distance
        epsilon = 5 * min_distance
        
        # Initialize DBSCAN clustering algorithm
        dbscan = DBSCAN(eps=epsilon, min_samples=1)
        
        # Fit DBSCAN to the points
        dbscan.fit(points)
        
        # Get labels assigned by DBSCAN
        labels = dbscan.labels_
        
        # Add cluster labels to the GeoDataFrame
        geo_df['cluster'] = labels
        
        # Count the number of points in each cluster
        cluster_counts = geo_df['cluster'].value_counts()
        
        # Get the cluster IDs with at least four points
        valid_clusters = cluster_counts[cluster_counts >= 4].index
        
        # Filter out points in clusters with less than four points
        geo_df = geo_df[geo_df['cluster'].isin(valid_clusters)]

        return geo_df

# Create function to detect strike probabilities from downloaded images
def detect_strike_probability(storm_type, date):

    # Loop through strike probabilities using associated colours and colour tolerance
    strike_probabilities = {0.1: (255,0,255,170), 0.2: (255,78,0,120), 0.3: (255,176,0,90), 0.4: (255,255,0,70), 0.5: (40,255,0,150),
                            0.6: (0,140,47,50), 0.7: (2,255,255,100), 0.8: (2,127,254,50), 0.9: (0,0,255,50), 1.0: (122,16,178,20)}
    
    # Load the original map image
    original_image = Image.open('temp/' + storm_type + '/' + date + '.png')
    
    # Convert the original image to numpy array
    original_array = np.array(original_image)
    
    # Load the georeferenced TIF
    georef_tif = 'ref_maps/georeferenced_map.tif'
    dataset = rasterio.open(georef_tif)
    
    # Get the transform from the georeferenced TIF
    transform = dataset.transform
    
    # Initialize an empty list to collect GeoDataFrames
    geo_df_list = []
    
    # Loop through strike probabilities
    for strike_prob in strike_probabilities.keys():
        
        # Define the strike probability target color and tolerance
        target_color = strike_probabilities[strike_prob][:3]
        color_tolerance = strike_probabilities[strike_prob][3]
        
        # Find the pixels that are similar to the target color
        matching_pixels = np.zeros((original_array.shape[0], original_array.shape[1]), dtype=bool)
        for row in range(original_array.shape[0]):
            for col in range(original_array.shape[1]):
                pixel_color = original_array[row, col, :3]
                if is_similar_color(pixel_color, target_color, color_tolerance):
                    matching_pixels[row, col] = True
        
        # Find the indices of the matching pixels
        pixel_indices = np.argwhere(matching_pixels)
        
        # Convert pixel indices to geospatial coordinates
        geospatial_points = [pixel_to_geospatial(row, col, transform) for row, col in pixel_indices]
        
        # Create a GeoDataFrame from the geospatial points
        geometry = [Point(x, y) for x, y in geospatial_points]
        geo_df_strike = gpd.GeoDataFrame(geometry=geometry, crs=dataset.crs)
        
        # Identify storm type and strike probability
        geo_df_strike['type'] = storm_type
        geo_df_strike['strike_probability'] = strike_prob
    
        # Append the current GeoDataFrame to the list
        geo_df_list.append(geo_df_strike)
    
    # Concatenate all GeoDataFrames in the list into a single GeoDataFrame
    geo_df = gpd.GeoDataFrame(pd.concat(geo_df_list, ignore_index=True), crs=dataset.crs)
    
    # Remove overlapping geometries by prioritising those with highest strike probability
    geo_df = geo_df.sort_values(by=['geometry', 'strike_probability'], ascending=[True, False])
    geo_df = geo_df.drop_duplicates(subset='geometry', keep='first')
    geo_df = geo_df.reset_index(drop=True)

    # Cluster points
    geo_df = cluster_points(geo_df)

    return geo_df

### Compare Storm Clusters With Population Map

In [4]:
# Define function to load population map
def load_pop_map():
    # Open the raster file
    file_path = 'ref_maps/gpw_v4_population_count_rev11_2020_2pt5_min.tif'
    with rasterio.open(file_path) as src:
        # Read the raster data
        pop_map = src.read(1)  # Read the first band
        # Read the nodata value from metadata
        nodata_value = src.nodata
        # Get metadata
        metadata = src.meta
        # Get the coordinate reference system (CRS) of the raster
        raster_crs = src.crs
    
    # Mask missing data from population map
    missing_data_mask = np.isclose(pop_map, nodata_value)
    
    # Remove missing values from pop_map
    pop_map = np.where(missing_data_mask, np.nan, pop_map)

    return pop_map, src

# Define function to load boundaries map
def load_boundaries_map():
    
    # Read boundaries shapefile
    boundaries = gpd.read_file('ref_maps/world-administrative-boundaries/world-administrative-boundaries.shp')
    
    # Reproject data to EPSG:4326
    boundaries = boundaries.to_crs(epsg=4326)
    
    return boundaries

# Define function to calculate expected impact
def caclulate_impact(geo_df):

    # Reproject data to EPSG:4326
    geo_df = geo_df.to_crs(epsg=4326)
    
    # Initialise dataframe
    df = pd.DataFrame({'regions': [], 'expected_impact': [], 'date': []})
    
    # Loop through storm clusters
    for storm in geo_df['cluster'].unique():
    
        # Extract pixel values corresponding to points
        storm_points = geo_df[geo_df['cluster'] == storm]
        pixel_values = []
        pixel_strike_prob = []
        for index, point in storm_points.iterrows():
            row, col = src.index(point.geometry.x, point.geometry.y)
            if not np.isnan(pop_map[row, col]):
                pixel_values.append(pop_map[row, col])
                pixel_strike_prob.append(point['strike_probability'])
        
        # Get unique pixel values affected by storm
        affected_pixels = np.unique(pixel_values)
    
        # Find which administrative regions are impacted by the storm
        join_gdf = gpd.sjoin(storm_points, boundaries, how="inner", predicate="within")
    
        # Extract the unique administrative regions
        unique_admin_regions = join_gdf['name'].unique()
    
        # Calculate expected impact
        expected_impact = sum([pop * strike_prob for pop, strike_prob in zip(affected_pixels, pixel_strike_prob)])
    
        # Create a new DataFrame for the current storm
        new_row = pd.DataFrame({'regions': [unique_admin_regions], 'expected_impact': [expected_impact], 'date': [date]})
    
        # Concatenate the new row to the existing DataFrame
        df = pd.concat([df, new_row], ignore_index=True)
    
        return df

### Execute Tool

In [5]:
from datetime import datetime
print(datetime.now().time().strftime("%H:%M:%S")) ### TO DELETE

# Load reference maps
pop_map, src = load_pop_map()
boundaries = load_boundaries_map()

# Initialise final dataframe
df_all = pd.DataFrame({'regions': [], 'expected_impact': [], 'date': [], 'storm_type': []})

# Delete temp folder if it exists
if os.path.exists('temp') and os.path.isdir('temp'):
    # Delete the folder and all its contents
    shutil.rmtree('temp')

# Loop through storm types
storm_type_options = {'genesis_ts': 'tropical storm', 'genesis_td': 'tropical cyclone', 'genesis_hr': 'hurricane'}
for storm_type in storm_type_options.keys():
    
    # Print progress
    print(f"Downloading {storm_type_options[storm_type]} images...")

    # Download images from ECMWF website
    download_images(storm_type)

    # Loop through downloaded images
    for file in get_image_paths(storm_type):

        # Get date from file name
        date = file[:-4]
        
        # Print progress
        print(f"Detecting {storm_type_options[storm_type]} strike probabilities for {date}")

        # Detect strike probabilities for selected date
        geo_df = detect_strike_probability(storm_type, date)

        # Compare storm clusters with population map
        if geo_df is not None:
            df = caclulate_impact(geo_df)
            df['storm_type'] = storm_type_options[storm_type]
    
            # Combine results into single dataframe
            df_all = pd.concat([df_all, df], ignore_index=True)

    print(datetime.now().time().strftime("%H:%M:%S")) ### TO DELETE
    
    # Print break
    print('---------------------------------------------------------------------------------')

print(datetime.now().time().strftime("%H:%M:%S")) ### TO DELETE

21:01:07
Downloading tropical storm images...
Detecting tropical storm strike probabilities for Sat 25 May 2024 12 UTC (T+96)
Detecting tropical storm strike probabilities for Sun 26 May 2024 12 UTC (T+120)
Detecting tropical storm strike probabilities for Sat 01 Jun 2024 12 UTC (T+264)
Detecting tropical storm strike probabilities for Wed 29 May 2024 12 UTC (T+192)
Detecting tropical storm strike probabilities for Tue 28 May 2024 12 UTC (T+168)
Detecting tropical storm strike probabilities for Thu 30 May 2024 12 UTC (T+216)
Detecting tropical storm strike probabilities for Fri 31 May 2024 12 UTC (T+240)
Detecting tropical storm strike probabilities for Fri 24 May 2024 12 UTC (T+72)
Detecting tropical storm strike probabilities for Mon 27 May 2024 12 UTC (T+144)
21:04:20
---------------------------------------------------------------------------------
Downloading tropical cyclone images...
Detecting tropical cyclone strike probabilities for Sat 25 May 2024 12 UTC (T+96)
Detecting tropi

In [10]:
df_all.sort_values('expected_impact', ascending=False)

Unnamed: 0,regions,expected_impact,date,storm_type
4,"[Myanmar, India, Bangladesh]",1025611.0,Tue 28 May 2024 12 UTC (T+168),tropical storm
3,"[India, Bangladesh]",999015.6,Wed 29 May 2024 12 UTC (T+192),tropical storm
8,"[Myanmar, India, Bangladesh]",968175.5,Mon 27 May 2024 12 UTC (T+144),tropical storm
1,"[Myanmar, India, Bangladesh]",683014.0,Sun 26 May 2024 12 UTC (T+120),tropical storm
11,"[Venezuela, Colombia, Nicaragua, Costa Rica, P...",94187.65,Sat 01 Jun 2024 12 UTC (T+264),tropical cyclone
15,"[Venezuela, Colombia, Nicaragua, Costa Rica, P...",68798.27,Fri 31 May 2024 12 UTC (T+240),tropical cyclone
13,"[Venezuela, Colombia, Panama]",54473.57,Tue 28 May 2024 12 UTC (T+168),tropical cyclone
6,[Japan],41025.47,Fri 31 May 2024 12 UTC (T+240),tropical storm
0,[Madagascar],6703.489,Sat 25 May 2024 12 UTC (T+96),tropical storm
2,[Japan],3346.325,Sat 01 Jun 2024 12 UTC (T+264),tropical storm


In [17]:
FROM = 'stormspyder.alerts@gmail.com'
TO = ["jessicakristenr@gmail.com"]
SUBJECT = "!!! ALERT: TROPICAL STORM APPROACHING INDIA AND BANGLADESH, 600K ESTIMATED IMPACTED"
TEXT = "This message was sent with Python's smtplib."

# Prepare actual message
message = """\
From: %s
To: %s
Subject: %s

%s
""" % (FROM, ", ".join(TO), SUBJECT, TEXT)


yag = yagmail.SMTP(FROM, 'kmgy vpdd ifwa ljzi')
yag.send(TO, SUBJECT, message)

{}