In [60]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import pyspark.sql.functions as f
import pyspark.sql.types as t
import os
from functools import reduce
from datetime import datetime, time
from pyspark.sql import DataFrame
from pyspark.sql import SparkSession
from pyspark.sql.functions import lag
from pyspark.sql.window import Window
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import DoubleType
import pandas as pd
import math


In [61]:
spark = SparkSession.builder.master('local[*]') \
    .appName('Métricas') \
    .getOrCreate()

<p>1 – Quantidade de ônibus em operação</p>

In [62]:
directory_path = "/home/rodrigo/projetos/monitoramento_sptrans/data/datalake/prata/operacao_agrupada"
file_list = os.listdir(directory_path)
dataframes_agrupados = []
for item in file_list:
    if item.endswith('.parquet'):
        caminho_completo = os.path.join(directory_path, item)

        df = spark.read.parquet(caminho_completo)
        dataframes_agrupados.append(df)



In [63]:
soma = 0
for d in dataframes_agrupados:

    soma += d.count()
soma

1083996

In [64]:
def union_all(dfs):
    return reduce(DataFrame.unionAll, dfs)

In [65]:
dataframes_agrupados_completo = union_all(dataframes_agrupados)
dataframes_agrupados_completo.show()

+-------+-----------+--------------------+--------------------+--------+-------------+-----------------+--------------------+-----------------+--------------------+----------------------+-----------------+
|  LINHA|CODIGO_AREA|            CONSOCIO|             EMPRESA|HORA_API|DATA_EXTRACAO|LETREIRO_COMPLETO|CODIGO_IDENTIFICADOR|  LETREIRO_ORIGEM|    LETREIRO_DESTINO|QTDE_VEICULOS_OPERACAO|DATA_EXTRACAO_API|
+-------+-----------+--------------------+--------------------+--------+-------------+-----------------+--------------------+-----------------+--------------------+----------------------+-----------------+
|3054-10|          4|PÊSSEGO TRANSPORT...|PÊSSEGO TRANSPORT...|   08:50|   2023-09-15|          3054-10|               33743|  HOSP. SAPOPEMBA|        JD. PALANQUE|                     6|       2023-09-15|
|6026-10|          6|TRANSWOLFF TRANSP...|TRANSWOLFF TRANSP...|   08:50|   2023-09-15|          6026-10|                  87| TERM. STO. AMARO|          JD. ICARAÍ|            

In [66]:
dataframes_agrupados_completo.printSchema()

root
 |-- LINHA: string (nullable = true)
 |-- CODIGO_AREA: string (nullable = true)
 |-- CONSOCIO: string (nullable = true)
 |-- EMPRESA: string (nullable = true)
 |-- HORA_API: string (nullable = true)
 |-- DATA_EXTRACAO: date (nullable = true)
 |-- LETREIRO_COMPLETO: string (nullable = true)
 |-- CODIGO_IDENTIFICADOR: long (nullable = true)
 |-- LETREIRO_ORIGEM: string (nullable = true)
 |-- LETREIRO_DESTINO: string (nullable = true)
 |-- QTDE_VEICULOS_OPERACAO: long (nullable = true)
 |-- DATA_EXTRACAO_API: date (nullable = true)



In [67]:
dataframes_agrupados_completo = dataframes_agrupados_completo.drop('DATA_EXTRACAO_API')
dataframes_agrupados_completo.show()

+-------+-----------+--------------------+--------------------+--------+-------------+-----------------+--------------------+-----------------+--------------------+----------------------+
|  LINHA|CODIGO_AREA|            CONSOCIO|             EMPRESA|HORA_API|DATA_EXTRACAO|LETREIRO_COMPLETO|CODIGO_IDENTIFICADOR|  LETREIRO_ORIGEM|    LETREIRO_DESTINO|QTDE_VEICULOS_OPERACAO|
+-------+-----------+--------------------+--------------------+--------+-------------+-----------------+--------------------+-----------------+--------------------+----------------------+
|3054-10|          4|PÊSSEGO TRANSPORT...|PÊSSEGO TRANSPORT...|   08:50|   2023-09-15|          3054-10|               33743|  HOSP. SAPOPEMBA|        JD. PALANQUE|                     6|
|6026-10|          6|TRANSWOLFF TRANSP...|TRANSWOLFF TRANSP...|   08:50|   2023-09-15|          6026-10|                  87| TERM. STO. AMARO|          JD. ICARAÍ|                     5|
|857A-10|          7|      CONSÓRCIO KBPX|KBPX ADMINISTRAÇÃ.

In [68]:
dataframes_agrupados_completo.printSchema()

root
 |-- LINHA: string (nullable = true)
 |-- CODIGO_AREA: string (nullable = true)
 |-- CONSOCIO: string (nullable = true)
 |-- EMPRESA: string (nullable = true)
 |-- HORA_API: string (nullable = true)
 |-- DATA_EXTRACAO: date (nullable = true)
 |-- LETREIRO_COMPLETO: string (nullable = true)
 |-- CODIGO_IDENTIFICADOR: long (nullable = true)
 |-- LETREIRO_ORIGEM: string (nullable = true)
 |-- LETREIRO_DESTINO: string (nullable = true)
 |-- QTDE_VEICULOS_OPERACAO: long (nullable = true)



In [69]:
@f.udf(returnType=t.StringType())
def turno(hora: str):
    hora_formatada = datetime.strptime(hora, '%H:%M').time()
    if 0 <= hora_formatada.hour < 6:
        return 'Madrugada'
    elif 6 <= hora_formatada.hour < 12:
        return 'Manhã'
    elif 12 <= hora_formatada.hour < 18:
        return 'Tarde'
    else:
        return 'Noite'
        

In [70]:
turno('23:55')

Column<'turno(23:55)'>

In [71]:
dataframes_agrupados_completo = dataframes_agrupados_completo.withColumn('TURNO', turno(f.col('HORA_API'))) 

df_filter = dataframes_agrupados_completo.select(
    dataframes_agrupados_completo.CONSOCIO,
    dataframes_agrupados_completo.TURNO,
    dataframes_agrupados_completo.DATA_EXTRACAO ,
    dataframes_agrupados_completo.LETREIRO_ORIGEM,
    dataframes_agrupados_completo.LETREIRO_DESTINO,
    dataframes_agrupados_completo.HORA_API,
    dataframes_agrupados_completo.LETREIRO_COMPLETO,
    dataframes_agrupados_completo.QTDE_VEICULOS_OPERACAO,
    dataframes_agrupados_completo.CODIGO_IDENTIFICADOR,
    ).filter(
        (dataframes_agrupados_completo.DATA_EXTRACAO == '2023-09-15' ) & 
        (dataframes_agrupados_completo.TURNO == 'Noite') &
        (dataframes_agrupados_completo.LETREIRO_COMPLETO == '9047-10')
        )
df_filter.show(truncate=False)

[Stage 18:>                                                         (0 + 1) / 1]

+-----------------------+-----+-------------+---------------+----------------+--------+-----------------+----------------------+--------------------+
|CONSOCIO               |TURNO|DATA_EXTRACAO|LETREIRO_ORIGEM|LETREIRO_DESTINO|HORA_API|LETREIRO_COMPLETO|QTDE_VEICULOS_OPERACAO|CODIGO_IDENTIFICADOR|
+-----------------------+-----+-------------+---------------+----------------+--------+-----------------+----------------------+--------------------+
|CONSÓRCIO TRANSNOROESTE|Noite|2023-09-15   |LAPA           |JD. PAULISTANO  |18:00   |9047-10          |9                     |33651               |
|CONSÓRCIO TRANSNOROESTE|Noite|2023-09-15   |LAPA           |JD. PAULISTANO  |18:00   |9047-10          |5                     |883                 |
|CONSÓRCIO TRANSNOROESTE|Noite|2023-09-15   |LAPA           |JD. PAULISTANO  |18:05   |9047-10          |7                     |33651               |
|CONSÓRCIO TRANSNOROESTE|Noite|2023-09-15   |LAPA           |JD. PAULISTANO  |18:05   |9047-10      

                                                                                

In [72]:
df_filter.groupBy(['CONSOCIO', 'LETREIRO_COMPLETO',  'LETREIRO_ORIGEM', 'LETREIRO_DESTINO','HORA_API', 'TURNO']) \
    .agg(
        F.sum('QTDE_VEICULOS_OPERACAO') \
         .alias('QUANTIDADE_VEICULOS_OPERACAO')
    ) \
    .orderBy('HORA_API')\
    .show(truncate=False)

+-----------------------+-----------------+---------------+----------------+--------+-----+----------------------------+
|CONSOCIO               |LETREIRO_COMPLETO|LETREIRO_ORIGEM|LETREIRO_DESTINO|HORA_API|TURNO|QUANTIDADE_VEICULOS_OPERACAO|
+-----------------------+-----------------+---------------+----------------+--------+-----+----------------------------+
|CONSÓRCIO TRANSNOROESTE|9047-10          |LAPA           |JD. PAULISTANO  |18:00   |Noite|14                          |
|CONSÓRCIO TRANSNOROESTE|9047-10          |LAPA           |JD. PAULISTANO  |18:05   |Noite|13                          |
|CONSÓRCIO TRANSNOROESTE|9047-10          |LAPA           |JD. PAULISTANO  |18:10   |Noite|15                          |
|CONSÓRCIO TRANSNOROESTE|9047-10          |LAPA           |JD. PAULISTANO  |18:15   |Noite|14                          |
|CONSÓRCIO TRANSNOROESTE|9047-10          |LAPA           |JD. PAULISTANO  |18:20   |Noite|15                          |
|CONSÓRCIO TRANSNOROESTE|9047-10

In [73]:
directory_path = "/home/rodrigo/projetos/monitoramento_sptrans/data/datalake/prata/operacao_desagrupada"
file_list = os.listdir(directory_path)
dataframes_desagrupados = []
for item in file_list:
    if item.endswith('.parquet'):
        caminho_completo = os.path.join(directory_path, item)

        df = spark.read.parquet(caminho_completo)
        dataframes_desagrupados.append(df)



In [74]:
dataframes_desagrupados_completo = union_all(dataframes_desagrupados)
dataframes_desagrupados_completo = dataframes_desagrupados_completo.drop('LINHA')
dataframes_desagrupados_completo = dataframes_desagrupados_completo.withColumn('TURNO', turno(f.col('HORA_API')))
dataframes_desagrupados_completo.show(truncate=False)



+-----------+-------------------------------------------+-------------------------------------------+-------------+--------+-----------------+----------------+--------------------------+--------------+-----------------------------+-------------------+-------------------+-----------------+-----+
|CODIGO_AREA|CONSOCIO                                   |EMPRESA                                    |DATA_EXTRACAO|HORA_API|LETREIRO_COMPLETO|SENTIDO_OPERACAO|CODIGO_IDENTIFICADOR_LINHA|PREFIXO_ONIBUS|DATA_HORA_CAPTURA_LOCALIZACAO|LATITUDE           |LONGITUDE          |DATA_EXTRACAO_API|TURNO|
+-----------+-------------------------------------------+-------------------------------------------+-------------+--------+-----------------+----------------+--------------------------+--------------+-----------------------------+-------------------+-------------------+-----------------+-----+
|4          |PÊSSEGO TRANSPORTES LTDA                   |PÊSSEGO TRANSPORTES LTDA                   |2023-09-15 

                                                                                

In [79]:
dataframe_prefixo_onibus = dataframes_desagrupados_completo \
    .select(
        dataframes_desagrupados_completo.LATITUDE,
        dataframes_desagrupados_completo.LONGITUDE,
    ) \
    .filter(
        (dataframes_desagrupados_completo.PREFIXO_ONIBUS == '47399') &
        (dataframes_desagrupados_completo.TURNO == 'Manhã') &
        (dataframes_desagrupados_completo.DATA_EXTRACAO == '2023-09-15')
    )



dataframe_prefixo_onibus.show()



+-------------------+-------------------+
|           LATITUDE|          LONGITUDE|
+-------------------+-------------------+
|         -23.598638|         -46.416157|
|         -23.609925|         -46.407969|
|         -23.609623|        -46.4081455|
|         -23.609623|        -46.4081455|
|         -23.609623|        -46.4081455|
|         -23.609623|        -46.4081455|
|         -23.609623|        -46.4081455|
|         -23.609623|        -46.4081455|
|         -23.609623|        -46.4081455|
|        -23.5991505|-46.415709500000006|
|       -23.59419625|       -46.42337975|
|         -23.600084|        -46.4220605|
|-23.595737999999997|         -46.423317|
|         -23.607842|-46.478333500000005|
|-23.599889750000003|-46.484817250000006|
|         -23.592762|        -46.4877805|
|-23.600738999999997|-46.495580000000004|
|       -23.61005975|       -46.49281925|
|       -23.61071875|       -46.49325225|
|       -23.61071875|       -46.49325225|
+-------------------+-------------

                                                                                

In [87]:



# Função para calcular a distância de Haversine
def haversine(lat1, lon1, lat2, lon2):
    # Raio da Terra em quilômetros
    r = 6371.0

    # Converter graus em radianos
    lat1 = math.radians(lat1)
    lon1 = math.radians(lon1)
    lat2 = math.radians(lat2)
    lon2 = math.radians(lon2)

    # Diferença de latitude e longitude
    dlat = lat2 - lat1
    dlon = lon2 - lon1

    # Fórmula de Haversine
    a = math.sin(dlat / 2)**2 + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2)**2
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))

    # Distância em quilômetros
    distance = r * c

    return distance

# Função para calcular a distância de Haversine usando pandas_udf
@pandas_udf(DoubleType())
def haversine_udf(lat, lon):
    distances = []
    for i in range(len(lat)):
        if i == 0:
            distances.append(0.0)  # Distância zero para o primeiro ponto
        else:
            distance = haversine(lat[i], lon[i], lat[i - 1], lon[i - 1])
            distances.append(distance)
    return pd.Series(distances)

# Adicionar coluna de distâncias ao DataFrame usando a função haversine_udf
df= dataframe_prefixo_onibus.withColumn("DISTANCIA", haversine_udf(dataframe_prefixo_onibus["LATITUDE"], dataframe_prefixo_onibus["LONGITUDE"]))

# Mostrar o DataFrame resultante
df.show()

# Somar as distâncias
soma_distancias = df.selectExpr("sum(DISTANCIA)").collect()[0][0]
print(f'Soma das distâncias: {soma_distancias:.2f} km')

# Encerrar a sessão Spark


                                                                                

+-------------------+-------------------+--------+--------------------+
|           LATITUDE|          LONGITUDE|DISTANCE|           DISTANCIA|
+-------------------+-------------------+--------+--------------------+
|         -23.598638|         -46.416157|       0|                 0.0|
|         -23.609925|         -46.407969|       0|  1.5070517678935427|
|         -23.609623|        -46.4081455|       0|0.038092870953506906|
|         -23.609623|        -46.4081455|       0|                 0.0|
|         -23.609623|        -46.4081455|       0|                 0.0|
|         -23.609623|        -46.4081455|       0|                 0.0|
|         -23.609623|        -46.4081455|       0|                 0.0|
|         -23.609623|        -46.4081455|       0|                 0.0|
|         -23.609623|        -46.4081455|       0|                 0.0|
|        -23.5991505|-46.415709500000006|       0|  1.3964325322674538|
|       -23.59419625|       -46.42337975|       0|  0.9562127650



Soma das distâncias: 20.22 km


                                                                                

In [82]:
from pyspark.sql.functions import col, lit, radians, sin, cos, sqrt, asin,when

In [81]:
def haversine_distance(lat1, lon1, lat2, lon2):
    # Raio da Terra em quilômetros
    R = 6371.0

    # Converter graus para radianos
    lat1_rad = radians(lat1)
    lon1_rad = radians(lon1)
    lat2_rad = radians(lat2)
    lon2_rad = radians(lon2)

    # Diferenças de latitude e longitude
    dlat = lat2_rad - lat1_rad
    dlon = lon2_rad - lon1_rad

    # Fórmula de Haversine
    a = sin(dlat / 2)**2 + cos(lat1_rad) * cos(lat2_rad) * sin(dlon / 2)**2
    c = 2 * asin(sqrt(a))

    # Calcular a distância
    distance = R * c

    return distance

# Adicione uma coluna com a distância acumulada
df = df.withColumn("DISTANCE", lit(0))  # Inicialize a coluna DISTANCE com 0
previous_row = None

In [85]:
dataframe_prefixo_onibus = dataframe_prefixo_onibus.withColumn("DISTANCE", lit(0))  # Inicialize a coluna DISTANCE com 0
previous_row = None
for row in dataframe_prefixo_onibus.rdd.collect():
    if previous_row is not None:
        cumulative_distance = previous_row["DISTANCE"] + haversine_distance(
            previous_row["LATITUDE"],
            previous_row["LONGITUDE"],
            row["LATITUDE"],
            row["LONGITUDE"]
        )
    else:
        cumulative_distance = 0.0

    df = dataframe_prefixo_onibus.withColumn("DISTANCE", when((col("LATITUDE") == row["LATITUDE"]) & (col("LONGITUDE") == row["LONGITUDE"]), cumulative_distance).otherwise(col("DISTANCE")))

    previous_row = row

# Exiba o DataFrame resultante
df.show()

                                                                                

TypeError: Invalid argument, not a string or column: -23.598638 of type <class 'float'>. For column literals, use 'lit', 'array', 'struct' or 'create_map' function.