<a href="https://www.kaggle.com/code/leomauro/spark-brazil-stock-market?scriptVersionId=114172079" target="_blank"><img align="left" alt="Kaggle" title="Open in Kaggle" src="https://kaggle.com/static/images/open-in-kaggle.svg"></a>

# Spark - Brazil Stock Market

This notebook shows how to use Spark in a Python Jupyter Notebook for analytical work. We are going to perform a data analysis on [Brazil Stock Market - Data Warehouse](https://www.kaggle.com/datasets/leomauro/brazilian-stock-market-data-warehouse). By the end, I hope you will be able to apply these technologies in your own projects.

> **Summary** - Spark in Stock Market, real data.   
> Content for intermediate level in Machine Learning and Data Science!

<a id="ToC"></a>
## Table of Contents
- [Apache Spark](#spark)
- [Queries](#query)
    - [pyspark.sql](#pyspark)
    - [Spark SQL](#spark_sql)

<a id="spark"></a>

---
# Apache Spark Instantiation

[> Back to Table of Contents](#ToC)

In [1]:
# install Apache Spark (PySpark)
!pip install pyspark --quiet


In [2]:
import os

for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

/kaggle/input/brazilian-stock-market-data-warehouse/data-warehouse.sql
/kaggle/input/brazilian-stock-market-data-warehouse/factStocks.csv
/kaggle/input/brazilian-stock-market-data-warehouse/factCoins.csv
/kaggle/input/brazilian-stock-market-data-warehouse/dimCoin.csv
/kaggle/input/brazilian-stock-market-data-warehouse/dimCompany.csv
/kaggle/input/brazilian-stock-market-data-warehouse/dimTime.csv


Starting Spark session.

In [3]:
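# start a new Spark session (local mode, one worker thread per logical core)
from pyspark.sql import SparkSession
# note: this `round` shadows Python's built-in round() for the rest of the notebook
from pyspark.sql.functions import round, desc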

spark = SparkSession.builder.appName("pyspark-notebook").master("local[*]").getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/12/18 21:05:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Create Spark DataFrames.

In [4]:
import pandas as pd
# pandas display option: print floats with two decimals (affects pandas output only)
pd.set_option('display.float_format', lambda x: '%.2f' % x)

# loading Spark DataFrames
stocks = spark.read.csv(path='/kaggle/input/brazilian-stock-market-data-warehouse/factStocks.csv', header=True, sep=",")
coins = spark.read.csv(path='/kaggle/input/brazilian-stock-market-data-warehouse/factCoins.csv', header=True, sep=",")
dim_coin = spark.read.csv(path='/kaggle/input/brazilian-stock-market-data-warehouse/dimCoin.csv', header=True, sep=",")
dim_company = spark.read.csv(path='/kaggle/input/brazilian-stock-market-data-warehouse/dimCompany.csv', header=True, sep=",")
dim_time = spark.read.csv(path='/kaggle/input/brazilian-stock-market-data-warehouse/dimTime.csv', header=True, sep=",")


Cast the data types using `FloatType` and `IntegerType`. By default, `spark.read.csv` reads every column as a string, because `inferSchema` is disabled unless you set it.

In [5]:
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType

# casting settings
stocks_casting = {
    'int_columns': ['keyTime', 'keyCompany', 'quantityStock'],
    'float_columns': ['openValueStock', 'closeValueStock', 'highValueStock', 'lowValueStock']
}
coins_casting = {
    'int_columns': ['keyTime', 'keyCoin'],
    'float_columns': ['valueCoin']
}
dim_coin_casting = {
    'int_columns': ['keyCoin']
}
dim_company_casting = {
    'int_columns': ['keyCompany']
}
dim_time_casting = {
    'int_columns': ['keyTime', 'dayTime', 'dayWeekTime', 'monthTime', 'bimonthTime', 'quarterTime', 'semesterTime', 'yearTime']
}

# integer casting
for column in stocks_casting['int_columns']:
    stocks = stocks.withColumn(column, stocks[column].cast(IntegerType()))
for column in coins_casting['int_columns']:
    coins = coins.withColumn(column, coins[column].cast(IntegerType()))
for column in dim_coin_casting['int_columns']:
    dim_coin = dim_coin.withColumn(column, dim_coin[column].cast(IntegerType()))
for column in dim_company_casting['int_columns']:
    dim_company = dim_company.withColumn(column, dim_company[column].cast(IntegerType()))
for column in dim_time_casting['int_columns']:
    dim_time = dim_time.withColumn(column, dim_time[column].cast(IntegerType()))

# float casting
for column in stocks_casting['float_columns']:
    stocks = stocks.withColumn(column, stocks[column].cast(FloatType()))
for column in coins_casting['float_columns']:
    coins = coins.withColumn(column, coins[column].cast(FloatType()))
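Because every CSV column arrives as a string, the casting dictionaries above work as a small schema specification. The plain-Python sketch below applies the same kind of spec to rows parsed with the standard `csv` module, purely to illustrate the idea; the sample rows and the `safe_cast` / `apply_casting` helpers are hypothetical and not part of the notebook. Unparseable values become `None`, loosely mirroring how a Spark cast returns `null` for unparseable strings when ANSI mode is off.

```python
import csv
import io

# Hypothetical casting spec, shaped like stocks_casting above
casting = {
    'int_columns': ['keyTime', 'keyCompany', 'quantityStock'],
    'float_columns': ['closeValueStock'],
}

# Hypothetical CSV sample: csv.DictReader yields every value as a str
sample = io.StringIO(
    "keyTime,keyCompany,closeValueStock,quantityStock\n"
    "1355,18,45.8,916\n"
    "1355,107,n/a,668420\n"
)

def safe_cast(value, to_type):
    """Convert value with to_type, returning None when the conversion fails."""
    try:
        return to_type(value)
    except (TypeError, ValueError):
        return None

def apply_casting(rows, spec):
    """Return new dicts with the int/float columns converted as listed in spec."""
    out = []
    for row in rows:
        row = dict(row)
        for col in spec.get('int_columns', []):
            row[col] = safe_cast(row[col], int)
        for col in spec.get('float_columns', []):
            row[col] = safe_cast(row[col], float)
        out.append(row)
    return out

typed_rows = apply_casting(csv.DictReader(sample), casting)
print(typed_rows[0])  # {'keyTime': 1355, 'keyCompany': 18, 'closeValueStock': 45.8, 'quantityStock': 916}
print(typed_rows[1]['closeValueStock'])  # None
```

Keeping the spec as data, as the notebook does, means adding a column to cast only requires editing a list rather than writing another loop.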

Register the DataFrames as temporary views so they can be queried with Spark SQL.

In [6]:
# loading temporary views, for Spark SQL
stocks.createOrReplaceTempView("stocks")
coins.createOrReplaceTempView("coins")
dim_coin.createOrReplaceTempView("dim_coin")
dim_company.createOrReplaceTempView("dim_company")
dim_time.createOrReplaceTempView("dim_time")

In [7]:
stocks.show(5)

+-------+----------+--------------+---------------+--------------+-------------+-------------+
|keyTime|keyCompany|openValueStock|closeValueStock|highValueStock|lowValueStock|quantityStock|
+-------+----------+--------------+---------------+--------------+-------------+-------------+
|   1355|        18|          45.8|           45.8|          45.8|         45.8|          916|
|   1355|       107|         12.15|           12.6|          12.6|        12.15|       668420|
|   1355|       108|          17.0|           17.5|          17.5|        16.95|      3789172|
|   1355|       604|         299.0|          299.0|         299.0|        299.0|        59800|
|   1355|       110|          53.1|           52.0|          53.8|         52.0|      8331641|
+-------+----------+--------------+---------------+--------------+-------------+-------------+
only showing top 5 rows



In [8]:
coins.show(5)

+-------+-------+---------+
|keyTime|keyCoin|valueCoin|
+-------+-------+---------+
|      4|      1|     0.94|
|      4|      2|     2.51|
|      5|      1|     0.93|
|      5|      2|     2.62|
|      6|      1|     0.92|
+-------+-------+---------+
only showing top 5 rows



In [9]:
dim_coin.show(5)

+-------+----------+--------+----------+
|keyCoin|abbrevCoin|nameCoin|symbolCoin|
+-------+----------+--------+----------+
|      1|       USD|   DOLAR|         $|
|      2|       EUR|    EURO|         €|
+-------+----------+--------+----------+



In [10]:
dim_company.show(5)

+----------+----------------+------------+-----------------+--------------------+--------------------+
|keyCompany|stockCodeCompany| nameCompany|sectorCodeCompany|       sectorCompany|      segmentCompany|
+----------+----------------+------------+-----------------+--------------------+--------------------+
|         1|           BRAP4|   BRADESPAR|             IMAT|BM&FBOVESPA BASIC...|SEGMENTS AND SECTORS|
|         2|           PMAM3|PARANAPANEMA|             IMAT|BM&FBOVESPA BASIC...|SEGMENTS AND SECTORS|
|         3|           CSNA3|SID NACIONAL|             IMAT|BM&FBOVESPA BASIC...|SEGMENTS AND SECTORS|
|         4|           SUZB3| SUZANO S.A.|             IMAT|BM&FBOVESPA BASIC...|SEGMENTS AND SECTORS|
|         5|           BRKM5|     BRASKEM|             IMAT|BM&FBOVESPA BASIC...|SEGMENTS AND SECTORS|
+----------+----------------+------------+-----------------+--------------------+--------------------+
only showing top 5 rows



In [11]:
dim_time.show(5)

+-------+----------+-------+-----------+-----------------+-------------------+---------+---------------+-----------------+-----------+-----------+------------+--------+
|keyTime|  datetime|dayTime|dayWeekTime|dayWeekAbbrevTime|dayWeekCompleteTime|monthTime|monthAbbrevTime|monthCompleteTime|bimonthTime|quarterTime|semesterTime|yearTime|
+-------+----------+-------+-----------+-----------------+-------------------+---------+---------------+-----------------+-----------+-----------+------------+--------+
|      1|1994-07-01|      1|          6|              SAB|             SABADO|        7|            JUL|            JULHO|          4|          3|           2|    1994|
|      2|1994-07-02|      2|          7|              DOM|            DOMINGO|        7|            JUL|            JULHO|          4|          3|           2|    1994|
|      3|1994-07-03|      3|          1|              SEG|            SEGUNDA|        7|            JUL|            JULHO|          4|          3|         

<a id="query"></a>

---
# Queries

[> Back to Table of Contents](#ToC)

<a id="pyspark"></a>

# pyspark.sql

What are the most common methods?
- `var.schema` - Returns the DataFrame schema
- `var.join(var, on="key")` - Joins two DataFrames on a key column (here, fact tables with dimension tables, as in a star schema)
- `var.where("query")` - Filters the rows with a condition
- `var.orderBy(desc("columnX"), ...)` - Sorts the rows by column X, in descending order when wrapped in `desc`
- `var.withColumnRenamed("columnX", "x")` - Renames a DataFrame column
- `var.withColumn("columnX", round("columnX", 2))` - Rounds the values to two decimal places
- `var.groupBy("columnX", ...)` - Groups the rows by one or more columns; it is usually followed by an aggregation function, such as `.count()`, `.sum("columnY")`, `.mean("columnY")`, etc.
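A `join(..., on="key")` between a dimension table and a fact table is an equi-join on the shared key. As a rough illustration only (Spark chooses its own physical strategy, such as a broadcast hash join or a sort-merge join), the plain-Python sketch below performs an inner hash join on lists of dicts, mimicking `dim_company.where(...).join(stocks, on="keyCompany")`. The `hash_join` helper and the sample rows are hypothetical; only the column names come from the tables above.

```python
from collections import defaultdict

def hash_join(left, right, key):
    """Inner equi-join of two lists of dicts on the column `key`.

    Builds a hash index over `right`, then probes it with every `left` row.
    Rows without a match on either side are dropped, as in an inner join.
    The key appears only once in each output row, like Spark's join(on=...).
    """
    index = defaultdict(list)
    for right_row in right:
        index[right_row[key]].append(right_row)

    joined = []
    for left_row in left:
        for right_row in index.get(left_row[key], []):
            joined.append({**left_row, **right_row})
    return joined

# Hypothetical sample rows; column names follow dim_company and stocks
dim_company_rows = [
    {'keyCompany': 1, 'stockCodeCompany': 'BRAP4'},
    {'keyCompany': 2, 'stockCodeCompany': 'PMAM3'},
]
stock_rows = [
    {'keyTime': 10, 'keyCompany': 1, 'closeValueStock': 20.0},
    {'keyTime': 11, 'keyCompany': 1, 'closeValueStock': 21.0},
    {'keyTime': 10, 'keyCompany': 99, 'closeValueStock': 5.0},  # no matching company
]

# Filter the dimension first, then join it to the fact rows
brap4 = [row for row in dim_company_rows if row['stockCodeCompany'] == 'BRAP4']
result = hash_join(brap4, stock_rows, 'keyCompany')
print(len(result))  # 2
```

Filtering the small dimension table before joining keeps the probe side tiny, which is the same reason the queries below start from `dim_company.where(...)`.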

[> Back to Table of Contents](#ToC)

**Drill-down Query**

This query is characterized by providing progressively more detailed aggregation levels for the data. In the stock market context, we can use a drill-down query to analyze how the average stock value behaves when the time dimension is examined at different granularities, going from the coarsest grain to the finest (e.g., years, semesters, months). Analyzing different granularities can help reveal short-, medium-, and long-term trends of a stock, which is highly relevant for investors deciding which stock to invest in.
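The drill-down idea can be illustrated without Spark: the same measure is averaged at successively finer time grains, and only the grouping key changes. The sketch below uses a handful of hypothetical daily closing prices (not values from the dataset) and a hypothetical `average_by` helper. The semester is derived from the month (1 for January to June, 2 for July to December), which is consistent with the `dim_time` rows shown above.

```python
from collections import defaultdict
from datetime import date

# Hypothetical daily closing prices (date, close); not taken from the dataset
closes = [
    (date(2020, 1, 15), 20.0),
    (date(2020, 2, 10), 22.0),
    (date(2020, 2, 20), 24.0),
    (date(2020, 7, 1), 30.0),
    (date(2020, 8, 3), 34.0),
]

def semester(d):
    """1 for January-June, 2 for July-December."""
    return 1 if d.month <= 6 else 2

def average_by(rows, key_fn):
    """Group rows by key_fn(date); return {key: mean close rounded to 2 places}."""
    groups = defaultdict(list)
    for d, close in rows:
        groups[key_fn(d)].append(close)
    return {k: round(sum(v) / len(v), 2) for k, v in sorted(groups.items())}

# Drill-down: year -> (year, semester) -> (year, month)
by_year = average_by(closes, lambda d: d.year)
by_semester = average_by(closes, lambda d: (d.year, semester(d)))
by_month = average_by(closes, lambda d: (d.year, d.month))

print(by_year)      # {2020: 26.0}
print(by_semester)  # {(2020, 1): 22.0, (2020, 2): 32.0}
print(by_month)     # {(2020, 1): 20.0, (2020, 2): 23.0, (2020, 7): 30.0, (2020, 8): 34.0}
```

The yearly average hides the gap between the two halves of the year, which only becomes visible one level down.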

**Question**: What was the average closing value of the LAME3 stock in 2020? Its average value in the first half of that year? Its average value in February of the same year?

In [12]:
# Avg value in 2020
print('Avg value in 2020')
dim_company\
    .where("stockCodeCompany == 'LAME3'")\
    .join(stocks, on="keyCompany")\
    .join(dim_time, on="keyTime")\
    .where("yearTime == 2020")\
    .groupBy("yearTime").mean("closeValueStock")\
    .withColumnRenamed("avg(closeValueStock)", "AvgValue")\
    .withColumn("AvgValue", round("AvgValue", 2))\
    .show()

# Avg value in 2020, first semester
print('Avg value in 2020, First Semester')
dim_company\
    .where("stockCodeCompany == 'LAME3'")\
    .join(stocks, on="keyCompany")\
    .join(dim_time, on="keyTime")\
    .where("yearTime == 2020 and semesterTime == 1")\
    .groupBy("yearTime").mean("closeValueStock")\
    .withColumnRenamed("avg(closeValueStock)", "AvgValue")\
    .withColumn("AvgValue", round("AvgValue", 2))\
    .show()

# Avg value in 2020, Feb
print('Avg value in 2020, Feb')
dim_company\
    .where("stockCodeCompany == 'LAME3'")\
    .join(stocks, on="keyCompany")\
    .join(dim_time, on="keyTime")\
    .where("yearTime == 2020 and monthTime == 2")\
    .groupBy("yearTime").mean("closeValueStock")\
    .withColumnRenamed("avg(closeValueStock)", "AvgValue")\
    .withColumn("AvgValue", round("AvgValue", 2))\
    .show()

Avg value in 2020

+--------+--------+
|yearTime|AvgValue|
+--------+--------+
|    2020|   23.09|
+--------+--------+

Avg value in 2020, First Semester

+--------+--------+
|yearTime|AvgValue|
+--------+--------+
|    2020|   21.24|
+--------+--------+

Avg value in 2020, Feb

+--------+--------+
|yearTime|AvgValue|
+--------+--------+
|    2020|   22.81|
+--------+--------+


<a id="spark_sql"></a>

# Spark SQL

Here, each query is written as plain SQL text and executed with the `spark.sql()` method, which returns a DataFrame built over the temporary views registered earlier.

```python
# example
spark.sql("query").show()
```

[> Back to Table of Contents](#ToC)

**Drill-across Query**

This query is characterized by comparing two or more distinct measures that are related through a common dimension. Let's run a drill-across query to compare the stock value measure with the US dollar exchange rate along the time dimension. This way, an investor can assess whether the current stock price is attractive, or whether international investments would bring greater benefit.
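To make the drill-across pattern concrete outside Spark, the sketch below builds a tiny in-memory star schema with Python's built-in `sqlite3` module: two fact tables (`coins` and `stocks`) that share the `dim_time` dimension. Each measure is aggregated per year on its own, and the two results are then joined on the year, which is the same shape as the combined Spark SQL query later in this section. All table contents are hypothetical, only a subset of the real columns is kept, and SQLite's `ROUND` stands in for Spark's `FORMAT_NUMBER`.

```python
import sqlite3

conn = sqlite3.connect(":memory:")
cur = conn.cursor()

# Minimal star schema: two fact tables sharing the dim_time dimension
cur.executescript("""
CREATE TABLE dim_time (keyTime INTEGER PRIMARY KEY, yearTime INTEGER);
CREATE TABLE dim_coin (keyCoin INTEGER PRIMARY KEY, abbrevCoin TEXT);
CREATE TABLE dim_company (keyCompany INTEGER PRIMARY KEY, stockCodeCompany TEXT);
CREATE TABLE coins (keyTime INTEGER, keyCoin INTEGER, valueCoin REAL);
CREATE TABLE stocks (keyTime INTEGER, keyCompany INTEGER, closeValueStock REAL);
""")

# Hypothetical rows (not from the dataset)
cur.executemany("INSERT INTO dim_time VALUES (?, ?)", [(1, 2011), (2, 2011), (3, 2012)])
cur.executemany("INSERT INTO dim_coin VALUES (?, ?)", [(1, "USD"), (2, "EUR")])
cur.executemany("INSERT INTO dim_company VALUES (?, ?)", [(1, "LAME3")])
cur.executemany("INSERT INTO coins VALUES (?, ?, ?)",
                [(1, 1, 1.5), (2, 1, 2.5), (3, 1, 3.0), (1, 2, 9.9)])
cur.executemany("INSERT INTO stocks VALUES (?, ?, ?)",
                [(1, 1, 10.0), (2, 1, 14.0), (3, 1, 20.0)])

# Aggregate each fact table per year, then join the two results on the year
drill_across = """
WITH usd AS (
    SELECT t.yearTime, ROUND(AVG(c.valueCoin), 2) AS avgValueCoin
    FROM coins AS c
    JOIN dim_coin AS cc ON c.keyCoin = cc.keyCoin
    JOIN dim_time AS t ON c.keyTime = t.keyTime
    WHERE cc.abbrevCoin = 'USD'
    GROUP BY t.yearTime
),
lame3 AS (
    SELECT t.yearTime, ROUND(AVG(s.closeValueStock), 2) AS avgValueStock
    FROM stocks AS s
    JOIN dim_company AS dc ON s.keyCompany = dc.keyCompany
    JOIN dim_time AS t ON s.keyTime = t.keyTime
    WHERE dc.stockCodeCompany = 'LAME3'
    GROUP BY t.yearTime
)
SELECT usd.yearTime, usd.avgValueCoin, lame3.avgValueStock
FROM usd
JOIN lame3 ON usd.yearTime = lame3.yearTime
ORDER BY usd.yearTime DESC
"""
rows = cur.execute(drill_across).fetchall()
for row in rows:
    print(row)
# (2012, 3.0, 20.0)
# (2011, 2.0, 12.0)
conn.close()
```

Aggregating before joining matters: joining the two fact tables row by row first would multiply rows that share a year and distort both averages. Spark SQL accepts the same `WITH` syntax if you prefer it to nested subqueries.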

**Question**: What were the average LAME3 closing value and the average US dollar exchange rate in each year?

In [13]:
# Spark SQL
query = """
    SELECT t.yearTime, cc.abbrevCoin, FORMAT_NUMBER(AVG(c.valueCoin), 2) AS avgValueCoin
    FROM coins AS c
    INNER JOIN dim_coin AS cc
        ON c.KeyCoin = cc.KeyCoin
    INNER JOIN dim_time AS t
        ON c.KeyTime = t.KeyTime
    WHERE
        cc.abbrevCoin == 'USD'
    GROUP BY t.yearTime, cc.abbrevCoin
    ORDER BY t.yearTime DESC
"""
spark.sql(query).show()

+--------+----------+------------+
|yearTime|abbrevCoin|avgValueCoin|
+--------+----------+------------+
|    2012|       USD|        5.12|
|    2011|       USD|        3.92|
|    2010|       USD|        3.28|
|    2009|       USD|        3.56|
|    2008|       USD|        2.62|
|    2007|       USD|        2.16|
|    2006|       USD|        1.79|
|    2005|       USD|        1.74|
|    2004|       USD|        1.99|
|    2003|       USD|        1.91|
|    2002|       USD|        2.23|
|    2001|       USD|        2.83|
|    2000|       USD|        3.19|
|    1999|       USD|        2.41|
|    1998|       USD|        1.86|
|    1997|       USD|        1.41|
|    1996|       USD|        1.07|
|    1995|       USD|        0.96|
|    1994|       USD|        0.87|
+--------+----------+------------+



In [14]:
# Spark SQL
query = """
    SELECT t.yearTime, c.stockCodeCompany, FORMAT_NUMBER(AVG(s.closeValueStock), 2) AS avgValueStock
    FROM stocks AS s
    INNER JOIN dim_company AS c
        ON s.keyCompany = c.KeyCompany
    INNER JOIN dim_time AS t
        ON s.KeyTime = t.KeyTime
    WHERE
        c.stockCodeCompany == 'LAME3'
    GROUP BY t.yearTime, c.stockCodeCompany
    ORDER BY t.yearTime DESC
"""
spark.sql(query).show()



+--------+----------------+-------------+
|yearTime|stockCodeCompany|avgValueStock|
+--------+----------------+-------------+
|    2020|           LAME3|        23.09|
|    2019|           LAME3|        14.46|
|    2018|           LAME3|        13.13|
|    2017|           LAME3|        13.46|
|    2016|           LAME3|        12.46|
|    2015|           LAME3|        12.67|
|    2014|           LAME3|        12.52|
|    2013|           LAME3|        15.06|
|    2012|           LAME3|        14.49|
|    2011|           LAME3|        12.64|
|    2010|           LAME3|        11.66|
|    2009|           LAME3|         8.20|
|    2008|           LAME3|        11.54|
|    2007|           LAME3|        82.41|
|    2006|           LAME3|        97.33|
|    2005|           LAME3|        61.25|
|    2004|           LAME3|        33.57|
|    2003|           LAME3|        14.85|
|    2002|           LAME3|         6.51|
|    2001|           LAME3|         3.20|
+--------+----------------+-------------+
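One caveat about `FORMAT_NUMBER`: it returns a formatted string (with comma thousands separators), not a number, so any later sort or comparison on `avgValueCoin` or `avgValueStock` is lexicographic. A safer pattern is to keep the numeric aggregate, for example `ROUND(AVG(...), 2)`, and format only for display. The plain-Python snippet below, using a few of the LAME3 yearly averages printed above, shows how string ordering differs from numeric ordering.

```python
# A few LAME3 yearly averages from the output above, as strings (like FORMAT_NUMBER returns)
formatted = ["14.49", "8.20", "97.33", "11.54"]

as_text = sorted(formatted)                # lexicographic (character-by-character) order
as_numbers = sorted(formatted, key=float)  # numeric order

print(as_text)     # ['11.54', '14.49', '8.20', '97.33']
print(as_numbers)  # ['8.20', '11.54', '14.49', '97.33']

# Values >= 1000 would also gain a thousands separator, e.g. '1,234.50',
# which float() cannot parse until the comma is removed.
print(float("1,234.50".replace(",", "")))  # 1234.5
```

The `ORDER BY t.yearTime` clauses in this notebook sort on an integer column, so they are unaffected; the issue only appears when ordering or filtering on the formatted averages.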


In [15]:
# Spark SQL: aggregate each fact table per year, then join the two results on the year
query = """
SELECT t1.yearTime, t1.abbrevCoin, t1.avgValueCoin, t2.stockCodeCompany, t2.avgValueStock
FROM
    (SELECT t.yearTime, cc.abbrevCoin, FORMAT_NUMBER(AVG(c.valueCoin), 2) AS avgValueCoin
    FROM coins AS c
    INNER JOIN dim_coin AS cc
        ON c.keyCoin = cc.keyCoin
    INNER JOIN dim_time AS t
        ON c.keyTime = t.keyTime
    WHERE
        cc.abbrevCoin = 'USD'
    GROUP BY t.yearTime, cc.abbrevCoin) AS t1
INNER JOIN
    (SELECT t.yearTime, c.stockCodeCompany, FORMAT_NUMBER(AVG(s.closeValueStock), 2) AS avgValueStock
    FROM stocks AS s
    INNER JOIN dim_company AS c
        ON s.keyCompany = c.keyCompany
    INNER JOIN dim_time AS t
        ON s.keyTime = t.keyTime
    WHERE
        c.stockCodeCompany = 'LAME3'
    GROUP BY t.yearTime, c.stockCodeCompany) AS t2
    ON t1.yearTime = t2.yearTime
ORDER BY t1.yearTime DESC
"""
spark.sql(query).show()


+--------+----------+------------+----------------+-------------+
|yearTime|abbrevCoin|avgValueCoin|stockCodeCompany|avgValueStock|
+--------+----------+------------+----------------+-------------+
|    2012|       USD|        5.12|           LAME3|        14.49|
|    2011|       USD|        3.92|           LAME3|        12.64|
|    2010|       USD|        3.28|           LAME3|        11.66|
|    2009|       USD|        3.56|           LAME3|         8.20|
|    2008|       USD|        2.62|           LAME3|        11.54|
|    2007|       USD|        2.16|           LAME3|        82.41|
|    2006|       USD|        1.79|           LAME3|        97.33|
|    2005|       USD|        1.74|           LAME3|        61.25|
|    2004|       USD|        1.99|           LAME3|        33.57|
|    2003|       USD|        1.91|           LAME3|        14.85|
|    2002|       USD|        2.23|           LAME3|         6.51|
|    2001|       USD|        2.83|           LAME3|         3.20|
|    2000|