# Examen PySpark

Instrucciones: Lea cuidadosamente las preguntas, escriba el código correspondiente y ejecútelo para mostrar sus resultados.

#### Importante: Todos los ejercicios deberán realizarse con funciones de NumPy, Pandas o PySpark (no podrán crearse vistas temporales para realizarse en SQL, salvo que se indique lo contrario).

## Bloque 1: Spark Core

1.1 Utilizando NumPy, construya un arreglo con 50 elementos aleatorios distribuidos de forma normal con media 50 y desviación estándar 10. Imprima el arreglo.

In [3]:
import numpy as np

mean = 50
std = 10

dist = np.random.normal(mean, std, 50)


array([44.0251056 , 41.80131532, 65.59643072, 54.46644056, 59.07774891,
       62.16234455, 58.87508386, 57.60313253, 51.20836849, 58.58433125,
       38.33169469, 43.7330064 , 37.91027826, 46.47981224, 60.59702034,
       42.32264255, 45.10334199, 29.74525001, 60.57731582, 47.71479966,
       41.94094009, 27.85930919, 27.66496622, 60.57031692, 60.7169846 ,
       57.12572408, 30.3324811 , 52.96444014, 46.29664653, 42.6286651 ,
       62.26153014, 61.16674644, 38.23954255, 45.17568383, 44.6544177 ,
       39.8588725 , 47.7528372 , 57.88631696, 59.08660794, 46.25934663,
       53.02125492, 47.52741738, 41.41978578, 45.17683168, 37.75888656,
       57.59616115, 19.98408932, 24.34224681, 43.61710942, 57.3577041 ])

[link text](https://)1.2. Construya el objeto de Spark (Core) que le permita trabajar con objetos RDD.

In [6]:
from pyspark import SparkContext
sc = SparkContext(appName="test")

1.3. Convierta el arreglo de NumPy a un RDD con 2 particiones. Muestre los primeros 5 elementos.

In [8]:
rdd = sc.parallelize(dist, numSlices=2)
print(rdd.take(5))

[44.02510560246526, 41.80131532451206, 65.5964307230606, 54.466440563568625, 59.07774890739168]


1.4. Suponiendo que los datos de la lista miden grados Fahrenheit, aplique una función lambda al RDD que convierta las mediciones a grados Centígrados. Muestre los primeros 5 elementos.

In [10]:
# cels = (farenheit - 32) * 5/9

rdd_cels = rdd.map(lambda f: (f - 32) * 5/9)

rdd_cels.take(5)

[6.680614223591811,
 5.445175180284478,
 18.664683735033673,
 12.481355868649237,
 15.043193837439823]

C = (F - 32) * 5 / 9

1.5. Utilice una función Lambda para mostrar únicamente las temperaturas mayores a 15 grados Centigrados.

In [14]:
rdd_filt = rdd_cels.filter(lambda t: t> 15)
rdd_filt.collect()

[18.664683735033673,
 15.043193837439823,
 16.75685808223599,
 15.887233521055773,
 15.876286567288266,
 15.872398290455514,
 15.953880331280383,
 16.811961188562798,
 16.20374802046699,
 15.048115521196015]

1.6. Calcule la temperatura media en grados Centígrados.




In [15]:
rdd_cels.mean()

8.7128814081594

8.712881408159403

1.7. Obtenga las 3 temperaturas más altas en grados Centígrados.

In [20]:
rdd_cels.top(3)

[18.664683735033673, 16.811961188562798, 16.75685808223599]

## Bloque 2: Spark SQL

2.1. Utilizando Numpy, construya un arreglo con 50 números enteros entre 1 y 3 (1 y 3 incluidos).

In [24]:
dist1 =  np.random.randint(1, 4, size=50)
dist1

array([3, 3, 2, 1, 1, 2, 2, 2, 3, 2, 2, 1, 3, 2, 2, 1, 1, 1, 2, 1, 1, 2,
       2, 3, 2, 1, 2, 3, 3, 3, 3, 2, 1, 2, 1, 3, 3, 2, 3, 2, 3, 2, 2, 2,
       1, 3, 2, 3, 1, 1])

2.2. Construya un dataframe en Pandas utilizando los arreglos de 2.1 y 1.1. Asigne los nombres "dia" y "temp". Muestre los primeros 5 elementos.

In [27]:
import pandas as pd

df = pd.DataFrame({'dia': dist1, 'temp': dist})

2.3. Construya el objeto de Spark (SQL) que le permita trabajar con los dataframes de Spark.

In [28]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("test2").getOrCreate()

2.4. Convierta el dataframe de Pandas a un dataframe de Spark, definiendo explícitamente el esquema/estructura (utilice el tipo entero para el día y el tipo doble para la temperatura). Muestre los primeros 5 registros.

In [30]:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType

schema = StructType([
    StructField("dia", IntegerType(), True),
    StructField("temp", DoubleType(), True)
])

df_spark = spark.createDataFrame(df, schema=schema)

2.5. Partiendo del dataframe en Spark, construya un dataframe con el promedio de temperatura agrupado por día. El dataframe deberá contener únicamente las columnas "dia" y "temp_prom" (con esos nombres). Muestre la tabla resultante.

In [34]:
import pyspark.sql.functions as F

df_prom = df_spark.groupBy('dia').agg(F.avg('temp').alias('temp_prom'))

df_prom.show()

+---+------------------+
|dia|         temp_prom|
+---+------------------+
|  1|45.918396756442476|
|  3| 48.08830980166919|
|  2| 48.57033929138635|
+---+------------------+



2.6. Repita el ejercicio anterior registrando una vista temporal y ejecutando el código SQL correspondiente. Muestre la tabla resultante.

In [38]:
df_spark.createOrReplaceTempView("vw_df")
view = spark.sql("select dia,mean(temp) as temp_prom from vw_df group by dia")
view.show()

+---+------------------+
|dia|         temp_prom|
+---+------------------+
|  1|45.918396756442476|
|  3| 48.08830980166919|
|  2| 48.57033929138635|
+---+------------------+



2.7. Combine los valores del dataframe anterior con el original. El dataframe resultante no deberá contener columnas repetidas y tendrá que estar ordenado de forma ascendente por día y temperatura. Muestre los primeros 5 elementos.

In [52]:
df_join = df_spark.join(view, 'dia','inner')
df_join = df_join.orderBy(F.asc("dia"), F.asc("temp"))
df_join.show(5)

+---+------------------+------------------+
|dia|              temp|         temp_prom|
+---+------------------+------------------+
|  1|29.745250013573642|45.918396756442476|
|  1| 37.75888656179124|45.918396756442476|
|  1| 38.23954255079114|45.918396756442476|
|  1| 41.94094008650316|45.918396756442476|
|  1| 42.32264254973991|45.918396756442476|
+---+------------------+------------------+
only showing top 5 rows



2.8. Añada una columna adicicional con la diferencia entre la temperatura y su media. Asigne el nombre "resid". Muestre los primeros 5 elementos.

In [55]:
df_join2 = df_join.withColumn('resid', (F.col('temp') - F.col('temp_prom')))
df_join2.show(5)

+---+------------------+------------------+-------------------+
|dia|              temp|         temp_prom|              resid|
+---+------------------+------------------+-------------------+
|  1|29.745250013573642|45.918396756442476|-16.173146742868834|
|  1| 37.75888656179124|45.918396756442476| -8.159510194651233|
|  1| 38.23954255079114|45.918396756442476| -7.678854205651334|
|  1| 41.94094008650316|45.918396756442476| -3.977456669939315|
|  1| 42.32264254973991|45.918396756442476|-3.5957542067025656|
+---+------------------+------------------+-------------------+
only showing top 5 rows



2.9. Construya un dataframe con todos los registros que posean residuales negativos. Muestre los primeros 5 elementos.

In [63]:
df_join3 = df_join2.filter(F.col('resid') < 0)
df_join3.show(5)

+---+------------------+------------------+-------------------+
|dia|              temp|         temp_prom|              resid|
+---+------------------+------------------+-------------------+
|  1|29.745250013573642|45.918396756442476|-16.173146742868834|
|  1| 37.75888656179124|45.918396756442476| -8.159510194651233|
|  1| 38.23954255079114|45.918396756442476| -7.678854205651334|
|  1| 41.94094008650316|45.918396756442476| -3.977456669939315|
|  1| 42.32264254973991|45.918396756442476|-3.5957542067025656|
+---+------------------+------------------+-------------------+
only showing top 5 rows



2.10. Guarde el dataframe resultante en formato JSON. En caso de que el archivo ya exista, deberá sobreescribirse.

In [64]:
df_join3.write.mode("overwrite").json("df_resultante.json")

## Bloque 3: Spark MLlib

#### En esta sección se evalúan los conocimientos de Spark MLlib. Si bien son necesarios los conociemientos en Machine Learning, el candidato no será evaluado por la calidad del modelo producido.

3.1. Cargue los datos del archivo 'data.csv'.

In [65]:
df = spark.read.csv("data.csv", header=True, inferSchema=True)

3.2. Realicé un análisis exploratorio preliminar de los datos (estadísticos básicos de las columnas).

In [74]:
df.printSchema()

root
 |-- Names: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- Total_Purchase: double (nullable = true)
 |-- Account_Manager: integer (nullable = true)
 |-- Years: double (nullable = true)
 |-- Num_Sites: double (nullable = true)
 |-- Onboard_date: timestamp (nullable = true)
 |-- Location: string (nullable = true)
 |-- Company: string (nullable = true)
 |-- Churn: integer (nullable = true)



In [67]:
df.describe().show()

+-------+-------------+-----------------+-----------------+------------------+-----------------+------------------+--------------------+--------------------+-------------------+
|summary|        Names|              Age|   Total_Purchase|   Account_Manager|            Years|         Num_Sites|            Location|             Company|              Churn|
+-------+-------------+-----------------+-----------------+------------------+-----------------+------------------+--------------------+--------------------+-------------------+
|  count|          900|              900|              900|               900|              900|               900|                 900|                 900|                900|
|   mean|         NULL|41.81666666666667|10062.82403333334|0.4811111111111111| 5.27315555555555| 8.587777777777777|                NULL|                NULL|0.16666666666666666|
| stddev|         NULL|6.127560416916251|2408.644531858096|0.4999208935073339|1.274449013194616|1.764835592035

In [120]:
df.groupBy('Churn').count().show() # Un problema imbalanceado

+-----+-----+
|Churn|count|
+-----+-----+
|    1|  150|
|    0|  750|
+-----+-----+



3.3. Obtenga el conjunto de datos con el vector de variables independientes y la variable dependiente (churn). Por simplicidad, es suficiente que seleccione únicamente las variables numéricas. Muestre los primeros 5 elementos.

In [84]:
x = df.select([c for c in df.columns if c != 'Churn']) # Para coger todas las columnas
y = df.select([c for c in df.columns if c == 'Churn'])
x.show(5)
y.show(5)

+----------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+
|           Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|       Onboard_date|            Location|             Company|
+----------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+
|Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|2013-08-30 07:00:40|10265 Elizabeth M...|          Harvey LLC|
|   Kevin Mueller|41.0|      11916.22|              0|  6.5|     11.0|2013-08-13 00:38:46|6157 Frank Garden...|          Wilson PLC|
|     Eric Lozano|38.0|      12884.75|              0| 6.67|     12.0|2016-06-29 06:20:07|1331 Keith Court ...|Miller, Johnson a...|
|   Phillip White|42.0|       8010.76|              0| 6.71|     10.0|2014-04-22 12:43:12|13120 Daniel Moun...|           Smith Inc|
|  Cynthia Norton|37.0|       9191.58|              0| 5.56|      9.0

In [82]:
col_numericas = [c for c in x.columns if isinstance(x.schema[c].dataType, (IntegerType, DoubleType))] # Para seleccionar solo las numéricas

df_dep_num = x.select([F.col(col_name) for col_name in col_numericas])
df_dep_num.show(5)

+----+--------------+---------------+-----+---------+
| Age|Total_Purchase|Account_Manager|Years|Num_Sites|
+----+--------------+---------------+-----+---------+
|42.0|       11066.8|              0| 7.22|      8.0|
|41.0|      11916.22|              0|  6.5|     11.0|
|38.0|      12884.75|              0| 6.67|     12.0|
|42.0|       8010.76|              0| 6.71|     10.0|
|37.0|       9191.58|              0| 5.56|      9.0|
+----+--------------+---------------+-----+---------+
only showing top 5 rows



In [99]:
from pyspark.ml.feature import *
vectorAssembler = VectorAssembler(inputCols = df_dep_num.columns, outputCol = "indep_vector") # Vector de pyspark ML con todas las features independientes numéricas

3.4. Realicé la separación en los conjuntos de entrenamiento y prueba con una proporción 70-30. Muestre los primeros 5 elementos de cada conjunto de datos.

In [91]:
df_train, df_test = df.randomSplit([0.7, 0.3], seed=123)

df_train = df_train.cache() # Las dejo en caché para mejorar la velocidad
df_test = df_test.cache()

3.5. Ajuste un modelo de regresión logística con los hiperparámetros por defecto. Muestre los estadísticos descriptivos de las predicciones contenidas en el resumen del modelo.

In [129]:
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression

LogisticRegression = LogisticRegression(labelCol="Churn", featuresCol="indep_vector") # Utilizando Pyspark ML, que está más enfocado en el uso de dataframes que MLlib

pipeline = Pipeline(stages=[vectorAssembler, LogisticRegression])

model = pipeline.fit(df_train)
preds = model.transform(df_test)

In [113]:
preds.describe(["prediction", "Churn", "probability"]).show()

+-------+-------------------+-------------------+
|summary|         prediction|              Churn|
+-------+-------------------+-------------------+
|  count|                290|                290|
|   mean|0.12758620689655173|0.15517241379310345|
| stddev|0.33420519951075905| 0.3626948414649638|
|    min|                0.0|                  0|
|    max|                1.0|                  1|
+-------+-------------------+-------------------+



3.6. Evalúe los resultados en el conjunto de prueba. Muestre las primeras 5 predicciones.

In [133]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

eval = MulticlassClassificationEvaluator(labelCol="Churn")
acc = eval.evaluate(preds, {eval.metricName: "accuracy"})
acc

0.9172413793103448

3.7. Para evaluar el desempeño del modelo, obtenga el valor del indicador auROC (área debajo de la curva ROC).

In [112]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

eval = BinaryClassificationEvaluator(labelCol="Churn")
auROC = eval.evaluate(preds, {eval.metricName: "areaUnderROC"})
auROC

0.9248979591836719

3.8. Cargue los datos del archivo 'data_new.csv' y obtenga las predicciones sobre ese conjunto de datos utilizando los objetos construidos previamente. Muestre los primeros 5 elementos.

In [128]:
df_new = spark.read.csv("data.csv", header=True, inferSchema=True)


preds_new = model.transform(df_new)
preds_new.show(5)

+----------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+--------------------+--------------------+--------------------+----------+
|           Names| Age|Total_Purchase|Account_Manager|Years|Num_Sites|       Onboard_date|            Location|             Company|Churn|        indep_vector|       rawPrediction|         probability|prediction|
+----------------+----+--------------+---------------+-----+---------+-------------------+--------------------+--------------------+-----+--------------------+--------------------+--------------------+----------+
|Cameron Williams|42.0|       11066.8|              0| 7.22|      8.0|2013-08-30 07:00:40|10265 Elizabeth M...|          Harvey LLC|    1|[42.0,11066.8,0.0...|[2.56889492156846...|[0.92883268204289...|       0.0|
|   Kevin Mueller|41.0|      11916.22|              0|  6.5|     11.0|2013-08-13 00:38:46|6157 Frank Garden...|          Wilson PLC|    1|[41.0,1191

# Fin de la evaluación

In [None]:
sc.stop()