In [1]:
# Install PySpark
!pip install pyspark

!apt-get install openjdk-8-jdk-headless -qq > /dev/null

import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"



# Ejercicios Spark DataFrames

Vamos a practicar un poco con tus nuevas habilidades de Spark DataFrame, se te harán algunas preguntas básicas sobre algunos datos del mercado de valores, en este caso Walmart Stock de los años 2012-2017.

Responde a las preguntas y completa las tareas de abajo.

#### ¡Utiliza el archivo walmart_stock.csv para responder y completar las tareas siguientes!

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, format_number, max, min, avg, corr, year, month, when

In [3]:
# Start a Spark session
spark = SparkSession.builder.appName("WalmartStock").getOrCreate()

#### Cargar el archivo CSV de Walmart Stock, hacer que Spark infiera los tipos de datos.

In [4]:
# Load the CSV file into a Spark DataFrame
df = spark.read.csv("walmart_stock.csv", header=True, inferSchema=True)

# Display the first 5 rows of the the DataFrame
df.show(5)

+----------+------------------+---------+---------+------------------+--------+------------------+
|      Date|              Open|     High|      Low|             Close|  Volume|         Adj Close|
+----------+------------------+---------+---------+------------------+--------+------------------+
|2012-01-03|         59.970001|61.060001|59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|60.349998|59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|59.619999|58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|         59.419998|59.450001|58.869999|              59.0| 8069400|          51.45922|
|2012-01-09|         59.029999|59.549999|58.919998|             59.18| 6679300|51.616215000000004|
+----------+------------------+---------+---------+------------------+--------+------------------+
only showing top 5 rows



#### ¿Cuáles son los nombres de las columnas?

In [5]:
df.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

#### ¿Qué aspecto tiene el esquema?

In [6]:
# Display the schema of the DataFrame
df.printSchema()

root
 |-- Date: date (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



#### Imprime las 5 primeras columnas.

In [7]:
#MY CODE
# Select first 5 rows
first_five_rows = df.limit(5).collect()

# Print first 5 rows
for row in first_five_rows:
    print(row)

Row(Date=datetime.date(2012, 1, 3), Open=59.970001, High=61.060001, Low=59.869999, Close=60.330002, Volume=12668800, Adj Close=52.619234999999996)
Row(Date=datetime.date(2012, 1, 4), Open=60.209998999999996, High=60.349998, Low=59.470001, Close=59.709998999999996, Volume=9593300, Adj Close=52.078475)
Row(Date=datetime.date(2012, 1, 5), Open=59.349998, High=59.619999, Low=58.369999, Close=59.419998, Volume=12768200, Adj Close=51.825539)
Row(Date=datetime.date(2012, 1, 6), Open=59.419998, High=59.450001, Low=58.869999, Close=59.0, Volume=8069400, Adj Close=51.45922)
Row(Date=datetime.date(2012, 1, 9), Open=59.029999, High=59.549999, Low=58.919998, Close=59.18, Volume=6679300, Adj Close=51.616215000000004)


#### Utiliza describe() para conocer el DataFrame.

In [8]:
df.describe().show()

+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|         90.800003|        90.970001|            89.25|        90.470001|         80898100|84.91421600000001|
+-------+------------------+-----------------+--

#### Hay demasiados decimales para la media y el stddev en describe(). Formatea los números para que sólo se muestren hasta dos decimales. Presta atención a los tipos de datos que devuelve .describe()

In [None]:
#Proposed Atrium solution

+-------+--------+--------+--------+--------+--------+
|summary|    Open|    High|     Low|   Close|  Volume|
+-------+--------+--------+--------+--------+--------+
|  count|1,258.00|1,258.00|1,258.00|1,258.00|    1258|
|   mean|   72.36|   72.84|   71.92|   72.39| 8222093|
| stddev|    6.77|    6.77|    6.74|    6.76| 4519781|
|    min|   56.39|   57.06|   56.30|   56.42| 2094900|
|    max|   90.80|   90.97|   89.25|   90.47|80898100|
+-------+--------+--------+--------+--------+--------+



In [9]:
# MY CODE
df_rounded = df.describe().select(
    col("summary"),
    format_number(col("Open").cast("float"), 2).alias("Open"),
    format_number(col("High").cast("float"), 2).alias("High"),
    format_number(col("Low").cast("float"), 2).alias("Low"),
    format_number(col("Close").cast("float"), 2).alias("Close"),
    col("Volume").cast("int"),
)

df_rounded.show()

+-------+--------+--------+--------+--------+--------+
|summary|    Open|    High|     Low|   Close|  Volume|
+-------+--------+--------+--------+--------+--------+
|  count|1,258.00|1,258.00|1,258.00|1,258.00|    1258|
|   mean|   72.36|   72.84|   71.92|   72.39| 8222093|
| stddev|    6.77|    6.77|    6.74|    6.76| 4519780|
|    min|   56.39|   57.06|   56.30|   56.42| 2094900|
|    max|   90.80|   90.97|   89.25|   90.47|80898100|
+-------+--------+--------+--------+--------+--------+



#### Crea un nuevo dataframe con una columna llamada HV Ratio que es la relación entre el precio máximo y el volumen de las acciones negociadas durante un día.

In [10]:
hv_ratio = df.withColumn("HV Ratio", col("High") / col("Volume")).select("HV Ratio")
hv_ratio.show(20)

+--------------------+
|            HV Ratio|
+--------------------+
|4.819714653321546E-6|
|6.290848613094555E-6|
|4.669412994783916E-6|
|7.367338463826307E-6|
|8.915604778943901E-6|
|8.644477436914568E-6|
|9.351828421515645E-6|
| 8.29141562102703E-6|
|7.712212102001476E-6|
|7.071764823529412E-6|
|1.015495466386981E-5|
|6.576354146362592...|
| 5.90145296180676E-6|
|8.547679455011844E-6|
|8.420709512685392E-6|
|1.041448341728929...|
|8.316075414862431E-6|
|9.721183814992126E-6|
|8.029436027707578E-6|
|6.307432259386365E-6|
+--------------------+
only showing top 20 rows



#### ¿Qué día hubo el pico máximo en el precio?

In [11]:
max_high_price = df.agg(max("High")).collect()[0][0]

df.filter(df["High"] == max_high_price).select("Date").collect()[0][0]

datetime.date(2015, 1, 13)

#### ¿Cuál es la media de la columna Close?

In [12]:
df.agg(avg("Close")).show()

+-----------------+
|       avg(Close)|
+-----------------+
|72.38844998012726|
+-----------------+



#### ¿Cuál es el máximo y el mínimo de la columna Volumen?

In [13]:
df.agg(max("Volume"), min("Volume")).show()

+-----------+-----------+
|max(Volume)|min(Volume)|
+-----------+-----------+
|   80898100|    2094900|
+-----------+-----------+



#### ¿Cuántos días estuvo el cierre por debajo de los 60 dólares?

In [14]:
df.filter(df["Close"] < 60).count()

81

#### ¿Qué porcentaje de veces el Máximo fue superior a 80 dólares?
#### En otras palabras, (Número de días de máximos>80)/(Días totales en el conjunto de datos)

In [15]:
days_above_80 = df.filter(df["High"] > 80).count()

total_days = df.count()

solution = (days_above_80 / total_days) * 100
solution

9.141494435612083

#### ¿Cuál es la correlación de Pearson entre High y Volume?

In [16]:
df.agg(corr("High", "Volume")).show()

+-------------------+
| corr(High, Volume)|
+-------------------+
|-0.3384326061737161|
+-------------------+



#### ¿Cuál es el valor máximo de High por año?

In [17]:
# add year column
df = df.withColumn("Year", year(df["Date"]))

In [18]:
# Calculate max from "High" per year
max_high_by_year = df.groupBy("Year").agg(max("High")).orderBy("Year")
max_high_by_year.show()

+----+---------+
|Year|max(High)|
+----+---------+
|2012|77.599998|
|2013|81.370003|
|2014|88.089996|
|2015|90.970001|
|2016|75.190002|
+----+---------+



#### ¿Cuál es el cierre medio de cada mes del calendario?
#### En otras palabras, a lo largo de todos los años, ¿cuál es el precio medio de cierre para enero, febrero, marzo, etc.? Su resultado tendrá un valor para cada uno de estos meses.

In [19]:
# add month column
df = df.withColumn("Month", month(df["Date"]))

In [20]:
# Calculate average price from "Close" per month
avg_closeprice_by_month = df.groupBy("Month").agg(avg("Close")).orderBy("Month")
avg_closeprice_by_month.show()

+-----+-----------------+
|Month|       avg(Close)|
+-----+-----------------+
|    1|71.44801958415842|
|    2|  71.306804443299|
|    3|71.77794377570092|
|    4|72.97361900952382|
|    5|72.30971688679247|
|    6| 72.4953774245283|
|    7|74.43971943925233|
|    8|73.02981855454546|
|    9|72.18411785294116|
|   10|71.57854545454543|
|   11| 72.1110893069307|
|   12|72.84792478301885|
+-----+-----------------+



In [21]:
# Map month numbers to month names
avg_closeprice_by_month = avg_closeprice_by_month.select(
    when(col("Month") == 1, "Jan")
    .when(col("Month") == 2, "Feb")
    .when(col("Month") == 3, "Mar")
    .when(col("Month") == 4, "Apr")
    .when(col("Month") == 5, "May")
    .when(col("Month") == 6, "Jun")
    .when(col("Month") == 7, "Jul")
    .when(col("Month") == 8, "Aug")
    .when(col("Month") == 9, "Sep")
    .when(col("Month") == 10, "Oct")
    .when(col("Month") == 11, "Nov")
    .when(col("Month") == 12, "Dec")
    .alias("Month"),
    col("avg(Close)").alias("Average Close Price")
)

avg_closeprice_by_month.show()

+-----+-------------------+
|Month|Average Close Price|
+-----+-------------------+
|  Jan|  71.44801958415842|
|  Feb|    71.306804443299|
|  Mar|  71.77794377570092|
|  Apr|  72.97361900952382|
|  May|  72.30971688679247|
|  Jun|   72.4953774245283|
|  Jul|  74.43971943925233|
|  Aug|  73.02981855454546|
|  Sep|  72.18411785294116|
|  Oct|  71.57854545454543|
|  Nov|   72.1110893069307|
|  Dec|  72.84792478301885|
+-----+-------------------+

