# Extração das Transações do Bitcoin e Análise com PySpark

In [1]:
# https://pypi.org/project/findspark/
!pip install -q findspark
!pip install yfinance



In [1]:
# Importa o findspark e inicializa
import findspark
findspark.init()

In [32]:
# Imports
import pyspark                        
import pandas as pd                                            # manipulação de dados
import numpy as np                                             # manipulação de dados
import seaborn as sns                                          # gráficos
from matplotlib import pyplot as plt                           # gráficos
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession, SQLContext
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StringIndexer
from pyspark.ml.regression import LinearRegression
from pyspark.mllib.evaluation import RegressionMetrics
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import StringIndexer
from pyspark.ml.stat import Correlation
from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator, CrossValidatorModel
from pyspark.ml.feature import VectorAssembler, StandardScaler
from pyspark.ml.evaluation import RegressionEvaluator

# Para extrair os dados da criptomoeda
import yfinance as yf
from datetime import datetime

In [33]:
# Definir o símbolo do Bitcoin no Yahoo Finance (BTC-USD)
symbol = "BTC-USD"

# Definir o intervalo de datas (exemplo: 2017-01-01 até 2023-12-31)
start_date = "2014-09-01"
end_date = datetime.now().strftime("%Y-%m-%d")  

# Obter os dados históricos
data = yf.download(symbol, start = start_date, end = end_date)

# Exibir as primeiras linhas do DataFrame
print(data.head())

# Salvar os dados em um arquivo CSV
data.to_csv("dados/bitcoin_historico_yfinance.csv")


[*********************100%***********************]  1 of 1 completed

Price            Close        High         Low        Open    Volume
Ticker         BTC-USD     BTC-USD     BTC-USD     BTC-USD   BTC-USD
Date                                                                
2014-09-17  457.334015  468.174011  452.421997  465.864014  21056800
2014-09-18  424.440002  456.859985  413.104004  456.859985  34483200
2014-09-19  394.795990  427.834991  384.532013  424.102997  37919700
2014-09-20  408.903992  423.295990  389.882996  394.673004  36863600
2014-09-21  398.821014  412.425995  393.181000  408.084991  26580100





In [5]:
# Formatação das saídas
pd.set_option('display.max_columns', 200)
pd.set_option('display.max_colwidth', 400)
from matplotlib.axes._axes import _log as matplotlib_axes_logger
matplotlib_axes_logger.setLevel('ERROR')

In [6]:
# Versões dos pacotes usados neste jupyter notebook
%reload_ext watermark
%watermark -a "Pedro Valadares Júnior" --iversions

Author: Pedro Valadares Júnior

matplotlib: 3.10.0
py4j      : 0.10.9.7
seaborn   : 0.13.2
numpy     : 2.2.0
pandas    : 2.2.3
decimal   : 1.70
sys       : 3.13.1 (main, Dec 15 2024, 14:06:39) [GCC 12.2.0]
findspark : 2.0.1
pyspark   : 3.5.0
yfinance  : 0.2.51



In [7]:
# Definindo semente aleatória (seed) para reprodutibilidade do notebook
rnd_seed = 23
np.random.seed = rnd_seed
np.random.set_state = rnd_seed

In [8]:
# Criando o Spark Context(em Spark, tudo que crio é um App)

sc = SparkContext(appName = 'Estudo_PySpark')


# O código abaixo abre uma conexão para o meu cluster. Observe que não identificamos o 
# cluster, quando não identificamos, ele cria automaticamente no cluster local(minha máquina).

25/01/01 22:33:12 WARN Utils: Your hostname, debian resolves to a loopback address: 127.0.1.1; using 192.168.15.62 instead (on interface wlp1s0)
25/01/01 22:33:12 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/01/01 22:33:13 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [9]:
spark_session = SparkSession.Builder().getOrCreate()

In [10]:
spark_session

### LOADING

In [11]:
df_spark = spark_session.read.csv('dados/bitcoin_historico_yfinance.csv', header = 'true', inferSchema = 'true')

In [12]:
# Exibe as primeiras linhas
df_spark.show()

+----------+------------------+------------------+------------------+------------------+--------+
|     Price|             Close|              High|               Low|              Open|  Volume|
+----------+------------------+------------------+------------------+------------------+--------+
|    Ticker|           BTC-USD|           BTC-USD|           BTC-USD|           BTC-USD| BTC-USD|
|      Date|              NULL|              NULL|              NULL|              NULL|    NULL|
|2014-09-17| 457.3340148925781|468.17401123046875| 452.4219970703125|  465.864013671875|21056800|
|2014-09-18|424.44000244140625| 456.8599853515625|   413.10400390625| 456.8599853515625|34483200|
|2014-09-19| 394.7959899902344| 427.8349914550781| 384.5320129394531| 424.1029968261719|37919700|
|2014-09-20|408.90399169921875| 423.2959899902344|389.88299560546875| 394.6730041503906|36863600|
|2014-09-21| 398.8210144042969| 412.4259948730469| 393.1809997558594| 408.0849914550781|26580100|
|2014-09-22| 402.152

In [13]:
# Retorna o tipo de cada coluna
df_spark.printSchema()

root
 |-- Price: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- Volume: string (nullable = true)



In [14]:
# Exibe um sumário estatístico
df_spark.describe().show()

25/01/01 22:33:57 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
[Stage 3:>                                                          (0 + 1) / 1]

+-------+----------+------------------+------------------+------------------+-----------------+--------------------+
|summary|     Price|             Close|              High|               Low|             Open|              Volume|
+-------+----------+------------------+------------------+------------------+-----------------+--------------------+
|  count|      3761|              3760|              3760|              3760|             3760|                3760|
|   mean|      NULL|19585.858209697766|20003.126694951487|19103.286843728372|19562.39062133042|1.858298119511146...|
| stddev|      NULL|22126.525736753556|22584.216017278886| 21597.38067931596|  22097.867871999|2.034322712413484...|
|    min|2014-09-17|    100041.5390625|1002.0999755859375|  10000.7080078125|   100046.6484375|         10014858959|
|    max|    Ticker|           BTC-USD|           BTC-USD|           BTC-USD|          BTC-USD|             BTC-USD|
+-------+----------+------------------+------------------+------

                                                                                

In [15]:
# Contando as linhas
linhas = df_spark.count()
print(f'Total de linhas: {linhas}')

Total de linhas: 3761


In [16]:
# Retornando os nomes das colunas do dataset
df_spark.columns

['Price', 'Close', 'High', 'Low', 'Open', 'Volume']

### DATA CLEANING

In [17]:
# Excluindo valores nulos
df_spark = df_spark.na.drop()

In [18]:
# Checando se os valore nulos foram excluidos
null_counts = df_spark.select(
    
    [sum(col(c).isNull().cast("int")).alias(c) for c in df_spark.columns]
)

null_counts.show()

# c representa o nome de cada coluna de df_spark
# col() é uma função pyspark que cria referência para a coluna com o nome c
# col(c).isNull() verifica se a coluna contém nulos ou não. (booleano)
# cast("int") converte o boleano em um número inteiro (0 ou 1)
# sum() soma todos os valores para cada coluna, que agora são 1 ou 0.
# alias(c) ao resultado da soma usa o nome da coluna atual(original)

+-----+-----+----+---+----+------+
|Price|Close|High|Low|Open|Volume|
+-----+-----+----+---+----+------+
|    0|    0|   0|  0|   0|     0|
+-----+-----+----+---+----+------+



In [19]:
df_spark.printSchema()

root
 |-- Price: string (nullable = true)
 |-- Close: string (nullable = true)
 |-- High: string (nullable = true)
 |-- Low: string (nullable = true)
 |-- Open: string (nullable = true)
 |-- Volume: string (nullable = true)



In [20]:
# Renomeando coluna
df_spark = df_spark.withColumnRenamed('Adj Close', 'Adj_Close').withColumnRenamed('Price', 'Date')
df_spark.columns

['Date', 'Close', 'High', 'Low', 'Open', 'Volume']

### DATA TRANSFORMATION

In [21]:
# Usando select() para retornar colunas específicas
df_spark.select('Date', 'Open', 'Low', 'High').show()

+----------+------------------+------------------+------------------+
|      Date|              Open|               Low|              High|
+----------+------------------+------------------+------------------+
|    Ticker|           BTC-USD|           BTC-USD|           BTC-USD|
|2014-09-17|  465.864013671875| 452.4219970703125|468.17401123046875|
|2014-09-18| 456.8599853515625|   413.10400390625| 456.8599853515625|
|2014-09-19| 424.1029968261719| 384.5320129394531| 427.8349914550781|
|2014-09-20| 394.6730041503906|389.88299560546875| 423.2959899902344|
|2014-09-21| 408.0849914550781| 393.1809997558594| 412.4259948730469|
|2014-09-22| 399.1000061035156| 397.1300048828125| 406.9159851074219|
|2014-09-23| 402.0920104980469| 396.1969909667969| 441.5570068359375|
|2014-09-24| 435.7510070800781| 421.1319885253906|436.11199951171875|
|2014-09-25|  423.156005859375| 409.4679870605469| 423.5199890136719|
|2014-09-26| 411.4289855957031| 400.0090026855469|   414.93798828125|
|2014-09-27| 403.555

In [22]:
# Criando uma nova coluna com a diferença entre a alta e a baixa
df_spark.withColumn('High-Low', col("High") - col("Low")).show()

+----------+------------------+------------------+------------------+------------------+--------+------------------+
|      Date|             Close|              High|               Low|              Open|  Volume|          High-Low|
+----------+------------------+------------------+------------------+------------------+--------+------------------+
|    Ticker|           BTC-USD|           BTC-USD|           BTC-USD|           BTC-USD| BTC-USD|              NULL|
|2014-09-17| 457.3340148925781|468.17401123046875| 452.4219970703125|  465.864013671875|21056800| 15.75201416015625|
|2014-09-18|424.44000244140625| 456.8599853515625|   413.10400390625| 456.8599853515625|34483200|  43.7559814453125|
|2014-09-19| 394.7959899902344| 427.8349914550781| 384.5320129394531| 424.1029968261719|37919700|   43.302978515625|
|2014-09-20|408.90399169921875| 423.2959899902344|389.88299560546875| 394.6730041503906|36863600|33.412994384765625|
|2014-09-21| 398.8210144042969| 412.4259948730469| 393.180999755

### Verificando as Datas de Transações

In [28]:
# Registra o DataFrame como uma tabela temporária
df_spark.createOrReplaceTempView("tabela_datas")

In [31]:
data_amplitude = spark_session.sql(
    """
        SELECT 
            MIN(DATE(Date)) AS Primeira_Negociacao,
            MAX(DATE(Date)) AS Ultima_Negociacao
        FROM
            tabela_datas
    """)

data_amplitude.show()

+-------------------+-----------------+
|Primeira_Negociacao|Ultima_Negociacao|
+-------------------+-----------------+
|         2014-09-17|       2024-12-31|
+-------------------+-----------------+



In [66]:
# Filtrando os dados de volumes de transações acima de de 20 milhões.
df_spark.filter(df_spark['Volume'] > 20000000).show()

+----------+------------------+------------------+------------------+------------------+--------+
|      Date|             Close|              High|               Low|              Open|  Volume|
+----------+------------------+------------------+------------------+------------------+--------+
|2014-09-17| 457.3340148925781|468.17401123046875| 452.4219970703125|  465.864013671875|21056800|
|2014-09-18|424.44000244140625| 456.8599853515625|   413.10400390625| 456.8599853515625|34483200|
|2014-09-19| 394.7959899902344| 427.8349914550781| 384.5320129394531| 424.1029968261719|37919700|
|2014-09-20|408.90399169921875| 423.2959899902344|389.88299560546875| 394.6730041503906|36863600|
|2014-09-21| 398.8210144042969| 412.4259948730469| 393.1809997558594| 408.0849914550781|26580100|
|2014-09-22| 402.1520080566406| 406.9159851074219| 397.1300048828125| 399.1000061035156|24127600|
|2014-09-23| 435.7909851074219| 441.5570068359375| 396.1969909667969| 402.0920104980469|45099500|
|2014-09-24| 423.204

In [7]:
# Finalize a sessão spark antes de começar outra.
df_spark.stop

NameError: name 'df_spark' is not defined