<a href="https://colab.research.google.com/github/oallanfarias/pyspark/blob/main/structType.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install pyspark

In [9]:
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

In [3]:
spark = ( SparkSession.builder
                        .master("local")
                        .appName("novas_funcoes")
                        .config("spark.ui.port", "4050")
                        .getOrCreate() 
        )

In [4]:
df = ( spark.read.format("csv")
                  .option("header", "true")
                  .option("inferschema", "true")
                  .option("delimiter", ";")
                  .load('/content/drive/MyDrive/Datasets/arquivo_geral.csv')
)

In [5]:
df.printSchema()

root
 |-- regiao: string (nullable = true)
 |-- estado: string (nullable = true)
 |-- data: timestamp (nullable = true)
 |-- casosNovos: integer (nullable = true)
 |-- casosAcumulados: integer (nullable = true)
 |-- obitosNovos: integer (nullable = true)
 |-- obitosAcumulados: integer (nullable = true)



In [6]:
#FUNÇãO DE AGREGAÇÃO - agg
#AGRUPAR POR ESTADO - SOMA DE CASOS NOVOS, O VALOR MÁXIMO DE CASOS ACUMULADOS, E A MÉDIA DE ÓBITOS ACUMULADOS - mean
#df.groupBy('estado').sum('casosNovos').max('casosAcumulados').mean('obitosAcumulados').show()
df.groupBy('estado').agg(F.sum('casosNovos').alias('soma_casos_novos'), 
                         F.max('casosAcumulados').alias('maximo_casosAcumulados'), 
                         F.mean('obitosAcumulados').alias('media_obitosAcumulados')).show(10)
#df.groupBy('estado').mean('casosNovos').show()

+------+----------------+----------------------+----------------------+
|estado|soma_casos_novos|maximo_casosAcumulados|media_obitosAcumulados|
+------+----------------+----------------------+----------------------+
|    SC|            1209|                  1209|     6.632183908045977|
|    RO|             328|                   328|    0.7586206896551724|
|    PI|             297|                   297|    2.4942528735632186|
|    AM|            3635|                  3635|    29.482758620689655|
|    RR|             345|                   345|    0.6206896551724138|
|    GO|             506|                   506|    3.6206896551724137|
|    TO|              50|                    50|   0.16091954022988506|
|    MT|             247|                   247|    1.0344827586206897|
|    SP|           20004|                 20004|     215.8505747126437|
|    PB|             447|                   447|     5.195402298850575|
+------+----------------+----------------------+----------------

* count() - Retorna quatidade
* mean() - Retorna a média
* max() - Retorna o valor máximo
* min() - Retorna o valor mínimo
* sum() - Retorna a soma

In [10]:
#PARA SUBSTITUIR VALORES NULOS - FILLNA
df.fillna(value='NA') #ALTERA AS STRINGS NULL PARA NA
df.fillna(value=0) #ALTERA OS CAMPOS INTEIROS PARA 0
df.fillna(value='NAN', subset=['estado', 'regiao'])

DataFrame[regiao: string, estado: string, data: timestamp, casosNovos: int, casosAcumulados: int, obitosNovos: int, obitosAcumulados: int]

TIPOS BÁSICOS DE DADOS SPARK
- ByteType() -    (int)
- ShortType() -   (int)
- IntegerType() - (int)
- LongType()    - (int)
- FloatType()   - (float)
- DoubleType()  - (float)
- StringType()  - (str)
- BooleanType() - (bool)
- DecimalType() - (decimal.Decimal)
- NULL          - null

TIPOS COMPLEXOS DE DADOS
- TimestampType()  - (datetime.datetime)
- DateType()       - (datetime.date)
- ArrayType()      - (list, tuple, array)
- MapType()        - (dict)
- StructType()     - (list, tuple)
- StructField()    - (Tipo do Field)

In [11]:
#Count Distinct
df.distinct().count()

2349

In [14]:
#CONVERTER O DATAFRAME DO PYSPARK PARA PANDAS
df_pandas = df.toPandas()

In [15]:
#CONVERTER UM DATAFRAME DO PANDAS PARA O PYSPARK
df_spark = spark.createDataFrame(df_pandas)

In [37]:
#SPLIT _ DIVIDIR ALGO
df.show(10)
df.select(F.split(F.col('data'), '-').alias('TESTE')).show(truncate=False)

+------+------+-------------------+----------+---------------+-----------+----------------+
|regiao|estado|               data|casosNovos|casosAcumulados|obitosNovos|obitosAcumulados|
+------+------+-------------------+----------+---------------+-----------+----------------+
| Norte|    RO|2020-01-30 00:00:00|         0|              0|          0|               0|
| Norte|    RO|2020-01-31 00:00:00|         0|              0|          0|               0|
| Norte|    RO|2020-02-01 00:00:00|         0|              0|          0|               0|
| Norte|    RO|2020-02-02 00:00:00|         0|              0|          0|               0|
| Norte|    RO|2020-02-03 00:00:00|         0|              0|          0|               0|
| Norte|    RO|2020-02-04 00:00:00|         0|              0|          0|               0|
| Norte|    RO|2020-02-05 00:00:00|         0|              0|          0|               0|
| Norte|    RO|2020-02-06 00:00:00|         0|              0|          0|      

In [28]:
#REPLACE - SUBSTITUIR ALGUM TEXTO
df.withColumn('Regiao_alterada', F.regexp_replace('regiao', 'e', ' AAA')).show(10)

+------+------+-------------------+----------+---------------+-----------+----------------+---------------+
|regiao|estado|               data|casosNovos|casosAcumulados|obitosNovos|obitosAcumulados|Regiao_alterada|
+------+------+-------------------+----------+---------------+-----------+----------------+---------------+
| Norte|    RO|2020-01-30 00:00:00|         0|              0|          0|               0|       Nort AAA|
| Norte|    RO|2020-01-31 00:00:00|         0|              0|          0|               0|       Nort AAA|
| Norte|    RO|2020-02-01 00:00:00|         0|              0|          0|               0|       Nort AAA|
| Norte|    RO|2020-02-02 00:00:00|         0|              0|          0|               0|       Nort AAA|
| Norte|    RO|2020-02-03 00:00:00|         0|              0|          0|               0|       Nort AAA|
| Norte|    RO|2020-02-04 00:00:00|         0|              0|          0|               0|       Nort AAA|
| Norte|    RO|2020-02-05 00

In [None]:
'''
(
    df.withColumn('Regiao_alterada', F.when(F.col('regiao') == 'Norte'), 
                                      F.regexp_replace('regiao', 'Norte', 'Nordeste'))
                                      .when(F.col('regiao') == 'Nordeste'),
                                      F.regexp_replace('regiao', 'Nordeste', 'Sudeste')
                                      .when()
)
'''

In [40]:
# StructType - DEFINE O ESQUEMA PARA O DATAFRAME
from pyspark.sql.types import *

In [42]:
esquema = (
    StructType([
        StructField('Região',StringType()),
        StructField('ESTADO', StringType()),
        StructField('DaTa', DateType()),
        StructField('CASOSnovos', IntegerType()),
        StructField('casosAcumulados', IntegerType()),
        StructField('ObitosNovos', IntegerType()),
        StructField('ObitosAcumulados', IntegerType())
        ])
)

In [43]:
df = ( spark.read.format("csv")
                  .option("header", "true")
                  .option("inferschema", "false")
                  .option("delimiter", ";")
                  .load('/content/drive/MyDrive/Datasets/arquivo_geral.csv', schema=esquema)
)

In [44]:
df.printSchema()

root
 |-- Região: string (nullable = true)
 |-- ESTADO: string (nullable = true)
 |-- DaTa: date (nullable = true)
 |-- CASOSnovos: integer (nullable = true)
 |-- casosAcumulados: integer (nullable = true)
 |-- ObitosNovos: integer (nullable = true)
 |-- ObitosAcumulados: integer (nullable = true)



In [45]:
df.show()

+------+------+----------+----------+---------------+-----------+----------------+
|Região|ESTADO|      DaTa|CASOSnovos|casosAcumulados|ObitosNovos|ObitosAcumulados|
+------+------+----------+----------+---------------+-----------+----------------+
| Norte|    RO|2020-01-30|         0|              0|          0|               0|
| Norte|    RO|2020-01-31|         0|              0|          0|               0|
| Norte|    RO|2020-02-01|         0|              0|          0|               0|
| Norte|    RO|2020-02-02|         0|              0|          0|               0|
| Norte|    RO|2020-02-03|         0|              0|          0|               0|
| Norte|    RO|2020-02-04|         0|              0|          0|               0|
| Norte|    RO|2020-02-05|         0|              0|          0|               0|
| Norte|    RO|2020-02-06|         0|              0|          0|               0|
| Norte|    RO|2020-02-07|         0|              0|          0|               0|
| No

In [51]:
#WINDOW FUNCTIONS - Funções de Janela
from pyspark.sql.window import Window

In [52]:
schema = ['nome', 'departamento', 'estado', 'salario']

dados = [('Anderson', 'vendas', 'SP', 9000),
         ('Kennedy', 'vendas', 'RJ', 4500),
         ('Luciana', 'vendas', 'SP', 4500),
         ('Marta', 'vendas', 'SP', 4500),
         ('João', 'vendas', 'SP', 4500),
         ('Diego', 'vendas', 'SP', 4500),
         ('Marilia', 'vendas', 'SP', 1200),
         ('Gustavo', 'financeiro', 'AM', 8000),
         ('Pedro', 'financeiro', 'AM', 2750),
         ('Juliana', 'financeiro', 'MG', 3000),
         ('Leticia', 'financeiro', 'RJ', 7500),
         ('Oswaldo', 'marketing', 'RJ', 2450),
         ('Denis', 'marketing', 'MG', 1300)
         
        ]
df_window = spark.createDataFrame(data=dados, schema=schema)

In [53]:
df_window.show()

+--------+------------+------+-------+
|    nome|departamento|estado|salario|
+--------+------------+------+-------+
|Anderson|      vendas|    SP|   9000|
| Kennedy|      vendas|    RJ|   4500|
| Luciana|      vendas|    SP|   4500|
|   Marta|      vendas|    SP|   4500|
|    João|      vendas|    SP|   4500|
|   Diego|      vendas|    SP|   4500|
| Marilia|      vendas|    SP|   1200|
| Gustavo|  financeiro|    AM|   8000|
|   Pedro|  financeiro|    AM|   2750|
| Juliana|  financeiro|    MG|   3000|
| Leticia|  financeiro|    RJ|   7500|
| Oswaldo|   marketing|    RJ|   2450|
|   Denis|   marketing|    MG|   1300|
+--------+------------+------+-------+



In [60]:
w0 = Window.partitionBy(F.col('departamento')).orderBy('salario')

In [57]:
#ROW NUMBER - RETORNA O NÚMERO DA LINHA
df_window.withColumn('row_number', F.row_number().over(w0)).show()

+--------+------------+------+-------+----------+
|    nome|departamento|estado|salario|row_number|
+--------+------------+------+-------+----------+
|   Pedro|  financeiro|    AM|   2750|         1|
| Juliana|  financeiro|    MG|   3000|         2|
| Leticia|  financeiro|    RJ|   7500|         3|
| Gustavo|  financeiro|    AM|   8000|         4|
|   Denis|   marketing|    MG|   1300|         1|
| Oswaldo|   marketing|    RJ|   2450|         2|
| Marilia|      vendas|    SP|   1200|         1|
| Kennedy|      vendas|    RJ|   4500|         2|
| Luciana|      vendas|    SP|   4500|         3|
|   Marta|      vendas|    SP|   4500|         4|
|    João|      vendas|    SP|   4500|         5|
|   Diego|      vendas|    SP|   4500|         6|
|Anderson|      vendas|    SP|   9000|         7|
+--------+------------+------+-------+----------+



In [61]:
#RANK
df_window.withColumn('rank', F.rank().over(w0)).show()

+--------+------------+------+-------+----+
|    nome|departamento|estado|salario|rank|
+--------+------------+------+-------+----+
|   Pedro|  financeiro|    AM|   2750|   1|
| Juliana|  financeiro|    MG|   3000|   2|
| Leticia|  financeiro|    RJ|   7500|   3|
| Gustavo|  financeiro|    AM|   8000|   4|
|   Denis|   marketing|    MG|   1300|   1|
| Oswaldo|   marketing|    RJ|   2450|   2|
| Marilia|      vendas|    SP|   1200|   1|
| Kennedy|      vendas|    RJ|   4500|   2|
| Luciana|      vendas|    SP|   4500|   2|
|   Marta|      vendas|    SP|   4500|   2|
|    João|      vendas|    SP|   4500|   2|
|   Diego|      vendas|    SP|   4500|   2|
|Anderson|      vendas|    SP|   9000|   7|
+--------+------------+------+-------+----+



In [62]:
#DENSE_RANK
df_window.withColumn('dense_rank', F.dense_rank().over(w0)).show()

+--------+------------+------+-------+----------+
|    nome|departamento|estado|salario|dense_rank|
+--------+------------+------+-------+----------+
|   Pedro|  financeiro|    AM|   2750|         1|
| Juliana|  financeiro|    MG|   3000|         2|
| Leticia|  financeiro|    RJ|   7500|         3|
| Gustavo|  financeiro|    AM|   8000|         4|
|   Denis|   marketing|    MG|   1300|         1|
| Oswaldo|   marketing|    RJ|   2450|         2|
| Marilia|      vendas|    SP|   1200|         1|
| Kennedy|      vendas|    RJ|   4500|         2|
| Luciana|      vendas|    SP|   4500|         2|
|   Marta|      vendas|    SP|   4500|         2|
|    João|      vendas|    SP|   4500|         2|
|   Diego|      vendas|    SP|   4500|         2|
|Anderson|      vendas|    SP|   9000|         3|
+--------+------------+------+-------+----------+



In [69]:
#LAG
df_window.withColumn('lag', F.lag("salario",2).over(w0)).show()

+--------+------------+------+-------+----+
|    nome|departamento|estado|salario| lag|
+--------+------------+------+-------+----+
|   Pedro|  financeiro|    AM|   2750|null|
| Juliana|  financeiro|    MG|   3000|null|
| Leticia|  financeiro|    RJ|   7500|2750|
| Gustavo|  financeiro|    AM|   8000|3000|
|   Denis|   marketing|    MG|   1300|null|
| Oswaldo|   marketing|    RJ|   2450|null|
| Marilia|      vendas|    SP|   1200|null|
| Kennedy|      vendas|    RJ|   4500|null|
| Luciana|      vendas|    SP|   4500|1200|
|   Marta|      vendas|    SP|   4500|4500|
|    João|      vendas|    SP|   4500|4500|
|   Diego|      vendas|    SP|   4500|4500|
|Anderson|      vendas|    SP|   9000|4500|
+--------+------------+------+-------+----+



In [70]:
#LEAD
df_window.withColumn('lead', F.lead("salario",1).over(w0)).show()

+--------+------------+------+-------+----+
|    nome|departamento|estado|salario|lead|
+--------+------------+------+-------+----+
|   Pedro|  financeiro|    AM|   2750|3000|
| Juliana|  financeiro|    MG|   3000|7500|
| Leticia|  financeiro|    RJ|   7500|8000|
| Gustavo|  financeiro|    AM|   8000|null|
|   Denis|   marketing|    MG|   1300|2450|
| Oswaldo|   marketing|    RJ|   2450|null|
| Marilia|      vendas|    SP|   1200|4500|
| Kennedy|      vendas|    RJ|   4500|4500|
| Luciana|      vendas|    SP|   4500|4500|
|   Marta|      vendas|    SP|   4500|4500|
|    João|      vendas|    SP|   4500|4500|
|   Diego|      vendas|    SP|   4500|9000|
|Anderson|      vendas|    SP|   9000|null|
+--------+------------+------+-------+----+

