# Examen PySpark

Instrucciones: Lea cuidadosamente las preguntas, escriba el código correspondiente y ejecútelo para mostrar sus resultados.

#### Importante: Todos los ejercicios deberán realizarse con funciones de NumPy, Pandas o PySpark (no podrán crearse vistas temporales para realizarse en SQL, salvo que se indique lo contrario).

## Bloque 1: Spark Core

1.1 Utilizando NumPy, construya un arreglo con 50 elementos aleatorios distribuidos de forma normal con media 50 y desviación estándar 10. Imprima el arreglo.

In [20]:
import numpy as np
a = np.random.normal(50,10,50)
print(a)

[46.68597356 60.27889096 51.4991626  53.85460247 51.42483403 41.32348817
 52.33762403 40.76814762 51.20321107 47.55103376 67.21286043 48.91218279
 60.72341828 44.45399277 48.81834741 51.41674533 42.43241915 43.86230974
 50.49106629 53.11868731 42.68446032 48.3285161  53.77199998 67.65491547
 70.69740588 28.77237048 69.55877671 46.87041925 47.8080914  62.89884659
 41.41749378 50.68619923 59.92524241 45.75604801 44.88180883 53.99683791
 51.21355136 48.03194075 63.88806166 47.30527233 62.45584082 69.22371252
 50.7975978  51.35721223 42.24371811 36.71406685 62.64319399 52.28815142
 62.93426919 39.33521336]


1.2. Construya el objeto de Spark (Core) que le permita trabajar con objetos RDD.

In [3]:
!pip install pyspark

Collecting pyspark
[?25l  Downloading https://files.pythonhosted.org/packages/f0/26/198fc8c0b98580f617cb03cb298c6056587b8f0447e20fa40c5b634ced77/pyspark-3.0.1.tar.gz (204.2MB)
[K     |████████████████████████████████| 204.2MB 64kB/s 
[?25hCollecting py4j==0.10.9
[?25l  Downloading https://files.pythonhosted.org/packages/9e/b6/6a4fb90cd235dc8e265a6a2067f2a2c99f0d91787f06aca4bcf7c23f3f80/py4j-0.10.9-py2.py3-none-any.whl (198kB)
[K     |████████████████████████████████| 204kB 41.4MB/s 
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.0.1-py2.py3-none-any.whl size=204612242 sha256=d1a1e98f51c0ac69e02c0c79c821ef6efd22ac7b276be2826379f298a9bda936
  Stored in directory: /root/.cache/pip/wheels/5e/bd/07/031766ca628adec8435bb40f0bd83bb676ce65ff4007f8e73f
Successfully built pyspark
Installing collected packages: py4j, pyspark
Successfully installed py4j-0.10.9 pyspark-3.0.1


In [19]:
from pyspark.sql import SQLContext
from pyspark import SparkContext
from pyspark.sql import SparkSession
spark = SparkSession \
        .builder \
        .appName("Python Spark SQL") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
#print(spark.version)
sqlContext = SQLContext(spark)
#print(sqlContext)
sc = spark.sparkContext
print(sc)

<SparkContext master=local[*] appName=Python Spark SQL basic example>


1.3. Convierta el arreglo de NumPy a un RDD con 2 particiones. Muestre los primeros 5 elementos.

In [27]:
# Convert list to RDD
rdd = sc.parallelize(a)

rdd.top(5)

[70.69740588157727,
 69.55877670948394,
 69.22371251958063,
 67.65491546838084,
 67.21286043416487]

1.4. Suponiendo que los datos de la lista miden grados Fahrenheit, aplique una función lambda al RDD que convierta las mediciones a grados Centígrados. Muestre los primeros 5 elementos. 
$$
C = (F - 32) * 5 / 9
$$

In [30]:
rdd = rdd.map(lambda degree: (degree-32)*5/9)

rdd.top(5)

[21.49855882309848,
 20.86598706082441,
 20.679840288655907,
 19.808286371322687,
 19.562700241202705]

1.5. Utilice una función Lambda para mostrar únicamente las temperaturas mayores a 15 grados Centigrados.

In [40]:
rdd.filter(lambda degree: degree>15).collect()

[15.710494976670573,
 19.562700241202705,
 15.957454600267674,
 19.808286371322687,
 21.49855882309848,
 20.86598706082441,
 17.16602588066187,
 15.514023561704285,
 17.71558981312119,
 16.919911567083236,
 20.679840288655907,
 17.023996661949997,
 17.185705103208576]

1.6. Calcule la temperatura media en grados Centígrados.

In [43]:
rdd.mean()

10.939002583177448

1.7. Obtenga las 3 temperaturas más altas en grados Centígrados.

In [50]:
rdd.sortBy(lambda x: -x).top(3)

[21.49855882309848, 20.86598706082441, 20.679840288655907]

## Bloque 2: Spark SQL

2.1. Utilizando Numpy, construya un arreglo con 50 números enteros entre 1 y 3 (1 y 3 incluidos).

In [53]:
b = np.random.uniform(1,3+1,50).astype(int)
print(b)

[1 1 1 3 2 1 1 1 3 1 1 1 3 1 1 1 2 3 1 1 2 1 1 2 2 1 2 3 2 3 1 1 1 2 3 3 2
 3 2 3 2 3 2 1 3 3 3 2 1 2]


2.2. Construya un dataframe en Pandas utilizando los arreglos de 2.1 y 1.1. Asigne los nombres "dia" y "temp". Muestre los primeros 5 elementos.

In [59]:
import pandas as pd
df = pd.DataFrame({'dia':b, 'temp':a})
df.head(5)

Unnamed: 0,dia,temp
0,1,46.685974
1,1,60.278891
2,1,51.499163
3,3,53.854602
4,2,51.424834


2.3. Construya el objeto de Spark (SQL) que le permita trabajar con los dataframes de Spark.

In [74]:

#from pyspark import SparkContext
from pyspark.sql.types import *
from pyspark.sql import SparkSession
spark = SparkSession \
        .builder \
        .appName("Python Spark SQL") \
        .config("spark.some.config.option", "some-value") \
        .getOrCreate()
#print(spark.version)
#sqlContext = SQLContext(spark)

2.4. Convierta el dataframe de Pandas a un dataframe de Spark, definiendo explícitamente el esquema/estructura (utilice el tipo entero para el día y el tipo doble para la temperatura). Muestre los primeros 5 registros.

In [78]:
schemaString = "dia temp"
fields = [StructField(field_name, StringType(), True) for field_name in schemaString.split()]
schema = StructType(fields)
spark.createDataFrame(df, schema).show()

+---+------------------+
|dia|              temp|
+---+------------------+
|  1| 46.68597355698461|
|  1| 60.27889095800703|
|  1| 51.49916259850273|
|  3| 53.85460246606196|
|  2| 51.42483403115909|
|  1| 41.32348816662592|
|  1|52.337624032830476|
|  1| 40.76814762025096|
|  3| 51.20321107476273|
|  1| 47.55103375589605|
|  1| 67.21286043416487|
|  1| 48.91218278568232|
|  3|60.723418280481816|
|  1|44.453992773368725|
|  1|48.818347406069904|
|  1| 51.41674533222415|
|  2| 42.43241915019395|
|  3| 43.86230974086338|
|  1| 50.49106629058457|
|  1|  53.1186873065882|
+---+------------------+
only showing top 20 rows



In [82]:
from pyspark import SparkContext
#sc = SparkContext()
from pyspark.sql import SQLContext
#from pyspark.sql import SQLContext
sqlCtx = SQLContext(sc)
sdf = sqlCtx.createDataFrame(df)

2.5. Partiendo del dataframe en Spark, construya un dataframe con el promedio de temperatura agrupado por día. El dataframe deberá contener únicamente las columnas "dia" y "temp_prom" (con esos nombres). Muestre la tabla resultante.

In [86]:
sdf.groupBy('dia').mean().select('dia','avg(temp)').withColumnRenamed('avg(temp)', 'temp_prom').show()

+---+------------------+
|dia|         temp_prom|
+---+------------------+
|  1|50.093704836872284|
|  3|51.746668477497714|
|  2| 54.14252624212942|
+---+------------------+



2.6. Repita el ejercicio anterior registrando una vista temporal y ejecutando el código SQL correspondiente. Muestre la tabla resultante.

In [90]:
sdf.createGlobalTempView("temp")

In [95]:
spark.sql("SELECT dia, AVG(temp) as temp_prom FROM global_temp.temp GROUP BY dia").show()

+---+------------------+
|dia|         temp_prom|
+---+------------------+
|  1|50.093704836872284|
|  3|51.746668477497714|
|  2| 54.14252624212942|
+---+------------------+



2.7. Combine los valores del dataframe anterior con el original. El dataframe resultante no deberá contener columnas repetidas y tendrá que estar ordenado de forma ascendente por día y temperatura. Muestre los primeros 5 elementos.

In [106]:
spark.sql("SELECT T.dia, T.temp FROM global_temp.temp as T LEFT JOIN (SELECT dia, AVG(temp) FROM global_temp.temp GROUP BY dia) A ORDER BY dia, temp ASC").show(5)

+---+-----------------+
|dia|             temp|
+---+-----------------+
|  1|28.77237047932717|
|  1|28.77237047932717|
|  1|28.77237047932717|
|  1|40.76814762025096|
|  1|40.76814762025096|
+---+-----------------+
only showing top 5 rows



2.8. Añada una columna adicicional con la diferencia entre la temperatura y su media. Asigne el nombre "resid". Muestre los primeros 5 elementos.

2.9. Construya un dataframe con todos los registros que posean residuales negativos. Muestre los primeros 5 elementos.

In [108]:
df_temp = sqlContext.sql("SELECT * FROM global_temp.temp")

2.10. Guarde el dataframe resultante en formato JSON. En caso de que el archivo ya exista, deberá sobreescribirse.

In [109]:
df_temp.select("temp", "dia").write.save("temp.json",format="json")

## Bloque 3: Spark MLlib

#### En esta sección se evalúan los conocimientos de Spark MLlib. Si bien son necesarios los conociemientos en Machine Learning, el candidato no será evaluado por la calidad del modelo producido.

3.1. Cargue los datos del archivo 'data.csv'.

In [113]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [157]:
df_data = spark.read.format("csv").option("header", "true").load("data.csv")

3.2. Realicé un análisis exploratorio preliminar de los datos (estadísticos básicos de las columnas).

In [116]:
df_data.describe().show()

+-------+-------------+-----------------+-----------------+------------------+-----------------+------------------+-------------------+--------------------+--------------------+-------------------+
|summary|        Names|              Age|   Total_Purchase|   Account_Manager|            Years|         Num_Sites|       Onboard_date|            Location|             Company|              Churn|
+-------+-------------+-----------------+-----------------+------------------+-----------------+------------------+-------------------+--------------------+--------------------+-------------------+
|  count|          900|              900|              900|               900|              900|               900|                900|                 900|                 900|                900|
|   mean|         null|41.81666666666667|10062.82403333334|0.4811111111111111| 5.27315555555555| 8.587777777777777|               null|                null|                null|0.16666666666666666|
| stddev| 

3.3. Obtenga el conjunto de datos con el vector de variables independientes y la variable dependiente (churn). Por simplicidad, es suficiente que seleccione únicamente las variables numéricas. Muestre los primeros 5 elementos.

In [138]:
X = df_data.select('Age','Total_Purchase','Account_Manager','Years', 'Num_Sites')
X.show(5)

+----+--------------+---------------+-----+---------+
| Age|Total_Purchase|Account_Manager|Years|Num_Sites|
+----+--------------+---------------+-----+---------+
|42.0|       11066.8|              0| 7.22|      8.0|
|41.0|      11916.22|              0|  6.5|     11.0|
|38.0|      12884.75|              0| 6.67|     12.0|
|42.0|       8010.76|              0| 6.71|     10.0|
|37.0|       9191.58|              0| 5.56|      9.0|
+----+--------------+---------------+-----+---------+
only showing top 5 rows



In [139]:
df_data.select('Churn')
y.show(5)

+-----+
|Churn|
+-----+
|    1|
|    1|
|    1|
|    1|
|    1|
+-----+
only showing top 5 rows



3.4. Realicé la separación en los conjuntos de entrenamiento y prueba con una proporción 70-30. Muestre los primeros 5 elementos de cada conjunto de datos.

In [148]:
train, test = df_data.randomSplit([0.7, 0.3], seed=12345)
X_train = train.select('Names','Onboard_date','Company','Location','Age','Total_Purchase','Account_Manager','Years', 'Num_Sites')
X_test = train.select('Names','Onboard_date','Company','Location','Age','Total_Purchase','Account_Manager','Years', 'Num_Sites')
y_train = train.select('Churn')
y_test= train.select('Churn')

3.5. Ajuste un modelo de regresión logística con los hiperparámetros por defecto. Muestre los estadísticos descriptivos de las predicciones contenidas en el resumen del modelo.

In [165]:
from pyspark.ml.classification import LogisticRegression

In [172]:
X_train.toPandas().values.tolist()

[['Aaron King',
  '2007-05-13 20:42:11',
  'Hernandez PLC',
  '38346 Smith Prairie Arnoldside, MD 33861-4885',
  '46.0',
  '7504.79',
  '0',
  '5.98',
  '8.0'],
 ['Aaron Meyer',
  '2010-07-17 03:30:38',
  'Steele, Bates and Lane',
  '35821 Bailey Skyway Alexisstad, NH 81472',
  '45.0',
  '9598.03',
  '0',
  '5.0',
  '7.0'],
 ['Aaron West',
  '2006-09-01 06:11:47',
  'Cruz, Russell and Boyd',
  '071 Schmidt Locks Apt. 070 West Jessica, OK 60879-6593',
  '55.0',
  '10056.55',
  '0',
  '4.98',
  '8.0'],
 ['Abigail Gonzalez',
  '2014-01-09 19:52:37',
  'Nelson PLC',
  '325 Lawrence Crossing Suite 269 South Josephhaven, OK 53193-5178',
  '55.0',
  '8243.28',
  '0',
  '3.54',
  '6.0'],
 ['Abigail Jennings',
  '2012-03-11 17:10:26',
  'Simpson, Byrd and Miller',
  'USS Mcgrath FPO AP 33786-7308',
  '36.0',
  '12309.23',
  '1',
  '4.76',
  '9.0'],
 ['Adam Gomez',
  '2013-02-06 05:15:06',
  'Thompson Group',
  '2889 Coffey Parks Browningmouth, VT 14924-9744',
  '48.0',
  '6495.01',
  '1',
  '5.

In [174]:
lr = LogisticRegression()
lrModel = lr.fit(X_train.toPandas().values.tolist(), y_train.toPandas().values.tolist())

ValueError: ignored

3.6. Evalúe los resultados en el conjunto de prueba. Muestre las primeras 5 predicciones.

3.7. Para evaluar el desempeño del modelo, obtenga el valor del indicador auROC (área debajo de la curva ROC).

3.8. Cargue los datos del archivo 'data_new.csv' y obtenga las predicciones sobre ese conjunto de datos utilizando los objetos construidos previamente. Muestre los primeros 5 elementos.

# Fin de la evaluación