## Dashboard

In [6]:
from IPython.display import clear_output
import ipywidgets as wdg
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import requests
import time
import json

In [7]:
%matplotlib inline
# set the resolution (dots per inch) used when rendering figures
plt.rcParams['figure.dpi'] = 100

## Load initial data from disk

In [8]:
# Read the canned resistance records from cases.json and keep the decoded
# JSON in resistance_cases for the wrangling step.
with open("cases.json", mode="rt") as INFILE:
    resistance_cases = json.loads(INFILE.read())

In [9]:
# Read the canned number-tested records from testing.json.
with open("testing.json", mode="rt") as INFILE:
    number_testing = json.loads(INFILE.read())

In [10]:
# Read the canned percent-tested records from percent_testing.json.
with open("percent_testing.json", mode="rt") as INFILE:
    percent_testing = json.loads(INFILE.read())

## Wrangle the data

In [11]:
def parse_date(datestring):
    """ Turn a 'YYYY-MM-DD' date string into a pandas datetime object.

    The format is passed explicitly, so strings in any other layout are not
    accepted by this helper. """
    iso_day_format = "%Y-%m-%d"
    parsed = pd.to_datetime(datestring, format=iso_day_format)
    return parsed

In [12]:
def wrangle_data(resistance_cases,number_testing,percent_testing):
    """ Build one dataframe per metric from the three raw record lists.

    Parameters: resistance_cases, number_testing, percent_testing - iterables
    of records (from the JSON files or from the API). Each record is read
    through its 'date', 'metric', 'metric_value' and 'stratum' keys; the
    stratum becomes the column name (the drug in this dataset).

    Returns a dictionary mapping the short metric name ('Percent_resistance',
    'Percent_tested', 'Number_tested') to a dataframe indexed by date (sorted,
    as datetimes), one column per stratum, with missing combinations filled
    with 0.0. A metric with no records has no entry in the dictionary.

    Raises KeyError if a record carries a metric name not listed in `names`. """
    # long API metric names -> short names used as dictionary keys
    names={'k-pneumoniae_testing_bacteraemiaPercentResistantRollingMonth':'Percent_resistance',
       'k-pneumoniae_testing_bacteraemiaPercentTestedRollingMonth':'Percent_tested',
       'k-pneumoniae_testing_bacteraemiaNumberTestedRollingMonth':'Number_tested'}
    # nested[metric][date][stratum] = value
    nested={}
    for dataset in (resistance_cases,number_testing,percent_testing):
        for entry in dataset:
            metric=names[entry['metric']]
            by_date=nested.setdefault(metric, {})
            by_date.setdefault(entry['date'], {})[entry['stratum']]=entry['metric_value']
    dfs={}
    for metric, metric_data in nested.items():
        # outer keys (dates) become the index, inner keys (strata) the columns
        frame=pd.DataFrame.from_dict(metric_data, orient='index')
        frame.index=pd.to_datetime(frame.index)
        # build new frames instead of mutating in place
        dfs[metric]=frame.sort_index().fillna(0.0)
    return dfs

# Putting the wrangling code into a function allows it to be called again after
# refreshing the data through the API. It is called once here, directly on the
# JSON data loaded from disk, when the dashboard starts.
dfs = wrangle_data(resistance_cases, number_testing, percent_testing)

# one dataframe per metric, used by the graphs below
number_tested_df = dfs['Number_tested']
percent_tested_df = dfs['Percent_tested']
percent_resistance_df = dfs['Percent_resistance']

# df is the name the refresh callback declares global. It refers to the same
# per-metric dictionary instead of wrangling the identical data a second time.
df = dfs

## Download the current data

In [13]:
import requests
import time

class APIwrapper:
    """ Paged, rate-limited access to one metric endpoint of the UKHSA
    dashboard API. Use get_page to step through the data one page at a time
    or get_all_pages to download everything in one list. """
    # class variables shared among all instances
    _access_point="https://api.ukhsa-dashboard.data.gov.uk"
    _last_access=0.0 # time of last api access
    _timeout=30 # seconds to wait for the server before giving up on a request

    def __init__(self, theme, sub_theme, topic, geography_type, geography, metric):
        """ Init the APIwrapper object, constructing the endpoint from the structure
        parameters """
        # build the path from the structure parameters supplied by the caller
        url_path=(f"/themes/{theme}/sub_themes/{sub_theme}/topics/{topic}/geography_types/" +
                  f"{geography_type}/geographies/{geography}/metrics/{metric}")
        # our starting API endpoint
        self._start_url=APIwrapper._access_point+url_path
        # filters/page size of the query in progress; these initial values never
        # match a real request, so the first get_page starts from page 1
        self._filters=None
        self._page_size=-1
        # url of the next page to fetch; None once the last page has been read
        self._next_url=self._start_url
        # will contain the number of items
        self.count=None

    def get_page(self, filters=None, page_size=7):
        """ Access the API and download the next page of data. Sets the count
        attribute to the total number of items available for this query. Changing
        filters or page_size will cause get_page to restart from page 1. Rate
        limited to three request per second. The page_size parameter sets the number
        of data points in one response page (maximum 365); use the default value
        for debugging your structure and filters.

        Returns the list found under 'results' in the response, or [] when the
        last page has already been fetched. Raises ValueError if page_size is
        above 365 and requests.HTTPError if the server answers with an error
        status. """
        # Check page size is within range
        if page_size>365:
            raise ValueError("Max supported page size is 365")
        # work on a private copy: no shared mutable default, and later changes
        # to the caller's dictionary are detected as a new query
        filters=dict(filters) if filters else {}
        # restart from first page if page or filters have changed
        if filters!=self._filters or page_size!=self._page_size:
            self._filters=filters
            self._page_size=page_size
            self._next_url=self._start_url
        # signal the end of data condition
        if self._next_url is None:
            return [] # we already fetched the last page
        # simple rate limiting to avoid bans
        deltat=time.time()-APIwrapper._last_access
        if deltat<0.33: # max 3 requests/second
            time.sleep(0.33-deltat)
        # stamp the access after any sleep, so the next call measures the
        # interval from when this request is actually sent
        APIwrapper._last_access=time.time()
        # build parameter dictionary by removing all the None
        # values from filters and adding page_size
        parameters={x: y for x, y in filters.items() if y is not None}
        parameters['page_size']=page_size
        # the page parameter is already included in _next_url.
        response=requests.get(self._next_url, params=parameters,
                              timeout=APIwrapper._timeout)
        # fail with a clear HTTP error rather than a KeyError further down
        response.raise_for_status()
        # the .json() method decodes the response into Python objects
        # (dictionaries, lists; 'null' values are translated as None)
        payload=response.json()
        # update url so the next call fetches the following page
        self._next_url=payload['next']
        self.count=payload['count']
        # data are in the nested 'results' list
        return payload['results']

    def get_all_pages(self, filters=None, page_size=365):
        """ Access the API and download all available data pages of data. Sets the count
        attribute to the total number of items available for this query. API access rate
        limited to three request per second. The page_size parameter sets the number
        of data points in one response page (maximum 365), and controls the trade-off
        between time to load a page and number of pages; the default should work well
        in most cases. The number of items returned should in any case be equal to
        the count attribute. Every call starts again from the first page. """
        # forget any query in progress so get_page restarts from page 1, even
        # when called again with the same filters and page size
        self._filters=None
        data=[] # build up all data here
        while True:
            # use get_page to do the job, including the pacing
            next_page=self.get_page(filters, page_size)
            if not next_page:
                break # we are done
            data.extend(next_page)
        return data

In [18]:
# Save the resistance records to cases.json. `cases` only exists as a local
# variable inside access_api, so the module-level resistance_cases is written.
# The data is serialised before the file is opened for writing: opening with
# "wt" truncates the file, and a failure after that point would leave it empty.
serialised_cases = json.dumps(resistance_cases)
with open("cases.json", "wt") as OUTF:
    OUTF.write(serialised_cases)

In [19]:
# Save the percent-tested records to percent_testing.json in JSON format.
with open("percent_testing.json", mode="wt") as OUTF:
    json.dump(percent_testing, fp=OUTF)

In [20]:
# Save the number-tested records to testing.json. `testing` only exists as a
# local variable inside access_api, so the module-level number_testing is
# written. Serialising first means a failure cannot leave a truncated file.
serialised_testing = json.dumps(number_testing)
with open("testing.json", "wt") as OUTF:
    OUTF.write(serialised_testing)

In [23]:
# API access lives in this function. It is not called directly; the button
# callback calls it.
def access_api():
    """ Accesses the UKHSA API. Return data as a like-for-like replacement for the "canned" data loaded from the JSON file.

    Downloads three K. pneumoniae metrics for England and returns them in a
    dictionary with keys 'resistance', 'percent_testing' and 'testing'. For each
    metric the expected and retrieved number of data points are printed. """
    # structure parameters shared by the three queries
    location=dict(theme="infectious_disease",
                  sub_theme="antimicrobial_resistance",
                  topic="K-pneumoniae",
                  geography_type="Nation",
                  geography="England")
    # (key in the returned dictionary, API metric name), in download order
    downloads=(
        ("resistance", "k-pneumoniae_testing_bacteraemiaPercentResistantRollingMonth"),
        ("percent_testing", "k-pneumoniae_testing_bacteraemiaPercentTestedRollingMonth"),
        ("testing", "k-pneumoniae_testing_bacteraemiaNumberTestedRollingMonth"),
    )
    fetched={}
    for key, metric in downloads:
        api=APIwrapper(**location, metric=metric)
        records=api.get_all_pages()
        print(f"Data points expected: {api.count}")
        print(f"Data points retrieved: {len(records)}")
        fetched[key]=records
    # data read from the API
    return fetched

In [24]:
# Printout from this function will be lost in Voila unless captured in an
# output widget - therefore, we give feedback to the user by changing the
# appearance of the button
def api_button_callback(button):
    """ Button callback - it must take the button as its parameter.
    Accesses the API, wrangles the data and replaces the global dataframes
    used for plotting (number_tested_df, percent_tested_df,
    percent_resistance_df) together with the per-metric dictionary df, then
    refreshes the time series graph.

    On success the button icon becomes "check". If the download or the
    wrangling fails, the button shows the "unlink" icon and the text
    "Unavailable", and the data already on display is left untouched. """
    global df, number_tested_df, percent_tested_df, percent_resistance_df
    try:
        # Get fresh data from the API
        apidata=access_api()
        # wrangle_data takes the three record lists separately
        fresh=wrangle_data(apidata["resistance"],
                           apidata["testing"],
                           apidata["percent_testing"])
        # look every frame up before touching the globals, so a missing
        # metric cannot leave the dashboard half updated
        new_number_tested=fresh['Number_tested']
        new_percent_tested=fresh['Percent_tested']
        new_percent_resistance=fresh['Percent_resistance']
    except Exception:
        # network problems, HTTP errors and malformed responses all end here;
        # a widget callback has nowhere useful to raise to, so tell the user
        button.icon="unlink"
        button.description="Unavailable"
        return
    # overwrite the data used for plotting
    df=fresh
    number_tested_df=new_number_tested
    percent_tested_df=new_percent_tested
    percent_resistance_df=new_percent_resistance
    # the graph won't refresh until the user interacts with the widget.
    # this function simulates the interaction, see Graph and Analysis below.
    refresh_graph()
    # after all is done, switch the icon on the button to a "check" sign
    button.icon="check"




# Button that triggers the data refresh. icon takes a FontAwesome name without
# the `fa-` prefix; button_style is one of 'success', 'info', 'warning',
# 'danger' or ''.
apibutton=wdg.Button(
    description='Refresh',
    icon='sync',
    tooltip="Click to graph new data",
    button_style='info',
    disabled=False,
)

# register the callback so that clicking the button runs it
apibutton.on_click(api_button_callback)

display(apibutton)

# run all cells before clicking on this button

Button(button_style='info', description='Refresh', icon='sync', style=ButtonStyle(), tooltip='Click to graph n…

## Graphs and analysis

In [25]:
# Drug names offered in the selector; all of them are selected to start with.
# timeseries_graph indexes percent_resistance_df with the selected names, so
# each name must be a column of that dataframe.
DRUG_OPTIONS=['Piperacillin with tazobactam', 'Gentamicin', 'Ciprofloxacin', 'Co-amoxiclav',
              'Third Generation Cephalosporins', 'Carbapenems', 'Amikacin']

series=wdg.SelectMultiple(
    options=DRUG_OPTIONS,
    value=DRUG_OPTIONS,
    rows=len(DRUG_OPTIONS), # show every drug without scrolling
    description='Drug:',
    disabled=False
)

# choice between a linear and a logarithmic y axis
scale=wdg.RadioButtons(
    options=['linear', 'log'],
    description='Scale:',
    disabled=False
)

# side-by-side layout; use a VBox to stack the controls vertically instead
controls=wdg.HBox([series, scale])

def timeseries_graph(gcols, gscale):
    """ Plot the selected columns of percent_resistance_df against the index.

    gcols - names of the columns to plot; when empty, only a prompt is printed.
    gscale - 'linear' for a linear y axis, anything else for a logarithmic one. """
    selection_hint="(CTRL-Click to select more than one category)"
    if len(gcols)==0:
        # nothing selected: ask for a selection instead of drawing
        print("Click to select data for graph")
        print(selection_hint)
        return
    use_log_axis=gscale!='linear'
    axes=percent_resistance_df[list(gcols)].plot(logy=use_log_axis)
    axes.set_ylabel("Percentage resistance of K.pneumoniae")
    plt.show() # important - graphs won't update if this is missing
    print(selection_hint)

# --- the refresh function ---
def refresh_graph():
    """Redraw the time series graph after percent_resistance_df has changed.

    The scale widget is switched to its other option and straight back, so its
    value ends up unchanged while the two assignments trigger a redraw."""
    original_choice = scale.value
    # any value other than 'linear' maps to 'linear'
    opposite_choice = {'linear': 'log'}.get(original_choice, 'linear')
    for choice in (opposite_choice, original_choice):
        scale.value = choice


# Tie the widgets to the plotting function: timeseries_graph is called with
# gcols=series.value and gscale=scale.value, and its figure and printout are
# captured in the `graph` output widget.
graph=wdg.interactive_output(
    timeseries_graph,
    {'gcols': series, 'gscale': scale},
)

display(controls, graph)


HBox(children=(SelectMultiple(description='Drug:', index=(0, 1, 2, 3, 4, 5, 6), options=('Piperacillin with ta…

Output()

In [29]:
# Selector for the year shown in the monthly bar chart; the choices are the
# distinct years found in the index of number_tested_df.
year=wdg.Select(
    options=number_tested_df.index.year.unique(), # options available
    value=number_tested_df.index.year[-1], # initial value: year of the last index entry
    rows=1, # rows of the selection box
    description='Year',
    disabled=False
)

# NOTE(review): `controls` is rebound here to a box holding only `series`, but
# this cell displays `year` and `output` - confirm whether it is still needed.
controls=wdg.HBox([series])

def lineage_graph(graphyear):
    """ Draw a bar chart of number_tested_df for one year.

    graphyear - the year to show. Rows of number_tested_df whose index falls in
    that year are averaged per month-end bin, and each column is drawn as a
    group of bars labelled with the abbreviated month name. """
    # keep only the rows for the year selected by the user
    in_selected_year=number_tested_df.index.year==graphyear
    monthly=number_tested_df[in_selected_year].resample('ME').mean()
    month_labels=[stamp.strftime('%b') for stamp in monthly.index]
    axes=monthly.plot(kind='bar', figsize=(12,6), width=0.8)
    axes.set_xticklabels(month_labels, rotation=45)
    axes.set_ylabel('Number of Cases')
    axes.set_title(f'Number of Klebsiella pneumoniae cases tested per month in {graphyear}')
    plt.tight_layout()
    plt.show()

# Tie the year selector to the bar chart: lineage_graph is called with
# graphyear=year.value and its figure is captured in the `output` widget.
output = wdg.interactive_output(
    lineage_graph,
    {'graphyear': year},
)
display(year, output)


Select(description='Year', index=5, options=(2020, 2021, 2022, 2023, 2024, 2025), rows=1, value=2025)

Output()