# Ejercicios Spark DataFrames 

Vamos a practicar un poco con tus nuevas habilidades de Spark DataFrame, se te harán algunas preguntas básicas sobre algunos datos del mercado de valores, en este caso Walmart Stock de los años 2012-2017. 

Responde a las preguntas y completa las tareas de abajo.

#### ¡Utiliza el archivo walmart_stock.csv para responder y completar las tareas siguientes!

#### Iniciar una sesión de Spark

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
import datetime

spark = SparkSession.builder.appName("Basic spark df exercise").getOrCreate()       # Creamos una sesion de spark

#### Cargar el archivo CSV de Walmart Stock, hacer que Spark infiera los tipos de datos.

In [2]:
df = spark.read.csv("data/walmart_stock.csv", header=True, inferSchema=True)
df.show()

+----------+------------------+------------------+------------------+------------------+--------+------------------+
|      Date|              Open|              High|               Low|             Close|  Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+--------+------------------+
|2012-01-03|         59.970001|         61.060001|         59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|         60.349998|         59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|         59.619999|         58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|         59.419998|         59.450001|         58.869999|              59.0| 8069400|          51.45922|
|2012-01-09|         59.029999|         59.549999|         58.919998|             59.18| 6679300|51.616215000000004|
|2012-01-10|             59.43|59.709998999999996|             5

#### ¿Cuáles son los nombres de las columnas?

In [3]:
print(df.columns)

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']


#### ¿Qué aspecto tiene el esquema?

In [4]:
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



#### Imprime las 5 primeras columnas.

In [5]:
df.take(5)

[Row(Date=datetime.date(2012, 1, 3), Open=59.970001, High=61.060001, Low=59.869999, Close=60.330002, Volume=12668800, Adj Close=52.619234999999996),
 Row(Date=datetime.date(2012, 1, 4), Open=60.209998999999996, High=60.349998, Low=59.470001, Close=59.709998999999996, Volume=9593300, Adj Close=52.078475),
 Row(Date=datetime.date(2012, 1, 5), Open=59.349998, High=59.619999, Low=58.369999, Close=59.419998, Volume=12768200, Adj Close=51.825539),
 Row(Date=datetime.date(2012, 1, 6), Open=59.419998, High=59.450001, Low=58.869999, Close=59.0, Volume=8069400, Adj Close=51.45922),
 Row(Date=datetime.date(2012, 1, 9), Open=59.029999, High=59.549999, Low=58.919998, Close=59.18, Volume=6679300, Adj Close=51.616215000000004)]

#### Utiliza describe() para conocer el DataFrame.

In [6]:
df.describe().show()    

+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|         90.800003|        90.970001|            89.25|        90.470001|         80898100|84.91421600000001|
+-------+------------------+-----------------+--

#### Hay demasiados decimales para la media y el stddev en describe(). Formatea los números para que sólo se muestren hasta dos decimales. Presta atención a los tipos de datos que devuelve .describe()

In [7]:
summary = df.describe()

summary = summary.withColumn("Open", format_number(col("Open").cast("float"), 2))       # Damos formato a los numeros casteandolos con 2 decimales
summary = summary.withColumn("High", format_number(col("High").cast("float"), 2))
summary = summary.withColumn("Low", format_number(col("Low").cast("float"), 2))
summary = summary.withColumn("Close", format_number(col("Close").cast("float"), 2))
summary = summary.withColumn("Volume", format_number(col("Volume").cast("float"), 2))

summary.select("summary", "Open", "High", "Low", "Close", "Volume").show()              # adecuamos el formato de salida 

+-------+--------+--------+--------+--------+-------------+
|summary|    Open|    High|     Low|   Close|       Volume|
+-------+--------+--------+--------+--------+-------------+
|  count|1,258.00|1,258.00|1,258.00|1,258.00|     1,258.00|
|   mean|   72.36|   72.84|   71.92|   72.39| 8,222,093.50|
| stddev|    6.77|    6.77|    6.74|    6.76| 4,519,781.00|
|    min|   56.39|   57.06|   56.30|   56.42| 2,094,900.00|
|    max|   90.80|   90.97|   89.25|   90.47|80,898,096.00|
+-------+--------+--------+--------+--------+-------------+



#### Crea un nuevo dataframe con una columna llamada HV Ratio que es la relación entre el precio máximo y el volumen de las acciones negociadas durante un día.

In [8]:
df = df.withColumn("HV Ratio", df["High"]/df["Volume"])
df.select("HV Ratio").show()

+--------------------+
|            HV Ratio|
+--------------------+
|4.819714653321546E-6|
|6.290848613094555E-6|
|4.669412994783916E-6|
|7.367338463826307E-6|
|8.915604778943901E-6|
|8.644477436914568E-6|
|9.351828421515645E-6|
| 8.29141562102703E-6|
|7.712212102001476E-6|
|7.071764823529412E-6|
|1.015495466386981E-5|
|6.576354146362592...|
| 5.90145296180676E-6|
|8.547679455011844E-6|
|8.420709512685392E-6|
|1.041448341728929...|
|8.316075414862431E-6|
|9.721183814992126E-6|
|8.029436027707578E-6|
|6.307432259386365E-6|
+--------------------+
only showing top 20 rows



#### ¿Qué día hubo el pico máximo en el precio?

In [9]:
df_cierre = df.orderBy(df['High'].desc()).select('Date').first()                                # Seleccionamos la fecha del dia con el cierre mas alto

dia_cierre_max = datetime.datetime.combine(df_cierre['Date'], datetime.datetime.min.time())     # convertimos la fecha a datetime

dia_cierre_max                                                                                  # me ha costado dar con el formato correcto para mostrar la fecha :D                           

datetime.datetime(2015, 1, 13, 0, 0)

#### ¿Cuál es la media de la columna Close?

In [10]:
df.select(avg("Close")).show()

+-----------------+
|       avg(Close)|
+-----------------+
|72.38844998012726|
+-----------------+



#### ¿Cuál es el máximo y el mínimo de la columna Volumen?

In [11]:
df.select(max("Volume"), min("Volume")).show()

+-----------+-----------+
|max(Volume)|min(Volume)|
+-----------+-----------+
|   80898100|    2094900|
+-----------+-----------+



#### ¿Cuántos días estuvo el cierre por debajo de los 60 dólares?

In [12]:
menos_60 = df.filter(df["Close"] < 60).count()
print(menos_60)

81


#### ¿Qué porcentaje de veces el Máximo fue superior a 80 dólares?
#### En otras palabras, (Número de días de máximos>80)/(Días totales en el conjunto de datos)

In [13]:
mas_80 = df.filter(df["High"] > 80).count()
dias_totales = df.count()

perc = (mas_80/dias_totales)*100
print(perc)

9.141494435612083


#### ¿Cuál es la correlación de Pearson entre High y Volume?

In [28]:
corr = df.stat.corr('High', 'Volume')
print(corr)

-0.3384326061737161


#### ¿Cuál es el valor máximo de High por año?

In [20]:
df = df.withColumn('Year', year(df['Date']))                # Añadimos una nueva columna con el año

df_max_high_por_año = df.groupBy('Year').max('High')        # Agrupamos por año y calculamos el máximo

df_max_high_por_año.show()

+----+---------+
|Year|max(High)|
+----+---------+
|2015|90.970001|
|2013|81.370003|
|2014|88.089996|
|2012|77.599998|
|2016|75.190002|
+----+---------+



#### ¿Cuál es el cierre medio de cada mes del calendario?
#### En otras palabras, a lo largo de todos los años, ¿cuál es el precio medio de cierre para enero, febrero, marzo, etc.? Su resultado tendrá un valor para cada uno de estos meses. 

In [21]:
df = df.withColumn('Month', month(df['Date']))                                  # Selecionamos el mes de la columna fecha

avg_close_por_mes = df.groupBy('Month').avg('Close')                            # agrupamos por mes y calculamos la media del cierre
avg_close_por_mes = avg_close_por_mes.orderBy('Month', ascending=True)          # ordenamos por mes

avg_close_por_mes.show()

+-----+-----------------+
|Month|       avg(Close)|
+-----+-----------------+
|    1|71.44801958415842|
|    2|  71.306804443299|
|    3|71.77794377570092|
|    4|72.97361900952382|
|    5|72.30971688679247|
|    6| 72.4953774245283|
|    7|74.43971943925233|
|    8|73.02981855454546|
|    9|72.18411785294116|
|   10|71.57854545454543|
|   11| 72.1110893069307|
|   12|72.84792478301885|
+-----+-----------------+

