In [1]:
import requests
import zipfile
import os
import geopandas as gpd
import pandas as pd
import folium
from datetime import datetime, timedelta
import pytz
from shapely.geometry import mapping

# Define the base URL for the IEM data request
base_url = 'https://mesonet.agron.iastate.edu/cgi-bin/request/gis/watchwarn.py'

# Define the start and end timestamps
start_date = '2024-05-01T00:00Z'
# end_date = datetime.utcnow().strftime('%Y-%m-%dT%H:%M:%SZ')
end_date = '2024-07-24T00:00Z'

# Define the parameters
params = {
    'accept': 'shapefile',         # Request data in shapefile format
    'sts': start_date,             # Start timestamp
    'ets': end_date,               # End timestamp
    'location_group': 'states',    # Data grouped by state
    'states': 'NY'                 # Data for New York
}

# Directory to store shapefiles
shapefile_dir = 'ny_warnings_data'

# Check if the directory exists
if os.path.exists(shapefile_dir):
    # Remove all files in the directory
    for file_name in os.listdir(shapefile_dir):
        file_path = os.path.join(shapefile_dir, file_name)
        if os.path.isfile(file_path):
            os.remove(file_path)
    print(f'Cleared existing files in the directory: {shapefile_dir}')
else:
    # Create the directory if it does not exist
    os.makedirs(shapefile_dir)
    print(f'Created directory: {shapefile_dir}')

# Download the shapefile zip file
response = requests.get(base_url, params=params)

# Check if the request was successful
if response.status_code == 200:
    # Save the response content to a file
    zip_filename = 'ny_warnings_data.zip'
    with open(zip_filename, 'wb') as file:
        file.write(response.content)
    print(f'Data successfully downloaded and saved as {zip_filename}')
    
    # Extract the downloaded zip file
    with zipfile.ZipFile(zip_filename, 'r') as zip_ref:
        # Extract all files to the specified directory
        zip_ref.extractall(shapefile_dir)
        
        # List all files in the extraction directory
        extracted_files = os.listdir(shapefile_dir)
        
        # Find the shapefile in the extracted files
        shapefile_name = None
        for file in extracted_files:
            if file.endswith('.shp'):
                shapefile_name = file
                break
        
        if shapefile_name:
            print(f'Shapefile found: {shapefile_name}')
        else:
            print('No shapefile found in the extracted files.')
        
        # Check if all necessary shapefile components are present
        shapefile_components = ['.shp', '.shx', '.dbf', '.prj']
        missing_components = [ext for ext in shapefile_components if not any(f.endswith(ext) for f in extracted_files)]
        
        if not missing_components:
            print('All necessary shapefile components are present.')
        else:
            print(f'Missing shapefile components: {missing_components}')
else:
    print(f'Failed to retrieve data. Status code: {response.status_code}')

# Define paths to shapefiles
if shapefile_name:
    nws_shapefile_path = os.path.join(shapefile_dir, shapefile_name)
else:
    raise FileNotFoundError("Shapefile not found in the extracted files.")

state_shapefile_path = '/nfs/home11/ugrad/2020/tr588861/SWRCC/State_Shapefiles/state/State.shp'
county_shapefile_path = '/nfs/home11/ugrad/2020/tr588861/SWRCC/State_Shapefiles/state/Counties.shp'

# Load the NWS data
nws_data = gpd.read_file(nws_shapefile_path)
state_boundary = gpd.read_file(state_shapefile_path)
county_boundaries = gpd.read_file(county_shapefile_path)

# Load the VTEC dictionaries from CSV files
phenomena_df = pd.read_csv('Decoder/VTEC_PHENOMENA.csv', header=None, names=['Code', 'Description'])
significance_df = pd.read_csv('Decoder/VTEC_SIGNIFICANCE.csv', header=None, names=['Code', 'Description'])
colors_df = pd.read_csv('Decoder/NWS_COLORS.csv', header=None, names=['Code', 'Color'])

# Convert CSV data to dictionaries
VTEC_PHENOMENA = dict(zip(phenomena_df['Code'], phenomena_df['Description']))
VTEC_SIGNIFICANCE = dict(zip(significance_df['Code'], significance_df['Description']))
NWS_COLORS = dict(zip(colors_df['Code'], colors_df['Color']))


import os
os.environ['USE_PYGEOS'] = '0'
import geopandas

In the next release, GeoPandas will switch to using Shapely by default, even if PyGEOS is installed. If you only have PyGEOS installed to get speed-ups, this switch should be smooth. However, if you are using PyGEOS directly (calling PyGEOS functions on geometries from GeoPandas), this will then stop working and you are encouraged to migrate from PyGEOS to Shapely 2.0 (https://shapely.readthedocs.io/en/latest/migration_pygeos.html).
  import geopandas as gpd


Shapefile found: wwa_202405010000_202407240000.shp
All necessary shapefile components are present.


In [2]:
# Function to convert UTC to Eastern Time
def convert_to_eastern(utc_time):
    from pytz import timezone
    eastern = timezone('US/Eastern')
    utc = timezone('UTC')
    utc_time = pd.to_datetime(utc_time).tz_localize('UTC')
    eastern_time = utc_time.astimezone(eastern)
    return eastern_time

# Convert time columns to Eastern Time
nws_data['ISSUED_ET'] = nws_data['ISSUED'].apply(convert_to_eastern)
nws_data['EXPIRED_ET'] = nws_data['EXPIRED'].apply(convert_to_eastern)

# Filter the data for new warnings
new_warnings = nws_data[(nws_data['STATUS'] == 'NEW') & (nws_data['GTYPE'] == 'P')]

In [3]:
# Function to create a Folium map for a specific warning type and date
def create_warning_map(warning_type, center=[43.0, -75.0], zoom=7, date=None):
    if date:
        warning_data = new_warnings[(new_warnings['PHENOM'] == warning_type) & (new_warnings['ISSUED_ET'].dt.date == date)]
    else:
        warning_data = new_warnings[new_warnings['PHENOM'] == warning_type]
        
    m = folium.Map(location=center, zoom_start=zoom)
    
    # Add state boundary to the map
    folium.GeoJson(
        state_boundary,
        style_function=lambda x: {
            'fillColor': 'none',
            'color': 'black',
            'weight': 3
        }
    ).add_to(m)
    
    # Add county boundaries to the map
    folium.GeoJson(
        county_boundaries,
        style_function=lambda x: {
            'fillColor': 'none',
            'color': 'gray',
            'weight': 0.5
        }
    ).add_to(m)
    
    for _, row in warning_data.iterrows():
        # Decode the PHENOM and SIG codes
        phenom_code = row['PHENOM']
        sig_code = row['SIG']
        phenom_desc = VTEC_PHENOMENA.get(phenom_code, phenom_code)
        sig_desc = VTEC_SIGNIFICANCE.get(sig_code, sig_code)
        
        # Get the color for the warning
        color_key = f"{phenom_code}.{sig_code}"
        color = NWS_COLORS.get(color_key, '#000000')  # Default to black if not found
        
        # Extract the polygon geometry
        geom = row['geometry']
        
        # Convert Shapely geometry to GeoJSON
        geo_json = gpd.GeoSeries([geom]).__geo_interface__
        
        # Add the polygon to the map
        folium.GeoJson(
            geo_json,
            style_function=lambda x, color=color: {
                'fillColor': color,
                'color': color,
                'weight': 2,
                'fillOpacity': 0.4
            },
            tooltip=f"{phenom_desc} {sig_desc}"
        ).add_to(m)
    
    return m

def create_combined_map(center=[43.0, -75.0], zoom=7):
    combined_map = folium.Map(location=center, zoom_start=zoom)
    
    # Add state boundary to the map
    folium.GeoJson(
        state_boundary,
        style_function=lambda x: {
            'fillColor': 'none',
            'color': 'black',
            'weight': 3
        }
    ).add_to(combined_map)
    
    # Add county boundaries to the map
    folium.GeoJson(
        county_boundaries,
        style_function=lambda x: {
            'fillColor': 'none',
            'color': 'gray',
            'weight': 0.5
        }
    ).add_to(combined_map)
    
    for warning_type in ['SV', 'TO', 'FF']:
        warning_data = new_warnings[new_warnings['PHENOM'] == warning_type]
        
        for _, row in warning_data.iterrows():
            # Decode the PHENOM and SIG codes
            phenom_code = row['PHENOM']
            sig_code = row['SIG']
            phenom_desc = VTEC_PHENOMENA.get(phenom_code, phenom_code)
            sig_desc = VTEC_SIGNIFICANCE.get(sig_code, sig_code)
            
            # Get the color for the warning
            color_key = f"{phenom_code}.{sig_code}"
            color = NWS_COLORS.get(color_key, '#000000')  # Default to black if not found
            
            # Extract the polygon geometry
            geom = row['geometry']
            
            # Convert Shapely geometry to GeoJSON
            geo_json = gpd.GeoSeries([geom]).__geo_interface__
            
            # Add the polygon to the map
            folium.GeoJson(
                geo_json,
                style_function=lambda x, color=color: {
                    'fillColor': color,
                    'color': color,
                    'weight': 2,
                    'fillOpacity': 0.4
                },
                tooltip=f"{phenom_desc} {sig_desc}"
            ).add_to(combined_map)
    
    return combined_map

# Create maps for each warning type
severe_thunderstorm_map = create_warning_map('SV')
tornado_map = create_warning_map('TO')
flash_flood_map = create_warning_map('FF')

# Create the Output_Maps directory if it doesn't exist
output_dir = 'Output_Maps'
os.makedirs(output_dir, exist_ok=True)

# Save the maps to the Output_Maps directory
severe_thunderstorm_map.save(os.path.join(output_dir, 'severe_thunderstorm_map.html'))
tornado_map.save(os.path.join(output_dir, 'tornado_map.html'))
flash_flood_map.save(os.path.join(output_dir, 'flash_flood_map.html'))

# Create the combined map with all dates
combined_map = create_combined_map()

# Save the combined map
combined_map_filename = os.path.join('Output_Maps', 'combined_map.html')
combined_map.save(combined_map_filename)

print('Maps saved successfully in the Output_Maps directory.')

Maps saved successfully in the Output_Maps directory.


In [4]:
# Reproject to UTM zone 18N (EPSG:32618)
nws_data = nws_data.to_crs(epsg=32618)
state_boundary = state_boundary.to_crs(epsg=32618)

# Spatial join to find the intersection
intersection = gpd.overlay(nws_data, state_boundary, how='intersection')

# Filter by minimum area within NY (adjust threshold as needed)
min_area_threshold = 1e6  # 1,000,000 square meters (1 square kilometer)
intersection['area'] = intersection.geometry.area
filtered_intersection = intersection[intersection['area'] >= min_area_threshold]

# Convert to DataFrame for further processing
filtered_data = pd.DataFrame(filtered_intersection.drop(columns='geometry'))

# Convert UTC to Eastern Time
filtered_data['ISSUED'] = pd.to_datetime(filtered_data['ISSUED'], format='%Y%m%d%H%M')
filtered_data['ISSUED_ET'] = filtered_data['ISSUED'].dt.tz_localize('UTC').dt.tz_convert('US/Eastern')

# Extract the date part for easier filtering
filtered_data['ISSUED_DATE_ET'] = filtered_data['ISSUED_ET'].dt.date

# Remove duplicates to keep only the 'NEW' status and 'P' type
unique_data = filtered_data[(filtered_data['STATUS'] == 'NEW') & (filtered_data['GTYPE'] == 'P')]

# Translate PHENOM and SIG columns
unique_data['PHENOM'] = unique_data['PHENOM'].map(VTEC_PHENOMENA)
unique_data['SIG'] = unique_data['SIG'].map(VTEC_SIGNIFICANCE)

# Create a new column for the combined alert type
unique_data['ALERT_TYPE'] = unique_data['PHENOM'] + ' ' + unique_data['SIG']

# Deduplicate based on the maximum area for each alert type and time
max_area_alerts = unique_data.loc[
    unique_data.groupby(['ISSUED_ET', 'ALERT_TYPE'])['area'].idxmax()
]

# Group by date and alert type and count occurrences
summary = max_area_alerts.groupby(['ISSUED_DATE_ET', 'ALERT_TYPE']).size().reset_index(name='Count')

# Save the summary to a CSV file in the Output_Maps directory
summary_csv_path = os.path.join(output_dir, 'nws_warning_summary.csv')
summary.to_csv(summary_csv_path, index=False)

print("Summary and maps have been generated and saved.")

Summary and maps have been generated and saved.


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  unique_data['PHENOM'] = unique_data['PHENOM'].map(VTEC_PHENOMENA)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  unique_data['SIG'] = unique_data['SIG'].map(VTEC_SIGNIFICANCE)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  unique_data['ALERT_TYPE'] = unique_data['PHENOM'] + ' ' + unique_data['SIG']