# DHIS2 Analytics API - Using Training.DHIS2

In [37]:
# Important Packages

import pandas as pd
import requests
import json
from pathlib import Path
from requests.auth import HTTPBasicAuth
from datetime import datetime, timedelta
import logging
import ast

### Enable Logging

In [38]:
# Setup logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

### Set Directories

In [39]:
# Current notebook's directory
base_dir = Path.cwd().parent  # Moves one level up from current working directory

# Paths
credentials_path = base_dir / '00_Local' / '01_Configs' / 'credentials.txt'
data_dir = base_dir / '00_Local' / '02_Data'
data_elements_csv = Path.cwd().parent / '00_Local' / '02_Data' / 'nd2_data_elements_ids.csv'
org_units_levels_file = data_dir / 'organisation_units_levels.csv'
districts_file = data_dir / 'districts.csv'
provinces_file = data_dir / 'provinces.csv'

### Load credentials from text file

In [40]:
# Load credentials from the file
credentials = {}
with credentials_path.open('r') as file:
    for line in file:
        if '=' in line:
            key, value = line.strip().split('=', 1)
            credentials[key.strip()] = value.strip()

# Extract values
DHIS2_URL = credentials.get('DHIS2_URL')
USERNAME = credentials.get('USERNAME')
PASSWORD = credentials.get('PASSWORD')
PAT = credentials.get('PAT')

print("DHIS2 instance URL:", DHIS2_URL)

DHIS2 instance URL: https://training.eidsr.znphi.co.zm/


### Query DHIS2 Analytics API for available data sets.

In [41]:
# My API endpoint for datasets
endpoint = f"{DHIS2_URL}/api/dataSets.json"

# My request with authentication
response = requests.get(endpoint, auth=(USERNAME, PASSWORD), params={'paging': 'false'})

# Stepwise check
if response.status_code == 200:
    datasets = response.json()
    print("Data retrieved successfully!")
else:
    print("Failed to retrieve data:", response.status_code, response.text)


Data retrieved successfully!


### View the Retrieved Data in a Readable Format

In [42]:
# # Pretty-print the JSON response to understand the contents of the dictionary
# print(json.dumps(datasets, indent=4))

### Convert the Data to a Pandas DataFrame

In [43]:
# Convert JSON to DataFrame
df = pd.DataFrame(datasets['dataSets'])

In [44]:
df.head(20)

Unnamed: 0,displayName,id
0,Cholera Vaccination,xoR1fw9UAnC
1,Community EBS,GWpIuhj6AEI
2,Community EBS Redesign,nkH6V3U9bJ8
3,COVID-19 Daily Monitoring and Contact Tracing,X2vykx4vd8C
4,COVID-19 Homecare Report,dBM7yaDxy9g
5,COVID-19 Laboratory Daily Aggregated Tool,gOr8UcpIqLA
6,COVID-19 Surveillance Daily Aggregate Report,Ctvbrr4atAn
7,COVID Cases Management Daily Aggregate Report,dTFXTfmgjgv
8,HF-IDSR Core Function Assessment Tool,SA4ANhumZ9v
9,ND2 Report,BtzVmjyJ6DZ


In [45]:
# Save Data for Future Analysis

df.to_csv(data_dir / 'datasets.csv', index=False)
print("Datasets saved to datasets.csv")

Datasets saved to datasets.csv


### View All Organisation Units

In [46]:
# API endpoint for organisation units
org_units_url = f"{DHIS2_URL}/api/organisationUnits.json"

# Parameters to retrieve all units without paging
params = {
    'paging': 'false',  # Retrieve all records without pagination
    'fields': 'id,displayName,level,parent[id,displayName]'  # Fetch necessary fields
}

# Request
response = requests.get(org_units_url, auth=(USERNAME, PASSWORD), params=params)

# Process the response
if response.status_code == 200:
    org_units = response.json()
    print("Organisation Units retrieved successfully!")
else:
    print("Error fetching organisation units:", response.status_code, response.text)


Organisation Units retrieved successfully!


In [47]:
# print(json.dumps(org_units, indent=4))

In [48]:
# Extract the organisation unit details
org_units_list = org_units.get('organisationUnits', [])

# Convert to DataFrame
df_org_units = pd.DataFrame(org_units_list)

In [49]:
df_org_units

Unnamed: 0,parent,displayName,id,level
0,"{'displayName': 'ea Chasefu District', 'id': '...",ae Chasefu Mini Hospital,qt4781BQa2L,4
1,"{'displayName': 'ls Chilanga District', 'id': ...",Balmoral Health Post,FrOn8lcbzPb,4
2,"{'displayName': 'ce Luano DIstrict', 'id': 'hP...",ce Bbusa Health Post,EPtuFlHv12i,4
3,"{'displayName': 'ce Kabwe District', 'id': 'sN...",ce Blossom Surgery Clinic,S9tqzMHVdMU,4
4,"{'displayName': 'ce Chisamba District', 'id': ...",ce Bombwe Health post,FIU3dK7BniN,4
...,...,...,...,...
3827,"{'displayName': 'ls Chilanga District', 'id': ...",ZA-Apollo Rural Health Centre,sfSGuVk954T,4
3828,"{'displayName': 'ls Chilanga District', 'id': ...",Zambia Helpers Society Hospital,oqBLYbMJXcg,4
3829,"{'displayName': 'ce Chibombo District', 'id': ...",zm Test Facility,BU7fjlCqw4o,4
3830,,zm Zambia Ministry of Health,PS5JpkoHHio,1


#### Definition of Levels

In [50]:
# API endpoint for organisation unit levels (national, provincial, district etc...)
org_unit_levels_url = f"{DHIS2_URL}/api/organisationUnitLevels.json"

# Parameters to retrieve all levels without paging
params = {
    'paging': 'false',  # Retrieve all records without pagination
    'fields': 'id,level,displayName'  # Fetch necessary fields
}

# Request
response = requests.get(org_unit_levels_url, auth=(USERNAME, PASSWORD), params=params)

# Process the response
if response.status_code == 200:
    org_unit_levels = response.json()
    print("Organisation Unit Levels retrieved successfully!")
else:
    print("Error fetching organisation unit levels:", response.status_code, response.text)


Organisation Unit Levels retrieved successfully!


In [51]:
# Extract the organisation unit levels
org_levels_list = org_unit_levels.get('organisationUnitLevels', [])

# Convert to DataFrame
df_org_levels = pd.DataFrame(org_levels_list)

df_org_levels = df_org_levels.sort_values(by='level', ascending=True)

df_org_levels

Unnamed: 0,level,displayName,id
2,1,National,qXnFgkCQcRC
4,2,Province,HDS8ZqmIuJh
0,3,District,Dz7Sm3imvLU
1,4,Health Facilities,MQLiogB9XBV
3,5,NHC-Zones,R2H4dpJ4PSF


In [52]:
df_org_levels.to_csv(data_dir / 'organisation_units_levels.csv', index=False)
print("Organisation_units saved to organisation_units_levels.csv")

Organisation_units saved to organisation_units_levels.csv


### Provinces

In [53]:
# Filter only province-level organisation units (level 2 as shown above)
df_provinces = df_org_units[df_org_units['level'] == 2]

# Province IDs as a comma-separated string
province_unit_ids = ",".join(df_provinces['id'])

print("Total provinces found:", len(df_provinces))
print("Province unit IDs:", province_unit_ids)

Total provinces found: 10
Province unit IDs: AWn3s2RqgAN,utIjliUdjp8,J7PQPWAeRUk,KozcEjeTyuD,B1u1bVtIA92,dbTLdTi7s8F,SwwuteU1Ajk,q5hODNmn021,oPLMrarKeEY,g1bv2xjtV0w


In [54]:
# Levels in the dataset
df_org_units['level'].unique()

# Ensure the correct level is being used for provinces (assumed level 2)
df_provinces = df_org_units[df_org_units['level'] == 2]

# Check if provinces were correctly filtered
df_provinces[['displayName', 'id', 'level']]

Unnamed: 0,displayName,id,level
8,ce Central Province,AWn3s2RqgAN,2
457,co Copperbelt Province,utIjliUdjp8,2
990,ea Eastern Province,J7PQPWAeRUk,2
1515,ls Lusaka Province,KozcEjeTyuD,2
1837,lu Luapula Province,B1u1bVtIA92,2
2157,mu Muchinga Province,dbTLdTi7s8F,2
2550,no Northern Province,SwwuteU1Ajk,2
2876,nw North-Western Province,q5hODNmn021,2
3442,so Southern Province,oPLMrarKeEY,2
3819,we Western Province,g1bv2xjtV0w,2


In [55]:
province_unit_ids = ";".join([
    "AWn3s2RqgAN",  # Central Province
    "utIjliUdjp8",  # Copperbelt Province
    "J7PQPWAeRUk",  # Eastern Province
    "KozcEjeTyuD",  # Lusaka Province
    "B1u1bVtIA92",  # Luapula Province
    "dbTLdTi7s8F",  # Muchinga Province
    "SwwuteU1Ajk",  # Northern Province
    "q5hODNmn021",  # North-Western Province
    "oPLMrarKeEY",  # Southern Province
    "g1bv2xjtV0w"   # Western Province
])

print("Formatted Province unit IDs:", province_unit_ids)


Formatted Province unit IDs: AWn3s2RqgAN;utIjliUdjp8;J7PQPWAeRUk;KozcEjeTyuD;B1u1bVtIA92;dbTLdTi7s8F;SwwuteU1Ajk;q5hODNmn021;oPLMrarKeEY;g1bv2xjtV0w


In [56]:
# Extract and save districts (Level 3)
df_districts = df_org_units[df_org_units['level'] == 3][['id', 'displayName', 'parent']]
df_districts.to_csv(districts_file, index=False)
print(f"Districts saved to districts.csv")

# Extract and save provinces (Level 2)
df_provinces = df_org_units[df_org_units['level'] == 2][['id', 'displayName', 'parent']]
df_provinces.to_csv(provinces_file, index=False)
print(f"Provinces saved to provinces.csv")

Districts saved to districts.csv
Provinces saved to provinces.csv


### QUERY ND2

In [57]:
# ND2 Report dataset ID
nd2_report_id = "BtzVmjyJ6DZ"

# API endpoint
nd2_report_url = f"{DHIS2_URL}/api/dataSets/{nd2_report_id}.json"

# Parameters to retrieve metadata (data elements, category combos, etc.)
params = {
    'fields': 'id,displayName,dataSetElements[dataElement[id,displayName,valueType]],periodType',
    'paging': 'false'
}

# Request
response = requests.get(nd2_report_url, auth=(USERNAME, PASSWORD), params=params)

# Check if the request was successful
if response.status_code == 200:
    nd2_report_data = response.json()
    print("ND2 Report metadata retrieved successfully!")
else:
    print("Error fetching ND2 Report data:", response.status_code, response.text)


ND2 Report metadata retrieved successfully!


In [58]:
# print(json.dumps(nd2_report_data, indent=4))

In [59]:
# Extract data elements
data_elements = nd2_report_data.get('dataSetElements', [])

# Convert to DataFrame
df_data_elements = pd.DataFrame([
    {
        'dataElement_id': element['dataElement']['id'],
        'dataElement_name': element['dataElement']['displayName'],
        'valueType': element['dataElement']['valueType']
    }
    for element in data_elements
])

In [60]:
df_data_elements.head(20)

Unnamed: 0,dataElement_id,dataElement_name,valueType
0,J3nlyqwGFtS,Diarrhoea Non-Bloody death,NUMBER
1,IDEVv6Epwi9,Typhoid fever suspected,NUMBER
2,tVLiX3s4oIR,Food Poisoning sent to lab,NUMBER
3,qhTdkAWNjBG,Measles death,NUMBER
4,SdiSDjLQdds,Maternal Death,NUMBER
5,MFUSzvPE1vn,Trypanosomiasis sent to lab,NUMBER
6,ZO0ULwawUDZ,SARS confirmed,NUMBER
7,dZkt0zcIP8M,HIV death,NUMBER
8,s6TTSztuLfH,HIV confirmed,NUMBER
9,xH18qiCnErl,"VHF (Ebola, Marburg, Lassa Fever, RVF, Crimean...",NUMBER


In [61]:
df_data_elements.to_csv(data_dir / 'data_elements.csv', index=False)
print("ND2 Data Elements saved to df_data_elements.csv")


ND2 Data Elements saved to df_data_elements.csv


## ND2 Cholera Data

### Cholera Data Elements

In [62]:
# Filter rows where 'dataElement_name' contains 'Cholera'
cholera_data_elements = df_data_elements[df_data_elements['dataElement_name'].str.contains('Cholera', case=False, na=False)]

# Display filtered elements
cholera_data_elements


Unnamed: 0,dataElement_id,dataElement_name,valueType
47,AzkTqGTlreJ,Cholera confirmed,NUMBER
84,gMy3ugmaS8z,Cholera sent to lab,NUMBER
99,norzLTZKFeO,Cholera suspected,NUMBER
102,Ay3ZqJMukSP,Cholera death,NUMBER


In [63]:
cholera_data_elements.to_csv(data_dir / 'cholera_data_elements.csv', index=False)
print("Cholera data elements saved to cholera_data_elements.csv")

Cholera data elements saved to cholera_data_elements.csv


In [64]:
cholera_element_ids = ";".join(cholera_data_elements['dataElement_id'])

print("Cholera Data Element IDs:", cholera_element_ids)

Cholera Data Element IDs: AzkTqGTlreJ;gMy3ugmaS8z;norzLTZKFeO;Ay3ZqJMukSP


### Monthly Cholera Cases

In [65]:
# DHIS2 API URL for analytics data
analytics_url = f"{DHIS2_URL}/api/analytics.json"

# Cholera element IDs from the dataset
cholera_element_ids = "Ay3ZqJMukSP;norzLTZKFeO;gMy3ugmaS8z;AzkTqGTlreJ"

# Define the period of interest (January 2024 to December 2024)
period_range = ";".join([pd.to_datetime(date).strftime('%Y%m') for date in pd.date_range(start="2024-01-01", end="2024-12-31", freq='MS')])

# Define national-level organisation unit (Ministry of Health)
national_org_unit_id = "PS5JpkoHHio"

# Construct API query parameters
params = {
    'dimension': [
        f'dx:{cholera_element_ids}',  # Cholera data element IDs
        f'pe:{period_range}',  # Monthly periods for 2024
        f'ou:{national_org_unit_id}'  # National org unit
    ],
    'displayProperty': 'NAME'
}

# Request data from DHIS2 API
response = requests.get(analytics_url, auth=(USERNAME, PASSWORD), params=params)

# Check response status
if response.status_code == 200:
    cholera_data = response.json()
    print("Cholera data retrieved successfully!")
else:
    raise Exception(f"Error retrieving Cholera data: {response.status_code}")


Cholera data retrieved successfully!


In [66]:
# Convert API response to DataFrame
if 'rows' in cholera_data:
    df_cholera = pd.DataFrame(cholera_data['rows'], columns=['dataElement', 'period', 'orgUnit', 'value'])

    # Convert period to readable format (YYYYMM to Month-Year)
    df_cholera['Date'] = pd.to_datetime(df_cholera['period'], format='%Y%m').dt.strftime('%b-%Y')

    # Map element IDs to their respective names
    cholera_mapping = {
        "Ay3ZqJMukSP": "Cholera death",
        "norzLTZKFeO": "Cholera suspected",
        "gMy3ugmaS8z": "Cholera sent to lab",
        "AzkTqGTlreJ": "Cholera confirmed"
    }

    df_cholera['dataElement'] = df_cholera['dataElement'].map(cholera_mapping)

    # Convert 'value' column to numeric and ensure integer values
    df_cholera['value'] = pd.to_numeric(df_cholera['value'], errors='coerce').fillna(0).astype(int)

    # Pivot table to restructure the data
    df_cholera_pivot = df_cholera.pivot_table(index='Date', columns='dataElement', values='value', aggfunc='sum').reset_index()

    # Ensure correct column ordering and fill missing columns with 0
    required_columns = ['Date', 'Cholera suspected', 'Cholera sent to lab', 'Cholera death', 'Cholera confirmed']

    for col in required_columns:
        if col not in df_cholera_pivot.columns:
            df_cholera_pivot[col] = 0  # Fill missing columns with 0

    df_cholera_pivot = df_cholera_pivot[required_columns]

    # Convert Date column for sorting
    df_cholera_pivot['Date'] = pd.to_datetime(df_cholera_pivot['Date'], format='%b-%Y')

    # Sort by date and convert it back to string
    df_cholera_pivot = df_cholera_pivot.sort_values(by='Date').reset_index(drop=True)
    df_cholera_pivot['Date'] = df_cholera_pivot['Date'].dt.strftime('%b-%Y')

    # Convert all numerical columns to integers to remove decimals
    for col in df_cholera_pivot.columns:
        if col != 'Date':  # Exclude Date column from conversion
            df_cholera_pivot[col] = df_cholera_pivot[col].fillna(0).astype(int)

else:
    print("No Cholera data found for the selected period.")


In [67]:
df_cholera_pivot.head(12)

dataElement,Date,Cholera suspected,Cholera sent to lab,Cholera death,Cholera confirmed
0,Jan-2024,9117,1844,132,719
1,Feb-2024,4791,1697,31,1038
2,Mar-2024,1542,398,7,114
3,Apr-2024,751,213,12,38
4,May-2024,197,142,1,55
5,Jun-2024,133,70,0,99
6,Jul-2024,27,24,0,6
7,Aug-2024,45,5,0,0
8,Sep-2024,141,1,0,0
9,Oct-2024,49,9,0,0


## Facility-Level Weekly Data

### Proof of Concept: January Only

In [68]:
# Read data elements from dataframe
dx_ids = ';'.join(df_data_elements['dataElement_id'].tolist())

# Use facility level ID
facility_level_id = "MQLiogB9XBV"

# Parse parent column to extract IDs from string representation
def parse_parent_column(df):
    """ Extract parent ID from string dictionary in the 'parent' column. """
    df['parent'] = df['parent'].apply(lambda x: ast.literal_eval(x)['id'] if pd.notnull(x) else None)
    return df

# Fetch facility-level organisation units from DHIS2
def fetch_facilities():
    """Fetch all facilities (level 4) from DHIS2 and store them in a dataframe."""
    url = f"{DHIS2_URL}/api/organisationUnits.json"
    params = {"filter": "level:eq:4", "paging": "false", "fields": "id,displayName,parent[id]"}

    try:
        response = requests.get(url, params=params, auth=HTTPBasicAuth(USERNAME, PASSWORD))
        response.raise_for_status()
        facilities_data = response.json()
        facilities_df = pd.DataFrame(facilities_data['organisationUnits'])

        # Parse the parent ID from dictionary and rename displayName to facility_name
        facilities_df['parent'] = facilities_df['parent'].apply(lambda x: x['id'] if pd.notnull(x) else None)
        facilities_df.rename(columns={'displayName': 'facility_name'}, inplace=True)

        logger.info("Facilities data fetched successfully.")
        return facilities_df

    except Exception as e:
        logger.error(f"Error fetching facility data: {e}")
        return None

# Load organisation unit data for districts and provinces
def load_organisation_units():
    """ Load and clean organisation units. """
    try:
        districts = pd.read_csv(districts_file)
        provinces = pd.read_csv(provinces_file)

        # Extract parent IDs from string dictionary format
        districts = parse_parent_column(districts)
        provinces = parse_parent_column(provinces)

        logger.info("Organisation units loaded and cleaned successfully.")
        return districts, provinces
    except FileNotFoundError as e:
        logger.error(f"Organisation units file not found: {e}")
        return None, None
    except Exception as e:
        logger.error(f"Error loading organisation units: {e}")
        return None, None

# Convert week string to date
def week_to_date(week_str):
    year, week = int(week_str[:4]), int(week_str[5:])
    first_day_of_year = datetime(year, 1, 1)
    first_week_start = first_day_of_year - timedelta(days=first_day_of_year.weekday())
    return first_week_start + timedelta(weeks=week - 1)

# Fetch and process DHIS2 analytics data
def fetch_and_transform_data():
    logger.info("Fetching facility-level data from DHIS2...")
    
    districts, provinces = load_organisation_units()
    facilities_df = fetch_facilities()
    
    if districts is None or provinces is None or facilities_df is None:
        return None

    url = f'{DHIS2_URL}/api/analytics.json'
    params = {
        "dimension": f"dx:{dx_ids},pe:2024W01;2024W02;2024W03;2024W04,ou:LEVEL-{facility_level_id}"
    }

    try:
        response = requests.get(url, params=params, auth=HTTPBasicAuth(USERNAME, PASSWORD))
        response.raise_for_status()
        logger.info("Facility-level data fetched successfully.")
        data = response.json()
    except requests.exceptions.HTTPError as err:
        logger.error(f"HTTP error: {err}")
        return None
    except requests.exceptions.RequestException as err:
        logger.error(f"Request error: {err}")
        return None

    try:
        headers = data.get('headers', [])
        rows = data.get('rows', [])
        columns = [header['name'] for header in headers]
        df = pd.DataFrame(rows, columns=columns)

        df['date'] = df['pe'].apply(week_to_date)
        df.sort_values('date', ascending=False, inplace=True)

        df['value'] = pd.to_numeric(df['value'], downcast='integer', errors='coerce')
        df = df.merge(df_data_elements, left_on='dx', right_on='dataElement_id', how='left')
        df['dx'] = df['dataElement_name']
        df.drop(['dataElement_name', 'dataElement_id'], axis=1, inplace=True)

        # Merge facility with districts and include facility name
        df = df.merge(facilities_df[['id', 'facility_name', 'parent']], left_on='ou', right_on='id', how='left')
        df.rename(columns={'id': 'facility_id', 'parent': 'district_id'}, inplace=True)

        df = df.merge(districts[['id', 'displayName', 'parent']], left_on='district_id', right_on='id', how='left')
        df.rename(columns={'displayName': 'district_name', 'parent': 'province_id'}, inplace=True)

        # Merge districts with provinces
        df = df.merge(provinces[['id', 'displayName']], left_on='province_id', right_on='id', how='left')
        df.rename(columns={'displayName': 'province_name'}, inplace=True)

        # Pivot data including the new columns
        pivoted_df = df.pivot_table(index=['pe', 'facility_id', 'district_id', 'province_id', 'facility_name', 'district_name', 'province_name', 'date'], 
                                columns='dx', 
                                values='value', 
                                aggfunc='first')
        
        pivoted_df.reset_index(inplace=True)
        pivoted_df.fillna(0, inplace=True)
        pivoted_df.rename(columns={'pe': 'period'}, inplace=True)

        # Convert columns to integer where applicable
        excluded_columns = ['dx', 'period', 'facility_id', 'district_id', 'province_id', 'facility_name', 'district_name', 'province_name', 'date']
        columns_to_convert = [col for col in pivoted_df.columns if col not in excluded_columns]
        for column in columns_to_convert:
            pivoted_df[column] = pd.to_numeric(pivoted_df[column], downcast='integer', errors='coerce')

        return pivoted_df

    except Exception as e:
        logger.error(f"Error processing data: {e}")
        return None


# Process the data and save to CSV
def process_data():
    logger.info("Processing and saving facility-level data with organisation details...")

    df = fetch_and_transform_data()
    if df is None:
        logger.error("Data processing failed. Skipping saving steps.")
        return

    output_file = data_dir / 'facility_january_2024_with_org_details.csv'
    df.to_csv(output_file, index=False)
    logger.info("Facility-level data with organisation details saved.")

# Execute processing function
process_data()

INFO:__main__:Processing and saving facility-level data with organisation details...
INFO:__main__:Fetching facility-level data from DHIS2...
INFO:__main__:Organisation units loaded and cleaned successfully.
INFO:__main__:Facilities data fetched successfully.
INFO:__main__:Facility-level data fetched successfully.
INFO:__main__:Facility-level data with organisation details saved.


In [69]:
facility_data_january = pd.read_csv(data_dir / 'facility_january_2024_with_org_details.csv')

In [70]:
facility_data_january.head(10)

Unnamed: 0,period,facility_id,district_id,province_id,facility_name,district_name,province_name,date,AEFI sent to lab,AEFI suspected,...,Trypanosomiasis confirmed,Trypanosomiasis sent to lab,Trypanosomiasis suspected,Tuberculosis confirmed,Tuberculosis death,Tuberculosis sent to lab,Tuberculosis suspected,Typhoid fever confirmed,Typhoid fever sent to Lab,Typhoid fever suspected
0,2024W1,A2SSdvZe7ur,svOkUE0zEKJ,q5hODNmn021,nw Lumwe Health Post,nw Mufumbwe District,nw North-Western Province,2024-01-01,0,0,...,0,0,0,0,0,0,0,0,0,0
1,2024W1,A2mkoh0Qoh1,PlQiD2woAPw,SwwuteU1Ajk,no Pecha Health Post,no Lunte District,no Northern Province,2024-01-01,0,0,...,0,0,0,0,0,0,0,0,0,0
2,2024W1,A4vc3yQR2An,SJjjM0qNelL,SwwuteU1Ajk,no Chikwanda Health Post,no Senga District,no Northern Province,2024-01-01,0,0,...,0,0,0,0,0,0,0,0,0,0
3,2024W1,A6Zdi9v8spx,WUGwOojuXhn,dbTLdTi7s8F,mu Mukungule Rural Health Centre,mu Mpika District,mu Muchinga Province,2024-01-01,0,0,...,0,0,0,0,0,0,24,0,0,0
4,2024W1,A6hsoKzGQh5,SQT8xjbvWwf,oPLMrarKeEY,so Muunga Rural Health Centre,so Itezhi-tezhi District,so Southern Province,2024-01-01,0,0,...,0,0,0,0,0,0,0,0,0,0
5,2024W1,A6kk1S24wfa,DM5Sjew7MCn,oPLMrarKeEY,so Prison Urban Health Centre,so Livingstone District,so Southern Province,2024-01-01,0,0,...,0,0,0,0,0,29,29,0,0,0
6,2024W1,A8TmYeix1cv,H8e2u6ULtJn,g1bv2xjtV0w,we Mata Rural Health Centre,we Senanga District,we Western Province,2024-01-01,0,0,...,0,0,0,0,0,1,1,0,0,0
7,2024W1,A9BC0fz8cxX,SfFrfySzRgY,utIjliUdjp8,co Muchindushi Health Post,co Mpongwe District,co Copperbelt Province,2024-01-01,0,0,...,0,0,0,0,0,0,0,0,0,0
8,2024W1,ABOFImxcg03,oEBf29y8JP8,AWn3s2RqgAN,ce Shimukuni Rural Health Centre,ce Chibombo District,ce Central Province,2024-01-01,0,0,...,0,0,0,0,0,0,0,0,0,0
9,2024W1,ABcUyezNjiu,A84b9TjHfN4,B1u1bVtIA92,lu Mwansakombe Rural Health Centre,lu Chifunabuli District,lu Luapula Province,2024-01-01,0,0,...,0,0,0,0,0,0,0,0,0,0


In [1]:
from datetime import datetime
current_week = int(datetime.today().strftime("%V"))
print(current_week)

6


# TEST

In [71]:
# Read data elements from dataframe
dx_ids = ';'.join(df_data_elements['dataElement_id'].tolist())

# Use facility level ID
facility_level_id = "MQLiogB9XBV"

# Parse parent column to extract IDs from string representation
def parse_parent_column(df):
    """ Extract parent ID from string dictionary in the 'parent' column. """
    df['parent'] = df['parent'].apply(lambda x: ast.literal_eval(x)['id'] if pd.notnull(x) else None)
    return df

# Fetch facility-level organisation units from DHIS2
def fetch_facilities():
    """Fetch all facilities (level 4) from DHIS2 and store them in a dataframe."""
    url = f"{DHIS2_URL}/api/organisationUnits.json"
    params = {"filter": "level:eq:4", "paging": "false", "fields": "id,displayName,parent[id]"}

    try:
        response = requests.get(url, params=params, auth=HTTPBasicAuth(USERNAME, PASSWORD))
        response.raise_for_status()
        facilities_data = response.json()
        facilities_df = pd.DataFrame(facilities_data['organisationUnits'])

        # Parse the parent ID from dictionary and rename displayName to facility_name
        facilities_df['parent'] = facilities_df['parent'].apply(lambda x: x['id'] if pd.notnull(x) else None)
        facilities_df.rename(columns={'displayName': 'facility_name'}, inplace=True)

        logger.info("Facilities data fetched successfully.")
        return facilities_df

    except Exception as e:
        logger.error(f"Error fetching facility data: {e}")
        return None

# Load organisation unit data for districts and provinces
def load_organisation_units():
    """ Load and clean organisation units. """
    try:
        districts = pd.read_csv(districts_file)
        provinces = pd.read_csv(provinces_file)

        # Extract parent IDs from string dictionary format
        districts = parse_parent_column(districts)
        provinces = parse_parent_column(provinces)

        logger.info("Organisation units loaded and cleaned successfully.")
        return districts, provinces
    except FileNotFoundError as e:
        logger.error(f"Organisation units file not found: {e}")
        return None, None
    except Exception as e:
        logger.error(f"Error loading organisation units: {e}")
        return None, None

# Convert week string to date
def week_to_date(week_str):
    year, week = int(week_str[:4]), int(week_str[5:])
    first_day_of_year = datetime(year, 1, 1)
    first_week_start = first_day_of_year - timedelta(days=first_day_of_year.weekday())
    return first_week_start + timedelta(weeks=week - 1)

# Fetch and process DHIS2 analytics data
def fetch_and_transform_data():
    logger.info("Fetching facility-level data from DHIS2...")
    
    districts, provinces = load_organisation_units()
    facilities_df = fetch_facilities()
    
    if districts is None or provinces is None or facilities_df is None:
        return None

    url = f'{DHIS2_URL}/api/analytics.json'
    params = {
        "dimension": f"dx:{dx_ids},pe:LAST_12_MONTHS,ou:LEVEL-{facility_level_id}"
    }

    try:
        response = requests.get(url, params=params, auth=HTTPBasicAuth(USERNAME, PASSWORD))
        response.raise_for_status()
        logger.info("Facility-level data fetched successfully.")
        data = response.json()
    except requests.exceptions.HTTPError as err:
        logger.error(f"HTTP error: {err}")
        return None
    except requests.exceptions.RequestException as err:
        logger.error(f"Request error: {err}")
        return None

    try:
        headers = data.get('headers', [])
        rows = data.get('rows', [])
        columns = [header['name'] for header in headers]
        df = pd.DataFrame(rows, columns=columns)

        df['date'] = df['pe'].apply(week_to_date)
        df.sort_values('date', ascending=False, inplace=True)

        df['value'] = pd.to_numeric(df['value'], downcast='integer', errors='coerce')
        df = df.merge(df_data_elements, left_on='dx', right_on='dataElement_id', how='left')
        df['dx'] = df['dataElement_name']
        df.drop(['dataElement_name', 'dataElement_id'], axis=1, inplace=True)

        # Merge facility with districts and include facility name
        df = df.merge(facilities_df[['id', 'facility_name', 'parent']], left_on='ou', right_on='id', how='left')
        df.rename(columns={'id': 'facility_id', 'parent': 'district_id'}, inplace=True)

        df = df.merge(districts[['id', 'displayName', 'parent']], left_on='district_id', right_on='id', how='left')
        df.rename(columns={'displayName': 'district_name', 'parent': 'province_id'}, inplace=True)

        # Merge districts with provinces
        df = df.merge(provinces[['id', 'displayName']], left_on='province_id', right_on='id', how='left')
        df.rename(columns={'displayName': 'province_name'}, inplace=True)

        # Pivot data including the new columns
        pivoted_df = df.pivot_table(index=['pe', 'facility_id', 'district_id', 'province_id', 'facility_name', 'district_name', 'province_name', 'date'], 
                                columns='dx', 
                                values='value', 
                                aggfunc='first')
        
        pivoted_df.reset_index(inplace=True)
        pivoted_df.fillna(0, inplace=True)
        pivoted_df.rename(columns={'pe': 'period'}, inplace=True)

        # Convert columns to integer where applicable
        excluded_columns = ['dx', 'period', 'facility_id', 'district_id', 'province_id', 'facility_name', 'district_name', 'province_name', 'date']
        columns_to_convert = [col for col in pivoted_df.columns if col not in excluded_columns]
        for column in columns_to_convert:
            pivoted_df[column] = pd.to_numeric(pivoted_df[column], downcast='integer', errors='coerce')

        return pivoted_df

    except Exception as e:
        logger.error(f"Error processing data: {e}")
        return None


# Process the data and save to CSV
def process_data():
    logger.info("Processing and saving facility-level data with organisation details...")

    df = fetch_and_transform_data()
    if df is None:
        logger.error("Data processing failed. Skipping saving steps.")
        return

    output_file = data_dir / 'facility_12_months.csv'
    df.to_csv(output_file, index=False)
    logger.info("Facility-level data with organisation details saved.")

# Execute processing function
process_data()

INFO:__main__:Processing and saving facility-level data with organisation details...
INFO:__main__:Fetching facility-level data from DHIS2...
INFO:__main__:Organisation units loaded and cleaned successfully.
INFO:__main__:Facilities data fetched successfully.
INFO:__main__:Facility-level data fetched successfully.
INFO:__main__:Facility-level data with organisation details saved.


In [72]:
facility_12_months = pd.read_csv(data_dir / "facility_12_months.csv")

In [73]:
facility_12_months.tail()

Unnamed: 0,period,facility_id,district_id,province_id,facility_name,district_name,province_name,date,AEFI confirmed,AEFI death,...,Typhoid fever confirmed,Typhoid fever death,Typhoid fever sent to Lab,Typhoid fever suspected,"VHF (Ebola, Marburg, Lassa Fever, RVF, Crimean-Congo) death","VHF (Ebola, Marburg, Lassa Fever, RVF, Crimean-Congo) sent to lab","VHF (Ebola, Marburg, Lassa Fever, RVF, Crimean-Congo) suspected",Yellow fever death,Yellow fever sent to lab,Yellow fever suspected
35930,202412,zv11JO8uQyB,xu3OhvvaqO2,g1bv2xjtV0w,we Lewanika Referral Hospital,we Mongu District,we Western Province,2024-01-08,0,0,...,0,0,0,0,0,0,0,0,0,0
35931,202412,zvNTAGjh6dg,H8e2u6ULtJn,g1bv2xjtV0w,we Mutwa Health Post,we Senanga District,we Western Province,2024-01-08,0,0,...,0,0,0,0,0,0,0,0,0,0
35932,202412,zvdAaDU7yC6,EdEpJ41rhF6,SwwuteU1Ajk,no Masamba Rural Health Centre,no Mbala District,no Northern Province,2024-01-08,0,0,...,0,0,0,0,0,0,0,0,0,0
35933,202412,zvjggVwOhQP,ydyJb1RAy4U,J7PQPWAeRUk,ea Chimphamba Health Post,ea Chama District,ea Eastern Province,2024-01-08,0,0,...,0,0,0,0,0,0,0,0,0,0
35934,202412,zzJ3hvbee0l,RXl1ctHnd1A,B1u1bVtIA92,lu Shoti Health Post,lu Mansa District,lu Luapula Province,2024-01-08,0,0,...,0,0,0,0,0,0,0,0,0,0


### Fetch Weekly Data From 2020 to Current Week

### Training Instance

In [None]:
import requests
import json
import math
import pandas as pd
from datetime import datetime

# DHIS2 instance and API endpoints
DHIS2_BASE_URL = "https:"
API_ENDPOINT_ANALYTICS = "/api/40/analytics.json"
API_ENDPOINT_DATA_ELEMENTS = "/api/40/dataElements.json"

# Token authentication
TOKEN = ""
HEADERS = {"Authorization": f"ApiToken {TOKEN}"}

# Step 1: Generate Weekly Period List (from 2020W1 to current week)
def generate_weekly_periods(start_year=2020, batch_size=8):
    """Generate weekly periods grouped into chunks (starting with 8 weeks per request)."""
    today = datetime.today()
    current_year, current_week = int(today.strftime("%Y")), int(today.strftime("%V"))

    weekly_periods = []
    period_batches = []
    
    for year in range(start_year, current_year + 1):
        max_weeks = 53 if datetime(year, 12, 28).isocalendar()[1] == 53 else 52
        for week in range(1, max_weeks + 1):
            if year == current_year and week > current_week:
                break
            weekly_periods.append(f"{year}W{week}")
    
    # Group weeks into period chunks (default: 8 weeks per batch)
    for i in range(0, len(weekly_periods), batch_size):
        period_batches.append(weekly_periods[i:i + batch_size])
    
    return period_batches

weekly_period_batches = generate_weekly_periods()
print(f"Generated {len(weekly_period_batches)} period batches.")  # Debugging

# Step 2: Fetch all data elements dynamically and filter only numeric elements
def fetch_aggregatable_data_elements():
    response = requests.get(
        f"{DHIS2_BASE_URL}{API_ENDPOINT_DATA_ELEMENTS}?fields=id,valueType,aggregationType&paging=false",
        headers=HEADERS
    )
    
    if response.status_code == 200:
        data = response.json()
        
        # Filter only numeric data elements that allow aggregation
        valid_data_elements = [
            item["id"]
            for item in data.get("dataElements", [])
            if item.get("valueType") in ["INTEGER", "NUMBER", "PERCENTAGE", "UNIT_INTERVAL"]
            and item.get("aggregationType") in ["SUM", "AVERAGE", "COUNT"]
        ]
        
        return valid_data_elements
    else:
        print(f"Error fetching data elements: {response.status_code} - {response.text}")
        return []

# Get all valid data element UIDs
data_element_uids = fetch_aggregatable_data_elements()

if not data_element_uids:
    print("No valid data elements found. Exiting...")
    exit()

# Step 3: Split API Requests to Avoid DHIS2 Limits
BATCH_SIZE = 200  # Increase data elements per request (from 50 to 200)
num_batches = math.ceil(len(data_element_uids) / BATCH_SIZE)

# Output CSV file
csv_filename = "dhis2_20_25_facility_data.csv"
all_rows = []

# Step 4: Process Data in Batches with Dynamic Period Splitting
for batch in range(num_batches):
    batch_uids = data_element_uids[batch * BATCH_SIZE : (batch + 1) * BATCH_SIZE]
    dx_param = f"dx:{';'.join(batch_uids)}"

    for period_chunk in weekly_period_batches:
        period_param = f"pe:{';'.join(period_chunk)}"
        max_period_size = len(period_chunk)

        while max_period_size > 0:  # Adaptive period splitting loop
            print(f"Processing Batch {batch + 1}/{num_batches}, Period Batch: {period_chunk[:10]}...")

            params = {
                "dimension": [
                    dx_param,
                    "ou:LEVEL-4",  # Facility-level
                    period_param  # Weekly periods chunk
                ],
                "displayProperty": "NAME",
                "outputIdScheme": "UID",
                "includeMetadata": "true",
                "includeNames": "true",
                "limit": 10000,  # Limit results per request
                "paging": "true",
                "page": 1
            }

            response = requests.get(
                f"{DHIS2_BASE_URL}{API_ENDPOINT_ANALYTICS}",
                params=params,
                headers=HEADERS,
                stream=True
            )

            if response.status_code == 200:
                data = response.json()

                # Extract metadata (UID <-> Name mapping)
                metadata = data.get("metaData", {})
                data_elements = metadata.get("items", {})

                # Extract actual data values
                rows = data.get("rows", [])

                if not rows:
                    break  # No more pages, exit loop

                for row in rows:
                    data_element_id = row[0]  # Data Element UID
                    org_id = row[1]  # Org Unit UID
                    period = row[2]  # Period (week)
                    value = row[3]  # Reported value

                    # Get corresponding names from metadata
                    data_element_name = data_elements.get(data_element_id, {}).get("name", data_element_id)

                    # Append row to list for processing
                    all_rows.append([period, org_id, data_element_name, value])

                break  # Successfully retrieved data, move to next period batch

            elif response.status_code == 409 and "Query result set exceeded max limit" in response.text:
                max_period_size //= 2  # Reduce period batch size
                period_chunk = period_chunk[:max_period_size]  # Take smaller chunk
                period_param = f"pe:{';'.join(period_chunk)}"
                print(f"⚠️ Query limit exceeded. Retrying with smaller batch: {max_period_size} weeks.")

            else:
                print(f"Error in batch {batch + 1}, period batch {period_chunk[:10]}: {response.status_code} - {response.text}")
                break

print(f"Data successfully retrieved. Now processing transformation...")

# Convert list to DataFrame
df = pd.DataFrame(all_rows, columns=["period", "org_id", "data_element", "value"])

# Remove duplicates
df.drop_duplicates(inplace=True)

# Convert `period` (e.g., "2025W05") into proper `date` format (start of the week)
def convert_iso_week_to_date(iso_week):
    year, week = iso_week[:4], iso_week[5:]
    return datetime.strptime(year + week + '1', "%G%V%w")  # Monday as start of the week

df["date"] = df["period"].apply(lambda x: convert_iso_week_to_date(x).strftime("%Y-%m-%dT00:00:00"))

# Pivot the data: Convert `data_element` values into columns
df_pivoted = df.pivot_table(
    index=["period", "org_id", "date"],  # Keep these as row identifiers
    columns="data_element",  # Each data element becomes a column
    values="value",  # Fill with reported values
    aggfunc="sum"  # If duplicates exist, aggregate them
).reset_index()

# Sort Data by `period` (newest to oldest)
df_pivoted.sort_values(by="period", ascending=False, inplace=True)

# Save Pivoted Data to New CSV
pivoted_csv_filename = "dhis2_20_25_facility_pivoted.csv"
df_pivoted.to_csv(pivoted_csv_filename, index=False)

print(f"Data successfully pivoted, sorted, and saved to {pivoted_csv_filename}")


#### Production Instance

In [None]:
# Path
credentials_2_path = base_dir / '00_Local' / '01_Configs' / 'credentials2.txt'

In [None]:
#Prod Credentials
credentials_2 = {}
with credentials_2_path.open('r') as file:
    for line in file:
        if '=' in line:
            key, value = line.strip().split('=', 1)
            credentials_2[key.strip()] = value.strip()

# Extract values
DHIS2_URL = credentials_2.get('DHIS2_URL')
USERNAME = credentials_2.get('USERNAME')
PASSWORD = credentials_2.get('PASSWORD')
PAT = credentials_2.get('PAT')

print("DHIS2 instance URL:", DHIS2_URL)

In [None]:
# DHIS2 instance and API endpoints
API_ENDPOINT_ANALYTICS = "/api/40/analytics.json"
API_ENDPOINT_DATA_ELEMENTS = "/api/40/dataElements.json"

# Token authentication
# TOKEN = ""
HEADERS = {"Authorization": f"ApiToken {PAT}"}

# Step 1: Generate Weekly Period List (from 2020W1 to current week)
def generate_weekly_periods(start_year=2020, batch_size=8):
    """Generate weekly periods grouped into chunks (starting with 8 weeks per request)."""
    today = datetime.today()
    current_year, current_week = int(today.strftime("%Y")), int(today.strftime("%V"))

    weekly_periods = []
    period_batches = []
    
    for year in range(start_year, current_year + 1):
        max_weeks = 53 if datetime(year, 12, 28).isocalendar()[1] == 53 else 52
        for week in range(1, max_weeks + 1):
            if year == current_year and week > current_week:
                break
            weekly_periods.append(f"{year}W{week}")
    
    # Group weeks into period chunks (default: 8 weeks per batch)
    for i in range(0, len(weekly_periods), batch_size):
        period_batches.append(weekly_periods[i:i + batch_size])
    
    return period_batches

weekly_period_batches = generate_weekly_periods()
print(f"Generated {len(weekly_period_batches)} period batches.")  # Debugging

# Step 2: Fetch all data elements dynamically and filter only numeric elements
def fetch_aggregatable_data_elements():
    response = requests.get(
        f"{DHIS2_URL}{API_ENDPOINT_DATA_ELEMENTS}?fields=id,valueType,aggregationType&paging=false",
        headers=HEADERS
    )
    
    if response.status_code == 200:
        data = response.json()
        
        # Filter only numeric data elements that allow aggregation
        valid_data_elements = [
            item["id"]
            for item in data.get("dataElements", [])
            if item.get("valueType") in ["INTEGER", "NUMBER", "PERCENTAGE", "UNIT_INTERVAL"]
            and item.get("aggregationType") in ["SUM", "AVERAGE", "COUNT"]
        ]
        
        return valid_data_elements
    else:
        print(f"Error fetching data elements: {response.status_code} - {response.text}")
        return []

# Get all valid data element UIDs
data_element_uids = fetch_aggregatable_data_elements()

if not data_element_uids:
    print("No valid data elements found. Exiting...")
    exit()

# Step 3: Split API Requests to Avoid DHIS2 Limits
BATCH_SIZE = 200  # Increase data elements per request (from 50 to 200)
num_batches = math.ceil(len(data_element_uids) / BATCH_SIZE)

# Output CSV file
csv_filename = "ND2_Data_2020W1_to_2025W5.csv"
all_rows = []

# Step 4: Process Data in Batches with Dynamic Period Splitting
for batch in range(num_batches):
    batch_uids = data_element_uids[batch * BATCH_SIZE : (batch + 1) * BATCH_SIZE]
    dx_param = f"dx:{';'.join(batch_uids)}"

    for period_chunk in weekly_period_batches:
        period_param = f"pe:{';'.join(period_chunk)}"
        max_period_size = len(period_chunk)

        while max_period_size > 0:  # Adaptive period splitting loop
            print(f"Processing Batch {batch + 1}/{num_batches}, Period Batch: {period_chunk[:10]}...")

            params = {
                "dimension": [
                    dx_param,
                    "ou:LEVEL-4",  # Facility-level
                    period_param  # Weekly periods chunk
                ],
                "displayProperty": "NAME",
                "outputIdScheme": "UID",
                "includeMetadata": "true",
                "includeNames": "true",
                "limit": 10000,  # Limit results per request
                "paging": "true",
                "page": 1
            }

            response = requests.get(
                f"{DHIS2_URL}{API_ENDPOINT_ANALYTICS}",
                params=params,
                headers=HEADERS,
                stream=True
            )

            if response.status_code == 200:
                data = response.json()

                # Extract metadata (UID <-> Name mapping)
                metadata = data.get("metaData", {})
                data_elements = metadata.get("items", {})

                # Extract actual data values
                rows = data.get("rows", [])

                if not rows:
                    break  # No more pages, exit loop

                for row in rows:
                    data_element_id = row[0]  # Data Element UID
                    org_id = row[1]  # Org Unit UID
                    period = row[2]  # Period (week)
                    value = row[3]  # Reported value

                    # Get corresponding names from metadata
                    data_element_name = data_elements.get(data_element_id, {}).get("name", data_element_id)

                    # Append row to list for processing
                    all_rows.append([period, org_id, data_element_name, value])

                break  # Successfully retrieved data, move to next period batch

            elif response.status_code == 409 and "Query result set exceeded max limit" in response.text:
                max_period_size //= 2  # Reduce period batch size
                period_chunk = period_chunk[:max_period_size]  # Take smaller chunk
                period_param = f"pe:{';'.join(period_chunk)}"
                print(f"Query limit exceeded. Retrying with smaller batch: {max_period_size} weeks.")

            else:
                print(f"Error in batch {batch + 1}, period batch {period_chunk[:10]}: {response.status_code} - {response.text}")
                break

print(f"Data successfully retrieved. Now processing transformation...")

# Convert list to DataFrame
df = pd.DataFrame(all_rows, columns=["period", "org_id", "data_element", "value"])

# Remove duplicates
df.drop_duplicates(inplace=True)

# Convert `period` (e.g., "2025W05") into proper `date` format (start of the week)
def convert_iso_week_to_date(iso_week):
    year, week = iso_week[:4], iso_week[5:]
    return datetime.strptime(year + week + '1', "%G%V%w")  # Monday as start of the week

df["date"] = df["period"].apply(lambda x: convert_iso_week_to_date(x).strftime("%Y-%m-%dT00:00:00"))

# Pivot the data: Convert `data_element` values into columns
df_pivoted = df.pivot_table(
    index=["period", "org_id", "date"],  # Keep these as row identifiers
    columns="data_element",  # Each data element becomes a column
    values="value",  # Fill with reported values
    aggfunc="sum"  # If duplicates exist, aggregate them
).reset_index()

# Sort Data by `period` (newest to oldest)
df_pivoted.sort_values(by="period", ascending=False, inplace=True)

# Save Pivoted Data to New CSV
pivoted_csv_filename = "ND2_Data_2020W1_to_2025W5_Pivoted.csv"
df_pivoted.to_csv(pivoted_csv_filename, index=False)

print(f"Data successfully pivoted, sorted, and saved to {pivoted_csv_filename}")