# Influenza Tracking Dashboard - Christy Choi

In [1]:
from IPython.display import clear_output
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import requests
import ipywidgets as wdg
import time
import json


%matplotlib inline
# make figures larger
plt.rcParams["figure.dpi"] = 100


jsondata = {}

# Load the data from the JSON files
with open("tests.json", "rt") as INFILE:
    jsondata["tests"] = json.load(INFILE)
with open("icuhdu.json", "rt") as INFILE:
    jsondata["icuhdu"] = json.load(INFILE)
with open("admission.json", "rt") as INFILE:
    jsondata["admission"] = json.load(INFILE)


def wrangle_data(rawdata):
    """Build the plotting dataframe from raw influenza records.

    Parameters:
        rawdata: dict with optional keys "tests", "icuhdu" and "admission", each
            holding a list of records (from the JSON files or an API call).
            Every record is read through its "age", "date", "metric" and
            "metric_value" keys.

    Returns:
        A float dataframe with one row per calendar day between the earliest
        and the latest record date (inclusive) and two-level (metric, age)
        columns. Cells without a record are filled with 0.0.

    Raises:
        ValueError: if rawdata contains no usable record.
    """
    # Source age labels -> age bands used as dataframe columns. Several source
    # labels share one band; if two of them occur for the same date and metric
    # the later record wins. Labels that are not listed are skipped.
    # NOTE(review): an earlier note in this file also listed a "75-84" label,
    # which has no mapping here, so such records are dropped - confirm intended.
    age_mapping = {
        "0-4": "0-4",
        "00-04": "0-4",
        "05-14": "05-14",
        "15-44": "15-44",
        "45-54": "45-54",
        "55-64": "55-64",
        "65-74": "65-79",
        "65-79": "65-79",
        "80+": "80+",
        "85+": "80+",
        "all": "all",
    }
    age_groups = [
        "0-4",
        "05-14",
        "15-44",
        "45-54",
        "55-64",
        "65-79",
        "80+",
        "all",
    ]
    # Metric name found in the records -> short name used as top column level.
    metric_names = {
        "influenza_testing_positivityByWeek": "tests",
        "influenza_healthcare_ICUHDUadmissionRateByWeek": "icuhdu",
        "influenza_healthcare_hospitalAdmissionRateByWeek": "admission",
    }
    dataset_names = ["tests", "icuhdu", "admission"]

    # Collect the values as data[date][metric][age] = value.
    data = {}
    for dataset_name in dataset_names:
        for entry in rawdata.get(dataset_name, []):
            age = age_mapping.get(entry["age"])
            if age is None:
                continue  # age label we do not plot
            metric = metric_names.get(entry["metric"])
            if metric is None:
                continue  # metric that has no column in the dataframe
            # setdefault creates the nested dicts on first use only, so values
            # already stored for other ages of the same metric are preserved.
            data.setdefault(entry["date"], {}).setdefault(metric, {})[age] = entry[
                "metric_value"
            ]

    if not data:
        raise ValueError("No usable influenza records found in rawdata")

    def parse_date(datestring):
        """Convert a date string into a pandas datetime object."""
        return pd.to_datetime(datestring, format="%Y-%m-%d")

    # The date strings are sorted as text, as in the original implementation.
    dates = sorted(data)
    startdate = parse_date(dates[0])
    enddate = parse_date(dates[-1])

    # Set up the dataframe: daily index, (metric, age) multi-level columns
    index = pd.date_range(startdate, enddate, freq="D")
    columns = pd.MultiIndex.from_product(
        [dataset_names, age_groups],
        names=["metric_value", "age"],
    )
    influenzadf = pd.DataFrame(index=index, columns=columns)

    # Populate the dataframe with values
    for date, by_metric in data.items():
        pd_date = parse_date(date)
        for metric, by_age in by_metric.items():
            for age, value in by_age.items():
                influenzadf.loc[pd_date, (metric, age)] = value

    # Convert to float and fill in any missing values
    return influenzadf.astype(float).fillna(0.0)


# Build the module-level dataframe used for plotting from the canned JSON data
influenzadf = wrangle_data(rawdata=jsondata)


class APIwrapper:
    """Paginated, rate-limited access to one metric endpoint of the API."""

    # class variables shared among all instances
    _access_point = "https://api.ukhsa-dashboard.data.gov.uk"
    _last_access = 0.0  # time of last api access
    _min_interval = 0.33  # seconds between requests (max 3 requests/second)
    _timeout = 30  # seconds to wait for the server before giving up

    def __init__(
        self, theme, sub_theme, topic, geography_type, geography, metric
    ):
        """Init the APIwrapper object, constructing the endpoint from the structure
        parameters"""
        url_path = (
            f"/themes/{theme}/sub_themes/{sub_theme}/topics/{topic}/geography_types/"
            + f"{geography_type}/geographies/{geography}/metrics/{metric}"
        )
        # our starting API endpoint
        self._start_url = APIwrapper._access_point + url_path
        # Query state of the last get_page call; the initial values can never
        # match a real call, so the first call always starts from page 1.
        self._filters = None
        self._page_size = -1
        self._next_url = None
        # will contain the number of items
        self.count = None

    def get_page(self, filters=None, page_size=5):
        """Access the API and download the next page of data. Sets the count
        attribute to the total number of items available for this query. Changing
        filters or page_size will cause get_page to restart from page 1. Rate
        limited to three request per second. The page_size parameter sets the number
        of data points in one response page (maximum 365); use the default value
        for debugging your structure and filters.

        Returns the list of results of the page, or an empty list when the last
        page has already been fetched. Raises ValueError if page_size is not
        between 1 and 365, and whatever requests raises on a timeout or an HTTP
        error status."""
        # Check page size is within range
        if page_size > 365:
            raise ValueError("Max supported page size is 365")
        if page_size < 1:
            raise ValueError("Page size must be at least 1")
        # Work on a private copy: later in-place changes made by the caller to
        # their dictionary must still be detected as "filters have changed".
        filters = {} if filters is None else dict(filters)
        # restart from first page if page size or filters have changed
        if filters != self._filters or page_size != self._page_size:
            self._filters = filters
            self._page_size = page_size
            self._next_url = self._start_url
        # signal the end of data condition
        if self._next_url is None:
            return []  # we already fetched the last page
        # simple rate limiting to avoid bans
        deltat = time.time() - APIwrapper._last_access
        if deltat < APIwrapper._min_interval:
            time.sleep(APIwrapper._min_interval - deltat)
        # Record the access time *after* any sleep, i.e. when the request is
        # really sent; otherwise the next call would under-estimate the wait.
        APIwrapper._last_access = time.time()
        # build parameter dictionary by removing all the None
        # values from filters and adding page_size
        parameters = {x: y for x, y in filters.items() if y is not None}
        parameters["page_size"] = page_size
        # the page parameter is already included in _next_url.
        # This is the API access. The timeout keeps a stalled connection from
        # blocking forever; raise_for_status turns HTTP errors into exceptions
        # instead of failing later on a missing key.
        response = requests.get(
            self._next_url, params=parameters, timeout=APIwrapper._timeout
        )
        response.raise_for_status()
        # the .json() method decodes the response into Python object (dictionaries,
        # lists; 'null' values are translated as None).
        payload = response.json()
        # update url so we'll fetch the next page
        self._next_url = payload["next"]
        self.count = payload["count"]
        # data are in the nested 'results' list
        return payload["results"]

    def get_all_pages(self, filters=None, page_size=365):
        """Access the API and download all available data pages of data. Sets the count
        attribute to the total number of items available for this query. API access rate
        limited to three request per second. The page_size parameter sets the number
        of data points in one response page (maximum 365), and controls the trade-off
        between time to load a page and number of pages; the default should work well
        in most cases. The number of items returned should in any case be equal to
        the count attribute."""
        # Forget any previous pagination state so that the download always
        # starts from page 1, even when the same query was already exhausted.
        self._filters = None
        self._page_size = -1
        data = []  # build up all data here
        while True:
            # use get_page to do the job, including the pacing
            next_page = self.get_page(filters, page_size)
            if not next_page:
                break  # we are done
            data.extend(next_page)
        return data


def access_api():
    """Download the three influenza series from the UKHSA API.

    Returns a dictionary with the keys "tests", "icuhdu" and "admission", each
    holding the complete list of results for that metric, so it can be used as a
    like-for-like replacement for the "canned" data loaded from the JSON files.
    """
    # Position of the data in the API hierarchy; the metric is added per request
    structure = {
        "theme": "infectious_disease",
        "sub_theme": "respiratory",
        "topic": "Influenza",
        "geography_type": "Nation",
        "geography": "England",
    }

    # Every filter is left unset (None)
    filters = dict.fromkeys(
        [
            "stratum",  # Smallest subgroup a metric can be broken down into e.g. ethnicity, testing pillar
            "age",  # Smallest subgroup a metric can be broken down into e.g. 15_44 for the age group of 15-44 years
            "sex",  # Patient gender e.g. 'm' for Male, 'f' for Female or 'all' for all genders
            "year",  # Epi year of the metrics value (important for annual metrics) e.g. 2020
            "month",  # Epi month of the metric value (important for monthly metrics) e.g. 12
            "epiweek",  # Epi week of the metric value (important for weekly metrics) e.g. 30
            "date",  # The date which this metric value was recorded in the format YYYY-MM-DD e.g. 2020-07-20
            "in_reporting_delay_period",  # Boolean indicating whether the data point is considered to be subject to retrospective updates
        ]
    )

    # Key in the returned dictionary -> metric requested from the API
    metric_for_key = {
        "tests": "influenza_testing_positivityByWeek",
        "icuhdu": "influenza_healthcare_ICUHDUadmissionRateByWeek",
        "admission": "influenza_healthcare_hospitalAdmissionRateByWeek",
    }

    downloaded = {}
    for key, metric_name in metric_for_key.items():
        # one wrapper per metric, fetched in full before moving to the next one
        client = APIwrapper(metric=metric_name, **structure)
        downloaded[key] = client.get_all_pages(filters)

    return downloaded  # return data read from the API


In [2]:
# Multiple-selection list of the series to draw; all three start selected.
series = wdg.SelectMultiple(
    description="Series:",
    options=["tests", "icuhdu", "admission"],
    value=["tests", "icuhdu", "admission"],
    rows=3,
    disabled=False,
)

# Year selector.
year = wdg.Select(
    description="Year",
    # options available: unique years in the dataframe
    options=influenzadf.index.year.unique(),
    # initial value: year of the last row of the dataframe
    value=influenzadf.index.year[-1],
    rows=1,  # rows of the selection box
    disabled=False,
)

# Age band selector; starts on the "all" band.
age = wdg.Dropdown(
    description="Age",
    options=["0-4", "05-14", "15-44", "45-54", "55-64", "65-79", "80+", "all"],
    value="all",
    rows=1,
    disabled=False,
)

# Choice between a linear and a logarithmic scale.
scale = wdg.RadioButtons(
    description="Scale:",
    options=["linear", "log"],
    value="linear",
    disabled=False,
)


def refresh_graph():
    """Force a redraw of the graph, e.g. after the data have been updated.

    The scale widget is switched to its other option and straight back again,
    so its value ends up unchanged.
    """
    original = scale.value
    # pick whichever of the first two options is not the current one
    alternate = (
        scale.options[1] if original == scale.options[0] else scale.options[0]
    )

    scale.value = alternate
    scale.value = original


# Lay the four control widgets out side by side
controls = wdg.HBox(children=[series, year, scale, age])


In [3]:
def _set_button_state(button, icon, button_style, description, disabled):
    """Apply an icon, a style, a label and the disabled flag to a button."""
    button.icon = icon
    button.button_style = button_style
    button.description = description
    button.disabled = disabled


def api_button_callback(button):
    """Button callback: refresh the plotted data from the API.

    Accesses the API, wrangles the data, overwrites the global dataframe
    influenzadf used for plotting and forces a redraw of the graph.

    Parameters:
        button: the button that was clicked. While the download runs it shows a
            "Fetching data..." state; afterwards it shows either the success
            state (and stays disabled, it is not needed again) or the error
            state (and is re-enabled so that the user can retry).
    """
    global influenzadf

    _set_button_state(button, "spinner", "warning", "Fetching data...", True)

    # Get fresh data from the API. Any failure is reported and ends the
    # callback: without new data there is nothing to wrangle. The broad except
    # is deliberate, this is the top-level boundary of a UI callback.
    try:
        apidata = access_api()
    except Exception as e:
        print("Error fetching data from API:", e)
        _set_button_state(button, "times", "danger", "Error", False)
        return

    try:
        # wrangle the data and overwrite the dataframe for plotting
        influenzadf = wrangle_data(apidata)
        # the graph won't refresh until the user interacts with a widget;
        # this call simulates the interaction
        refresh_graph()
    except Exception as e:
        print("Error:", e)
        _set_button_state(button, "times", "danger", "Error", False)
        return

    # Only reached when everything above succeeded.
    _set_button_state(button, "check", "success", "Data updated", True)


# Button that triggers the download of fresh data
apibutton = wdg.Button(
    icon="refresh",
    description="Refresh data",
    tooltip="Click to download current Public Health England data",
    button_style="info",  # 'success', 'info', 'warning', 'danger' or ''
    disabled=False,
)


# Register the callback: it is called with the button on every click
apibutton.on_click(api_button_callback)


In [4]:
def plot_influenzadf_graph(gcols, gscale, gyear, gage):
    """Plot the selected series of the global influenzadf dataframe.

    Parameters:
        gcols: collection of series names to plot ("tests", "icuhdu",
            "admission"); matched against the first column level.
        gscale: "linear" for a linear y axis, anything else for a log y axis.
        gyear: year whose rows are plotted.
        gage: age band to plot; matched against the second column level.

    The selected columns are summed per month and each column is rescaled so
    that its months add up to 100. Nothing is drawn, and a short message is
    printed instead, when the selection is empty.
    """
    # A multiple-selection widget can have nothing selected; plotting an empty
    # frame would raise, so ask for a selection instead.
    if not gcols:
        print("Click to select data for graph")
        print("(CTRL-Click to select more than one category)")
        return

    logscale = gscale != "linear"

    filterdf = influenzadf[(influenzadf.index.year == gyear)]

    selected_cols = [
        col for col in filterdf.columns if col[0] in gcols and col[1] == gage
    ]
    if not selected_cols:
        print("No data available for the selected series and age group")
        return

    filterdf = filterdf[selected_cols]
    monthly = filterdf.groupby(pd.Grouper(freq="1ME")).sum()
    # replace exact zeros by a tiny positive value
    monthly = monthly.replace(0, 1e-10)
    totals = monthly.sum(axis=0)  # over the rows
    # make sure it's all normalised to 100
    monthly = monthly.div(totals, axis=1) * 100
    # the row order is reversed before plotting, as in the original version
    monthly = monthly[::-1]

    ax = monthly.plot(kind="line", logy=logscale)
    ax.set_title(
        "Trends in Influenza Tests, ICU/HDU Admissions, and Hospital Admissions"
    )
    # explanatory caption in a rounded box below the axes
    ax.text(
        0.5,
        -0.25,
        "This graph shows the trends in influenza tests, ICU/HDU admissions, and hospital admissions\n"
        "for the selected age group over the selected year. The data is normalized to show percentages\n"
        "over time. The scale can be adjusted to linear or logarithmic for better visualization of trends.\n"
        "This allows you to examine how influenza affects different age groups across the months of the year.",
        horizontalalignment="center",
        verticalalignment="center",
        transform=ax.transAxes,
        fontsize=10,
        color="black",
        style="italic",
        bbox=dict(
            facecolor="white",
            alpha=0.7,
            edgecolor="black",
            boxstyle="round,pad=0.6",
        ),
    )

    # legend outside the axes, to the right
    ax.legend(loc="center left", bbox_to_anchor=(1.0, 0.5))
    plt.show()


# Connect each plotting argument to the widget that supplies its value
output = wdg.interactive_output(
    plot_influenzadf_graph,
    dict(gcols=series, gscale=scale, gyear=year, gage=age),
)
# Show the controls with the graph, then the refresh button
display(controls, output)

display(apibutton)


HBox(children=(SelectMultiple(description='Series:', index=(0, 1, 2), options=('tests', 'icuhdu', 'admission')…

Output()

Button(button_style='info', description='Refresh data', icon='refresh', style=ButtonStyle(), tooltip='Click to…