In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, TimestampType, FloatType, DoubleType
import os

In [0]:
%run ../Utils/DBInstanceUtils

In [0]:
%run ../Utils/ProcessingUtils

In [0]:
%run ../Utils/FormatterUtils

In [0]:
%run ../Utils/FileUtils

In [0]:
def file_to_Df(path, extension, mode="FAILFAST"):
    """Load a file (or directory of files) into a Spark DataFrame.

    Args:
        path: Location handed to ``spark.read.load``.
        extension: Spark data-source format name (e.g. ``'json'``, ``'delta'``);
            despite the name it is passed straight to ``.format(...)``.
        mode: Value for the reader's ``mode`` option. The default ``"FAILFAST"``
            makes Spark raise on malformed records instead of nulling them out,
            which is what lets callers detect corruption and attempt a repair.

    Returns:
        The DataFrame produced by the Spark reader.

    Raises:
        Any exception raised by the Spark reader propagates unchanged.
    """
    # No try/except here: the previous `except Exception as e: raise e` re-raised
    # the same exception and only added a frame to the traceback.
    return (
        spark.read
        .format(extension)
        .option('mode', mode)
        .load(path)
    )

In [0]:
# Ingestion settings for the qualifying data: workspace handle, the two source
# splits, their directory, and the column types later passed to enforce_schema.
instance = DBInstanceUtils('treinamento_formula1')

extension = 'json'
file1_name, file2_name = 'qualifying_split_1', 'qualifying_split_2'
file1, file2 = (f'{name}.{extension}' for name in (file1_name, file2_name))

# Same directory as the read path, with any 'file:' text stripped out
# (passed to FileUtils.fix_corrupted_file when a split needs repairing).
qualifying_dir = f'{instance.get_filepath()}/qualifying/'
abs_path = qualifying_dir.replace('file:', '')

# Column name -> Spark type; key order is kept as-is in case enforce_schema
# depends on it (NOTE(review): confirm against Refine.enforce_schema).
schema = dict(
    constructorId=IntegerType(),
    driverId=IntegerType(),
    number=IntegerType(),
    position=IntegerType(),
    q1=StringType(),
    q2=StringType(),
    q3=StringType(),
    qualifyId=IntegerType(),
    raceId=IntegerType(),
)

In [0]:
# Read split 1. On failure (e.g. FAILFAST hitting a corrupted record), try to
# repair the file and read it again; abort the notebook if that also fails.
file1_path = f'{instance.get_filepath()}/qualifying/{file1}'
try:
    df_1 = file_to_Df(file1_path, extension)
except Exception:
    print('CORRUPTED RECORDS DETECTED, ATTEMPTING TO FIX...')
    try:
        FileUtils.fix_corrupted_file(abs_path, file1, 'json')
        # Re-read after the repair. Without this, df_1 is never assigned on this
        # path and the union below fails (or silently reuses a stale df_1).
        # NOTE(review): assumes fix_corrupted_file rewrites the file in place — confirm.
        df_1 = file_to_Df(file1_path, extension)
    except Exception as fatal_error:
        dbutils.notebook.exit(f'READ FAILED!! - ABORTING PROGRAM\n{fatal_error}')
    else:
        # Only reached when both the repair and the re-read succeeded
        # (previously a `finally` printed this even on failure).
        print('RECORDS FIXED.')

In [0]:
# Read split 2. On failure (e.g. FAILFAST hitting a corrupted record), try to
# repair the file and read it again; abort the notebook if that also fails.
file2_path = f'{instance.get_filepath()}/qualifying/{file2}'
try:
    df_2 = file_to_Df(file2_path, extension)
except Exception:
    print('CORRUPTED RECORDS DETECTED, ATTEMPTING TO FIX...')
    try:
        FileUtils.fix_corrupted_file(abs_path, file2, 'json')
        # Re-read after the repair. Without this, df_2 is never assigned on this
        # path and the union below fails (or silently reuses a stale df_2).
        # NOTE(review): assumes fix_corrupted_file rewrites the file in place — confirm.
        df_2 = file_to_Df(file2_path, extension)
    except Exception as fatal_error:
        dbutils.notebook.exit(f'READ FAILED!! - ABORTING PROGRAM\n{fatal_error}')
    else:
        # Only reached when both the repair and the re-read succeeded
        # (previously a `finally` printed this even on failure).
        print('RECORDS FIXED.')

In [0]:
# Combine the two qualifying splits. unionByName matches columns by name rather
# than by position, so the splits stay aligned even if their inferred column
# order differs; it raises instead of mis-assigning if a column is missing.
qualifying_raw = df_1.unionByName(df_2)

# Refine comes from the %run'd utils. Presumably it casts to `schema` and renames
# camelCase columns to snake_case — NOTE(review): confirm against ProcessingUtils.
df = (Refine(qualifying_raw)
      .enforce_schema(schema)
      .camelCase_to_snake_case()
      .load())

df.display()

In [0]:
# Bronze target for the qualifying data. This previously pointed at
# formula1/bronze/pitstops/pitstopsRaw (copy-paste), so the overwrite below
# would have replaced the pitstops data with qualifying rows.
# The save_* names keep `extension` ('json', used by the read cells) intact,
# so re-running those cells after this one still reads JSON.
save_filename  = 'qualifyingRaw'
save_extension = 'delta'
save_file      = f'{save_filename}.{save_extension}'
DBInstanceUtils.set_dbfs_savepath(instance, 'mnt', 'formula1/bronze/qualifying', save_file)

In [0]:
# Persist the refined DataFrame as Delta at the configured save path,
# overwriting whatever was there before.
target_path = instance.get_dbfs_savepath()
delta_writer = df.write.format("delta").mode("overwrite")
delta_writer.save(target_path)