In [1]:
# Importamos los modulos necesarios
import pyspark  #python package

In [2]:
# objeto principal o la base a partir de la cual cuelga toda la funcionalidad de Apache Spark
from pyspark.sql import SparkSession

# Se trata del context básico de Spark, desde donde se crean el resto de variables 
# que maneja el framework. Sólo un SparkContext puede estar activo por JVM.
from pyspark import SparkContext

# el objeto functions del modulo sql define las funciones estándar incorporadas 
# para trabajar con (valores producidos) columnas.
import pyspark.sql.functions as func

In [3]:
# create a new spark session, que sera la base para nuestra aplicacion

spark = SparkSession.builder\
                    .appName("Test")\
                    .getOrCreate()

#spark sera el punto de entrada para la aplicacion

In [4]:

ls prueba_cientifico_datos/data_test/

test_beneficiario.csv             test_pacientes_aprobados.csv
test_labels_aprovisionadores.csv  test_pacientes_rechazados.csv


In [5]:
df_beneficiario = spark.read.format("csv")\
                            .option("header", "true")\
                            .option("inferSchema", "true")\
                            .load("prueba_cientifico_datos/data_train/train_beneficiario.csv")
df_aprovisionadores = spark.read.format("csv")\
                            .option("header", "true")\
                            .option("inferSchema", "true")\
                            .load("prueba_cientifico_datos/data_train/train_labels_aprovisionadores.csv")
df_pacientes_aprobados = spark.read.format("csv")\
                            .option("header", "true")\
                            .option("inferSchema", "true")\
                            .load("prueba_cientifico_datos/data_train/train_pacientes_aprobados.csv")
df_pacientes_rechazados = spark.read.format("csv")\
                            .option("header", "true")\
                            .option("inferSchema", "true")\
                            .load("prueba_cientifico_datos/data_train/train_pacientes_rechazados.csv")

In [6]:
df_beneficiario.printSchema()

root
 |-- alzheimer: integer (nullable = true)
 |-- artritis_reumatoide: integer (nullable = true)
 |-- cancer: integer (nullable = true)
 |-- ciudad: integer (nullable = true)
 |-- cod_beneficiario: string (nullable = true)
 |-- corazon_isquemico: integer (nullable = true)
 |-- depresion: integer (nullable = true)
 |-- diabetes: integer (nullable = true)
 |-- enfermedad_renal: string (nullable = true)
 |-- genero: integer (nullable = true)
 |-- infarto_cerebral: integer (nullable = true)
 |-- ips_deducible_anual: integer (nullable = true)
 |-- ips_reembolso_anual: integer (nullable = true)
 |-- meses_parte_a: integer (nullable = true)
 |-- meses_parte_b: integer (nullable = true)
 |-- muerte: timestamp (nullable = true)
 |-- municipio: integer (nullable = true)
 |-- nacimiento: timestamp (nullable = true)
 |-- obstruccion_pulmonar: integer (nullable = true)
 |-- op_deducible_anual: integer (nullable = true)
 |-- op_reembolso_anual: integer (nullable = true)
 |-- osteoporosis: integer 

In [7]:
df_aprovisionadores.printSchema()

root
 |-- aprovisionador: string (nullable = true)
 |-- es_fraudulento: string (nullable = true)



In [8]:
df_pacientes_aprobados.printSchema()

root
 |-- aprovisionador: string (nullable = true)
 |-- cod_beneficiario: string (nullable = true)
 |-- deducible: double (nullable = true)
 |-- diagnostico_a: string (nullable = true)
 |-- diagnostico_admision: string (nullable = true)
 |-- diagnostico_b: string (nullable = true)
 |-- diagnostico_c: string (nullable = true)
 |-- diagnostico_d: string (nullable = true)
 |-- diagnostico_e: string (nullable = true)
 |-- diagnostico_f: string (nullable = true)
 |-- diagnostico_g: string (nullable = true)
 |-- diagnostico_h: string (nullable = true)
 |-- diagnostico_i: string (nullable = true)
 |-- diagnostico_j: string (nullable = true)
 |-- fecha_admision: timestamp (nullable = true)
 |-- fecha_desembolso: timestamp (nullable = true)
 |-- fin_peticion: timestamp (nullable = true)
 |-- grupo_diagnostico: string (nullable = true)
 |-- id_peticion: string (nullable = true)
 |-- inicio_peticion: timestamp (nullable = true)
 |-- medico_atiende: string (nullable = true)
 |-- medico_operacion: 

In [9]:
df_pacientes_rechazados.printSchema()

root
 |-- aprovisionador: string (nullable = true)
 |-- cod_beneficiario: string (nullable = true)
 |-- deducible: integer (nullable = true)
 |-- diagnostico_a: string (nullable = true)
 |-- diagnostico_admision: string (nullable = true)
 |-- diagnostico_b: string (nullable = true)
 |-- diagnostico_c: string (nullable = true)
 |-- diagnostico_d: string (nullable = true)
 |-- diagnostico_e: string (nullable = true)
 |-- diagnostico_f: string (nullable = true)
 |-- diagnostico_g: string (nullable = true)
 |-- diagnostico_h: string (nullable = true)
 |-- diagnostico_i: string (nullable = true)
 |-- diagnostico_j: string (nullable = true)
 |-- fin_peticion: timestamp (nullable = true)
 |-- id_peticion: string (nullable = true)
 |-- inicio_peticion: timestamp (nullable = true)
 |-- medico_atiende: string (nullable = true)
 |-- medico_operacion: string (nullable = true)
 |-- monto_reembolso: integer (nullable = true)
 |-- otros_medicos: string (nullable = true)
 |-- procedimiento_u: string (

In [10]:
# agregamos columna aprobado en los df_pacientes_aprobados y df_pacientes_rechazados
df_pacientes_aprobados = df_pacientes_aprobados.withColumn("aprobado", func.lit(True))
df_pacientes_rechazados = df_pacientes_rechazados.withColumn("aprobado", func.lit(False))

In [11]:
# agregamos columnas faltantes en df_pacientes_rechazados
# df_pacientes_rechazados = df_pacientes_rechazados.withColumn("fecha_admision", func.lit(None).cast('timestamp'))
# df_pacientes_rechazados = df_pacientes_rechazados.withColumn("fecha_desembolso", func.lit(None).cast('timestamp'))
# df_pacientes_rechazados = df_pacientes_rechazados.withColumn("grupo_diagnostico", func.lit(None).cast('string'))
df_pacientes_aprobados = df_pacientes_aprobados.drop("fecha_admision","fecha_desembolso","grupo_diagnostico")

In [12]:
# damos el mismo formato de columna en df_pacientes_rechazados
df_pacientes_rechazados = df_pacientes_rechazados.withColumn("deducible", func.col("deducible").cast("double"))
df_pacientes_rechazados = df_pacientes_rechazados.withColumn("procedimiento_v", func.col("procedimiento_v").cast("double"))

In [13]:
# ordenamos el df_pacientes_rechazados en el mismo orden de df_pacientes aprobados para realizar el union
df_pacientes_rechazados = df_pacientes_rechazados.select(df_pacientes_aprobados.columns)

In [14]:
# creames el df_pacientes de la union de df_pacientes_aprobados con df_pacientes_rechazados
df_pacientes = df_pacientes_aprobados.union(df_pacientes_rechazados)

In [15]:
# a cada paciente agregamos la informacion del beneficiario
df_pacientes_beneficiario = df_pacientes.join(df_beneficiario, on=["cod_beneficiario"], how="left")

In [16]:
# agregamos la columna es_fraudulento en funcion de cada aprovisionador
df_data = df_pacientes_beneficiario.join(df_aprovisionadores, on=["aprovisionador"], how="left")

In [18]:
df_data = df_data.drop("cod_beneficiario","aprovisionador","id_peticion","fin_peticion","inicio_peticion")

In [19]:
df_data.printSchema()

root
 |-- deducible: double (nullable = true)
 |-- diagnostico_a: string (nullable = true)
 |-- diagnostico_admision: string (nullable = true)
 |-- diagnostico_b: string (nullable = true)
 |-- diagnostico_c: string (nullable = true)
 |-- diagnostico_d: string (nullable = true)
 |-- diagnostico_e: string (nullable = true)
 |-- diagnostico_f: string (nullable = true)
 |-- diagnostico_g: string (nullable = true)
 |-- diagnostico_h: string (nullable = true)
 |-- diagnostico_i: string (nullable = true)
 |-- diagnostico_j: string (nullable = true)
 |-- medico_atiende: string (nullable = true)
 |-- medico_operacion: string (nullable = true)
 |-- monto_reembolso: integer (nullable = true)
 |-- otros_medicos: string (nullable = true)
 |-- procedimiento_u: string (nullable = true)
 |-- procedimiento_v: double (nullable = true)
 |-- procedimiento_w: double (nullable = true)
 |-- procedimiento_x: double (nullable = true)
 |-- procedimiento_y: double (nullable = true)
 |-- procedimiento_z: double (

In [27]:
import h2o
from h2o.automl import H2OAutoML

In [21]:
h2o.init()

Checking whether there is an H2O instance running at http://localhost:54321 ..... not found.
Attempting to start a local H2O server...
  Java Version: java version "1.8.0_221"; Java(TM) SE Runtime Environment (build 1.8.0_221-b11); Java HotSpot(TM) 64-Bit Server VM (build 25.221-b11, mixed mode)
  Starting server from /Users/josedavidarevaloespinosa/anaconda3/lib/python3.7/site-packages/h2o/backend/bin/h2o.jar
  Ice root: /var/folders/kw/3d202bj112j_n1fzxg4hbw_00000gn/T/tmpzrgljud8
  JVM stdout: /var/folders/kw/3d202bj112j_n1fzxg4hbw_00000gn/T/tmpzrgljud8/h2o_josedavidarevaloespinosa_started_from_python.out
  JVM stderr: /var/folders/kw/3d202bj112j_n1fzxg4hbw_00000gn/T/tmpzrgljud8/h2o_josedavidarevaloespinosa_started_from_python.err
  Server is running at http://127.0.0.1:54321
Connecting to H2O server at http://127.0.0.1:54321 ... successful.


0,1
H2O cluster uptime:,01 secs
H2O cluster timezone:,America/Bogota
H2O data parsing timezone:,UTC
H2O cluster version:,3.26.0.6
H2O cluster version age:,4 days
H2O cluster name:,H2O_from_python_josedavidarevaloespinosa_vmxudk
H2O cluster total nodes:,1
H2O cluster free memory:,3.556 Gb
H2O cluster total cores:,4
H2O cluster allowed cores:,4


In [None]:
# !pip install h2o_pysparkling_2.2

In [22]:
pdf = df_data.toPandas()

In [23]:
hdf = h2o.H2OFrame(pdf)

Parse progress: |█████████████████████████████████████████████████████████| 100%


In [25]:
x = ['deducible', 'diagnostico_a', 'diagnostico_admision', 'diagnostico_b',
       'diagnostico_c', 'diagnostico_d', 'diagnostico_e', 'diagnostico_f',
       'diagnostico_g', 'diagnostico_h', 'diagnostico_i', 'diagnostico_j',
       'medico_atiende', 'medico_operacion', 'monto_reembolso',
       'otros_medicos', 'procedimiento_u', 'procedimiento_v',
       'procedimiento_w', 'procedimiento_x', 'procedimiento_y',
       'procedimiento_z', 'aprobado', 'alzheimer', 'artritis_reumatoide',
       'cancer', 'ciudad', 'corazon_isquemico', 'depresion', 'diabetes',
       'enfermedad_renal', 'genero', 'infarto_cerebral', 'ips_deducible_anual',
       'ips_reembolso_anual', 'meses_parte_a', 'meses_parte_b', 'muerte',
       'municipio', 'nacimiento', 'obstruccion_pulmonar', 'op_deducible_anual',
       'op_reembolso_anual', 'osteoporosis', 'problemas_corazon',
       'problemas_rinon', 'raza']
y = 'es_fraudulento'

In [28]:
aml = H2OAutoML(max_models=5)

In [29]:
train, test, valid = hdf.split_frame(ratios=[0.8,0.1])

In [30]:
aml.train(x=x,y=y,training_frame=train, validation_frame=valid )

AutoML progress: |████████████████████████████████████████████████████████| 100%


In [31]:
aml.leaderboard

model_id,auc,logloss,mean_per_class_error,rmse,mse
XGBoost_1_AutoML_20191006_234426,0.86724,0.464225,0.214695,0.384306,0.147691




In [32]:
model = aml.leader

In [None]:
model.model_id = 'fraud_model'