### Script to scrape published data from NHSD webpages and output a collated list of CSV files

In [None]:
import requests, os
from bs4 import BeautifulSoup
import re
from datetime import datetime
from urllib.parse import urlparse
from urllib.request import urlopen
import os
from io import BytesIO
from zipfile import ZipFile
from pathlib import Path
import shutil

In [None]:
# Site root that is prepended to the hrefs collected below, and the landing
# page that lists the NHS workforce statistics publications.
file_source_url = 'https://digital.nhs.uk'
check_url = 'https://digital.nhs.uk/data-and-information/publications/statistical/nhs-workforce-statistics'

response = requests.get(check_url)
soup = BeautifulSoup(response.content, 'html.parser')

# Both page sections are filtered with the same href pattern, so compile it once.
publication_href = re.compile("publications/statistical/nhs-workforce-statistics/")
past_links = soup.find(id="past-publications").find_all(href=publication_href)
latest_link = soup.find(id="latest-statistics").find_all(href=publication_href)

In [None]:
# Notebook display only: show the links found in the "latest-statistics" section.
latest_link

In [None]:
# Add the first "latest-statistics" link so the newest publication is processed
# together with the past ones. Raises IndexError if no latest link was found.
past_links.append(latest_link[0])

In [None]:
def retrieve_sub_url(thesoup, data_type, file_types=('csv', 'xlsx'), hierachy=True):
    """Return the hrefs of downloadable files on a page that mention ``data_type``.

    Args:
        thesoup: Parsed page exposing ``find_all(href=<compiled regex>)`` whose
            results support ``tag['href']`` (e.g. a BeautifulSoup document).
        data_type: Substring looked for in the lower-cased href, so it should
            itself be given in lower case.
        file_types: File extensions (without the dot) to search for, in
            priority order. Any extension is accepted.
        hierachy: (sic - spelling kept for backward compatibility.) When True,
            stop at the first extension that produced at least one match, so
            only the highest-priority available file type is returned. When
            False, matches for every extension are returned in ``file_types``
            order.

    Returns:
        A list of href strings; empty when nothing matches.
    """
    result = []

    for extension in file_types:
        # The href must end in ".<extension>"; the extension match is case-sensitive.
        pattern = re.compile(r"\." + re.escape(extension) + "$")
        result.extend(
            tag['href']
            for tag in thesoup.find_all(href=pattern)
            if data_type in tag['href'].lower()
        )
        if hierachy and result:
            break

    return result

In [None]:
# Matches "<month name>-<4-digit year>" fragments in a lower-case URL slug.
_MONTH_YEAR_RE = re.compile(
    r"(?:january|february|march|april|may|june|july|august|september"
    r"|october|november|december)-\d{4}"
)


def retrieve_stats_urls(url_string, file_source_url, data_types, file_types=('csv', 'xlsx')):
    """Collect the data-file links published on one publication page.

    Args:
        url_string: Href of the publication page; appended to
            ``file_source_url`` to build the address that is fetched. Its last
            path segment must contain a "<month>-<year>" fragment.
        file_source_url: Prefix prepended to ``url_string``.
        data_types: Substrings identifying the data sets of interest; each is
            passed to ``retrieve_sub_url`` and becomes a key of the result.
        file_types: File extensions to look for, forwarded to
            ``retrieve_sub_url``.

    Returns:
        A dict mapping each entry of ``data_types`` to the list of matching
        hrefs, plus ``'the_date'`` holding the publication date as a
        ``datetime``. Returns ``None`` (after printing a message) when no date
        can be parsed from the URL or the page cannot be downloaded.
    """
    # A trailing slash would otherwise leave an empty final segment.
    slug = url_string.rstrip('/').split('/')[-1]
    found_dates = _MONTH_YEAR_RE.findall(slug)

    try:
        # Use the last date so that slugs holding a date range resolve to its end.
        formatted_date = datetime.strptime(found_dates[-1], "%B-%Y")
    except (IndexError, ValueError):
        print('Could not format date')
        return None

    full_url = file_source_url + url_string
    try:
        response = requests.get(full_url)
    except requests.RequestException as err:
        # Stay best-effort for the caller's loop, but report the real cause.
        print(f'Could not retrieve {full_url}: {err}')
        return None

    soup = BeautifulSoup(response.content, 'html.parser')

    dictionary = {
        data_type: retrieve_sub_url(soup, data_type, file_types)
        for data_type in data_types
    }
    dictionary['the_date'] = formatted_date
    return dictionary

In [None]:
# Notebook display only: preview entries 1-9 of the collected publication links.
past_links[1:10]


In [None]:
# Testing 
# reason_stats_urls_list = []
# test_links_href  = "/data-and-information/publications/statistical/nhs-sickness-absence-rates/nhs-sickness-absence-rates-february-2017"

# get_url = retrieve_stats_urls(test_links_href, file_source_url, ["reason", "rate", "covd"])
# print(get_url)

In [None]:
# Visit every publication page looking for "turnover" zip archives and keep the
# lookups that produced a result (retrieve_stats_urls returns None otherwise).
stats_urls_list = []

for publication in past_links:
    found_urls = retrieve_stats_urls(publication['href'], file_source_url, ["turnover"], ['zip'])
    if found_urls is None:
        continue
    stats_urls_list.append(found_urls)


In [None]:
# Notebook display only: inspect the per-publication link dictionaries.
stats_urls_list

In [None]:
def check_and_create_dir(parent_directory, directories):
    """Ensure ``parent_directory`` and the listed sub-directories exist.

    Directories that already exist are left untouched, and missing ancestors
    of ``parent_directory`` are created as well.

    Args:
        parent_directory: Path of the directory that holds the sub-directories.
        directories: Iterable of sub-directory names to create inside
            ``parent_directory``.
    """
    # exist_ok avoids the race between checking for a directory and creating it.
    os.makedirs(parent_directory, exist_ok=True)

    for directory in directories:
        os.makedirs(os.path.join(parent_directory, directory), exist_ok=True)

In [None]:
# Define the directory names
# The working area is ./tempdir, with one sub-folder per name listed below.
parent_directory = 'tempdir'
directories = ['turnover', 'zip']

# Make sure the parent folder and each of its sub-folders exist.
check_and_create_dir(parent_directory, directories)

In [None]:
# Notebook display only: number of publications for which links were collected.
len(stats_urls_list)

In [None]:
def download_zip_url(url_to_file, thedate, folder, temp_dir="tempdir/zip"):
    """Download a zip archive, extract it and file its CSVs under ``tempdir/<folder>``.

    Every ``*.csv`` found directly inside ``temp_dir`` after extraction is
    moved to ``tempdir/<folder>/<YYYY-MM-DD>-<folder><suffix>.csv``, where
    ``<suffix>`` is ``-annual`` or ``-monthly`` when the CSV's file name
    contains that word (checked in that order) and empty otherwise. CSVs that
    map to the same destination name overwrite one another.

    Args:
        url_to_file: URL of the zip archive, opened with ``urlopen``.
        thedate: ``datetime`` used for the date prefix of the output names.
        folder: Sub-folder of ``tempdir`` that receives the CSV files; also
            embedded in the output file names.
        temp_dir: Directory the archive is extracted into.
    """
    # Read the whole archive into memory, then release the connection.
    with urlopen(url_to_file) as http_response:
        archive_bytes = BytesIO(http_response.read())
    with ZipFile(archive_bytes) as archive:
        archive.extractall(path=temp_dir)

    destination_dir = Path('tempdir') / folder
    destination_dir.mkdir(parents=True, exist_ok=True)
    date_stamp = thedate.strftime("%Y-%m-%d")

    # Materialise the listing first: files are moved out of temp_dir while looping.
    for extracted in sorted(Path(temp_dir).glob('*.csv')):
        # Classify on the file name only, so the temp_dir path cannot influence it.
        lowered_name = extracted.name.lower()
        if "annual" in lowered_name:
            rate_type = "-annual"
        elif "monthly" in lowered_name:
            rate_type = "-monthly"
        else:
            rate_type = ""

        destination = destination_dir / f"{date_stamp}-{folder}{rate_type}.csv"
        shutil.move(extracted.as_posix(), destination.as_posix())


In [None]:
def download_url(url_to_file, thedate, folder):
    """Download one file and save it as ``tempdir/<folder>/<date>-<folder><qualifier>.<ext>``.

    The extension is ``xlsx`` when the URL contains "xlsx", otherwise ``csv``.
    For the ``rate`` folder the qualifier is ``-annual`` or ``-quarterly`` when
    the URL contains that word (checked in that order) and ``-monthly``
    otherwise. For the ``reason`` folder it is ``-mds`` when the URL contains
    "mds". In every other case the qualifier is empty.

    Args:
        url_to_file: URL of the file to fetch.
        thedate: ``datetime`` used for the date prefix of the file name.
        folder: Sub-folder of ``tempdir`` to write into; also part of the name.
    """
    lowered_url = url_to_file.lower()
    extension = "xlsx" if "xlsx" in lowered_url else "csv"

    qualifier = ""
    if folder == 'rate':
        if "annual" in lowered_url:
            qualifier = "-annual"
        elif "quarterly" in lowered_url:
            qualifier = '-quarterly'
        else:
            qualifier = "-monthly"
    elif folder == "reason" and "mds" in lowered_url:
        qualifier = '-mds'

    base_name = thedate.strftime("%Y-%m-%d") + "-" + folder + qualifier + "." + extension
    target_path = "/".join(("tempdir", folder, base_name))

    payload = requests.get(url_to_file).content

    with open(target_path, 'wb') as out_file:
        out_file.write(payload)

In [None]:
# Notebook display only: re-check the collected links before downloading.
stats_urls_list

In [None]:
# Fetch and unpack every turnover archive found for each publication date.
# NOTE(review): this cell previously carried the remark "Absence by reason not
# available before April 2019" - presumably inherited from a related script;
# confirm whether it has any bearing on turnover data.
for publication in stats_urls_list:
    # A publication with no turnover links simply triggers no downloads.
    for archive_url in publication['turnover']:
        download_zip_url(archive_url, publication['the_date'], "turnover")

# About a minute to download all files