[DIY Disease Tracking Dashboard Kit](https://github.com/fsmeraldi/diy-covid19dash) (C) Fabrizio Smeraldi, 2020,2024 ([f.smeraldi@qmul.ac.uk](mailto:f.smeraldi@qmul.ac.uk) - [web](http://www.eecs.qmul.ac.uk/~fabri/)). This notebook is released under the [GNU GPLv3.0 or later](https://www.gnu.org/licenses/).

# Influenza Tracking Dashboard

**Welcome!**

This dashboard lets you explore weekly influenza statistics, specifically:
- Hospital admission rates: track how often influenza leads to hospitalisation.
- ICU and HDU admission rates: monitor severe cases requiring intensive or high-dependency care.

Using data from the UK Health Security Agency (UKHSA), this interactive tool provides an up-to-date view of influenza trends in England.

**Key Features**

- Preloaded Data:
  - The dashboard starts from data bundled with the notebook, so it works offline.
  - You can explore historical trends without waiting for API responses.
- Real-Time Updates:
  - Refresh the data with a single click to load the latest statistics from the UKHSA API.
- Interactive Graphs:
  - Choose which metrics to display: hospital admissions, ICU/HDU admissions, or both.
  - Switch between linear and logarithmic scales.
  - The graph redraws as soon as you change a selection.

**How to Use the Dashboard**

- Explore the Graphs:
  - Toggle the Scale selector to switch between linear and logarithmic scales.
- Update the Data:
  - Click the Fetch Data button to retrieve fresh data from the UKHSA API.
  - The graphs refresh automatically to reflect the latest data.


Start Exploring!

In [1]:
from IPython.display import clear_output
import ipywidgets as wdg
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import requests
import time
import json

In [2]:
%matplotlib inline
# make figures larger
plt.rcParams['figure.dpi'] = 100

In [3]:
# Load JSON files and store the raw data in some variable. Edit as appropriate
with open("hospital.json", "rt") as INFILE:
    hospital=json.load(INFILE)
with open("icuhdu.json", "rt") as INFILE:
    icuhdu=json.load(INFILE)
#with open("test_pos.json", "rt") as INFILE:
    #test_pos=json.load(INFILE)

jsondata={}
for dataset in [hospital, icuhdu]:
    for entry in dataset:
        date=entry['date']
        metric=entry['metric']
        value=entry['metric_value']
        if date not in jsondata:
            jsondata[date]={}
        jsondata[date][metric]=value
#jsondata
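
The loop above turns the two flat record lists into a single dictionary keyed by date. A minimal sketch of that reshaping on made-up records follows; the dates and values are hypothetical, and `merge_records` is an illustrative helper rather than part of the notebook.

```python
def merge_records(*datasets):
    """Merge flat API-style records into {date: {metric: value}}."""
    merged = {}
    for dataset in datasets:
        for entry in dataset:
            # setdefault creates the per-date dictionary the first time a date is seen
            merged.setdefault(entry["date"], {})[entry["metric"]] = entry["metric_value"]
    return merged

# Hypothetical records, shaped like the entries read from hospital.json / icuhdu.json
sample_hospital = [
    {"date": "2024-01-01", "metric": "influenza_healthcare_hospitalAdmissionRateByWeek", "metric_value": 5.2},
    {"date": "2024-01-08", "metric": "influenza_healthcare_hospitalAdmissionRateByWeek", "metric_value": 4.1},
]
sample_icuhdu = [
    {"date": "2024-01-01", "metric": "influenza_healthcare_ICUHDUadmissionRateByWeek", "metric_value": 0.3},
]

merged = merge_records(sample_hospital, sample_icuhdu)
print(merged["2024-01-01"])
```

A date that appears in only one dataset ends up with a single metric, which is why `wrangle_data` has to supply a default for missing metrics.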



In [4]:

def wrangle_data(rawdata):
    """ Turn the merged raw data into a weekly time series.
    Parameters: rawdata - dictionary mapping date strings to {metric name: value},
    as built from the JSON files or returned by access_api.
    Returns a dataframe indexed by week with columns 'hospital' and 'icuhdu'. """
    # Create a sorted list of dates from the raw data
    dates = list(rawdata.keys())
    dates.sort()

    # Helper function to parse dates into pandas datetime objects
    def parse_date(datestring):
        return pd.to_datetime(datestring, format="%Y-%m-%d")

    # Define start and end dates for the DataFrame index
    startdate = parse_date(dates[0])
    enddate = parse_date(dates[-1])
    #print(f"Wrangling data from {startdate} to {enddate}")

    # Create a DataFrame with a date range as its index
    index = pd.date_range(startdate, enddate, freq='W-MON')
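    # dtype=float keeps the columns numeric, so the fillna below does not
    # have to downcast object columns
    timeseriesidf = pd.DataFrame(index=index, columns=['hospital', 'icuhdu'], dtype=float)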

    # Define the mapping for the metrics
    metrics = {
        'hospital': 'influenza_healthcare_hospitalAdmissionRateByWeek',
        'icuhdu': 'influenza_healthcare_ICUHDUadmissionRateByWeek'
    }

    # Fill the DataFrame with values from the raw data
    for date, entry in rawdata.items():
        pd_date = parse_date(date)  # Convert string date to pandas datetime
        for column in ['hospital', 'icuhdu']:
            metric_name = metrics[column]
            # Fetch the value for the metric or use 0.0 if missing
            value = entry.get(metric_name, 0.0)
            timeseriesidf.loc[pd_date, column] = value

    # Fill any missing values in the DataFrame with 0.0
    timeseriesidf.fillna(0.0, inplace=True)

    return timeseriesidf


# Wrangle data from the loaded JSON
timeseriesidf = wrangle_data(jsondata)

# Verify the output
#print(timeseriesidf.head())
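
To see what the weekly index does to gaps in the data, here is a small sketch on toy input. The dictionary below is hypothetical (including the `hospital_rate` key) and deliberately skips one week; the steps mirror `wrangle_data` but are not a replacement for it.

```python
import pandas as pd

# Hypothetical merged data: the week of 2024-01-08 is absent on purpose
toy = {
    "2024-01-01": {"hospital_rate": 5.2},
    "2024-01-15": {"hospital_rate": 3.9},
}

dates = sorted(toy)
index = pd.date_range(pd.to_datetime(dates[0]), pd.to_datetime(dates[-1]), freq="W-MON")
frame = pd.DataFrame(index=index, columns=["hospital"], dtype=float)

for date, entry in toy.items():
    frame.loc[pd.to_datetime(date), "hospital"] = entry.get("hospital_rate", 0.0)

# Weeks with no record are still NaN here; wrangle_data turns them into 0.0
missing_weeks = frame.index[frame["hospital"].isna()]
frame = frame.fillna(0.0)
print(missing_weeks)
print(frame)
```

Listing `missing_weeks` before calling `fillna` is a cheap way to check how many of the zeros in the final series stand for absent records rather than reported values.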




In [5]:
# Place your API access code in this function. Do not call this function directly; it will be called by 
# the button callback. 

class APIwrapper:
    # class variables shared among all instances
    _access_point="https://api.ukhsa-dashboard.data.gov.uk"
    _last_access=0.0 # time of last api access
    
    def __init__(self, theme, sub_theme, topic, geography_type, geography, metric):
        """ Init the APIwrapper object, constructing the endpoint from the structure
        parameters """
        # build the path with all the required structure parameters. You do not need to edit this line,
        # parameters will be replaced by the actual values when you instantiate an object of the class!
        url_path=(f"/themes/{theme}/sub_themes/{sub_theme}/topics/{topic}/geography_types/" +
                  f"{geography_type}/geographies/{geography}/metrics/{metric}")
        # our starting API endpoint
        self._start_url=APIwrapper._access_point+url_path
        self._filters=None
        self._page_size=-1
        # will contain the number of items
        self.count=None

    def get_page(self, filters={}, page_size=5):
        """ Access the API and download the next page of data. Sets the count
        attribute to the total number of items available for this query. Changing
        filters or page_size will cause get_page to restart from page 1. Rate
        limited to three requests per second. The page_size parameter sets the number
        of data points in one response page (maximum 365); use the default value 
        for debugging your structure and filters. """
        # Check page size is within range
        if page_size>365:
            raise ValueError("Max supported page size is 365")
        # restart from first page if page or filters have changed
        if filters!=self._filters or page_size!=self._page_size:
            self._filters=filters
            self._page_size=page_size
            self._next_url=self._start_url
        # signal the end of data condition
        if self._next_url is None:
            return [] # we already fetched the last page
        # simple rate limiting to avoid bans
        curr_time=time.time() # Unix time: number of seconds since the Epoch
        deltat=curr_time-APIwrapper._last_access
        if deltat<0.33: # max 3 requests/second
            time.sleep(0.33-deltat)
        # record the access time after any sleep, so the next gap is measured correctly
        APIwrapper._last_access=time.time()
        # build parameter dictionary by removing all the None
        # values from filters and adding page_size
        parameters={x: y for x, y in filters.items() if y is not None}
        parameters['page_size']=page_size
        # the page parameter is already included in _next_url.
        # This is the API access. Response is a dictionary with various keys.
        # the .json() method decodes the response into Python object (dictionaries,
        # lists; 'null' values are translated as None).
        response = requests.get(self._next_url, params=parameters).json()
        # update url so we'll fetch the next page
        self._next_url=response['next']
        self.count=response['count']
        # data are in the nested 'results' list
        return response['results'] 

    def get_all_pages(self, filters={}, page_size=365):
        """ Access the API and download all available data pages of data. Sets the count
        attribute to the total number of items available for this query. API access rate
        limited to three requests per second. The page_size parameter sets the number
        of data points in one response page (maximum 365), and controls the trade-off
        between time to load a page and number of pages; the default should work well 
        in most cases. The number of items returned should in any case be equal to 
        the count attribute. """
        data=[] # build up all data here
        while True:
            # use get_page to do the job, including the pacing
            next_page=self.get_page(filters, page_size)
            if next_page==[]:
                break # we are done
            data.extend(next_page)
        return data

def access_api():
    """ Accesses the UKHSA API. Return data as a like-for-like replacement for the "canned" data loaded from the JSON file. """
    try:
        # Define the structure for the query
        structure = {
            "theme": "infectious_disease", 
            "sub_theme": "respiratory",
            "topic": "Influenza",
            "geography_type": "Nation", 
            "geography": "England"
        }
        
        # Fetch hospital data
        structure["metric"] = "influenza_healthcare_hospitalAdmissionRateByWeek"
        api_hospital = APIwrapper(**structure)
        hospital = api_hospital.get_all_pages()
        print(f"Hospital Data Points Retrieved: {len(hospital)}")
        
        # Fetch icuhdu data
        structure["metric"] = "influenza_healthcare_ICUHDUadmissionRateByWeek"
        api_icuhdu = APIwrapper(**structure)
        icuhdu = api_icuhdu.get_all_pages()
        print(f"ICU/HDU Data Points Retrieved: {len(icuhdu)}")
        
        # Combine and return as dictionary
        fresh_data = {}
        for dataset in [hospital, icuhdu]:
            for entry in dataset:
                date = entry["date"]
                metric = entry["metric"]
                value = entry["metric_value"]
                if date not in fresh_data:
                    fresh_data[date] = {}
                fresh_data[date][metric] = value
        return fresh_data # return data read from the API

    except Exception as e:
        print(f"Error fetching data: {e}")
        return None
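
The pagination logic in `get_page` and `get_all_pages` boils down to following the `next` link until it is empty. The sketch below shows that loop in isolation, without any network access: `FAKE_PAGES` and `fetch_all` are hypothetical stand-ins, and only the `count`, `next` and `results` keys are taken from the code above.

```python
# Hypothetical paginated responses, keyed by URL, mimicking the
# 'count' / 'next' / 'results' keys that get_page reads
FAKE_PAGES = {
    "page1": {"count": 3, "next": "page2", "results": [{"id": 1}, {"id": 2}]},
    "page2": {"count": 3, "next": None, "results": [{"id": 3}]},
}

def fetch_all(start_url, get_json):
    """Follow 'next' links until they run out, collecting every result."""
    items, url, count = [], start_url, None
    while url is not None:
        response = get_json(url)
        count = response["count"]
        items.extend(response["results"])
        url = response["next"]  # None on the last page ends the loop
    return items, count

items, count = fetch_all("page1", FAKE_PAGES.__getitem__)
print(len(items), count)
```

Comparing the number of collected items with `count` is the same sanity check that the `get_all_pages` docstring suggests.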

    
    


### Fetching Real-Time Data

The **Fetch Data** button retrieves the latest statistics from the UKHSA API. 
- If the API is unavailable, the dashboard retains the preloaded data.
- Error handling ensures the dashboard remains operational in case of connectivity issues.
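
The fallback behaviour described above can be sketched as a small helper that only replaces the current data when the fetch succeeds. `refresh_or_keep` and the sample data are hypothetical and serve to illustrate the pattern, not to reproduce the notebook's callback.

```python
def refresh_or_keep(fetch, current):
    """Return (data, ok): fresh data if fetch() succeeds, else the current data."""
    try:
        fresh = fetch()
    except Exception as err:  # broad on purpose: any failure keeps the old data
        print(f"Error fetching data: {err}")
        return current, False
    if not fresh:  # None or an empty result also counts as a failure
        return current, False
    return fresh, True

preloaded = {"2024-01-01": {"hospital": 5.2}}  # hypothetical preloaded data

def broken_fetch():
    raise ConnectionError("API unavailable")  # simulates a network failure

data, ok = refresh_or_keep(broken_fetch, preloaded)
print(ok, data is preloaded)
```

Treating an empty result like a failure avoids replacing a usable data set with nothing when the API answers but returns no records.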


In [6]:
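# Button Callback Function
def api_button_callback(button):
    """Button callback to fetch and process API data."""
    # timeseriesidf is the DataFrame plotted by timeseriesi_graph, so it must
    # be replaced here for the graph to show the fresh data
    global timeseriesidf
    print("Fetching fresh data from API...")
    fresh_data = access_api()
    
    if fresh_data:  # None (error) or empty (no records) keeps the previous data
        timeseriesidf = wrangle_data(fresh_data)  # Process the data into a DataFrame
        refresh_graph()  # Force the graph to redraw with the new data
        print("Data refreshed successfully!")
        apibutton.icon = "check"
        apibutton.button_style = "success"
        apibutton.tooltip = "Data refreshed successfully!"
    else:
        print("Failed to fetch data. Retaining previous data.")
        apibutton.icon = "times"
        apibutton.button_style = "danger"
        apibutton.tooltip = "API fetch failed."

def refresh_graph():
    """Force a redraw by briefly changing the value of the scale widget.
    The scale widget is created in the graph cell below, which must have
    been run before the button is clicked."""
    current = scale.value
    other = scale.options[1] if current == scale.options[0] else scale.options[0]
    scale.value = other  # forces the redraw
    scale.value = current  # restore the user's choice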

# Create the Button
apibutton = wdg.Button(
    description="Fetch Data",
    disabled=False,
    button_style="",  # 'success', 'info', 'warning', 'danger' or ''
    tooltip="Click to fetch the latest data",
    icon="download"  # FontAwesome icon
)

# Register the callback with the button
apibutton.on_click(api_button_callback)

# Display the button
display(apibutton)




### Weekly Influenza Statistics

This graph shows weekly trends for:
- **hospital**: the weekly hospital admission rate for influenza.
- **icuhdu**: the weekly ICU/HDU admission rate, covering severe cases that need intensive or high-dependency care.

#### How to Use:
- Select the metrics to display in the Stats list. Hold down Ctrl (Command on macOS) while clicking to select both metrics.
- Toggle between linear and logarithmic scales using the radio buttons.
- Click "Fetch Data" to refresh the graph with the latest data.
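
A zero cannot be drawn at its true position on a logarithmic axis, and `wrangle_data` fills missing weeks with 0.0. One possible way to handle this, which the notebook itself does not do, is to mask non-positive values before plotting on a log scale. The sketch below uses hypothetical rates and only shows the masking step.

```python
import pandas as pd

# Hypothetical weekly rates; each 0.0 stands in for a week with no record
rates = pd.DataFrame(
    {"hospital": [5.2, 0.0, 3.9], "icuhdu": [0.3, 0.2, 0.0]},
    index=pd.date_range("2024-01-01", periods=3, freq="W-MON"),
)

# Keep strictly positive values and replace the rest with NaN, so that a
# log-scaled plot leaves a gap instead of trying to place log(0)
plottable = rates.where(rates > 0)
print(plottable)
```

Passing `plottable` instead of the raw frame to `.plot(logy=True)` would then show gaps for the masked weeks; whether a gap or a zero is the more honest picture depends on what the missing weeks mean in the source data.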


In [7]:
series=wdg.SelectMultiple(
    options=['hospital', 'icuhdu'],
    value=['hospital', 'icuhdu'],
    rows=2,
    description='Stats:',
    disabled=False
)

scale=wdg.RadioButtons(
    options=['linear', 'log'],
#   value='linear', # Defaults to the first option, 'linear'
#   layout={'width': 'max-content'}, # If the items' names are long
    description='Scale:',
    disabled=False
)

# try replacing HBox with a VBox
controls=wdg.HBox([series, scale])

def timeseriesi_graph(gcols, gscale):
    """
    Plots the selected columns with a static title.
    
    Parameters:
        gcols: sequence - selected columns to plot.
        gscale: str - scale type ('linear' or 'log').
    """
    # Use a logarithmic y axis only when 'log' is selected
    logscale = (gscale == 'log')
    
    # Check if there are columns to plot
    if len(gcols) > 0:
        ax = timeseriesidf[list(gcols)].plot(logy=logscale)  # Plot the selected columns
        ax.set_title('Weekly Influenza Statistics')  # Add a static title
        ax.set_ylabel('Rate')  # Optional: Add a y-axis label
        ax.set_xlabel('Date')  # Optional: Add an x-axis label
        plt.show()  # Ensure the graph updates
    else:
        print("Click to select data for the graph")
        print("(CTRL-Click to select more than one category)")


# keep calling timeseriesi_graph(gcols=value_of_series, gscale=value_of_scale); 
# capture output in widget graph   
graph=wdg.interactive_output(timeseriesi_graph, {'gcols': series, 'gscale': scale})

display(controls, graph)



**Author and License** Remember that if you deploy your dashboard as a Binder it will be publicly accessible. Change the copyright notice and take credit for your work! Also acknowledge your sources and the conditions of the license by including this notice: "Based on UK Government [data](https://ukhsa-dashboard.data.gov.uk/) published by the [UK Health Security Agency](https://www.gov.uk/government/organisations/uk-health-security-agency) and on the [DIY Disease Tracking Dashboard Kit](https://github.com/fsmeraldi/diy-covid19dash) by Fabrizio Smeraldi. Released under the [GNU GPLv3.0 or later](https://www.gnu.org/licenses/)."