In [1]:
from pyspark.sql import SparkSession

Inciando a sessão spark, que pode ser vista como uma manifestação do driver process para o usuário 

In [2]:
spark = SparkSession.builder.getOrCreate()


Definindo a leitura do arquivo. 
A leitura de um arquivo, não e uma action dentro do spark, por conta disso, ela será apenas inserida a uma pilha de transformações, que serão todas executadas no momento em que uma action for invocada

Interessante saber que, nesse momento meu df, possui um determinado numero de colunas, porém um indeterminado numero de linhas, isso se dá devido a ação de leitura ser executada em lazy evaluation

In [3]:
df_bitcoin = spark.read.option('inferSchema', 'true'
                        ).option('header', 'true'
                        ).csv('./data/archive/btcusd_1-min_data.csv')
df_bitcoin

DataFrame[Timestamp: double, Open: double, High: double, Low: double, Close: double, Volume: double]

Utilizando a action take

In [4]:
df_bitcoin.take(2)

[Row(Timestamp=1325412060.0, Open=4.58, High=4.58, Low=4.58, Close=4.58, Volume=0.0),
 Row(Timestamp=1325412120.0, Open=4.58, High=4.58, Low=4.58, Close=4.58, Volume=0.0)]

### Construindo nossa primeira transformação

Aqui, vou realizar uma transformação do tipo sort. 
Por definição a ordenação de uma linha em relação a outras não pode ser tratada como uma narrow transformation.
Uma vez que a ordenação depende das outras linhas, por conta idsso trata-se de uma wide transformations. (quando o processamento é feito em disco)

Utilizando o explain

In [6]:
df_bitcoin.sort('Timestamp').explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [Timestamp#17 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(Timestamp#17 ASC NULLS FIRST, 200), ENSURE_REQUIREMENTS, [plan_id=45]
      +- FileScan csv [Timestamp#17,Open#18,High#19,Low#20,Close#21,Volume#22] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/a:/DS/Spark/learningSpark/data/archive/btcusd_1-min_data.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Timestamp:double,Open:double,High:double,Low:double,Close:double,Volume:double>




Alternado a configuração do spark, para retornar 5 arquivos de particao como saida 

In [7]:
spark.conf.set('spark.sql.shuffle.partitions', '5')


In [8]:
df_bitcoin.sort('Timestamp').take(8)

[Row(Timestamp=None, Open=57854.0, High=57864.0, Low=57835.0, Close=57835.0, Volume=1.35346619),
 Row(Timestamp=1325412060.0, Open=4.58, High=4.58, Low=4.58, Close=4.58, Volume=0.0),
 Row(Timestamp=1325412120.0, Open=4.58, High=4.58, Low=4.58, Close=4.58, Volume=0.0),
 Row(Timestamp=1325412180.0, Open=4.58, High=4.58, Low=4.58, Close=4.58, Volume=0.0),
 Row(Timestamp=1325412240.0, Open=4.58, High=4.58, Low=4.58, Close=4.58, Volume=0.0),
 Row(Timestamp=1325412300.0, Open=4.58, High=4.58, Low=4.58, Close=4.58, Volume=0.0),
 Row(Timestamp=1325412360.0, Open=4.58, High=4.58, Low=4.58, Close=4.58, Volume=0.0),
 Row(Timestamp=1325412420.0, Open=4.58, High=4.58, Low=4.58, Close=4.58, Volume=0.0)]

## DataFrames and SQL

O spark me permite registrar qualquer dataframe, como uma tabela, para então executar quaisuqer transformações/acoes nesse dataframe via puro sql. 


NÃO HÁ DIFERENÇAS DE PERFOMANCE ENTRE executar queries sql e exeuctar dataframe transformations. 


In [11]:
df_bitcoin.createOrReplaceTempView('bictoin_data')


In [42]:
sqlWay = spark.sql(''' 
                    select 
                        to_date(FROM_UNIXTIME(Timestamp)) as data, 
                        count(*)
                     from bictoin_data
                     group by Timestamp
                     order by Timestamp
                   ''')
dfWay = df_bitcoin.groupBy('Timestamp').count().sort('Timestamp') 


In [43]:
sqlWay.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Project [data#214, count(1)#216L]
   +- Sort [Timestamp#17 ASC NULLS FIRST], true, 0
      +- Exchange rangepartitioning(Timestamp#17 ASC NULLS FIRST, 5), ENSURE_REQUIREMENTS, [plan_id=763]
         +- HashAggregate(keys=[Timestamp#17], functions=[count(1)])
            +- Exchange hashpartitioning(Timestamp#17, 5), ENSURE_REQUIREMENTS, [plan_id=760]
               +- HashAggregate(keys=[knownfloatingpointnormalized(normalizenanandzero(Timestamp#17)) AS Timestamp#17], functions=[partial_count(1)])
                  +- FileScan csv [Timestamp#17] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/a:/DS/Spark/learningSpark/data/archive/btcusd_1-min_data.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Timestamp:double>




In [44]:
dfWay.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [Timestamp#17 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(Timestamp#17 ASC NULLS FIRST, 5), ENSURE_REQUIREMENTS, [plan_id=784]
      +- HashAggregate(keys=[Timestamp#17], functions=[count(1)])
         +- Exchange hashpartitioning(Timestamp#17, 5), ENSURE_REQUIREMENTS, [plan_id=781]
            +- HashAggregate(keys=[knownfloatingpointnormalized(normalizenanandzero(Timestamp#17)) AS Timestamp#17], functions=[partial_count(1)])
               +- FileScan csv [Timestamp#17] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/a:/DS/Spark/learningSpark/data/archive/btcusd_1-min_data.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<Timestamp:double>




Ao invocar um "Take" estou fazendo com que o Spark execute o plano de transformações. 

Essa estratégia seria interessante (utilizar takes) somente para monitrar se as transformações vem ocorrendo conforme o esperado


In [45]:
sqlWay.take(5)

[Row(data=None, count(1)=1),
 Row(data=datetime.date(2012, 1, 1), count(1)=1),
 Row(data=datetime.date(2012, 1, 1), count(1)=1),
 Row(data=datetime.date(2012, 1, 1), count(1)=1),
 Row(data=datetime.date(2012, 1, 1), count(1)=1)]

In [40]:
dfWay.show()

+------------+-----+
|   Timestamp|count|
+------------+-----+
|        NULL|    1|
|1.32541206E9|    1|
|1.32541212E9|    1|
|1.32541218E9|    1|
|1.32541224E9|    1|
| 1.3254123E9|    1|
|1.32541236E9|    1|
|1.32541242E9|    1|
|1.32541248E9|    1|
|1.32541254E9|    1|
| 1.3254126E9|    1|
|1.32541266E9|    1|
|1.32541272E9|    1|
|1.32541278E9|    1|
|1.32541284E9|    1|
| 1.3254129E9|    1|
|1.32541296E9|    1|
|1.32541302E9|    1|
|1.32541308E9|    1|
|1.32541314E9|    1|
+------------+-----+
only showing top 20 rows



### Brincando com manipulacoes um pouco mais complexas

Quais sao as 5 primeiras datas em que o bitcoin teve sua maior alta?  e  sua maior baixa? 

In [48]:
from pyspark.sql.functions import from_unixtime, to_date

top_five_highersSql = spark.sql(''' 
    select 
        to_date(FROM_UNIXTIME(Timestamp)) as data, 
        high as maior_alta

    from bictoin_data 
    order by high desc   
    limit 5  

''')

topFiveHighersDf = df_bitcoin.withColumn('data', to_date(from_unixtime("Timestamp"))
                                         ).select('data', 'high'
                                         ).withColumnRenamed('high', 'maiorAlta'
                                         ).orderBy('maiorAlta', ascending = False
                                         ).limit(5)



In [49]:
topFiveHighersDf.take(5)

[Row(data=datetime.date(2025, 1, 20), maiorAlta=109030.0),
 Row(data=datetime.date(2025, 1, 20), maiorAlta=109024.0),
 Row(data=datetime.date(2025, 1, 20), maiorAlta=108979.0),
 Row(data=datetime.date(2025, 1, 20), maiorAlta=108939.0),
 Row(data=datetime.date(2025, 1, 20), maiorAlta=108908.0)]

In [50]:
top_five_highersSql.take(5)

[Row(data=datetime.date(2025, 1, 20), maior_alta=109030.0),
 Row(data=datetime.date(2025, 1, 20), maior_alta=109024.0),
 Row(data=datetime.date(2025, 1, 20), maior_alta=108979.0),
 Row(data=datetime.date(2025, 1, 20), maior_alta=108939.0),
 Row(data=datetime.date(2025, 1, 20), maior_alta=108908.0)]