# DIY Disease Tracking Dashboard
## COVID-19 healthcare occupied beds by day

The following dashboard focuses on the metric 'COVID-19 healthcare occupied beds by day'. This shows the mean number of beds occupied by confirmed COVID-19 patients over the 7 days up to and including the dates shown.

These statistics matter because understanding the rates and patterns of hospital admissions helps to inform planning around hospital pressures, including beds and staffing.
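
The published figures are already averaged, so the notebook does not compute this mean itself. As a rough illustration of what "the mean over the 7 days up to and including each date" amounts to, the sketch below applies a trailing 7-day window to a hypothetical daily series; the numbers and the variable names `daily` and `rolling_mean` are made up for the example.

```python
import pandas as pd

# Hypothetical daily bed counts for ten consecutive days (illustrative numbers only)
daily = pd.Series(
    [100, 110, 120, 130, 140, 150, 160, 170, 180, 190],
    index=pd.date_range("2024-01-01", periods=10, freq="D"),
)

# Trailing window: each value is the mean of that date and the six dates before it.
# The first six dates have fewer than 7 days of history, so they come out as NaN.
rolling_mean = daily.rolling(window=7).mean()

print(rolling_mean.dropna())
```

With these made-up numbers the first defined value falls on 2024-01-07 and equals 130.0, the mean of 100 through 160.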

In [17]:
# Import required modules 
from IPython.display import clear_output, display
import ipywidgets as wdg
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import requests
import time
import json

In [18]:
# Ensure plots display directly inside notebook
%matplotlib inline
# Ensure plots appear larger
plt.rcParams['figure.dpi'] = 100

## Methodology of the data

This data includes people admitted to hospital who tested positive for COVID-19 in the 14 days before their admission or during their stay. Hospital inpatients who are diagnosed with COVID-19 after admission are reported as being admitted on the day before their diagnosis. Admissions figures include people admitted to NHS acute hospitals and mental health and learning disability (MHLD) trusts.

Updates are published by NHS England on the second Thursday of each month, and contain data up to the end of the previous month.
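
The cells that follow load these figures from a saved JSON file and place them on a daily date index. As a minimal sketch of that step, assuming each record carries the `date`, `metric` and `metric_value` keys that `wrangle_data` reads, the example below shows how a date missing from the records ends up as a filled-in row. The three records and their values are hypothetical.

```python
import pandas as pd

# Hypothetical records in the layout the wrangling code reads; values are made up
records = [
    {"date": "2024-10-01", "metric": "COVID-19_healthcare_occupiedBedsByDay", "metric_value": 1500.0},
    {"date": "2024-10-02", "metric": "COVID-19_healthcare_occupiedBedsByDay", "metric_value": 1480.0},
    # 2024-10-03 is deliberately absent
    {"date": "2024-10-04", "metric": "COVID-19_healthcare_occupiedBedsByDay", "metric_value": 1450.0},
]

# Index the values by parsed date
beds = pd.Series(
    {pd.to_datetime(r["date"], format="%Y-%m-%d"): r["metric_value"] for r in records},
    name="beds",
)

# A daily date_range covers every day between the first and last date,
# so reindexing exposes the missing day as NaN
full_index = pd.date_range(beds.index.min(), beds.index.max(), freq="D")
beds = beds.reindex(full_index)

# The notebook fills such gaps with 0.0; here the gap is 2024-10-03
filled = beds.fillna(0.0)
print(filled)
```

Filling with 0.0 keeps the series continuous for plotting, but a filled value marks a day without data rather than a day with no occupied beds.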

In [27]:
# Load JSON files and store the raw data in a variable
# Create jsondata variable as dictionary
jsondata = {}
# Load the JSON file from disk into jsondata
with open("beds.json", "rt") as INFILE:
    jsondata = json.load(INFILE)

In [29]:
# Utility function to parse dates
def parse_date(datestring):
    """ Convert a date string into a pandas datetime object """
    return pd.to_datetime(datestring, format="%Y-%m-%d")

# Create function to wrangle the data and return a DataFrame
def wrangle_data(jsondata):
    """ Parameters: jsondata - list of records from the JSON file or an API call.
    Returns a DataFrame indexed by date with a 'beds' column. """
  
    # Retrieve the values from the data slice and sort into a dictionary with the dates as keys and desired metrics as the values
    data = {}
    for dataset in [jsondata]:
        for entry in dataset:
            date = entry['date']
            metric = entry['metric']
            value = entry['metric_value']
            if date not in data:
                data[date]= {}
            data[date][metric] = value
    
    # Extract all the dates from data and sort them
    dates = list(data.keys())
    dates.sort()
    # Use parse_date to find the first and last date in the data set
    startdate = parse_date(dates[0])
    enddate = parse_date(dates[-1])
    # Create a Pandas index as a date_range: this is the date analog of a range for integers, and it will include any dates that may be missing from our list.
    index = pd.date_range(startdate, enddate, freq='D')

    # Define the DataFrame by specifying its index and the title of its columns.
    timeseriesdfbeds = pd.DataFrame(index=index, columns=['beds'])

    # Fill DataFrame with values from COVID-19 healthcare occupied beds by day
    # Translate the columns to metrics
    metrics = {'beds': 'COVID-19_healthcare_occupiedBedsByDay'}
    for date, entry in data.items(): # each entry is a dictionary with beds
        pd_date = parse_date(date) # convert to Pandas format
        for column in ['beds']: 
            metric_name = metrics[column]
            # Do not assume all values are there for every date - if a value is not available, insert a 0.0
            value = entry.get(metric_name, 0.0)
            # Access a specific location in the dataframe - use .loc and put index, column in a single set of [ ]
            timeseriesdfbeds.loc[pd_date, column] = value
                
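    # Convert the columns to a numeric dtype (the empty DataFrame starts as object), then fill any remaining null values with 0.0
    timeseriesdfbeds = timeseriesdfbeds.apply(pd.to_numeric).fillna(0.0)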
    # Return the final DataFrame        
    return timeseriesdfbeds

# Wrangle initial data from the JSON file
df = wrangle_data(jsondata) # df is the dataframe for plotting

## Graphs and Analysis

These graphs give a visual representation of the COVID-19 healthcare occupied beds by day metric between 08/01/2020 and 31/10/2024.

1. Graph 1 (Linear Scale):

   - This graph shows the daily number of hospital beds occupied by confirmed COVID-19 cases over time on a linear scale.

   - Key features:
     - A peak in occupied beds around early 2021, which corresponds to the significant wave of COVID-19 cases during that period.
     - Subsequent smaller waves in 2022 and a noticeable decline in 2023 and 2024.

2. Graph 2 (Logarithmic Scale):

   - This graph displays the same data but on a logarithmic scale, which is particularly useful for analyzing changes over time when values vary widely.

   - Key features:
     - The logarithmic scale highlights proportional changes, making smaller fluctuations in bed occupancy more visible, especially during periods with fewer cases.
     - While the peaks align with the first graph, the visualization helps emphasize gradual trends in decline.
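
To make the point about proportional changes concrete, the sketch below compares a halving at a high occupancy level with a halving at a low one. The four bed counts are hypothetical and chosen only for the arithmetic; they are not taken from the data set.

```python
import numpy as np

# Hypothetical bed counts: one halving at a high level, one at a low level
high_before, high_after = 30000.0, 15000.0
low_before, low_after = 400.0, 200.0

# On a linear axis the vertical distance is the absolute difference
linear_high = high_before - high_after
linear_low = low_before - low_after

# On a log axis the vertical distance is the difference of logarithms,
# which depends only on the ratio between the two values
log_high = np.log10(high_before) - np.log10(high_after)
log_low = np.log10(low_before) - np.log10(low_after)

print(linear_high, linear_low)                 # very different sizes
print(round(log_high, 4), round(log_low, 4))   # the same size
```

Both halvings cover the same distance on the logarithmic axis, which is why the later, smaller waves remain readable there. Values of 0 have no finite logarithm, so any dates filled with 0.0 cannot be drawn on the logarithmic scale.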


In [33]:
# Create SelectMultiple widget for selecting data series (only beds is available)
series = wdg.SelectMultiple(
    options=['beds'], # Available options for selection
    value=['beds'], # Default value
    rows=1, # Number of visible rows
    description='Stats:',
    disabled=False
)
# Create RadioButtons widget for selecting the scale type (linear/logarithmic)
scale = wdg.RadioButtons(
    options=['linear', 'log'],
    description='Scale:',
    disabled=False
)

# Group widgets into a horizontal box (HBox) layout 
controls = wdg.HBox([series, scale])

# Define function to plot the time series graph based on selected options
def timeseries_graph(gcols, gscale):
    global df # Updated DataFrame
    # Determine if the y-axis should be logarithmic
    logscale = gscale == 'log'
    if gcols:
        try:
            # Plot the selected columns with the appropriate scale
            df[list(gcols)].plot(logy=logscale) # Use updated DataFrame from refresh button
            plt.xlabel("Date")  # Label for the x-axis
            plt.ylabel("Occupied Beds")  # Label for the y-axis
            plt.title("Daily Number of Occupied Beds of Confirmed COVID-19 Cases")  # Graph title
            plt.legend(title="Stats")  # Add a legend with a title
            plt.show()  # Display the graph
        except KeyError:
            # Handle the case where the selected columns are not found in the DataFrame
            print(f"Error: Columns {gcols} not found in DataFrame.")
    else:
        # Provide instructions if no data is selected for plotting
        print("Please select at least one data series to plot.")

# Keep calling timeseries_graph(gcols=value_of_series, gscale=value_of_scale); 
# Capture output in widget graph   
graph = wdg.interactive_output(timeseries_graph, {'gcols': series, 'gscale': scale})

# Output widget controls with their respective graphs
display(controls, graph)

# Provide user instructions
print("Instructions:")
print("1. Use the 'Stats' selector to choose one or more data series to display.")
print("2. Use the 'Scale' selector to switch between linear and logarithmic scale.")
print("3. The graph will update automatically when selections are changed.")


## Download current data

To view the latest data for this metric, click the Fetch Data button below.

This downloads the current data from the API and refreshes the graph.
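
The download arrives in pages: each response read by `get_page` carries a `results` list, a `next` link and a total `count`. The sketch below imitates that paging loop without any network access. The `fake_pages` dictionary and the `fetch` and `collect_all` helpers are hypothetical stand-ins written for this illustration, not part of the API or of the notebook.

```python
# Hypothetical in-memory stand-in for the API: each "URL" maps to one response page
fake_pages = {
    "page1": {"count": 5, "next": "page2", "results": [1, 2]},
    "page2": {"count": 5, "next": "page3", "results": [3, 4]},
    "page3": {"count": 5, "next": None, "results": [5]},
}

def fetch(url):
    """Stand-in for requests.get(url).json(); performs no network access."""
    return fake_pages[url]

def collect_all(start_url):
    """Follow the 'next' links until there are none left, gathering every result."""
    data = []
    url = start_url
    while url is not None:
        page = fetch(url)
        data.extend(page["results"])
        url = page["next"]  # None on the last page ends the loop
    return data

all_items = collect_all("page1")
print(len(all_items), fake_pages["page1"]["count"])
```

When paging completes, the number of items collected should match the `count` reported by the API, which gives a simple check that no page was skipped.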

In [35]:
# Create APIwrapper class to build the endpoint from the structure and handle the paging
class APIwrapper:
    # class variables shared among all instances
    _access_point = "https://api.ukhsa-dashboard.data.gov.uk"
    _last_access = 0.0 # time of last api access
    
    def __init__(self, theme, sub_theme, topic, geography_type, geography, metric):
        """ Init the APIwrapper object, constructing the endpoint from the structure
        parameters """
        # Build the path with all the required structure parameters. Parameters will be replaced by the actual values when object of the class is instantiated.
        url_path = (f"/themes/{theme}/sub_themes/{sub_theme}/topics/{topic}/geography_types/" + f"{geography_type}/geographies/{geography}/metrics/{metric}")
        # Starting API endpoint
        self._start_url = APIwrapper._access_point + url_path
        self._filters = None
        self._page_size = -1
        # Will contain the number of items
        self.count = None

    def get_page(self, filters={}, page_size=5):
        """ Access the API and download the next page of data (fetches a single page from the API).
        Sets the count attribute to the total number of items available for this query.
        Changing filters or page_size will cause get_page to restart from page 1.
        Rate limited to three requests per second. The page_size parameter sets the number
        of data points in one response page (maximum 365); use the default value
        for debugging your structure and filters. """
        # Check page size is within range
        if page_size > 365:
            raise ValueError("Max supported page size is 365")
        # Restart from first page if page size or filters have changed
        if filters != self._filters or page_size != self._page_size:
            self._filters = filters
            self._page_size = page_size
            self._next_url = self._start_url
        # Signal the end of data condition
        if self._next_url is None:
            return []
        # Simple rate limiting to avoid bans
        curr_time = time.time() # Unix time: number of seconds since the Epoch
        deltat = curr_time - APIwrapper._last_access
        if deltat < 0.33: # Max 3 requests/second
            time.sleep(0.33 - deltat)
        # Record the time after any sleep, so the next call measures from the actual request
        APIwrapper._last_access = time.time()
        # Build parameter dictionary by removing all the None values from filters and adding page_size
        parameters = {x: y for x, y in filters.items() if y is not None}
        parameters['page_size'] = page_size
        # The page parameter is already included in _next_url.
        # This is the API access. Response is a dictionary with various keys.
        # The .json() method decodes the response into Python objects (dictionaries, lists; 'null' values are translated as None).
        response = requests.get(self._next_url, params=parameters)
        response.raise_for_status()  # Fail early on an HTTP error status
        payload = response.json()
        # Update url to fetch the next page
        self._next_url = payload['next']
        self.count = payload['count']
        # Data is nested in 'results' list
        return payload['results']

    def get_all_pages(self, filters={}, page_size=365):
        """ Access the API and download all available pages of data. Sets the count
        attribute to the total number of items available for this query. API access is rate
        limited to three requests per second. The page_size parameter sets the number
        of data points in one response page (maximum 365), and controls the trade-off
        between time to load a page and number of pages; the default should work well
        in most cases. The number of items returned should in any case be equal to
        the count attribute. """
        data = [] # Build up all data here
        while True:
            # Use get_page to do the job, including the pacing
            next_page=self.get_page(filters, page_size)
            if next_page==[]:
                break 
            data.extend(next_page)
        return data
    
# Function to access the API and return fresh data
def access_api():
    """ Accesses the UKHSA API. Return data as a like-for-like replacement for the "canned" data loaded from the JSON file. """
    # Define the structure of the query
    structure = {"theme": "infectious_disease", 
            "sub_theme": "respiratory",
            "topic": "COVID-19",
            "geography_type": "Nation", 
            "geography": "England", 
            "metric": "COVID-19_healthcare_occupiedBedsByDay",
    }
    try: # Try, except block for handling error if API access does not work
        api = APIwrapper(**structure)
        beds_current =  api.get_all_pages()
        print(f"Total data points retrieved: {len(beds_current)}")
        return beds_current
    except Exception as e:
        print(f"Error accessing the API: {e}")
        return []

In [37]:
# Create function to refresh graph using API data
def refresh_graph():
    """ We change the value of the widget in order to force a redraw of the graph;
    this is useful when the data have been updated. """
    current_selection = series.value
    # Toggle the selection to force redraw
    if current_selection:
        series.value = []  # Clear selection temporarily
        series.value = current_selection  # Reset to the original selection
        print("Graph refreshed!")

# Refresh button callback to fetch API data and update the DataFrame
def api_button_callback(button):
    """ Button callback - it must take the button as its parameter (unused in this case).
    Accesses API, wrangles data, updates global variable df used for plotting. """
    global df
    try:
        print("Accessing data from the API...")
        apidata = access_api() # Fetching data from API
        if apidata:
            print("Wrangling the data...")
            df = wrangle_data(apidata) # Wrangle and update the global DataFrame
            # Force graph refresh
            refresh_graph() 
            # Switch the icon on the button to a "check" sign
            apibutton.icon = "check"
            apibutton.tooltip = "Data refreshed!"
        else:
            # When the API call fails, switch the icon to "unlink" and change the button text to "Unavailable".
            apibutton.icon = "unlink"
            apibutton.description = "Unavailable"
            apibutton.tooltip = "API call failed. Please try again"
    except Exception as e:
        print(f"An error occurred: {e}")
        apibutton.icon = "times"
        apibutton.tooltip = "Error occurred. Check the logs"

# Define and display the refresh button   
apibutton = wdg.Button(
    description='Fetch Data', 
    disabled=False,
    button_style='warning', 
    tooltip="Fetch fresh data from the API",
    icon = 'download'
)

# Register button callback function with the button
apibutton.on_click(api_button_callback)
# Display button
display(apibutton)




**Author and License** 

Based on UK Government [data](https://ukhsa-dashboard.data.gov.uk/) published by the [UK Health Security Agency](https://www.gov.uk/government/organisations/uk-health-security-agency) and on the [DIY Disease Tracking Dashboard Kit](https://github.com/fsmeraldi/diy-covid19dash) by Fabrizio Smeraldi. Released under the [GNU GPLv3.0 or later](https://www.gnu.org/licenses/).