# **Automated Data Collection from OpenPrescribing API**

This script implements an automated data collection system that retrieves diabetes prescription data for West Yorkshire from the OpenPrescribing.net API. It systematically downloads prescription records for seven categories of diabetes-related medications (short-acting insulins, intermediate and long-acting insulins, sulfonylureas, biguanides, other antidiabetic drugs, treatments for hypoglycaemia, and diabetic diagnostic and monitoring agents) across all eleven Sub-Integrated Care Board (subICB) areas within West Yorkshire for the period 2020-2025.

Key features include:

(1) memory-optimized processing using chunked data handling to manage large datasets;

(2) systematic API querying with rate-limiting compliance;

(3) data filtering to retain only primary care prescribing data (setting=4);

(4) automated deduplication and data quality validation;

(5) transformation from long-format to wide-format data structure for subsequent analysis.

The output is a comprehensive dataset where each row represents a GP practice's monthly prescribing activity across all diabetes medication categories.

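Note: a full run can take up to 20 minutes, because every drug category is requested separately for each subICB area, with a short pause between requests.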

In [None]:
import pandas as pd
import requests
import io
from datetime import datetime
import time
import gc
import os

# Turn off pandas' chained-assignment check so filtered copies can be
# modified in place without emitting warnings.
pd.set_option('mode.chained_assignment', None)

# Each entry is (BNF chapter label, human-readable group name, BNF code prefixes).
drug_groups = [
    ("6.1.1.1", "Short-acting insulins", ["0601011"]),
    ("6.1.1.2", "Intermediate and long-acting insulins", ["0601012"]),
    ("6.1.2.1", "Sulfonylureas", ["0601021"]),
    ("6.1.2.2", "Biguanides", ["0601022"]),
    ("6.1.2.3", "Other antidiabetic drugs", ["0601023"]),
    ("6.1.4", "Treatment of hypoglycaemia", ["0601040"]),
    ("6.1.6", "Diabetic diagnostic and monitoring agents", ["0601060"]),
]

# Group names in declaration order; this fixes the column order of the wide table.
group_order = [name for _, name, _ in drug_groups]

# West Yorkshire subICB codes (derived from manual analysis of annual patient
# distribution data).
all_west_yorkshire_codes = [
    '02N', '02R', '02T', '02W', '03A', '03E',
    '03J', '03R', '15F', '36J', 'X2C4Y',
]

# File names: intermediate files carry a minute-resolution timestamp, while the
# final output name is fixed.
timestamp = f"{datetime.now():%Y%m%d_%H%M}"
temp_file = "temp_raw_data_" + timestamp + ".csv"
output_file = "WestYorkshire_GPDrugData_by_Practice.csv"

print("Starting West Yorkshire data download (memory optimized version)...")
print(f"Temporary data will be saved to: {temp_file}")

# API endpoint plus the counters/flag used by the download loop. Rows are
# written straight to temp_file rather than accumulated in memory.
base_url = "https://openprescribing.net/api/1.0/spending_by_org/"
total_api_calls = 0
successful_calls = 0
is_first_write = True

# Download one CSV per (drug code, subICB) pair and append its setting=4 rows
# to temp_file.
#
# A rate-limited request (HTTP 429) is retried for the SAME subICB after a
# pause, up to MAX_RATE_LIMIT_RETRIES times, so a temporary rate limit no
# longer silently drops that subICB's data. Any other failure is reported and
# the loop moves on to the next subICB (best-effort collection).
MAX_RATE_LIMIT_RETRIES = 3
RATE_LIMIT_WAIT_SECONDS = 10

for ch, group_name, drugcodes in drug_groups:
    for drug_code in drugcodes:
        print(f"Processing: {group_name} ({drug_code})")

        for subicb_code in all_west_yorkshire_codes:
            full_url = f"{base_url}?org_type=practice&code={drug_code}&org={subicb_code}&format=csv"

            for attempt in range(MAX_RATE_LIMIT_RETRIES + 1):
                # Every HTTP request counts, including retries.
                total_api_calls += 1
                retries_left = MAX_RATE_LIMIT_RETRIES - attempt

                try:
                    response = requests.get(full_url, timeout=180)

                    # Check the status code directly instead of searching the
                    # exception text for "429".
                    if response.status_code == 429 and retries_left > 0:
                        print(f"Failed {subicb_code}: HTTP 429 (rate limited)")
                        print(f"Rate limit encountered, waiting {RATE_LIMIT_WAIT_SECONDS} seconds...")
                        time.sleep(RATE_LIMIT_WAIT_SECONDS)
                        continue  # retry the same URL

                    response.raise_for_status()

                    # Very short bodies are treated as "no data" without parsing.
                    if len(response.content) > 100:
                        df = pd.read_csv(io.StringIO(response.text))
                        if len(df) > 0:
                            # Filter for setting=4 data only
                            df_filtered = df[df['setting'] == 4].copy()
                            if len(df_filtered) > 0:
                                # Add identifier columns
                                df_filtered['drug_code'] = drug_code
                                df_filtered['drug_group'] = group_name

                                # Optimize data types for memory efficiency
                                df_filtered['date'] = pd.to_datetime(df_filtered['date'])
                                df_filtered['items'] = pd.to_numeric(df_filtered['items'], downcast='integer')
                                df_filtered['quantity'] = pd.to_numeric(df_filtered['quantity'], downcast='float')
                                df_filtered['actual_cost'] = pd.to_numeric(df_filtered['actual_cost'], downcast='float')

                                # Keep only required columns
                                columns_needed = ['ccg', 'row_id', 'row_name', 'date',
                                                  'items', 'quantity', 'actual_cost',
                                                  'drug_code', 'drug_group']
                                df_filtered = df_filtered[columns_needed]

                                # Write directly to the temporary file instead of
                                # storing in memory; only the first write emits
                                # the CSV header.
                                if is_first_write:
                                    df_filtered.to_csv(temp_file, mode='w', index=False)
                                    is_first_write = False
                                else:
                                    df_filtered.to_csv(temp_file, mode='a', header=False, index=False)

                                successful_calls += 1
                                print(f"Success {subicb_code}: {len(df_filtered)} rows (written to file)")

                                del df_filtered
                            else:
                                print(f"No setting=4 data for {subicb_code}")
                        else:
                            print(f"Empty data for {subicb_code}")

                        del df
                    else:
                        print(f"No data for {subicb_code}")

                except Exception as e:
                    # Deliberately broad: one bad response (network error, HTTP
                    # error, unexpected CSV layout) must not abort the whole run.
                    print(f"Failed {subicb_code}: {str(e)[:50]}")

                # Either handled or failed for a non-retryable reason.
                break

            gc.collect()
            # Short pause between subICB requests to stay polite to the API.
            time.sleep(0.5)

# Summarize the download phase and stop if nothing was written to temp_file.
print("Data collection completed:")
print(f"   Total API calls: {total_api_calls}")
print(f"   Successful calls: {successful_calls}")

if successful_calls == 0:
    print("No data collected!")
    # exit() is an interactive-shell helper injected by the site module and
    # would report success (status 0); raise SystemExit with a non-zero status
    # so callers can detect the failure.
    raise SystemExit(1)

# Process the temporary file in chunks to limit peak memory: each chunk is
# aggregated on its own and written to a per-chunk CSV, whose name is recorded
# in chunk_files for the merge step.
print("Starting chunked data processing...")

chunk_size = 10000
processed_chunks = []
chunk_files = []

try:
    chunk_counter = 0
    for chunk in pd.read_csv(temp_file, chunksize=chunk_size):
        chunk_counter += 1
        print(f"Processing chunk {chunk_counter}: {len(chunk)} rows")

        # Optimize data types
        chunk['date'] = pd.to_datetime(chunk['date'])
        chunk['items'] = pd.to_numeric(chunk['items'], downcast='integer')
        chunk['quantity'] = pd.to_numeric(chunk['quantity'], downcast='float')
        chunk['actual_cost'] = pd.to_numeric(chunk['actual_cost'], downcast='float')

        # Collapse rows that share the same key columns within this chunk.
        # NOTE(review): rows with identical keys are SUMMED, not dropped; if the
        # API can return the very same record twice this would double-count it
        # -- confirm this is the intended "deduplication".
        dedup_columns = ['ccg', 'row_id', 'row_name', 'date', 'drug_code', 'drug_group']
        chunk_deduped = chunk.groupby(dedup_columns, as_index=False).agg({
            'items': 'sum',
            'quantity': 'sum',
            'actual_cost': 'sum'
        })

        # Save processed chunk to temporary file
        chunk_file = f"chunk_{chunk_counter}_{timestamp}.csv"
        chunk_deduped.to_csv(chunk_file, index=False)
        chunk_files.append(chunk_file)

        print(f"Chunk deduplication completed: {len(chunk)} rows → {len(chunk_deduped)} rows")

        del chunk, chunk_deduped
        gc.collect()

except Exception as e:
    print(f"Chunked processing failed: {e}")
    # Exit with a non-zero status and keep the original exception as the
    # cause; exit() would report success and is meant for interactive use.
    raise SystemExit(1) from e

# Concatenate the per-chunk CSVs into a single file, deleting each chunk file
# once it has been appended, then remove the raw download file.
print("Merging all processed data chunks...")
final_deduped_file = f"final_deduped_{timestamp}.csv"
is_first_merge = True

for chunk_file in chunk_files:
    chunk_data = pd.read_csv(chunk_file)

    # The first chunk creates the file and writes the header; later chunks
    # are appended without one.
    write_mode = 'w' if is_first_merge else 'a'
    chunk_data.to_csv(final_deduped_file, mode=write_mode,
                      header=is_first_merge, index=False)
    is_first_merge = False

    del chunk_data
    gc.collect()
    os.remove(chunk_file)

os.remove(temp_file)
print("Temporary file cleanup completed")

# Second aggregation pass: identical keys that ended up in different chunks
# are combined here, after which the merged intermediate file is deleted.
print("Executing final deduplication...")
dedup_columns = ['ccg', 'row_id', 'row_name', 'date', 'drug_code', 'drug_group']

# Read the merged file back chunk by chunk, then stitch the pieces together.
final_data_list = list(pd.read_csv(final_deduped_file, chunksize=chunk_size))
df_final = pd.concat(final_data_list, ignore_index=True)
del final_data_list
gc.collect()

print(f"Before final deduplication: {len(df_final)} rows")
df_final = (
    df_final
    .groupby(dedup_columns, as_index=False)
    .agg(dict.fromkeys(['items', 'quantity', 'actual_cost'], 'sum'))
)
print(f"After final deduplication: {len(df_final)} rows")

os.remove(final_deduped_file)

# Convert the long table (one row per practice/date/drug group) into a wide
# table with one row per practice/date and three columns per drug group
# (<group>_items, <group>_quantity, <group>_actual_cost). Combinations with no
# prescribing are filled with zero.
#
# This is a single vectorized groupby + unstack. It replaces nested
# row-by-row loops that re-scanned the whole frame for every practice/date,
# and it keys on all four identifier columns (including row_name).
print("Converting to wide table format...")

index_cols = ['ccg', 'row_id', 'row_name', 'date']
metrics = ['items', 'quantity', 'actual_cost']

practice_dates = df_final[index_cols].drop_duplicates()
print(f"Unique practice-date combinations: {len(practice_dates)}")

# Final column layout: identifiers first, then the metrics of each drug group
# in group_order.
value_cols = [f"{group}_{suffix}"
              for group in group_order
              for suffix in metrics]
expected_cols = index_cols + value_cols

# Sum per practice/date/group (so a group made of several BNF codes is added
# up), then move the drug group from the row index into the columns.
group_totals = df_final.groupby(index_cols + ['drug_group'])[metrics].sum()
wide = group_totals.unstack('drug_group')

# Flatten the (metric, group) column MultiIndex to "<group>_<metric>".
wide.columns = [f"{group}_{metric}" for metric, group in wide.columns]

# Add columns for groups that returned no data, drop any group not listed in
# group_order, and replace missing combinations with zero.
wide = wide.reindex(columns=value_cols).fillna(0)

# unstack turns integer columns into floats when it has to insert NaN;
# restore integer item counts when the source column was integral.
items_are_integral = pd.api.types.is_integer_dtype(df_final['items'])
wide = wide.astype({
    f"{group}_{metric}": ('int64' if metric == 'items' and items_are_integral else 'float64')
    for group in group_order
    for metric in metrics
})

final_wide_df = wide.reset_index()

del practice_dates, group_totals, wide
gc.collect()

# Make sure every drug group has a quantity column (created as 0.0 when
# absent), then put the columns into the expected order.
for group in group_order:
    quantity_col = f"{group}_quantity"
    if quantity_col in final_wide_df.columns:
        continue
    final_wide_df[quantity_col] = 0.0

# Selecting by expected_cols both reorders and restricts the columns.
final_wide_df = final_wide_df.loc[:, expected_cols]

# Print a short summary of the wide table, then write it to output_file.
n_rows, n_cols = final_wide_df.shape
date_values = final_wide_df['date']

print("Final data quality check:")
print(f"   Final rows: {n_rows:,}")
print(f"   Final columns: {n_cols}")
print(f"   Date range: {date_values.min()} to {date_values.max()}")
print(f"   Unique SubICBs: {final_wide_df['ccg'].nunique()}")
print(f"   Unique GP practices: {final_wide_df['row_id'].nunique()}")

# Persist the result without the positional index.
print(f"Saving to file: {output_file}")
final_wide_df.to_csv(output_file, index=False)

print("Process completed!")
print(f"File saved: {output_file}")
print(f"Final data: {n_rows:,} rows × {n_cols} columns")

# Per drug group: total items and the number of practice-month rows with at
# least one item.
print("Data statistics report:")
for group in group_order:
    items_col = f"{group}_items"
    if items_col not in final_wide_df.columns:
        continue

    item_counts = final_wide_df[items_col]
    total_items = item_counts.sum()
    practices_prescribing = item_counts.gt(0).sum()

    print(f"   {group}:")
    print(f"     Total prescriptions: {total_items:,}")
    print(f"     Practice-month entries with prescriptions: {practices_prescribing:,}")

# Report resident memory of this process when psutil is available.
try:
    import psutil
    rss_bytes = psutil.Process().memory_info().rss
    memory_mb = rss_bytes / (1024 * 1024)
    print(f"Current memory usage: {memory_mb:.1f} MB")
except ImportError:
    print("Install psutil to view memory usage: !pip install psutil")

# In Google Colab, trigger a browser download of the output file; elsewhere
# the import fails and the file simply stays on disk.
try:
    from google.colab import files
    files.download(output_file)
    print("File automatically downloaded")
except ImportError:
    print("Manual file save required")

# Drop the last reference to the wide table and collect garbage.
del final_wide_df
gc.collect()
print("Memory cleanup completed")