Influenza Tracking Dashboard - Hieu Luu, 2024. This notebook is released under the [GNU GPLv3.0 or later](https://www.gnu.org/licenses/).

# Influenza Disease Trackboard

This is a disease tracking dashboard, built using UK Government data, that visualises the effects of the Influenza virus on the UK population.

According to the UK Health Security Agency (UKHSA), seasonal Influenza is a recurring illness that contributes significantly to increased demand and pressure on the NHS during the winter months. With this context, the dashboard is designed to visualise the impact of Influenza, highlighting its effects on the NHS across a number of years and through different seasons.

**Dashboard Methodology**

1. The data is taken from the UKHSA Application Programming Interface (API), documented at https://ukhsa-dashboard.data.gov.uk/access-our-data/data-structure

2. Two key metrics were chosen, both of which are plotted on the y-axis of the graph.
 
    a. influenza_healthcare_ICUHDUadmissionRateByWeek
        This is defined as: "Influenza weekly admission rate of critical patients". The metric shows the weekly rate, per 100,000 people, of people with confirmed influenza admitted to a hospital Intensive Care Unit (ICU) or High Dependency Unit (HDU) in the 7 days up to and including the date shown.

    b. influenza_testing_positivityByWeek
        This is defined as: "Influenza percentage of positive PCR tests in a 7 day period" and the metric shows the percentage of the total number of PCR tests for influenza taken in the 7 days up to and including the date shown which had a positive result. Data is shown by the date that the test was taken (specimen date).

3. The Python script is structured as below:

    a. Use an API wrapper object to access and download the required data for the two metrics above, saving them in the JSON files "admissions.json" and "testing.json".

    b. Clean and wrangle the data to extract the two key fields, "date" and "metric_value".

    c. Using the wrangled data, plot two graphs visualising the two metrics over time. This is done using the pandas and matplotlib libraries.

    d. Add an interactive control to combine the two graphs into one for increased usability, along with a "Refresh data" button that updates the graph using the latest UKHSA data. This reuses the API wrapper defined earlier to update the ```dataframe```, without overwriting the included JSON files.
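
To make the two metric definitions in step 2 concrete, the arithmetic behind a "rate per 100,000" and a "percentage positivity" can be sketched as follows. This is only an illustration: the function names and the input figures are hypothetical and are not UKHSA data, and UKHSA's own calculation may differ in detail (for example in the population denominator used).

```python
def rate_per_100k(admissions: int, population: int) -> float:
    """Weekly admission rate per 100,000 people."""
    return admissions * 100_000 / population


def positivity_percent(positive_tests: int, total_tests: int) -> float:
    """Percentage of tests in the 7-day window that were positive."""
    return positive_tests * 100 / total_tests


# Hypothetical figures, for illustration only
print(rate_per_100k(50, 10_000_000))    # 0.5
print(positivity_percent(120, 1_000))   # 12.0
```

Both values are weekly figures, so each data point in the dashboard summarises the 7 days up to and including its date.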

In [1]:
# Creating the APIwrapper class

import requests
import time

class APIwrapper:
    _access_point="https://api.ukhsa-dashboard.data.gov.uk"
    _last_access=0.0
    
    # Defining the data structure parameters, using the UKHSA datapath 
    def __init__(self, theme, sub_theme, topic, geography_type, geography, metric):
        url_path=(f"/themes/{theme}/sub_themes/{sub_theme}/topics/{topic}/geography_types/" +
                  f"{geography_type}/geographies/{geography}/metrics/{metric}")
   
        self._start_url=APIwrapper._access_point+url_path
        self._filters=None
        self._page_size=-1
      
        self.count=None
    # Fetch one page of data; requests are throttled to avoid an IP ban
    def get_page(self, filters=None, page_size=5):
        if page_size > 365:
            raise ValueError("Max supported page size is 365")

        # Avoid a mutable default argument
        if filters is None:
            filters = {}

        # Restart from the first page whenever the query changes
        if filters != self._filters or page_size != self._page_size:
            self._filters = filters
            self._page_size = page_size
            self._next_url = self._start_url

        # No more pages to fetch
        if self._next_url is None:
            return []

        # Throttle: leave at least 0.33 s between consecutive requests
        deltat = time.time() - APIwrapper._last_access
        if deltat < 0.33:
            time.sleep(0.33 - deltat)
        # Record the time after any sleep, i.e. when the request is sent
        APIwrapper._last_access = time.time()

        # Drop unset filters and add the page size to the query
        parameters = {x: y for x, y in filters.items() if y is not None}
        parameters['page_size'] = page_size

        reply = requests.get(self._next_url, params=parameters)
        # Fail early on HTTP errors instead of parsing an error body
        reply.raise_for_status()
        response = reply.json()

        self._next_url = response['next']
        self.count = response['count']

        # Assign response['results'] to 'page_data'
        page_data = response['results']

        return page_data
    # Loop over get_page() to fetch all data pages in one go.
    def get_all_pages(self, filters={}, page_size=365):
        
        data=[] 
        while True:
           
            next_page=self.get_page(filters, page_size)
            if next_page==[]:
                break 
            data.extend(next_page)
        return data
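
The pagination pattern used by `get_all_pages` can be tried without network access. The sketch below is not part of the dashboard: `FAKE_PAGES`, `fake_fetch` and `fetch_all` are hypothetical names, and the in-memory dictionary merely mimics the `results` and `next` fields that the wrapper reads from each response.

```python
# Hypothetical in-memory stand-in for a paginated API: each "URL" maps to a
# response with the 'results' and 'next' keys read by the wrapper above.
FAKE_PAGES = {
    "page1": {
        "results": [{"date": "2024-01-01"}, {"date": "2024-01-08"}],
        "next": "page2",
    },
    "page2": {"results": [{"date": "2024-01-15"}], "next": None},
}


def fake_fetch(url):
    """Return the canned response for a fake URL (no network access)."""
    return FAKE_PAGES[url]


def fetch_all(start_url, fetch=fake_fetch):
    """Follow 'next' links until there are none left, collecting all results."""
    data = []
    url = start_url
    while url is not None:
        response = fetch(url)
        data.extend(response["results"])
        url = response["next"]
    return data


records = fetch_all("page1")
print(len(records))  # 3
```

The loop stops when a response carries no `next` link, which is the same condition that makes `get_page` return an empty list in the class above.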

In [2]:
# Download and Query parameters set up for the 1st metric

structure = {
    "theme": "infectious_disease",
    "sub_theme": "respiratory",
    "topic": "Influenza",
    "geography_type": "Nation",
    "geography": "England"
}
structure["metric"] = "influenza_healthcare_ICUHDUadmissionRateByWeek"


# Filter for all ages
filters = {
    "stratum": None,
    "age": "all",
    "sex": None,
    "year": None,
    "month": None,
    "epiweek": None,
    "date": None,
    "in_reporting_delay_period": None
}

# Initialise API Wrapper
api = APIwrapper(**structure)

# Fetch data from all pages
admissions = api.get_all_pages(filters=filters)

# Validate data - commented out for final version
# print(f"Data points expected: {api.count}")
# print(f"Data points retrieved: {len(admissions)}")

# Test results - first 5 results - commented out for final version
# print("First 5 results:")
# for case in admissions[:5]:  # Display the first 5 results
#     print(case)
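
The commented-out validation above compares `api.count` with the number of records retrieved. One way to keep that check without printing is a small helper that raises on a mismatch. This is a sketch only: `check_complete` is a hypothetical name, and the records in the usage line are made up for illustration.

```python
def check_complete(expected_count, records):
    """Raise if the number of downloaded records differs from the expected count."""
    retrieved = len(records)
    if expected_count != retrieved:
        raise ValueError(
            f"Expected {expected_count} data points but retrieved {retrieved}"
        )
    return retrieved


# Illustrative call with made-up records; in the notebook this would be
# check_complete(api.count, admissions)
print(check_complete(2, [{"date": "2024-01-01"}, {"date": "2024-01-08"}]))  # 2
```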

In [3]:
# Setting up the 2nd metric

structure = {
    "theme": "infectious_disease",
    "sub_theme": "respiratory",
    "topic": "Influenza",
    "geography_type": "Nation",
    "geography": "England"
}
structure["metric"] = "influenza_testing_positivityByWeek"


# Filter for all ages
filters = {
    "stratum": None,
    "age": "all",
    "sex": None,
    "year": None,
    "month": None,
    "epiweek": None,
    "date": None,
    "in_reporting_delay_period": None
}

# Initialise API Wrapper
api = APIwrapper(**structure)

# Fetch data from all pages
testing = api.get_all_pages(filters=filters)

# Validate data - commented out for final version
# print(f"Data points expected: {api.count}")
# print(f"Data points retrieved: {len(testing)}")

# Test results - first 5 results - commented out for final version
# print("First 5 results:")
# for case in testing[:5]:  # Display the first 5 results
#     print(case)

In [4]:
# Saving the downloaded data into json files
import json
with open("admissions.json", "wt") as OUTF:
    json.dump(admissions, OUTF)

with open("testing.json", "wt") as OUTF:
    json.dump(testing, OUTF)

In [5]:
# Wrangling and cleaning up the data for the two metrics above

import pandas as pd
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import json

# Embedding of matplotlib output
%matplotlib inline
# make figures larger
plt.rcParams['figure.dpi'] = 100

# Loading the JSON files
with open("admissions.json", "rt") as INFILE:
    admissions=json.load(INFILE)

with open("testing.json", "rt") as INFILE:
    testing=json.load(INFILE)    

data={}
for dataset in [admissions, testing]:
    for entry in dataset:
        date=entry['date']
        metric=entry['metric']
        value=entry['metric_value']
        if date not in data:
            data[date]={}
        data[date][metric]=value

# Extract and sort dates
dates=list(data.keys())
dates.sort()

# Convert dates above to pandas type, finding the earliest and latest time
def parse_date(datestring):
    """ Convert a date string into a pandas datetime object """
    return pd.to_datetime(datestring, format="%Y-%m-%d")

startdate=parse_date(dates[0])
enddate=parse_date(dates[-1])

# Using the updated dates to define the dataframe with the two new columns
index=pd.date_range(startdate, enddate, freq='D')
timeseriesdf=pd.DataFrame(index=index, columns=['admissions', 'testing'])

# translate the columns to the two metrics
metrics ={'admissions': 'influenza_healthcare_ICUHDUadmissionRateByWeek',
          'testing' :   'influenza_testing_positivityByWeek'}

for date, entry in data.items():
    pd_date=parse_date(date)
    for column in ['admissions','testing']:
        metric_name=metrics[column]
        # Insert a 0.0 if no value
        value= entry.get(metric_name, 0.0)
        # Index with the parsed datetime rather than the raw string
        timeseriesdf.loc[pd_date, column]=value

# Make the columns numeric so they can be plotted, then fill any missing dates with 0.0
timeseriesdf=timeseriesdf.astype(float).fillna(0.0)
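
To see what the merging step produces, here is a minimal sketch on made-up records using only the standard library. The dates, metric names and values are invented for illustration, and `merge_by_date` is a hypothetical helper that mirrors the date-keyed dictionary built in this cell. It assumes each record carries `date`, `metric` and `metric_value` fields, as in the code above.

```python
def merge_by_date(*datasets):
    """Combine records from several datasets into {date: {metric: value}}."""
    merged = {}
    for dataset in datasets:
        for entry in dataset:
            merged.setdefault(entry["date"], {})[entry["metric"]] = entry["metric_value"]
    return merged


# Made-up records, for illustration only
toy_admissions = [
    {"date": "2024-01-01", "metric": "admissions_metric", "metric_value": 0.4},
    {"date": "2024-01-08", "metric": "admissions_metric", "metric_value": 0.6},
]
toy_testing = [
    {"date": "2024-01-01", "metric": "testing_metric", "metric_value": 11.5},
]

merged = merge_by_date(toy_admissions, toy_testing)

# A date missing one metric falls back to 0.0, as in the wrangling loop
rows = [
    (date, values.get("admissions_metric", 0.0), values.get("testing_metric", 0.0))
    for date, values in sorted(merged.items())
]
print(rows)
# [('2024-01-01', 0.4, 11.5), ('2024-01-08', 0.6, 0.0)]
```

Note that the 0.0 fallback cannot be told apart from a genuine zero reading, which is worth bearing in mind when interpreting the graphs.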


In [6]:
#Adding interactive controls - Refresh button

# Refresh button set up

from IPython.display import clear_output
import ipywidgets as wdg
%matplotlib inline
# make figures larger
plt.rcParams['figure.dpi'] = 100
# Create an Output widget to capture print statements here and also in the graph output down below in the next script
output_widget = wdg.Output()

# Defining the API access callback function
def access_api(button):
    with output_widget:  # Redirect output to the widget
        clear_output(wait=True)  # Clear previous outputs
        print("Fetching data from the API...")
        # Define the structure template
        structure = {
            "theme": "infectious_disease",
            "sub_theme": "respiratory",
            "topic": "Influenza",
            "geography_type": "Nation",
            "geography": "England"
        }
        
        # Define metrics
        metrics = {
            'admissions': 'influenza_healthcare_ICUHDUadmissionRateByWeek',
            'testing': 'influenza_testing_positivityByWeek'
        }
        
        # Define filters
        filters = {
            "stratum": None,
            "age": "all",
            "sex": None,
            "year": None,
            "month": None,
            "epiweek": None,
            "date": None,
            "in_reporting_delay_period": None
        }
        # Declare the global DataFrame
        global timeseriesdf  

        try:
            data = {}  
            
            for column, metric_name in metrics.items():
                # Update the structure with the current metric
                structure["metric"] = metric_name
                
                # Fetch new data from the API with filters applied and printing results
                api = APIwrapper(**structure)
                new_data = api.get_all_pages(filters=filters)  # Pass filters to API call
                print(f"Updated {len(new_data)} records for metric '{metric_name}'.")
                
                # Extract only 'date' and 'metric_value' keys and organize into the 'data' dictionary
                for entry in new_data:
                    date = entry['date']
                    value = entry.get('metric_value', 0.0)  
                    if date not in data:
                        data[date] = {}
                    data[date][metric_name] = value

            # Update the DataFrame with processed data
            for date, entry in data.items():
                pd_date = pd.to_datetime(date)
                for column, metric_name in metrics.items():
                    value = entry.get(metric_name, 0.0)
                    timeseriesdf.loc[pd_date, column] = value

            # Fill any remaining with 0.0
            timeseriesdf.fillna(0.0, inplace=True)
            print("Graph updated successfully with the latest UKHSA data.")
            
            # Update the button to indicate success
            apibutton.icon = "check"
            apibutton.disabled = False 

        except Exception as e:
            print(f"Error while fetching data: {e}")
            apibutton.icon = "times"

# Refresh button widget
apibutton = wdg.Button(
    description='Refresh data',
    disabled=False,
    button_style='', 
    tooltip='Click to download the latest metrics',
    icon='download'
)

# Register the callback function with the button
apibutton.on_click(access_api)

## Interactive graph

Please use the buttons and options to interact with the graph. Key features include:
1. **Year selection** - Click to choose the desired year.
2. **Metric selection** - Click to select a metric, or Ctrl/Command-click to select both.
3. **Scale selection** - Click to choose either a linear or a log scale.
4. **Refresh data** - Click the refresh button to update the graph with the latest UKHSA data. Messages below the graph report the number of records downloaded for each metric, followed by a confirmation that the update has completed. Then change the year or metric selection to redraw the graph with the latest data. (Please note that the updated data does not overwrite the existing JSON files; it only updates the graph.)

In [7]:
# Adding interactive controls - year, categories and scale

# Widget for year selection
year = wdg.Select(
    options=timeseriesdf.index.year.unique(),  
    value=timeseriesdf.index.year[-1],        
    rows=4,                                   
    description='Year:',                      
    disabled=False                            
)

# Widget for selecting data categories (admissions, testing)
series = wdg.SelectMultiple(
    options=['admissions', 'testing'],
    value=['admissions', 'testing'],
    rows=3,
    description='Metrics:',
    disabled=False
)

# Widget for selecting scale (linear or log)
scale = wdg.RadioButtons(
    options=['linear', 'log'],
    description='Scale:',
    disabled=False
)

# Combine controls into a box
controls = wdg.Box([year, series, scale])

# Main graph function
def timeseries_graph(graphyear, gcols, gscale):
    # Use a logarithmic y-axis when 'log' is selected
    logscale = (gscale == 'log')
    
    # Filter the DataFrame for the selected year
    yeardf = timeseriesdf[timeseriesdf.index.year == graphyear]
    
    ncols = len(gcols)
    if ncols > 0:
        # Plot the selected data for the filtered year
        yeardf[list(gcols)].plot(logy=logscale)
        plt.title(f"Year: {graphyear}")
        plt.xlabel("Date")
        plt.ylabel("Value")
        plt.grid(True, linestyle="--", alpha=0.7)
        plt.legend(loc="best")
        plt.show()  
    else:
        print("Click to select data for the graph")
        print("(Command/Ctrl-Click to select more than one metric)")

# Interactive output for the graph
graph = wdg.interactive_output(
    timeseries_graph, 
    {'graphyear': year, 'gcols': series, 'gscale': scale}
)

# Display widgets and the graph with comments confirmation
display(apibutton,controls, graph, output_widget)


**License**

"Based on UK Government [data](https://ukhsa-dashboard.data.gov.uk/) published by the [UK Health Security Agency](https://www.gov.uk/government/organisations/uk-health-security-agency) and on the [DIY Disease Tracking Dashboard Kit](https://github.com/fsmeraldi/diy-covid19dash) by Fabrizio Smeraldi. Released under the [GNU GPLv3.0 or later](https://www.gnu.org/licenses/)."