In [1]:
from delta import DeltaTable
import requests
from pyspark.sql import SparkSession, Row, Window
from pyspark.sql.functions import *
from pyspark.sql.types import *
import time
import json
from pyspark.sql.functions import explode

spark = (SparkSession.builder
             .appName('lab') # Name the app
             .config("hive.metastore.uris", "thrift://metastore:9083") # Set external Hive Metastore
             .config("hive.metastore.schema.verification", "false") # Prevent some errors
             .config("spark.sql.repl.eagerEval.enabled", True)
             .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
             .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
             .enableHiveSupport()
             .getOrCreate())

In [11]:
spark.sql(" SELECT * FROM  delta.stage.posicao WHERE  linha.c = '208V-10'")

AnalysisException: spark_catalog requires a single-part namespace, but got [delta, stage]

In [31]:
# Carregando arquivos estaticos
#linhas
df_linhas = spark.read.csv("s3a://raw/GTFS/linhas",header=True)

#carrega paradas
df_paradas = spark.read.csv("s3a://raw/GTFS/paradas",header=True)

#carrega trips
df_trips = spark.read.csv("s3a://raw/GTFS/trips",header=True)

In [32]:
#df_empresa.show()

In [7]:
#carrega corredores
df_corredor = spark.read.json("s3a://raw/olhovivo/corredor")
df_corredor = df_corredor.select(\
                   col('cc').alias('Codigo_Parada') \
                   ,col('nc').alias('Nome_Parada'))
df_corredor = df_corredor.dropDuplicates(["Codigo_Parada", "Nome_Parada"])

#carrega empresas
df_empresa = spark.read.json("s3a://raw/olhovivo/empresa")
df_empresa = df_empresa.select(\
                   explode('e').alias('Empresa') \
                   ,col('hr').alias('Hora'))
window_spec = Window.partitionBy("Empresa.a").orderBy(col("Hora").desc())
df_empresa_tratado = df_empresa.withColumn("row_num", row_number().over(window_spec)) \
                       .filter(col("row_num") == 1) \
                       .drop("row_num")


#carrega posições
df_posicao = spark.read.json("s3a://raw/olhovivo/posicao")
df_posicao = df_posicao.select(\
                    col('hr').alias('Hora') \
                    ,explode('l').alias('Linha'))
                   

In [9]:
df_posicao.head(20)

[Row(Hora='09:50', Linha=Row(c='1765-10', cl=1782, lt0='METRÔ TUCURUVI', lt1='JD. CABUÇU', qv=5, sl=1, vs=[Row(a=True, p=22847, px=-46.569042, py=-23.466963, ta='2024-09-20T12:50:00Z'), Row(a=True, p=22845, px=-46.60007, py=-23.480282, ta='2024-09-20T12:50:01Z'), Row(a=True, p=22812, px=-46.569042, py=-23.466963, ta='2024-09-20T12:50:12Z'), Row(a=True, p=22846, px=-46.569042, py=-23.466963, ta='2024-09-20T12:50:14Z'), Row(a=True, p=22344, px=-46.573944, py=-23.459726, ta='2024-09-20T12:49:41Z')])),
 Row(Hora='09:50', Linha=Row(c='1744-10', cl=33312, lt0='METRÔ SANTANA', lt1='LAUZANE PAULISTA', qv=6, sl=2, vs=[Row(a=True, p=22578, px=-46.64165, py=-23.472612, ta='2024-09-20T12:50:12Z'), Row(a=True, p=22284, px=-46.637234, py=-23.476126, ta='2024-09-20T12:49:38Z'), Row(a=True, p=22738, px=-46.64178, py=-23.472807, ta='2024-09-20T12:50:04Z'), Row(a=True, p=22581, px=-46.638428, py=-23.487814, ta='2024-09-20T12:49:40Z'), Row(a=True, p=22577, px=-46.641495, py=-23.472376, ta='2024-09-20T12:

In [5]:
#df_empresa - df_empresa.withColumn("e",explode(df_empresa['e'])) 

In [6]:
df_posicao.printSchema()


#df_posicao.select(\
#                   explode('e').alias('Empresa') \
#                   ,col('hr').alias('Hora')).printSchema()

root
 |-- Hora: string (nullable = true)
 |-- Linha: struct (nullable = true)
 |    |-- c: string (nullable = true)
 |    |-- cl: long (nullable = true)
 |    |-- lt0: string (nullable = true)
 |    |-- lt1: string (nullable = true)
 |    |-- qv: long (nullable = true)
 |    |-- sl: long (nullable = true)
 |    |-- vs: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- a: boolean (nullable = true)
 |    |    |    |-- p: long (nullable = true)
 |    |    |    |-- px: double (nullable = true)
 |    |    |    |-- py: double (nullable = true)
 |    |    |    |-- ta: string (nullable = true)



In [25]:
#Delta Table Corredor
DeltaTable.createIfNotExists(spark) \
    .tableName("stage.corredor")\
    .addColumns(df_corredor.schema)\
    .execute()

deltaTable = DeltaTable.forName(spark, "stage.corredor")
#adiconar condicao no whenMatchedUpdateAll para o dado antigo nao sobrepor o dado mais recente
deltaTable.alias('destiny') \
.merge(
df_corredor.alias('source'),
'source.Codigo_Parada = destiny.Codigo_Parada'
) \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()

<delta.tables.DeltaTable at 0x7f4aa4fc5ba0>

In [36]:
#Delta Table Empresa
DeltaTable.createIfNotExists(spark) \
    .tableName("stage.empresa")\
    .addColumns(df_empresa_tratado.schema)\
    .execute()

deltaTable = DeltaTable.forName(spark, "stage.empresa")
#adiconar condicao no whenMatchedUpdateAll para o dado antigo nao sobrepor o dado mais recente
deltaTable.alias('destiny') \
.merge(
df_empresa_tratado.alias('source'),
'source.Empresa.e = destiny.Empresa.e'
) \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()

In [34]:
#df_empresa_tratado.show()

+--------------------+-----+
|             Empresa| Hora|
+--------------------+-----+
|{1, [{1, 37, GATO...|21:30|
|{2, [{2, 92, SAMB...|21:30|
|{3, [{3, 50, VIAÇ...|21:30|
|{4, [{4, 80, AMBI...|21:30|
|{5, [{5, 51, VIA ...|21:30|
|{6, [{6, 53, VIAÇ...|21:30|
|{7, [{7, 27, CAMP...|21:30|
|{8, [{8, 37, GATO...|21:30|
+--------------------+-----+



In [None]:
#Delta Table Posicao
DeltaTable.createIfNotExists(spark) \
    .tableName("stage.posicao")\
    .addColumns(df_posicao.schema)\
    .execute()

#adiconar condicao no whenMatchedUpdateAll para o dado antigo nao sobrepor o dado mais recente
deltaTable = DeltaTable.forName(spark, "stage.posicao")

deltaTable.alias('destiny') \
.merge(
df_posicao.alias('source'),
'source.Linha.cl = destiny.Linha.cl and source.Linha.sl = destiny.Linha.sl'
) \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()

In [None]:
#Delta Table Linhas
DeltaTable.createIfNotExists(spark) \
    .tableName("stage.linhas")\
    .addColumns(df_linhas.schema)\
    .execute()


#adiconar condicao no whenMatchedUpdateAll para o dado antigo nao sobrepor o dado mais recente
deltaTable = DeltaTable.forName(spark, "stage.linhas")

deltaTable.alias('destiny') \
.merge(
df_linhas.alias('source'),
'source.route_id = destiny.route_id'
) \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()

In [None]:
#Delta Table Paradas
DeltaTable.createIfNotExists(spark) \
    .tableName("stage.paradas")\
    .addColumns(df_paradas.schema)\
    .execute()

#adiconar condicao no whenMatchedUpdateAll para o dado antigo nao sobrepor o dado mais recente
deltaTable = DeltaTable.forName(spark, "stage.paradas")

deltaTable.alias('destiny') \
.merge(
df_paradas.alias('source'),
'source.stop_id = destiny.stop_id'
) \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()

In [None]:
#Delta Table Trips
DeltaTable.createIfNotExists(spark) \
    .tableName("stage.trips")\
    .addColumns(df_trips.schema)\
    .execute()

#adiconar condicao no whenMatchedUpdateAll para o dado antigo nao sobrepor o dado mais recente
deltaTable = DeltaTable.forName(spark, "stage.trips")

deltaTable.alias('destiny') \
.merge(
df_trips.alias('source'),
'source.route_id = destiny.route_id and source.direction_id = destiny.direction_id'
) \
.whenMatchedUpdateAll() \
.whenNotMatchedInsertAll() \
.execute()

In [10]:
 #adiconar condicao no whenMatchedUpdateAll para o dado antigo nao sobrepor o dado mais recente

#Criando tabelas caso não existam
# try:
#     df_corredor.write.format("delta").saveAsTable("stage.corredor")
# except:
#     pass

In [16]:
# try:
#     df_empresa.write.format("delta").saveAsTable("stage.empresa")
# except:
#     pass

In [11]:
# df_empresa.printSchema()

root
 |-- Empresa: struct (nullable = true)
 |    |-- a: long (nullable = true)
 |    |-- e: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- a: long (nullable = true)
 |    |    |    |-- c: long (nullable = true)
 |    |    |    |-- n: string (nullable = true)
 |-- Hora: string (nullable = true)



In [5]:
# try:
#     df_posicao.write.format("delta").saveAsTable("stage.posicao")
# except:
#     pass

In [None]:
# window_spec = Window.partitionBy("Linha.c").orderBy(col("Hora").desc())
# df_empresa_tratado = df_empresa.withColumn("row_num", row_number().over(window_spec)) \
#                        .filter(col("row_num") == 1) \
#                        .drop("row_num")

In [9]:
#df_posicao.printSchema()
#df_posicao.show(truncate=False)

Hora,Linha
20:42,"{7060-10, 1889, T..."
20:42,"{3053-10, 973, ME..."
20:42,"{6041-10, 1218, S..."
20:42,"{9191-10, 33578, ..."
20:42,"{9009-10, 33645, ..."
20:42,"{3785-10, 33831, ..."
20:42,"{6450-10, 32827, ..."
20:42,"{875H-10, 1340, M..."
20:42,"{2727-10, 942, CO..."
20:42,"{3736-10, 33804, ..."


In [12]:
# try:
#     df_linhas.write.format("delta").saveAsTable("stage.linhas")
# except:
#     pass

In [11]:
#df_linhas.show()

+--------+---------+----------------+--------------------+----------+-----------+----------------+
|route_id|agency_id|route_short_name|     route_long_name|route_type|route_color|route_text_color|
+--------+---------+----------------+--------------------+----------+-----------+----------------+
| 1012-10|        1|         1012-10|Term. Jd. Britani...|         3|     509E2F|          FFFFFF|
| 1012-21|        1|         1012-21|Term. Jd. Britâni...|         3|     509E2F|          FFFFFF|
| 1015-10|        1|         1015-10|Term. Jd. Britâni...|         3|     509E2F|          FFFFFF|
| 1016-10|        1|         1016-10|Cem. Do Horto - S...|         3|     002F6C|          FFFFFF|
| 1017-10|        1|         1017-10|Perus - Conexão V...|         3|     509E2F|          FFFFFF|
| 1018-10|        1|         1018-10|Vl. Rosa - Metrô ...|         3|     002F6C|          FFFFFF|
| 1019-10|        1|         1019-10|Sol Nascente - Te...|         3|     509E2F|          FFFFFF|
| 1020-10|

In [15]:
# try:
#     df_paradas.write.format("delta").saveAsTable("stage.paradas")
# except:
#     pass

In [14]:
#df_paradas.show()

+-------+----------------+---------+----------+----------+
|stop_id|       stop_name|stop_desc|  stop_lat|  stop_lon|
+-------+----------------+---------+----------+----------+
|  18848|        Clínicas|     null|-23.554022|-46.671108|
|  18849|   Vila Madalena|     null|-23.546498|-46.691141|
|  18850|      Consolação|     null|-23.558094|-46.660205|
|  18851|       Conceição|     null|-23.635039|-46.641239|
|  18852|       Jabaquara|     null|-23.646033|-46.641028|
|  18853|       São Judas|     null|-23.625882|-46.640936|
|  18854|           Saúde|     null|-23.618245|-46.639139|
|  18855| Praça Da Árvore|     null|-23.610583|-46.637918|
|  18856|      Santa Cruz|     null|-23.598541|-46.636638|
|  18857|    Vila Mariana|     null|-23.589359|-46.634677|
|  18858|      Brigadeiro|     null|-23.567615|-46.649027|
|  18859|    Trianon-masp|     null|-23.563570|-46.653893|
|  18860|        Ana Rosa|     null|-23.581203|-46.638489|
|  18861|         Paraíso|     null|-23.575400|-46.64070

In [18]:
# try:
#     df_trips.write.format("delta").saveAsTable("stage.trips")
# except:
#     pass

In [17]:
#df_trips.show(20)

+--------+----------+---------+--------------------+------------+--------+
|route_id|service_id|  trip_id|       trip_headsign|direction_id|shape_id|
+--------+----------+---------+--------------------+------------+--------+
| 1012-10|       USD|1012-10-0|      Jd. Monte Belo|           0|   81072|
| 1012-10|       USD|1012-10-1|  Term. Jd. Britania|           1|   81073|
| 1012-21|       U__|1012-21-0|         Jd. Rosinha|           0|   81195|
| 1015-10|       USD|1015-10-0|Chác. Maria Trindade|           0|   81148|
| 1016-10|       USD|1016-10-0|  Shop. Center Norte|           0|   72283|
| 1016-10|       USD|1016-10-1|       Cem. Do Horto|           1|   82172|
| 1017-10|       USD|1017-10-0|   Conexão Vl. Iório|           0|   72355|
| 1017-10|       USD|1017-10-1|               Perus|           1|   72356|
| 1018-10|       USD|1018-10-0|       Metrô Santana|           0|   71029|
| 1018-10|       USD|1018-10-1|            Vl. Rosa|           1|   71030|
| 1019-10|       USD|1019

In [38]:
# df_teste.show()

+-------------+-------------------+
|Codigo_Parada|        Nome_Parada|
+-------------+-------------------+
|            9|Expresso Tiradentes|
|            3|    Inajar de Souza|
|           10|     Paes de Barros|
|            8|        Campo Limpo|
|            7|        Parelheiros|
|            2|        Santo Amaro|
|            1|           Pirituba|
+-------------+-------------------+



In [8]:
# spark.sql('show databases').show()

+---------+
|namespace|
+---------+
| business|
|  default|
|    stage|
+---------+



In [28]:
# spark.sql("show tables from stage").show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|    stage| corredor|      false|
|    stage|  empresa|      false|
|    stage|  posicao|      false|
|    stage|   linhas|      false|
|    stage|  paradas|      false|
|    stage|    trips|      false|
+---------+---------+-----------+



In [35]:
#spark.sql("describe table stage.corredor_teste").show()
#spark.sql("drop table stage.empresa").show()

++
||
++
++



In [22]:
# DeltaTable.create(spark) \
#     .tableName("stage.teste_caio")\
#     .addColumns(df_corredor.schema)\
#     .execute()


<delta.tables.DeltaTable at 0x7f4aa4f2b940>

In [23]:
# deltaTable = DeltaTable.forName(spark, "stage.teste_caio")

# deltaTable.alias('destiny') \
# .merge(
# df_corredor.alias('source'),
# 'source.Codigo_Parada = destiny.Codigo_Parada'
# ) \
# .whenMatchedUpdateAll() \
# .whenNotMatchedInsertAll() \
# .execute()