In [41]:
import pyspark

from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F
from pyspark.sql.window import Window

spark = SparkSession \
        .builder \
        .appName("Ejemplo de Spark") \
        .getOrCreate()

spark

In [42]:
df = spark.read.csv("../data/IBEX35")
df.show()

# Does not read headers

+------+----------+--------+---------+------+------------+--------+--------+--------+--------+
|   _c0|       _c1|     _c2|      _c3|   _c4|         _c5|     _c6|     _c7|     _c8|     _c9|
+------+----------+--------+---------+------+------------+--------+--------+--------+--------+
|Ticker|      Date|   Close|Reference|Volume|    Turnover|    Last|    High|     Low| Average|
|  AENA|02/01/2019|137.0000| 135.7500|143458| 19591073.50|137.0000|138.0000|133.9500|136.5631|
|  AENA|03/01/2019|137.0500| 137.0000|461151| 63059756.45|137.0500|139.1500|136.4000|137.4490|
|  AENA|04/01/2019|140.5000| 137.0500|110214| 15444845.65|140.5000|140.9500|137.7000|140.1351|
|  AENA|07/01/2019|139.1500| 140.5000| 92663| 12908476.55|139.1500|141.1000|138.2000|139.3056|
|  AENA|08/01/2019|140.0000| 139.1500|179857| 25206724.88|140.0000|141.0500|138.8000|140.1306|
|  AENA|09/01/2019|138.5000| 140.0000|163959| 22859110.75|138.5000|141.6500|138.3500|138.8855|
|  AENA|10/01/2019|139.2500| 138.5000|181664| 2520

In [49]:
df = spark.read.option("header", "true").csv("../data/IBEX35")
df.show()

+------+----------+--------+---------+------+------------+--------+--------+--------+--------+
|Ticker|      Date|   Close|Reference|Volume|    Turnover|    Last|    High|     Low| Average|
+------+----------+--------+---------+------+------------+--------+--------+--------+--------+
|  AENA|02/01/2019|137.0000| 135.7500|143458| 19591073.50|137.0000|138.0000|133.9500|136.5631|
|  AENA|03/01/2019|137.0500| 137.0000|461151| 63059756.45|137.0500|139.1500|136.4000|137.4490|
|  AENA|04/01/2019|140.5000| 137.0500|110214| 15444845.65|140.5000|140.9500|137.7000|140.1351|
|  AENA|07/01/2019|139.1500| 140.5000| 92663| 12908476.55|139.1500|141.1000|138.2000|139.3056|
|  AENA|08/01/2019|140.0000| 139.1500|179857| 25206724.88|140.0000|141.0500|138.8000|140.1306|
|  AENA|09/01/2019|138.5000| 140.0000|163959| 22859110.75|138.5000|141.6500|138.3500|138.8855|
|  AENA|10/01/2019|139.2500| 138.5000|181664| 25208702.51|139.2500|139.5000|137.5500|138.8824|
|  AENA|11/01/2019|139.4500| 139.2500|710391| 9912

In [4]:
print(f"The number of tickers is {df.count()}")
print(f"The data colums are {df.columns}")

The number of tickers is 8575
The data colums are ['Ticker', 'Date', 'Close', 'Reference', 'Volume', 'Turnover', 'Last', 'High', 'Low', 'Average']


In [5]:
df = df.withColumn("Aumento_valor", df.Close > df.Reference)
df.show()

+------+----------+--------+---------+------+------------+--------+--------+--------+--------+-------------+
|Ticker|      Date|   Close|Reference|Volume|    Turnover|    Last|    High|     Low| Average|Aumento_valor|
+------+----------+--------+---------+------+------------+--------+--------+--------+--------+-------------+
|  AENA|02/01/2019|137.0000| 135.7500|143458| 19591073.50|137.0000|138.0000|133.9500|136.5631|         true|
|  AENA|03/01/2019|137.0500| 137.0000|461151| 63059756.45|137.0500|139.1500|136.4000|137.4490|         true|
|  AENA|04/01/2019|140.5000| 137.0500|110214| 15444845.65|140.5000|140.9500|137.7000|140.1351|         true|
|  AENA|07/01/2019|139.1500| 140.5000| 92663| 12908476.55|139.1500|141.1000|138.2000|139.3056|        false|
|  AENA|08/01/2019|140.0000| 139.1500|179857| 25206724.88|140.0000|141.0500|138.8000|140.1306|         true|
|  AENA|09/01/2019|138.5000| 140.0000|163959| 22859110.75|138.5000|141.6500|138.3500|138.8855|        false|
|  AENA|10/01/2019|

In [6]:
df.filter(df.Ticker == "REP").show(5)

df.filter((df.Ticker == "REP") & (df.Aumento_valor == "true")).show(5)


+------+----------+-------+---------+--------+------------+-------+-------+-------+-------+-------------+
|Ticker|      Date|  Close|Reference|  Volume|    Turnover|   Last|   High|    Low|Average|Aumento_valor|
+------+----------+-------+---------+--------+------------+-------+-------+-------+-------+-------------+
|   REP|02/01/2019|14.2350|  14.0800|10003999|140210096.55|14.2350|14.3100|13.7850|14.0805|         true|
|   REP|03/01/2019|14.2550|  14.2350|12814838|180613397.06|14.2550|14.3800|14.0250|14.2255|         true|
|   REP|04/01/2019|14.5950|  14.2550| 7089672|103139023.91|14.5950|14.7050|14.3600|14.5892|         true|
|   REP|07/01/2019|14.4750|  14.5950| 4409347| 63898025.48|14.4750|14.6950|14.3300|14.4915|        false|
|   REP|08/01/2019|14.4900|  14.4750|12227338|177321179.12|14.4900|14.7900|14.4400|14.5760|         true|
+------+----------+-------+---------+--------+------------+-------+-------+-------+-------+-------------+
only showing top 5 rows

+------+----------+--

In [16]:
df.filter(df.Ticker == "IDR").show()
print(df.filter((df.Ticker == "IDR") & (df.Volume > 500000)).count())
print(df.filter(df.Ticker == "IDR").count())
df.select("Ticker").distinct().show()
# df.filter((df.Ticker == "IDR") & (df.Volume > 500000)).show(42)


+------+----------+------+---------+------+----------+------+------+------+-------+-------------+
|Ticker|      Date| Close|Reference|Volume|  Turnover|  Last|  High|   Low|Average|Aumento_valor|
+------+----------+------+---------+------+----------+------+------+------+-------+-------------+
|   IDR|02/01/2019|8.1250|   8.2350|554146|4504780.53|8.1250|8.2100|8.0000| 8.1292|        false|
|   IDR|03/01/2019|8.0000|   8.1250|518359|4154868.74|8.0000|8.1350|7.9200| 8.0154|        false|
|   IDR|04/01/2019|8.2050|   8.0000|539367|4389231.91|8.2050|8.2400|8.0400| 8.1377|         true|
|   IDR|07/01/2019|8.0500|   8.2050|776378|6226567.10|8.0500|8.2250|7.8900| 8.0200|        false|
|   IDR|08/01/2019|8.1850|   8.0500|547992|4480941.25|8.1850|8.2650|8.0550| 8.1770|         true|
|   IDR|09/01/2019|8.5800|   8.1850|882949|7544821.85|8.5800|8.7000|8.2500| 8.5442|         true|
|   IDR|10/01/2019|8.5350|   8.5800|351675|3001337.82|8.5350|8.5900|8.4800| 8.5344|        false|
|   IDR|11/01/2019|8

In [17]:
df.show()

+------+----------+--------+---------+------+------------+--------+--------+--------+--------+-------------+
|Ticker|      Date|   Close|Reference|Volume|    Turnover|    Last|    High|     Low| Average|Aumento_valor|
+------+----------+--------+---------+------+------------+--------+--------+--------+--------+-------------+
|  AENA|02/01/2019|137.0000| 135.7500|143458| 19591073.50|137.0000|138.0000|133.9500|136.5631|         true|
|  AENA|03/01/2019|137.0500| 137.0000|461151| 63059756.45|137.0500|139.1500|136.4000|137.4490|         true|
|  AENA|04/01/2019|140.5000| 137.0500|110214| 15444845.65|140.5000|140.9500|137.7000|140.1351|         true|
|  AENA|07/01/2019|139.1500| 140.5000| 92663| 12908476.55|139.1500|141.1000|138.2000|139.3056|        false|
|  AENA|08/01/2019|140.0000| 139.1500|179857| 25206724.88|140.0000|141.0500|138.8000|140.1306|         true|
|  AENA|09/01/2019|138.5000| 140.0000|163959| 22859110.75|138.5000|141.6500|138.3500|138.8855|        false|
|  AENA|10/01/2019|

In [7]:
from pyspark.sql.functions import round, col

df = df.withColumn("Diff", round(col("Close") - col("Reference"), 2))
df.show()

+------+----------+--------+---------+------+------------+--------+--------+--------+--------+-------------+-----+
|Ticker|      Date|   Close|Reference|Volume|    Turnover|    Last|    High|     Low| Average|Aumento_valor| Diff|
+------+----------+--------+---------+------+------------+--------+--------+--------+--------+-------------+-----+
|  AENA|02/01/2019|137.0000| 135.7500|143458| 19591073.50|137.0000|138.0000|133.9500|136.5631|         true| 1.25|
|  AENA|03/01/2019|137.0500| 137.0000|461151| 63059756.45|137.0500|139.1500|136.4000|137.4490|         true| 0.05|
|  AENA|04/01/2019|140.5000| 137.0500|110214| 15444845.65|140.5000|140.9500|137.7000|140.1351|         true| 3.45|
|  AENA|07/01/2019|139.1500| 140.5000| 92663| 12908476.55|139.1500|141.1000|138.2000|139.3056|        false|-1.35|
|  AENA|08/01/2019|140.0000| 139.1500|179857| 25206724.88|140.0000|141.0500|138.8000|140.1306|         true| 0.85|
|  AENA|09/01/2019|138.5000| 140.0000|163959| 22859110.75|138.5000|141.6500|138.

In [10]:
df = df.withColumn("Date", F.to_date(df["Date"], 'dd/MM/yyyy'))
df.printSchema()
df.show()

root
 |-- Ticker: string (nullable = true)
 |-- Date: date (nullable = true)
 |-- Close: string (nullable = true)
 |-- Reference: string (nullable = true)
 |-- Volume: string (nullable = true)
 |-- Turnover: string (nullable = true)
 |-- Last: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Average: string (nullable = true)
 |-- Aumento_valor: boolean (nullable = true)
 |-- Diff: double (nullable = true)

+------+----------+--------+---------+------+------------+--------+--------+--------+--------+-------------+-----+
|Ticker|      Date|   Close|Reference|Volume|    Turnover|    Last|    High|     Low| Average|Aumento_valor| Diff|
+------+----------+--------+---------+------+------------+--------+--------+--------+--------+-------------+-----+
|  AENA|2019-01-02|137.0000| 135.7500|143458| 19591073.50|137.0000|138.0000|133.9500|136.5631|         true| 1.25|
|  AENA|2019-01-03|137.0500| 137.0000|461151| 63059756.45|137.0500|139.1500|13

In [11]:
df = df.withColumn("Month", F.month(df["Date"]))
df.show()

+------+----------+--------+---------+------+------------+--------+--------+--------+--------+-------------+-----+-----+
|Ticker|      Date|   Close|Reference|Volume|    Turnover|    Last|    High|     Low| Average|Aumento_valor| Diff|Month|
+------+----------+--------+---------+------+------------+--------+--------+--------+--------+-------------+-----+-----+
|  AENA|2019-01-02|137.0000| 135.7500|143458| 19591073.50|137.0000|138.0000|133.9500|136.5631|         true| 1.25|    1|
|  AENA|2019-01-03|137.0500| 137.0000|461151| 63059756.45|137.0500|139.1500|136.4000|137.4490|         true| 0.05|    1|
|  AENA|2019-01-04|140.5000| 137.0500|110214| 15444845.65|140.5000|140.9500|137.7000|140.1351|         true| 3.45|    1|
|  AENA|2019-01-07|139.1500| 140.5000| 92663| 12908476.55|139.1500|141.1000|138.2000|139.3056|        false|-1.35|    1|
|  AENA|2019-01-08|140.0000| 139.1500|179857| 25206724.88|140.0000|141.0500|138.8000|140.1306|         true| 0.85|    1|
|  AENA|2019-01-09|138.5000| 140

In [16]:
df.groupBy("Month").agg(F.round(F.mean("Close"), 2).alias("avg_close")).orderBy("avg_close", asecnding = False).show()

+-----+---------+
|Month|avg_close|
+-----+---------+
|    1|    21.25|
|    8|    22.19|
|    2|    22.24|
|   10|    22.75|
|    9|    22.77|
|    3|    23.03|
|    5|    23.12|
|    7|    23.26|
|    6|    23.34|
|   11|    23.37|
|   12|    23.42|
|    4|    23.75|
+-----+---------+



In [24]:
df.groupBy("Month").agg(F.round(F.sum("Diff"), 2).alias("total_diff")).orderBy("total_diff", ascending = False).show()

+-----+----------+
|Month|total_diff|
+-----+----------+
|    1|     59.94|
|    3|      33.2|
|    4|     28.19|
|    6|     27.49|
|    2|     19.37|
|   11|     17.46|
|    9|     16.49|
|   12|     12.54|
|   10|      7.63|
|    8|     -3.65|
|    7|    -16.45|
|    5|    -45.79|
+-----+----------+



In [27]:
df_REP = df.filter(df.Ticker == "REP")
df_REP.show()
df_REP.count()

df_AENA = df.filter(df.Ticker == "AENA")
df_AENA.show()
df_AENA.count()

+------+----------+-------+---------+--------+------------+-------+-------+-------+-------+-------------+-----+-----+
|Ticker|      Date|  Close|Reference|  Volume|    Turnover|   Last|   High|    Low|Average|Aumento_valor| Diff|Month|
+------+----------+-------+---------+--------+------------+-------+-------+-------+-------+-------------+-----+-----+
|   REP|2019-01-02|14.2350|  14.0800|10003999|140210096.55|14.2350|14.3100|13.7850|14.0805|         true| 0.15|    1|
|   REP|2019-01-03|14.2550|  14.2350|12814838|180613397.06|14.2550|14.3800|14.0250|14.2255|         true| 0.02|    1|
|   REP|2019-01-04|14.5950|  14.2550| 7089672|103139023.91|14.5950|14.7050|14.3600|14.5892|         true| 0.34|    1|
|   REP|2019-01-07|14.4750|  14.5950| 4409347| 63898025.48|14.4750|14.6950|14.3300|14.4915|        false|-0.12|    1|
|   REP|2019-01-08|14.4900|  14.4750|12227338|177321179.12|14.4900|14.7900|14.4400|14.5760|         true| 0.02|    1|
|   REP|2019-01-09|14.6950|  14.4900|11410241|166077803.

245

In [31]:
df_names = spark.read.option("header", "true").csv("../data/IBEX35_names.csv")
df_names.show()

+--------------+--------------------+
|COMPANY TICKER|        COMPANY NAME|
+--------------+--------------------+
|           ANA|             ACCIONA|
|           ACX|            ACERINOX|
|           ACS|ACS ACTIVIDADES C...|
|          AENA|                AENA|
|           AMS|    AMADEUS IT GROUP|
|           MTS|       ARCELORMITTAL|
|           SAB|   BANCO DE SABADELL|
|           SAN|     BANCO SANTANDER|
|          BKIA|              BANKIA|
|           BKT|           BANKINTER|
|          BBVA|BANCO BILBAO VIZC...|
|          CABK|           CAIXABANK|
|          CLNX|     CELLNEX TELECOM|
|           CIE|      CIE AUTOMOTIVE|
|           ENG|              ENAGAS|
|           ENC|ENCE ENERGIA Y CE...|
|           ELE|              ENDESA|
|           FER|           FERROVIAL|
|           GRF|     GRIFOLS CLASE A|
|           IAG|INTERNATIONAL CON...|
+--------------+--------------------+
only showing top 20 rows



In [35]:
df_join = df.join(df_names, on = df["Ticker"] == df_names["Company Ticker"], how = "right")
df_join.show()

+------+----------+-------+---------+------+-----------+-------+-------+-------+-------+-------------+-----+-----+--------------+------------+
|Ticker|      Date|  Close|Reference|Volume|   Turnover|   Last|   High|    Low|Average|Aumento_valor| Diff|Month|COMPANY TICKER|COMPANY NAME|
+------+----------+-------+---------+------+-----------+-------+-------+-------+-------+-------------+-----+-----+--------------+------------+
|   ANA|2019-12-13|93.8500|  93.3000| 55309| 5192000.25|93.8500|94.3500|93.4500|93.8726|         true| 0.55|   12|           ANA|     ACCIONA|
|   ANA|2019-12-12|93.3000|  92.8500| 46860| 4364399.45|93.3000|93.4500|92.4500|93.1370|         true| 0.45|   12|           ANA|     ACCIONA|
|   ANA|2019-12-11|92.8500|  91.9500| 48398| 4478126.00|92.8500|92.9000|92.0500|92.5271|         true|  0.9|   12|           ANA|     ACCIONA|
|   ANA|2019-12-10|91.9500|  92.0500| 46267| 4254898.65|91.9500|92.5000|91.1500|91.9640|        false| -0.1|   12|           ANA|     ACCIONA|

In [51]:
df_join = df_join.drop("COMPANY TICKER")
df_join.show()
df.show()

+------+----------+-------+---------+------+-----------+-------+-------+-------+-------+-------------+-----+-----+------------+
|Ticker|      Date|  Close|Reference|Volume|   Turnover|   Last|   High|    Low|Average|Aumento_valor| Diff|Month|COMPANY NAME|
+------+----------+-------+---------+------+-----------+-------+-------+-------+-------+-------------+-----+-----+------------+
|   ANA|2019-12-13|93.8500|  93.3000| 55309| 5192000.25|93.8500|94.3500|93.4500|93.8726|         true| 0.55|   12|     ACCIONA|
|   ANA|2019-12-12|93.3000|  92.8500| 46860| 4364399.45|93.3000|93.4500|92.4500|93.1370|         true| 0.45|   12|     ACCIONA|
|   ANA|2019-12-11|92.8500|  91.9500| 48398| 4478126.00|92.8500|92.9000|92.0500|92.5271|         true|  0.9|   12|     ACCIONA|
|   ANA|2019-12-10|91.9500|  92.0500| 46267| 4254898.65|91.9500|92.5000|91.1500|91.9640|        false| -0.1|   12|     ACCIONA|
|   ANA|2019-12-09|92.0500|  92.0000|129741|11889158.00|92.0500|92.6000|91.4500|92.1137|         true| 0

In [58]:
df_join = df_join.withColumn("Volume", F.col("Volume").cast("int"))

window = Window.partitionBy("Month").orderBy(df_join.Volume.desc())

df_join.withColumn("month_vol_rank", F.row_number().over(window)).filter(F.col("month_vol_rank") == 3).show()

+------+----------+------+---------+---------+-------------+------+------+------+-------+-------------+-----+-----+-----------------+--------------+
|Ticker|      Date| Close|Reference|   Volume|     Turnover|  Last|  High|   Low|Average|Aumento_valor| Diff|Month|     COMPANY NAME|month_vol_rank|
+------+----------+------+---------+---------+-------------+------+------+------+-------+-------------+-----+-----+-----------------+--------------+
|   SAN|2019-01-24|4.3355|   4.3550|250739919|1099255730.82|4.3355|4.4200|4.2875| 4.3620|        false|-0.02|    1|  BANCO SANTANDER|             3|
|   SAN|2019-02-06|4.1240|   4.1050|265526091|1083201324.37|4.1240|4.1435|4.0550| 4.1112|         true| 0.02|    2|  BANCO SANTANDER|             3|
|   SAB|2019-03-22|0.9100|   0.9304| 94583890|  88227743.79|0.9100|0.9422|0.9068| 0.9156|        false|-0.02|    3|BANCO DE SABADELL|             3|
|   SAN|2019-04-17|4.6185|   4.5715|251111657|1150139655.01|4.6185|4.6820|4.5600| 4.6245|         true| 0.

In [59]:
df_join = df_join.withColumn("Turnover", F.col("Turnover").cast("float"))
df_join.show()

+------+----------+-------+---------+------+-----------+-------+-------+-------+-------+-------------+-----+-----+------------+
|Ticker|      Date|  Close|Reference|Volume|   Turnover|   Last|   High|    Low|Average|Aumento_valor| Diff|Month|COMPANY NAME|
+------+----------+-------+---------+------+-----------+-------+-------+-------+-------+-------------+-----+-----+------------+
|   ANA|2019-12-13|93.8500|  93.3000| 55309|  5192000.0|93.8500|94.3500|93.4500|93.8726|         true| 0.55|   12|     ACCIONA|
|   ANA|2019-12-12|93.3000|  92.8500| 46860|  4364399.5|93.3000|93.4500|92.4500|93.1370|         true| 0.45|   12|     ACCIONA|
|   ANA|2019-12-11|92.8500|  91.9500| 48398|  4478126.0|92.8500|92.9000|92.0500|92.5271|         true|  0.9|   12|     ACCIONA|
|   ANA|2019-12-10|91.9500|  92.0500| 46267|  4254898.5|91.9500|92.5000|91.1500|91.9640|        false| -0.1|   12|     ACCIONA|
|   ANA|2019-12-09|92.0500|  92.0000|129741|1.1889158E7|92.0500|92.6000|91.4500|92.1137|         true| 0

In [60]:
window = Window.partitionBy("Ticker").orderBy(df_join.Turnover.desc())

df_join.withColumn("turnover_rank", F.row_number().over(window)).filter(F.col("turnover_rank") == 10).show()

+------+----------+--------+---------+--------+------------+--------+--------+--------+--------+-------------+-----+-----+--------------------+-------------+
|Ticker|      Date|   Close|Reference|  Volume|    Turnover|    Last|    High|     Low| Average|Aumento_valor| Diff|Month|        COMPANY NAME|turnover_rank|
+------+----------+--------+---------+--------+------------+--------+--------+--------+--------+-------------+-----+-----+--------------------+-------------+
|   ACS|2019-06-05| 37.4000|  37.2500| 1504343| 5.6504048E7| 37.4000| 37.7700| 37.2300| 37.4230|         true| 0.15|    6|ACS ACTIVIDADES C...|           10|
|   ACX|2019-11-07|  9.4440|   9.2700| 2304097|  2.163195E7|  9.4440|  9.4700|  9.3100|  9.4143|         true| 0.17|   11|            ACERINOX|           10|
|  AENA|2019-12-10|166.3500| 166.3000|  789906|1.31597344E8|166.3500|167.3000|164.4500|166.1044|         true| 0.05|   12|                AENA|           10|
|   AMS|2019-07-15| 72.4400|  71.7200| 5186456|3.757