**Setup**

In [None]:
import pandas as pd
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
import math

# Mount Google Drive into the Colab filesystem at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!pip install s3fs
import s3fs

Collecting s3fs
  Downloading s3fs-2021.11.1-py3-none-any.whl (25 kB)
Collecting aiobotocore~=2.0.1
  Downloading aiobotocore-2.0.1.tar.gz (54 kB)
[K     |████████████████████████████████| 54 kB 2.2 MB/s 
[?25hCollecting aiohttp<=4
  Downloading aiohttp-3.8.1-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_12_x86_64.manylinux2010_x86_64.whl (1.1 MB)
[K     |████████████████████████████████| 1.1 MB 10.4 MB/s 
[?25hCollecting fsspec==2021.11.1
  Downloading fsspec-2021.11.1-py3-none-any.whl (132 kB)
[K     |████████████████████████████████| 132 kB 46.2 MB/s 
[?25hCollecting botocore<1.22.9,>=1.22.8
  Downloading botocore-1.22.8-py3-none-any.whl (8.1 MB)
[K     |████████████████████████████████| 8.1 MB 24.9 MB/s 
Collecting aioitertools>=0.5.1
  Downloading aioitertools-0.8.0-py3-none-any.whl (21 kB)
Collecting async-timeout<5.0,>=4.0.0a3
  Downloading async_timeout-4.0.2-py3-none-any.whl (5.8 kB)
Collecting yarl<2.0,>=1.0
  Downloading yarl-1.7.2-cp37-cp37m-manylinux

In [None]:
!pip install pandasql
import pandasql as ps

Collecting pandasql
  Downloading pandasql-0.7.3.tar.gz (26 kB)
Building wheels for collected packages: pandasql
  Building wheel for pandasql (setup.py) ... [?25l[?25hdone
  Created wheel for pandasql: filename=pandasql-0.7.3-py3-none-any.whl size=26781 sha256=0bd85e93dba9ff1102658a90257193d5eca9a9d91f7436ef0a939a3334ca4482
  Stored in directory: /root/.cache/pip/wheels/5c/4b/ec/41f4e116c8053c3654e2c2a47c62b4fca34cc67ef7b55deb7f
Successfully built pandasql
Installing collected packages: pandasql
Successfully installed pandasql-0.7.3


In [None]:
%%capture
# Install a headless Java 8 JDK; package output is discarded.
# NOTE(review): the JAVA_HOME set in a later cell presumably points at this JDK's install path - confirm.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [None]:
%%capture
!apt install libkrb5-dev
!wget https://downloads.apache.org/spark/spark-3.1.1/spark-3.1.1-bin-hadoop3.2.tgz
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
!pip install findspark
!pip install sparkmagic
!pip install pyspark

! pip install pyspark --user
! pip install seaborn --user
! pip install plotly --user
! pip install imageio --user
! pip install folium --user

In [None]:
%%capture
!apt update
!apt install gcc python-dev libkrb5-dev

In [None]:
%%capture
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as F
import os

In [None]:
%load_ext sparkmagic.magics



In [None]:
%%capture
# Point the process environment at the Spark distribution unpacked under /content
# and at the Java 8 JDK before pyspark is imported.
import os
os.environ['SPARK_HOME'] = "/content/spark-3.1.1-bin-hadoop3.2"
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
import pyspark
from pyspark.sql import SQLContext

In [None]:
# Reuse the session already bound to `spark` when the cell is re-run;
# otherwise (name unbound or None) create the local session and its SQLContext.
try:
    spark
except NameError:
    spark = None

if spark is None:
    spark = SparkSession.builder.appName('Initial').getOrCreate()
    # SQLContext's first positional argument is a SparkContext, not a SparkSession.
    sqlContext = SQLContext(spark.sparkContext, sparkSession=spark)

FileNotFoundError: ignored

In [None]:
!pip install boto3
import boto3

**EMR Cluster Setup**

In [None]:
#%spark delete -s my_session

In [None]:
# Open a sparkmagic Python session named my_session against the endpoint below.
# NOTE(review): the host is hardcoded and presumably the cluster's Livy endpoint - update it when the cluster changes.
%spark add -s my_session -l python -u http://ec2-3-226-235-190.compute-1.amazonaws.com:8998

**Import Data from S3**



In [None]:
%%spark
# Read the exported main dataset (CSV with a header row) from S3 into a Spark DataFrame.
main_sdf = spark.read.option('header','true').csv('s3://dspracticum/sdf/main_sdf/part-00000-5bbc07c6-a4b3-422e-82a3-8b3c225e0e26-c000.csv')

In [None]:
%%spark
# Register the frame as temp view 'main_sdf' so the SQL cells below can query it, then preview it.
main_sdf.createOrReplaceTempView('main_sdf')
main_sdf.show()

Condensing the spark dataframe above to include only those records where TOTALPMT is not null

In [None]:
%%spark

# Keep only the analysis columns for rows whose TOTALPMT is not null.
query = '''
SELECT SEQNO, ORIGYEAR, WORKSTAT, LICNSTAT, LICNFELD, PRACTAGE, ALEGATN1, ALEGATN2, OUTCOME, TOTALPMT_FLOAT, TOTALPMT_ADJ, TOTALPMT_GROUP, TOTALPMT_ADJ_GROUP, PRACTEXP_ADJ, PRACTEXP_GROUP
FROM main_sdf
WHERE TOTALPMT IS NOT NULL
'''

# Expose the filtered frame as temp view 'main_sdf_pmt_only' for later queries and preview it.
main_sdf_pmt_only = spark.sql(query)
main_sdf_pmt_only.createOrReplaceTempView('main_sdf_pmt_only')
main_sdf_pmt_only.show()

In [None]:
#%%spark
#sdf_1.coalesce(1).write.format('csv').option('header','true').save('s3://dspracticum/payment_ranges_sdf/sdf_1')

In [None]:
# Load the exported payment rows from S3 into pandas.
# NOTE(review): this part file is presumably the output of the commented-out sdf_1 export above;
# sdf_1 is not defined in the visible cells - confirm its provenance.
df = pd.read_csv('s3://dspracticum/payment_ranges_sdf/sdf_1/part-00000-dd53556e-169e-4983-93c4-e6a29c95cb7a-c000.csv')

In [None]:
# Payments span too wide a range to plot raw, so work with log(1 + x).
# np.log1p is vectorised over the whole column (no per-row Python lambda).
df['total_payment_log'] = np.log1p(df['TOTALPMT_FLOAT'])
df['total_payment_adjusted_log'] = np.log1p(df['TOTALPMT_ADJ'])

In [None]:
df

In [None]:
# Load the ALEGATN1 lookup table; a later cell joins on its 'Value' column
# and renames its 'Label' column to 'Allegation'.

alegatn1_label_df = pd.read_csv('s3://dspracticum/labels/ALEGATN1.csv')

In [None]:
alegatn1_label_df

In [None]:
# Attach the allegation text to every row (left join keeps rows without a match).
# Renaming the lookup columns first avoids a leftover 'Value' key column that would
# collide with the other label merges; dropping any existing 'Allegation' column
# makes the cell safe to re-run.
allegation_lookup = alegatn1_label_df.rename(columns={'Value': 'ALEGATN1', 'Label': 'Allegation'})
df = df.drop(columns=['Allegation'], errors='ignore').merge(allegation_lookup, how='left', on='ALEGATN1')
df

In [None]:
# Load the LICNFELD lookup table; a later cell joins on its 'Value' column
# and renames its 'Label' column to 'License Field'.

licnfeld_label_df = pd.read_csv('s3://dspracticum/labels/LICNFELD.csv')

In [None]:
licnfeld_label_df

In [None]:
# Attach the license-field text to every row (left join keeps rows without a match).
# Renaming the lookup columns first avoids a duplicated 'Value' key column
# (Value_x / Value_y); dropping any existing 'License Field' column makes the cell re-runnable.
license_field_lookup = licnfeld_label_df.rename(columns={'Value': 'LICNFELD', 'Label': 'License Field'})
df = df.drop(columns=['License Field'], errors='ignore').merge(license_field_lookup, how='left', on='LICNFELD')
df



In [None]:
# Load the OUTCOME lookup table; a later cell joins on its 'Value' column
# and renames its 'Label' column to 'Outcome'.

outcome_label_df = pd.read_csv('s3://dspracticum/labels/OUTCOME.csv')

In [None]:
outcome_label_df

In [None]:
# Attach the outcome text to every row (left join keeps rows without a match).
# Renaming the lookup columns first avoids a leftover 'Value' key column;
# dropping any existing 'Outcome' column makes the cell re-runnable.
outcome_lookup = outcome_label_df.rename(columns={'Value': 'OUTCOME', 'Label': 'Outcome'})
df = df.drop(columns=['Outcome'], errors='ignore').merge(outcome_lookup, how='left', on='OUTCOME')
df

I. Check the distribution of TOTALPMT across the 223,926 reports that resulted in a TOTALPMT greater than zero

In [None]:
# Distribution of the log-transformed raw payment column.
sns.displot(data = df['total_payment_log'])

**FEATURES LIST**



1. LICNSTAT
2. WORKSTAT
3. PRACTAGE (practitioner age group)
4. PRACTEXP
5. ALEGATN1
6. LICNFELD
7. OUTCOME



Label Function

In [None]:
def label(x):
    """Turn a group code into a short string label for plotting.

    Parameters
    ----------
    x : any
        A group code as read from the data, e.g. ``20.0``, ``5.0`` or NaN.

    Returns
    -------
    str
        ``'na'`` for NaN; the integer part as text for numeric values
        (``20.0`` -> ``'20'``, ``100.0`` -> ``'100'``); for anything that is
        not a finite number, the original truncation of its text form.
    """
    text = str(x)
    if text == 'nan':
        return 'na'
    try:
        # The previous fixed-width slicing mangled values >= 100 and
        # three-digit ints ('100.0' -> '10', 100 -> '1').
        return str(int(float(text)))
    except (ValueError, OverflowError):
        # Not a finite number: keep the historical truncation rule.
        return text[:1] if len(text) == 3 else text[:2]

**I. LICNSTAT and WORKSTAT**

In [None]:
%%spark

# Per LICNSTAT: row count and mean raw / adjusted payment over rows with a non-null TOTALPMT,
# ordered by mean adjusted payment, highest first.
query = '''
SELECT LICNSTAT, COUNT(*) AS count, AVG(TOTALPMT_FLOAT) AS totalpmt_mean, AVG(TOTALPMT_ADJ) AS totalpmt_adj_mean
FROM main_sdf
WHERE TOTALPMT IS NOT NULL
GROUP BY LICNSTAT
ORDER BY totalpmt_adj_mean DESC
'''

# NOTE: the next cell rebinds sdf_2 to the WORKSTAT summary.
sdf_2 = spark.sql(query)
sdf_2.show()

In [None]:
%%spark

# Per WORKSTAT: row count and mean raw / adjusted payment over rows with a non-null TOTALPMT,
# ordered by mean adjusted payment, highest first.
query = '''
SELECT WORKSTAT, COUNT(*) AS count, AVG(TOTALPMT_FLOAT) AS totalpmt_mean, AVG(TOTALPMT_ADJ) AS totalpmt_adj_mean
FROM main_sdf
WHERE TOTALPMT IS NOT NULL
GROUP BY WORKSTAT
ORDER BY totalpmt_adj_mean DESC
'''

# NOTE: this overwrites the sdf_2 produced by the LICNSTAT cell above.
sdf_2 = spark.sql(query)
sdf_2.show()

In [None]:
%%spark

# CTE `a` picks the 20 LICNSTAT values with the most rows in main_sdf_pmt_only;
# the outer query keeps every row whose LICNSTAT is one of those 20.
query = '''
WITH a AS (SELECT LICNSTAT, COUNT(*) AS count
FROM main_sdf_pmt_only
GROUP BY LICNSTAT
ORDER BY count DESC
LIMIT 20)

SELECT *
FROM main_sdf_pmt_only
WHERE LICNSTAT IN (SELECT LICNSTAT FROM a)
'''

# Register the result as temp view 'sdf_3' and preview it.
sdf_3 = spark.sql(query)
sdf_3.createOrReplaceTempView('sdf_3')
sdf_3.show()

In [None]:
#check
"""

%%spark

query = '''
SELECT DISTINCT LICNSTAT
FROM sdf_3'''

check = spark.sql(query)
check.count()

"""

In [None]:
%%spark
# Write sdf_3 to S3 as CSV with a header row; coalesce(1) collapses it to one partition first.
# NOTE(review): the pandas read in the next cell hardcodes a generated part-file name,
# so re-running this export would presumably require updating that path - confirm.
sdf_3.coalesce(1).write.format('csv').option('header','true').save('s3://dspracticum/payment_ranges_sdf/top20_licnstat')

In [None]:
# Load the top-20 LICNSTAT export from S3 into pandas.
# NOTE(review): the part-file name is hardcoded; it presumably changes whenever the export is rewritten.
licnstat_df = pd.read_csv('s3://dspracticum/payment_ranges_sdf/top20_licnstat/part-00000-bfff0a98-f28d-4947-90a8-616149611385-c000.csv')

In [None]:
# Median raw payment per LICNSTAT, highest first. Only the metric column is
# aggregated: median() over the whole frame raises TypeError on pandas >= 2.0
# as soon as any non-numeric column is present.
a = (
    licnstat_df.groupby('LICNSTAT')[['TOTALPMT_FLOAT']]
    .median()
    .sort_values(by='TOTALPMT_FLOAT', ascending=False)
)
# ORDER mirrors the (sorted) index so it can be handed to the plot as the row order.
a['ORDER'] = a.index

a

In [None]:
# Row order for the box plot below: LICNSTAT values sorted by median raw payment, highest first.
vis_order = a['ORDER']

In [None]:
# Payments span too wide a range to plot raw, so work with log(1 + x).
# np.log1p is vectorised over the whole column (no per-row Python lambda).
licnstat_df['total_payment_log'] = np.log1p(licnstat_df['TOTALPMT_FLOAT'])
licnstat_df['total_payment_adjusted_log'] = np.log1p(licnstat_df['TOTALPMT_ADJ'])

In [None]:
# Log payment by LICNSTAT for the top-20 export; rows follow vis_order.
fig, ax = plt.subplots(figsize=(16, 8))
sns.boxplot(
    data=licnstat_df,
    x="total_payment_log",
    y="LICNSTAT",
    order=vis_order,
    ax=ax,
)

The plot above shows only the 20 states with the highest report counts. Let's repeat it for all states.

In [None]:
df

LICNSTAT - total_payment_log

In [None]:
# Median log payment per LICNSTAT, highest first. Only the metric column is
# aggregated: median() over the whole frame raises TypeError on pandas >= 2.0
# when non-numeric columns (e.g. the merged label columns) are present.
a = (
    df.groupby('LICNSTAT')[['total_payment_log']]
    .median()
    .sort_values(by='total_payment_log', ascending=False)
)
a['ORDER'] = a.index

# Row order for the box plot below.
vis_order = a['ORDER']
a

LICNSTAT - total_payment_log

In [None]:
# Log payment by LICNSTAT across all states; rows follow vis_order.
fig, ax = plt.subplots(figsize=(16, 24))
sns.boxplot(
    data=df,
    x="total_payment_log",
    y="LICNSTAT",
    order=vis_order,
    palette='crest_r',
    ax=ax,
)

LICNSTAT - total_payment_adjusted_log - median

In [None]:
# Median adjusted log payment per LICNSTAT, highest first. Only the metric
# column is aggregated: median() over the whole frame raises TypeError on
# pandas >= 2.0 when non-numeric columns are present.
a = (
    df.groupby('LICNSTAT')[['total_payment_adjusted_log']]
    .median()
    .sort_values(by='total_payment_adjusted_log', ascending=False)
)
a['ORDER'] = a.index

# Row order for the box plot below.
vis_order = a['ORDER']
a

In [None]:
# Adjusted log payment by LICNSTAT; rows follow vis_order.
fig, ax = plt.subplots(figsize=(16, 24))
sns.boxplot(
    data=df,
    x="total_payment_adjusted_log",
    y="LICNSTAT",
    order=vis_order,
    palette='flare_r',
    ax=ax,
)

**WORKSTAT**

WORKSTAT - total_payment_log - median

In [None]:
# Median log payment per WORKSTAT, highest first. Only the metric column is
# aggregated: median() over the whole frame raises TypeError on pandas >= 2.0
# when non-numeric columns are present.
a = (
    df.groupby('WORKSTAT')[['total_payment_log']]
    .median()
    .sort_values(by='total_payment_log', ascending=False)
)
a['ORDER'] = a.index

# Row order for the box plot below.
vis_order = a['ORDER']

In [None]:
# Log payment by WORKSTAT; rows follow vis_order.
fig, ax = plt.subplots(figsize=(16, 24))
sns.boxplot(
    data=df,
    x="total_payment_log",
    y="WORKSTAT",
    order=vis_order,
    palette='crest_r',
    ax=ax,
)

WORKSTAT - total_payment_adjusted_log - median

In [None]:
# Median adjusted log payment per WORKSTAT, highest first. Only the metric
# column is aggregated: median() over the whole frame raises TypeError on
# pandas >= 2.0 when non-numeric columns are present.
a = (
    df.groupby('WORKSTAT')[['total_payment_adjusted_log']]
    .median()
    .sort_values(by='total_payment_adjusted_log', ascending=False)
)
a['ORDER'] = a.index

# Row order for the box plot below.
vis_order = a['ORDER']

In [None]:
# Adjusted log payment by WORKSTAT; rows follow vis_order.
fig, ax = plt.subplots(figsize=(16, 24))
sns.boxplot(
    data=df,
    x="total_payment_adjusted_log",
    y="WORKSTAT",
    order=vis_order,
    palette='flare_r',
    ax=ax,
)

**PRACTAGE**

PRACTAGE - total_payment_log - median

In [None]:
# Work on a copy so the derived age-group column added below does not land on df.
df_copy = df.copy()

In [None]:
df_copy

In [None]:
# Derive the string age-group label for every row from its PRACTAGE code.
df_copy['Practitioner Age Group'] = df_copy['PRACTAGE'].apply(label)

In [None]:
# Median log payment per age-group label. Only the metric column is aggregated:
# median() over the whole frame raises TypeError on pandas >= 2.0 when
# non-numeric columns are present.
a = df_copy.groupby('Practitioner Age Group')[['total_payment_log']].median()
a['ORDER'] = a.index
# The labels are strings, so this is a descending lexicographic sort.
a = a.sort_values(by='ORDER', ascending=False)

# Row order for the box plot below.
vis_order = a['ORDER']

In [None]:
a

In [None]:
# Log payment by practitioner age group; rows follow vis_order.
fig, ax = plt.subplots(figsize=(16, 8))
sns.boxplot(
    data=df_copy,
    x="total_payment_log",
    y="Practitioner Age Group",
    order=vis_order,
    palette='crest_r',
    ax=ax,
)

PRACTAGE - total_payment_adjusted_log - median

In [None]:
# Median adjusted log payment per age-group label. Only the metric column is
# aggregated: median() over the whole frame raises TypeError on pandas >= 2.0
# when non-numeric columns are present.
a = df_copy.groupby('Practitioner Age Group')[['total_payment_adjusted_log']].median()
a['ORDER'] = a.index
# The labels are strings, so this is a descending lexicographic sort.
a = a.sort_values(by='ORDER', ascending=False)

# Row order for the box plot below.
vis_order = a['ORDER']

In [None]:
# Adjusted log payment by practitioner age group; rows follow vis_order.
fig, ax = plt.subplots(figsize=(16, 8))
sns.boxplot(
    data=df_copy,
    x="total_payment_adjusted_log",
    y="Practitioner Age Group",
    order=vis_order,
    palette='flare_r',
    ax=ax,
)

**PRACTEXP**

PRACTEXP - total_payment_log - median

In [None]:
# Start from a fresh copy of df and derive the string experience label
# for every row from its PRACTEXP_GROUP code.
df_copy = df.copy()
df_copy['Practitioner Experience'] = df_copy['PRACTEXP_GROUP'].apply(label)

In [None]:
# Median log payment per experience label. Only the metric column is aggregated:
# median() over the whole frame raises TypeError on pandas >= 2.0 when
# non-numeric columns are present.
a = df_copy.groupby('Practitioner Experience')[['total_payment_log']].median()
a['ORDER'] = a.index
# The labels are strings, so this is a descending lexicographic sort.
a = a.sort_values(by='ORDER', ascending=False)

# Row order for the box plot below.
vis_order = a['ORDER']

In [None]:
a

In [None]:
# Log payment by practitioner experience group; rows follow vis_order.
fig, ax = plt.subplots(figsize=(16, 8))
sns.boxplot(
    data=df_copy,
    x="total_payment_log",
    y="Practitioner Experience",
    order=vis_order,
    palette='crest_r',
    ax=ax,
)

PRACTEXP - total_payment_adjusted_log - median

In [None]:
# Median adjusted log payment per experience label. Only the metric column is
# aggregated: median() over the whole frame raises TypeError on pandas >= 2.0
# when non-numeric columns are present.
a = df_copy.groupby('Practitioner Experience')[['total_payment_adjusted_log']].median()
a['ORDER'] = a.index
# The labels are strings, so this is a descending lexicographic sort.
a = a.sort_values(by='ORDER', ascending=False)

# Row order for the box plot below.
vis_order = a['ORDER']

In [None]:
# Adjusted log payment by practitioner experience group; rows follow vis_order.
fig, ax = plt.subplots(figsize=(16, 8))
sns.boxplot(
    data=df_copy,
    x="total_payment_adjusted_log",
    y="Practitioner Experience",
    order=vis_order,
    palette='flare_r',
    ax=ax,
)

**ALEGATN1**

ALEGATN1 - total_payment_log - median

In [None]:
df_copy = df.copy()
# The allegation merge cell already renames 'Label' to 'Allegation', so on a
# top-to-bottom run df has no 'Label' column and indexing it raises KeyError.
# Only fall back to 'Label' when 'Allegation' is genuinely missing.
if 'Allegation' not in df_copy.columns:
    df_copy['Allegation'] = df_copy['Label']

In [None]:
# Median log payment per allegation, highest first. Only the metric column is
# aggregated: median() over the whole frame raises TypeError on pandas >= 2.0
# when non-numeric columns are present.
a = (
    df_copy.groupby('Allegation')[['total_payment_log']]
    .median()
    .sort_values(by='total_payment_log', ascending=False)
)
a['ORDER'] = a.index

# Row order for the box plot below.
vis_order = a['ORDER']

In [None]:
a

In [None]:
# Log payment by allegation; rows follow vis_order.
fig, ax = plt.subplots(figsize=(16, 24))
sns.boxplot(
    data=df_copy,
    x="total_payment_log",
    y="Allegation",
    order=vis_order,
    palette='crest_r',
    ax=ax,
)

ALEGATN1 - total_payment_adjusted_log - median

In [None]:
# Median adjusted log payment per allegation, highest first. The table now keeps
# the adjusted metric it is sorted by (it previously kept total_payment_log).
# Only the metric column is aggregated: median() over the whole frame raises
# TypeError on pandas >= 2.0 when non-numeric columns are present.
a = (
    df_copy.groupby('Allegation')[['total_payment_adjusted_log']]
    .median()
    .sort_values(by='total_payment_adjusted_log', ascending=False)
)
a['ORDER'] = a.index

# Row order for the box plot below.
vis_order = a['ORDER']

In [None]:
# Adjusted log payment by allegation; rows follow vis_order.
fig, ax = plt.subplots(figsize=(16, 24))
sns.boxplot(
    data=df_copy,
    x="total_payment_adjusted_log",
    y="Allegation",
    order=vis_order,
    palette='flare_r',
    ax=ax,
)

**LICNFELD**

LICNFELD - total_payment_log - median

In [None]:
df

In [None]:
# Per-license-field medians of the numeric columns. numeric_only=True is explicit
# because pandas >= 2.0 no longer drops non-numeric columns silently and raises instead.
df.groupby(by = ['License Field']).median(numeric_only=True)

In [None]:
# Median log payment per license field, highest first. Only the metric column is
# aggregated: median() over the whole frame raises TypeError on pandas >= 2.0
# when non-numeric columns are present.
a = (
    df.groupby('License Field')[['total_payment_log']]
    .median()
    .sort_values(by='total_payment_log', ascending=False)
)
a['ORDER'] = a.index

# Row order for the box plot below.
vis_order = a['ORDER']

In [None]:
vis_order

In [None]:
# Log payment by license field; rows follow vis_order.
fig, ax = plt.subplots(figsize=(16, 36))
sns.boxplot(
    data=df,
    x="total_payment_log",
    y="License Field",
    order=vis_order,
    palette='crest_r',
    ax=ax,
)

LICNFELD - total_payment_adjusted_log - median

In [None]:
# Median adjusted log payment per license field, highest first. Only the metric
# column is aggregated: median() over the whole frame raises TypeError on
# pandas >= 2.0 when non-numeric columns are present.
a = (
    df.groupby('License Field')[['total_payment_adjusted_log']]
    .median()
    .sort_values(by='total_payment_adjusted_log', ascending=False)
)
a['ORDER'] = a.index

# Row order for the box plot below.
vis_order = a['ORDER']

In [None]:
# Adjusted log payment by license field; rows follow vis_order.
fig, ax = plt.subplots(figsize=(16, 36))
sns.boxplot(
    data=df,
    x="total_payment_adjusted_log",
    y="License Field",
    order=vis_order,
    palette='flare_r',
    ax=ax,
)

**OUTCOME**

OUTCOME - total_payment_log - median

In [None]:
# Median log payment per outcome, highest first. Only the metric column is
# aggregated: median() over the whole frame raises TypeError on pandas >= 2.0
# when non-numeric columns are present.
a = (
    df.groupby('Outcome')[['total_payment_log']]
    .median()
    .sort_values(by='total_payment_log', ascending=False)
)
a['ORDER'] = a.index

# Row order for the box plot below.
vis_order = a['ORDER']

In [None]:
# Log payment by outcome; rows follow vis_order.
fig, ax = plt.subplots(figsize=(16, 10))
sns.boxplot(
    data=df,
    x="total_payment_log",
    y="Outcome",
    order=vis_order,
    palette='crest_r',
    ax=ax,
)

OUTCOME - total_payment_adjusted_log - median

In [None]:
# Median adjusted log payment per outcome, highest first. Only the metric column
# is aggregated: median() over the whole frame raises TypeError on pandas >= 2.0
# when non-numeric columns are present.
a = (
    df.groupby('Outcome')[['total_payment_adjusted_log']]
    .median()
    .sort_values(by='total_payment_adjusted_log', ascending=False)
)
a['ORDER'] = a.index

# Row order for the box plot below.
vis_order = a['ORDER']

In [None]:
# Adjusted log payment by outcome; rows follow vis_order.
fig, ax = plt.subplots(figsize=(16, 10))
sns.boxplot(
    data=df,
    x="total_payment_adjusted_log",
    y="Outcome",
    order=vis_order,
    palette='flare_r',
    ax=ax,
)