# Instalando o Spark

In [None]:
!pip install pyspark #==3.3.1

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m4.8 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m16.6 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=84313375126cdcb25ab973bc0b2a595e9da6f2c64af3d467c9ef6d2e497e856f
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [None]:
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip

--2023-01-12 01:25:40--  https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
Resolving bin.equinox.io (bin.equinox.io)... 54.161.241.46, 18.205.222.128, 52.202.168.65, ...
Connecting to bin.equinox.io (bin.equinox.io)|54.161.241.46|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 13832437 (13M) [application/octet-stream]
Saving to: ‘ngrok-stable-linux-amd64.zip’


2023-01-12 01:25:40 (74.0 MB/s) - ‘ngrok-stable-linux-amd64.zip’ saved [13832437/13832437]

Archive:  ngrok-stable-linux-amd64.zip
  inflating: ngrok                   


# Iniciar Sessão Spark

In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

# Port for the Spark UI. It is moved off the default 4040, presumably because ngrok's
# local API (queried later with curl on localhost:4040) uses that port.
SPARK_UI_PORT = "4050"

spark = (
    SparkSession.builder                         # Builder that constructs the Spark session
      .appName("Meu Primeiro App Spark")         # Spark application name
      .config("spark.ui.port", SPARK_UI_PORT)    # Apply the UI port to the session actually used
      .getOrCreate())                            # Reuse an active session if there is one, otherwise create a new one


In [None]:
# List the public tunnels exposed by the ngrok agent (its local API answers on port 4040),
# presumably to obtain a public URL for the Spark UI on port 4050.
# NOTE(review): no cell shown here starts the ngrok agent (e.g. `./ngrok http 4050 &`),
# so this returns nothing unless it was started elsewhere — TODO confirm.
!curl -s http://localhost:4040/api/tunnels 

# SparkSQL

In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, StringType, TimestampType

caminho_csv = "./base_de_dados.csv"

# Explicit schema so every column gets a concrete type (ints, doubles, timestamps)
# instead of being read as plain strings.
schema_base_pix = StructType([
    StructField('id', IntegerType()),
    StructField('valor', DoubleType()),
    StructField('parte_debitada_nome', StringType()),
    StructField('parte_debitada_conta', StringType()),
    StructField('parte_debitada_banco', StringType()),
    StructField('parte_creditada_nome', StringType()),
    StructField('parte_creditada_conta', StringType()),
    StructField('parte_creditada_banco', StringType()),
    StructField('chave_pix_tipo', StringType()),
    StructField('chave_pix_valor', StringType()),
    StructField('data_transacao', TimestampType())
])

# Read the ';'-separated CSV once; timestamps in the file follow "dd/MM/yyyy HH:mm".
df = spark.read.csv(
    path=caminho_csv,
    header=True,
    sep=";",
    schema=schema_base_pix,
    timestampFormat="dd/MM/yyyy HH:mm"
)

# Expose the same DataFrame to Spark SQL as the "base_pix" temp view, so the SQL and
# DataFrame examples below share one definition instead of two separate CSV reads.
df.createOrReplaceTempView("base_pix")

In [None]:
# Quick look at the first rows of the "base_pix" temp view.
amostra_base_pix = spark.sql("select * from base_pix limit 3")
amostra_base_pix.show()

+---+-----+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+
| id|valor| parte_debitada_nome|parte_debitada_conta|parte_debitada_banco|parte_creditada_nome|parte_creditada_conta|parte_creditada_banco|chave_pix_tipo|chave_pix_valor|     data_transacao|
+---+-----+--------------------+--------------------+--------------------+--------------------+---------------------+---------------------+--------------+---------------+-------------------+
|  1| 9.93|Dra. Ana Carolina...|            79470453|              Nubank|       Maysa da Cruz|             67162333|                 Itau|           cpf|     8439752610|2022-02-18 13:28:00|
|  2|15.38|        Ana Caldeira|            19689668|                Itau|        Evelyn Sales|             60005091|             Bradesco|           cpf|    27145380617|2022-04-08 01:47:00|
|  3|57.58|    Arthur Goncalves|            1

Porém, como saber se a manipulação de dados com DataFrames é mais rápida (ou mais lenta) que com SQL?

Para isso, vamos escrever o mesmo `group by` das duas maneiras e comparar os planos de execução que o Spark gera para cada uma.


In [None]:
# Count per pix key type, written in SQL against the "base_pix" temp view.
group_sql = spark.sql(
    "select chave_pix_tipo, count(1) from base_pix group by chave_pix_tipo"
)

In [None]:
# The same count per pix key type, written with the DataFrame API.
group_dataframe = (
    df
    .groupBy("chave_pix_tipo")
    .count()
)

In [None]:
# Print both physical plans, one after the other, so they can be compared side by side.
for titulo_plano, consulta_agrupada in (
    ("SQL Group", group_sql),
    ("DataFrame Group", group_dataframe),
):
    print(titulo_plano)
    consulta_agrupada.explain()

SQL Group
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[chave_pix_tipo#123], functions=[count(1)])
   +- Exchange hashpartitioning(chave_pix_tipo#123, 200), ENSURE_REQUIREMENTS, [plan_id=97]
      +- HashAggregate(keys=[chave_pix_tipo#123], functions=[partial_count(1)])
         +- FileScan csv [chave_pix_tipo#123] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/content/base_de_dados.csv], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<chave_pix_tipo:string>


DataFrame Group
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- HashAggregate(keys=[chave_pix_tipo#101], functions=[count(1)])
   +- Exchange hashpartitioning(chave_pix_tipo#101, 200), ENSURE_REQUIREMENTS, [plan_id=110]
      +- HashAggregate(keys=[chave_pix_tipo#101], functions=[partial_count(1)])
         +- FileScan csv [chave_pix_tipo#101] Batched: false, DataFilters: [], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/co

In [None]:
# Total amount per pix key type; the raw double sum exposes floating-point noise.
consulta_soma = """
    select chave_pix_tipo, sum(valor) 
    from base_pix 
    group by 1
  """
spark.sql(consulta_soma).show()

+--------------+------------------+
|chave_pix_tipo|        sum(valor)|
+--------------+------------------+
|       celular|         207778.46|
|         email|499009.38000000006|
|           cpf| 659513.3499999997|
+--------------+------------------+



In [None]:
# Same aggregation, rounded to 2 decimal places.
consulta_soma_arredondada = """
    select chave_pix_tipo, round(sum(valor), 2)
    from base_pix 
    group by 1
  """
spark.sql(consulta_soma_arredondada).show()

+--------------+--------------------+
|chave_pix_tipo|round(sum(valor), 2)|
+--------------+--------------------+
|       celular|           207778.46|
|         email|           499009.38|
|           cpf|           659513.35|
+--------------+--------------------+



In [None]:
# Same rounded sum, aliased so the output column gets a readable name.
consulta_soma_alias = """
    select chave_pix_tipo, round(sum(valor), 2) as sum_valor
    from base_pix 
    group by 1
  """
spark.sql(consulta_soma_alias).show()

+--------------+---------+
|chave_pix_tipo|sum_valor|
+--------------+---------+
|       celular|207778.46|
|         email|499009.38|
|           cpf|659513.35|
+--------------+---------+



In [None]:
# Number of transactions per pix key type.
consulta_contagem = """
    select chave_pix_tipo, count(*) as count
    from base_pix 
    group by 1
  """
spark.sql(consulta_contagem).show()

+--------------+-----+
|chave_pix_tipo|count|
+--------------+-----+
|       celular|   22|
|         email|   29|
|           cpf|   49|
+--------------+-----+



PARA AQUI

In [None]:
# Number the transactions received by each bank, from the most recent to the oldest.
# (The previous version never closed the SQL string and queried a non-existent
# "transacoes_pix" table; this targets the "base_pix" view used everywhere else.)
spark.sql(
    """
      select
        parte_creditada_banco,
        data_transacao,
        row_number() over (partition by parte_creditada_banco order by data_transacao desc) as RowNumber
      from base_pix
    """
).show()

+---------------------+-------------------+---------+
|parte_creditada_banco|     data_transacao|RowNumber|
+---------------------+-------------------+---------+
|                  BTG|2022-12-08 23:47:00|        1|
|                  BTG|2022-11-29 06:17:00|        2|
|                  BTG|2022-11-04 01:50:00|        3|
|                  BTG|2022-08-26 00:56:00|        4|
|                  BTG|2022-08-24 15:39:00|        5|
|                  BTG|2022-08-04 00:48:00|        6|
|                  BTG|2022-07-30 19:56:00|        7|
|                  BTG|2022-07-18 22:46:00|        8|
|                  BTG|2022-07-14 03:18:00|        9|
|                  BTG|2022-07-03 23:37:00|       10|
|                  BTG|2022-06-05 12:14:00|       11|
|                  BTG|2022-05-23 06:48:00|       12|
|                  BTG|2022-05-06 11:33:00|       13|
|                  BTG|2022-04-28 16:47:00|       14|
|                  BTG|2022-02-26 15:05:00|       15|
|                  BTG|2022-

CTE stands for common table expression. A CTE lets you define a temporary, named result set that is available only within the execution scope of a single statement such as SELECT, INSERT, UPDATE, DELETE, or MERGE.

In [None]:
# Most recent transaction per receiving bank: the CTE numbers each bank's rows from
# newest to oldest, and the outer query keeps only row number 1.
ultima_por_banco = spark.sql(
  """
  with base_pix_row_number as(
    select
      parte_creditada_banco, 
      data_transacao,
      row_number() over (partition by parte_creditada_banco order by data_transacao desc) as row_number
    from base_pix
  ) select
      parte_creditada_banco,
      data_transacao
    from base_pix_row_number
    where row_number = 1
    order by data_transacao desc
  """
)
ultima_por_banco.show()

+---------------------+-------------------+
|parte_creditada_banco|     data_transacao|
+---------------------+-------------------+
|                 Itau|2022-12-15 01:29:00|
|                  BTG|2022-12-08 23:47:00|
|               Nubank|2022-11-19 19:25:00|
|             Bradesco|2022-08-07 17:01:00|
+---------------------+-------------------+



Porém, você não precisa ficar limitado somente à execução de queries SQL.

Podemos pegar o resultado de uma query e armazená-lo em um DataFrame!

In [None]:
# Same "latest transaction per bank" query, but the result is kept as a DataFrame
# so it can be manipulated further with the DataFrame API.
consulta_window = """
  with base_pix_row_number as(
    select
      parte_creditada_banco, 
      data_transacao,
      row_number() over (partition by parte_creditada_banco order by data_transacao desc) as row_number
    from base_pix
  ) select
      parte_creditada_banco,
      data_transacao
    from base_pix_row_number
    where row_number = 1
    order by data_transacao desc
  """
df_window = spark.sql(consulta_window)

In [None]:
# Display the DataFrame built from the SQL query (spelled-out defaults: 20 rows, truncated cells).
df_window.show(n=20, truncate=True)

+---------------------+-------------------+
|parte_creditada_banco|     data_transacao|
+---------------------+-------------------+
|                 Itau|2022-12-15 01:29:00|
|                  BTG|2022-12-08 23:47:00|
|               Nubank|2022-11-19 19:25:00|
|             Bradesco|2022-08-07 17:01:00|
+---------------------+-------------------+



This opens up the true power of Spark. We can treat selectExpr as a simple way to build up
complex expressions that create new DataFrames. In fact, we can add any valid non-aggregating
SQL expression, and as long as the columns resolve, it will be valid!

In [None]:
from pyspark.sql.functions import col

In [None]:
# selectExpr accepts arbitrary non-aggregating SQL expressions: derive the calendar date
# of each transaction, then count how many transactions happened on each day.
(
    df.selectExpr(
        "date(data_transacao) as date_data_transacao",
        "valor",  # was "va", which is not a column and raised an AnalysisException
    )
    .groupBy("date_data_transacao")
    .count()
    # Secondary sort on the date makes the order of ties (e.g. count == 1) deterministic.
    .orderBy(col("count").desc(), col("date_data_transacao"))
    .show()
)

+-------------------+-----+
|date_data_transacao|count|
+-------------------+-----+
|         2022-02-26|    2|
|         2022-03-02|    2|
|         2021-06-22|    1|
|         2022-11-29|    1|
|         2021-02-15|    1|
|         2022-02-16|    1|
|         2021-07-20|    1|
|         2022-01-09|    1|
|         2021-03-22|    1|
|         2022-04-12|    1|
|         2021-04-25|    1|
|         2021-03-07|    1|
|         2022-01-15|    1|
|         2022-05-23|    1|
|         2022-02-01|    1|
|         2021-07-11|    1|
|         2022-06-05|    1|
|         2021-09-06|    1|
|         2021-06-20|    1|
|         2021-12-14|    1|
+-------------------+-----+
only showing top 20 rows



Exercício
1. Vimos que há dois dias em que houve duas transações pix. Descubra quais são os ids dessas transações.

2. Vimos que há dois dias em que houve duas transações pix. Descubra quais chaves pix foram utilizadas para realizar as transações. 

In [None]:
# Exercise helper: every calendar day with more than one pix transaction.
# Previously `.collect()[0][0]` kept only the first such day, although the exercise
# refers to two days; now all of them are returned as a list, in date order.
# Rows whose timestamp is null (e.g. unparseable in the CSV) are excluded so they
# cannot form a spurious "null date" group.
lista_datas = [
    linha.date_data_transacao
    for linha in spark.sql(
      """
      select
        date(data_transacao) as date_data_transacao
      from base_pix
      where data_transacao is not null
      group by 1
      having count(*) > 1
      order by 1
      """
    ).collect()
]
lista_datas

datetime.date(2022, 2, 26)