# Spark DataFrames - Ejemplos

Vamos a usar el archivo de Walmart Stock (2012-2017) para practicar DataFrames.


#### 1. Iniciar una Spark Session y cargar el fichero CSV.

In [2]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName("walmart").getOrCreate()
df = spark.read.csv('walmart_stock.csv',header=True,inferSchema=True)

#### 2. Mostrar las columnas

In [3]:
df.columns

['Date', 'Open', 'High', 'Low', 'Close', 'Volume', 'Adj Close']

#### 3. Mostrar el esquema inferido

In [4]:
df.printSchema()

root
 |-- Date: timestamp (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



#### 4. Mostrar los primeros 5 registros

In [5]:
df.head(5)

[Row(Date=datetime.datetime(2012, 1, 3, 0, 0), Open=59.970001, High=61.060001, Low=59.869999, Close=60.330002, Volume=12668800, Adj Close=52.619234999999996),
 Row(Date=datetime.datetime(2012, 1, 4, 0, 0), Open=60.209998999999996, High=60.349998, Low=59.470001, Close=59.709998999999996, Volume=9593300, Adj Close=52.078475),
 Row(Date=datetime.datetime(2012, 1, 5, 0, 0), Open=59.349998, High=59.619999, Low=58.369999, Close=59.419998, Volume=12768200, Adj Close=51.825539),
 Row(Date=datetime.datetime(2012, 1, 6, 0, 0), Open=59.419998, High=59.450001, Low=58.869999, Close=59.0, Volume=8069400, Adj Close=51.45922),
 Row(Date=datetime.datetime(2012, 1, 9, 0, 0), Open=59.029999, High=59.549999, Low=58.919998, Close=59.18, Volume=6679300, Adj Close=51.616215000000004)]

#### 5. Breve analisis estadistico del dataframe

In [7]:
df.describe().show()

+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|summary|              Open|             High|              Low|            Close|           Volume|        Adj Close|
+-------+------------------+-----------------+-----------------+-----------------+-----------------+-----------------+
|  count|              1258|             1258|             1258|             1258|             1258|             1258|
|   mean| 72.35785375357709|72.83938807631165| 71.9186009594594|72.38844998012726|8222093.481717011|67.23883848728146|
| stddev|  6.76809024470826|6.768186808159218|6.744075756255496|6.756859163732991|  4519780.8431556|6.722609449996857|
|    min|56.389998999999996|        57.060001|        56.299999|        56.419998|          2094900|        50.363689|
|    max|         90.800003|        90.970001|            89.25|        90.470001|         80898100|84.91421600000001|
+-------+------------------+-----------------+--

#### 6. Reducir a 2 la salida de decimales del la funcion df.describe() para los valores *mean* y *stddev*.

[Pista](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Column.cast)


In [8]:
df.describe().printSchema()

root
 |-- summary: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Adj Close: string (nullable = true)



In [9]:
from pyspark.sql.functions import format_number

In [10]:
result = df.describe()
result.select(result['summary'],
              format_number(result['Open'].cast('float'),2).alias('Open'),
              format_number(result['High'].cast('float'),2).alias('High'),
              format_number(result['Low'].cast('float'),2).alias('Low'),
              format_number(result['Close'].cast('float'),2).alias('Close'),
              result['Volume'].cast('int').alias('Volume')
             ).show()

+-------+--------+--------+--------+--------+--------+
|summary|    Open|    High|     Low|   Close|  Volume|
+-------+--------+--------+--------+--------+--------+
|  count|1,258.00|1,258.00|1,258.00|1,258.00|    1258|
|   mean|   72.36|   72.84|   71.92|   72.39| 8222093|
| stddev|    6.77|    6.77|    6.74|    6.76| 4519780|
|    min|   56.39|   57.06|   56.30|   56.42| 2094900|
|    max|   90.80|   90.97|   89.25|   90.47|80898100|
+-------+--------+--------+--------+--------+--------+



#### 7. Crear un nuevo dataframe con una columna llamada *'HV Ratio'* que representa la relación entre las features 'High' y 'Volume' por dia.

In [11]:
df2 = df.withColumn("HV Ratio",df["High"]/df["Volume"]).select('HV Ratio').show()

+--------------------+
|            HV Ratio|
+--------------------+
|4.819714653321546E-6|
|6.290848613094555E-6|
|4.669412994783916E-6|
|7.367338463826307E-6|
|8.915604778943901E-6|
|8.644477436914568E-6|
|9.351828421515645E-6|
| 8.29141562102703E-6|
|7.712212102001476E-6|
|7.071764823529412E-6|
|1.015495466386981E-5|
|6.576354146362592...|
| 5.90145296180676E-6|
|8.547679455011844E-6|
|8.420709512685392E-6|
|1.041448341728929...|
|8.316075414862431E-6|
|9.721183814992126E-6|
|8.029436027707578E-6|
|6.307432259386365E-6|
+--------------------+
only showing top 20 rows



#### 8. Que dia tuvo el pico mas alto de la variable 'High'?

In [12]:
df.orderBy(df["High"].desc()).head(1)[0][0]

datetime.datetime(2015, 1, 13, 0, 0)

#### 9. Calcular la media de la variable 'Close'?

In [14]:
from pyspark.sql.functions import mean
df.select(mean('Close')).show()

+-----------------+
|       avg(Close)|
+-----------------+
|72.38844998012726|
+-----------------+



#### 10. Cuales son el máximo y el mínimo de la variable 'Volume'?

In [15]:
from pyspark.sql.functions import max,min

In [16]:
df.select(max('Volume'),min('Volume')).show()

+-----------+-----------+
|max(Volume)|min(Volume)|
+-----------+-----------+
|   80898100|    2094900|
+-----------+-----------+



#### 11. Cuántos dias ha estado la variable 'Close' por debajo de 60?

In [21]:
df.filter(df['Close']<60).count()

81

#### 12. Qué porcentaje del tiempo la variable 'High' ha estado por encima de 80 ?

In [25]:
df.filter(df['High']>80).count()/df.count()*100

9.141494435612083

#### 13. Cuál es el valor de la correlación Pearson entre 'High' y 'Volume'? [Pista](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameStatFunctions.corr)

In [26]:
from pyspark.sql.functions import corr
df.select(corr("High","Volume")).show()

+-------------------+
| corr(High, Volume)|
+-------------------+
|-0.3384326061737161|
+-------------------+



#### 14. Cuál es el máximo de la variable 'High' por año?

In [27]:
# Importo una funcion para extraer el año de la variable 'Date'
from pyspark.sql.functions import year

In [28]:
# Creo otro df con la variable año
otro_df = df.withColumn("Anio",year(df["Date"]))

In [30]:
otro_df.head()

Row(Date=datetime.datetime(2012, 1, 3, 0, 0), Open=59.970001, High=61.060001, Low=59.869999, Close=60.330002, Volume=12668800, Adj Close=52.619234999999996, Anio=2012)

In [33]:
# Hago un grupBy x año aplicando el maximo.
otro_df.groupBy('Anio').max().show()

+----+-----------------+---------+---------+----------+-----------+-----------------+---------+
|Anio|        max(Open)|max(High)| max(Low)|max(Close)|max(Volume)|   max(Adj Close)|max(Anio)|
+----+-----------------+---------+---------+----------+-----------+-----------------+---------+
|2015|        90.800003|90.970001|    89.25| 90.470001|   80898100|84.91421600000001|     2015|
|2013|        81.209999|81.370003|    80.82| 81.209999|   25683700|        73.929868|     2013|
|2014|87.08000200000001|88.089996|86.480003| 87.540001|   22812400|81.70768000000001|     2014|
|2012|        77.599998|77.599998|76.690002| 77.150002|   38007300|        68.568371|     2012|
|2016|             74.5|75.190002|73.629997| 74.300003|   35076700|        73.233524|     2016|
+----+-----------------+---------+---------+----------+-----------+-----------------+---------+



In [None]:
# Si solo quiero mostrar los maximos del campo especifico hago un select()
otro_df.groupBy('Anio').max().select('max(High)').show()



#### 15. Cual es la media de la variable 'Close' para cada mes (pero sin tener en cuenta el año)?

In [55]:
# Creo otro df con la variable mes
from pyspark.sql.functions import month
otro_df2 = df.withColumn("Mes",month(df["Date"]))
newdf=otro_df2.groupBy('Mes').mean()
newdf.orderBy('Mes').select(newdf['Mes'],format_number(newdf['avg(Close)'],2).alias('Close')).show()

+---+-----+
|Mes|Close|
+---+-----+
|  1|71.45|
|  2|71.31|
|  3|71.78|
|  4|72.97|
|  5|72.31|
|  6|72.50|
|  7|74.44|
|  8|73.03|
|  9|72.18|
| 10|71.58|
| 11|72.11|
| 12|72.85|
+---+-----+



#### 16. Cual es la media de la variable 'Close' para cada mes (teniendo en cuenta el año)?

In [56]:
# Muy parecido al anterior.
# Importo las funciones de fecha para extraer el año y el mes de la variable 'Date'
from pyspark.sql.functions import year,month

In [57]:
# Creo otro df con la variable año
otro_df = df.withColumn("Anio",year(df["Date"]))
otro_df = otro_df.withColumn("Mes",month(df["Date"]))
newdf=otro_df.groupBy('Anio','Mes').mean().select(['Anio','Mes','avg(Close)'])
newdf.printSchema()

root
 |-- Anio: integer (nullable = true)
 |-- Mes: integer (nullable = true)
 |-- avg(Close): double (nullable = true)



In [58]:
newdf.orderBy('Anio','Mes').select(newdf['Anio'],
                                   newdf['Mes'],
                                   format_number(newdf['avg(Close)'],2).alias('Close')).show()

+----+---+-----+
|Anio|Mes|Close|
+----+---+-----+
|2012|  1|60.24|
|2012|  2|60.90|
|2012|  3|60.43|
|2012|  4|60.15|
|2012|  5|61.46|
|2012|  6|67.50|
|2012|  7|72.41|
|2012|  8|73.04|
|2012|  9|74.18|
|2012| 10|75.31|
|2012| 11|71.11|
|2012| 12|69.71|
|2013|  1|69.09|
|2013|  2|70.62|
|2013|  3|73.44|
|2013|  4|77.69|
|2013|  5|77.82|
|2013|  6|74.98|
|2013|  7|77.12|
|2013|  8|75.22|
+----+---+-----+
only showing top 20 rows

