In [None]:
Fetch data and import necessary libraries
import pandas as pd
import logging
import os
# Surface INFO-level messages from the root logger.
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)

# The four aggregate CSVs are all served from the same raw-GitHub folder.
_DATA_DARAH_BASE = "https://raw.githubusercontent.com/MoH-Malaysia/data-darah-public/main"

donation_facilities_url = f"{_DATA_DARAH_BASE}/donations_facility.csv"
donation_state_url = f"{_DATA_DARAH_BASE}/donations_state.csv"
newdonors_facility_url = f"{_DATA_DARAH_BASE}/newdonors_facility.csv"
newdonors_state_url = f"{_DATA_DARAH_BASE}/newdonors_state.csv"

def get_data_url(url_link):
    """Load a CSV into a DataFrame, returning ``None`` if the read fails.

    Args:
        url_link: URL or path accepted by ``pandas.read_csv``.

    Returns:
        The parsed DataFrame, or ``None`` when reading raises (the error is
        logged rather than propagated).
    """
    try:
        frame = pd.read_csv(url_link)
    except Exception as err:
        # Best-effort fetch: report the failure and hand None to the caller.
        logging.error(f"Unexpected error fetching data from {url_link}: {err}")
        return None
    else:
        logging.info(f"Getting data from {url_link}")
        return frame

# Pull the four aggregate CSVs. Each name is None if its fetch failed,
# because get_data_url logs the error and returns None instead of raising.
donation_facilities = get_data_url(donation_facilities_url)
donation_state = get_data_url(donation_state_url)
newdonors_facility = get_data_url(newdonors_facility_url)
newdonors_state = get_data_url(newdonors_state_url)

# Records with donor_id / visit_date / birth_date columns (used further down).
# This read is not wrapped in a try/except, so a failure raises right here.
blood_donation_retention = pd.read_parquet("https://dub.sh/ds-data-granular") # In this parquet, there is no date


Each freshly extracted DataFrame is then compared with the data already stored on disk, and only the rows that are not yet in the file are appended, so the same data is never written twice. The function returns the full updated dataset, which is used for the analysis below.
def append_data(data, file_path):
    """Append the rows of ``data`` that are not yet stored in ``file_path``.

    The CSV at ``file_path`` acts as a staging file. Rows of ``data`` that
    already appear in it (compared on all shared columns) are skipped and the
    remaining rows are appended. The file is created, with a header, when it
    is missing or empty.

    Args:
        data: DataFrame holding the latest extract, or ``None`` when the
            upstream fetch failed.
        file_path: Path of the staging CSV.

    Returns:
        The full contents of ``file_path`` after the append, as a DataFrame.

    Raises:
        ValueError: If ``data`` is ``None`` and there is no staged file to
            fall back on.
    """
    # Evaluate once so the merge decision and the header decision always agree.
    # A zero-byte file is treated like a missing one (read_csv cannot parse it).
    has_existing = os.path.exists(file_path) and os.path.getsize(file_path) > 0

    if data is None:
        if not has_existing:
            raise ValueError(f"No new data and no existing file at {file_path}.")
        logging.warning(f"No new data supplied. Keeping {file_path} unchanged.")
        return pd.read_csv(file_path)

    if has_existing:
        existing_data = pd.read_csv(file_path)
        # The outer merge's indicator column marks rows found only in `data`.
        merged = pd.merge(existing_data, data, how='outer', indicator=True)
        new_rows = merged[merged['_merge'] == 'right_only'].drop(columns='_merge')
    else:
        new_rows = data
        logging.info("File not exist. Create new file")

    if has_existing and new_rows.empty:
        logging.info(f"No new rows to append to {file_path}.")
    else:
        new_rows.to_csv(file_path, index=False, mode='a', header=not has_existing)
        logging.info(f"Data appended to {file_path}.")

    return pd.read_csv(file_path)

# Stage every extract in its own CSV and keep the frame that append_data
# re-reads from that file; the *_updated names are what the analysis uses.
donation_facilities_updated = append_data(donation_facilities, "donation_facilities.csv")
donation_state_updated = append_data(donation_state, "donation_state.csv")
newdonors_facility_updated = append_data(newdonors_facility,"newdonors_facility.csv")
newdonors_state_updated = append_data(newdonors_state, "newdonors_state.csv")
blood_donation_retention_updated = append_data(blood_donation_retention, "blood_donation_retention.csv")
Check the info of each DataFrame to see which variables can be used.
# DataFrame.info() writes its report itself and returns None, so it is called
# directly; wrapping it in print() adds a stray "None" after every report.
donation_facilities_updated.info()  # 145090 is the original count on 21/01/2024; 145112 as of 23/01/2024
donation_state_updated.info()
newdonors_facility_updated.info()
newdonors_state_updated.info()
blood_donation_retention_updated.info()
EDA process to identify outliers, missing values, and noisy data. Let's go.
# Report the per-column count of missing values for every staged frame.
null_reports = [
    ("Blood donation retention csv file ", blood_donation_retention_updated),
    ("Blood donation facilities csv file ", donation_facilities_updated),
    ("donation state updated csv files : ", donation_state_updated),
    ("newdonors facilities file : ", newdonors_facility_updated),
    ("newdonors state updated file : ", newdonors_state_updated),
]
for caption, frame in null_reports:
    print(caption, frame.isnull().sum())

# Preview the first ten retention records.
blood_donation_retention_updated.head(10)
Now, let's clean the data before we visualize it.
def change_dataframe_timeindex(dataframe):
    """Turn the ``date`` column of ``dataframe`` into its index, in place.

    Values that cannot be parsed as dates become ``NaT`` (``errors='coerce'``).

    Args:
        dataframe: DataFrame with a ``date`` column; it is modified in place.

    Returns:
        The same ``dataframe`` object. Callers that ignore the return value
        are unaffected, because the frame is still mutated in place.
    """
    dataframe['date'] = pd.to_datetime(dataframe['date'], errors='coerce')
    # set_index(..., inplace=True) returns None, so its result must not be
    # assigned back; the caller's frame is updated by the mutation itself.
    dataframe.set_index('date', inplace=True)
    return dataframe


# Re-index the four aggregate frames by date (each is modified in place).
# The retention frame is not passed through this step.
change_dataframe_timeindex(donation_facilities_updated)
change_dataframe_timeindex(donation_state_updated)
change_dataframe_timeindex(newdonors_facility_updated)
change_dataframe_timeindex(newdonors_state_updated)

Let's check what data each DataFrame contains.
import matplotlib.pyplot as plt
oldest_date = donation_state_updated.index.min()
newest_date = donation_state_updated.index.max()
print(oldest_date)
print(newest_date)

blood_type_columns = ['blood_a', 'blood_ab', 'blood_b', 'blood_o']

# Select the date range on the daily rows, before resampling. Yearly bins are
# labelled with their year-end date, so slicing the resampled frame up to
# `newest_date` would cut off the most recent (still running) year.
# A boolean mask is used because the dates repeat and rows with an
# unparseable date (NaT) must simply fall outside the range.
row_dates = donation_state_updated.index
in_range = (row_dates >= oldest_date) & (row_dates <= newest_date)

# Sum only the blood-type counts; the text `state` column has no meaningful
# yearly total.
donation_state_updated_resampled = (
    donation_state_updated.loc[in_range, blood_type_columns].resample('Y').sum()
)

# Plotting: the four series are drawn onto the same axes.
donation_state_updated_resampled['blood_a'].plot(label='Blood Type A', color='red')
donation_state_updated_resampled['blood_ab'].plot(label='Blood Type AB', color='green')
donation_state_updated_resampled['blood_b'].plot(label='Blood Type B', color='blue')
donation_state_updated_resampled['blood_o'].plot(label='Blood Type O', color='orange')

plt.title("Yearly Blood Donation Trends")
plt.xlabel("Year")
plt.ylabel("Total Blood Donation Count")
plt.legend()
plt.show()

This code analyses the data to see the blood donation retention trend in Malaysia.
import matplotlib.pyplot as plt

# NOTE(review): assumes `birth_date` holds the birth year as a number — confirm.
blood_donation_retention_updated['age'] = int(newest_date.year) - blood_donation_retention_updated['birth_date']
idcount = blood_donation_retention_updated['donor_id'].value_counts()
oldest = blood_donation_retention_updated['birth_date'].min()
youngest = blood_donation_retention_updated['birth_date'].max()
# Ten column names of the new-donor table are reused as the bucket labels.
age_group_list = newdonors_state_updated.columns[1:11].to_list()
print(age_group_list)
print(oldest)
print(youngest)

bins = [16, 24, 29, 34, 39, 44, 49, 54, 59, 64, 150]

# The age_group column must exist before it can be grouped on.
# Bins are closed on the right so every bucket ends on its upper edge:
# (16, 24] covers ages 17-24, (24, 29] covers 25-29, and so on.
# NOTE(review): assumes the labels follow that "17-24" pattern — confirm.
blood_donation_retention_updated['age_group'] = pd.cut(
    blood_donation_retention_updated['age'],
    bins=bins,
    labels=age_group_list,
    right=True,
)
# observed=False keeps empty buckets in the chart.
count_age = (
    blood_donation_retention_updated
    .groupby('age_group', observed=False)
    .size()
    .reset_index(name='count')
)

# Draw on one explicit axes; DataFrame.plot otherwise opens its own figure
# and leaves any figure created beforehand empty.
fig, ax = plt.subplots(figsize=(12, 6))
count_age.plot(x='age_group', y='count', kind='bar', color='skyblue', ax=ax)
ax.set_title('Age Distribution of Donors')
ax.set_xlabel('Age Group')
ax.set_ylabel('Number of Donors')
plt.show()
import matplotlib.pyplot as plt
from matplotlib.pyplot import ScalarFormatter

# NOTE(review): assumes `birth_date` holds the birth year as a number — confirm.
blood_donation_retention_updated['age'] = int(newest_date.year) - blood_donation_retention_updated['birth_date']
idcount = blood_donation_retention_updated['donor_id'].value_counts()
oldest = blood_donation_retention_updated['birth_date'].min()
youngest = blood_donation_retention_updated['birth_date'].max()
# Ten column names of the new-donor table are reused as the bucket labels.
age_group_list = newdonors_state_updated.columns[1:11].to_list()
print(age_group_list)
print(oldest)
print(youngest)
bins = [16, 24, 29, 34, 39, 44, 49, 54, 59, 64, 150]

# Bins are closed on the right so every bucket ends on its upper edge:
# (16, 24] covers ages 17-24, (24, 29] covers 25-29, and so on.
# NOTE(review): assumes the labels follow that "17-24" pattern — confirm.
blood_donation_retention_updated['age_group'] = pd.cut(
    blood_donation_retention_updated['age'],
    bins=bins,
    labels=age_group_list,
    right=True,
)
# Non-null count of every column per bucket; observed=False keeps empty buckets.
blood_donation_grouped = blood_donation_retention_updated.groupby(by=['age_group'], observed=False).count()

# A single figure is created here; no separate plt.figure() call is needed.
fig, ax = plt.subplots(figsize=(16, 8))
ax.bar(blood_donation_grouped.index.astype(str), pd.to_numeric(blood_donation_grouped['age']))
ax.set_ylabel('Total Donation by Millions')
plt.show()
blood_donation_grouped.head(30)

# Number of rows per donor, then the mean number of visits per donor.
donor_grouped = blood_donation_retention_updated.groupby(by = ['donor_id']).count()
donor_grouped.head(10)
average_per_person = donor_grouped['visit_date'].mean()
print(average_per_person)

This code gets the list of blood donation facilities available in Malaysia, then looks up the latitude and longitude of each location for future visualization. (It might be used now or kept for future purposes.)
# Get the list of the hospital
from geopy.geocoders import Nominatim
import requests
import urllib.parse

import time

list_hospital = donation_facilities_updated['hospital'].unique()
print(list_hospital)

# calling the Nominatim tool and create Nominatim class
locator = Nominatim(user_agent="testingNominatimhow")

NOMINATIM_SEARCH_URL = 'https://nominatim.openstreetmap.org/search'
# The public Nominatim service requires clients to identify themselves and to
# send at most one request per second.
request_headers = {'User-Agent': 'testingNominatimhow'}
latitude = []
longitude = []

for place in list_hospital:
    try:
        # `params` takes care of URL-encoding the place name.
        reply = requests.get(
            NOMINATIM_SEARCH_URL,
            params={'q': place, 'format': 'json'},
            headers=request_headers,
            timeout=10,
        )
        reply.raise_for_status()
        matches = reply.json()
    except (requests.RequestException, ValueError) as err:
        # One failed lookup must not abort the whole loop; treat it as "not found".
        logging.error(f"Geocoding request failed for {place}: {err}")
        matches = []

    # Check if the response has any data
    if matches:
        lat = matches[0]["lat"]
        lon = matches[0]["lon"]
        latitude.append(lat)
        longitude.append(lon)

        # printing latitude and longitude
        print("Place = ", place)
        print("Latitude = ", lat)
        print("Longitude = ", lon, "\n")
    else:
        # Keep the lists aligned with list_hospital by storing placeholders.
        latitude.append("")
        longitude.append("")
        print(f"No data found for {place}. Skipping...\n")

    time.sleep(1)

location_latlon = pd.DataFrame({
    'location' : list_hospital,
    'latitude' : latitude,
    'longitude' : longitude,
})
This code creates a CSV file that stores the longitude and latitude of the blood donation facilities in Malaysia.
# Manually override the coordinates of one facility (presumably because the
# lookup above did not resolve it — confirm).
hospital_name = 'Hospital Tuanku Jaafar'

# A boolean mask avoids the IndexError that indexing an empty match list
# raises when the hospital is not in the table.
is_target = location_latlon['location'] == hospital_name

if is_target.any():
    location_latlon.loc[is_target, ['latitude', 'longitude']] = [2.7098, 101.9448]
else:
    logging.warning(f"{hospital_name} not found in location list. Coordinates not updated.")

# Persist the coordinate table.
location_latlon.to_csv('hospital_coordinates.csv')
# Pivot to one column per (blood type, state) with dates as rows, summing
# duplicates, then total each column within every calendar year.
blooddonation_statebased = donation_state_updated.pivot_table(values=['blood_a', 'blood_ab', 'blood_b', 'blood_o'], index='date',columns=['state'], aggfunc='sum').resample('Y').sum()
# One line is drawn per column of the pivoted frame.
plt.plot(blooddonation_statebased)
plt.show()

import matplotlib.pyplot as plt
oldest_date = donation_state_updated.index.min()
newest_date = donation_state_updated.index.max()
print(oldest_date)
print(newest_date)

blood_type_labels = {
    'blood_a': 'Blood Type A',
    'blood_ab': 'Blood Type AB',
    'blood_b': 'Blood Type B',
    'blood_o': 'Blood Type O',
}

# Yearly totals per state; the result is indexed by (state, date).
donation_state_updated_resampled = (
    donation_state_updated
    .groupby('state')[list(blood_type_labels)]
    .resample('Y')
    .sum()
)

# One panel per blood type with one line per state. All panels live in a
# single figure, so the titles and the legend apply to everything drawn.
fig, axes = plt.subplots(2, 2, figsize=(16, 8), sharex=True)
for ax, (column, label) in zip(axes.flat, blood_type_labels.items()):
    # Move the *state* level into the columns so the x-axis is the date;
    # a bare unstack() would move the date level instead.
    yearly_by_state = donation_state_updated_resampled[column].unstack(level='state')
    yearly_by_state.plot(ax=ax, legend=False)
    ax.set_title(label)
    ax.set_xlabel("Year")
    ax.set_ylabel("Total Blood Donation Count")

# The state lines are the same in every panel, so one shared legend suffices.
handles, state_names = axes.flat[0].get_legend_handles_labels()
fig.legend(handles, state_names, title='State', bbox_to_anchor=(1.0, 0.9), loc='upper left')
fig.suptitle("Yearly Blood Donation Trends")
fig.tight_layout()
plt.show()
Let's perform some visualization and analysis!
import matplotlib.pyplot as plt

# Get the dates as datetimes. 'date' may still be a regular column, or it may
# already be the index (change_dataframe_timeindex moves it there), in which
# case donation_facilities_updated['date'] would raise a KeyError.
if 'date' in donation_facilities_updated.columns:
    donation_facilities_updated['date'] = pd.to_datetime(donation_facilities_updated['date'])
    facility_dates = donation_facilities_updated['date']
else:
    facility_dates = pd.to_datetime(donation_facilities_updated.index)

# Lets find out the trend of every blood types.
# (column, legend label, line colour, marker) for each chart.
blood_type_styles = [
    ('blood_a', 'Blood Type A', 'green', 'o'),
    ('blood_ab', 'Blood Type AB', 'blue', 's'),
    ('blood_b', 'Blood Type B', 'red', '^'),
    ('blood_o', 'Blood Type O', 'orange', 'd'),
]

# One chart per blood type, all drawn the same way.
for column, label, color, marker in blood_type_styles:
    plt.figure(figsize=(12, 6))
    plt.plot(
        facility_dates.values,
        donation_facilities_updated[column].values,
        label=label,
        color=color,
        marker=marker,
    )
    plt.title('Blood Type Trends Over Time')
    plt.xlabel('Date')
    plt.ylabel('Blood Count')
    plt.legend()
    plt.show()
Store the data in a CSV file and append to it daily. This acts as a staging phase before we transform the data and load it into a database.
# donation_facilities.to_csv("donation_facilities.csv",mode='a',index =False, header=not os.path.exists("donation_facilities.csv"))
# donation_state.to_csv("donation_state.csv",mode='a',index =False, header=not os.path.exists("donation_state.csv"))
# newdonors_facility.to_csv("newdonors_facility.csv",mode='a',index =False, header=not os.path.exists("newdonors_facility.csv"))
# newdonors_state.to_csv("newdonors_state.csv",mode='a',index =False, header=not os.path.exists("newdonors_state.csv"))
# blood_donation_retention.to_csv("newdonors_state.csv",mode='a',index =False, header=not os.path.exists("blood_donation_retention.csv"))

Because the CSV file is appended to, it may contain duplicate values. To automate the analysis, we first clean the data so that it does not contain any duplicates.
# Re-read the staged facilities CSV; this frame is the starting point for cleaning.

clean_donation_facilities = pd.read_csv('donation_facilities.csv')
# Preview the first rows.
clean_donation_facilities.head()


Create a table with its columns and data types in the Malaysia blood donation database.
import duckdb

db_file_path = "malaysia_blood_donation.db"

db_connection = duckdb.connect(database=db_file_path, read_only=False)

# Column definitions belong in parentheses; "CREATE TABLE ... AS" expects a
# query, not a column list. IF NOT EXISTS keeps this cell re-runnable.
db_connection.sql(
    "CREATE TABLE IF NOT EXISTS Donation_Facilities "
    "(date VARCHAR, hospital VARCHAR, daily VARCHAR)"
)
Extract the data from the CSV file, which contains the daily updates, so that it can be transformed and pushed into a database.
import duckdb
# Load the staged CSV into a DataFrame.
donation_facilities_df = pd.read_csv("donation_facilities.csv")
# The query refers to the DataFrame by its Python variable name.
results = duckdb.sql("SELECT * FROM donation_facilities_df") # Queries the DataFrame loaded from the CSV above
print(results)


import duckdb
# Make INFO-level messages from the root logger visible.
logging.getLogger().setLevel(logging.INFO)

# Database file that holds the staged tables.
db_file_path = "malaysia_blood_donation.db"

# Open the database in read-write mode.
db_connection = duckdb.connect(database=db_file_path, read_only=False)


def store_db(table_name,db_connection,dataframe_name):
    """Create ``table_name`` from a module-level DataFrame unless it exists.

    Args:
        table_name: Name of the table to create.
        db_connection: Open DuckDB connection to write to.
        dataframe_name: Name (a string) of the module-level variable that
            holds the DataFrame to store.

    Returns:
        None. Any error is reported with ``print`` and not re-raised.
    """
    try:
        # DuckDB identifiers are case-insensitive, so compare in lower case.
        already_there = db_connection.execute(
            "SELECT 1 FROM information_schema.tables WHERE lower(table_name) = lower(?)",
            [table_name],
        ).fetchone()
        if already_there is not None:
            logging.info(f"Table {table_name} already exists.")
            return

        frame = globals().get(dataframe_name)
        if frame is None:
            raise ValueError(f"No DataFrame named {dataframe_name} is available.")

        # Expose the DataFrame to this connection under a temporary name and
        # copy it into a real table. pandas' to_sql cannot write through a
        # DuckDB connection, and a relation object cannot be pasted into SQL text.
        quoted_table = '"' + table_name.replace('"', '""') + '"'
        db_connection.register("_store_db_source", frame)
        try:
            db_connection.execute(
                f"CREATE TABLE {quoted_table} AS SELECT * FROM _store_db_source"
            )
        finally:
            db_connection.unregister("_store_db_source")
        logging.info(f"Data saved to {table_name} table in the database.")
    except Exception as e:
        print(f"Unexpected error when storing to a database : {e}")

# Store each extract under its own table; the DataFrame is passed by variable name.
store_db("Donation_Facilities",db_connection=db_connection,dataframe_name="donation_facilities")
store_db("Donation_State",db_connection=db_connection,dataframe_name="donation_state")
store_db("NewDonors_Facility",db_connection=db_connection,dataframe_name="newdonors_facility")
store_db("NewDonors_State",db_connection=db_connection,dataframe_name="newdonors_state")
store_db("BloodDonor_Retention",db_connection=db_connection,dataframe_name="blood_donation_retention")

# Release the database connection once all tables have been handled.
db_connection.close()
Extract all data from the staging area to transform it.
import duckdb

# Database file that holds the staged tables.
db_file_path = "malaysia_blood_donation.db"

# Open the database in read-write mode.
db_connection = duckdb.connect(database=db_file_path, read_only=False)


def store_db(data, table_name, db_connection):
    """Create ``table_name`` in the database from the DataFrame ``data``.

    Args:
        data: DataFrame to store; ``None`` is reported as an error.
        table_name: Name of the table to create.
        db_connection: Open DuckDB connection to write to.

    Returns:
        None. Any error (including the table already existing) is reported
        with ``print`` and not re-raised.
    """
    try:
        if data is None:
            raise ValueError(f"No data available for {table_name}.")

        # A DataFrame cannot be pasted into SQL text. Expose it to this
        # connection under a temporary name and copy it into a real table.
        quoted_table = '"' + table_name.replace('"', '""') + '"'
        db_connection.register("_store_db_source", data)
        try:
            db_connection.execute(
                f"CREATE TABLE {quoted_table} AS SELECT * FROM _store_db_source"
            )
        finally:
            db_connection.unregister("_store_db_source")
        logging.info(f"Data saved to {table_name} table in the database.")
    except Exception as e:
        print(f"Unexpected error when storing to a database: {e}")

# Store each extract under its own table. store_db catches and prints its own
# errors, so these calls do not raise.
store_db(donation_facilities, "Donation_Facilities", db_connection=db_connection)
store_db(donation_state, "Donation_State", db_connection=db_connection)
store_db(newdonors_facility, "NewDonors_Facility", db_connection=db_connection)
store_db(newdonors_state, "NewDonors_State", db_connection=db_connection)
store_db(blood_donation_retention, "BloodDonor_Retention", db_connection=db_connection)

# The connection is closed here; it must be reopened before any later query.
db_connection.close()

def extract_data(table_name, db_connection):
    """Read every row of ``table_name`` into a DataFrame.

    Args:
        table_name: Name of the table to read.
        db_connection: Open DuckDB connection to read from.

    Returns:
        The table contents as a DataFrame, or ``None`` if the query fails
        (the error is printed, not re-raised).
    """
    try:
        quoted_table = '"' + table_name.replace('"', '""') + '"'
        # Use DuckDB's own DataFrame conversion; pandas.read_sql only supports
        # SQLAlchemy connectables and sqlite3 connections.
        extracted_data = db_connection.execute(f"SELECT * FROM {quoted_table}").df()
        logging.info(f"Data extracted from {table_name} table in the database.")
        return extracted_data
    except Exception as e:
        print(f"Unexpected error when extracting data: {e}")
        return None

# Example usage:
# The connection opened earlier has already been closed, so open a fresh one
# for the query and close it again afterwards.
db_connection = duckdb.connect(database=db_file_path, read_only=False)
try:
    extracted_data = extract_data("BloodDonor_Retention", db_connection=db_connection)
finally:
    db_connection.close()

# Preview the freshly fetched aggregate frames.
donation_facilities.head()
donation_state.head()
newdonors_facility.head()
newdonors_state.head()