# Primeros pasos en Spark

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://mirrors.sonic.net/apache/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
!tar xzf spark-3.1.2-bin-hadoop3.2.tgz
!pip install -q findspark


import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.1.2-bin-hadoop3.2"


import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [None]:
spark  # Echo the session object to confirm it was created

In [None]:
import pandas as pd

In [None]:
# Load the Walmart stock CSV into a Spark DataFrame.
# The file is comma-separated, and the separator is stated explicitly with the
# real option name `sep`. A misspelled key such as `delimeter` is not recognised
# by Spark and is silently ignored, so it must not be used here.
# header=True takes the column names from the first row; inferSchema=True lets
# Spark detect numeric column types instead of reading everything as strings.
data = spark.read.options(inferSchema=True, sep=",", header=True).csv("drive/My Drive/Colab Notebooks/walmart_stock.csv")
data.show()  # Plain-text preview of the first rows

+----------+------------------+------------------+------------------+------------------+--------+------------------+
|      Date|              Open|              High|               Low|             Close|  Volume|         Adj Close|
+----------+------------------+------------------+------------------+------------------+--------+------------------+
|2012-01-03|         59.970001|         61.060001|         59.869999|         60.330002|12668800|52.619234999999996|
|2012-01-04|60.209998999999996|         60.349998|         59.470001|59.709998999999996| 9593300|         52.078475|
|2012-01-05|         59.349998|         59.619999|         58.369999|         59.419998|12768200|         51.825539|
|2012-01-06|         59.419998|         59.450001|         58.869999|              59.0| 8069400|          51.45922|
|2012-01-09|         59.029999|         59.549999|         58.919998|             59.18| 6679300|51.616215000000004|
|2012-01-10|             59.43|59.709998999999996|             5

In [None]:
# Collect the Spark DataFrame into a pandas DataFrame for a nicer tabular display.
# From here on both objects are available: `data` (Spark) and `df` (pandas).
df = data.toPandas()
df

Unnamed: 0,Date,Open,High,Low,Close,Volume,Adj Close
0,2012-01-03,59.970001,61.060001,59.869999,60.330002,12668800,52.619235
1,2012-01-04,60.209999,60.349998,59.470001,59.709999,9593300,52.078475
2,2012-01-05,59.349998,59.619999,58.369999,59.419998,12768200,51.825539
3,2012-01-06,59.419998,59.450001,58.869999,59.000000,8069400,51.459220
4,2012-01-09,59.029999,59.549999,58.919998,59.180000,6679300,51.616215
...,...,...,...,...,...,...,...
1253,2016-12-23,69.430000,69.750000,69.360001,69.540001,4803900,69.032411
1254,2016-12-27,69.300003,69.820000,69.250000,69.699997,4435700,69.191240
1255,2016-12-28,69.940002,70.000000,69.260002,69.309998,4875700,68.804087
1256,2016-12-29,69.209999,69.519997,69.120003,69.260002,4298400,68.754456


In [None]:
# printSchema() is a Spark DataFrame method, so it is called on `data` rather than
# on the pandas copy `df`; both frames have the same column names.
data.printSchema()  # Prints each column's name, type and nullability as a tree

root
 |-- Date: string (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume: integer (nullable = true)
 |-- Adj Close: double (nullable = true)



The printSchema command shows us the column names and the type of data each column stores. The output is laid out as a tree: starting from the root, each node is a column, listed with its data type and whether it may contain nulls.

In [None]:
df.head(5)  # First five rows, taken from the pandas copy because it renders as a table (data.head(5) is much harder to read)

Unnamed: 0,Date,Open,High,Low,Close,Volume,Adj Close
0,2012-01-03,59.970001,61.060001,59.869999,60.330002,12668800,52.619235
1,2012-01-04,60.209999,60.349998,59.470001,59.709999,9593300,52.078475
2,2012-01-05,59.349998,59.619999,58.369999,59.419998,12768200,51.825539
3,2012-01-06,59.419998,59.450001,58.869999,59.0,8069400,51.45922
4,2012-01-09,59.029999,59.549999,58.919998,59.18,6679300,51.616215


In [None]:
df.describe()  # pandas summary statistics; the result differs from what describe() returns on the Spark DataFrame `data`

Unnamed: 0,Open,High,Low,Close,Volume,Adj Close
count,1258.0,1258.0,1258.0,1258.0,1258.0,1258.0
mean,72.357854,72.839388,71.918601,72.38845,8222093.0,67.238838
std,6.76809,6.768187,6.744076,6.756859,4519781.0,6.722609
min,56.389999,57.060001,56.299999,56.419998,2094900.0,50.363689
25%,68.627503,69.059998,68.162502,68.632497,5791100.0,63.778335
50%,73.235,73.725002,72.839996,73.265,7093500.0,68.541162
75%,76.629997,77.094999,76.25,76.709999,9394675.0,71.105668
max,90.800003,90.970001,89.25,90.470001,80898100.0,84.914216


In [None]:
# Summary statistics computed by Spark on the original DataFrame;
# .toPandas() is applied only so that the small result renders as a table.
data.describe().toPandas()

Unnamed: 0,summary,Date,Open,High,Low,Close,Volume,Adj Close
0,count,1258,1258.0,1258.0,1258.0,1258.0,1258.0,1258.0
1,mean,,72.35785375357709,72.83938807631165,71.9186009594594,72.38844998012726,8222093.481717011,67.23883848728146
2,stddev,,6.76809024470826,6.768186808159218,6.744075756255496,6.756859163732991,4519780.8431556,6.722609449996857
3,min,2012-01-03,56.389999,57.060001,56.299999,56.419998,2094900.0,50.363689
4,max,2016-12-30,90.800003,90.970001,89.25,90.470001,80898100.0,84.91421600000001


The describe function shows us the total number of records (1258), the mean and standard deviation of each column (for Date these are empty because the column is stored as a string, not as a number), and the min and max value of each column. In df (converted to pandas), describe() reports additional values of interest such as the quartiles (25%, 50%, 75%); when columns are stored as strings it instead reports how many values are unique, which is how we can notice that some High and Low values are repeated.

In [None]:
# Coerce every price/volume column to a numeric dtype, one column at a time.
for numeric_column in ("Open", "High", "Low", "Close", "Volume", "Adj Close"):
    df[numeric_column] = pd.to_numeric(df[numeric_column])

# Parse the date strings into proper timestamps.
df["Date"] = pd.to_datetime(df["Date"])

# Show the resulting dtype of each column.
df.dtypes

Date         datetime64[ns]
Open                float64
High                float64
Low                 float64
Close               float64
Volume                int32
Adj Close           float64
dtype: object

Para redondear a 2 decimales, las columnas con los valores numéricos tienen que ser de tipo numérico (float64): si estuvieran como strings, cualquier función round no redondearía nada, ya que la información no se interpretaría como numérica. Por eso primero nos aseguramos de la conversión y, de paso, convertimos la columna Date a fecha.

In [None]:
df.round(2)  # Display a copy rounded to 2 decimals; the result is not assigned, so df keeps its full precision

Unnamed: 0,Date,Open,High,Low,Close,Volume,Adj Close
0,2012-01-03,59.97,61.06,59.87,60.33,12668800,52.62
1,2012-01-04,60.21,60.35,59.47,59.71,9593300,52.08
2,2012-01-05,59.35,59.62,58.37,59.42,12768200,51.83
3,2012-01-06,59.42,59.45,58.87,59.00,8069400,51.46
4,2012-01-09,59.03,59.55,58.92,59.18,6679300,51.62
...,...,...,...,...,...,...,...
1253,2016-12-23,69.43,69.75,69.36,69.54,4803900,69.03
1254,2016-12-27,69.30,69.82,69.25,69.70,4435700,69.19
1255,2016-12-28,69.94,70.00,69.26,69.31,4875700,68.80
1256,2016-12-29,69.21,69.52,69.12,69.26,4298400,68.75


Una vez hecho eso, se emplea el comando round, indicando el número de decimales al que queremos redondear, ¡y voilà! Funciona. Fijémonos en que esto solo afecta a los valores con decimales y no actúa sobre el resto de columnas.

In [None]:
# Ratio of the daily high price to the traded volume, computed on the Spark side.
high_to_volume = data["High"] / data["Volume"]
data2 = data.withColumn("HV Ratio", high_to_volume)

# Collect the extended frame into pandas and display it.
df2 = data2.toPandas()
df2

Unnamed: 0,Date,Open,High,Low,Close,Volume,Adj Close,HV Ratio
0,2012-01-03,59.970001,61.060001,59.869999,60.330002,12668800,52.619235,0.000005
1,2012-01-04,60.209999,60.349998,59.470001,59.709999,9593300,52.078475,0.000006
2,2012-01-05,59.349998,59.619999,58.369999,59.419998,12768200,51.825539,0.000005
3,2012-01-06,59.419998,59.450001,58.869999,59.000000,8069400,51.459220,0.000007
4,2012-01-09,59.029999,59.549999,58.919998,59.180000,6679300,51.616215,0.000009
...,...,...,...,...,...,...,...,...
1253,2016-12-23,69.430000,69.750000,69.360001,69.540001,4803900,69.032411,0.000015
1254,2016-12-27,69.300003,69.820000,69.250000,69.699997,4435700,69.191240,0.000016
1255,2016-12-28,69.940002,70.000000,69.260002,69.309998,4875700,68.804087,0.000014
1256,2016-12-29,69.209999,69.519997,69.120003,69.260002,4298400,68.754456,0.000016


In [None]:
# Group the Spark data by day and take the maximum "High" of each group, which
# yields two columns: Date and max(High). The result is collected into pandas
# and displayed sorted from the highest value to the lowest.
highpr = data.groupBy("Date").agg({"High":"max"}).toPandas()
highpr.sort_values(by=['max(High)'], ascending=False)

Unnamed: 0,Date,max(High)
1141,2015-01-13,90.970001
437,2015-01-08,90.669998
1103,2015-01-09,90.389999
495,2015-01-12,90.309998
661,2015-01-23,89.260002
...,...,...
333,2015-11-16,58.029999
507,2015-11-12,57.770000
328,2015-10-28,57.720001
926,2015-11-02,57.610001


El día con el valor máximo en bolsa fue el 13 de enero de 2015.

In [None]:
df["Close"].mean()  # Arithmetic mean of the closing price over all rows

72.38844998012726

In [None]:
# Largest and smallest daily traded volume, shown together.
# A cell only displays its last expression, so calling .max() and .min() on
# separate lines would show the minimum alone.
df["Volume"].agg(["max", "min"])

2094900

In [None]:
# Expose the Spark DataFrame to Spark SQL under the name "datos".
# createOrReplaceTempView is the supported replacement for registerTempTable,
# which has been deprecated since Spark 2.0.
data2.createOrReplaceTempView("datos")
# Number of days on which the closing price was below 60.
spark.sql("select * from datos where close < 60").count()

81

In [None]:
# Share of trading days whose High exceeded 80, as a percentage:
# (days with high > 80) / (total days) * 100, which comes to roughly 9%.
days_high_over_80 = spark.sql("select * from datos where high > 80").count()
total_days = spark.sql("select Date from datos").count()
days_high_over_80 / total_days * 100

9.141494435612083

In [None]:
# Pearson correlation coefficient between the High and Volume columns,
# computed with the DataFrame's corr function and the method named explicitly.
data2.corr("High","Volume", "pearson")

-0.3384326061737161

La correlación entre el valor más alto y el volumen es negativa y débil (≈ -0,34), por lo que podemos concluir que la relación es escasa, aunque apunta a que, a mayor valor máximo del día, menor es el volumen.

In [None]:
# Derive the calendar month and year of every row from df2's own Date column.
# The dates are parsed from df2 itself rather than borrowed from the separate
# pandas frame `df`, so the new columns cannot get out of step with df2's rows.
df2_dates = pd.to_datetime(df2["Date"])
df2["Month"] = df2_dates.dt.month  # Month number of each row
df2["Year"] = df2_dates.dt.year    # Year of each row
df2.head(50)  # Check that the new columns look right

Unnamed: 0,Date,Open,High,Low,Close,Volume,Adj Close,HV Ratio,Month,Year
0,2012-01-03,59.970001,61.060001,59.869999,60.330002,12668800,52.619235,5e-06,1,2012
1,2012-01-04,60.209999,60.349998,59.470001,59.709999,9593300,52.078475,6e-06,1,2012
2,2012-01-05,59.349998,59.619999,58.369999,59.419998,12768200,51.825539,5e-06,1,2012
3,2012-01-06,59.419998,59.450001,58.869999,59.0,8069400,51.45922,7e-06,1,2012
4,2012-01-09,59.029999,59.549999,58.919998,59.18,6679300,51.616215,9e-06,1,2012
5,2012-01-10,59.43,59.709999,58.98,59.040001,6907300,51.494109,9e-06,1,2012
6,2012-01-11,59.060001,59.529999,59.040001,59.400002,6365600,51.808098,9e-06,1,2012
7,2012-01-12,59.790001,60.0,59.400002,59.5,7236400,51.895316,8e-06,1,2012
8,2012-01-13,59.18,59.610001,59.009998,59.540001,7729300,51.930204,8e-06,1,2012
9,2012-01-17,59.869999,60.110001,59.52,59.849998,8500000,52.200581,7e-06,1,2012


In [None]:
df2['Month'].head(250)  # Spot-check the Month column over the first 250 rows

0       1
1       1
2       1
3       1
4       1
       ..
245    12
246    12
247    12
248    12
249    12
Name: Month, Length: 250, dtype: int64

In [None]:
df2.groupby("Year").agg({"High":"max"})  # Highest "High" price reached in each year

Unnamed: 0_level_0,High
Year,Unnamed: 1_level_1
2012,77.599998
2013,81.370003
2014,88.089996
2015,90.970001
2016,75.190002


In [None]:
df2.groupby("Month").agg({"Close":"mean"})  # Mean closing price per calendar month (rows from all years are pooled by month number)

Unnamed: 0_level_0,Close
Month,Unnamed: 1_level_1
1,71.44802
2,71.306804
3,71.777944
4,72.973619
5,72.309717
6,72.495377
7,74.439719
8,73.029819
9,72.184118
10,71.578545
