# Spark - Brazil Stock Market

This notebook shows how to use Spark for analytical queries, using the [Brazil Stock Market - Data Warehouse](https://www.kaggle.com/datasets/leomauro/brazilian-stock-market-data-warehouse) dataset.





In [None]:
# install PySpark (the Python API for Apache Spark)
!pip install pyspark --quiet

In [None]:
import os

for dirname, _, filenames in os.walk('/kaggle/input'):
    for filename in filenames:
        print(os.path.join(dirname, filename))

In [None]:
# start a new Spark session
from pyspark.sql import SparkSession
# note: this `round` is Spark's column function and shadows Python's built-in round()
from pyspark.sql.functions import round, desc

spark = SparkSession.builder.appName("pyspark-notebook").master("local[*]").getOrCreate()

In [None]:
import pandas as pd
pd.set_option('display.float_format', lambda x: '%.2f' % x)

# loading Spark DataFrames
stocks = spark.read.csv(path='/kaggle/input/brazilian-stock-market-data-warehouse/factStocks.csv', header=True, sep=",")
coins = spark.read.csv(path='/kaggle/input/brazilian-stock-market-data-warehouse/factCoins.csv', header=True, sep=",")
dim_coin = spark.read.csv(path='/kaggle/input/brazilian-stock-market-data-warehouse/dimCoin.csv', header=True, sep=",")
dim_company = spark.read.csv(path='/kaggle/input/brazilian-stock-market-data-warehouse/dimCompany.csv', header=True, sep=",")
dim_time = spark.read.csv(path='/kaggle/input/brazilian-stock-market-data-warehouse/dimTime.csv', header=True, sep=",")

Cast the columns to their proper types using `FloatType` and `IntegerType`. Because the CSV files are read without a schema, every column is loaded as a string by default.

In [None]:
from pyspark.sql.types import FloatType
from pyspark.sql.types import IntegerType

# casting settings
stocks_casting = {
    'int_columns': ['keyTime', 'keyCompany', 'quantityStock'],
    'float_columns': ['openValueStock', 'closeValueStock', 'highValueStock', 'lowValueStock']
}
coins_casting = {
    'int_columns': ['keyTime', 'keyCoin'],
    'float_columns': ['valueCoin']
}
dim_coin_casting = {
    'int_columns': ['keyCoin']
}
dim_company_casting = {
    'int_columns': ['keyCompany']
}
dim_time_casting = {
    'int_columns': ['keyTime', 'dayTime', 'dayWeekTime', 'monthTime', 'bimonthTime', 'quarterTime', 'semesterTime', 'yearTime']
}

# integer casting
for column in stocks_casting['int_columns']:
    stocks = stocks.withColumn(column, stocks[column].cast(IntegerType()))
for column in coins_casting['int_columns']:
    coins = coins.withColumn(column, coins[column].cast(IntegerType()))
for column in dim_coin_casting['int_columns']:
    dim_coin = dim_coin.withColumn(column, dim_coin[column].cast(IntegerType()))
for column in dim_company_casting['int_columns']:
    dim_company = dim_company.withColumn(column, dim_company[column].cast(IntegerType()))
for column in dim_time_casting['int_columns']:
    dim_time = dim_time.withColumn(column, dim_time[column].cast(IntegerType()))

# float casting
for column in stocks_casting['float_columns']:
    stocks = stocks.withColumn(column, stocks[column].cast(FloatType()))
for column in coins_casting['float_columns']:
    coins = coins.withColumn(column, coins[column].cast(FloatType()))
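
The casting loops above repeat the same pattern for every table. As a minimal sketch, assuming the same casting dictionaries, the casts could also be written as SQL expressions and applied with one `selectExpr` call per DataFrame. The helper name `build_cast_exprs` is hypothetical and not part of PySpark.

```python
def build_cast_exprs(columns, int_columns=(), float_columns=()):
    """Return one SQL expression per column, casting the listed ones.

    Hypothetical helper (not a PySpark API): columns named in
    `int_columns` are cast to INT, those in `float_columns` to FLOAT,
    and every other column is passed through unchanged.
    """
    exprs = []
    for name in columns:
        if name in int_columns:
            exprs.append(f"CAST(`{name}` AS INT) AS `{name}`")
        elif name in float_columns:
            exprs.append(f"CAST(`{name}` AS FLOAT) AS `{name}`")
        else:
            exprs.append(f"`{name}`")
    return exprs


# Column names taken from the casting settings above; 'nameCoin' is a
# made-up extra column used only to show the pass-through case.
exprs = build_cast_exprs(
    ["keyTime", "keyCoin", "valueCoin", "nameCoin"],
    int_columns=["keyTime", "keyCoin"],
    float_columns=["valueCoin"],
)
for expr in exprs:
    print(expr)
```

With a live session, this would be applied as `coins = coins.selectExpr(*build_cast_exprs(coins.columns, **coins_casting))`, since the dictionary keys match the helper's parameter names.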

Register the DataFrames as temporary views so they can be queried with Spark SQL.

In [None]:
# loading temporary views, for Spark SQL
stocks.createOrReplaceTempView("stocks")
coins.createOrReplaceTempView("coins")
dim_coin.createOrReplaceTempView("dim_coin")
dim_company.createOrReplaceTempView("dim_company")
dim_time.createOrReplaceTempView("dim_time")

In [None]:
stocks.show(5)

In [None]:
coins.show(5)

In [None]:
dim_coin.show(5)

In [None]:
dim_company.show(5)

In [None]:
dim_time.show(5)

<a id="pyspark"></a>

# pyspark.sql

What are the common methods?
- `var.schema` - Check the table schema
- `var.join(var, on="key")` - Join two DataFrames on a key column (star join)
- `var.where("query")` - Filter the rows
- `var.orderBy(desc("columnX"), ...)` - Sort the rows by column X (in descending order when wrapped in `desc`)
- `var.withColumnRenamed("columnX", "x")` - Rename a DataFrame column
- `var.withColumn("columnX", round("columnX", 2))` - Round the values to two decimal places
- `var.groupBy("columnX", ...)` - Group the rows by a column; usually followed by an aggregation function such as `.count()`, `.sum("columnY")`, `.mean("columnY")`, etc.



**Drill-down Query**

A drill-down query moves to more detailed aggregation levels of the data. In the stock market context, we can use a drill-down query to analyze how the average stock value behaves as we move through the time dimension from the coarsest grain to the finest (e.g., years, semesters, months). Looking at several granularities helps reveal short-, medium- and long-term trends of a stock, which is relevant for investors deciding which stock to buy.

**Question**: What was the average value of the LAME3 stock in 2020? What was its average value in the first half of that year? And in February of the same year?

In [None]:
# Avg value in 2020
print('Avg value in 2020')
dim_company\
    .where("stockCodeCompany == 'LAME3'")\
    .join(stocks, on="keyCompany")\
    .join(dim_time, on="keyTime")\
    .where("yearTime == 2020")\
    .groupBy("yearTime").mean("closeValueStock")\
    .withColumnRenamed("avg(closeValueStock)", "AvgValue")\
    .withColumn("AvgValue", round("AvgValue", 2))\
    .show()

# Avg value in 2020, first semester
print('Avg value in 2020, First Semester')
dim_company\
    .where("stockCodeCompany == 'LAME3'")\
    .join(stocks, on="keyCompany")\
    .join(dim_time, on="keyTime")\
    .where("yearTime == 2020 and semesterTime == 1")\
    .groupBy("yearTime").mean("closeValueStock")\
    .withColumnRenamed("avg(closeValueStock)", "AvgValue")\
    .withColumn("AvgValue", round("AvgValue", 2))\
    .show()

# Avg value in 2020, Feb
print('Avg value in 2020, Feb')
dim_company\
    .where("stockCodeCompany == 'LAME3'")\
    .join(stocks, on="keyCompany")\
    .join(dim_time, on="keyTime")\
    .where("yearTime == 2020 and monthTime == 2")\
    .groupBy("yearTime").mean("closeValueStock")\
    .withColumnRenamed("avg(closeValueStock)", "AvgValue")\
    .withColumn("AvgValue", round("AvgValue", 2))\
    .show()
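
The three queries above differ only in their filter. A minimal sketch of a helper that builds the filter string for each drill-down level, assuming the `yearTime`, `semesterTime` and `monthTime` columns of `dim_time`; the function name `time_filter` is hypothetical.

```python
def time_filter(year, semester=None, month=None):
    """Build a Spark SQL filter string for one drill-down level.

    Hypothetical helper: the column names come from dim_time; the finer
    levels (semester, month) are added only when they are given.
    """
    conditions = [f"yearTime == {int(year)}"]
    if semester is not None:
        conditions.append(f"semesterTime == {int(semester)}")
    if month is not None:
        conditions.append(f"monthTime == {int(month)}")
    return " and ".join(conditions)


# From the coarsest grain to the finest, as in the question above.
levels = {
    "2020": time_filter(2020),
    "2020, first semester": time_filter(2020, semester=1),
    "2020, February": time_filter(2020, month=2),
}
for label, condition in levels.items():
    print(f"{label}: {condition}")
```

Each string can then replace the literal filter in the chains above, for example `.where(time_filter(2020, semester=1))`.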

<a id="spark_sql"></a>

# Spark SQL

Here the queries are written as SQL text and executed with the `spark.sql()` method.

```python
# example
spark.sql("query").show()
```

**Drill-across Query**

A drill-across query compares two or more distinct measures related by a common dimension. Let's run a drill-across query to compare the stock value with the dollar exchange rate along the time dimension. This way, users can assess whether the current stock price appeals to them, or whether international investments would be more beneficial.

**Question**: What are the average LAME3 closing value and the average dollar value for each year?

In [None]:
# Spark SQL: average USD value per year
query = """
    SELECT t.yearTime, cc.abbrevCoin, FORMAT_NUMBER(AVG(c.valueCoin), 2) AS avgValueCoin
    FROM coins AS c
    INNER JOIN dim_coin AS cc
        ON c.KeyCoin = cc.KeyCoin
    INNER JOIN dim_time AS t
        ON c.KeyTime = t.KeyTime
    WHERE
        cc.abbrevCoin == 'USD'
    GROUP BY t.yearTime, cc.abbrevCoin
    ORDER BY t.yearTime DESC
"""
spark.sql(query).show()

In [None]:
# Spark SQL: average LAME3 closing value per year
query = """
    SELECT t.yearTime, c.stockCodeCompany, FORMAT_NUMBER(AVG(s.closeValueStock), 2) AS avgValueStock
    FROM stocks AS s
    INNER JOIN dim_company AS c
        ON s.keyCompany = c.KeyCompany
    INNER JOIN dim_time AS t
        ON s.KeyTime = t.KeyTime
    WHERE
        c.stockCodeCompany == 'LAME3'
    GROUP BY t.yearTime, c.stockCodeCompany
    ORDER BY t.yearTime DESC
"""
spark.sql(query).show()

In [None]:
# Spark SQL: join both yearly averages on the year (drill-across)
query = """
SELECT t1.yearTime, t1.abbrevCoin, t1.avgValueCoin, t2.stockCodeCompany, t2.avgValueStock
FROM
    (SELECT t.yearTime, cc.abbrevCoin, FORMAT_NUMBER(AVG(c.valueCoin), 2) AS avgValueCoin
    FROM coins AS c
    INNER JOIN dim_coin AS cc
        ON c.KeyCoin = cc.KeyCoin
    INNER JOIN dim_time AS t
        ON c.KeyTime = t.KeyTime
    WHERE
        cc.abbrevCoin == 'USD'
    GROUP BY t.yearTime, cc.abbrevCoin
    ORDER BY t.yearTime DESC) AS t1,
    (SELECT t.yearTime, c.stockCodeCompany, FORMAT_NUMBER(AVG(s.closeValueStock), 2) AS avgValueStock
    FROM stocks AS s
    INNER JOIN dim_company AS c
        ON s.keyCompany = c.KeyCompany
    INNER JOIN dim_time AS t
        ON s.KeyTime = t.KeyTime
    WHERE
        c.stockCodeCompany == 'LAME3'
    GROUP BY t.yearTime, c.stockCodeCompany
    ORDER BY t.yearTime DESC) AS t2
WHERE
    t1.yearTime == t2.yearTime
ORDER BY t1.yearTime DESC
"""
spark.sql(query).show()
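
The nested subqueries above can be hard to read. As a sketch only, the same drill-across could be written with common table expressions (`WITH`), which Spark SQL supports, and wrapped in a function so the coin and stock codes become parameters. The function name `drill_across_query` and the CTE names `coin_avg` and `stock_avg` are hypothetical; the tables and columns are the temporary views registered earlier.

```python
def drill_across_query(coin, stock):
    """Return SQL comparing the yearly averages of a coin and a stock."""
    # The codes are interpolated into the SQL text, so accept only plain
    # alphanumeric values such as 'USD' or 'LAME3'.
    for code in (coin, stock):
        if not code.isalnum():
            raise ValueError(f"unexpected code: {code!r}")
    return f"""
WITH coin_avg AS (
    SELECT t.yearTime, cc.abbrevCoin, AVG(c.valueCoin) AS avgValueCoin
    FROM coins AS c
    INNER JOIN dim_coin AS cc ON c.keyCoin = cc.keyCoin
    INNER JOIN dim_time AS t ON c.keyTime = t.keyTime
    WHERE cc.abbrevCoin = '{coin}'
    GROUP BY t.yearTime, cc.abbrevCoin
),
stock_avg AS (
    SELECT t.yearTime, c.stockCodeCompany, AVG(s.closeValueStock) AS avgValueStock
    FROM stocks AS s
    INNER JOIN dim_company AS c ON s.keyCompany = c.keyCompany
    INNER JOIN dim_time AS t ON s.keyTime = t.keyTime
    WHERE c.stockCodeCompany = '{stock}'
    GROUP BY t.yearTime, c.stockCodeCompany
)
SELECT ca.yearTime,
       ca.abbrevCoin,
       FORMAT_NUMBER(ca.avgValueCoin, 2) AS avgValueCoin,
       sa.stockCodeCompany,
       FORMAT_NUMBER(sa.avgValueStock, 2) AS avgValueStock
FROM coin_avg AS ca
INNER JOIN stock_avg AS sa ON ca.yearTime = sa.yearTime
ORDER BY ca.yearTime DESC
"""


query = drill_across_query("USD", "LAME3")
print(query)
```

With a live session, the result would be shown with `spark.sql(query).show()`. Formatting is applied only in the final `SELECT`, so the intermediate averages stay numeric until the last step.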