## Window Function

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.window import Window


### Criar/Iniciar sessão do pyspark

In [2]:
spark = (
    SparkSession.builder
    .master('local')
    .appName('aula02')
    .getOrCreate()
)

In [3]:
df = spark.read.csv('../datasets/equipamentos.csv', header=True, inferSchema=True, sep=',')

In [4]:
df.show(5)

+--------------+------+---+-------+--------------------+-----------+--------+--------------------+------+---------------+--------------------+-------------+--------+--------------------+--------------------+
|id_equipamento|  ibge| uf| cidade|                nome|responsavel|telefone|            endereco|numero|    complemento|          referencia|       bairro|     cep|     georef_location|    data_atualizacao|
+--------------+------+---+-------+--------------------+-----------+--------+--------------------+------+---------------+--------------------+-------------+--------+--------------------+--------------------+
|   23001006462|230010| CE|Abaiara|       CRAS I - SEDE|       NULL|    NULL|JOAQUIM LEITE DA ...|   268|           NULL|PRÓXIMO A SECRETA...|       CENTRO|63240000|-7.36175927968484...|2024-05-09 01:00:...|
|   23001020547|230010| CE|Abaiara|             CRAS II|       NULL|    NULL|CE 393, SITIO BRE...|     0|           NULL|PRÓXIMO A CAPELA ...|Vila São José|63240000|-7.

In [5]:
df.printSchema()

root
 |-- id_equipamento: long (nullable = true)
 |-- ibge: integer (nullable = true)
 |-- uf: string (nullable = true)
 |-- cidade: string (nullable = true)
 |-- nome: string (nullable = true)
 |-- responsavel: string (nullable = true)
 |-- telefone: string (nullable = true)
 |-- endereco: string (nullable = true)
 |-- numero: integer (nullable = true)
 |-- complemento: string (nullable = true)
 |-- referencia: string (nullable = true)
 |-- bairro: string (nullable = true)
 |-- cep: integer (nullable = true)
 |-- georef_location: string (nullable = true)
 |-- data_atualizacao: timestamp (nullable = true)



In [6]:
for column in df.columns:
    print(column, df.filter(df[column].isNull()).count())

id_equipamento 0
ibge 0
uf 0
cidade 0
nome 0
responsavel 406
telefone 406
endereco 0
numero 0
complemento 315
referencia 135
bairro 0
cep 0
georef_location 13
data_atualizacao 0


In [7]:
df.filter(col('georef_location').isNull()).show()

+--------------+------+---+---------------+--------------------+-----------+--------+--------------------+------+-----------+--------------------+--------------------+--------+---------------+--------------------+
|id_equipamento|  ibge| uf|         cidade|                nome|responsavel|telefone|            endereco|numero|complemento|          referencia|              bairro|     cep|georef_location|    data_atualizacao|
+--------------+------+---+---------------+--------------------+-----------+--------+--------------------+------+-----------+--------------------+--------------------+--------+---------------+--------------------+
|   23021039721|230210| CE|       Baturité|CRAS - CONSELHEIR...|       NULL|    NULL|FRANCISCO BRAGA F...|     0|       NULL|VIZINHO A SECRETA...|CONSELHEIRO ESTELITA|62760000|           NULL|2024-05-09 01:00:...|
|   23027039321|230270| CE|   Campos Sales|CRAS - Miguel Cortez|       NULL|    NULL|Conjunto Lindalva...|     0|       NULL|                NUL

In [8]:
df.show()

+--------------+------+---+-----------------+--------------------+-----------+--------+--------------------+------+---------------+--------------------+--------------------+--------+--------------------+--------------------+
|id_equipamento|  ibge| uf|           cidade|                nome|responsavel|telefone|            endereco|numero|    complemento|          referencia|              bairro|     cep|     georef_location|    data_atualizacao|
+--------------+------+---+-----------------+--------------------+-----------+--------+--------------------+------+---------------+--------------------+--------------------+--------+--------------------+--------------------+
|   23001006462|230010| CE|          Abaiara|       CRAS I - SEDE|       NULL|    NULL|JOAQUIM LEITE DA ...|   268|           NULL|PRÓXIMO A SECRETA...|              CENTRO|63240000|-7.36175927968484...|2024-05-09 01:00:...|
|   23001020547|230010| CE|          Abaiara|             CRAS II|       NULL|    NULL|CE 393, SITIO

In [9]:
df = df.fillna({'complemento':'sem complemento',
           'referencia':'sem referencia',
           'georef_location':'indisponivel'})

In [10]:
df.show()

+--------------+------+---+-----------------+--------------------+-----------+--------+--------------------+------+---------------+--------------------+--------------------+--------+--------------------+--------------------+
|id_equipamento|  ibge| uf|           cidade|                nome|responsavel|telefone|            endereco|numero|    complemento|          referencia|              bairro|     cep|     georef_location|    data_atualizacao|
+--------------+------+---+-----------------+--------------------+-----------+--------+--------------------+------+---------------+--------------------+--------------------+--------+--------------------+--------------------+
|   23001006462|230010| CE|          Abaiara|       CRAS I - SEDE|       NULL|    NULL|JOAQUIM LEITE DA ...|   268|sem complemento|PRÓXIMO A SECRETA...|              CENTRO|63240000|-7.36175927968484...|2024-05-09 01:00:...|
|   23001020547|230010| CE|          Abaiara|             CRAS II|       NULL|    NULL|CE 393, SITIO

In [11]:
for column in df.columns:
    print(column, df.filter(df[column].isNull()).count())

id_equipamento 0
ibge 0
uf 0
cidade 0
nome 0
responsavel 406
telefone 406
endereco 0
numero 0
complemento 0
referencia 0
bairro 0
cep 0
georef_location 0
data_atualizacao 0


In [12]:
df = df.drop('responsavel', 'telefone')

In [13]:
df.show()

+--------------+------+---+-----------------+--------------------+--------------------+------+---------------+--------------------+--------------------+--------+--------------------+--------------------+
|id_equipamento|  ibge| uf|           cidade|                nome|            endereco|numero|    complemento|          referencia|              bairro|     cep|     georef_location|    data_atualizacao|
+--------------+------+---+-----------------+--------------------+--------------------+------+---------------+--------------------+--------------------+--------+--------------------+--------------------+
|   23001006462|230010| CE|          Abaiara|       CRAS I - SEDE|JOAQUIM LEITE DA ...|   268|sem complemento|PRÓXIMO A SECRETA...|              CENTRO|63240000|-7.36175927968484...|2024-05-09 01:00:...|
|   23001020547|230010| CE|          Abaiara|             CRAS II|CE 393, SITIO BRE...|     0|sem complemento|PRÓXIMO A CAPELA ...|       Vila São José|63240000|-7.34850646703843...|20

In [14]:
# copia de segurança
df2 = df

### window Ranking Function
- Window Function 1 > Número de linhas - row_number()
- Window Function 2 > Ranking 1 - rank()
- Window Function 3 > Ranking 2 - dense_rank()
- Window Function 4 > Porcentagem Ranking - percent_rank()
- Window Function 5 > Divisão em N partes - ntile()

#### Window Function 1 > Número de linhas - row_number()


In [15]:
numero_de_linhas = Window.partitionBy('complemento').orderBy(desc('cidade'))

df.withColumn('numero_linha', row_number().over(numero_de_linhas)).show(50)

+--------------+------+---+--------------------+--------------------+--------------------+------+--------------------+--------------------+--------------------+--------+--------------------+--------------------+------------+
|id_equipamento|  ibge| uf|              cidade|                nome|            endereco|numero|         complemento|          referencia|              bairro|     cep|     georef_location|    data_atualizacao|numero_linha|
+--------------+------+---+--------------------+--------------------+--------------------+------+--------------------+--------------------+--------------------+--------+--------------------+--------------------+------------+
|   23054003286|230540| CE|                 Icó|CRAS II - IRINEID...|  PADRE VIEIRA  - 61|    61|                   *|Em frente ao merc...|                 BNH|63430000|-6.39797846461917...|2024-05-09 01:00:...|           1|
|   23035015152|230350| CE|            Cascavel|CRAS Sede II - Pl...|Nossa Senhora do ...|   508|   

#### Window Function 2 > Ranking 1 - rank()

In [16]:
rank1 = Window.partitionBy('complemento').orderBy(desc('cidade'))
df.withColumn('rank', rank().over(rank1)).show(50)

+--------------+------+---+--------------------+--------------------+--------------------+------+--------------------+--------------------+--------------------+--------+--------------------+--------------------+----+
|id_equipamento|  ibge| uf|              cidade|                nome|            endereco|numero|         complemento|          referencia|              bairro|     cep|     georef_location|    data_atualizacao|rank|
+--------------+------+---+--------------------+--------------------+--------------------+------+--------------------+--------------------+--------------------+--------+--------------------+--------------------+----+
|   23054003286|230540| CE|                 Icó|CRAS II - IRINEID...|  PADRE VIEIRA  - 61|    61|                   *|Em frente ao merc...|                 BNH|63430000|-6.39797846461917...|2024-05-09 01:00:...|   1|
|   23035015152|230350| CE|            Cascavel|CRAS Sede II - Pl...|Nossa Senhora do ...|   508|                   -|      sem refe

#### Window Function 3 > Ranking 2 - dense_rank()

In [17]:
rank2 = Window.partitionBy('complemento').orderBy(desc('cidade'))

df.withColumn('rank2', dense_rank().over(rank2)).show(50)

+--------------+------+---+--------------------+--------------------+--------------------+------+--------------------+--------------------+--------------------+--------+--------------------+--------------------+-----+
|id_equipamento|  ibge| uf|              cidade|                nome|            endereco|numero|         complemento|          referencia|              bairro|     cep|     georef_location|    data_atualizacao|rank2|
+--------------+------+---+--------------------+--------------------+--------------------+------+--------------------+--------------------+--------------------+--------+--------------------+--------------------+-----+
|   23054003286|230540| CE|                 Icó|CRAS II - IRINEID...|  PADRE VIEIRA  - 61|    61|                   *|Em frente ao merc...|                 BNH|63430000|-6.39797846461917...|2024-05-09 01:00:...|    1|
|   23035015152|230350| CE|            Cascavel|CRAS Sede II - Pl...|Nossa Senhora do ...|   508|                   -|      sem 

#### Window Function 2 > Porcentagem Ranking  - percent_rank()

In [18]:
porcentagem = Window.partitionBy('complemento').orderBy(desc('cidade'))

df.withColumn('porcentagem', percent_rank().over(porcentagem)).show()

+--------------+------+---+--------------+--------------------+--------------------+------+----------------+--------------------+--------------------+--------+--------------------+--------------------+------------------+
|id_equipamento|  ibge| uf|        cidade|                nome|            endereco|numero|     complemento|          referencia|              bairro|     cep|     georef_location|    data_atualizacao|       porcentagem|
+--------------+------+---+--------------+--------------------+--------------------+------+----------------+--------------------+--------------------+--------+--------------------+--------------------+------------------+
|   23054003286|230540| CE|           Icó|CRAS II - IRINEID...|  PADRE VIEIRA  - 61|    61|               *|Em frente ao merc...|                 BNH|63430000|-6.39797846461917...|2024-05-09 01:00:...|               0.0|
|   23035015152|230350| CE|      Cascavel|CRAS Sede II - Pl...|Nossa Senhora do ...|   508|               -|      se

#### Window Function 5 > Divisão em N partes - ntile()

In [19]:
parte = Window.partitionBy('complemento').orderBy(desc('cidade'))

df.withColumn('partes', ntile(10).over(parte)).show()

+--------------+------+---+--------------+--------------------+--------------------+------+----------------+--------------------+--------------------+--------+--------------------+--------------------+------+
|id_equipamento|  ibge| uf|        cidade|                nome|            endereco|numero|     complemento|          referencia|              bairro|     cep|     georef_location|    data_atualizacao|partes|
+--------------+------+---+--------------+--------------------+--------------------+------+----------------+--------------------+--------------------+--------+--------------------+--------------------+------+
|   23054003286|230540| CE|           Icó|CRAS II - IRINEID...|  PADRE VIEIRA  - 61|    61|               *|Em frente ao merc...|                 BNH|63430000|-6.39797846461917...|2024-05-09 01:00:...|     1|
|   23035015152|230350| CE|      Cascavel|CRAS Sede II - Pl...|Nossa Senhora do ...|   508|               -|      sem referencia|            Planalto|62850000|-4.12

### Window Analytic Functions

#### Window Function - LAG / DEGRAU lag()

In [22]:
degrau = Window.partitionBy('complemento').orderBy(desc('cidade'))

df.withColumn('degrau', lag('cep').over(degrau)).show(50)

+--------------+------+---+--------------------+--------------------+--------------------+------+--------------------+--------------------+--------------------+--------+--------------------+--------------------+--------+
|id_equipamento|  ibge| uf|              cidade|                nome|            endereco|numero|         complemento|          referencia|              bairro|     cep|     georef_location|    data_atualizacao|  degrau|
+--------------+------+---+--------------------+--------------------+--------------------+------+--------------------+--------------------+--------------------+--------+--------------------+--------------------+--------+
|   23054003286|230540| CE|                 Icó|CRAS II - IRINEID...|  PADRE VIEIRA  - 61|    61|                   *|Em frente ao merc...|                 BNH|63430000|-6.39797846461917...|2024-05-09 01:00:...|    NULL|
|   23035015152|230350| CE|            Cascavel|CRAS Sede II - Pl...|Nossa Senhora do ...|   508|                   

#### Window Function - Lead / Degrau - lead()

In [24]:
degrau = Window.partitionBy('complemento').orderBy(desc('cidade'))

df.withColumn('degrau',lead('cep').over(degrau)).show(50)

+--------------+------+---+--------------------+--------------------+--------------------+------+--------------------+--------------------+--------------------+--------+--------------------+--------------------+--------+
|id_equipamento|  ibge| uf|              cidade|                nome|            endereco|numero|         complemento|          referencia|              bairro|     cep|     georef_location|    data_atualizacao|  degrau|
+--------------+------+---+--------------------+--------------------+--------------------+------+--------------------+--------------------+--------------------+--------+--------------------+--------------------+--------+
|   23054003286|230540| CE|                 Icó|CRAS II - IRINEID...|  PADRE VIEIRA  - 61|    61|                   *|Em frente ao merc...|                 BNH|63430000|-6.39797846461917...|2024-05-09 01:00:...|    NULL|
|   23035015152|230350| CE|            Cascavel|CRAS Sede II - Pl...|Nossa Senhora do ...|   508|                   