# Tarea 3
Estudiante: Sebastián Porras

Dataset : 

Breast-Cancer.csv

El cáncer de mama es el cáncer más común entre las mujeres en el mundo. Representa el 25 % de todos los casos de cáncer y afectó a más de 2,1 millones de personas solo en 2015. Comienza cuando las células en el seno comienzan a crecer sin control. Estas células generalmente forman tumores que se pueden ver a través de rayos X o sentir como bultos en el área del seno.

**La varaible a clasificar será Diagnosis (M - Maligno y  B - Benigno)**

In [None]:
import findspark
findspark.init()

import pyspark


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructField,StringType,IntegerType,StructType,DateType,FloatType
from pyspark.sql.functions import col,isnan, when, count,monotonically_increasing_id
from pyspark.ml.stat import Correlation
from pyspark.ml import Pipeline
from pyspark.ml.classification import RandomForestClassifier,DecisionTreeClassifier
from pyspark.ml.feature import IndexToString, StringIndexer, VectorIndexer
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator

import seaborn as sns
import matplotlib.pyplot as plt
plt.rcParams['figure.figsize']=(15,15)
import numpy as np
import pandas as pd

In [None]:
spark = SparkSession \
    .builder \
    .appName("Tarea3") \
    .config("spark.driver.extraClassPath", "postgresql-42.2.14.jar") \
    .config("spark.executor.extraClassPath", "postgresql-42.2.14.jar") \
    .getOrCreate()

In [None]:
def EscribirDatosEnTabla(dataframe,NombreTabla):
    dataframe \
    .write \
    .format("jdbc") \
    .mode('overwrite') \
    .option("url", "jdbc:postgresql://host.docker.internal:5433/postgres") \
    .option("user", "postgres") \
    .option("password", "testPassword") \
    .option("dbtable", NombreTabla) \
    .save()

def LeerDatosEnBD(NombreTabla):
    return spark \
    .read \
    .format("jdbc") \
    .option("url", "jdbc:postgresql://host.docker.internal:5433/postgres") \
    .option("user", "postgres") \
    .option("password", "testPassword") \
    .option("dbtable", NombreTabla) \
    .load()


#Carga de Datos

Se cargan los datos bajo el schema creado

In [None]:
data_schema = StructType([
                StructField("id",IntegerType()),
                StructField("diagnosis",StringType()),
                StructField("radius_mean",FloatType()),
                StructField("texture_mean",FloatType()),
                StructField("perimeter_mean",FloatType()),
                StructField("area_mean",FloatType()),
                StructField("smoothness_mean",FloatType()),
                StructField("compactness_mean",FloatType()),
                StructField("concavity_mean",FloatType()),
                StructField("concave points_mean",FloatType()),
                StructField("symmetry_mean",FloatType()),
                StructField("fractal_dimension_mean",FloatType()),
                StructField("radius_se",FloatType()),
                StructField("texture_se",FloatType()),
                StructField("perimeter_se",FloatType()),
                StructField("area_se",FloatType()),
                StructField("smoothness_se",FloatType()),
                StructField("compactness_se",FloatType()),
                StructField("concavity_se",FloatType()),
                StructField("concave points_se",FloatType()),
                StructField("symmetry_se",FloatType()),
                StructField("fractal_dimension_se",FloatType()),
                StructField("radius_worst",FloatType()),
                StructField("texture_worst",FloatType()),
                StructField("perimeter_worst",FloatType()),
                StructField("area_worst",FloatType()),
                StructField("smoothness_worst",FloatType()),
                StructField("compactness_worst",FloatType()),
                StructField("concavity_worst",FloatType()),
                StructField("concave points_worst",FloatType()),
                StructField("symmetry_worst",FloatType()),
                StructField("fractal_dimension_worst",FloatType())]
)
            
data = spark.read.csv(
    'breast-cancer.csv',
    sep = ',',
    header = True,
    schema = data_schema
    )

data.printSchema()

In [None]:
data.show(n=10,truncate=False)

#Limpieza de datos

Contamos los valores nulos

In [None]:

data.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in data.columns]
   ).show()

Se hace un encoder para la columna diagnosis ya que en estos momentos es una variable categórica

In [None]:
from pyspark.ml.feature import StringIndexer
indexer = StringIndexer(inputCol="diagnosis", outputCol="diagnosisIndex") 
indexed = indexer.fit(data).transform(data) 
indexed.select("diagnosis","diagnosisIndex").show()
data=indexed



# Creación de graficas



In [None]:
y=data.toPandas()["radius_mean"].values.tolist()
x = np.arange(0, len(y))
print(len(y))
plt.title("radio medio del cancer")

plt.plot(x, y, color ="green")








plt.show()

In [None]:
y=data.toPandas()["symmetry_mean"].values.tolist()
x = np.arange(0, len(y))
print(len(y))
plt.title("simetria media del cancer")

plt.plot(x, y, color ="green")

In [None]:
y=data.toPandas()["smoothness_mean"].values.tolist()
x = np.arange(0, len(y))
print(len(y))
plt.title("suavidad media del cancer")

plt.plot(x, y, color ="green")

In [None]:
y=data.toPandas()["compactness_se"].values.tolist()
x = np.arange(0, len(y))
print(len(y))
plt.title("compacidad del cancer")

plt.plot(x, y, color ="green")


In [None]:
x=data.toPandas()["diagnosis"].values.tolist()

pd.Series(x).value_counts(sort=False).plot(kind='bar')

De las columnas analizadas ninguna parece seguir una distribución clara

# Vectorización

Creamos vectores para poder crear la correlacion 

In [None]:
from pyspark.ml.feature import VectorAssembler

def  vectorizar(dataframe):
    assembler = VectorAssembler(
                    inputCols=[
                "radius_mean",
                "texture_mean",
                "perimeter_mean",
                "area_mean",
                "smoothness_mean",
                "compactness_mean",
                "concavity_mean",
                "concave points_mean",
                "symmetry_mean",
                "fractal_dimension_mean",
                "radius_se",
                "texture_se",
                "perimeter_se",
                "area_se",
                "smoothness_se",
                "compactness_se",
                "concavity_se",
                "concave points_se",
                "symmetry_se",
                "fractal_dimension_se",
                "radius_worst",
                "texture_worst",
                "perimeter_worst",
                "area_worst",
                "smoothness_worst",
                "compactness_worst",
                "concavity_worst",
                "concave points_worst",
                "symmetry_worst",
                "fractal_dimension_worst"],
                    outputCol='features')

    vector_df = assembler.transform(dataframe)
    vector_df = vector_df.select(['features', 'diagnosisIndex'])
    return vector_df
vector_df=vectorizar(data)

pearson_matrix = Correlation.corr(vector_df, 'features').collect()[0][0]

sns.heatmap(pearson_matrix.toArray(), annot=True, fmt=".2f", cmap='viridis')

# Estandarización


In [None]:
from pyspark.ml.feature import StandardScaler
def Escalador(dataframe):
    standard_scaler = StandardScaler(inputCol='features', outputCol='scaled')
    scale_model = standard_scaler.fit(dataframe)

    scaled_df = scale_model.transform(dataframe)
   
    return scaled_df
scaled_df=Escalador(vector_df)
scaled_df.show()

Escritura a la BD con los datos limpios sin vecotorizar ni escalar

In [None]:
EscribirDatosEnTabla(data,"tarea3")

Lectura de datos

In [None]:
data=LeerDatosEnBD("tarea3")
data.show()
data=data.drop("id")

Separación de datos en train y test

In [None]:
train, test = data.randomSplit([0.7, 0.3], seed = 2)
print("Training Dataset Count: " + str(train.count()))
print("Test Dataset Count: " + str(test.count()))
train.printSchema()

# Entranamiento de los modelos

In [None]:
train.printSchema()
train=vectorizar(train)

train=Escalador(train)

testData=vectorizar(test)
testData=Escalador(testData)


 ## Random Forest

### Sin usar CrossValidation

In [None]:



rf = RandomForestClassifier(featuresCol = 'scaled', labelCol = 'diagnosisIndex', numTrees=15)
rfModel = rf.fit(train)
predictions = rfModel.transform(testData)



evaluator = MulticlassClassificationEvaluator(
    labelCol="diagnosisIndex", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

predictions.printSchema()
predictions=predictions.withColumnRenamed("prediction","Prediccion")
predictions.select("diagnosisIndex", "Prediccion").show(30)
predictions=predictions.select("Prediccion")
print(accuracy)
print("Test Error = %g" % (1.0 - accuracy))

### Usando CrossValidation

In [None]:
rf = RandomForestClassifier(featuresCol = 'scaled', labelCol = 'diagnosisIndex')
evaluator = MulticlassClassificationEvaluator(
    labelCol="diagnosisIndex", predictionCol="prediction", metricName="accuracy")
rfparamGrid = (ParamGridBuilder()
             #.addGrid(rf.maxDepth, [2, 5, 10, 20, 30])
               .addGrid(rf.maxDepth, [2, 5, 10])
             #.addGrid(rf.maxBins, [10, 20, 40, 80, 100])
               .addGrid(rf.maxBins, [5, 10, 20])
             #.addGrid(rf.numTrees, [5, 20, 50, 100, 500])
               .addGrid(rf.numTrees, [5, 20, 50])
             .build())

rfcv = CrossValidator(estimator = rf,
                      estimatorParamMaps = rfparamGrid,
                      evaluator = evaluator,
                      numFolds = 5)

rfModel = rfcv.fit(train)

predictions = rfModel.transform(testData)



accuracy = evaluator.evaluate(predictions)
predictions.printSchema()
predictions=predictions.withColumnRenamed("prediction","Prediccion")
predictions.select("diagnosisIndex", "Prediccion").show(30)
predictions=predictions.select("Prediccion")
print(accuracy)
print("Test Error = %g" % (1.0 - accuracy))

### Escritura del modelo 1 a postgres

In [None]:

results=test.withColumn("mid",monotonically_increasing_id()).\
join(predictions.withColumn("mid",monotonically_increasing_id()),["mid"]).\
drop("mid")
results.printSchema()
results.show(3)
EscribirDatosEnTabla(results,"modelo1")


## Decision Tree

### Sin usar CrossValidation

In [None]:
dt = DecisionTreeClassifier(featuresCol = 'scaled', labelCol = 'diagnosisIndex',  maxDepth=15)
dtModel = dt.fit(train)
predictions = dtModel.transform(testData)



evaluator = MulticlassClassificationEvaluator(
    labelCol="diagnosisIndex", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

predictions.printSchema()
predictions=predictions.withColumnRenamed("prediction","Prediccion")
predictions.select("diagnosisIndex", "Prediccion").show(30)
predictions=predictions.select("Prediccion") 

print(accuracy)
print("Test Error = %g" % (1.0 - accuracy))

### Usando CrossValidation

In [1]:
dt = DecisionTreeClassifier(featuresCol = 'scaled', labelCol = 'diagnosisIndex',  maxDepth=15)
dtparamGrid = (ParamGridBuilder()
             .addGrid(dt.maxDepth, [2, 5, 10, 20, 30])
             .addGrid(dt.maxBins, [10, 20, 40, 80, 100])
             .build())
evaluator = MulticlassClassificationEvaluator(
    labelCol="diagnosisIndex", predictionCol="prediction", metricName="accuracy")

dtcv = CrossValidator(estimator = dt,
                      estimatorParamMaps = dtparamGrid,
                      evaluator = evaluator,
                      numFolds = 5)   
dtcvModel = dtcv.fit(train)
print(dtcvModel)

predictions = dtcvModel.transform(testData)




accuracy = evaluator.evaluate(predictions)
predictions.printSchema()
predictions=predictions.withColumnRenamed("prediction","Prediccion")
predictions.select("diagnosisIndex", "Prediccion").show(30)
predictions=predictions.select("Prediccion")
print(accuracy)
print("Test Error = %g" % (1.0 - accuracy))

NameError: ignored

### Escritura del modelo 2 a postgres

In [None]:
results=test.withColumn("mid",monotonically_increasing_id()).\
join(predictions.withColumn("mid",monotonically_increasing_id()),["mid"]).\
drop("mid")
results.printSchema()
results.show(3)
EscribirDatosEnTabla(results,"modelo2")

# Analisis de resultados

Los algoritmos utilizados fueron Random Forest y Decision Trees.
Con cada uno de estos se hicieron dos pruebas, una utilizando cross validation y la otra con el algoritmo vanilla.

* Random Forest Vanilla: 0.935672514619883 Accuracy
* Random Forest Crossvalidation : 0.9239766081871345 Accuracy
* Decision Tree Vanilla: 0.935672514619883 Accuracy
* Decision Tree CrossValidation: 0.9122807017543859 Accuracy

Es interesante como utilizando cross validation en ambos algoritmos el resultado nos da un poco peor que con solo usar el algoritmo vanilla.  Podemos concluir que sin importar el algoritmo que escogimos la versión vanilla da un 93.5% de accuracy 