## Crear Sesión de Spark


In [78]:
from pyspark.sql import SparkSession

# Local Spark session using all available cores; getOrCreate() reuses an
# already-running session in this kernel instead of starting a second one.
spark = (
    SparkSession.builder
    .master("local[*]")
    .getOrCreate()
)

## Cargar y mostrar la data

In [79]:
# Path is resolved relative to the kernel's working directory.
file_csv = 'data/UCI_Credit_Card.csv'

# Only the header row is interpreted (no inferSchema), so every column is
# loaded as a string; numeric columns are cast explicitly further down.
data_file = spark.read.option("header", True).csv(file_csv)

In [80]:
# Preview the first rows (these are Spark's default show() arguments, made explicit).
data_file.show(n=20, truncate=True)

+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+--------------------------+
| ID|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_0|PAY_2|PAY_3|PAY_4|PAY_5|PAY_6|BILL_AMT1|BILL_AMT2|BILL_AMT3|BILL_AMT4|BILL_AMT5|BILL_AMT6|PAY_AMT1|PAY_AMT2|PAY_AMT3|PAY_AMT4|PAY_AMT5|PAY_AMT6|default.payment.next.month|
+---+---------+---+---------+--------+---+-----+-----+-----+-----+-----+-----+---------+---------+---------+---------+---------+---------+--------+--------+--------+--------+--------+--------+--------------------------+
|  1|    20000|  2|        2|       1| 24|    2|    2|   -1|   -1|   -2|   -2|     3913|     3102|      689|        0|        0|        0|       0|     689|       0|       0|       0|       0|                         1|
|  2|   120000|  2|        2|       2| 26|   -1|    2|    0|    0|    0|    2|     2682|     1725|     2682|     3272|  

### Se requieren conocimientos básicos de SQL para las siguientes operaciones (el manejo de los DataFrames es análogo)

#### Agregar columna al dataframe

- data_file = data_file.withColumn("colname", new_column)

#### Filtrar datos con una string SQL

- filtered_data = data_file.filter("colname > value")

#### Filtrar datos con una columna booleana

- filtered_data2 = data_file.filter(data_file.colname > value)

#### Seleccionar el primer conjunto de columnas

- selected1 = data_file.select("colname1", "colname2", "colname3")

#### Agrupar por columna 

- grouped_data = data_file.groupBy("colname")

#### Unir dataframes

- joined_data = data1.join(data2, on="common_colname", how="join_type")


## Machine Learning Pipeline


In [81]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler

### Convertir (cast) columnas a enteros

In [64]:
# When a column name contains dots, wrap it in backticks: df.select("`col0.1`")
# NOTE(review): this whole cell is commented out, so the table printed below it
# is stale output from an earlier run, not produced by this code.
# NOTE(review): assigning the result of .show() would set data_file to None,
# because show() only prints and returns nothing.
# data_file = data_file.select('`default.payment.next.month`').show(5)
# NOTE(review): withColumnRenamed takes the literal existing name; with the
# backticks no column matches and the call is a silent no-op.
#data_file.withColumnRenamed('`default.payment.next.month`', 'label')

+--------------------------+
|default.payment.next.month|
+--------------------------+
|                         1|
|                         1|
|                         0|
|                         0|
|                         0|
+--------------------------+
only showing top 5 rows



### Es mejor eliminar los puntos de los nombres de columna: en Spark el punto solo es útil para acceder a campos de esquemas anidados (nested schemas)

In [82]:
# Dotted column names must be backtick-quoted in Spark expressions; swap the
# dots for underscores so every column can be referenced plainly.
renamed_cols = [name.replace('.', '_') for name in data_file.columns]
clean_df = data_file.toDF(*renamed_cols)


In [85]:
# The CSV was read without inferSchema, so these columns are strings.
# Cast each one to integer in place (same column name, new type).
for col_name in ("LIMIT_BAL", "SEX", "EDUCATION", "MARRIAGE"):
    clean_df = clean_df.withColumn(col_name, clean_df[col_name].cast("Integer"))

### Crear labels

In [86]:
# Spark ML classifiers read the target from a numeric column named "label" by default.
target_col = clean_df["default_payment_next_month"].cast("Integer")
clean_df = clean_df.withColumn("label", target_col)

In [87]:
# Quick sanity check of the new target column.
clean_df.select("label").show(n=5)

+-----+
|label|
+-----+
|    1|
|    1|
|    0|
|    0|
|    0|
+-----+
only showing top 5 rows



### Eliminar valores faltantes (NULL)

In [88]:
# Drop any row where at least one of the feature columns is NULL
# (e.g. a value that failed the integer cast).
clean_df = clean_df.dropna(how="any", subset=["LIMIT_BAL", "SEX", "EDUCATION", "MARRIAGE"])

### Hacer un VectorAssembler

In [91]:
# Pack the numeric feature columns into a single vector column "features",
# the input column name Spark ML estimators use by default.
feature_cols = ["LIMIT_BAL", "SEX", "EDUCATION", "MARRIAGE"]
vec_assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

pipeline = Pipeline(stages=[vec_assembler])

### Crear un StringIndexer
- string_indexer = StringIndexer(inputCol="inputCol", outputCol="outputCol")
### Crear un OneHotEncoder
- one_encoder = OneHotEncoder(inputCol="inputCol", outputCol="outputCol")

### Particionar la data en train y test

In [92]:
# Fit the pipeline on the full dataset and append its output columns.
# NOTE(review): fine for stateless stages; if a stateful stage (e.g. a
# StringIndexer) is added, fit on `train` only to avoid test-set leakage.
pipe_data = pipeline.fit(clean_df).transform(clean_df)

# 80/20 train/test split. Without a seed, randomSplit draws a different split
# on every run, so metrics would not reproduce after Restart & Run All.
SPLIT_SEED = 42
train, test = pipe_data.randomSplit([0.8, 0.2], seed=SPLIT_SEED)

## Finalmente ajustar el modelo y ponerlo a prueba


In [93]:
from pyspark.ml.classification import LogisticRegression
import pyspark.ml.evaluation as evals
import pyspark.ml.tuning as tune

### Crear el estimador

In [94]:
# Logistic regression on the assembled "features" vector and the integer
# "label" column (these are also LogisticRegression's default column names).
logistic_reg = LogisticRegression(featuresCol="features", labelCol="label")

# NOTE(review): no hyperparameter search happens here, so `best_log_reg` is
# simply the model fitted with default parameters.
best_log_reg = logistic_reg.fit(train)

# Score the held-out split; transform() appends rawPrediction, probability
# and prediction columns to `test`.
predictions = best_log_reg.transform(test)