#### The first thing we do in our notebook is define the schema, i.e. the layout we want our data to have. For each column we write its name and the type of data it contains, collect these pairs in the variable `labels`, and then use `StructType` to build the schema that we will apply when reading the data.

In [1]:
import pyspark.ml.classification as cl


In [2]:
import pyspark.sql.types as typ


labels = [
    ('country', typ.StringType()),
    ('year', typ.IntegerType()),
    ('sex', typ.StringType()),
    ('age', typ.StringType()),
    ('suicide_no', typ.IntegerType()),
    ('population', typ.IntegerType()),
    ('suicides/100k pop', typ.DoubleType()),
    ('country-year', typ.StringType()),
    ('HDI for year', typ.DoubleType()),
    (' gdp_for_year ($) ', typ.StringType()),
    ('gdp_per_capita ($)', typ.IntegerType()),
    ('generation', typ.StringType())
]

# nullable=True because some columns (e.g. 'HDI for year') contain missing values
mi_schema = typ.StructType([
    typ.StructField(name, dtype, True) for name, dtype in labels
])

Once the schema is created, we load the dataset and apply our schema to it, so that every column takes the data type we defined.

In [3]:
suicides = spark.read.csv('master.csv', header=True, inferSchema=False, schema=mi_schema)


In [4]:
suicides.head()

Row(country='Albania', year=1987, sex='male', age='15-24 years', suicide_no=21, population=312900, suicides/100k pop=6.71, country-year='Albania1987', HDI for year=None,  gdp_for_year ($) ='2,156,624,900', gdp_per_capita ($)=796, generation='Generation X')

Using the `withColumnRenamed` function, we rename some variables whose original names we find confusing. In our case, we renamed the variables 'number of suicides per 100,000 inhabitants', 'Human Development Index for the year', 'annual gross domestic product' and 'gross domestic product per capita'.

In [5]:
suicides = suicides.withColumnRenamed('suicides/100k pop','Num_Suicides_100k')
suicides = suicides.withColumnRenamed('HDI for year','IDH_Y')
suicides = suicides.withColumnRenamed(' gdp_for_year ($) ','PIB_Y')
suicides = suicides.withColumnRenamed('gdp_per_capita ($)','PIB_PerCapita')

## Trying out / playing with PySpark functions (count, filter, distinct, collect, etc.)

In [6]:
#from pyspark.sql import functions as F
#df_null=midata.select(*(F.sum(F.col(c).isNull().cast('Double')).alias(c) for c in midata.columns))

In [7]:
pandaDf = suicides.toPandas()


In [8]:
pandaDf['country'].count()

27820

In [9]:
pandaDf.isnull().sum()

country                  0
year                     0
sex                      0
age                      0
suicide_no               0
population               0
Num_Suicides_100k        0
country-year             0
IDH_Y                19456
PIB_Y                    0
PIB_PerCapita            0
generation               0
dtype: int64

In [10]:
19456/len(pandaDf)

0.699352983465133

Since there are 19456 nulls, i.e. about 70% of the rows, we consider it best to drop that column or stop using it.
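The decision above can be written as a small rule. The sketch below is plain Python and reuses the counts printed in the previous cells; the 0.5 cut-off (`MAX_NULL_FRACTION`) is a hypothetical threshold chosen for illustration, not something the notebook defines.

```python
# Figures taken from the outputs above
total_rows = 27820
null_counts = {"IDH_Y": 19456, "PIB_Y": 0, "PIB_PerCapita": 0}

# Hypothetical cut-off: drop any column with more than half of its values missing
MAX_NULL_FRACTION = 0.5

# Fraction of missing values per column
null_fraction = {name: n / total_rows for name, n in null_counts.items()}

# Columns whose missing fraction exceeds the cut-off
columns_to_drop = [
    name for name, frac in null_fraction.items() if frac > MAX_NULL_FRACTION
]

print(null_fraction["IDH_Y"])
print(columns_to_drop)
```

In the notebook itself, the resulting list could then be passed to `suicides.drop(*columns_to_drop)`.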

In [11]:
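# Restrict to numeric columns: recent pandas versions raise an error on string columns
corr = pandaDf.select_dtypes('number').corr()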
corr.style.background_gradient(cmap='coolwarm')

| | year | suicide_no | population | Num_Suicides_100k | IDH_Y | PIB_PerCapita |
|---|---|---|---|---|---|---|
| year | 1.0 | -0.004546 | 0.00885 | -0.039037 | 0.366786 | 0.339134 |
| suicide_no | -0.004546 | 1.0 | 0.616162 | 0.306604 | 0.151399 | 0.06133 |
| population | 0.00885 | 0.616162 | 1.0 | 0.008285 | 0.102943 | 0.08151 |
| Num_Suicides_100k | -0.039037 | 0.306604 | 0.008285 | 1.0 | 0.074279 | 0.001785 |
| IDH_Y | 0.366786 | 0.151399 | 0.102943 | 0.074279 | 1.0 | 0.771228 |
| PIB_PerCapita | 0.339134 | 0.06133 | 0.08151 | 0.001785 | 0.771228 | 1.0 |


In [12]:
Num_Country1 = suicides.groupBy('country').count()
Num_Country2 = suicides.select('country').distinct().count()
Num_Country3 = Num_Country1.select('count').count()
Num_Country2

101

In [13]:
SpainSuicides = suicides.select("country","year","suicide_no").filter("country = 'Spain'")
SpainSuicides.show(5)

+-------+----+----------+
|country|year|suicide_no|
+-------+----+----------+
|  Spain|1985|       305|
|  Spain|1985|       624|
|  Spain|1985|       131|
|  Spain|1985|       497|
|  Spain|1985|       219|
+-------+----+----------+
only showing top 5 rows



In [14]:
corr = suicides.corr( 'suicide_no','Num_Suicides_100k' )
corr

0.30660445126778024

We look at the correlation between the target variable, 'number of suicides per 100,000 inhabitants', and the variable 'total suicides': about 0.31, a weak positive correlation.
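`DataFrame.corr` computes the Pearson correlation coefficient. As a reminder of what that number means, here is a minimal plain-Python sketch of the formula; the helper name `pearson` and the toy data are made up for illustration and are not taken from `master.csv`.

```python
import math


def pearson(xs, ys):
    """Pearson correlation coefficient of two equally long numeric sequences."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    # Sum of products of deviations (covariance up to a constant factor)
    cov = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    # Sums of squared deviations (variances up to the same factor)
    var_x = sum((x - mean_x) ** 2 for x in xs)
    var_y = sum((y - mean_y) ** 2 for y in ys)
    return cov / math.sqrt(var_x * var_y)


# Toy data: a perfectly increasing and a perfectly decreasing relationship
perfect = pearson([1, 2, 3, 4], [2, 4, 6, 8])
inverse = pearson([1, 2, 3, 4], [8, 6, 4, 2])
print(perfect, inverse)
```

Values close to 1 or -1 indicate a strong linear relationship, while values near 0, such as several entries in the correlation matrix above, indicate almost none.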

In [15]:
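from pyspark.sql import functions as F

# Compute the maximum inside Spark instead of collecting every row to the driver
maxSuicide = suicides.agg(F.max('suicide_no')).first()[0]
# Filter with the computed maximum; show() returns None, so its result is not assigned
suicides.select('country', 'year', 'sex', 'age', 'Num_Suicides_100k', 'suicide_no', 'population').filter(F.col('suicide_no') == maxSuicide).show()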

+------------------+----+----+-----------+-----------------+----------+----------+
|           country|year| sex|        age|Num_Suicides_100k|suicide_no|population|
+------------------+----+----+-----------+-----------------+----------+----------+
|Russian Federation|1994|male|35-54 years|            117.3|     22338|  19044200|
+------------------+----+----+-----------+-----------------+----------+----------+



In [16]:
max(suicides.select('Num_Suicides_100k').collect())[0]

224.97

In [17]:
max(suicides.select('suicide_no').collect())

Row(suicide_no=22338)

In [18]:
suicides.select('year').distinct().count()

32

### Here we start

### -------------------------------------------------------------------------------------------------------------------------

# Creating the transformers

In [19]:
# Collect the distinct age groups once, instead of re-running the query on every iteration
edades = [row[0] for row in suicides.select('age').distinct().collect()]

In [20]:
edades

['55-74 years',
 '25-34 years',
 '5-14 years',
 '75+ years',
 '15-24 years',
 '35-54 years']

In [21]:
from pyspark.sql.functions import col, when

# Encode sex as a binary label: male -> 0, female -> 1
new_column = when(col("sex") == 'male', 0).when(col("sex") == 'female', 1)
suicides = suicides.withColumn("sex", new_column)

# Encode the age groups as ordinal indices, youngest to oldest.
# The labels are written out because the order returned by distinct() is not guaranteed.
age_order = ['5-14 years', '15-24 years', '25-34 years', '35-54 years', '55-74 years', '75+ years']
new_column_1 = when(col("age") == age_order[0], 0)
for idx, label in enumerate(age_order[1:], start=1):
    new_column_1 = new_column_1.when(col("age") == label, idx)
suicides = suicides.withColumn("age", new_column_1)




In [22]:
#suicides = suicides.withColumn('suicides_num', suicides['suicide_no'].cast(typ.DoubleType()))
#suicides = suicides.withColumn('population', suicides['population'].cast(typ.DoubleType()))
#suicides = suicides.withColumn('PIB_PerCapita', suicides['PIB_PerCapita'].cast(typ.DoubleType()))
#suicides = suicides.withColumn('sex', suicides['sex'].cast(typ.DoubleType()))
suicides = suicides.withColumn('age', suicides['age'].cast(typ.DoubleType()))



In [23]:
import pyspark.ml.feature as ft

encoder = ft.OneHotEncoder(
    inputCol='age', 
    outputCol='age_vec')

In [24]:
vectorizer = ft.VectorAssembler(inputCols=['suicide_no' , 'PIB_PerCapita'], outputCol= 'continuous_vec')

In [25]:
normalizer = ft.StandardScaler(
    inputCol=vectorizer.getOutputCol(), 
    outputCol='normalized', 
    withMean=True,
    withStd=True
)

Our goal is for the label variable to be binary. The sex variable comes as the string 'male' or 'female', and we want it encoded as 0 or 1 instead.

With `withColumn` we overwrite the 'sex' variable: when the column name passed to it already exists, the column is replaced by the new values instead of a new column being added.
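The encoding applied in cell In [21] can be summarised as two lookup tables. The following is a plain-Python sketch of the same mapping; the names `SEX_CODES`, `AGE_CODES` and `encode` are illustrative and do not appear in the notebook. Like a `when` chain without `otherwise`, which yields null for values that match no branch, the lookup returns `None` for unexpected values.

```python
SEX_CODES = {"male": 0, "female": 1}
AGE_CODES = {
    "5-14 years": 0,
    "15-24 years": 1,
    "25-34 years": 2,
    "35-54 years": 3,
    "55-74 years": 4,
    "75+ years": 5,
}


def encode(row):
    """Return a copy of the row with 'sex' and 'age' replaced by their codes."""
    encoded = dict(row)
    encoded["sex"] = SEX_CODES.get(row["sex"])  # None for unexpected values
    encoded["age"] = AGE_CODES.get(row["age"])
    return encoded


sample = {"country": "Albania", "sex": "male", "age": "75+ years"}
encoded_sample = encode(sample)
unknown = encode({"country": "X", "sex": "unknown", "age": "75+ years"})
print(encoded_sample, unknown)
```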

In [26]:
import pyspark.ml.feature as ft

labels = ['year','population','Num_Suicides_100k']
featuresCreator = ft.VectorAssembler(inputCols = labels + [encoder.getOutputCol() ] ,outputCol = 'features')

In [27]:
from pyspark.ml.feature import PCA

pca = PCA(k=6, inputCol = 'features' , outputCol = 'pcaFeature')


### Creating the Estimator - LOGISTIC REGRESSION

In [28]:

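# featuresCol is left at its default ('features'), so the model is trained on the
# assembled 'features' column, not on the PCA output 'pcaFeature'
logistic = cl.LogisticRegression(
    maxIter=10, 
    regParam=0.01,
    labelCol='sex'
    )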

### Creating the Pipeline

The Pipeline is an object that chains transformers and estimators so that they run in a single step. It works like a pipe that carries the data through each transformer and estimator in order.
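To make the idea concrete, here is a framework-free toy version of the pattern. It is only a sketch: the classes `MeanCenterer`, `Doubler` and `ToyPipeline` are invented for illustration, and unlike Spark's `Pipeline`, whose `fit` returns a separate `PipelineModel`, this toy version stores the fitted state in place.

```python
class MeanCenterer:
    """Toy estimator: learns the mean in fit() and subtracts it in transform()."""

    def fit(self, values):
        self.mean_ = sum(values) / len(values)
        return self

    def transform(self, values):
        return [v - self.mean_ for v in values]


class Doubler:
    """Toy transformer: nothing to learn, so fit() does nothing."""

    def fit(self, values):
        return self

    def transform(self, values):
        return [2 * v for v in values]


class ToyPipeline:
    """Runs the stages in order, feeding each stage the previous stage's output."""

    def __init__(self, stages):
        self.stages = stages

    def fit(self, values):
        # Fit each stage on the data as transformed by the stages before it
        for stage in self.stages:
            values = stage.fit(values).transform(values)
        return self

    def transform(self, values):
        for stage in self.stages:
            values = stage.transform(values)
        return values


pipe = ToyPipeline([MeanCenterer(), Doubler()]).fit([1.0, 2.0, 3.0])
out = pipe.transform([4.0])
print(out)
```

The mean learned during `fit` (2.0) is reused when transforming new data, which is the same reason the Spark pipeline is fitted on the training split and then applied to the test split.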

In [29]:
from pyspark.ml import Pipeline

pipeline = Pipeline(stages=[encoder, vectorizer , normalizer, featuresCreator,pca, logistic])
#pipeline = Pipeline(stages=[encoder, featuresCreator, logistic])

In [30]:
suicides_train, suicides_test = suicides.randomSplit([0.8, 0.2], seed=666) 

# seed: fixes the random number generator used to partition the DataFrame, so the split is reproducible
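The effect of the seed can be illustrated without Spark. The sketch below is a simplified stand-in for a seeded random split: the function `random_split` is hypothetical and does not reproduce Spark's actual sampling, but it shows why the same seed gives the same partition and why the sizes are only approximately 80/20.

```python
import random


def random_split(rows, train_fraction, seed):
    """Assign each row to train or test using a seeded random generator."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        # Each row goes to train with probability train_fraction
        (train if rng.random() < train_fraction else test).append(row)
    return train, test


rows = list(range(100))
first = random_split(rows, 0.8, seed=666)
second = random_split(rows, 0.8, seed=666)
print(len(first[0]), len(first[1]))
```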

In [31]:
suicides.select('Num_Suicides_100k').filter("Num_Suicides_100k > 150").count()

22

###### Training the model

In [32]:
model = pipeline.fit(suicides_train)

In [33]:
test_model = model.transform(suicides_test)

In [34]:
test_model.take(1)

[Row(country='Albania', year=1987, sex=0, age=5.0, suicide_no=1, population=21800, Num_Suicides_100k=4.59, country-year='Albania1987', IDH_Y=None, PIB_Y='2,156,624,900', PIB_PerCapita=796, generation='G.I. Generation', age_vec=SparseVector(5, {}), continuous_vec=DenseVector([1.0, 796.0]), normalized=DenseVector([-0.2642, -0.8496]), features=SparseVector(8, {0: 1987.0, 1: 21800.0, 2: 4.59}), pcaFeature=DenseVector([-21800.0, -40.3562, 1986.5945, 0.4259, -0.024, 0.1916]), rawPrediction=DenseVector([-0.5902, 0.5902]), probability=DenseVector([0.3566, 0.6434]), prediction=1.0)]

#### EVALUATION: let's see how good the model is

In [35]:
import pyspark.ml.evaluation as ev

evaluator = ev.BinaryClassificationEvaluator(
    rawPredictionCol='probability', 
    labelCol='sex')

print(evaluator.evaluate(test_model, {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(test_model, {evaluator.metricName: 'areaUnderPR'}))

0.7904848798440914
0.7343022107630246
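One way to read the area under the ROC curve is as the probability that a randomly chosen positive example receives a higher score than a randomly chosen negative one. The sketch below computes it that way in plain Python on toy data. It is meant as an interpretation aid, not as the method the evaluator uses internally, and the function name `area_under_roc` is made up for this example.

```python
def area_under_roc(labels, scores):
    """AUC as the fraction of (positive, negative) pairs ranked correctly; ties count half."""
    positives = [s for y, s in zip(labels, scores) if y == 1]
    negatives = [s for y, s in zip(labels, scores) if y == 0]
    wins = 0.0
    for p in positives:
        for n in negatives:
            if p > n:
                wins += 1.0
            elif p == n:
                wins += 0.5
    return wins / (len(positives) * len(negatives))


# Toy labels and predicted probabilities of the positive class
labels = [0, 0, 1, 1]
scores = [0.1, 0.4, 0.35, 0.8]
auc = area_under_roc(labels, scores)
print(auc)
```

Three of the four positive/negative pairs are ordered correctly, giving 0.75. A value of 0.5 corresponds to random ranking and 1.0 to a perfect one.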


In [36]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol='sex', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(test_model)
print('Accuracy = %g'  % accuracy)

Accuracy = 0.728798


# Trying Random Forest

In [37]:
from pyspark.ml.feature import StringIndexer

indexer = StringIndexer(inputCol="sex", outputCol="sex_indexer").fit(suicides)

In [38]:
suicides_train, suicides_test = suicides.randomSplit([0.8 , 0.2], seed=666)

In [39]:
from pyspark.ml import Pipeline

classifier = cl.RandomForestClassifier(
    numTrees=5, 
    maxDepth=5, 
    labelCol='sex_indexer')


pipeline = Pipeline(
    stages=[encoder, indexer ,vectorizer , normalizer, featuresCreator,pca, classifier])

model1 = pipeline.fit(suicides_train)
test = model1.transform(suicides_test)

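# The label column is 'sex_indexer'; flatten the collected Rows into plain lists
y_true = [row[0] for row in test.select('sex_indexer').collect()]
y_pred = [row[0] for row in test.select('prediction').collect()]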

from sklearn.metrics import classification_report, confusion_matrix
print(classification_report(y_true, y_pred))

In [40]:
test.take(1)

[Row(country='Albania', year=1987, sex=0, age=5.0, suicide_no=1, population=21800, Num_Suicides_100k=4.59, country-year='Albania1987', IDH_Y=None, PIB_Y='2,156,624,900', PIB_PerCapita=796, generation='G.I. Generation', age_vec=SparseVector(5, {}), sex_indexer=0.0, continuous_vec=DenseVector([1.0, 796.0]), normalized=DenseVector([-0.2642, -0.8496]), features=SparseVector(8, {0: 1987.0, 1: 21800.0, 2: 4.59}), pcaFeature=DenseVector([-21800.0, -40.3562, 1986.5945, 0.4259, -0.024, 0.1916]), rawPrediction=DenseVector([1.6586, 3.3414]), probability=DenseVector([0.3317, 0.6683]), prediction=1.0)]

In [41]:
import pyspark.ml.evaluation as ev

evaluator = ev.BinaryClassificationEvaluator(
    rawPredictionCol='probability', 
    labelCol='sex_indexer')

print(evaluator.evaluate(test, 
     {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(test, {evaluator.metricName: 'areaUnderPR'}))

0.7846154944684469
0.7389757445399499


In [42]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

evaluator = MulticlassClassificationEvaluator(labelCol='sex_indexer', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(test)
print('Accuracy = %g'  % accuracy)

Accuracy = 0.72255


# Hyperparameter tuning: Random Forest

In [43]:
import pyspark.ml.tuning as tune
import pyspark.ml.evaluation as ev

In [44]:
classifier = cl.RandomForestClassifier(numTrees=5, maxDepth=5, labelCol='sex_indexer')

grid = tune.ParamGridBuilder().addGrid(classifier.numTrees, [2,4,6,8]).addGrid(classifier.maxDepth, [3,5,8,12]).build()

In [45]:
evaluator = ev.BinaryClassificationEvaluator(rawPredictionCol='probability', labelCol='sex_indexer')

In [46]:
cv = tune.CrossValidator(estimator=classifier, estimatorParamMaps=grid, evaluator=evaluator)
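The grid built above is the Cartesian product of the two value lists, and the cross-validator trains one model per combination and fold. The plain-Python sketch below only counts that work; it assumes the default `numFolds=3`, since the notebook does not set it explicitly.

```python
from itertools import product

num_trees_values = [2, 4, 6, 8]
max_depth_values = [3, 5, 8, 12]

# Every (numTrees, maxDepth) pair that the grid search will try
combinations = [
    {"numTrees": n, "maxDepth": d}
    for n, d in product(num_trees_values, max_depth_values)
]

# Assumption: CrossValidator is left at its default of 3 folds
num_folds = 3
total_fits = len(combinations) * num_folds

print(len(combinations), total_fits)
```

With 16 combinations and 3 folds, the search fits 48 models before refitting the best configuration on the full training data, which is worth keeping in mind before enlarging the grid.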

In [47]:
pipeline = Pipeline(stages=[encoder, indexer ,vectorizer , normalizer, featuresCreator,pca])
data_transformer = pipeline.fit(suicides_train)

In [48]:
cvModel = cv.fit(data_transformer.transform(suicides_train))

In [49]:
# This is the transformed test split
data_test = data_transformer.transform(suicides_test)
results = cvModel.transform(data_test)

print(evaluator.evaluate(results, {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(results, {evaluator.metricName: 'areaUnderPR'}))

0.836805227930784
0.8170722564641595


In [50]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
evaluator = MulticlassClassificationEvaluator(labelCol='sex_indexer', predictionCol='prediction', metricName='accuracy')
accuracy = evaluator.evaluate(results)
print('Accuracy = %g'  % accuracy)

Accuracy = 0.753973


In [51]:
# Pair each parameter combination with its average cross-validation metric
results = [
    ([{key.name: paramValue} for key, paramValue in params.items()], metric)
    for params, metric in zip(cvModel.getEstimatorParamMaps(), cvModel.avgMetrics)
]

sorted(results, key=lambda el: el[1], reverse=True)[0]

([{'numTrees': 8}, {'maxDepth': 12}], 0.8325260658141422)

# Hyperparameter tuning: LogisticRegression