In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql import SQLContext
spark = SparkSession.builder.master("local[1]").appName('').getOrCreate()

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/02/14 13:09:31 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [2]:
# tb = spark.read.option("inferSchema", "true").parquet("intermunicipal_sc/ref/parquet")
# tb = spark.read.schema(schema).parquet("intermunicipal_sc/ref/parquet")
tb = spark.read.parquet("intermunicipal_sc/ref/parquet")
tb.createOrReplaceTempView("tb")

dicionario_secoes = spark.read.option("delimiter", ";").option("header", "true").option("inferSchema", "true").option("encoding", "utf-8").csv("./intermunicipal_sc/dicionario/dicionario_secoes_stripped.csv")
dicionario_secoes.createOrReplaceTempView("dicionario_secoes")

dicionario_linhas = spark.read.option("delimiter", ";").option("header", "true").option("inferSchema", "true").option("encoding", "utf-8").csv("./intermunicipal_sc/dicionario/dicionario_linhas_stripped.csv")
dicionario_linhas.createOrReplaceTempView("dicionario_linhas")

print(f"\
      tb = {tb.count()} linhas | {len(tb.columns)} colunas\n\
      dicionario_secoes = {dicionario_secoes.count()} linhas | {len(dicionario_secoes.columns)} colunas\n\
      dicionario_linhas = {dicionario_linhas.count()} linhas | {len(dicionario_linhas.columns)} colunas\
      ")

22/02/14 13:09:36 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.
                                                                                

      tb = 615285 linhas | 53 colunas
      dicionario_secoes = 1710 linhas | 2 colunas
      dicionario_linhas = 242 linhas | 2 colunas      


In [None]:
# tb.printSchema()

In [3]:
df_join = spark.sql("""

SELECT
tb.ano AS `ano`,
tb.empresa AS `empresa`,
servico_ambito AS `modalidade`,
prefixo,
CONCAT(linha_ori.sig, '-', linha_des.sig) AS sig_linha_ori_des,
CONCAT(secao_ori.sig, '-', secao_des.sig) AS sig_secao_ori_des,
km,
pax_total AS `passageiros_total`,
tb.ori_municipio_nome AS sie_linha_ori,
tb.des_municipio_nome AS sie_linha_des,
tb.ori_localidade_nome AS sie_secao_ori,
tb.des_localidade_nome AS sie_secao_des,
linha_ori.sig AS sig_linha_ori,
linha_des.sig AS sig_linha_des,
secao_ori.sig AS sig_secao_ori,
secao_des.sig AS sig_secao_des
FROM tb
LEFT JOIN dicionario_secoes AS secao_ori ON tb.ori_localidade_nome = secao_ori.sie
LEFT JOIN dicionario_secoes AS secao_des ON tb.des_localidade_nome = secao_des.sie
LEFT JOIN dicionario_linhas AS linha_ori ON tb.ori_municipio_nome = linha_ori.sie
LEFT JOIN dicionario_linhas AS linha_des ON tb.des_municipio_nome = linha_des.sie
ORDER BY `ano`, `empresa`, `prefixo`

""")

In [4]:
print((df_join.count(), len(df_join.columns)))

[Stage 16:>                                                         (0 + 1) / 1]

(615285, 16)


                                                                                

# Gravação

In [7]:
df_join.write.option("delimiter", ";").option("header", "true").option("encoding", "utf-8").csv("./intermunicipal_sc/ref_join/")

                                                                                

# Teste

In [8]:
leitura_joined = spark.read.option('delimiter', ';').option('encoding', 'utf-8').option('header', 'true').csv('./intermunicipal_sc/ref_join/')

def teste_qualidade(df, n_colunas, n_linhas):
    if (len(df.columns) == n_colunas) == True and (int(df.count()) == n_linhas) == True:
        print("OK")
    else:
        print("FAILED")

teste_qualidade(leitura_joined, 16, 615285)

[Stage 38:>                                                         (0 + 1) / 1]

OK


                                                                                

In [9]:
leitura_joined.printSchema()

root
 |-- ano: string (nullable = true)
 |-- empresa: string (nullable = true)
 |-- modalidade: string (nullable = true)
 |-- prefixo: string (nullable = true)
 |-- sig_linha_ori_des: string (nullable = true)
 |-- sig_secao_ori_des: string (nullable = true)
 |-- km: string (nullable = true)
 |-- passageiros_total: string (nullable = true)
 |-- sie_linha_ori: string (nullable = true)
 |-- sie_linha_des: string (nullable = true)
 |-- sie_secao_ori: string (nullable = true)
 |-- sie_secao_des: string (nullable = true)
 |-- sig_linha_ori: string (nullable = true)
 |-- sig_linha_des: string (nullable = true)
 |-- sig_secao_ori: string (nullable = true)
 |-- sig_secao_des: string (nullable = true)



# Exemplos

## Seleção com atributos usando spark.sql
* algumas colunas
* 2019 como ano-base
* empresas de código 23 e 1505 (Reunidas)
* movimentação de passageiros superior a 10.000
* modalidade (âmbito) do serviço rodoviário
* ordenação decrescente do total de passageiros

In [None]:
spark.sql("""

SELECT ano AS Ano, empresa AS Empresa, CONCAT(ori_municipio_nome, '-', des_municipio_nome) AS Linha, ori_des_localidade_nome AS `Origem-destino`, pax_total AS `Total de passageiros`
FROM tb
WHERE ano = 2019
AND (empresa LIKE '23%' OR empresa LIKE '1505%')
AND pax_total > 10000
AND servico_ambito = 'RODOVIARIO'
ORDER BY pax_total DESC

""").show(10, False)

## Seleção com atributos usando select do DataFrame
* algumas colunas

In [None]:
tb.select(
    "ano",
    "empresa",
    "prefixo",
    concat_ws("-", "ori_municipio_nome", "des_municipio_nome").alias("linha_ori_des_sie"),
    concat_ws("-", "ori_localidade_nome", "des_localidade_nome").alias("secao_ori_des_sie"),
    "pax_total"
).show(10, False)

## Seleção com atributos usando select do DataFrame
* algumas colunas
* concatenação de dados

In [None]:
tb.select(
    "ano",
    "empresa",
    struct("prefixo", concat_ws("-", "ori_municipio_nome", "des_municipio_nome").alias("linha_ori_des_sie")).alias("linha"),
    concat_ws("-", "ori_localidade_nome", "des_localidade_nome").alias("secao_ori_des_sie"),
    "pax_total"
).show(10, False)

##### _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ _ 

In [None]:
schema = StructType([
    StructField("ano", LongType(), True),
    StructField("codigo", StringType(), True),
    StructField("des_localidade_id", StringType(), True),
    StructField("des_localidade_nome", StringType(), True),
    StructField("des_localidade_uf", StringType(), True),
    StructField("des_municipio_id", StringType(), True),
    StructField("des_municipio_nome", StringType(), True),
    StructField("empresa", StringType(), True),
    StructField("empresa_cnpj", StringType(), True),
    StructField("empresa_situacao", StringType(), True),
    StructField("empresa_tipo", StringType(), True),
    StructField("ida_idoso_desconto", StringType(), True),
    StructField("ida_idoso_gratis", StringType(), True),
    StructField("ida_jovem_desconto", StringType(), True),
    StructField("ida_jovem_gratis", StringType(), True),
    StructField("ida_pagantes", StringType(), True),
    StructField("ida_passelivre", StringType(), True),
    StructField("km", StringType(), True),
    StructField("km_total", StringType(), True),
    StructField("linha", StringType(), True),
    StructField("linha_id", StringType(), True),
    StructField("lugares_idas", StringType(), True),
    StructField("lugares_voltas", StringType(), True),
    StructField("mes", StringType(), True),
    StructField("ori_des_localidade_nome", StringType(), True),
    StructField("ori_localidade_id", StringType(), True),
    StructField("ori_localidade_nome", StringType(), True),
    StructField("ori_localidade_uf", StringType(), True),
    StructField("ori_municipio_id", StringType(), True),
    StructField("ori_municipio_nome", StringType(), True),
    StructField("pax_gratis_descontos", StringType(), True),
    StructField("pax_idoso_desconto", StringType(), True),
    StructField("pax_idoso_gratis", StringType(), True),
    StructField("pax_jovem_desconto", StringType(), True),
    StructField("pax_jovem_gratis", StringType(), True),
    StructField("pax_pagantes", StringType(), True),
    StructField("pax_passelivre", StringType(), True),
    StructField("pax_total", StringType(), True),
    StructField("prefixo", StringType(), True),
    StructField("secao_id", StringType(), True),
    StructField("sequencial", StringType(), True),
    StructField("servico_ambito", StringType(), True),
    StructField("servico_tipo", StringType(), True),
    StructField("sisdap_fim", StringType(), True),
    StructField("sisdap_inicio", StringType(), True),
    StructField("viagem_idas", StringType(), True),
    StructField("viagem_voltas", StringType(), True),
    StructField("volta_idoso_desconto", StringType(), True),
    StructField("volta_idoso_gratis", StringType(), True),
    StructField("volta_jovem_desconto", StringType(), True),
    StructField("volta_jovem_gratis", StringType(), True),
    StructField("volta_pagantes", StringType(), True),
    StructField("volta_passelivre", StringType(), True)])