# Download USGS Data 
This script downloads USGS data in two ways: through the nwis package and directly from the USGS website. Downloading through NWIS is faster and more efficient than downloading from the website.

NWIS

In [None]:
import pandas as pd
from dataretrieval import nwis
import logging

# Download instantaneous discharge for a handful of gauges through the
# dataretrieval NWIS client, average it to hourly values in cubic metres per
# second, and write one wide CSV (one column per site).

# Setup basic configuration for logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

# Conversion factor from cubic feet per second to cubic metres per second
CFS_TO_CMS = 0.0283168

# Single definition of the output path so the log message cannot drift from
# the file that is actually written
output_csv = 'usgs_data.csv'

# Load the gauge file and keep the first 5 rows.
# .copy() detaches the subset from the full table so the column assignment
# below does not trigger pandas' SettingWithCopyWarning.
gauge_file = pd.read_excel('usgs_with_comid.xlsx')
gauge_file = gauge_file[0:5].copy()

# Ensure site numbers are strings of at least 8 characters, left-padded with
# zeros. Values already read as text are padded as text; numeric values are
# formatted as integers.
gauge_file['station_no'] = gauge_file['station_no'].apply(
    lambda x: x.strip().zfill(8) if isinstance(x, str) else f"{int(x):08d}"
)

# Extract the correctly formatted site numbers
sites = gauge_file['station_no']

# Define dates and parameter code
start, end = "2000-01-01", "2024-07-01"
parameter_code = '00060'  # Discharge, cubic feet per second

# Hourly frames are collected here and concatenated once after the loop;
# concatenating inside the loop would re-copy the growing table every time.
hourly_frames = []

# Process each site
for site in sites:
    # A failure for one site is logged and must not abort the remaining sites.
    try:
        logging.info("Fetching data for site %s", site)
        discharge = nwis.get_record(sites=site, service='iv', parameterCd=parameter_code, start=start, end=end)

        if discharge.empty:
            logging.warning("No records returned for site %s", site)
            continue

        # Keep value columns for the parameter; names containing 'cd' are skipped
        discharge_columns = [col for col in discharge.columns if parameter_code in col and 'cd' not in col]
        if not discharge_columns:
            logging.warning("No discharge column found for site %s", site)
            continue

        discharge_column = discharge_columns[0]
        discharge[site] = discharge[discharge_column] * CFS_TO_CMS  # Convert cfs to cms
        discharge.index = pd.to_datetime(discharge.index)
        hourly_frames.append(discharge[[site]].resample('h').mean())
    except Exception as e:
        logging.error("Failed to process data for site %s: %s", site, e)

# Align all sites on a shared hourly index (outer join on the timestamps)
combined_discharge = pd.concat(hourly_frames, axis=1) if hourly_frames else pd.DataFrame()

# Name the index explicitly so the timestamp column is always called
# 'datetime', whatever the index was called before (or if the frame is empty).
combined_discharge = combined_discharge.rename_axis('datetime').reset_index()
combined_discharge['datetime'] = pd.to_datetime(combined_discharge['datetime']).dt.strftime('%Y-%m-%d %H:%M:%S')

# Save and log
combined_discharge.to_csv(output_csv, index=False)
logging.info("Combined discharge data saved to '%s'.", output_csv)


Website: This script was written by the passaH2O group.

In [None]:
import json
import pandas as pd
import numpy as np
import os
import requests
from time import sleep
from dask import delayed, compute
from tqdm import tqdm
from dask.diagnostics import ProgressBar

# Read the COMID/gauge lookup table, keeping only the first 5 gauges for testing
df = pd.read_excel('usgs_with_comid.xlsx').iloc[:5]
comid = df['COMID'].values
gage_num = df['station_no'].values
gage_name = df['station_nm'].values

# Quick sanity check of what was loaded
print(f'Example COMID: {comid[0]}')
print(f'Example Gauge ID: {gage_num[0]}')

# Date range requested from the web service
START_DATE, END_DATE = '1979-02-01', '2023-02-01'

# Directory that receives the per-gauge CSV files; created when absent
folder_name = 'usgs_1979_2023_website'
os.makedirs(folder_name, exist_ok=True)

# Function to parse the datetime in the JSON response
def row_time(row):
    """Split a row's ``Time`` string into its date and clock components.

    The string is cut at every ``'T'``; the first piece is the date, and the
    second piece, truncated at its first ``'.'``, is the clock time.

    Args:
        row: Any object exposing a string attribute ``Time``.

    Returns:
        A ``(date, time)`` tuple of strings.

    Raises:
        IndexError: If ``Time`` contains no ``'T'`` separator.
    """
    pieces = row.Time.split('T')
    day = pieces[0]
    # Everything from the first '.' onwards is discarded
    clock = pieces[1].split('.')[0]
    return day, clock

# Function to fetch data from the USGS API and convert it to a DataFrame
def url_to_csv(url, timeout=(10, 300)):
    """Fetch a USGS instantaneous-values JSON response and tabulate it.

    Discharge (variable code ``00060``) and stage (``00065``) records are
    extracted from the response, left-joined on their timestamp string, and
    returned with a parsed ``Time`` column.

    Args:
        url: Full request URL that returns JSON.
        timeout: Passed to ``requests.get``; a ``(connect, read)`` tuple in
            seconds. A timeout keeps one stalled request from blocking the
            run indefinitely.

    Returns:
        A DataFrame with columns ``Discharge``, ``Stage`` and ``Time`` when
        the response holds discharge records and a stage series; otherwise
        the integer ``0`` (the sentinel callers test for with ``isinstance``).
    """
    try:
        response = requests.get(url, timeout=timeout)
        # Surface HTTP errors explicitly instead of trying to parse an error page
        response.raise_for_status()
        jso = response.json()
    except (requests.exceptions.RequestException, ValueError) as e:
        # ValueError covers JSON decoding failures on requests versions where
        # the decode error is not a RequestException subclass.
        print(f"Request failed for {url}: {e}")
        return 0

    # Look the series list up by key rather than by position in the payload,
    # so the lookup does not depend on the order of the top-level keys.
    try:
        time_series = jso['value']['timeSeries']
    except (KeyError, TypeError) as e:
        print(f"Unexpected response structure for {url}: {e}")
        return 0

    # Records per variable code; if a code occurs in several series the last
    # one wins, and only the first 'values' group of a series is read.
    records_by_code = {}
    for series in time_series:
        code = series['variable']['variableCode'][0]['value']
        if code in ('00060', '00065'):
            records_by_code[code] = series['values'][0]['value']

    discharge_records = records_by_code.get('00060')
    stage_records = records_by_code.get('00065')

    # Both variables are required. Without discharge records the left join
    # below would be empty and there would be nothing worth saving.
    if not discharge_records or stage_records is None:
        return 0

    discharge_df = pd.DataFrame({
        "Time": [rec['dateTime'] for rec in discharge_records],
        "Discharge": [rec['value'] for rec in discharge_records],
    })
    stage_df = pd.DataFrame({
        "Time": [rec['dateTime'] for rec in stage_records],
        "Stage": [rec['value'] for rec in stage_records],
    })

    # Merge stage onto discharge; discharge timestamps drive the result
    df = pd.merge(discharge_df, stage_df, how='left', left_on='Time', right_on='Time')

    # row_time keeps the date and the clock time up to the first '.', so any
    # trailing fraction/offset text in the raw string is dropped before parsing.
    stamps = [' '.join(row_time(row)) for row in df.itertuples(index=False)]

    # Drop and re-add so the parsed 'Time' column ends up last
    df = df.drop(columns=["Time"])
    df['Time'] = pd.to_datetime(stamps)
    return df

# Function to process each gage in parallel
@delayed
def process_gage(gage_num, comid, folder_name, START_DATE, END_DATE):
    """Download one gauge's instantaneous values and save them as a CSV.

    Wrapped with ``dask.delayed``, so calling it only builds a task; the work
    runs when the task is computed.

    Args:
        gage_num: Gauge identifier as read from the gauge table (number or
            string). It is zero-padded to at least 8 characters for the
            request URL and the output file name.
        comid: COMID stored alongside the data in the output file.
        folder_name: Existing directory that receives the CSV.
        START_DATE: Start of the requested period, passed as ``startDT``.
        END_DATE: End of the requested period, passed as ``endDT``.

    Returns:
        A status message saying whether data was saved for the gauge.
    """
    # Pad to 8 characters instead of always prepending one '0': identifiers
    # that already have 8 or more digits must not gain an extra leading zero.
    site = str(gage_num).strip().zfill(8)

    url = f'https://waterservices.usgs.gov/nwis/iv/?sites={site}&startDT={START_DATE}&endDT={END_DATE}&format=json'
    print(f"Fetching data from {url}")
    df = url_to_csv(url)

    # url_to_csv returns a non-DataFrame sentinel when nothing usable came back
    if not isinstance(df, pd.DataFrame):
        sleep(0.2)
        return f"No data for Gauge: {gage_num}"

    df['Gage'] = gage_num
    df['Comid'] = comid
    df['URL_data'] = url
    # Save the file to the designated folder
    df.to_csv(f'{folder_name}/discharge_{site}.csv')
    sleep(0.75)  # To avoid overwhelming the server
    return f"Saved data for Gauge: {gage_num}"

# Process all gauges
def fetch_all_gages(gage_num, comid, folder_name, START_DATE, END_DATE):
    """Build one download task per gauge, run them with Dask, and print the outcomes.

    Args:
        gage_num: Sequence of gauge identifiers.
        comid: Sequence of COMIDs, indexed in step with ``gage_num``.
        folder_name: Directory handed to every task for its output file.
        START_DATE: Start of the requested period, handed to every task.
        END_DATE: End of the requested period, handed to every task.
    """
    # One delayed task per gauge; the matching COMID is looked up by position
    tasks = [
        process_gage(gage, comid[position], folder_name, START_DATE, END_DATE)
        for position, gage in enumerate(gage_num)
    ]

    # Execute every task while a progress bar is displayed
    with ProgressBar():
        results = compute(*tasks)

    # Report the status message returned by each task
    for message in results:
        print(message)

# Kick off the download for every gauge loaded above, using the configured
# date range and output folder
fetch_all_gages(
    gage_num=gage_num,
    comid=comid,
    folder_name=folder_name,
    START_DATE=START_DATE,
    END_DATE=END_DATE,
)
