In [1]:
# Libs

import os
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.functions import to_timestamp, regexp_replace
from datetime import datetime

import warnings
warnings.filterwarnings('ignore')

In [2]:
# Instância o spark

spark = SparkSession.builder.appName("ANTAQ").getOrCreate()

22/03/05 20:38:35 WARN Utils: Your hostname, DESKTOP-GO9KC4P resolves to a loopback address: 127.0.1.1; using 172.22.14.95 instead (on interface eth0)
22/03/05 20:38:35 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/03/05 20:38:37 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Verifica os arquivos de consumo

!ls data/txt

2019Atracacao.txt  2020Atracacao.txt  2021Atracacao.txt
2019Carga.txt	   2020Carga.txt      2021Carga.txt


In [4]:
# Parâmetros

path_txt = 'data/txt/'
path_csv = 'data/csv/'

years = [2019, 2020, 2021]

if not os.path.exists(path_csv):
    os.makedirs(path_csv)

## atracacao_fato

In [5]:
# Uni as bases de dados

df = []
for i in years:
    df.append(spark.read.options(header='True', inferSchema='True', delimiter=';').csv(f'{path_txt}{i}Atracacao.txt'))
    
dfA = df[0]
    
for i in df[1:]:
    dfA = dfA.union(i)

                                                                                

In [6]:
# Verifica schema atual

print(dfA.count())

dfA.printSchema()

232230
root
 |-- IDAtracacao: integer (nullable = true)
 |-- CDTUP: string (nullable = true)
 |-- IDBerco: string (nullable = true)
 |-- Berço: string (nullable = true)
 |-- Porto Atracação: string (nullable = true)
 |-- Apelido Instalação Portuária: string (nullable = true)
 |-- Complexo Portuário: string (nullable = true)
 |-- Tipo da Autoridade Portuária: string (nullable = true)
 |-- Data Atracação: string (nullable = true)
 |-- Data Chegada: string (nullable = true)
 |-- Data Desatracação: string (nullable = true)
 |-- Data Início Operação: string (nullable = true)
 |-- Data Término Operação: string (nullable = true)
 |-- Ano: integer (nullable = true)
 |-- Mes: string (nullable = true)
 |-- Tipo de Operação: string (nullable = true)
 |-- Tipo de Navegação da Atracação: string (nullable = true)
 |-- Nacionalidade do Armador: integer (nullable = true)
 |-- FlagMCOperacaoAtracacao: integer (nullable = true)
 |-- Terminal: string (nullable = true)
 |-- Município: string (nullable = t

## Gabarito da Atracação

Ano -> Ano da data de início da operação

Mes -> Mês da data de início da operação

TEsperaAtracacao <-  Data Atracação - Data Chegada

TEsperaInicioOp <- Data Início Operação - Data Atracação 

TOperacao <- Data Término Operação - Data Início Operação

TEsperaDesatracacao <- Data Desatracação - Data Término Operação

TAtracado <- TEsperaInicioOp + TOperacao + TEsperaDesatracacao (Data Desatracação - Data Atracação)

TEstadia <- TEsperaAtracacao + TEsperaInicioOp + TOperacao + TEsperaDesatracacao (Data Desatracação - Data Chegada)

In [7]:
# Muda nome de coluna

dfA = dfA.withColumnRenamed('Ano', 'Ano da data de início da operação')\
             .withColumnRenamed('Mes', "Mês da data de início da operação")

In [8]:
# Converte de string para timestamp

columns = ["Data Atracação", "Data Chegada", "Data Desatracação", "Data Início Operação", "Data Término Operação"]

for i in columns:
    dfA = dfA.withColumn(i,to_timestamp(i, 'dd/MM/yyyy HH:mm:ss').alias(i))
    
dfA.printSchema()

root
 |-- IDAtracacao: integer (nullable = true)
 |-- CDTUP: string (nullable = true)
 |-- IDBerco: string (nullable = true)
 |-- Berço: string (nullable = true)
 |-- Porto Atracação: string (nullable = true)
 |-- Apelido Instalação Portuária: string (nullable = true)
 |-- Complexo Portuário: string (nullable = true)
 |-- Tipo da Autoridade Portuária: string (nullable = true)
 |-- Data Atracação: timestamp (nullable = true)
 |-- Data Chegada: timestamp (nullable = true)
 |-- Data Desatracação: timestamp (nullable = true)
 |-- Data Início Operação: timestamp (nullable = true)
 |-- Data Término Operação: timestamp (nullable = true)
 |-- Ano da data de início da operação: integer (nullable = true)
 |-- Mês da data de início da operação: string (nullable = true)
 |-- Tipo de Operação: string (nullable = true)
 |-- Tipo de Navegação da Atracação: string (nullable = true)
 |-- Nacionalidade do Armador: integer (nullable = true)
 |-- FlagMCOperacaoAtracacao: integer (nullable = true)
 |-- Ter

In [9]:
# Cria colunas com demandas solicitadas em Horas (Verificar o parâmetro de tempo)

dfA = dfA.withColumn('TEsperaAtracacao',(dfA['Data Atracação'].cast('long') - dfA['Data Chegada'].cast('long'))/3600)

dfA = dfA.withColumn('TEsperaInicioOp',(dfA['Data Início Operação'].cast('long') - dfA['Data Atracação'].cast('long'))/3600)

dfA = dfA.withColumn('TOperacao',(dfA['Data Término Operação'].cast('long') - dfA['Data Início Operação'].cast('long'))/3600)

dfA = dfA.withColumn('TEsperaDesatracacao',(dfA['Data Desatracação'].cast('long') - dfA['Data Término Operação'].cast('long'))/3600)

dfA = dfA.withColumn('TAtracado',(dfA['Data Desatracação'].cast('long') - dfA['Data Atracação'].cast('long'))/3600)

dfA = dfA.withColumn('TEstadia',(dfA['Data Desatracação'].cast('long') - dfA['Data Chegada'].cast('long'))/3600)

In [10]:
# Verifica os campos existentes

dfA.select(['Data Atracação', 'Data Chegada', 'Data Desatracação', 'Data Início Operação', 'Data Término Operação']).show(10)

+-------------------+-------------------+-------------------+--------------------+---------------------+
|     Data Atracação|       Data Chegada|  Data Desatracação|Data Início Operação|Data Término Operação|
+-------------------+-------------------+-------------------+--------------------+---------------------+
|2019-10-28 08:30:00|2019-10-28 08:15:00|2019-10-28 11:30:00|                null|                 null|
|2019-11-26 07:40:00|2019-11-26 06:55:00|2019-11-26 10:36:00|                null|                 null|
|2019-11-12 08:15:00|2019-11-12 07:00:00|2019-11-15 15:00:00|                null|                 null|
|2019-09-24 08:50:00|2019-09-24 08:00:00|2019-09-24 17:00:00|                null|                 null|
|2019-02-09 13:45:00|2019-02-09 12:20:00|2019-02-11 09:10:00|                null|                 null|
|2019-02-01 07:50:00|2019-02-01 07:00:00|2019-02-04 10:10:00| 2019-02-01 07:51:00|  2019-02-04 10:00:00|
|2019-03-15 09:50:00|2019-03-15 09:10:00|2019-03-18 10:

In [11]:
# Verifica os campos criados

dfA.select(['TEsperaAtracacao', 'TEsperaInicioOp', 'TOperacao', 'TEsperaDesatracacao', 'TAtracado', 'TEstadia']).show(10)

+------------------+--------------------+---------+-------------------+------------------+------------------+
|  TEsperaAtracacao|     TEsperaInicioOp|TOperacao|TEsperaDesatracacao|         TAtracado|          TEstadia|
+------------------+--------------------+---------+-------------------+------------------+------------------+
|              0.25|                null|     null|               null|               3.0|              3.25|
|              0.75|                null|     null|               null| 2.933333333333333| 3.683333333333333|
|              1.25|                null|     null|               null|             78.75|              80.0|
|0.8333333333333334|                null|     null|               null| 8.166666666666666|               9.0|
|1.4166666666666667|                null|     null|               null|43.416666666666664|44.833333333333336|
|0.8333333333333334|0.016666666666666666|    74.15|0.16666666666666666| 74.33333333333333| 75.16666666666667|
|0.6666666

In [12]:
# Última verificação antes do salvamento (Verificar se é necessário o tratamento de valores faltantes)

dfA.printSchema()

root
 |-- IDAtracacao: integer (nullable = true)
 |-- CDTUP: string (nullable = true)
 |-- IDBerco: string (nullable = true)
 |-- Berço: string (nullable = true)
 |-- Porto Atracação: string (nullable = true)
 |-- Apelido Instalação Portuária: string (nullable = true)
 |-- Complexo Portuário: string (nullable = true)
 |-- Tipo da Autoridade Portuária: string (nullable = true)
 |-- Data Atracação: timestamp (nullable = true)
 |-- Data Chegada: timestamp (nullable = true)
 |-- Data Desatracação: timestamp (nullable = true)
 |-- Data Início Operação: timestamp (nullable = true)
 |-- Data Término Operação: timestamp (nullable = true)
 |-- Ano da data de início da operação: integer (nullable = true)
 |-- Mês da data de início da operação: string (nullable = true)
 |-- Tipo de Operação: string (nullable = true)
 |-- Tipo de Navegação da Atracação: string (nullable = true)
 |-- Nacionalidade do Armador: integer (nullable = true)
 |-- FlagMCOperacaoAtracacao: integer (nullable = true)
 |-- Ter

In [13]:
# Salva dataframe (atracacao_fato) em spark

dfA.write.option("header",True).csv(f"{path_csv}atracacao_fato.csv")

22/03/05 20:38:49 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

In [14]:
# Carrega o dataframe spark

dfA = spark.read.format("csv").option("header","true").load("data/csv/atracacao_fato.csv")

# carga_fato

In [16]:
# Uni as bases de dados

df = []
for i in years:
    df.append(spark.read.options(header='True', inferSchema='True', delimiter=';').csv(f'{path_txt}{i}Carga.txt'))
    
dfC = df[0]
    
for i in df[1:]:
    dfC = dfC.union(i)

                                                                                

In [17]:
# Verifica schema atual

print(dfC.count())

dfC.printSchema()



6479100
root
 |-- IDCarga: integer (nullable = true)
 |-- IDAtracacao: integer (nullable = true)
 |-- Origem: string (nullable = true)
 |-- Destino: string (nullable = true)
 |-- CDMercadoria: string (nullable = true)
 |-- Tipo Operação da Carga: string (nullable = true)
 |-- Carga Geral Acondicionamento: string (nullable = true)
 |-- ConteinerEstado: string (nullable = true)
 |-- Tipo Navegação: string (nullable = true)
 |-- FlagAutorizacao: string (nullable = true)
 |-- FlagCabotagem: integer (nullable = true)
 |-- FlagCabotagemMovimentacao: integer (nullable = true)
 |-- FlagConteinerTamanho: string (nullable = true)
 |-- FlagLongoCurso: integer (nullable = true)
 |-- FlagMCOperacaoCarga: integer (nullable = true)
 |-- FlagOffshore: integer (nullable = true)
 |-- FlagTransporteViaInterioir: integer (nullable = true)
 |-- Percurso Transporte em vias Interiores: string (nullable = true)
 |-- Percurso Transporte Interiores: string (nullable = true)
 |-- STNaturezaCarga: string (nullabl

                                                                                

## Gabarito da Carga

Ano da data de início da operação da atracação <- join do atracacao_fato

Mês da data de início da operação da atracação <- join do atracacao_fato

Porto Atracação <- join do atracacao_fato

SGUF <- join do atracacao_fato

Peso líquido da carga

In [18]:
# Realiza o join com as caracteristicas solicitadas

dfC = dfC.join(dfA, dfC.IDAtracacao == dfA.IDAtracacao).select(dfC["*"], dfA["Ano da data de início da operação"], dfA["Mês da data de início da operação"], dfA["Porto Atracação"], dfA["SGUF"])

In [19]:
# Verifica novo schema de contagem de amostras 

print(dfC.count())

dfC.printSchema()



6479100
root
 |-- IDCarga: integer (nullable = true)
 |-- IDAtracacao: integer (nullable = true)
 |-- Origem: string (nullable = true)
 |-- Destino: string (nullable = true)
 |-- CDMercadoria: string (nullable = true)
 |-- Tipo Operação da Carga: string (nullable = true)
 |-- Carga Geral Acondicionamento: string (nullable = true)
 |-- ConteinerEstado: string (nullable = true)
 |-- Tipo Navegação: string (nullable = true)
 |-- FlagAutorizacao: string (nullable = true)
 |-- FlagCabotagem: integer (nullable = true)
 |-- FlagCabotagemMovimentacao: integer (nullable = true)
 |-- FlagConteinerTamanho: string (nullable = true)
 |-- FlagLongoCurso: integer (nullable = true)
 |-- FlagMCOperacaoCarga: integer (nullable = true)
 |-- FlagOffshore: integer (nullable = true)
 |-- FlagTransporteViaInterioir: integer (nullable = true)
 |-- Percurso Transporte em vias Interiores: string (nullable = true)
 |-- Percurso Transporte Interiores: string (nullable = true)
 |-- STNaturezaCarga: string (nullabl

                                                                                

In [20]:
# Verifica estrutura da coluna

dfC.select('VLPesoCargaBruta').show()

+----------------+
|VLPesoCargaBruta|
+----------------+
|         330,905|
|          27,552|
|          26,036|
|             672|
|          27,552|
|           100,6|
|           25,92|
|          192,03|
|           235,2|
|              37|
|           12,98|
|           42,24|
|           48,51|
|          34,465|
|           1,843|
|            1145|
|          1335,9|
|            1527|
|           52,78|
|             501|
+----------------+
only showing top 20 rows



In [21]:
# Converte de string para float

dfC = dfC.withColumn('VLPesoCargaBruta', regexp_replace("VLPesoCargaBruta", r'[,]',"."))

dfC = dfC.withColumn('VLPesoCargaBruta', dfC["VLPesoCargaBruta"].cast('float').alias("VLPesoCargaBruta"))

In [22]:
dfC.printSchema()

root
 |-- IDCarga: integer (nullable = true)
 |-- IDAtracacao: integer (nullable = true)
 |-- Origem: string (nullable = true)
 |-- Destino: string (nullable = true)
 |-- CDMercadoria: string (nullable = true)
 |-- Tipo Operação da Carga: string (nullable = true)
 |-- Carga Geral Acondicionamento: string (nullable = true)
 |-- ConteinerEstado: string (nullable = true)
 |-- Tipo Navegação: string (nullable = true)
 |-- FlagAutorizacao: string (nullable = true)
 |-- FlagCabotagem: integer (nullable = true)
 |-- FlagCabotagemMovimentacao: integer (nullable = true)
 |-- FlagConteinerTamanho: string (nullable = true)
 |-- FlagLongoCurso: integer (nullable = true)
 |-- FlagMCOperacaoCarga: integer (nullable = true)
 |-- FlagOffshore: integer (nullable = true)
 |-- FlagTransporteViaInterioir: integer (nullable = true)
 |-- Percurso Transporte em vias Interiores: string (nullable = true)
 |-- Percurso Transporte Interiores: string (nullable = true)
 |-- STNaturezaCarga: string (nullable = true

In [23]:
dfC.select('VLPesoCargaBruta').show()


+----------------+
|VLPesoCargaBruta|
+----------------+
|         330.905|
|          27.552|
|          26.036|
|           672.0|
|          27.552|
|           100.6|
|           25.92|
|          192.03|
|           235.2|
|            37.0|
|           12.98|
|           42.24|
|           48.51|
|          34.465|
|           1.843|
|          1145.0|
|          1335.9|
|          1527.0|
|           52.78|
|           501.0|
+----------------+
only showing top 20 rows

