In [70]:
import pandas as pd
import sqlite3
import os
# Create and configure logger
logging.basicConfig(filename="system_logs.log",
                    format='%(asctime)s %(message)s',
                    filemode='w')

# Creating an object
logger = logging.getLogger()

In [71]:
con = sqlite3.connect("db/irs.db")

In [72]:
def pull_data(con):
    try:
        logger.info("pulling data from Database tables")
        stage3_df = pd.read_sql_query("SELECT * from stage4_aggregate_irs_data", con)
        org_unit_df = pd.read_sql_query("SELECT * from org_units", con)
        microplan_df = pd.read_sql_query("SELECT * from microplan_data_tbl", con)
        logger.info("Data pulled successfully")
        return stage3_df, org_unit_df, microplan_df, "success"
    except Exception as e:
        logging.exception("Exception occurred: %s", str(e))
        return "fail"

In [73]:
stage3_df, org_unit_df, microplan_df,pull_status = pull_data(con)

In [74]:
stage3_df.head()

Unnamed: 0,level_0,orgUnit,index,event,eventDate,created,lastUpdated,programStage,deleted,IRS-Reason Rooms Not Sprayed: Refused,...,IRS-Total Population Protected,IRS-Pregnant Women Protected,IRS-Insecticide Units Received,IRS-Total Structures Sprayed,IRS-Team Number,IRS-Males Protected,IRS-Reason Rooms Not Sprayed: Locked,IRS-Insecticide Units Missing,IRS-Eligible Rooms Found,IRS-Insecticide
0,0,ACa6vpZuNzf,313,ojzXeIkkK4o,2024-01-02 00:00:00,2024-01-03 12:42:07.300000,2024-01-03 12:42:07.300000,TX5i6nl6WPD,0,1.0,...,186,6,25,36,3,94,0,0,133,Fludora Fusion
1,1,CPVAHHBf5qI,386,ycTiEmSev1W,2024-01-13 00:00:00,2024-01-17 16:38:43.614000,2024-01-17 16:38:43.614000,TX5i6nl6WPD,0,0.0,...,171,11,15,29,3,59,0,0,69,Fludora Fusion
2,2,EbJJI4tcfFV,38,GAzNwdXL6TR,2024-01-22 00:00:00,2024-02-13 16:07:49.903000,2024-02-13 16:07:49.903000,TX5i6nl6WPD,0,0.0,...,191,10,10,54,3,71,0,0,197,Fludora Fusion
3,3,FW1dl31uuzV,46,HAUjWoxkM97,2024-01-14 00:00:00,2024-01-17 15:22:49.293000,2024-01-17 15:22:49.293000,TX5i6nl6WPD,0,0.0,...,258,17,15,75,1,123,0,0,128,Fludora Fusion
4,4,GWePuV5XPW0,392,zV7tVtKeXhC,2024-01-12 00:00:00,2024-01-17 16:03:12.186000,2024-01-17 16:03:12.186000,TX5i6nl6WPD,0,0.0,...,172,7,15,77,3,85,0,0,207,Fludora Fusion


In [75]:
org_unit_df.head()

Unnamed: 0,index,country,province,district,facility,facility_uid
0,0,Zambia,Central,Chibombo,ce Chamakubi Health Post,pXhz0PLiYZX
1,1,Zambia,Central,Chibombo,ce Chibombo Rural Health Centre,di3U5u7F8Y3
2,2,Zambia,Central,Chibombo,ce Chikobo Rural Health Centre,B5LpoYehUfI
3,3,Zambia,Central,Chibombo,ce Chikumbi ZNS Rural Health Centre,zAuN8bxMOMB
4,4,Zambia,Central,Chibombo,ce Chilochabalenge Health Post,g42i3akwlpj


In [76]:
microplan_df.head()

Unnamed: 0,index,orgUnit,IRS-Total Eligible Structures,IRS- Campaign End Date,IRS-Total Targeted Eligible Structures,IRS- Campaign Start Date,IRS-Total Targeted Population,IRS-Total Population
0,0,f6Xl2OH5Dw2,262,2024-01-11 00:00:00,262,2024-01-04 00:00:00,1444,144
1,1,toia1cMpj1m,2855,2024-01-31 00:00:00,2400,2023-12-29 00:00:00,12500,13689


In [77]:
print(pull_status)

success


In [78]:
# CLEAN ALL DATA
def clean_df(stage3_df, org_unit_df, microplan_df):
    try:
        print("Cleaning data up")
        stage3_df.reset_index(drop=True)
        irs_data =stage3_df[['orgUnit', 'IRS-Total Population Protected','IRS-Total Structures Found','IRS-Total Structures Not Sprayed','IRS-Total Structures Sprayed']].copy()
        # AGGREGATE IRS DATA BY FACILITY
        irs_aggregate_df = irs_data.groupby('orgUnit', as_index =False).sum()

        # CLEAN MICROPLAN DATA
        microplan_df.drop(["IRS- Campaign End Date","IRS- Campaign Start Date"], axis = 1, inplace=True)
        microplan_df.reset_index(drop=True)

        merged_df = pd.merge(irs_aggregate_df, microplan_df, on="orgUnit", how="left")
        merged_df.rename(columns = {'orgUnit':'facility_uid'}, inplace = True)

        final_df = pd.merge(org_unit_df, merged_df, on="facility_uid", how="left").reset_index(drop=True)
        final_df.dropna(subset=['IRS-Total Population Protected','IRS-Total Structures Found','IRS-Total Structures Not Sprayed', 'IRS-Total Structures Sprayed'],inplace=True)
        final_df.drop(["index_x"], axis = 1, inplace=True)
        final_df.drop(["index_y"], axis = 1, inplace=True)
        print("cleaning data completed")
        return final_df, 'success'
    except Exception as e:
        logging.exception("Exception occurred: %s", str(e))
        return "fail"

In [79]:
def provincial_aggregation(final_df):
    try:
        print("Provincial picture aggregation")
        provincial = final_df.copy()
        provincial.drop(["country", "district", "facility", "facility_uid"], axis=1, inplace=True)
        provincial = provincial.groupby('province').sum()

        # Ensure numeric values for division operation
        provincial["IRS-Total Structures Sprayed"] = pd.to_numeric(provincial["IRS-Total Structures Sprayed"], errors='coerce')
        provincial["IRS-Total Structures Found"] = pd.to_numeric(provincial["IRS-Total Structures Found"], errors='coerce')

        # Calculate % population protected
        provincial["pop_protected_perc"] = round((provincial["IRS-Total Population Protected"] / provincial["IRS-Total Targeted Population"].fillna(0)) * 100)
        provincial.loc[provincial['IRS-Total Targeted Population'] == 0, 'pop_protected_perc'] = 0

        # Calculate Spray progress
        provincial["spray_progress"] = round((provincial["IRS-Total Structures Sprayed"] / provincial["IRS-Total Targeted Eligible Structures"].fillna(0)) * 100)
        provincial.loc[provincial['IRS-Total Targeted Eligible Structures'] == 0, 'spray_progress'] = 0

        # Calculate spray coverage
        provincial["spray_coverage"] = round((provincial["IRS-Total Structures Sprayed"] / provincial["IRS-Total Structures Found"]) * 100)

        print("Provincial picture aggregation, COMPLETE")
        print("Loading Provincial picture aggregation to DB")
        con = sqlite3.connect("db/irs.db")
        provincial.to_sql("provincial_progress", con, if_exists="replace")
        print("Loading Provincial picture aggregation to DB...., COMPLETE")

        return provincial
    except Exception as e:
        logging.exception("Exception occurred: %s", str(e))
        return "fail"

In [80]:
def district_aggregation(final_df):
    try:
        print("District picture aggregation")
        district = final_df.copy()
        district.drop(["country", "facility", "facility_uid"], axis=1, inplace=True)

        district = district.groupby(['province', 'district'], as_index=False).sum()

        # Ensure numeric values for division operation
        district["IRS-Total Structures Sprayed"] = pd.to_numeric(district["IRS-Total Structures Sprayed"], errors='coerce')
        district["IRS-Total Structures Found"] = pd.to_numeric(district["IRS-Total Structures Found"], errors='coerce')

        # Calculate % population protected.
        district["pop_protected_perc"] = round((district["IRS-Total Population Protected"] / district["IRS-Total Targeted Population"].fillna(0)) * 100)
        district.loc[district['IRS-Total Targeted Population'] == 0, 'pop_protected_perc'] = 0

        # Calculate Spray progress
        district["spray_progress"] = round((district["IRS-Total Structures Sprayed"] / district["IRS-Total Targeted Eligible Structures"].fillna(0)) * 100)
        district.loc[district['IRS-Total Targeted Eligible Structures'] == 0, 'spray_progress'] = 0

        # Calculate spray coverage
        district["spray_coverage"] = round((district["IRS-Total Structures Sprayed"] / district["IRS-Total Structures Found"]) * 100)

        print("District picture aggregation, COMPLETE")
        print("Loading District picture aggregation to DB")
        con = sqlite3.connect("db/irs.db")
        district.to_sql("district_progress", con, if_exists="replace")
        print("Loading District picture aggregation to DB...., COMPLETE")
        return district
    except Exception as e:
        logging.exception("Exception occurred: %s", str(e))
        return "fail"


In [81]:
def process_plus(stage3_df, org_unit_df, microplan_df , pull_status):
    if pull_status == 'success':
        try:
            final_df = clean_df(stage3_df, org_unit_df, microplan_df)
            return final_df, "success"
        except Exception as e:
            return "fail"

In [84]:
def run_aggregation():
    stage3_df, org_unit_df, microplan_df , pull_status= pull_data(con)
    print(pull_status)
    print(len(stage3_df))
    print(len(org_unit_df))
    print(len(microplan_df))
    final_df, process_status = process_plus(stage3_df, org_unit_df, microplan_df , pull_status)

    if process_status == 'success':
        provincial_aggregation(final_df)
        district_aggregation(final_df)
        return 'success'
    else:
        return 'fail'

In [85]:
run_aggregation()

success
48
3201
2
Cleaning data up
cleaning data completed
Provincial picture aggregation
District picture aggregation


'success'