In [1]:
# Import SparkSession
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, to_timestamp, regexp_replace
from pyspark.sql.types import *
import pyspark.sql.functions as F
import os

In [2]:
# Build the SparkSession
spark = SparkSession.builder \
   .master("local") \
   .appName("Linear Regression Model") \
   .config("spark.executor.memory", "1gb") \
   .getOrCreate()
   
sc = spark.sparkContext

In [3]:
def merge_all(dfs):
    
    if len(dfs) == 0:
        return None
    
    df = dfs[0]
    
    for df2 in dfs[1:]:
        df = df.union(df2)
    
    return df

In [4]:
directory_raw = 'data/raw/'
directory_processed = 'data/processed/'

In [5]:
anos_interesse = ['2020', '2019', '2018']

# Atracação

In [6]:
dfs = []

for ano in anos_interesse:
    dfs.append(spark.read.options(header='True', inferSchema='True', delimiter=';') \
                          .csv(f"{directory_raw}{ano}Atracacao.txt"))

In [7]:
df_atracacao = merge_all(dfs)

In [8]:
# Write a custom function to convert the data type of DataFrame columns
def convertColumn(df, names):
    for name in names: 
        df = df.withColumn(name, to_timestamp(df[name], 'dd/MM/yyyy HH:mm:ss'))
    return df 

columns = ['Data Atracação', 'Data Chegada','Data Desatracação', 'Data Início Operação', 'Data Término Operação']

df_atracacao = convertColumn(df_atracacao, columns)

## TEsperaAtracacao: Atracação - Chegada

In [9]:
df_atracacao = df_atracacao.withColumn(
    "TEsperaAtracacao", 
    (F.col("Data Atracação").cast("long") - F.col("Data Chegada").cast("long"))/60.
)

## TEsperaInicioOp:  Início - Atracação

In [10]:
df_atracacao = df_atracacao.withColumn(
    "TEsperaInicioOp", 
    (F.col("Data Início Operação").cast("long") - F.col("Data Atracação").cast("long"))/60.
)

## TOperacao: Término - Início

In [11]:
df_atracacao = df_atracacao.withColumn(
    "TOperacao", 
    (F.col("Data Término Operação").cast("long") - F.col("Data Início Operação").cast("long"))/60.
)

## TEsperaDesatracacao: Desatracação - Término

In [12]:
df_atracacao = df_atracacao.withColumn(
    "TEsperaDesatracacao", 
    (F.col("Data Desatracação").cast("long") - F.col("Data Término Operação").cast("long"))/60.
)

## TAtracado: Desatracação - Atracação

In [13]:
df_atracacao = df_atracacao.withColumn(
    "TAtracado", 
    (F.col("Data Desatracação").cast("long") - F.col("Data Atracação").cast("long"))/60.
)

## TEstadia: Desatracação - Chegada

In [14]:
df_atracacao = df_atracacao.withColumn(
    "TEstadia", 
    (F.col("Data Desatracação").cast("long") - F.col("Data Chegada").cast("long"))/60.
)

### Salvar tabela final de Atracação

In [15]:
if not os.path.exists(directory_processed):
    os.makedirs(directory_processed)

df_atracacao.write.mode("overwrite").option("quoteAll", True).csv(f"{directory_processed}atracacao_fato.csv", header=True)

# Carga

In [16]:
dfs = []

for ano in anos_interesse:
    dfs.append(spark.read.options(header='True', inferSchema='True', delimiter=';') \
                          .csv(f"{directory_raw}{ano}Carga.txt"))

In [17]:
df_carga = merge_all(dfs)

### Merge entre as tabelas de Carga e Atracação

In [18]:
df_carga = df_carga.join(df_atracacao, df_carga.IDAtracacao == df_atracacao.IDAtracacao) \
                    .select(df_carga["*"], df_atracacao["Ano"], df_atracacao["Mes"], df_atracacao["Porto Atracação"], df_atracacao["SGUF"])

### Peso líquido da carga (Carga não conteinerizada = Peso bruto e Carga conteinerizada = Peso sem contêiner)

In [19]:
commaToDot = udf(lambda x : float(str(x).replace(',', '.')), FloatType())

df_carga = df_carga.withColumn(
    "VLPesoCargaBruta", 
    commaToDot('VLPesoCargaBruta')
)

In [20]:
df_carga = df_carga.withColumn(
    "Peso líquido da carga", 
    col("VLPesoCargaBruta")
)


df_carga = df_carga.withColumn(
    "Peso líquido da carga", 
    F.when( (col("FlagConteinerTamanho") == '20')  & (col("FlagConteinerTamanho").isNotNull()),
           (col("VLPesoCargaBruta") - 2.3)).otherwise(F.when( (col("FlagConteinerTamanho") == '40'),
           (col("VLPesoCargaBruta") - 3.7)).otherwise(col("VLPesoCargaBruta")))
)


### Salvar tabela final de Carga

In [21]:
if not os.path.exists(directory_processed):
    os.makedirs(directory_processed)

df_carga.write.mode("overwrite").option("quoteAll", True).csv(f"{directory_processed}carga_fato.csv", header=True)