# Import libraries

In [2]:
import requests
from requests import get
import json
import pandas as pd
from time import sleep
import datetime
import os
import sqlite3
from datetime import date, timedelta

# Connect to database and create cursor

In [3]:
# Create a database connection
# (conn is a module-level global shared by dropTables, dropDB and runSQL)
conn = sqlite3.connect('Covid19.db') 
 
# Create a cursor 
# (c is the shared cursor that the cells and helper functions execute SQL through)
c = conn.cursor() 

# Define a function to drop tables and clean database

In [3]:
def dropTables():
    """Drop every user table in the database and reclaim the freed space.

    Works on the module-level connection ``conn``. SQLite's own internal
    tables (names starting with ``sqlite_``) are left alone because they
    cannot be dropped.
    """
    # Get a list of all user tables in the database
    tables = conn.execute(
        "SELECT name FROM sqlite_master "
        "WHERE type='table' AND name NOT LIKE 'sqlite_%'"
    ).fetchall()
    # Drop each table in the database
    for (name,) in tables:
        # Identifiers cannot be bound as parameters, so quote the name
        # (doubling embedded quotes) to keep unusual names valid SQL
        quoted = '"' + name.replace('"', '""') + '"'
        conn.execute(f"DROP TABLE IF EXISTS {quoted}")
    # VACUUM cannot run inside an open transaction, so commit pending work
    # before freeing up unused space on the db
    conn.commit()
    conn.execute('VACUUM')

# Define a function to drop the database

In [4]:
# Delete the file that contains the database
def dropDB():
    """Close the shared connection and delete the ``Covid19.db`` file.

    After this call the module-level ``conn`` is closed and must be
    re-created before any further database work. A database file that is
    already missing is not treated as an error.
    """
    if conn is not None:
        conn.close()
    try:
        os.remove('Covid19.db')
    except FileNotFoundError:
        # Nothing to delete: the database file is already gone
        pass

# Define a function to run SQL

In [3]:
def runSQL(sql, params=()):
    """Execute one SQL statement on the shared cursor ``c`` and commit.

    Args:
        sql: The SQL statement to execute; may contain ``?`` placeholders.
        params: Optional sequence of values bound to the placeholders.
            Prefer this over building the statement by string concatenation.
    """
    c.execute(sql, params)
    conn.commit()

# Define a function to add information to an error log

In [6]:
def addToErrorLog(text):
    """Append one line of text to ``errorLog.txt`` in the working directory.

    Args:
        text: The message to record; a trailing newline is added.
    """
    fileName = 'errorLog.txt'
    # Mode 'a' creates the file when it does not exist yet, so no separate
    # existence check is needed, and the with-block closes the file on exit.
    # UTF-8 is set explicitly because logged API responses may contain
    # characters the platform default encoding cannot represent.
    with open(fileName, 'a', encoding='utf-8') as f:
        f.write(text + '\n')

# Define a function to get data from an API

In [10]:
def get_data(url, areaType=None, metricCode=None):
    """Fetch JSON from ``url``, logging failures to the error log.

    A 429 (too many requests) response is retried once after a 5 second
    pause. After a successful response the function waits one second so
    that consecutive calls do not hit the API too frequently.

    Args:
        url: The endpoint to request.
        areaType: Optional area type, used only to label error-log entries.
        metricCode: Optional metric code, used only to label error-log entries.

    Returns:
        The decoded JSON payload, or ``None`` when the request timed out,
        returned an HTTP status >= 400, or the body was not valid JSON.
        Exceptions other than ``ValueError`` and
        ``requests.exceptions.Timeout`` are not caught here.
    """
    currentDate = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    # f-string formatting keeps logging safe when areaType/metricCode are None
    context = f' | AreaType: {areaType} | metricCode: {metricCode} | Repsonse: '

    try:
        response = get(url, timeout=10)
        if response.status_code == 429:
            # Rate limited: back off once and retry
            sleep(5)
            response = get(url, timeout=10)
        # Checked after the retry too, so a second failure is not parsed as data
        if response.status_code >= 400:
            addToErrorLog('| ' + currentDate + ': RuntimeError' + context + response.text + ' | ')
            return None
        payload = response.json()
    except ValueError as vErr:
        # Raised by response.json() when the body is not valid JSON
        addToErrorLog('| ' + currentDate + ': ValueError' + context + str(vErr) + ' | ')
        return None
    except requests.exceptions.Timeout:
        addToErrorLog('| ' + currentDate + ': ReadTimeoutError' + context + '| ')
        return None

    # after a response has been returned, wait a second to ensure we're not
    # sending too many requests too frequently
    sleep(1)
    return payload

# Fetch any announcements

In [8]:
# Get any announcements

endpoint = 'https://api.coronavirus.data.gov.uk/generic/announcements'
data = get_data(endpoint)
# get_data returns None when the request fails; stop here with a clear
# message rather than passing an empty result on to to_sql
if not data:
    raise RuntimeError('No announcements returned from ' + endpoint)
df = pd.DataFrame(data)
df.to_sql('Announcements', conn, if_exists='replace', index=False)

88

## Amend to set the id field to the primary key

In [8]:
# Rebuild Announcements so that id is declared as the primary key: a new
# table is created with the desired schema, the rows are copied across, and
# the new table then takes over the original name.

# Step 1: Create a new table with the desired schema (dropping any leftover
# copy from an earlier, interrupted run so this cell can be re-run)
c.execute('DROP TABLE IF EXISTS new_Announcements')
c.execute('''CREATE TABLE new_Announcements (
                    body TEXT,
                    date TEXT,
                    expire TEXT,
                    has_expired INTEGER,
                    id TEXT PRIMARY KEY,
                    launch TEXT
                )''')

# Step 2: Copy data from the old table to the new table
c.execute('''INSERT INTO new_Announcements (body, date, expire, has_expired, id, launch)
                    SELECT body, date, expire, has_expired, id, launch
                    FROM Announcements''')

# Step 3: Drop the old table
c.execute('DROP TABLE Announcements')

# Step 4: Rename the new table to the original table name
c.execute('ALTER TABLE new_Announcements RENAME TO Announcements')

# Commit the changes (the connection stays open for the following cells)
conn.commit()

# Areas to get details for

In [8]:
# Area types to look up; the area-details loop requests the generic/area endpoint once per entry
areas = ['nation','region','nhsRegion','nhsTrust','msoa','utla','ltla']

## Get area details and populate sql table

In [11]:
# Get area details

# Collect one frame per area type and concatenate once at the end; calling
# pd.concat inside the loop re-copies all accumulated rows on every pass
area_frames = []

for i in areas:
    endpoint = 'https://api.coronavirus.data.gov.uk/generic/area/' + i
    areaData = get_data(endpoint)
    # Tag every row with the area type it was requested for
    area_df = pd.DataFrame(areaData).assign(areaType=i)
    area_frames.append(area_df)

# pd.concat rejects an empty list, so fall back to an empty frame
areaDetails_df = pd.concat(area_frames, ignore_index=True) if area_frames else pd.DataFrame()

# Add a sequential "id" column (1..n)
# NOTE(review): to_sql writes this as an ordinary column, not a declared PRIMARY KEY
areaDetails_df['id'] = range(1, len(areaDetails_df) + 1)

areaDetails_df.to_sql('Areas',conn,if_exists='replace',index=False)

7623

# Metric categories to get details for

In [17]:
# Metric categories to look up; the metric-details loop requests the generic/metrics endpoint once per entry
categories = ['Cases','Testing','Healthcare','Deaths','Vaccinations']

## Get metric category details

In [18]:
# Get metric details by category

# Collect one frame per category and concatenate once at the end; calling
# pd.concat inside the loop re-copies all accumulated rows on every pass
metric_frames = []

for i in categories:
    endpoint = 'https://api.coronavirus.data.gov.uk/generic/metrics?category=' + i
    catData = get_data(endpoint)
    # Tag every row with the category it was requested for
    cat_df = pd.DataFrame(catData).assign(category=i)
    metric_frames.append(cat_df)

# pd.concat rejects an empty list, so fall back to an empty frame
metricDetails_df = pd.concat(metric_frames, ignore_index=True) if metric_frames else pd.DataFrame()

# Add a sequential "id" column (1..n)
# NOTE(review): to_sql writes this as an ordinary column, not a declared PRIMARY KEY
metricDetails_df['id'] = range(1, len(metricDetails_df) + 1)

# Convert the tag lists to comma-separated strings so they can be stored in
# a TEXT column; values that are not lists (e.g. missing tags) pass through
metricDetails_df['tags'] = metricDetails_df['tags'].apply(
    lambda x: ','.join(x) if isinstance(x, (list, tuple)) else x
)
metricDetails_df.to_sql('MetricDetails',conn,if_exists='replace',index=False)

221

# Fetch the metric data for each area type and metric

In [13]:
# Replaces the earlier areas list: the metric download loop only runs for nation-level data
areas = ['nation']

In [14]:
# loop through the areaTypes and metrics and append to a dataframe and output

# NOTE(review): metricData_df is not used by the visible code; rows are
# appended straight to the Metrics table instead
metricData_df = pd.DataFrame() 

for i in areas:

    areaType = i

    for index,row in metricDetails_df.iterrows():

        metricCode = row['metric']

        endpoint = (
                'https://api.coronavirus.data.gov.uk/v1/data?'
                'filters=areaType=' + areaType + '&'
                'structure={"date":"date","areaCode":"areaCode","metricValue":"' +  metricCode + '"}'
            )

        data = get_data(endpoint,areaType=areaType,metricCode=metricCode)

        # get_data returns None when the request failed (already logged there)
        if data is None:
            continue

        df = pd.DataFrame(data["data"])

        # check if any data in the dataframe is a dictionary; such metrics are
        # logged and skipped rather than being written as stringified dicts
        dict_columns = [
            str(col) for col in df.columns
            if any(isinstance(x, dict) for x in df[col])
        ]
        if dict_columns:
            # The timestamp is built here because get_data's local variables
            # are not visible at this level
            currentDate = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            addToErrorLog('| DictError: ' + currentDate + ' | AreaType: ' + areaType + ' | metricCode: ' + metricCode  + ' | Repsonse: dict values in column(s) ' + ', '.join(dict_columns) + ' | ')
            continue

        df = df.astype(str)
        df = df.assign(metricCode=metricCode)
        df = df.assign(areaType=areaType)
        df.to_sql('Metrics',conn,if_exists='append',index=False)

# Create a date table

In [35]:
# Create date table (dropping any existing copy first so this cell can be
# re-run without a "table already exists" error or duplicate rows)
c.execute('DROP TABLE IF EXISTS Dates')
c.execute('''CREATE TABLE Dates
             (id INTEGER PRIMARY KEY,
              date TEXT,
              weekEnding TEXT,
              monthEnding TEXT,
              monthName TEXT,
              dayName TEXT,
              year INTEGER,
              month INTEGER)''')

# Date range covered by the table (both ends inclusive)
start_date = date(2020, 1, 1)
end_date = date(2025, 12, 31)
delta = timedelta(days=1)

# Build all rows first, then insert them in one executemany call
date_rows = []
current_date = start_date  # separate loop variable so start_date keeps its value
while current_date <= end_date:
    # weekday() is 0 for Monday .. 6 for Sunday, so this is the Sunday that
    # ends the week containing current_date
    end_of_week = (current_date + timedelta(days=(6 - current_date.weekday()))).strftime("%Y-%m-%d")
    # The 1st of the month plus 32 days always falls in the next month;
    # snapping that to day 1 and stepping back one day gives the month end
    end_of_month = (date(current_date.year, current_date.month, 1) + timedelta(days=32)).replace(day=1) - timedelta(days=1)
    month_name = current_date.strftime("%B")
    day_name = current_date.strftime("%A")
    date_rows.append((
        current_date.strftime("%Y-%m-%d"),
        end_of_week,
        end_of_month.strftime("%Y-%m-%d"),
        month_name,
        day_name,
        current_date.year,
        current_date.month,
    ))
    current_date += delta

# id is omitted: as an INTEGER PRIMARY KEY it is assigned by SQLite
c.executemany('''INSERT INTO Dates (date
              , weekEnding
              , monthEnding
              , monthName
              , dayName
              , year
              , month) 
              VALUES (?, ?, ?, ?, ?, ?, ?)'''
              , date_rows)

# Commit changes (the connection stays open for the following cells)
conn.commit()

In [37]:
# Preview the first ten rows of the Dates table
sql = 'select * from Dates limit 10'
c.execute(sql)
# NOTE(review): a SELECT changes nothing, so this commit looks unnecessary
conn.commit()
# Last expression of the cell, so the notebook displays the fetched rows
c.fetchall()

[(1,
  '2020-01-01',
  '2020-01-05',
  '2020-01-31',
  'January',
  'Wednesday',
  2020,
  1),
 (2, '2020-01-02', '2020-01-05', '2020-01-31', 'January', 'Thursday', 2020, 1),
 (3, '2020-01-03', '2020-01-05', '2020-01-31', 'January', 'Friday', 2020, 1),
 (4, '2020-01-04', '2020-01-05', '2020-01-31', 'January', 'Saturday', 2020, 1),
 (5, '2020-01-05', '2020-01-05', '2020-01-31', 'January', 'Sunday', 2020, 1),
 (6, '2020-01-06', '2020-01-12', '2020-01-31', 'January', 'Monday', 2020, 1),
 (7, '2020-01-07', '2020-01-12', '2020-01-31', 'January', 'Tuesday', 2020, 1),
 (8,
  '2020-01-08',
  '2020-01-12',
  '2020-01-31',
  'January',
  'Wednesday',
  2020,
  1),
 (9, '2020-01-09', '2020-01-12', '2020-01-31', 'January', 'Thursday', 2020, 1),
 (10, '2020-01-10', '2020-01-12', '2020-01-31', 'January', 'Friday', 2020, 1)]

In [15]:
# Preview ten rows of the Metrics table
sql = 'select date, areaCode, areaType, metricCode, metricValue from Metrics limit 10'
c.execute(sql)
# NOTE(review): a SELECT changes nothing, so this commit looks unnecessary
conn.commit()
# Last expression of the cell, so the notebook displays the fetched rows
c.fetchall()

[('2023-04-26',
  'E92000001',
  'nation',
  'changeInNewCasesBySpecimenDate',
  '942'),
 ('2023-04-25',
  'E92000001',
  'nation',
  'changeInNewCasesBySpecimenDate',
  '1426'),
 ('2023-04-24',
  'E92000001',
  'nation',
  'changeInNewCasesBySpecimenDate',
  '1630'),
 ('2023-04-23',
  'E92000001',
  'nation',
  'changeInNewCasesBySpecimenDate',
  '1391'),
 ('2023-04-22',
  'E92000001',
  'nation',
  'changeInNewCasesBySpecimenDate',
  '1162'),
 ('2023-04-21',
  'E92000001',
  'nation',
  'changeInNewCasesBySpecimenDate',
  '1291'),
 ('2023-04-20',
  'E92000001',
  'nation',
  'changeInNewCasesBySpecimenDate',
  '1448'),
 ('2023-04-19',
  'E92000001',
  'nation',
  'changeInNewCasesBySpecimenDate',
  '695'),
 ('2023-04-18',
  'E92000001',
  'nation',
  'changeInNewCasesBySpecimenDate',
  '169'),
 ('2023-04-17',
  'E92000001',
  'nation',
  'changeInNewCasesBySpecimenDate',
  '135')]

In [3]:
# List the column names of the Areas table
tableName = 'Areas'
c.execute(f"PRAGMA table_info({tableName})")
# Index 1 of each table_info row holds the column name
columns = [col[1] for col in c.fetchall()]
print(columns)

['areaCode', 'areaName', 'areaType']


In [12]:
# Preview the first ten rows of the Areas table
sql = 'select * from Areas limit 10'
c.execute(sql)
# NOTE(review): a SELECT changes nothing, so this commit looks unnecessary
conn.commit()
# Last expression of the cell, so the notebook displays the fetched rows
c.fetchall()

[('E92000001', 'England', 'nation', 1),
 ('N92000002', 'Northern Ireland', 'nation', 2),
 ('S92000003', 'Scotland', 'nation', 3),
 ('K02000001', 'United Kingdom', 'nation', 4),
 ('W92000004', 'Wales', 'nation', 5),
 ('E12000004', 'East Midlands', 'region', 6),
 ('E12000006', 'East of England', 'region', 7),
 ('E12000007', 'London', 'region', 8),
 ('E12000001', 'North East', 'region', 9),
 ('E12000002', 'North West', 'region', 10)]

In [42]:
# Execute the PRAGMA statement to get the column definitions of the Areas table
c.execute('PRAGMA table_info(Areas)')

# Fetch all rows of the result set
rows = c.fetchall()

# Loop through each row and print the column name and its declared type
for row in rows:
    print(row[1], row[2])

areaCode TEXT
areaName TEXT
areaType TEXT


In [13]:
# Preview the first ten rows of the MetricDetails table
sql = 'select * from MetricDetails limit 10'
c.execute(sql)
# NOTE(review): a SELECT changes nothing, so this commit looks unnecessary
conn.commit()
# Last expression of the cell, so the notebook displays the fetched rows
c.fetchall()

[('Cases',
  None,
  '2022-09-08',
  'changeInNewCasesBySpecimenDate',
  'Change in new cases by specimen date',
  'event date'),
 ('Cases',
  None,
  '2022-09-08',
  'cumCasesByPublishDate',
  'Cumulative cases by publish date',
  'cumulative,reporting date'),
 ('Cases',
  None,
  '2022-09-08',
  'cumCasesByPublishDateRate',
  'Cumulative cases by publish date rate',
  'cumulative,incidence rate,reporting date'),
 ('Cases',
  None,
  '2022-09-08',
  'cumCasesBySpecimenDate',
  'Cumulative cases by specimen date',
  'cumulative,event date'),
 ('Cases',
  None,
  '2022-09-08',
  'cumCasesBySpecimenDateRate',
  'Cumulative cases by specimen date rate',
  'cumulative,event date,incidence rate'),
 ('Cases',
  None,
  '2022-09-08',
  'cumCasesLFDConfirmedPCRBySpecimenDate',
  'Cumulative cases LFD confirmed by PCR by specimen date',
  'cumulative,event date'),
 ('Cases',
  None,
  '2022-09-08',
  'cumCasesLFDOnlyBySpecimenDate',
  'Cumulative cases LFD-only by specimen date',
  'cumulative,

In [14]:
# Execute the PRAGMA statement to get the column definitions of the MetricDetails table
c.execute('PRAGMA table_info(MetricDetails)')

# Fetch all rows of the result set
rows = c.fetchall()

# Loop through each row and print the column name and its declared type
for row in rows:
    print(row[1], row[2])

category TEXT
deprecated TEXT
doc_last_modified TEXT
metric TEXT
metric_name TEXT
tags TEXT


In [4]:
# Execute the PRAGMA statement to get the column definitions of the Announcements table
c.execute('PRAGMA table_info(Announcements)')

# Fetch all rows of the result set
rows = c.fetchall()

# Loop through each row and print the column name and its declared type
for row in rows:
    print(row[1], row[2])

body TEXT
date TEXT
expire TEXT
has_expired INTEGER
id TEXT
launch TEXT


In [20]:
# Show the nation-level rows of the Areas table.
# The string literal uses single quotes: SQLite reads double-quoted text as
# an identifier first and only falls back to a string as a legacy quirk.
sql = "select * from Areas where areaType = 'nation' limit 10"
c.execute(sql)
conn.commit()
# Last expression of the cell, so the notebook displays the fetched rows
c.fetchall()

[('E92000001', 'England', 'nation', 1),
 ('N92000002', 'Northern Ireland', 'nation', 2),
 ('S92000003', 'Scotland', 'nation', 3),
 ('K02000001', 'United Kingdom', 'nation', 4),
 ('W92000004', 'Wales', 'nation', 5)]