In [1]:
# ====================================================
# =========== Imports and Helper Functions ===========
# ====================================================

In [2]:
# ==== Imports ====
from IPython.display import clear_output
import ipywidgets as wdg
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import requests
import time
import json
import gc
import traceback 
from IPython.display import HTML

# Inject MathJax manually to prevent the "ReferenceError" crash in Voila.
# `display` is not imported in this cell; it is assumed to be provided by the
# IPython kernel as a builtin - TODO confirm for the target environment.
# NOTE(review): the second <script> only configures MathJax if window.MathJax
# already exists when it runs; since the loader tag is marked `async`, confirm
# that the configuration is actually applied.
display(HTML("""
<script type="text/javascript" async
  src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.7/MathJax.js?config=TeX-MML-AM_CHTML">
</script>
<script>
    if (window.MathJax) {
        window.MathJax.Hub.Config({
            tex2jax: {
                inlineMath: [['$','$'], ['\\(','\\)']],
                processEscapes: true
            }
        });
    }
</script>
"""))

In [3]:
# ==== Make figures larger ====
# Render matplotlib figures inline in the notebook output.
%matplotlib inline
# Resolution (dots per inch) used for every figure created below.
plt.rcParams['figure.dpi'] = 100

In [4]:
# ==== Parse date ====
def parse_date(datestring):
    """ Parse a 'YYYY-MM-DD' string and return the matching pandas datetime.

    Strings that do not follow that layout are rejected by pandas. """
    iso_layout = "%Y-%m-%d"
    parsed = pd.to_datetime(datestring, format=iso_layout)
    return parsed

# ==== API Wrapper ====
class APIwrapper:
    """ Small paginated client for one metric endpoint of the UKHSA dashboard API.

    An instance is bound to a single endpoint (theme / sub_theme / topic /
    geography_type / geography / metric). Pages are downloaded with get_page
    or get_all_pages; requests from all instances are paced through a shared
    class-level timestamp. """
    # class variables shared among all instances
    _access_point="https://api.ukhsa-dashboard.data.gov.uk"
    _last_access=0.0 # time of last api access
    _min_interval=0.33 # minimum seconds between requests (max 3 requests/second)
    _max_page_size=365 # largest page size accepted by get_page
    _timeout=30 # seconds to wait for the server before giving up
    
    def __init__(self, theme, sub_theme, topic, geography_type, geography, metric):
        """ Init the APIwrapper object, constructing the endpoint from the structure
        parameters """
        # build the path with all the required structure parameters; the
        # placeholders are replaced by the values passed to the constructor
        url_path=(f"/themes/{theme}/sub_themes/{sub_theme}/topics/{topic}/geography_types/" +
                  f"{geography_type}/geographies/{geography}/metrics/{metric}")
        # our starting API endpoint
        self._start_url=APIwrapper._access_point+url_path
        # pagination state: the first get_page call always restarts from
        # _start_url because no valid request can match these initial values
        self._filters=None
        self._page_size=-1
        self._next_url=self._start_url
        # will contain the number of items
        self.count=None

    def get_page(self, filters=None, page_size=5):
        """ Access the API and download the next page of data. Sets the count
        attribute to the total number of items available for this query. Changing
        filters or page_size will cause get_page to restart from page 1. Rate
        limited to three request per second. The page_size parameter sets the number
        of data points in one response page (maximum 365); use the default value 
        for debugging your structure and filters.

        Returns the list found under the 'results' key of the response, or an
        empty list once the last page has already been fetched. Raises
        ValueError for an out-of-range page_size and the requests library's
        exceptions for network or HTTP errors. """
        if filters is None:
            filters={}
        # Check page size is within range
        if page_size>APIwrapper._max_page_size:
            raise ValueError("Max supported page size is 365")
        if page_size<1:
            raise ValueError("Page size must be at least 1")
        # restart from first page if page size or filters have changed
        if filters!=self._filters or page_size!=self._page_size:
            self._filters=filters.copy() # copy, so later caller-side edits are detected
            self._page_size=page_size
            self._next_url=self._start_url
        # signal the end of data condition
        if self._next_url is None:
            return [] # we already fetched the last page
        # simple rate limiting to avoid bans
        deltat=time.time()-APIwrapper._last_access
        if deltat<APIwrapper._min_interval:
            time.sleep(APIwrapper._min_interval-deltat)
        # record the access time AFTER any sleep, so that the next request is
        # paced from the moment this one is actually sent
        APIwrapper._last_access=time.time()
        # build parameter dictionary by removing all the None
        # values from filters and adding page_size
        parameters={key: value for key, value in filters.items() if value is not None}
        parameters['page_size']=page_size
        # the page parameter is already included in _next_url.
        # A timeout stops a stalled connection from hanging the caller forever,
        # and raise_for_status turns HTTP error codes into an explicit exception
        # instead of an obscure decoding/KeyError failure further down.
        response=requests.get(self._next_url, params=parameters,
                              timeout=APIwrapper._timeout)
        response.raise_for_status()
        # decode the JSON body into Python objects (dictionaries, lists;
        # 'null' values are translated as None)
        payload=response.json()
        # update url so we'll fetch the next page
        self._next_url=payload['next']
        self.count=payload['count']
        # data are in the nested 'results' list
        return payload['results']

    def get_all_pages(self, filters=None, page_size=365):
        """ Access the API and download all available data pages of data. Sets the count
        attribute to the total number of items available for this query. API access rate
        limited to three request per second. The page_size parameter sets the number
        of data points in one response page (maximum 365), and controls the trade-off
        between time to load a page and number of pages; the default should work well 
        in most cases. The number of items returned should in any case be equal to 
        the count attribute.

        Every call starts again from the first page, so calling it repeatedly
        with the same arguments downloads the full data set each time. """
        if filters is None:
            filters={}
        # forget any previous pagination state: without this, a second call
        # with unchanged arguments would find the pages exhausted and return []
        self._filters=None
        data=[] # build up all data here
        while True:
            # use get_page to do the job, including the pacing
            next_page=self.get_page(filters, page_size)
            if not next_page:
                break # we are done
            data.extend(next_page)
        return data

In [5]:
# ==== Access API ====
def access_api(filters, changes):
    """ Query the UKHSA API and return every record for one metric.

    The endpoint structure starts from fixed defaults (infectious_disease /
    respiratory / Nation / England); the entries of `changes` are laid over
    them and the result is handed to APIwrapper as keyword arguments.
    `filters` is passed through to get_all_pages unchanged. The returned list
    is a like-for-like replacement for the "canned" data loaded from the
    JSON files. """
    default_structure={"theme": "infectious_disease", 
                       "sub_theme": "respiratory",
                       "geography_type": "Nation", 
                       "geography": "England"}
    # entries in `changes` win over the defaults and add any missing keys
    endpoint_structure={**default_structure, **changes}
    client=APIwrapper(**endpoint_structure)
    return client.get_all_pages(filters) # data read from the API

In [6]:
# ==== API button callback function ====
def api_button_callback(button):
    """ Button callback - it must take the button as its parameter (unused in this case).

    Downloads fresh data with five API calls, wrangles them, replaces the
    global dataframes adenovirus_df and respiratory_df used for plotting and
    redraws the graphs. The globals are only replaced once ALL downloads and
    wrangling steps have succeeded, so a failure part-way through never leaves
    the two graphs showing data from different refreshes. On any error the
    traceback is printed and the button is re-armed for another attempt. """
    global adenovirus_df, respiratory_df
    try:
        # Get fresh data from the 5 API calls
        adenovirus_apidata = access_api(adenovirus_filters, adenovirus_changes)
        influenza_apidata = access_api(respiratory_filters, influenza_changes)
        rsv_apidata = access_api(respiratory_filters, rsv_changes)
        rhinovirus_apidata = access_api(respiratory_filters, rhinovirus_changes)
        covid_apidata = access_api(respiratory_filters, covid_changes)
        # Wrangle into local dataframes first; raw data is dropped as soon as
        # it has been converted to keep memory usage down
        new_adenovirus_df = wrangle_adenovirus(adenovirus_apidata)
        del adenovirus_apidata
        influenza_df = wrangle_individual_respiratory(influenza_apidata, 'influenza')
        del influenza_apidata
        rsv_df = wrangle_individual_respiratory(rsv_apidata, 'rsv')
        del rsv_apidata
        rhinovirus_df = wrangle_individual_respiratory(rhinovirus_apidata, 'rhinovirus')
        del rhinovirus_apidata
        covid_df = wrangle_individual_respiratory(covid_apidata, 'covid')
        del covid_apidata
        new_respiratory_df = wrangle_respiratory(influenza_df, rsv_df, rhinovirus_df, covid_df)
        # Everything succeeded: overwrite both plotting dataframes together
        adenovirus_df = new_adenovirus_df
        respiratory_df = new_respiratory_df
        # Refresh Graphs and widgets
        refresh_graphs()
        # Update button:
        apibutton.description='API Refreshed'
        apibutton.icon= "check"
        apibutton.button_style = "success" 
        apibutton.disabled=True
    except Exception:
        # Same reporting style as the plotting callbacks: the full traceback
        # says where the failure happened, not just the bare message
        print(traceback.format_exc())
        # Update button:
        apibutton.description='Failed - Try again'
        apibutton.icon='exclamation-triangle'
        apibutton.button_style = "danger" 
        apibutton.disabled=False
        

# ==== Define API Button ====
# Button that triggers a fresh download from the API. No icon is set here;
# api_button_callback assigns one (together with a new description and
# style) once a refresh has succeeded or failed.
apibutton=wdg.Button(
    description='Refresh API', # label shown before the first click
    disabled=False,
    button_style='info', # 'success', 'info', 'warning', 'danger' or ''
    tooltip="Keep calm and carry on",
    # an `icon` argument could be added here (FontAwesome names without the `fa-` prefix)
)

# register the callback so that clicking the button runs api_button_callback
apibutton.on_click(api_button_callback) # called with the button as its only argument

In [7]:
# ==== Refresh Graphs ====
def refresh_graphs():
    """ Redraw both dashboard graphs from the current global dataframes:
    first the respiratory graph, then the adenovirus graph. """
    refresh_graph(widget=respiratory_year, dataframe=respiratory_df)
    refresh_graph(widget=adenovirus_year, dataframe=adenovirus_df)
    
def refresh_graph(widget, dataframe): 
    """ Update a year-selection widget from `dataframe` and force its graph to redraw.

    The widget options are replaced by the years present in the dataframe's
    datetime index. The redraw is then forced by briefly selecting a different
    year and switching back - a gimmick, but it makes the observers of the
    widget value fire after the data have been updated.

    Parameters:
        widget - selection widget exposing `options` and `value`
        dataframe - dataframe with a datetime index

    The previous selection is kept when it is still among the new options;
    otherwise the most recent year is selected. With fewer than two options
    there is no other value to toggle through, so only the options (and, if
    needed, the selection) are updated. """
    # Get value of current widget:
    current=widget.value
    # Update options to reflect new additions:
    widget.options = dataframe.index.year.unique()
    options=widget.options
    if len(options)==0:
        return # no data, nothing to select or redraw
    # Fall back to the latest year if the old selection has disappeared
    target=current if current in options else options[-1]
    if len(options)<2:
        # A single option cannot be toggled (indexing options[1] would fail);
        # just make sure it is the selected one.
        if widget.value!=target:
            widget.value=target
        return
    # Refresh logic: pick any option different from the target
    if target==options[0]:
        other=options[1]
    else:
        other=options[0]
    widget.value=other # forces the redraw
    widget.value=target # now we can change it back
    


In [8]:
# ==== Adenovirus Wrangling function ====
def wrangle_adenovirus(rawdata):
    """ Turn raw adenovirus records into a dates x age-groups dataframe.

    Parameters:
        rawdata - data from the json file or an API call: an iterable of
                  dictionaries with the keys 'date' ('YYYY-MM-DD' string),
                  'age' and 'metric_value'.

    Returns a dataframe of floats with one row per Monday between the first
    and the last date found in the data (plus any recorded date that does not
    fall on that weekly grid) and one column per age group, sorted by name.
    Records whose age is 'all' are left out, although their dates still count
    towards the date range. Missing combinations are filled with 0.0.

    Raises ValueError if rawdata contains no records. """
    by_date = {}
    # Keep only what we need: date -> {age group: value}
    for entry in rawdata:
        date=entry['date']
        age=entry['age']
        value=entry['metric_value']
        per_age=by_date.setdefault(date, {})
        if age != 'all':
            per_age[age]=value
    if not by_date:
        raise ValueError("No adenovirus data to wrangle")
    # ISO date strings sort chronologically, so sort before parsing
    observed=pd.DatetimeIndex([parse_date(date) for date in sorted(by_date)])
    # Find all age groups
    ages=sorted({age for per_age in by_date.values() for age in per_age})
    # Weekly (Monday) grid spanning the data; the union keeps any recorded
    # date that is off the grid instead of failing or mis-indexing on it
    index=pd.date_range(observed[0], observed[-1], freq='W-MON').union(observed)
    # A float frame from the start (NaN = "no value yet") avoids ending up
    # with object-typed columns
    df=pd.DataFrame(index=index, columns=ages, dtype=float)
    # Populate Data Frame, addressing rows by timestamp rather than by string
    for date, per_age in by_date.items(): 
        pd_date=parse_date(date) 
        for age, value in per_age.items(): 
            if value is not None: # a null value is treated like a missing one
                df.loc[pd_date, age]=float(value)
    # fill in any remaining "holes" due to missing dates and age groups
    df=df.fillna(0.0)
    # Garbage Collection
    del by_date
    gc.collect()
    return df

In [9]:
# ==== Respiratory Wrangling Functions ====
def wrangle_individual_respiratory(rawdata, metric_name):
    """ Build a single-column dataframe for one respiratory disease.

    Parameters:
        rawdata - iterable of dictionaries with the keys 'date' and 'metric_value'
        metric_name - name given to the one column of the result

    Returns a dataframe indexed by datetime, sorted chronologically. When a
    date occurs more than once, the last record for it is the one kept. """
    # date -> value; everything else in the records is dropped
    value_by_date = {record['date']: record['metric_value'] for record in rawdata}
    frame = pd.DataFrame.from_dict(value_by_date, orient='index', columns=[metric_name])
    # the dictionary is no longer needed once the frame exists
    del value_by_date
    # turn the date strings of the index into datetimes, then order the rows
    frame.index = pd.to_datetime(frame.index)
    ordered = frame.sort_index()
    gc.collect()
    return ordered

In [10]:
def wrangle_respiratory(influenza_df, rsv_df, rhinovirus_df, covid_df):
    """ Combine the four per-disease dataframes into one, side by side.

    Parameters: one dataframe per disease, in the order influenza, rsv,
    rhinovirus, covid. Returns a dataframe holding the columns of all four.
    The frames are aligned on their index with an inner join, so only the
    rows whose index value is present in every input are kept. """
    per_disease = [influenza_df, rsv_df, rhinovirus_df, covid_df]
    combined = pd.concat(per_disease, axis=1, join='inner')
    return combined

In [11]:
# ===========================================
# =========== API Function Inputs ===========
# ===========================================

In [12]:
# ==== Filters ====
# Query filters passed to access_api. Entries left as None are removed by
# APIwrapper.get_page before the request is sent, i.e. they do not filter.

# Adenovirus: no filtering at all, every record of the metric is requested.
adenovirus_filters={"stratum" : None, # Smallest subgroup a metric can be broken down into e.g. ethnicity, testing pillar
         "age": None, # Age group of the metric value e.g. 15_44 for the age group of 15-44 years
         "sex": None, #  Patient gender e.g. 'm' for Male, 'f' for Female or 'all' for all genders
         "year": None, #  Epi year of the metrics value (important for annual metrics) e.g. 2020
         "month": None, # Epi month of the metric value (important for monthly metrics) e.g. 12
         "epiweek" :None, # Epi week of the metric value (important for weekly metrics) e.g. 30
         "date" : None, # The date which this metric value was recorded in the format YYYY-MM-DD e.g. 2020-07-20
         "in_reporting_delay_period": None # Boolean indicating whether the data point is considered to be subject to retrospective updates
        }

# Respiratory diseases (shared by the influenza, RSV, rhinovirus and COVID-19
# requests): restricted to the default stratum, all ages and all genders.
respiratory_filters={"stratum" : "default", 
                     "age": "all",
                     "sex": "all",
                     "year": None, #  Epi year of the metrics value (important for annual metrics) e.g. 2020
                     "month": None, # Epi month of the metric value (important for monthly metrics) e.g. 12
                     "epiweek" :None, # Epi week of the metric value (important for weekly metrics) e.g. 30
                     "date" : None, # The date which this metric value was recorded in the format YYYY-MM-DD e.g. 2020-07-20
                     "in_reporting_delay_period": None # Boolean indicating whether the data point is considered to be subject to retrospective updates
                    }


In [13]:
# ==== Changes to structure ====
# One dictionary per disease. Each one supplies the "topic" and "metric"
# parts of the endpoint; access_api lays them over its default structure
# (theme, sub_theme, geography_type, geography) before building the wrapper.
adenovirus_changes = { "topic": "Adenovirus",
                       "metric": "adenovirus_testing_positivityByWeek"
                     }

influenza_changes = { "topic": "Influenza",
                      "metric": "influenza_testing_positivityByWeek"
                    }

rsv_changes = { "topic": "RSV",
                "metric": "RSV_testing_positivityByWeek"
              }

rhinovirus_changes = { "topic": "Rhinovirus",
                      "metric": "rhinovirus_testing_positivityByWeek"
                    }

# NOTE(review): unlike the four metrics above, the COVID-19 metric name says
# 7-day rolling rather than by-week - confirm the series are comparable.
covid_changes = { "topic": "COVID-19",
                      "metric": "COVID-19_testing_positivity7DayRolling"
                    }

# Respiratory Disease Tracking Dashboard

The dashboard is pre-loaded with data up to and including 2024. To fetch more recent data, hit the 'Refresh API' button!

In [14]:
# Show the refresh button; clicking it runs api_button_callback.
display(apibutton)

Button(button_style='info', description='Refresh API', style=ButtonStyle(), tooltip='Keep calm and carry on')

### Correlation between age and testing positive for Adenovirus

This chart shows how Adenovirus affects people of different age groups, over time. 

In most years, 70-80% of Adenovirus cases have been among those aged between 0 and 14.

In [15]:
# Build the initial adenovirus dataframe from the data file shipped with the
# notebook, so that the dashboard has something to show before any refresh.
with open("adenovirus.json", "rt") as INFILE:
    adenovirus_json=json.load(INFILE)
# wrangle once the file has been read and closed
adenovirus_df=wrangle_adenovirus(adenovirus_json)
# the raw records are not needed any more
del adenovirus_json
gc.collect();

In [None]:
# ==== Plotting Adenovirus ====
# Widgets:
# Year selector for the adenovirus graph. The options are the years present
# in the index of adenovirus_df and the initial selection is the year of the
# last row. refresh_graph replaces the options after an API refresh.
adenovirus_year=wdg.Select(
    options=adenovirus_df.index.year.unique(),
    value=adenovirus_df.index.year[-1], 
    rows=1, 
    description='Year',
    disabled=False
)

def plot_adenovirus(graphyear):
    """ Draw the age-group breakdown of adenovirus positivity for one year.

    Parameters:
        graphyear - year to plot; rows of the global adenovirus_df are
                    selected by comparing their index year to this value.

    The rows of the year are averaged per month and each month is normalised
    so that its age groups add up to 100. The result is shown as a horizontal
    stacked bar chart with one bar per month. Nothing is returned: the figure
    is displayed, and any error is reported by printing its traceback. """
    fig, ax = plt.subplots(figsize=(6, 4))
    try:
        yeardf=adenovirus_df[adenovirus_df.index.year==graphyear]
        if yeardf.empty:
            # nothing to draw; plotting an empty frame would only fail
            print(f"No Adenovirus data available for {graphyear}")
            return
        # average the rows by month
        # NOTE(review): pandas 2.2 deprecates the 'M' alias in favour of 'ME';
        # kept as is so that older pandas versions keep working - confirm the
        # version the dashboard is deployed with.
        monthly= yeardf.groupby(pd.Grouper(freq='M')).mean()
        totals=monthly.sum(axis=1) # over the rows
        # make sure it's all normalised to 100
        monthly=monthly.div(totals, axis=0)*100
        # older dates on top of the graph
        monthly = monthly[::-1]
        monthly.plot(kind='barh', stacked=True,cmap='tab20', ax = ax) 
        ax.set_title('How Adenovirus affects different age groups');
        ax.legend(loc='center left',bbox_to_anchor=(1.0, 0.5)) # Adding a legend
        ax.set_yticklabels(monthly.index.strftime('%Y-%m-%d')) # Adding y axis labels
        display(fig)
    except Exception:
        print(traceback.format_exc())
    finally:
        # always release the figure, also when plotting failed; otherwise
        # every failed redraw leaves one more open figure behind
        plt.close(fig)

# Connect the plotting function and the widgets:
# plot_adenovirus receives the value of adenovirus_year as its `graphyear` argument.
adenovirus_output=wdg.interactive_output(plot_adenovirus, {'graphyear': adenovirus_year}) 

# show the year selector followed by the graph output
display(adenovirus_year, adenovirus_output)


Select(description='Year', index=7, options=(2017, 2018, 2019, 2020, 2021, 2022, 2023, 2024), rows=1, value=20…

Output()

### Positive tests for Respiratory diseases
This graph tracks the percentage of positive tests for the four most common respiratory diseases. The motivation for creating this graph was to compare the effects of COVID-19 on the other three diseases.

The first case of COVID-19 in the UK was on January 29th 2020.

Between February 2020 and April 2020, we can see that a very sharp increase in the percentage of positive COVID-19 cases led to sharp decreases in the percentage of positive cases for the other three diseases. 

To view trends over a given year, select a 'Time Scale' of 'month to month', then select the relevant year. Alternatively, select a 'Time Scale' of 'year to year'.

In [17]:
def _load_canned_respiratory(filename, metric_name):
    """ Read one of the JSON data files shipped with the notebook and return
    it wrangled into a single-column dataframe named `metric_name`. """
    with open(filename, "rt") as INFILE:
        rawdata=json.load(INFILE)
        frame=wrangle_individual_respiratory(rawdata, metric_name)
    # the raw records are not needed once the dataframe exists
    del rawdata
    gc.collect()
    return frame

# Initial data for the four respiratory diseases, one dataframe each
influenza_df = _load_canned_respiratory("influenza.json", 'influenza')
rsv_df = _load_canned_respiratory("rsv.json", 'rsv')
rhinovirus_df = _load_canned_respiratory("rhinovirus.json", 'rhinovirus')
covid_df = _load_canned_respiratory("covid.json", 'covid')

In [18]:
# Combine the four per-disease dataframes into the single dataframe used by
# the respiratory graph (replaced later by api_button_callback on refresh).
respiratory_df = wrangle_respiratory(influenza_df, rsv_df, rhinovirus_df, covid_df)

In [None]:
# ==== Plot Respiratory ====
# Widgets:
# Time scale: 'month to month' plots monthly averages of one selected year,
# 'year to year' plots the whole dataframe (see plot_respiratory).
timeScale=wdg.Select(
    options=['year to year','month to month'], 
    value= 'month to month',
    rows=1, 
    description='Time Scale:',
    disabled=False
)

# Year selector; options are the years in the index of respiratory_df and the
# initial selection is the year of the last row. plot_respiratory hides this
# widget when the time scale is not 'month to month'.
respiratory_year=wdg.Select(
        options=respiratory_df.index.year.unique(), 
    value=respiratory_df.index.year[-1], 
    rows=1,
    description='Year:',
    disabled=False
)

# Diseases to draw; the names match the column names of respiratory_df and
# all four are selected initially.
disease=wdg.SelectMultiple(
    options=['influenza', 'rsv', 'rhinovirus', 'covid'],
    value=['influenza', 'rsv', 'rhinovirus', 'covid'],
    rows=4,
    description='Disease:', 
    disabled=False
)

# Layout: disease list on the left, time scale above the year on the right
right_box = wdg.VBox([timeScale, respiratory_year])
controls=wdg.HBox([disease, right_box])

def plot_respiratory(timeScale, graphYear, gcols):
    """ Draw the positive-test percentages of the selected respiratory diseases.

    Parameters:
        timeScale - 'month to month' to plot the monthly averages of one
                    year; any other value plots the whole dataframe
        graphYear - year to plot, only used for 'month to month'
        gcols - names of the columns of the global respiratory_df to draw

    As a side effect the respiratory_year widget is shown for 'month to month'
    and hidden otherwise. Nothing is returned: the figure is displayed, a
    hint is printed when no column is selected, and any error is reported by
    printing its traceback. """
    try:
        df = respiratory_df
        # ==== YEAR TOGGLE ====
        if timeScale == 'month to month':
            # Show 'Year' toggle:
            respiratory_year.layout.visibility = 'visible'
            # Keep only the selected year:
            df = df[df.index.year == graphYear]
            # average the rows by month
            # NOTE(review): pandas 2.2 deprecates the 'M' alias in favour of
            # 'ME'; kept as is so that older pandas versions keep working -
            # confirm the version the dashboard is deployed with.
            df = df.groupby(pd.Grouper(freq='M')).mean()
        else:
            # Hide 'Year' toggle:
            respiratory_year.layout.visibility = 'hidden'

        # ==== DISEASE TOGGLE and PLOT ====
        if len(gcols)==0:
            print("Click to select data for graph")
            print("(CTRL-Click (PC) or COMMAND-click (MAC) to select more than one category)")
            return
        # The figure is only created once we know there is something to draw,
        # and it is always closed again: previously an empty selection or a
        # plotting error left an open figure behind on every call.
        fig, ax = plt.subplots(figsize=(6, 4))
        try:
            df[list(gcols)].plot(ax = ax) # convert to a list, then plot
            ax.set_title('Positive tests for common respiratory diseases')
            ax.set_ylabel('Percentage Tested Positive (%)')
            display(fig)
        finally:
            plt.close(fig)
    except Exception:
        print(traceback.format_exc())

# Connect the plotting function and the widgets:
# each dictionary key is the plot_respiratory argument fed by that widget's value.
respiratory_output = wdg.interactive_output(plot_respiratory, {'timeScale': timeScale,'graphYear': respiratory_year, 'gcols': disease})

# show the controls followed by the graph output
display(controls, respiratory_output)

HBox(children=(SelectMultiple(description='Disease:', index=(0, 1, 2, 3), options=('influenza', 'rsv', 'rhinov…

Output()

**(C) 2025 - Vu Dao** 

Based on UK Government [data](https://ukhsa-dashboard.data.gov.uk/) published by the [UK Health Security Agency](https://www.gov.uk/government/organisations/uk-health-security-agency) and on the [DIY Disease Tracking Dashboard Kit](https://github.com/fsmeraldi/diy-covid19dash) by Fabrizio Smeraldi. Released under the [GNU GPLv3.0 or later](https://www.gnu.org/licenses/).