# Uncovering Healthcare Inefficiencies - SQL Database and Power BI Visualizations

This notebook details the process of setting up a SQL database and importing data obtained from CMS. It includes steps for creating additional views and tables within the SQL database. The refined views and tables are then exported to Power BI to develop comprehensive visualization dashboards.

---

## Import Libraries

In [36]:
# Import required libraries
import os
import warnings
from getpass import getpass
from urllib.parse import quote

import folium
import numpy as np
import pandas as pd
import powerbiclient
import pymysql
from geopy.geocoders import Bing
from powerbiclient import QuickVisualize, get_dataset_config, Report
from powerbiclient.authentication import DeviceCodeLoginAuthentication
from sqlalchemy import Table, Column, Integer, Float, String, Text, MetaData, VARCHAR, DECIMAL
from sqlalchemy import create_engine, MetaData, Table
from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql import text
from tabulate import tabulate
from tqdm import tqdm

warnings.simplefilter(action='ignore', category=FutureWarning) # suppress FutureWarning messages

## Connect to Database (`fwa_healthcare`) 

In [67]:
# connection parameters
# The password is never stored in the notebook: it is read from the
# MYSQL_PASSWORD environment variable, falling back to an interactive prompt.
username = "root"
password = os.environ.get("MYSQL_PASSWORD") or getpass("MySQL password: ")
hostName = "127.0.0.1"
database = "fwa_healthcare"

# Connect to the database
try:
    conn = pymysql.connect(host=hostName,
                           port=3306,
                           user=username,
                           password=password,
                           database=database,  # `db` is a deprecated alias of `database`
                           local_infile=True)  # enable local file loading
except pymysql.MySQLError as e:
    # Raise instead of calling exit(): the error stays visible in the cell
    # output and "Run All" stops here rather than continuing without `conn`.
    raise RuntimeError(f"Error connecting to MySQL: {e}") from e
else:
    print("Connection to SQL was successful!")

Connection to SQL was successful!


In [68]:
# define SQLAlchemy engine
# Credentials are percent-encoded before being placed in the URL: a raw '@'
# or '%' inside the user name or password would otherwise be parsed as URL
# syntax and the engine would connect with the wrong credentials.
engine = create_engine(
    f"mysql+pymysql://{quote(username, safe='')}:{quote(password, safe='')}"
    f"@{hostName}/{database}"
)

# define metadata
metadata = MetaData()

### Import data to existing database 

In [50]:
# define the file path
file_path = "data/cms_data_sql.csv"

# function to count the number of lines in the file
def count_lines(file_path):
    """Return the number of lines in the file at ``file_path``.

    The file is read in binary mode, so the count does not depend on the
    platform's default text encoding and cannot fail with a decoding error.
    Lines are delimited by ``\\n``; a final line without a trailing newline
    is still counted, and an empty file yields 0.
    """
    with open(file_path, 'rb') as file:
        return sum(1 for _ in file)

# count total lines in the file (excluding header)
total_lines = count_lines(file_path) - 1  # subtracting header row

cursor = conn.cursor()

try:
    # check if the table already contains data
    cursor.execute("SELECT COUNT(*) FROM healthcare_market_saturation_fraud;")
    row_count = cursor.fetchone()[0]

    if row_count > 0:
        print(f"Table already contains {row_count} rows. Skipping data import.")
    else:
        # create SQL command for importing data
        # NOTE: this is a regular (non-raw) f-string, so the '\n' below reaches
        # MySQL as a literal newline character inside the quoted terminator.
        sql = f"""
        LOAD DATA LOCAL INFILE '{file_path}'
        INTO TABLE `healthcare_market_saturation_fraud`
        FIELDS TERMINATED BY ','
        ENCLOSED BY '"'
        LINES TERMINATED BY '\n'
        IGNORE 1 LINES
        (reference_period, type_of_service, aggregation_level, state, county, state_fips, county_fips, num_fee_service_benef, 
        num_providers, avg_users_per_provider, pct_users_out_ffs_benef, num_users, avg_providers_per_county, num_dual_eligible_users, 
        pct_dual_eligible_users_out_total_users, percent_dual_elig_ffs, total_payment, moratorium, num_fee_service_benef_dual_color, 
        num_fee_service_benef_desc, num_providers_dual_color, num_providers_desc, avg_users_per_provider_dual_color, avg_users_per_provider_desc, 
        pct_users_out_ffs_benef_dual_color, pct_users_out_ffs_benef_desc, num_users_dual_color, num_users_desc, avg_providers_per_county_dual_color, 
        avg_providers_per_county_desc, num_dual_eligible_users_dual_color, num_dual_eligible_users_desc, pct_dual_eligible_total_users_dual_color, 
        pct_dual_eligible_total_users_desc, pct_dual_eligible_ffs_dual_color, pct_dual_eligible_ffs_desc, total_payment_dual_color, total_payment_desc, 
        num_fee_service_benef_change, num_providers_change, avg_users_per_provider_change, pct_users_out_ffs_benef_change, num_users_change, 
        avg_providers_per_county_change, num_dual_eligible_users_change, pct_dual_eligible_users_out_total_users_change, pct_dual_eligible_ffs_change, 
        total_payment_change);
        """

        # start a progress bar
        # LOAD DATA runs as a single statement, so the bar can only move once
        # it returns. It is advanced by the number of rows the server reports
        # as loaded, not by the file's line count, so a partial load is visible.
        with tqdm(total=total_lines, desc='Importing Data', unit='line') as pbar:
            rows_loaded = cursor.execute(sql) or 0
            conn.commit()
            pbar.update(rows_loaded)

        if rows_loaded == total_lines:
            print("Data imported successfully!")
        else:
            print(f"Data import finished, but only {rows_loaded} of {total_lines} lines were loaded.")

except pymysql.MySQLError as e:
    print(f"Error: {e}")
    conn.rollback()

Importing Data: 100%|██████████| 1044711/1044711 [00:18<00:00, 57809.87line/s]

Data imported successfully!





## Table Manipulation

In [51]:
# define tables

def _columns(*specs):
    """Turn ``(name, type)`` pairs into a list of new Column objects, in order."""
    return [Column(col_name, col_type) for col_name, col_type in specs]


def _area_metric_columns():
    """Build the nine columns that follow the primary key in both the
    ``providers`` and ``provider_utilization`` tables.

    New Column objects are created on every call so that each Table receives
    its own instances.
    """
    return _columns(
        ('state', VARCHAR(2)),
        ('county', VARCHAR(255)),
        ('state_fips', Integer),
        ('county_fips', Integer),
        ('total_beneficiaries', DECIMAL(20, 0)),
        ('total_providers', DECIMAL(20, 0)),
        ('avg_users_per_provider', DECIMAL(20, 2)),
        ('avg_pct_users_out_ffs', DECIMAL(5, 2)),
        ('avg_total_payment', DECIMAL(20, 2)),
    )


# Providers table
providers = Table(
    'providers', metadata,
    Column('provider_id', Integer, primary_key=True, autoincrement=True),
    *_area_metric_columns(),
    extend_existing=True,
)

# Beneficiaries table
beneficiaries = Table(
    'beneficiaries', metadata,
    Column('beneficiary_id', Integer, primary_key=True, autoincrement=True),
    *_columns(
        ('reference_period', VARCHAR(255)),
        ('num_fee_service_benef', DECIMAL(20, 0)),
        ('num_providers', DECIMAL(20, 0)),
        ('num_users', DECIMAL(20, 0)),
        ('total_payment', DECIMAL(20, 2)),
        ('moratorium', VARCHAR(255)),
        ('state', VARCHAR(2)),
        ('county', VARCHAR(255)),
    ),
    extend_existing=True,
)

# Provider Utilization table (same column layout as `providers`)
provider_utilization = Table(
    'provider_utilization', metadata,
    Column('utilization_id', Integer, primary_key=True, autoincrement=True),
    *_area_metric_columns(),
    extend_existing=True,
)

# Changes table
changes = Table(
    'changes', metadata,
    Column('change_id', Integer, primary_key=True, autoincrement=True),
    *_columns(
        ('reference_period', VARCHAR(255)),
        ('num_fee_service_benef_change', DECIMAL(20, 2)),
        ('num_providers_change', DECIMAL(20, 2)),
        ('avg_users_per_provider_change', DECIMAL(20, 2)),
        ('pct_users_out_ffs_benef_change', DECIMAL(5, 2)),
        ('num_users_change', DECIMAL(20, 0)),
        ('avg_providers_per_county_change', DECIMAL(20, 2)),
        ('num_dual_eligible_users_change', DECIMAL(20, 0)),
        ('pct_dual_eligible_users_out_total_users_change', DECIMAL(5, 2)),
        ('pct_dual_eligible_ffs_change', DECIMAL(5, 2)),
        ('total_payment_change', DECIMAL(20, 2)),
    ),
    extend_existing=True,
)

# Dual Eligible table
dual_eligible = Table(
    'dual_eligible', metadata,
    Column('dual_id', Integer, primary_key=True, autoincrement=True),
    *_columns(
        ('reference_period', VARCHAR(255)),
        ('num_dual_eligible_users', DECIMAL(20, 0)),
        ('pct_dual_eligible_users_out_of_total_users', DECIMAL(5, 2)),
        ('num_users_dual_color', VARCHAR(255)),
        ('pct_dual_eligible_ffs_dual_color', VARCHAR(255)),
    ),
    extend_existing=True,
)

# Emit CREATE TABLE for every table registered on `metadata`
metadata.create_all(engine)

In [56]:
# Define SQL commands
# Every derived table is filled with an INSERT ... SELECT from the raw import
# table. Each entry maps target column -> source column, in insertion order.
SOURCE_TABLE = "healthcare_market_saturation_fraud"


def _same_names(*columns):
    """Return a column map in which target and source column names are identical."""
    return {column: column for column in columns}


def _build_insert_select(target, column_map, source=SOURCE_TABLE):
    """Return the INSERT ... SELECT statement that copies ``column_map`` from ``source`` into ``target``."""
    target_columns = ",\n        ".join(column_map.keys())
    source_columns = ",\n        ".join(column_map.values())
    return (
        f"INSERT INTO `{target}` (\n        {target_columns}\n    )\n"
        f"    SELECT\n        {source_columns}\n"
        f"    FROM `{source}`"
    )


# `providers` and `provider_utilization` are filled from the same columns
_area_metric_map = {
    "state": "state",
    "county": "county",
    "state_fips": "state_fips",
    "county_fips": "county_fips",
    "total_beneficiaries": "num_fee_service_benef",
    "total_providers": "num_providers",
    "avg_users_per_provider": "avg_users_per_provider",
    "avg_pct_users_out_ffs": "pct_users_out_ffs_benef",
    "avg_total_payment": "total_payment",
}

insert_maps = {
    "beneficiaries": _same_names(
        "reference_period",
        "num_fee_service_benef",
        "num_providers",
        "num_users",
        "total_payment",
        "moratorium",
        "state",
        "county",
    ),
    "changes": _same_names(
        "reference_period",
        "num_fee_service_benef_change",
        "num_providers_change",
        "avg_users_per_provider_change",
        "pct_users_out_ffs_benef_change",
        "num_users_change",
        "avg_providers_per_county_change",
        "num_dual_eligible_users_change",
        "pct_dual_eligible_users_out_total_users_change",
        "pct_dual_eligible_ffs_change",
        "total_payment_change",
    ),
    "dual_eligible": {
        "reference_period": "reference_period",
        "num_dual_eligible_users": "num_dual_eligible_users",
        # the target column is spelled "..._out_of_total_users"
        "pct_dual_eligible_users_out_of_total_users": "pct_dual_eligible_users_out_total_users",
        "num_users_dual_color": "num_users_dual_color",
        "pct_dual_eligible_ffs_dual_color": "pct_dual_eligible_ffs_dual_color",
    },
    "provider_utilization": _area_metric_map,
    "providers": _area_metric_map,
}

# Kept as a list of SQL strings, in the same order as before
commands = [_build_insert_select(target, column_map) for target, column_map in insert_maps.items()]

# Execute commands
try:
    # engine.begin() commits on success and rolls back on error, so the inserts
    # are persisted whether or not the installed SQLAlchemy autocommits.
    with engine.begin() as connection:
        for (target, _column_map), command in zip(insert_maps.items(), commands):
            # Skip tables that already hold rows so that re-running the cell
            # does not duplicate the data.
            already_filled = connection.execute(
                text(f"SELECT 1 FROM `{target}` LIMIT 1")
            ).first() is not None
            if already_filled:
                print(f"Table {target} already contains data. Skipping insert.")
                continue
            connection.execute(text(command))
    print("Data insertion successful")
except Exception as e:
    print(f"An error occurred: {e}")

Data insertion successful


In [57]:
# Load the metadata and reflect the tables
metadata.reflect(bind=engine)

# Create a session
Session = sessionmaker(bind=engine)
session = Session()

# Function to display table data
def display_table(table, n_rows=5):
    """Print the name and the first ``n_rows`` rows of ``table``.

    The row limit is applied in the SQL query, so only the rows that are
    actually printed are fetched from the database.

    Parameters
    ----------
    table : sqlalchemy.Table
        Table to preview; queried through the module-level ``session``.
    n_rows : int, default 5
        Maximum number of rows to fetch and print.
    """
    query = table.select().limit(n_rows)
    result = session.execute(query)
    df = pd.DataFrame(result.fetchall(), columns=list(result.keys()))
    print(f"Table: {table.name}")
    print(df.head(n_rows))  # Display first few rows
    print("\n")

# Display data from all tables
for table in [providers, beneficiaries, provider_utilization, changes, dual_eligible]:
    display_table(table)

Table: providers
   provider_id state   county  state_fips  county_fips total_beneficiaries  \
0            1    --  --ALL--           0            0            36122263   
1            2    AL  --ALL--           1            0              547486   
2            3    AK  --ALL--           2            0               91480   
3            4    AZ  --ALL--           4            0              740278   
4            5    AR  --ALL--           5            0              437616   

  total_providers avg_users_per_provider avg_pct_users_out_ffs  \
0            8814                 495.69                 12.09   
1             146                 501.47                 13.37   
2              33                 254.97                  9.20   
3             170                 401.34                  9.22   
4              86                 628.07                 12.34   

  avg_total_payment  
0     4037494106.32  
1       74641552.63  
2        6904088.39  
3       49176136.26  
4      

In [58]:
# Run SHOW TABLES on the open pymysql cursor and keep the returned rows
cursor.execute("SHOW TABLES;")
tables = cursor.fetchall()

# Print a header followed by the first column of every returned row
print("Tables in the database:")
for table in tables:
    first_column = table[0]
    print(first_column)

Tables in the database:
beneficiaries
changes
dual_eligible
healthcare_market_saturation_fraud
provider_utilization
providers


## View Manipulation

In [70]:
from sqlalchemy import create_engine, text

# Define the views
# Each view is declared once as name -> SELECT body; the CREATE VIEW statement
# is derived from it so the name used for the existence check and the name in
# the DDL cannot drift apart.
view_selects = {
    'total_payments_by_reference_period_county': """
            SELECT 
                reference_period,
                county,
                SUM(total_payment) AS total_payments
            FROM 
                beneficiaries
            GROUP BY 
                reference_period, county
    """,
    'provider_utilization_summary': """
            SELECT 
                state,
                county,
                AVG(avg_users_per_provider) AS avg_users_per_provider,
                AVG(avg_pct_users_out_ffs) AS avg_pct_users_out_ffs,
                AVG(avg_total_payment) AS avg_total_payment
            FROM 
                provider_utilization
            GROUP BY 
                state, county
    """,
    'beneficiary_provider_changes_over_time': """
            SELECT 
                reference_period,
                AVG(num_fee_service_benef_change) AS avg_fee_service_benef_change,
                AVG(num_providers_change) AS avg_providers_change,
                AVG(avg_users_per_provider_change) AS avg_users_per_provider_change,
                AVG(pct_users_out_ffs_benef_change) AS avg_pct_users_out_ffs_benef_change,
                AVG(num_users_change) AS avg_users_change,
                AVG(avg_providers_per_county_change) AS avg_providers_per_county_change,
                AVG(num_dual_eligible_users_change) AS avg_dual_eligible_users_change,
                AVG(pct_dual_eligible_users_out_total_users_change) AS avg_pct_dual_eligible_users_out_total_users_change,
                AVG(pct_dual_eligible_ffs_change) AS avg_pct_dual_eligible_ffs_change,
                AVG(total_payment_change) AS avg_total_payment_change
            FROM 
                changes
            GROUP BY 
                reference_period
    """,
    'dual_eligible_users_summary': """
            SELECT 
                reference_period,
                AVG(num_dual_eligible_users) AS avg_dual_eligible_users,
                AVG(pct_dual_eligible_users_out_of_total_users) AS avg_pct_dual_eligible_users_out_of_total_users
            FROM 
                dual_eligible
            GROUP BY 
                reference_period
    """,
}

# Same shape as before: a list of {'name', 'create'} dictionaries
views = [
    {'name': view_name, 'create': f"CREATE VIEW {view_name} AS {select_sql}"}
    for view_name, select_sql in view_selects.items()
]

# The view name is passed as a bound parameter rather than formatted into the SQL
view_exists_query = text("""
    SELECT COUNT(*)
    FROM information_schema.VIEWS
    WHERE TABLE_NAME = :view_name
      AND TABLE_SCHEMA = DATABASE()
""")

# Execute view creation commands
try:
    with engine.connect() as connection:
        for view in views:
            # Check if the view exists
            existing = connection.execute(
                view_exists_query, {"view_name": view['name']}
            ).scalar()

            # If view does not exist, create it
            if existing == 0:
                connection.execute(text(view['create']))
                print(f"View {view['name']} created successfully!")
            else:
                print(f"View {view['name']} already exists.")
except Exception as e:
    print(f"An error occurred: {e}")

View total_payments_by_reference_period_county already exists.
View provider_utilization_summary already exists.
View beneficiary_provider_changes_over_time already exists.
View dual_eligible_users_summary already exists.


In [None]:
# close connection
# (a bare `finally:` without a matching `try` is a SyntaxError, so the cleanup
# runs as plain statements)
session.close()    # release the SQLAlchemy session
engine.dispose()   # close the engine's pooled connections
cursor.close()     # close the pymysql cursor
conn.close()       # close the pymysql connection