In [None]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:90% !important; }</style>"))

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SQLContext
spark = SparkSession.builder.master("local[*]").appName('spark_intermunicipal_sc').getOrCreate()

tb = spark.read.parquet("ref/parquet")
tb.createOrReplaceTempView("tb")

dicionario_secoes = spark.read.option("delimiter", ";").option("header", "true").option("inferSchema", "true").option("encoding", "utf-8").csv("dicionario/dicionario_secoes_stripped.csv")
dicionario_secoes.createOrReplaceTempView("dicionario_secoes")

dicionario_linhas = spark.read.option("delimiter", ";").option("header", "true").option("inferSchema", "true").option("encoding", "utf-8").csv("dicionario/dicionario_linhas_stripped.csv")
dicionario_linhas.createOrReplaceTempView("dicionario_linhas")

# verificação dos arquivos lidos

print(f"\
      tb = {tb.count()} linhas | {len(tb.columns)} colunas\n\
      dicionario_secoes = {dicionario_secoes.count()} linhas | {len(dicionario_secoes.columns)} colunas\n\
      dicionario_linhas = {dicionario_linhas.count()} linhas | {len(dicionario_linhas.columns)} colunas\
      ")

df_join = spark.sql("""

SELECT
tb.ano AS `ano`,
tb.empresa AS `empresa`,
servico_ambito AS `modalidade`,
CONCAT(prefixo, ' - ', CONCAT(linha_ori.sig, '-', linha_des.sig)) AS linha,
prefixo,
CONCAT(linha_ori.sig, '-', linha_des.sig) AS sig_linha_ori_des,
CONCAT(secao_ori.sig, '-', secao_des.sig) AS sig_secao_ori_des,
km,
pax_total AS `passageiros_total`,
tb.ori_municipio_nome AS sie_linha_ori,
tb.des_municipio_nome AS sie_linha_des,
tb.ori_localidade_nome AS sie_secao_ori,
tb.des_localidade_nome AS sie_secao_des,
linha_ori.sig AS sig_linha_ori,
linha_des.sig AS sig_linha_des,
secao_ori.sig AS sig_secao_ori,
secao_des.sig AS sig_secao_des
FROM tb
LEFT JOIN dicionario_secoes AS secao_ori ON tb.ori_localidade_nome = secao_ori.sie
LEFT JOIN dicionario_secoes AS secao_des ON tb.des_localidade_nome = secao_des.sie
LEFT JOIN dicionario_linhas AS linha_ori ON tb.ori_municipio_nome = linha_ori.sie
LEFT JOIN dicionario_linhas AS linha_des ON tb.des_municipio_nome = linha_des.sie
ORDER BY `ano`, `empresa`, `prefixo`

""")

df_join.write.option("header", "True").option("encoding", "utf-8").parquet("ref_join/parquet/multifile/")
df_join.write.option("delimiter", ";").option("header", "True").option("encoding", "utf-8").csv("ref_join/csv/multifile/")

print((df_join.count(), len(df_join.columns)))

# df_join.write.option("header", "True").option("encoding", "utf-8").parquet("ref_join/parquet/singlefile/")
# df_join.write.option("delimiter", ";").option("header", "True").option("encoding", "utf-8").csv("ref_join/csv/singlefile/")
# df_join.write.partitionBy("ano", "empresa").option("delimiter", ";").option("header", "true").option("encoding", "utf-8").csv("./intermunicipal_sc/ref_join/")