In [1]:
import pyspark

from pyspark.sql import SparkSession, types
spark = SparkSession.builder.appName('tratamento').getOrCreate()

# Habilita o scroll horizontal 
from IPython.core.display import HTML
display(HTML("<style>pre { white-space: pre !important; }</style>"));

# Impede que arquivos de _SUCCESS e metadados sejam criados na hora de salvar
spark.sparkContext._jsc.hadoopConfiguration().set("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false")
spark.sparkContext._jsc.hadoopConfiguration().set("parquet.enable.summary-metadata", "false")

In [2]:
from pyspark.sql.functions import (trim, initcap, upper, regexp_replace, translate, to_date, to_timestamp, 
                                   format_number, round)

In [3]:
def spark_shape(self):
    return (self.count(), len(self.columns))

pyspark.sql.dataframe.DataFrame.shape = spark_shape

In [4]:
df_respiradores = spark.read.csv('../../../Projetos/Respiradores/Arquivos/distribuicao_respiradores.csv',
                                 header=True, inferSchema=True, sep=';')
df_respiradores.printSchema()

root
 |-- DATA: string (nullable = true)
 |-- FORNECEDOR: string (nullable = true)
 |-- DESTINO: string (nullable = true)
 |-- ESTADO/MUNICIPIO: string (nullable = true)
 |-- TIPO: string (nullable = true)
 |-- QUANTIDADE: integer (nullable = true)
 |-- VALOR: string (nullable = true)
 |-- DESTINATARIO: string (nullable = true)
 |-- UF: string (nullable = true)
 |-- DATA DE ENTREGA: string (nullable = true)



In [5]:
df_respiradores.shape()

(2650, 10)

In [6]:
df_respiradores.show()

+----------+----------+-------+----------------+----------+----------+---------------+--------------------+---+---------------+
|      DATA|FORNECEDOR|DESTINO|ESTADO/MUNICIPIO|      TIPO|QUANTIDADE|          VALOR|        DESTINATARIO| UF|DATA DE ENTREGA|
+----------+----------+-------+----------------+----------+----------+---------------+--------------------+---+---------------+
|21/05/2020|    VYAIRE|   ACRE|          ESTADO|       UTI|        20|R$ 1.200.000,00|Secretaria Estadu...| AC|     23/05/2020|
|21/05/2020|  MAGNAMED|   ACRE|          ESTADO|TRANSPORTE|        10|  R$ 482.900,00|Secretaria Estadu...| AC|     23/05/2020|
|05/06/2020|  LEISTUNG|   ACRE|          ESTADO|       UTI|        30|R$ 1.800.000,00|Secretaria Estadu...| AC|     08/06/2020|
|16/06/2020|    VYAIRE|   ACRE|       MUNICIPIO|       UTI|         5|  R$ 300.000,00|Secretaria Munici...| AC|     24/06/2020|
|16/06/2020|  MAGNAMED|   ACRE|       MUNICIPIO|TRANSPORTE|         5|  R$ 241.450,00|Secretaria Munici.

---

# Análise

In [7]:
list_columns = df_respiradores.columns

for i in list_columns:
    print(f"{i}: {df_respiradores.filter(df_respiradores[i].isNull()).count()}")

DATA: 0
FORNECEDOR: 0
DESTINO: 0
ESTADO/MUNICIPIO: 0
TIPO: 0
QUANTIDADE: 0
VALOR: 0
DESTINATARIO: 0
UF: 0
DATA DE ENTREGA: 0


In [8]:
df_respiradores.select('DESTINO').distinct().orderBy('DESTINO').show()
df_respiradores.select('DESTINO').distinct().orderBy('DESTINO').count()

+------------------+
|           DESTINO|
+------------------+
|              ACRE|
|           ALAGOAS|
|             AMAPA|
|          AMAZONAS|
|             BAHIA|
|             CEARA|
|  DISTRITO FEDERAL|
|   ESPIRITO SANTO |
|             GOIAS|
|            HAITI |
|            LIBANO|
|          MARANHÃO|
|      MATO GROSSO |
|MATO GROSSO DO SUL|
|      MINAS GERAIS|
|              PARA|
|           PARAIBA|
|            PARANA|
|        PERNAMBUCO|
|              PERU|
+------------------+
only showing top 20 rows



30

In [9]:
df_respiradores.select('UF').distinct().orderBy('UF').show()

+---+
| UF|
+---+
|  -|
| AC|
|AC |
| AL|
|AL |
| AM|
| AP|
| BA|
| CE|
| DF|
| ES|
| GO|
| MA|
| MG|
| MS|
| MT|
| PA|
| PB|
| PE|
| PI|
+---+
only showing top 20 rows



In [10]:
df_respiradores.filter(df_respiradores.UF.contains('-')).show()

+----------+----------+-------+----------------+----------+----------+----------------+--------------------+---+---------------+
|      DATA|FORNECEDOR|DESTINO|ESTADO/MUNICIPIO|      TIPO|QUANTIDADE|           VALOR|        DESTINATARIO| UF|DATA DE ENTREGA|
+----------+----------+-------+----------------+----------+----------+----------------+--------------------+---+---------------+
|07/08/2020|  MAGNAMED| LIBANO|               -|TRANSPORTE|       300|R$ 14.487.000,00|MISSÃO FAB (Doaçã...|  -|     08/08/2020|
|29/12/2020|  MAGNAMED|   PERU|               -|       UTI|       100| R$ 6.000.000,00|Cooperação Humani...|  -|     28/12/2020|
|29/12/2020|       KTK|   PERU|               -|TRANSPORTE|       230| R$ 4.600.000,00|Cooperação Humani...|  -|     28/12/2020|
|08/01/2021|       KTK| HAITI |               -|TRANSPORTE|        50| R$ 1.000.000,00|Cooperação Humani...|  -|     12/01/2021|
|08/01/2021|  MAGNAMED| HAITI |               -|       UTI|        50| R$ 3.000.000,00|Cooperação

In [11]:
df_respiradores.select('TIPO').distinct().show()

+--------------+
|          TIPO|
+--------------+
|           UTI|
|TRANSPORTE USA|
|Transporte USA|
|    TRANSPORTE|
+--------------+



In [12]:
df_respiradores.select('ESTADO/MUNICIPIO').distinct().show()

+----------------+
|ESTADO/MUNICIPIO|
+----------------+
|       MUNICIPIO|
|         ESTADO |
|          ESTADO|
|               -|
+----------------+



In [13]:
df_respiradores.select('FORNECEDOR').distinct().show()

+--------------------+
|          FORNECEDOR|
+--------------------+
|            LIFEMED |
|                 UTI|
|     LEISTUNG/VYAIRE|
|            MAGNAMED|
| MAGNAMED/REQUISIÇÃO|
|             VYAIRE |
|            INSPIRAR|
|            LEISTUNG|
|             LIFEMED|
|       RUSSER BRASIL|
|            HORTRON |
| VYAIRE LTV 1200 USA|
|           INSPIRAR |
|SUZANO/Zhongxun M...|
|           LEISTUNG |
|             HORTRON|
|              VYAIRE|
|              RUSSER|
|            ALLIAGE |
|                 KTK|
+--------------------+
only showing top 20 rows



---

# Tratamentos

In [14]:
df_respiradores.show(5)

+----------+----------+-------+----------------+----------+----------+---------------+--------------------+---+---------------+
|      DATA|FORNECEDOR|DESTINO|ESTADO/MUNICIPIO|      TIPO|QUANTIDADE|          VALOR|        DESTINATARIO| UF|DATA DE ENTREGA|
+----------+----------+-------+----------------+----------+----------+---------------+--------------------+---+---------------+
|21/05/2020|    VYAIRE|   ACRE|          ESTADO|       UTI|        20|R$ 1.200.000,00|Secretaria Estadu...| AC|     23/05/2020|
|21/05/2020|  MAGNAMED|   ACRE|          ESTADO|TRANSPORTE|        10|  R$ 482.900,00|Secretaria Estadu...| AC|     23/05/2020|
|05/06/2020|  LEISTUNG|   ACRE|          ESTADO|       UTI|        30|R$ 1.800.000,00|Secretaria Estadu...| AC|     08/06/2020|
|16/06/2020|    VYAIRE|   ACRE|       MUNICIPIO|       UTI|         5|  R$ 300.000,00|Secretaria Munici...| AC|     24/06/2020|
|16/06/2020|  MAGNAMED|   ACRE|       MUNICIPIO|TRANSPORTE|         5|  R$ 241.450,00|Secretaria Munici.

In [15]:
df_respiradores = df_respiradores.withColumn('UF', trim(df_respiradores.UF))
df_respiradores = df_respiradores.withColumn('DESTINO', trim(initcap(df_respiradores.DESTINO)))
df_respiradores = df_respiradores.withColumn('DESTINATARIO', trim(upper(df_respiradores.DESTINATARIO)))
df_respiradores = df_respiradores.withColumn('ESTADO/MUNICIPIO', trim(df_respiradores['ESTADO/MUNICIPIO']))
df_respiradores = df_respiradores.withColumn('TIPO', upper(df_respiradores.TIPO))

df_respiradores = df_respiradores.withColumnRenamed('VALOR', 'valor_(r$)')
df_respiradores = df_respiradores.withColumn('valor_(r$)', translate(df_respiradores['valor_(r$)'], "R$., ", ""))
df_respiradores = df_respiradores.withColumn("valor_(r$)", round(df_respiradores["valor_(r$)"].cast(types.FloatType()), 2))

In [16]:
df_respiradores.show(5)

+----------+----------+-------+----------------+----------+----------+----------+--------------------+---+---------------+
|      DATA|FORNECEDOR|DESTINO|ESTADO/MUNICIPIO|      TIPO|QUANTIDADE|valor_(r$)|        DESTINATARIO| UF|DATA DE ENTREGA|
+----------+----------+-------+----------------+----------+----------+----------+--------------------+---+---------------+
|21/05/2020|    VYAIRE|   Acre|          ESTADO|       UTI|        20|     1.2E8|SECRETARIA ESTADU...| AC|     23/05/2020|
|21/05/2020|  MAGNAMED|   Acre|          ESTADO|TRANSPORTE|        10|   4.829E7|SECRETARIA ESTADU...| AC|     23/05/2020|
|05/06/2020|  LEISTUNG|   Acre|          ESTADO|       UTI|        30|     1.8E8|SECRETARIA ESTADU...| AC|     08/06/2020|
|16/06/2020|    VYAIRE|   Acre|       MUNICIPIO|       UTI|         5|     3.0E7|SECRETARIA MUNICI...| AC|     24/06/2020|
|16/06/2020|  MAGNAMED|   Acre|       MUNICIPIO|TRANSPORTE|         5|  2.4145E7|SECRETARIA MUNICI...| AC|     24/06/2020|
+----------+----

### Datas

In [17]:
df_respiradores = df_respiradores.withColumn('DATA DE ENTREGA',
                                             regexp_replace(df_respiradores['DATA DE ENTREGA'], "21/04/20201", "21/04/2021"))

df_respiradores = df_respiradores.withColumn('DATA DE ENTREGA',
                                             regexp_replace(df_respiradores['DATA DE ENTREGA'], "31/04/2020", "01/05/2020"))

df_respiradores = df_respiradores.withColumn('DATA DE ENTREGA',
                                             regexp_replace(df_respiradores['DATA DE ENTREGA'], "29/02/2021", "01/03/2021"))

df_respiradores = df_respiradores.withColumn('DATA DE ENTREGA',
                                             regexp_replace(df_respiradores['DATA DE ENTREGA'], "30/10//2020", "30/10/2020"))

df_respiradores = df_respiradores.withColumn('DATA', regexp_replace(df_respiradores['DATA'], " 00:00", ""))

df_respiradores = df_respiradores.withColumn('DATA DE ENTREGA',
                                             regexp_replace(df_respiradores['DATA DE ENTREGA'], "-1", "- 1"))

df_respiradores = df_respiradores.withColumn('DATA DE ENTREGA',
                                             regexp_replace(df_respiradores['DATA DE ENTREGA'], " - ", " "))

In [18]:
list_dates = list(df_respiradores.select('DATA DE ENTREGA').filter(~df_respiradores['DATA DE ENTREGA']
                                                           .contains(':')).distinct().toPandas().values)

df_respiradores = df_respiradores.withColumn('DATA', to_date(df_respiradores['DATA'], 'dd/MM/yyyy'))

for i in range(0, len(list_dates)):
    df_respiradores = df_respiradores.withColumn('DATA DE ENTREGA', 
                                                 regexp_replace(df_respiradores['DATA DE ENTREGA'], list_dates[i][0], f"{list_dates[i][0]} 00:00"))

df_respiradores = df_respiradores.withColumn('DATA DE ENTREGA', regexp_replace(df_respiradores['DATA DE ENTREGA'], "00:00 ", ""))
df_respiradores = df_respiradores.withColumn('DATA DE ENTREGA', to_timestamp(df_respiradores['DATA DE ENTREGA'], 'd/M/y HH:mm'))

In [19]:
df_respiradores.show(92)

+----------+--------------------+--------+----------------+----------+----------+------------+--------------------+---+-------------------+
|      DATA|          FORNECEDOR| DESTINO|ESTADO/MUNICIPIO|      TIPO|QUANTIDADE|  valor_(r$)|        DESTINATARIO| UF|    DATA DE ENTREGA|
+----------+--------------------+--------+----------------+----------+----------+------------+--------------------+---+-------------------+
|2020-05-21|              VYAIRE|    Acre|          ESTADO|       UTI|        20|       1.2E8|SECRETARIA ESTADU...| AC|2020-05-23 00:00:00|
|2020-05-21|            MAGNAMED|    Acre|          ESTADO|TRANSPORTE|        10|     4.829E7|SECRETARIA ESTADU...| AC|2020-05-23 00:00:00|
|2020-06-05|            LEISTUNG|    Acre|          ESTADO|       UTI|        30|       1.8E8|SECRETARIA ESTADU...| AC|2020-06-08 00:00:00|
|2020-06-16|              VYAIRE|    Acre|       MUNICIPIO|       UTI|         5|       3.0E7|SECRETARIA MUNICI...| AC|2020-06-24 00:00:00|
|2020-06-16|        

In [20]:
df_respiradores.printSchema()

root
 |-- DATA: date (nullable = true)
 |-- FORNECEDOR: string (nullable = true)
 |-- DESTINO: string (nullable = true)
 |-- ESTADO/MUNICIPIO: string (nullable = true)
 |-- TIPO: string (nullable = true)
 |-- QUANTIDADE: integer (nullable = true)
 |-- valor_(r$): float (nullable = true)
 |-- DESTINATARIO: string (nullable = true)
 |-- UF: string (nullable = true)
 |-- DATA DE ENTREGA: timestamp (nullable = true)



---

## Salvar o tratamento

In [21]:
df_respiradores.coalesce(1).write.format("csv")\
                                 .mode("overwrite")\
                                 .save("../File/respiradores_tratado.csv", header="true")