In [16]:
from pyspark.sql import SparkSession
jdbc_driver_path = "/util/jar/postgresql-42.5.1.jar"

In [17]:
# spark = SparkSession.builder \
#     .appName("Escrita no PostgreSQL") \
#     .config("spark.jars", jdbc_driver_path) \
#     .getOrCreate()

url = "jdbc:postgresql://db:5432/SPTRANS"
properties = {
    "user": "admin",
    "password": "admin",
    "driver": "org.postgresql.Driver"
}

In [18]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode, col

spark = SparkSession.builder \
    .appName('GOLD_PREV_CHEGADA') \
    .config('spark.sql.extensions','io.delta.sql.DeltaSparkSessionExtension') \
    .config('spark.sql.catalog.spark_catalog','org.apache.spark.sql.delta.catalog.DeltaCatalog') \
    .config("spark.jars", jdbc_driver_path) \
    .getOrCreate()

In [19]:
trusted_path_previsao_chegada = 's3a://trusted/previsao_chegada'
trusted_path_posicao_veiculo = 's3a://trusted/posicao_veiculo'

In [20]:
df_previsao_chegada = spark.read.format('delta').load(trusted_path_previsao_chegada)
df_posicao_veiculo = spark.read.format('delta').load(trusted_path_posicao_veiculo).select("ID_LINHA", "LETREIRO_LINHA", "LETREIRO_DESTINO_LINHA", "LETREIRO_ORIGEM_LINHA", "ID_VEICULO").distinct()

In [21]:
df_previsao_chegada_join = df_previsao_chegada.join(df_posicao_veiculo, col("ID_VEICULO") == col("PREFIXO_VEICULO"), "inner")

In [22]:
from pyspark.sql.functions import unix_timestamp, abs, desc
df_previsao_chegada2 = df_previsao_chegada_join.withColumn("diff_in_minute", (abs(unix_timestamp("dat_ref_carga","HH:mm")-unix_timestamp("horario_previsto_cheada","HH:mm"))/60))

In [34]:
df_previsao_chegada2.select("dat_ref_carga", "horario_previsto_cheada", "diff_in_minute").show()

+-------------+-----------------------+--------------+
|dat_ref_carga|horario_previsto_cheada|diff_in_minute|
+-------------+-----------------------+--------------+
|        21:13|                  21:23|          10.0|
|        21:07|                  21:37|          30.0|
|        21:07|                  21:42|          35.0|
|        21:07|                  21:12|           5.0|
|        21:07|                  21:43|          36.0|
|        21:07|                  21:40|          33.0|
|        21:07|                  21:47|          40.0|
|        21:07|                  21:49|          42.0|
|        21:07|                  21:38|          31.0|
|        21:07|                  21:45|          38.0|
|        21:07|                  21:33|          26.0|
|        21:07|                  21:46|          39.0|
|        21:09|                  21:36|          27.0|
|        21:09|                  21:35|          26.0|
|        21:09|                  21:33|          24.0|
|        2

In [23]:
df_insight_velocidade = df_previsao_chegada2.withColumn("velocidade_para_chegar", (col("distancia_km") * 60) / col("diff_in_minute"))

In [43]:
from pyspark.sql.functions import percentile_approx, lit, when
df_insight_velocidade = df_insight_velocidade.withColumn(
    "SITUACAO", 
    when(col("velocidade_para_chegar") > 40, lit("ATRASADO")).otherwise(
        when(col("velocidade_para_chegar") < 10, lit("ADIANTADA")).otherwise("OK")))

In [45]:
from pyspark.sql.functions import percentile_approx
df_velocidade_maior_40_parada = df_insight_velocidade.groupBy("ID_PARADA","nome_parada", "SITUACAO").agg(
    percentile_approx("velocidade_para_chegar", 0.5).alias("media_velocidade_chegada"),
    percentile_approx("diff_in_minute", 0.5).alias("MEDIA_DIFF_PREV_CHEGADA_MINUTOS"),
    percentile_approx("distancia_km", 0.5).alias("media_distancia_km")
).sort(desc("media_velocidade_chegada"))

df_velocidade_maior_40_LINHA = df_insight_velocidade.groupBy("LETREIRO_LINHA","LETREIRO_DESTINO_LINHA","LETREIRO_ORIGEM_LINHA", "SITUACAO").agg(
    percentile_approx("velocidade_para_chegar", 0.5).alias("media_velocidade_chegada"),
    percentile_approx("diff_in_minute", 0.5).alias("MEDIA_DIFF_PREV_CHEGADA_MINUTOS"),
    percentile_approx("distancia_km", 0.5).alias("media_distancia_km")
).sort(desc("media_velocidade_chegada"))

df_velocidade_maior_40_VEICULO = df_insight_velocidade.groupBy("ID_VEICULO", "SITUACAO").agg(
    percentile_approx("velocidade_para_chegar", 0.5).alias("media_velocidade_chegada"),
    percentile_approx("diff_in_minute", 0.5).alias("MEDIA_DIFF_PREV_CHEGADA_MINUTOS"),
    percentile_approx("distancia_km", 0.5).alias("media_distancia_km")
).sort(desc("media_velocidade_chegada"))


# df_velocidade_menor_10_parada = df_velocidade_menor_10.groupBy("ID_PARADA","nome_parada").agg(
#     percentile_approx("velocidade_para_chegar", 0.5).alias("media_velocidade_chegada"),
#     percentile_approx("diff_in_minute", 0.5).alias("MEDIA_DIFF_PREV_CHEGADA_MINUTOS"),
#     percentile_approx("distancia_km", 0.5).alias("media_distancia_km")
# ).sort(desc("MEDIA_DIFF_PREV_CHEGADA_MINUTOS"),"media_velocidade_chegada")

# df_velocidade_menor_10_linha = df_velocidade_menor_10.groupBy("LETREIRO_LINHA","LETREIRO_DESTINO_LINHA","LETREIRO_ORIGEM_LINHA").agg(
#     percentile_approx("velocidade_para_chegar", 0.5).alias("media_velocidade_chegada"),
#     percentile_approx("diff_in_minute", 0.5).alias("MEDIA_DIFF_PREV_CHEGADA_MINUTOS"),
#     percentile_approx("distancia_km", 0.5).alias("media_distancia_km")
# ).sort(desc("MEDIA_DIFF_PREV_CHEGADA_MINUTOS"),"media_velocidade_chegada")

# df_velocidade_menor_10_VEICULO = df_velocidade_menor_10.groupBy("ID_VEICULO").agg(
#     percentile_approx("velocidade_para_chegar", 0.5).alias("media_velocidade_chegada"),
#     percentile_approx("diff_in_minute", 0.5).alias("MEDIA_DIFF_PREV_CHEGADA_MINUTOS"),
#     percentile_approx("distancia_km", 0.5).alias("media_distancia_km_PONTOS")
# ).sort(desc("MEDIA_DIFF_PREV_CHEGADA_MINUTOS"),"media_velocidade_chegada")

In [48]:
silver_path_posicao_veiculo = 's3a://gold/'

In [51]:
tabela_maior_velocidade = "PREV"
tabela_maior_parada = f"{tabela_maior_velocidade}_PARADA"
df_velocidade_maior_40_parada.coalesce(1).write.csv(f"{silver_path_posicao_veiculo}{tabela_maior_parada}")

tabela_maior_linha = f"{tabela_maior_velocidade}_LINHA"
df_velocidade_maior_40_LINHA.coalesce(1).write.csv(f"{silver_path_posicao_veiculo}{tabela_maior_linha}")

tabela_maior_veiculo = f"{tabela_maior_velocidade}_VEICULO"
df_velocidade_maior_40_VEICULO.coalesce(1).write.csv(f"{silver_path_posicao_veiculo}{tabela_maior_veiculo}")

# tabela_menor_velocidade = "PREV_ADIANTADA"
# tabela_menor_parada = f"{tabela_menor_velocidade}_PARADA"
# df_velocidade_menor_10_parada.coalesce(1).write.csv(f"{silver_path_posicao_veiculo}{tabela_menor_parada}")

# tabela_menor_LINHA = f"{tabela_menor_velocidade}_LINHA"
# df_velocidade_menor_10_linha.coalesce(1).write.csv(f"{silver_path_posicao_veiculo}{tabela_menor_LINHA}")

# tabela_menor_VEICULO = f"{tabela_menor_velocidade}_VEICULO"
# df_velocidade_menor_10_VEICULO.coalesce(1).write.csv(f"{silver_path_posicao_veiculo}{tabela_menor_VEICULO}")

In [53]:
df_velocidade_maior_40_parada.show()

+---------+--------------------+--------+------------------------+-------------------------------+------------------+
|ID_PARADA|         nome_parada|SITUACAO|media_velocidade_chegada|MEDIA_DIFF_PREV_CHEGADA_MINUTOS|media_distancia_km|
+---------+--------------------+--------+------------------------+-------------------------------+------------------+
|930012776|                    |ATRASADO|        51.9871670689115|                            1.0|0.8664527844818584|
|820012014|                    |ATRASADO|       51.75115486208621|                            2.0|1.7250384954028737|
|820012486|                    |ATRASADO|       46.06949462120833|                            3.0|2.3034747310604167|
|100014589|                    |ATRASADO|       46.03501437116608|                            2.0|1.5345004790388692|
|540014578|                    |ATRASADO|      45.042774253573945|                            1.0|0.7507129042262324|
|860012123|                    |ATRASADO|      42.274863