In [None]:
%matplotlib inline
import matplotlib.pyplot as plt
from pyspark.sql import Row
from pyspark.mllib.linalg import Vectors
from pyspark.ml.feature import StandardScaler

import week4 as w4
sc.setLogLevel('DEBUG')

## Lectura del fichero CSV

Vamos a cargar el fichero, que previamente hemos ingestado en HDFS, en un RDD de Spark, con el formato conveniente para nuestros objetivos.

In [None]:
lines = sc.textFile('/user/cloudera/T_F_DR14_ZooSpec_10000.csv')

In [None]:
# Vemos que hay 10.001 filas en el RDD. Esto quiere decir que incluye el header o cabecera
lines.count()

In [None]:
# Vamos a desechar el header
lines_f = lines.zipWithIndex().filter(lambda tup: tup[1] > 0).map(lambda x: x[0])

In [None]:
# Convertimos las lineas de texto separado por comas en un DataFrame
rows = lines_f.map(lambda l: l.split(","))

def build_features_bis(p):
    return (p[0], int(p[1]), Vectors.dense([float(e) for e in p[2:]]),)

data = rows.map(build_features_bis)
df = sqlContext.createDataFrame(data, ['dr7objid', 'target', 'features'])

Un DataFrame es un objeto similar a una tabla con filas y columnas. En este caso, tiene 10.000 filas y tres columnas:
* dr7objid: texto, id de la galaxia
* target: número entero, tipo de galaxia
* features: vector de 4096 posiciones con los píxeles

In [None]:
df.show(1)

In [None]:
# Vemos el número de filas y de columnas que tiene el DataFrame
print('Número de filas (imágenes): {}'.format(df.count()))
print('Número de columnas (id + target + features): {}'.format(len(df.columns)))

## Filtrado de imágenes no clasificadas

Para el entrenamiento no nos hacen falta todos los datos, nos basta con los datos de aquellas imágenes que se han clasificado satisfactoriamente. Así pues, vamos a descartar aquellas imágenes cuyo campo `target` tiene valor `0`.

In [None]:
# Vemos cuántas imágenes hay de cada tipo
# 0 = incierto
# 1 = elíptica
# 2 = espiral
df.groupBy('target').count().show()

In [None]:
# Construímos un nuevo DataFrame solamente con las imágenes clasificadas
labeled_df = df.filter(df['target'] != 0)

In [None]:
# Vemos que el número de imágenes seleccionadas es coherente con la query anterior
labeled_df.count()

Vemos que valores toman los datos de este DataFrame

In [None]:
w4.describe_n(labeled_df, 4, 'features', 'feat_{0}')

## Reducción de los datos: Principal Component Analysis (PCA)

Los datos de los atributos, no es el ideal para entrenar un algoritmo de clasificación:
* **es muy grande** 3701 filas * 4096 columnas ~ 15M de celdas
* **es poco denso** hay pocas muestras (3701 imágenes) para el número de atributos (4096 píxeles). De intentar aplicar algunos algoritmos de clasificación sobre este conjunto de datos, podríamos incurrir en la [maldición de la dimensión (en inglés)](https://en.wikipedia.org/wiki/Curse_of_dimensionality)

Para solucionar ambos problemas utilizaremos el método PCA para reducir el número de atributos.

### Estandarización de los datos

Para poder aplicar la PCA, previamente tenemos que estandarizar los datos, eso es que todos los atributos esten centrados en 0 (tengan media 0).

In [None]:
scaler_1 = StandardScaler(inputCol="features", outputCol="std_features",
                        withStd=False, withMean=True)
scalerModel_1 = scaler_1.fit(labeled_df)
std_features = scalerModel_1.transform(labeled_df)

In [None]:
w4.describe_n(std_features, 4, 'std_features', 'std_feat_{0}')

### Aplicación de la PCA

Ahora sí, aplicaremos la PCA.

En este caso utilizaremos una implementación propia de la PCA, puesto que a la implementación que se puede encontrar en spark 1.6 le faltan algunas características que nos serán de utilidad

In [None]:
import PCA as _pca

La función `pca` nos devuelve tres objetos:
* **comp**: numpy array de dimensiones n_features * k, con los componentes principales de nuestro set de datos
* **score**: dataframe con la columna 'score' añadida, que contiene los k primeros coeficientes de los datos transformados al nuevo sistema de coordenadas
* **eigVals**: numpy array de dimensión n_features, con valores propios de la matriz de covarianzas de nuestro set de datos

Esta función puede tardar un rato: entre 10 y 15 minutos.

In [None]:
comp, score, eigVals = _pca.pca(std_features, k=64, features_col='std_features')

### Número de componentes a conservar

Vamos a ver qué porcentaje de información conservamos. Debemos seleccionar un número (k) lo más pequeño posible de 
componentes intentando conservar la mayor cantidad de información.

In [None]:
n_atr = range(1, 1000, 5)
info_perc = [_pca.info_perc(eigVals, i) for i in n_atr]
plt.plot(n_atr, info_perc)
plt.xlabel('Número de componentes seleccionados (k)')
plt.ylabel('Porcentaje de información conservado')
plt.axhline(95, color='k', linestyle='--')
plt.grid()

Tomamos `k=64`, lo cual nos permite retener más del 95% de la información mientras que reducimos 64 veces el número de atributos

In [None]:
_pca.info_perc(eigVals, 64)

### Análisis de los componentes conservados

Los componentes conservados suelen capturar patrones significativos de los datos. Vemos en nuestro caso què son estos patrones.

In [None]:
n_comp_x = 5
n_comp_y = 5
g_size=1.
fig = plt.figure(figsize=(g_size*n_comp_x, g_size*n_comp_y))
for i in range(n_comp_x):
    for j in range(n_comp_y):
        comp_id = i*n_comp_y + j
        ax = fig.add_subplot(n_comp_y, n_comp_x, comp_id + 1)
        ax.imshow(comp[:, comp_id].reshape(64, 64), cmap='gist_heat')
        ax.set_title(comp_id)
        ax.axis('off')
plt.tight_layout(pad=0.)

Vemos que los componentes con mayor peso (aquellos con índices pequeños: 0, 1, 2, 3, ...) realmente contienen patrones semejantes a galaxias. A medida que seleccionamos componentes menos significativos, cada ves se asemejan menos a  patrones de galaxias.

Vamos a ver qué valores toman los primeros atributos de los datos reducidos mediante PCA

In [None]:
w4.describe_n(score, 4, 'score', 'score_{0}')

## Grabamos los datos en formato parquet

In [None]:
score.select('dr7objid', 'target', 'score').\
    write.save('pca_features.parquet', format='parquet', mode='overwrite')

Ahora, si abrimos un terminal y hacemos
```
$ hdfs dfs -ls -h
```
Veremos que el fichero parquet ya está en disco
```
-rw-r--r--   1 cloudera cloudera    605.0 M 2018-03-28 04:12 T_F_DR14_ZooSpec_10000.csv
drwxr-xr-x   - cloudera cloudera          0 2018-05-04 02:21 pca_features.parquet
```

De hecho no es un fichero, sino un directorio con varios ficheros en su interior. También podemos ver el contenido de este directorio:
```
$ hdfs dfs -ls -h pca_features.parquet

-rw-r--r--   1 cloudera cloudera          0 2018-05-04 02:21 pca_features.parquet/_SUCCESS
-rw-r--r--   1 cloudera cloudera        982 2018-05-04 02:21 pca_features.parquet/_common_metadata
-rw-r--r--   1 cloudera cloudera      4.6 K 2018-05-04 02:21 pca_features.parquet/_metadata
-rw-r--r--   1 cloudera cloudera    378.5 K 2018-05-04 02:20 pca_features.parquet/part-r-00000-43a02924-fb62-4858-8f29-876abd66795b.gz.parquet
-rw-r--r--   1 cloudera cloudera    374.5 K 2018-05-04 02:21 pca_features.parquet/part-r-00001-43a02924-fb62-4858-8f29-876abd66795b.gz.parquet
-rw-r--r--   1 cloudera cloudera    386.3 K 2018-05-04 02:21 pca_features.parquet/part-r-00002-43a02924-fb62-4858-8f29-876abd66795b.gz.parquet
-rw-r--r--   1 cloudera cloudera    398.5 K 2018-05-04 02:21 pca_features.parquet/part-r-00003-43a02924-fb62-4858-8f29-876abd66795b.gz.parquet
-rw-r--r--   1 cloudera cloudera    274.8 K 2018-05-04 02:21 pca_features.parquet/part-r-00004-43a02924-fb62-4858-8f29-876abd66795b.gz.parquet
```


En la próxima lección veremos como entrenar un algoritmo de clasificación con estos datos.

Gracias