Import necessary libs

In [1]:
import os
import re
import json
import requests
import numpy as np
import pandas as pd
import province_mapper

from datetime import datetime
from bs4 import BeautifulSoup

print('[+] Successful setup...')

[+] Successful setup...


Defining global variables

In [2]:
# Upper bound of the province index range passed to fetch_bulk()
PROVINCES_COUNT: int = 27
# Folder where the downloaded .csv files are written and later read from
DUMP_FOLDER: str = 'vhi_dump'
# Lookup keyed by the NOAA province index; every value carries the province "name" and its mapped "id"
PROVINCES_MAP: dict = province_mapper.get_mapped_provinces()

# pretty-print the mapping so it can be inspected in the cell output
print(json.dumps(PROVINCES_MAP, indent=4))

{
    "1": {
        "name": "Cherkasy Region",
        "id": 24
    },
    "2": {
        "name": "Chernihiv Region",
        "id": 25
    },
    "3": {
        "name": "Chernivtsi Region",
        "id": 26
    },
    "4": {
        "name": "Republic of Crimea Region",
        "id": 1
    },
    "5": {
        "name": "Dnipropetrovsk Region",
        "id": 4
    },
    "6": {
        "name": "Donetsk Region",
        "id": 5
    },
    "7": {
        "name": "Ivano-Frankivsk Region",
        "id": 9
    },
    "8": {
        "name": "Kharkiv Region",
        "id": 21
    },
    "9": {
        "name": "Kherson Region",
        "id": 22
    },
    "10": {
        "name": "Khmelnytskyi Region",
        "id": 23
    },
    "11": {
        "name": "Kyiv Region",
        "id": 10
    },
    "12": {
        "name": "Kyiv City",
        "id": 11
    },
    "13": {
        "name": "Kirovohrad Region",
        "id": 12
    },
    "14": {
        "name": "Luhansk Region",
        "id": 13
    },

Init destination folder

In [3]:
def folder_init(folder_path: str, is_silent = False) -> None:
    """
    Makes sure the given folder exists, creating it (with parents) when it is missing
    :param folder_path: Path to the folder
    :param is_silent: If true, the notice about an already existing folder is not printed
    :return: None
    """

    # nothing to create: optionally report it and leave
    if os.path.exists(folder_path):
        if not is_silent:
            print(f'[x] Folder {folder_path} already exists!')
        return

    os.makedirs(folder_path)
    print(f'[+] Created folder: {folder_path}')
    

File downloader (download handler)

In [4]:
def fetch(province_id: int, start_year: int, end_year: int) -> None:
    """
    Fetches mean data from NOAA for one province and saves it as a .csv file in DUMP_FOLDER.

    The response is stripped of HTML tags, the trailing comma of every line is removed and the
    NOAA province ID inside the "Province= N:" fragment is replaced with the mapped ID before
    the text is written to ``vhi_<mapped id>_<timestamp>.csv``.

    :param province_id: Province ID based on the NOAA mapping
    :param start_year: The starting year for the requested time series data Format: (YYYY)
    :param end_year: The ending year for the requested time series data. Format: (YYYY)
    :return: None
    :raises Exception: If requesting, cleaning or saving the data fails (the original error is chained as the cause)
    """

    if not validate_id(province_id):
        print('[!] Invalid province ID')
        return

    province = PROVINCES_MAP[province_id]
    mapped_province_id = province["id"]
    mapped_province_name = province["name"]

    start_year, end_year = validate_years(start_year, end_year)

    # validate_years() already printed the reason
    if start_year is None or end_year is None:
        return

    print(f'[*] Fetching data:\n\t'
          f'Province ID: {province_id} | {mapped_province_name}\n\t'
          f'Start year: {start_year}\n\t'
          f'End year: {end_year}')

    # the request itself uses the NOAA province ID, not the mapped one
    url: str = f'https://www.star.nesdis.noaa.gov/smcd/emb/vci/VH/get_TS_admin.php?country=UKR&provinceID={province_id}&year1={start_year}&year2={end_year}&type=Mean'

    try:
        # a timeout keeps one stalled connection from blocking the whole bulk download
        req = requests.get(url, timeout=30)

        if req.status_code != 200:
            print('[!] Request failed!')
            return

        # clearing response from html tags
        soup = BeautifulSoup(req.text, "html.parser")
        clear_data = soup.get_text()

        # assembling the file name
        timestamp = datetime.now().strftime('%Y-%m-%dT%H-%M')
        filename = f'vhi_{mapped_province_id}_{timestamp}.csv'

        # Deleting last comma in the data series that makes useless shift
        clear_data = re.sub(r',\s*$', '', clear_data, flags=re.MULTILINE)

        # changing province id based on the real mapping
        clear_data = re.sub(r"Province= \d+:", f"Province= {mapped_province_id}:", clear_data)
        print(f'[!] Province ID was changed to the real mapping: {province_id} (NOAA) -> {mapped_province_id} (UKR oblast centers mapping)')

        save(directory=DUMP_FOLDER, filename=filename, data=clear_data)

    except Exception as err:
        # chain the original error so its traceback is not lost
        raise Exception(f'[!] Error happened: {err}') from err


Function for years validation 

In [5]:
def validate_years(start_year: int, end_year: int) -> (int, int):
    """
    Checks if start_year and end_year are within the correct range
    :param start_year: The starting year for the requested time series data Format: (YYYY)
    :param end_year: The ending year for the requested time series data. Format: (YYYY). Set end_year to 9999 to validate only start_year for the format.
    :return: A tuple of two integers if start_year and end_year passed all checks. 
    """
    current_year:int = datetime.today().year
    year_pattern = r'^\d{4}$'
    
    if not re.match(year_pattern, str(start_year)) or not re.match(year_pattern, str(end_year)):
        print(f'[!] Invalid year format. Only allowed one is: YYYY (4 integers)')
        return None, None
    
    if end_year == 9999 and start_year:
        print(start_year)
        return start_year
    
    if start_year < 1982:
        print(f'[!] Start year should be greater or equal to 1982!')
        return None, None
    
    if end_year >= current_year:
        print(f'[!] For full data coverage, end_year({end_year}) should be less than current_year({current_year})!')
        return None, None
    
    if start_year > end_year:
        print(f'[!] start_year({start_year}) cannot be greater than end_year({end_year})')
        return None, None
    
    print(f'[+] Successful years range validation!')
    return start_year, end_year
    

Function for ID validation

In [6]:
def validate_id(province_id: int) -> bool:
    """
    Tells whether the given province_id is one of the keys of PROVINCES_MAP
    :param province_id: Province ID based on the NOAA mapping
    :return: True when the ID is known (a success notice is printed), False otherwise
    """

    is_known = province_id in PROVINCES_MAP

    if is_known:
        print(f'[+] Successful ID validation!')

    return is_known
    

File saver

In [7]:
def save(directory: str, filename: str, data: str) -> None:
    """
    Writes the given text into <directory>/<filename>, creating the directory first if needed
    :param directory: Directory where the data is saved.
    :param filename: Name of the file to save the data to.
    :param data: Data to save.
    :return: None
    """

    folder_init(directory, is_silent=True)

    target_path = f'{directory}/{filename}'

    # the context manager closes the file on exit
    with open(target_path, 'w') as handle:
        handle.write(data)
        print(f'[+] Saved {filename} to {directory}/')
    

Bulk file downloader

In [8]:
def fetch_bulk(start_id: int, end_id: int, start_year: int, end_year: int) -> None:
    """
    Calls fetch() once for every province ID in the inclusive range [start_id, end_id]
    :param start_id: The starting province(region) ID (NOAA list) to download
    :param end_id: The ending province(region) ID (NOAA list) to download
    :param start_year: The starting year for the requested time series data Format: (YYYY)
    :param end_year: The ending year for the requested time series data. Format: (YYYY)
    :return: None
    """

    province_ids = range(start_id, end_id + 1)

    for noaa_id in province_ids:
        print(f'[*] Fetch order: {noaa_id}')
        try:
            fetch(noaa_id, start_year, end_year)
        except Exception as err:
            # a failed province is reported and the remaining ones are still processed
            print(f'[!] Error fetching: {err}')


Essentially, clears given directory (dump_folder, most of the time)

In [9]:
def clear_dump_folder(directory: str) -> None:
    """
    Removes every file and symbolic link located directly inside the folder (sub-folders are kept)
    :param directory: Path to the folder
    :return: None
    """

    if not os.path.exists(directory):
        print(f'[!] Directory does not exist!')
        return

    for entry_name in os.listdir(directory):
        file_path = os.path.join(directory, entry_name)
        try:
            removable = os.path.isfile(file_path) or os.path.islink(file_path)
            if not removable:
                continue
            os.unlink(file_path)
            print(f'[*] Deleted file: {file_path}')
        except Exception as err:
            print(f"[!] Failed to delete {file_path}. Reason: {err}")


Function that creates dataframe from the `.csv` files in the given directory

In [10]:
def get_dataframe(directory: str) -> pd.DataFrame | None:
    """
    Creates dataframe from the files in the directory
    :param directory: Path to the directory
    :return: Pandas dataframe
    """
    
    headers: list = ['Year', 'Week', 'SMN', 'SMT', 'VCI', 'TCI', 'VHI']
    dataframes: list = []
    
    if not os.path.exists(directory):
        print(f'[!] Directory does not exist!')
        return
    
    for file in os.listdir(directory):
        if not file.endswith('.csv'):
            print(f'[!] File ({file}) is not a .csv. Skipping it!')
            continue
        file_path = os.path.join(directory, file)
        
        try:
            df = pd.read_csv(file_path, header=1, names=headers, skiprows=1)
            province_id = int(file.split('_')[1])
            df.insert(0, 'PID', province_id, True)
            dataframes.append(df)
        except Exception as err:
            print(f'[!] Failed to read {err}')
    
    if not dataframes:
        raise Exception('No files found!')
        
    return pd.concat(dataframes, ignore_index=True) 
    

Function to get province name by its id (real mapping)

In [11]:
def get_province(province_id: int) -> str | None:
    """
    Returns the province name 
    :param province_id: Province ID based on the real mapping
    :return: string
    """
    
    for region_key, region_info in PROVINCES_MAP.items():
        if region_info.get('id') == province_id:
            return region_info.get('name')
    
    return

 Main entry point for the script. I love main(), even in the py notebook.... yes

In [12]:
# clearing dump folder
clear_dump_folder(DUMP_FOLDER)
    
# bulk fetch for provinces from 1 to PROVINCES_COUNT (NOAA id list), years 1982-2023
fetch_bulk(1, PROVINCES_COUNT, 1982, 2023)
    
# collecting files into the pandas dataframe
df = get_dataframe(DUMP_FOLDER)

[*] Deleted file: vhi_dump/vhi_21_2024-04-17T12-42.csv
[*] Deleted file: vhi_dump/vhi_19_2024-04-17T12-42.csv
[*] Deleted file: vhi_dump/vhi_6_2024-04-17T12-42.csv
[*] Deleted file: vhi_dump/vhi_3_2024-04-17T12-42.csv
[*] Deleted file: vhi_dump/vhi_13_2024-04-17T12-42.csv
[*] Deleted file: vhi_dump/vhi_24_2024-04-17T12-41.csv
[*] Deleted file: vhi_dump/vhi_16_2024-04-17T12-42.csv
[*] Deleted file: vhi_dump/vhi_9_2024-04-17T12-42.csv
[*] Deleted file: vhi_dump/vhi_12_2024-04-17T12-42.csv
[*] Deleted file: vhi_dump/vhi_8_2024-04-17T12-42.csv
[*] Deleted file: vhi_dump/vhi_17_2024-04-17T12-42.csv
[*] Deleted file: vhi_dump/vhi_25_2024-04-17T12-41.csv
[*] Deleted file: vhi_dump/vhi_7_2024-04-17T12-42.csv
[*] Deleted file: vhi_dump/vhi_18_2024-04-17T12-42.csv
[*] Deleted file: vhi_dump/vhi_2_2024-04-17T12-42.csv
[*] Deleted file: vhi_dump/vhi_23_2024-04-17T12-42.csv
[*] Deleted file: vhi_dump/vhi_1_2024-04-17T12-41.csv
[*] Deleted file: vhi_dump/vhi_14_2024-04-17T12-42.csv
[*] Deleted file:

In [13]:
# show the combined dataframe built from the downloaded files
df

Unnamed: 0,PID,Year,Week,SMN,SMT,VCI,TCI,VHI
0,7,1982,2,0.059,255.61,16.96,85.31,51.13
1,7,1982,3,0.060,258.56,16.90,74.56,45.73
2,7,1982,4,0.060,261.17,16.45,63.52,39.98
3,7,1982,5,0.061,263.92,15.86,53.34,34.60
4,7,1982,6,0.061,266.49,14.98,47.33,31.15
...,...,...,...,...,...,...,...,...
56753,26,2023,48,0.146,268.58,63.65,43.93,53.79
56754,26,2023,49,0.137,267.07,64.58,42.78,53.68
56755,26,2023,50,0.136,268.02,70.72,31.94,51.33
56756,26,2023,51,0.132,269.29,76.10,21.63,48.87


Function that returns the VHI series for a specific region within a specific year

In [14]:
def get_vhi(dataframe: pd.DataFrame, province_id: int, year: int) -> pd.DataFrame | None:
    """
    Returns VHI series for the specified region and year.
    
    :param dataframe: Pandas DataFrame containing the data.
    :param province_id: Region ID as per 'PID' column in the DataFrame.
    :param year: Year as integer to filter the VHI series.
    :return: Pandas Series containing VHI values for the specified PID and year.
    """
    
    if province_id not in dataframe['PID'].unique():
        print(f"[!] PID {province_id} not found in the DataFrame.")
        return None

    if year not in df['Year'].unique():
        print(f"[!] Year {year} not found in the DataFrame.")
        return None
    
    print(f' VHI data series for {province_id=} and {year=}')
    
    temp = dataframe[(dataframe['PID'] == province_id) & (dataframe['Year'] == year)]
    
    return temp['VHI'].reset_index(drop=True)
    

In [15]:
# VHI values of PID 1 for the year 2022
get_vhi(df, 1, 2022)

 VHI data series for province_id=1 and year=2022


0     44.28
1     45.61
2     44.44
3     40.63
4     40.24
5     40.19
6     41.37
7     41.80
8     41.67
9     42.30
10    43.03
11    43.30
12    43.96
13    44.50
14    45.52
15    48.08
16    53.09
17    57.25
18    60.03
19    60.43
20    59.39
21    60.43
22    60.93
23    60.22
24    60.07
25    59.63
26    59.18
27    58.56
28    58.15
29    52.95
30    51.57
31    49.29
32    46.48
33    43.41
34    40.62
35    37.58
36    36.78
37    37.88
38    39.34
39    40.95
40    41.73
41    42.07
42    40.80
43    37.01
44    34.83
45    35.12
46    35.85
47    38.75
48    40.24
49    41.19
50    42.21
51    41.77
Name: VHI, dtype: float64

In [16]:
def get_max_min_vhi(dataframe: pd.DataFrame, provinces: list, years: list) -> pd.DataFrame | None:
    """
    Returns Min VHI and Max VHI series for the specified provinces and years
    :param dataframe: Pandas DataFrame containing the data.
    :param provinces: List of provinces id (real regional mapping)
    :param years: List of years to retrieve
    :return: Pandas DataFrame or Nothing if error occurs
    """
    
    if not set(provinces).issubset(dataframe['PID'].unique()):
        print("[!] One or more specified provinces are not in the DataFrame.")
        print(f'[?] Possible PID in the given DateFrame: {suggest_values(dataframe)[0]}')
        return None

    if not set(years).issubset(dataframe['Year'].unique()):
        print("[!] One or more specified years are not in the DataFrame.")
        print(f'[?] Possible years in the given DateFrame: {suggest_values(dataframe)[1]}')
        return None
    
    result:list = []
    
    for pid in provinces: 
        for year in sorted(years):
            
            temp = dataframe[(dataframe['PID'] == pid) & (dataframe['Year'] == year)]
            
            if temp.empty:
                continue
            
            min_vhi = temp['VHI'].min()
            max_vhi = temp['VHI'].max()
            
            result.append({'Province ID': pid, 
                                    'Year': year, 
                                    'Min VHI': min_vhi, 
                                    'Max VHI': max_vhi})
    
    result = pd.DataFrame(result)
    
    return result if not result.empty else None
    
    

In [17]:
# yearly min/max VHI for PIDs 1, 22 and 24 in 2014, 2017, 2019 and 2022
get_max_min_vhi(df, [1, 22, 24], [2019, 2022, 2014, 2017])

Unnamed: 0,Province ID,Year,Min VHI,Max VHI
0,1,2014,28.95,56.51
1,1,2017,21.17,60.38
2,1,2019,18.85,58.77
3,1,2022,34.83,60.93
4,22,2014,26.24,53.41
5,22,2017,24.72,58.49
6,22,2019,28.06,61.08
7,22,2022,35.7,54.81
8,24,2014,37.59,67.44
9,24,2017,35.58,59.5


Some util function that suggests years and province ids based on the provided dataframe 

In [18]:
def suggest_values(dataframe: pd.DataFrame) -> tuple[list, list]:
    """
    Returns the unique province IDs and years from the DataFrame
    :param dataframe: Pandas DataFrame with 'PID' and 'Year' columns
    :return: A tuple (ids, years) of two ascending lists; both lists are empty for an empty DataFrame
    """

    if dataframe.empty:
        print(f'[!] DataFrame is empty :/')
        # keep the tuple shape so callers can still index the result with [0] / [1]
        return [], []

    ids = sorted(dataframe['PID'].unique().tolist())
    years = sorted(dataframe['Year'].unique().tolist())

    return ids, years

Returns VHI series for the specified provinces and years

In [19]:
def get_vhi_range(dataframe: pd.DataFrame, provinces: list, years: list) -> pd.DataFrame | None:
    """
    Returns VHI series for the specified provinces and years
    :param dataframe: Pandas DataFrame containing the data.
    :param provinces: List of provinces id (real regional mapping)
    :param years: List of years to retrieve
    :return: Pandas DataFrame or Nothing if error occurs
    """
    
    if not set(provinces).issubset(dataframe['PID'].unique()):
        print("[!] One or more specified provinces are not in the DataFrame.")
        print(f'[?] Possible PID in the given DateFrame: {suggest_values(dataframe)[0]}')
        return None

    if not set(years).issubset(dataframe['Year'].unique()):
        print("[!] One or more specified years are not in the DataFrame.")
        print(f'[?] Possible years in the given DateFrame: {suggest_values(dataframe)[1]}')
        return None
    
    result: pd.DataFrame = dataframe[dataframe['PID'].isin(provinces) & dataframe['Year'].isin(years)]
    
    if result.empty:
        print("[!] Some error happened, so no data is returned.")
        return
    
    result = result[['PID', 'Year', 'Week', 'VHI']]
    
    return result
    

In [20]:
# weekly VHI of PIDs 1, 22 and 24 for the years 2019 and 2022
get_vhi_range(df, [1, 22, 24], [2022, 2019])

Unnamed: 0,PID,Year,Week,VHI
25936,24,2019,1,52.26
25937,24,2019,2,53.94
25938,24,2019,3,56.98
25939,24,2019,4,56.87
25940,24,2019,5,52.78
...,...,...,...,...
54518,1,2022,48,38.75
54519,1,2022,49,40.24
54520,1,2022,50,41.19
54521,1,2022,51,42.21


In [58]:
def analyse_droughts(dataframe: pd.DataFrame, vhi_min: int, vhi_max: int,  affected: int) -> pd.DataFrame | None:
    """
    Get entries from the dataframe where VHI are within given bounds (vhi_min and vhi_max) and return affected% of provinces
    
    :param dataframe: Pandas DataFrame containing the data.
    :param vhi_min: The lower VHI threshold 
    :param vhi_max: the upper VHI threshold 
    :param affected: Sets bound for top X% provinces
    :return: Padnas dataframe
    """
    
    drought_conditions = dataframe[(dataframe['VHI'] >= vhi_min) & (dataframe['VHI'] <= vhi_max)]

    # calculate the total number of entries to include (top X% lowest VHI across all entries)
    total_entries_to_include = int(np.ceil(len(drought_conditions) * affected / 100))
    
    # sort all drought conditions by VHI to find the most severe droughts
    worst_hit_entries = drought_conditions.nsmallest(total_entries_to_include, 'VHI')
    
    worst_hit_entries = worst_hit_entries.groupby('PID').apply(lambda x: x.nsmallest(1, 'VHI')).reset_index(drop=True)
    
    # map PID to province names
    worst_hit_entries['Province'] = worst_hit_entries['PID'].apply(get_province)

    # sort resulting dataframe by vhi in the ascending order
    worst_hit_entries = worst_hit_entries.sort_values(by='VHI', ascending=True).reset_index(drop=True)
    
    return worst_hit_entries[['Year', 'Province', 'VHI']]
    

In [59]:
# VHI bounds 0..15, considering 20% of the in-range entries
analyse_droughts(df, 0, 15, 20)

Unnamed: 0,Year,Province,VHI
0,2007,Odesa Region,5.52
1,2007,Mykolaiv Region,5.94
2,1993,Donetsk Region,6.26
3,2000,Kyiv City,6.49
4,2000,Sevastopol Region,8.14


For severe droughts (VHI <= 20)

In [55]:
# VHI bounds 20..30, considering 20% of the in-range entries
analyse_droughts(df, 20, 30, 20)

Unnamed: 0,Year,Province,VHI
0,2007,Odesa Region,20.0
1,1986,Mykolaiv Region,20.0
2,1985,Zakarpattia Region,20.01
3,2020,Zaporizhia Region,20.03
4,1986,Kirovohrad Region,20.03
5,1986,Donetsk Region,20.05
6,2019,Republic of Crimea Region,20.06
7,1993,Dnipropetrovsk Region,20.06
8,2000,Chernihiv Region,20.09
9,2019,Cherkasy Region,20.14


For moderate droughts (VHI >= 20 and <= 30)

In [64]:
def get_droughts_count(dataframe: pd.DataFrame, vhi_min: float, vhi_max: float, percentage: float) -> pd.DataFrame:
    """
    Lists the years in which at least `percentage`% of the provinces had a VHI within [vhi_min, vhi_max]
    :param dataframe: Pandas DataFrame containing the data ('PID', 'Year' and 'VHI' columns are used).
    :param vhi_min: The lower VHI threshold (inclusive)
    :param vhi_max: The upper VHI threshold (inclusive)
    :param percentage: Share of all provinces in the DataFrame (in percent) that must be affected within a year
    :return: Pandas DataFrame with columns 'Year', 'Entries' (number of affected provinces)
             and 'Regions' (list of affected PIDs); empty if no entry lies within the bounds
    """

    # the threshold is derived from the frame that was passed in, not from a notebook-level variable
    num_provinces_affected = round((percentage / 100) * len(dataframe['PID'].unique()))

    droughts = dataframe[(dataframe['VHI'] >= vhi_min) & (dataframe['VHI'] <= vhi_max)]

    if droughts.empty:
        return pd.DataFrame(columns=['Year', 'Entries', 'Regions'])

    # one row per (year, province) that had at least one in-range entry
    yearly_droughts = droughts.groupby(['Year', 'PID']).size().reset_index(name='count')

    # per year: how many provinces were affected and which ones
    years_with_droughts = yearly_droughts.groupby('Year')['PID'].agg(['count', list]).reset_index()

    # rename on the freshly aggregated frame, before slicing it
    years_with_droughts.columns = ['Year', 'Entries', 'Regions']

    years_affected = years_with_droughts[years_with_droughts['Entries'] >= num_provinces_affected]

    return years_affected.reset_index(drop=True)

Test func :/

In [65]:
# years in which at least 5% of the provinces had a VHI within 0..15
get_droughts_count(df, 0, 15, 5)

Unnamed: 0,Year,Entries,Regions
0,1984,1,[19]
1,1986,2,"[5, 13]"
2,1993,2,"[5, 13]"
3,1994,1,[3]
4,1999,1,[11]
5,2000,6,"[2, 10, 11, 21, 24, 27]"
6,2003,1,[22]
7,2007,5,"[1, 8, 15, 16, 22]"
8,2012,1,[1]
