In [21]:
import requests
import os
import json
from zipfile import ZipFile
import pandas as pd
import geopandas as gpd
from pathlib import Path
from datetime import date
import shutil

In [3]:
# Set location we're working in.
# Recognised values are "home" and "work"; the value chosen here decides which
# base data folder the path configuration uses.
location = "home"

In [4]:
# Sets location to work from: choose the base data folder for the configured location
if location == "home":
    folder_location = "C:/Users/Lara/Work/DataDownloads/"
elif location == "work":
    folder_location = "O:/Data_team/GIS_data_downloads/"
else:
    # Fail fast: otherwise folder_location would be undefined (or silently left
    # over from an earlier run of this cell with a different location)
    raise ValueError(f"Unknown location {location!r}; expected 'home' or 'work'")

# All working paths hang off the base folder (which ends with a trailing slash)
download_location = f"{folder_location}testData/"           # extracted datasets (Original/ and Wiltshire/)
temp_download_location = f"{folder_location}testDataTemp/"  # downloaded zip files
lookup_location = f"{folder_location}Lookups/"              # folder holding the lookup csv
item_url_file = "OS-API_URL.csv"                            # lookup csv listing the datasets to download
boundary_file = f"{folder_location}CountyBoundary.shp"      # boundary used to clip/intersect

In [5]:
# Read in Wiltshire boundary file (the path held in boundary_file).
# The resulting GeoDataFrame is the boundary that the processing functions
# clip to / intersect with.
boundary_gdf = gpd.read_file(boundary_file)

In [6]:
# Load the dataset lookup csv from the lookup folder
item_url_df = pd.read_csv(f"{lookup_location}{item_url_file}")
# Show the last rows of the lookup as a quick check of what was loaded
item_url_df.tail(12)

Unnamed: 0,Dataset,DatasetProductName,URL,Format,Source
0,MiniScale,MiniScale,https://api.os.uk/downloads/v1/products/MiniSc...,"Zip file (containing EPS, Illustrator and TIFF...",OS
1,1:250 000 Scale Colour Raster,250kScaleColourRaster,https://api.os.uk/downloads/v1/products/250kSc...,TIFF-LZW,OS
2,Boundary-Line,BoundaryLine,https://api.os.uk/downloads/v1/products/Bounda...,GeoPackage,OS
3,Code-Point,CodePointOpen,https://api.os.uk/downloads/v1/products/CodePo...,GeoPackage,OS
4,OS Open Zoomstack,OpenZoomstack,https://api.os.uk/downloads/v1/products/OpenZo...,GeoPackage,OS
5,OS Open Greenspace,OpenGreenspace,https://api.os.uk/downloads/v1/products/OpenGr...,GeoPackage,OS
6,OS OpenMap - Local,OpenMapLocal,https://api.os.uk/downloads/v1/products/OpenMa...,GeoPackage,OS
7,OS Open Names,OpenNames,https://api.os.uk/downloads/v1/products/OpenNa...,GeoPackage,OS
8,OS Open Rivers,OpenRivers,https://api.os.uk/downloads/v1/products/OpenRi...,GeoPackage,OS
9,OS Open Roads,OpenRoads,https://api.os.uk/downloads/v1/products/OpenRo...,GeoPackage,OS


In [7]:
# Get list of OS open data products available to download
# https://docs.os.uk/os-apis/accessing-os-apis/os-downloads-api/technical-specification/opendata-products
url = 'https://api.os.uk/downloads/v1/products'

# Time out instead of hanging the notebook if the server does not respond
response = requests.get(url, timeout=60)
# Stop here on an HTTP error rather than parsing an error body as if it were the product list
response.raise_for_status()
product_list = response.json()

In [8]:
# How each layer should be cut down to Wiltshire, keyed by dataset name and
# then by layer name. The value is the method to apply: "clip" or "intersect".
# Boundary line layers that are not listed here get no Wiltshire version.
wilts_clip = {
    "BoundaryLine": {
        "polling_districts_england": "intersect",
        "parish": "intersect",
        "westminster_const": "intersect",
        "unitary_electoral_division": "intersect",
    },
}

In [45]:
# Define functions

def process_250kScaleColourRaster_wilts(source:str, dataset:str)->None:
    """
    Processes 250kScaleColourRaster data for wilts by selecting relevant Tiff files and copying to Wilts folder

    Searches the dataset's "Original" folder under download_location (including
    sub-folders) for tif files and copies those named SU, ST, SP or SO into the
    matching "Wiltshire" folder, which is created if it does not already exist.

    Args:
        source (str): Source of dataset (folder name below Original/ and Wiltshire/)
        dataset (str): Dataset name (folder name below the source folder)

    """
    # File names (without extension) of the tif files covering the relevant area
    wilts_tiles = {"SU", "ST", "SP", "SO"}

    download_root = Path(download_location)
    # Data location of tif files
    data_loc = download_root / "Original" / source / dataset
    # Location for wilts files. Built explicitly rather than by replacing the text
    # "Original" in the path, which would also alter any other folder whose name
    # happens to contain that text.
    wilts_folder_location = download_root / "Wiltshire" / source / dataset
    wilts_folder_location.mkdir(parents=True, exist_ok=True)

    # Copy only the tif files for the relevant area to the wilts folder
    for tif in data_loc.rglob("*.tif"):
        if tif.stem in wilts_tiles:
            shutil.copy(tif, wilts_folder_location)
            
def process_boundaryline_wilts(gpd_layers:pd.DataFrame, file:Path, wilts_file_location:Path)-> None:
    """
    Processes boundary line data for Wilts and writes to Geopackage

    Only layers listed in wilts_clip['BoundaryLine'] are processed. Each is either
    clipped to the Wilts boundary ("clip") or filtered to the features that
    intersect it ("intersect"). Layers with no resulting features are not written.

    Args:
        gpd_layers (DataFrame): Layers in geopackage (must have a "name" column)
        file (Path): Geospatial file to read
        wilts_file_location (Path): Output location for wilts dataset

    Raises:
        ValueError: If a listed layer's method is neither "clip" nor "intersect"

    """
    layer_methods = wilts_clip['BoundaryLine']
    # The first layer written must create the geopackage ("w"); later layers are
    # appended to it ("a")
    file_created = False

    # Loop through each layer in geopackage
    for layer in gpd_layers.name:
        # Layers not listed in dict should not have a wiltshire version
        if layer not in layer_methods:
            continue

        method = layer_methods[layer]
        # Reject unknown methods explicitly; otherwise no Wilts data would be
        # produced for this layer and the previous layer's data could be written
        # under this layer's name
        if method not in ("clip", "intersect"):
            raise ValueError(
                f"Unknown method {method!r} for BoundaryLine layer {layer!r}; expected 'clip' or 'intersect'"
            )

        # Read relevant layer
        gdf = gpd.read_file(file, layer = layer)
        if method == "clip":
            # Cut features to the Wilts boundary
            gdf_wilts = gpd.clip(gdf, boundary_gdf)
        else:
            # Keep whole features that touch the Wilts boundary.
            # NOTE(review): only the first geometry in the boundary file is used -
            # confirm the boundary file holds a single feature
            target_geom = boundary_gdf.geometry.iat[0]
            gdf_wilts = gdf[gdf.geometry.intersects(target_geom)]

        # Nothing to write for layers with no features in Wilts
        if gdf_wilts.empty:
            continue

        # Write or append geopackage layer
        gdf_wilts.to_file(wilts_file_location, layer = layer, driver = "GPKG", mode = "a" if file_created else "w")
        file_created = True
    

def process_geopackage_wilts(folder_name_wilts:str, source:str , dataset:str , metadata:dict )-> None:
    """
    Processes original datasets into Wilts, geopackages only, and writes to geopackage

    Every geopackage found under the dataset's "Original" folder (including
    sub-folders) is written to the same relative position under the "Wiltshire"
    folder, replacing any existing file. BoundaryLine is handed to
    process_boundaryline_wilts; for all other datasets every layer is clipped to
    the Wilts boundary and layers with no resulting features are left out.
    metadata.json is written to folder_name_wilts when at least one geopackage
    was found.

    Args:
        folder_name_wilts (str): Output folder for the metadata file (expected to end with a path separator)
        source (str): Source of dataset
        dataset (str): Dataset name
        metadata (dict): Metadata information including version

    """
    # Get list of spatial files downloaded for specific dataset
    download_root = Path(download_location)
    data_loc = download_root / "Original" / source / dataset
    wilts_root = download_root / "Wiltshire" / source / dataset
    gpkg_files = list(data_loc.rglob("*.gpkg"))

    # Loop through each spatial file
    for gpkg_file in gpkg_files:
        # Mirror the file's position below the Wiltshire folder. Built from the
        # relative path rather than by replacing the text "Original", which would
        # also alter any other folder whose name happens to contain that text.
        wilts_file_location = wilts_root / gpkg_file.relative_to(data_loc)
        # Check folder destination exists, and if not create it
        wilts_file_location.parent.mkdir(parents=True, exist_ok=True)
        # Remove file if it already exists as geopackage writes onto existing file
        wilts_file_location.unlink(missing_ok=True)

        gpd_layers = gpd.list_layers(gpkg_file)
        # Need separate code for boundary line, as handle each layer in geopackage differently (clip or intersect)
        if dataset == "BoundaryLine":
            process_boundaryline_wilts(gpd_layers, gpkg_file, wilts_file_location)
            continue

        # The first layer written must create the geopackage ("w"); later layers
        # are appended to it ("a")
        file_created = False
        # Loop through each layer in file, clip to wilts and write to new location
        for layer in gpd_layers.name:
            gdf = gpd.read_file(gpkg_file, layer = layer)
            gdf_wilts = gpd.clip(gdf, boundary_gdf)
            # Nothing to write for layers with no features in Wilts
            if gdf_wilts.empty:
                continue
            # Write or append geopackage layer
            gdf_wilts.to_file(wilts_file_location, layer = layer, driver = "GPKG", mode = "a" if file_created else "w")
            file_created = True

    # Write the metadata once, and only if there was a geopackage to process.
    # The handle has its own name so it cannot be confused with the geopackage
    # path being processed.
    if gpkg_files:
        with open(folder_name_wilts+"metadata.json", mode="w") as metadata_file:
            json.dump(metadata, metadata_file)

In [46]:
#Loop through each dataset name in lookup
# NOTE(review): .head(2) limits the run to the first two rows of the lookup -
# presumably a test limit; confirm before relying on this for a full download
for dataset in item_url_df.DatasetProductName.head(2):
    print(f"Dataset: {dataset}")
    # Get metadata for dataset
    dataset_details_list = [d for d in product_list if d['id'] == dataset]
    if not dataset_details_list:
        # Lookup names a product the API did not list: skip it instead of
        # stopping the whole run with an IndexError
        print(f"Error occured for {dataset}:", "not found in OS product list")
        continue
    metadata = {"Version":dataset_details_list[0]['version'],
                "Date downloaded":date.today().strftime('%m/%d/%Y')}

    # Product download info https://docs.os.uk/os-apis/accessing-os-apis/os-downloads-api/technical-specification/download-an-opendata-product
    url_product = f"https://api.os.uk/downloads/v1/products/{dataset}/downloads"
    try:
        response_product = requests.get(url_product, timeout=60)
        response_product.raise_for_status() # Check if response is successful
        dataset_product_list = response_product.json()
    except Exception as e:
        # One failed listing should not stop the remaining datasets
        print(f"Error occured for {dataset}:", e)
        continue
    print(f"Product list: {dataset_product_list}")

    lookup_row = item_url_df[item_url_df.DatasetProductName==dataset]
    # Set format required from external csv lookup (some datasets have multiple).
    # Not called "format" so the builtin of that name stays usable in the notebook.
    required_format = lookup_row['Format'].item()
    # Set source for creating folder structure
    source = lookup_row['Source'].item()

    # Loop through each product in dataset list
    for value in dataset_product_list:
        #Only interested in specified formats/ areas
        if value['format'] != required_format or value['area'] != "GB":
            continue

        # Set variables
        dataset_url = value['url']
        temp_folder_name = f"{temp_download_location}"
        folder_name_original = f"{download_location}Original/{source}/{dataset}/"
        folder_name_wilts = f"{download_location}Wiltshire/{source}/{dataset}/"
        file_name = f"{dataset}_{value['area']}"
        zip_path = temp_folder_name+file_name+".zip"
        try:
            # The temp folder may not exist yet on a fresh machine
            os.makedirs(temp_folder_name, exist_ok=True)

            # Create zip file in temp location. Streamed to disk in chunks so a
            # download of several hundred MB is never held in memory in one piece.
            with requests.get(dataset_url, stream=True, timeout=60) as response:
                response.raise_for_status() # Check if response is successful
                with open(zip_path, mode="wb") as zip_file:
                    for chunk in response.iter_content(chunk_size=1024 * 1024):
                        zip_file.write(chunk)

            # Extract zip and move to permanent location
            with ZipFile(zip_path, 'r') as z_object:
                # Only now that the archive has opened successfully:
                # deletes old folder contents/ creates folder if not already there
                if os.path.exists(folder_name_original):
                    shutil.rmtree(folder_name_original)   # deletes the folder and all contents
                os.makedirs(folder_name_original, exist_ok=True)  # recreate it empty
                z_object.extractall(path=folder_name_original)

            # Write out metadata
            with open(folder_name_original+"metadata.json", mode="w") as metadata_file:
                json.dump(metadata, metadata_file)

            ## Export wiltshire data
            # 250kScaleColourRaster
            if dataset == "250kScaleColourRaster":
                process_250kScaleColourRaster_wilts(source, dataset)
            # Geopackages need clipping (some are not suitable for Wilts level data)
            if value['format'] == "GeoPackage":
                process_geopackage_wilts(folder_name_wilts, source , dataset , metadata)

        except Exception as e:
            # Best effort: report the failure and carry on with the next product
            print(f"Error occured for {dataset}:", e)


           

Dataset: MiniScale
Product list: [{'md5': '2ee5e66b2579c65d7fe6bd5370de186a', 'size': 311023804, 'url': 'https://api.os.uk/downloads/v1/products/MiniScale/downloads?area=GB&format=Zip+file+%28containing+EPS%2C+Illustrator+and+TIFF-LZW%29&redirect', 'format': 'Zip file (containing EPS, Illustrator and TIFF-LZW)', 'area': 'GB', 'fileName': 'minisc_gb.zip'}]
Dataset: 250kScaleColourRaster
Product list: [{'md5': '919da89a6a46bfdee0bb59c450e4539a', 'size': 134123650, 'url': 'https://api.os.uk/downloads/v1/products/250kScaleColourRaster/downloads?area=GB&format=TIFF-LZW&redirect', 'format': 'TIFF-LZW', 'area': 'GB', 'fileName': 'ras250_gb.zip'}]
