In [2]:
import os
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, col

spark = SparkSession.builder \
        .appName("ingestion-linhas") \
        .master("local[*]") \
        .config("spark.hadoop.fs.s3a.endpoint", os.getenv("S3_ENDPOINT")) \
        .config("spark.hadoop.fs.s3a.access.key", os.getenv("AWS_ACCESS_KEY_ID")) \
        .config("spark.hadoop.fs.s3a.secret.key", os.getenv("AWS_SECRET_ACCESS_KEY")) \
        .config("spark.hadoop.fs.s3a.path.style.access", "true") \
        .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
        .config("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider") \
        .getOrCreate()
    
today=datetime.now().strftime('%Y-%m-%d')

In [9]:
posicao=spark.read.option('header', 'true').csv(f's3a://silver/posicao_by_linha/csv/')
paradas=spark.read.option('header', 'true').csv(f's3a://silver/paradas_by_linhas/csv/')
linhas=spark.read.option('header', 'true').csv(f's3a://silver/linhas/csv/')

In [24]:
(paradas
    .withColumnRenamed('py', 'latitude_parada')
    .withColumnRenamed('px', 'longitude_parada'))

DataFrame[cl: string, cp: string, np: string, ed: string, latitude_parada: string, longitude_parada: string, date: string]

In [26]:
final=spark.read.option('header', 'true').csv(f's3a://gold/linhas/csv/')

In [27]:
final.show()

+------------+----------+-----------------+---------------+--------------+--------------------+-------------------+------------------+-------------+-----------+---------------+---------------+----------------+--------+--------------+-------+-------------+-----------------------+------------------------+
|codigo_linha|      date|hora_extracao_api|prefixo_veiculo|acessibilidade|  loc_capturada_hora|           latitude|         longitude|codigo_parada|nome_parada|endereco_parada|latitude_parada|longitude_parada|circular|prefixo_numero|sentido|sufixo_numero|letreiro_term_principal|letreiro_term_secundario|
+------------+----------+-----------------+---------------+--------------+--------------------+-------------------+------------------+-------------+-----------+---------------+---------------+----------------+--------+--------------+-------+-------------+-----------------------+------------------------+
|        2495|2024-11-04|            08:48|          10582|          true|2024-11-04T

In [21]:
paradas.printSchema()

root
 |-- cl: string (nullable = true)
 |-- cp: string (nullable = true)
 |-- np: string (nullable = true)
 |-- ed: string (nullable = true)
 |-- py: string (nullable = true)
 |-- px: string (nullable = true)
 |-- date: string (nullable = true)



In [22]:
posicao.printSchema()

root
 |-- cl: string (nullable = true)
 |-- hr: string (nullable = true)
 |-- p: string (nullable = true)
 |-- a: string (nullable = true)
 |-- ta: string (nullable = true)
 |-- py: string (nullable = true)
 |-- px: string (nullable = true)
 |-- date: string (nullable = true)



In [23]:
linhas.printSchema()

root
 |-- cl: string (nullable = true)
 |-- lc: string (nullable = true)
 |-- lt: string (nullable = true)
 |-- sl: string (nullable = true)
 |-- tl: string (nullable = true)
 |-- tp: string (nullable = true)
 |-- ts: string (nullable = true)
 |-- date: string (nullable = true)



In [12]:
df=(posicao
    .join(paradas, on=['cl', 'date'], how='left')
    .join(linhas, on=['cl', 'date'], how='left'))

In [20]:
df.dropDuplicates(['cl', 'date'])

DataFrame[cl: string, date: string, hr: string, p: string, a: string, ta: string, py: string, px: string, cp: string, np: string, ed: string, py: string, px: string, lc: string, lt: string, sl: string, tl: string, tp: string, ts: string]

In [18]:
df_treated=(df
             .withColumnRenamed('cl','codigo_linha')
             .withColumnRenamed('hr', 'hora_extracao_api')
             .withColumnRenamed('p', 'prefixo_veiculo')
             .withColumnRenamed('a', 'acessibilidade')
             .withColumnRenamed('ta', 'loc_capturada_hora')
             .withColumnRenamed('py', 'latitude')
             .withColumnRenamed('px', 'longitude')
             .withColumnRenamed('lc', 'circular')
             .withColumnRenamed('lt', 'prefixo_numero')
             .withColumnRenamed('sl', 'sentido')
             .withColumnRenamed('tp', 'letreiro_term_principal')
             .withColumnRenamed('ts', 'letreiro_term_secundario')
             .withColumnRenamed('cp', 'codigo_parada')
             .withColumnRenamed('np', 'nome_parada')
             .withColumnRenamed('ed', 'endereco_parada')
             .withColumnRenamed('tl', 'sufixo_numero')
           )

In [19]:
df_treated.printSchema()

root
 |-- codigo_linha: string (nullable = true)
 |-- date: string (nullable = true)
 |-- hora_extracao_api: string (nullable = true)
 |-- prefixo_veiculo: string (nullable = true)
 |-- acessibilidade: string (nullable = true)
 |-- loc_capturada_hora: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- codigo_parada: string (nullable = true)
 |-- nome_parada: string (nullable = true)
 |-- endereco_parada: string (nullable = true)
 |-- latitude: string (nullable = true)
 |-- longitude: string (nullable = true)
 |-- circular: string (nullable = true)
 |-- prefixo_numero: string (nullable = true)
 |-- sentido: string (nullable = true)
 |-- sufixo_numero: string (nullable = true)
 |-- letreiro_term_principal: string (nullable = true)
 |-- letreiro_term_secundario: string (nullable = true)

