In [0]:

import delta


def table_exists(catalog, database, table):
  """Report whether ``table`` is listed exactly once in ``catalog.database``.

  Runs ``SHOW TABLES`` against the given schema and keeps only the rows whose
  ``database`` and ``tableName`` columns equal the supplied names.

  Args:
    catalog: Catalog name interpolated into the ``SHOW TABLES`` statement.
    database: Schema name, used both in the statement and in the row filter.
    table: Table name matched against the ``tableName`` column.

  Returns:
    True when exactly one matching row is found, False otherwise.
  """
  listing = spark.sql(f"SHOW TABLES FROM {catalog}.{database}")
  matching = listing.filter(f"database = '{database}' AND tableName = '{table}' ")
  return matching.count() == 1

In [0]:
# Catalog and schema of the target tables.
catalog = "bronze"
schema = "f1_world"

# Name of the table to ingest, read from the notebook widget "tableName".
tableName = dbutils.widgets.get("tableName")

# Root folder of the raw files. It must be assigned BEFORE output_streaming,
# which is derived from it; the previous ordering raised NameError on a fresh kernel.
source_path = f"/Volumes/raw/{schema}/"
output_streaming = f"{source_path}streaming_source/"

# Race whose rows are kept out of the batch load and written to the streaming folder.
TARGET_RACEID = 1141

# Tables that the ingestion cell splits by raceId (batch vs. streaming).
# "driver_standings" was previously spelled with a space, so it never matched
# the widget value and was never split.
files_split = [
    "constructor_results",
    "constructor_standings",
    "driver_standings",
    "lap_times",
    "pit_stops",
    "qualifying",
    "results",
    "sprint_results",
]

In [0]:
class Ingestor:
    """Read a raw source table and persist it as a Delta table and/or parquet files.

    When ``target_race_id`` is truthy, the batch output excludes that race and
    the streaming output contains only that race. Otherwise both outputs
    receive the unfiltered data.
    """

    def __init__(self, catalog, schemaname, tablename, data_format, target_race_id):
        """Store the target coordinates and resolve the read schema.

        Args:
            catalog: Catalog of the Delta table written by ``save_batch``.
            schemaname: Schema of the Delta table; also part of the streaming path.
            tablename: Table name, used for the Delta table, the streaming
                folder and the schema lookup.
            data_format: Spark reader format passed to ``spark.read.format``.
            target_race_id: ``raceId`` value to hold out of the batch load;
                a falsy value disables the split.
        """
        self.catalog = catalog
        self.schemaname = schemaname
        self.tablename = tablename
        self.data_format = data_format
        self.target_race_id = target_race_id
        self.set_schema()

    def set_schema(self):
        """Resolve the read schema for ``self.tablename`` into ``self.data_schema``."""
        # NOTE(review): `import_schema` does not appear to be a documented
        # `dbutils` utility — confirm this helper exists in the runtime or
        # replace it with the project's schema loader.
        self.data_schema = dbutils.import_schema(self.tablename)

    def load(self, path):
        """Read ``path`` with the configured format and schema.

        The reader treats the first line as a header and the literal ``\\N``
        as null.
        """
        df = (spark.read
            .format(self.data_format)
            .schema(self.data_schema)
            .options(header="true")
            .option("nullValue", "\\N")
            .load(path))
        return df

    def save_batch(self, df):
        """Overwrite the Delta table ``catalog.schemaname.tablename`` with ``df``."""
        # Fixed: the attribute was misspelled `schemanema`, which raised
        # AttributeError on every call.
        (df.coalesce(1)
            .write
            .format("delta")
            .mode("overwrite")
            .saveAsTable(f"{self.catalog}.{self.schemaname}.{self.tablename}"))

    def save_streaming(self, df):
        """Overwrite the parquet streaming folder of this table with ``df``."""
        # Fixed: the table name is a sub-folder of `streaming_source/`, matching
        # the `{source_path}streaming_source/{tableName}` layout used by the
        # notebook's ingestion cell (it was previously joined with a dot).
        (df.coalesce(1)
            .write
            .format("parquet")
            .mode("overwrite")
            .save(f"/Volumes/raw/{self.schemaname}/streaming_source/{self.tablename}"))

    def execute_batch(self, path):
        """Load ``path`` and save it as the batch table, without the target race if set."""
        df = self.load(path)

        if self.target_race_id:
            print(f"Filtrando Batch sem a corrida {self.target_race_id}...")
            df_batch = df.filter(df.raceId != self.target_race_id)
            self.save_batch(df_batch)
        else:
            self.save_batch(df)

    def execute_streaming(self, path):
        """Load ``path`` and save it as streaming source, only the target race if set."""
        df = self.load(path)

        if self.target_race_id:
            print(f"Filtrando Streaming da corrida {self.target_race_id}...")
            df_streaming = df.filter(df.raceId == self.target_race_id)
            self.save_streaming(df_streaming)
        else:
            self.save_streaming(df)

In [0]:
# First-time ingestion: nothing is written when the target table already exists.
if table_exists(catalog, schema, tableName):
  print(f"Tabela {tableName} ja existe")
else:
  print(f"Criando tabela {tableName}")

  target_table = f"{catalog}.{schema}.{tableName}"
  csv_dir = f"{source_path}{tableName}/"
  csv_reader = spark.read.format("csv").options(header="true")

  if tableName not in files_split:
    # Table is not in the split list: the whole folder goes to the Delta table.
    df_full = csv_reader.load(csv_dir)
    full_writer = df_full.coalesce(1).write.format("delta").mode("overwrite")
    full_writer.saveAsTable(target_table)
  else:
    df_split = csv_reader.load(csv_dir)

    # Full load: every row except those of the target race.
    df_batch = df_split.filter(df_split.raceId != TARGET_RACEID)
    batch_writer = df_batch.coalesce(1).write.format("delta").mode("overwrite")
    batch_writer.saveAsTable(target_table)

    print("Tabela delta criado com sucesso")

    # Streaming source: only the rows of the target race, stored as parquet.
    df_streaming = df_split.filter(df_split.raceId == TARGET_RACEID)
    streaming_writer = df_streaming.coalesce(1).write.format("parquet").mode("overwrite")
    streaming_writer.save(f"{output_streaming}{tableName}")
                                        
    