## Create raw tables for the Formula 1 warehouse

## Load source data files from GCS into BigQuery (BQ) tables



### Create BQ dataset for storing data

In [None]:
from google.cloud import bigquery

project_id = "saffatandsourik"
dataset = "formula1_raw"
region = "us-central1"

bq_client = bigquery.Client()

# Note: despite its name, `dataset_id` holds a bigquery.Dataset object, not a string id.
dataset_id = bigquery.Dataset(f"{project_id}.{dataset}")
dataset_id.location = region
# exists_ok=True: no error if the dataset already exists, so the cell can be re-run.
resp = bq_client.create_dataset(dataset_id, exists_ok=True)
# Report the dataset's own project: the client's default project (bq_client.project)
# is taken from the environment and need not equal project_id.
print("Created dataset {}.{}".format(resp.project, resp.dataset_id))

Created dataset saffatandsourik.formula1_raw


### Common Functions

In [None]:
from google.cloud import bigquery

# Shared settings read by the load helpers defined below.
project_id = "saffatandsourik"
region = "us-central1"
dataset = "formula1_raw"

# Source files are addressed as gs://<bucket>/<parent_folder>/<folder>/<file_name>.
bucket = "formula1-ss"
parent_folder = "initial-loads"

bq_client = bigquery.Client()

def create_load_table_from_csv(folder, file_name, table, schema, delimiter=",", quote_character="\""):
  """Create `project_id.dataset.table` if needed and append one CSV file from GCS to it.

  Args:
    folder: sub-folder of gs://{bucket}/{parent_folder}/ that contains the file.
    file_name: CSV file name; its first row is skipped as a header.
    table: destination table name inside `dataset`.
    schema: list of bigquery.SchemaField describing the whole table, including the
      `_data_source` / `_load_time` lineage columns. The list is NOT modified.
    delimiter: CSV field delimiter.
    quote_character: CSV quote character.
  """
  uri = f"gs://{bucket}/{parent_folder}/{folder}/{file_name}"
  table_id = f"{project_id}.{dataset}.{table}"

  # Create with the full schema so the lineage columns (and their
  # default_value_expression) exist; exists_ok leaves an existing table untouched.
  created = bq_client.create_table(bigquery.Table(table_id, schema=schema), exists_ok=True)
  print("Created table {}".format(created.table_id))

  # The lineage columns are not present in the CSV, so they are left out of the
  # load schema. Build a new list instead of `del schema[-1]`: deleting mutated the
  # caller's list, so a second call with the same schema dropped real data columns.
  lineage_columns = {"_data_source", "_load_time"}
  load_schema = [field for field in schema if field.name not in lineage_columns]
  print(load_schema)

  job_config = bigquery.LoadJobConfig(
      schema=load_schema,
      skip_leading_rows=1,
      source_format=bigquery.SourceFormat.CSV,
      # Appending means re-running a cell loads the file's rows a second time.
      write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
      field_delimiter=delimiter,
      quote_character=quote_character,
      allow_jagged_rows=True,
      ignore_unknown_values=True,
  )

  # location matches the dataset region, consistent with create_load_table_from_json.
  load_job = bq_client.load_table_from_uri(uri, table_id, location=region, job_config=job_config)
  load_job.result()  # wait for the load to finish; job errors are raised here

  destination_table = bq_client.get_table(table_id)
  # With WRITE_APPEND the table total can exceed what this job loaded, so report both.
  print("Loaded {} rows (table now has {} rows).".format(load_job.output_rows, destination_table.num_rows))


def create_load_table_from_json(folder, file_name, table, schema):
  """Create `project_id.dataset.table` if needed and load one NDJSON file from GCS into it.

  Args:
    folder: sub-folder of gs://{bucket}/{parent_folder}/ that contains the file.
    file_name: newline-delimited JSON file name.
    table: destination table name inside `dataset`.
    schema: list of bigquery.SchemaField describing the whole table, including the
      `_data_source` / `_load_time` lineage columns. The list is NOT modified.

  The load uses WRITE_EMPTY, so it fails if the table already contains rows.
  """
  table_id = f"{project_id}.{dataset}.{table}"

  # Create with the full schema so the lineage columns (and their defaults) exist.
  created = bq_client.create_table(bigquery.Table(table_id, schema=schema), exists_ok=True)
  print("Created table {}".format(created.table_id))

  # The lineage columns are not present in the JSON, so they are left out of the
  # load schema. A new list is built so the caller's `schema` is not mutated.
  lineage_columns = {"_data_source", "_load_time"}
  load_schema = [field for field in schema if field.name not in lineage_columns]

  job_config = bigquery.LoadJobConfig(
      schema=load_schema,
      source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
      write_disposition=bigquery.WriteDisposition.WRITE_EMPTY,
  )

  uri = f"gs://{bucket}/{parent_folder}/{folder}/{file_name}"

  load_job = bq_client.load_table_from_uri(
      uri,
      table_id,
      location=region,
      job_config=job_config,
  )
  load_job.result()  # wait for the load to finish; job errors are raised here

  destination_table = bq_client.get_table(table_id)
  print("Loaded {} rows.".format(destination_table.num_rows))


### Create and load OpenF1 tables

In [None]:
folder, file_name, table, delimiter = "openf1", "drivers.csv", "drivers_openf1", ","

# Data columns as (name, type, mode), expected in the CSV's column order.
schema = [
    bigquery.SchemaField(name, field_type, mode=mode)
    for name, field_type, mode in (
        ("driver_number", "INTEGER", "NULLABLE"),
        ("broadcast_name", "STRING", "REQUIRED"),
        ("full_name", "STRING", "REQUIRED"),
        ("name_acronym", "STRING", "REQUIRED"),
        ("team_name", "STRING", "NULLABLE"),
        ("team_colour", "STRING", "NULLABLE"),
        ("first_name", "STRING", "NULLABLE"),
        ("last_name", "STRING", "NULLABLE"),
        ("headshot_url", "STRING", "NULLABLE"),
        ("country_code", "STRING", "NULLABLE"),
        ("session_key", "INTEGER", "NULLABLE"),
        ("meeting_key", "INTEGER", "REQUIRED"),
    )
]
# Lineage columns: absent from the CSV, populated from default_value_expression.
schema += [
    bigquery.SchemaField("_data_source", "STRING", mode="REQUIRED", default_value_expression="'openf1'"),
    bigquery.SchemaField("_load_time", "TIMESTAMP", mode="REQUIRED", default_value_expression="CURRENT_TIMESTAMP"),
]

create_load_table_from_csv(folder, file_name, table, schema, delimiter)

Created table drivers_openf1
[SchemaField('driver_number', 'INTEGER', 'NULLABLE', None, None, (), None), SchemaField('broadcast_name', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('full_name', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('name_acronym', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('team_name', 'STRING', 'NULLABLE', None, None, (), None), SchemaField('team_colour', 'STRING', 'NULLABLE', None, None, (), None), SchemaField('first_name', 'STRING', 'NULLABLE', None, None, (), None), SchemaField('last_name', 'STRING', 'NULLABLE', None, None, (), None), SchemaField('headshot_url', 'STRING', 'NULLABLE', None, None, (), None), SchemaField('country_code', 'STRING', 'NULLABLE', None, None, (), None), SchemaField('session_key', 'INTEGER', 'NULLABLE', None, None, (), None), SchemaField('meeting_key', 'INTEGER', 'REQUIRED', None, None, (), None)]
Loaded 37144 rows.


In [None]:
folder, file_name, table, delimiter = "openf1", "meetings.csv", "meetings", ","

# Data columns as (name, type, mode), expected in the CSV's column order.
schema = [
    bigquery.SchemaField(name, field_type, mode=mode)
    for name, field_type, mode in (
        ("meeting_name", "STRING", "REQUIRED"),
        ("meeting_official_name", "STRING", "REQUIRED"),
        ("location", "STRING", "REQUIRED"),
        ("country_key", "INTEGER", "REQUIRED"),
        ("country_code", "STRING", "REQUIRED"),
        ("country_name", "STRING", "REQUIRED"),
        ("circuit_key", "INTEGER", "REQUIRED"),
        ("circuit_short_name", "STRING", "REQUIRED"),
        ("date_start", "STRING", "REQUIRED"),
        ("gmt_offset", "STRING", "REQUIRED"),
        ("meeting_key", "INTEGER", "REQUIRED"),
        ("year", "INTEGER", "REQUIRED"),
        ("meeting_code", "STRING", "NULLABLE"),
    )
]
# Lineage columns: absent from the CSV, populated from default_value_expression.
schema += [
    bigquery.SchemaField("_data_source", "STRING", mode="REQUIRED", default_value_expression="'openf1'"),
    bigquery.SchemaField("_load_time", "TIMESTAMP", mode="REQUIRED", default_value_expression="CURRENT_TIMESTAMP"),
]

create_load_table_from_csv(folder, file_name, table, schema, delimiter)

Created table meetings
[SchemaField('meeting_name', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('meeting_official_name', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('location', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('country_key', 'INTEGER', 'REQUIRED', None, None, (), None), SchemaField('country_code', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('country_name', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('circuit_key', 'INTEGER', 'REQUIRED', None, None, (), None), SchemaField('circuit_short_name', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('date_start', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('gmt_offset', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('meeting_key', 'INTEGER', 'REQUIRED', None, None, (), None), SchemaField('year', 'INTEGER', 'REQUIRED', None, None, (), None), SchemaField('meeting_code', 'STRING', 'NULLABLE', None, None, (), None)]
Loaded 376 rows.


In [None]:
folder, file_name, table, delimiter = "openf1", "pit.csv", "pit", ","

# Data columns as (name, type, mode), expected in the CSV's column order.
schema = [
    bigquery.SchemaField(name, field_type, mode=mode)
    for name, field_type, mode in (
        ("pit_duration", "FLOAT", "NULLABLE"),
        ("lap_number", "FLOAT", "NULLABLE"),
        ("driver_number", "INTEGER", "REQUIRED"),
        ("date", "STRING", "REQUIRED"),
        ("session_key", "INTEGER", "REQUIRED"),
        ("meeting_key", "INTEGER", "REQUIRED"),
    )
]
# Lineage columns: absent from the CSV, populated from default_value_expression.
schema += [
    bigquery.SchemaField("_data_source", "STRING", mode="REQUIRED", default_value_expression="'openf1'"),
    bigquery.SchemaField("_load_time", "TIMESTAMP", mode="REQUIRED", default_value_expression="CURRENT_TIMESTAMP"),
]

create_load_table_from_csv(folder, file_name, table, schema, delimiter)

Created table pit
[SchemaField('pit_duration', 'FLOAT', 'NULLABLE', None, None, (), None), SchemaField('lap_number', 'FLOAT', 'NULLABLE', None, None, (), None), SchemaField('driver_number', 'INTEGER', 'REQUIRED', None, None, (), None), SchemaField('date', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('session_key', 'INTEGER', 'REQUIRED', None, None, (), None), SchemaField('meeting_key', 'INTEGER', 'REQUIRED', None, None, (), None)]
Loaded 97648 rows.


In [None]:
folder, file_name, table, delimiter = "openf1", "sessions.csv", "sessions", ","

# Data columns as (name, type, mode), expected in the CSV's column order.
schema = [
    bigquery.SchemaField(name, field_type, mode=mode)
    for name, field_type, mode in (
        ("location", "STRING", "REQUIRED"),
        ("country_key", "INTEGER", "REQUIRED"),
        ("country_code", "STRING", "REQUIRED"),
        ("country_name", "STRING", "REQUIRED"),
        ("circuit_key", "INTEGER", "REQUIRED"),
        ("circuit_short_name", "STRING", "REQUIRED"),
        ("session_type", "STRING", "REQUIRED"),
        ("session_name", "STRING", "REQUIRED"),
        ("date_start", "STRING", "REQUIRED"),
        ("date_end", "STRING", "REQUIRED"),
        ("gmt_offset", "STRING", "REQUIRED"),
        ("session_key", "INTEGER", "REQUIRED"),
        ("meeting_key", "INTEGER", "REQUIRED"),
        ("year", "INTEGER", "REQUIRED"),
    )
]
# Lineage columns: absent from the CSV, populated from default_value_expression.
schema += [
    bigquery.SchemaField("_data_source", "STRING", mode="REQUIRED", default_value_expression="'openf1'"),
    bigquery.SchemaField("_load_time", "TIMESTAMP", mode="REQUIRED", default_value_expression="CURRENT_TIMESTAMP"),
]

create_load_table_from_csv(folder, file_name, table, schema, delimiter)

Created table sessions
[SchemaField('location', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('country_key', 'INTEGER', 'REQUIRED', None, None, (), None), SchemaField('country_code', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('country_name', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('circuit_key', 'INTEGER', 'REQUIRED', None, None, (), None), SchemaField('circuit_short_name', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('session_type', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('session_name', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('date_start', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('date_end', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('gmt_offset', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('session_key', 'INTEGER', 'REQUIRED', None, None, (), None), SchemaField('meeting_key', 'INTEGER', 'REQUIRED', None, None, (), None), SchemaField('year', 'INTEGER', 'R

In [None]:
folder, file_name, table, delimiter = "openf1", "laps.csv", "laps", ","

# Data columns as (name, type, mode), expected in the CSV's column order.
schema = [
    bigquery.SchemaField(name, field_type, mode=mode)
    for name, field_type, mode in (
        ("meeting_key", "INTEGER", "REQUIRED"),
        ("session_key", "INTEGER", "REQUIRED"),
        ("driver_number", "INTEGER", "REQUIRED"),
        ("i1_speed", "FLOAT", "NULLABLE"),
        ("i2_speed", "FLOAT", "NULLABLE"),
        ("st_speed", "FLOAT", "NULLABLE"),
        ("date_start", "STRING", "NULLABLE"),
        ("lap_duration", "FLOAT", "NULLABLE"),
        ("is_pit_out_lap", "BOOLEAN", "REQUIRED"),
        ("duration_sector_1", "FLOAT", "NULLABLE"),
        ("duration_sector_2", "FLOAT", "NULLABLE"),
        ("duration_sector_3", "FLOAT", "NULLABLE"),
        ("segments_sector_1", "STRING", "REQUIRED"),
        ("segments_sector_2", "STRING", "REQUIRED"),
        ("segments_sector_3", "STRING", "REQUIRED"),
        ("lap_number", "INTEGER", "REQUIRED"),
    )
]
# Lineage columns: absent from the CSV, populated from default_value_expression.
schema += [
    bigquery.SchemaField("_data_source", "STRING", mode="REQUIRED", default_value_expression="'openf1'"),
    bigquery.SchemaField("_load_time", "TIMESTAMP", mode="REQUIRED", default_value_expression="CURRENT_TIMESTAMP"),
]

create_load_table_from_csv(folder, file_name, table, schema, delimiter)

Created table laps
[SchemaField('meeting_key', 'INTEGER', 'REQUIRED', None, None, (), None), SchemaField('session_key', 'INTEGER', 'REQUIRED', None, None, (), None), SchemaField('driver_number', 'INTEGER', 'REQUIRED', None, None, (), None), SchemaField('i1_speed', 'FLOAT', 'NULLABLE', None, None, (), None), SchemaField('i2_speed', 'FLOAT', 'NULLABLE', None, None, (), None), SchemaField('st_speed', 'FLOAT', 'NULLABLE', None, None, (), None), SchemaField('date_start', 'STRING', 'NULLABLE', None, None, (), None), SchemaField('lap_duration', 'FLOAT', 'NULLABLE', None, None, (), None), SchemaField('is_pit_out_lap', 'BOOLEAN', 'REQUIRED', None, None, (), None), SchemaField('duration_sector_1', 'FLOAT', 'NULLABLE', None, None, (), None), SchemaField('duration_sector_2', 'FLOAT', 'NULLABLE', None, None, (), None), SchemaField('duration_sector_3', 'FLOAT', 'NULLABLE', None, None, (), None), SchemaField('segments_sector_1', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('segments_secto

### Create and load Ergast tables

In [None]:
folder, file_name, table, delimiter = "ergast", "races.csv", "ergast_races", ","

# Data columns as (name, type, mode), expected in the CSV's column order.
schema = [
    bigquery.SchemaField(name, field_type, mode=mode)
    for name, field_type, mode in (
        ("race_id", "INTEGER", "REQUIRED"),
        ("year", "INTEGER", "REQUIRED"),
        ("round", "INTEGER", "REQUIRED"),
        ("circuit_id", "INTEGER", "REQUIRED"),
        ("name", "STRING", "REQUIRED"),
        ("date", "STRING", "REQUIRED"),
        ("time", "STRING", "REQUIRED"),
        ("url", "STRING", "REQUIRED"),
        ("fp1_date", "STRING", "NULLABLE"),
        ("fp1_time", "STRING", "NULLABLE"),
        ("fp2_date", "STRING", "NULLABLE"),
        ("fp2_time", "STRING", "NULLABLE"),
        ("fp3_date", "STRING", "NULLABLE"),
        ("fp3_time", "STRING", "NULLABLE"),
        ("quali_date", "STRING", "NULLABLE"),
        ("quali_time", "STRING", "NULLABLE"),
        ("sprint_date", "STRING", "NULLABLE"),
        ("sprint_time", "STRING", "NULLABLE"),
    )
]
# Lineage columns: absent from the CSV, populated from default_value_expression.
schema += [
    bigquery.SchemaField("_data_source", "STRING", mode="REQUIRED", default_value_expression="'ergast'"),
    bigquery.SchemaField("_load_time", "TIMESTAMP", mode="REQUIRED", default_value_expression="CURRENT_TIMESTAMP"),
]

create_load_table_from_csv(folder, file_name, table, schema, delimiter)


Created table ergast_races
[SchemaField('race_id', 'INTEGER', 'REQUIRED', None, None, (), None), SchemaField('year', 'INTEGER', 'REQUIRED', None, None, (), None), SchemaField('round', 'INTEGER', 'REQUIRED', None, None, (), None), SchemaField('circuit_id', 'INTEGER', 'REQUIRED', None, None, (), None), SchemaField('name', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('date', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('time', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('url', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('fp1_date', 'STRING', 'NULLABLE', None, None, (), None), SchemaField('fp1_time', 'STRING', 'NULLABLE', None, None, (), None), SchemaField('fp2_date', 'STRING', 'NULLABLE', None, None, (), None), SchemaField('fp2_time', 'STRING', 'NULLABLE', None, None, (), None), SchemaField('fp3_date', 'STRING', 'NULLABLE', None, None, (), None), SchemaField('fp3_time', 'STRING', 'NULLABLE', None, None, (), None), SchemaField('quali_date',

In [None]:
folder, file_name, table, delimiter = "ergast", "lap_times.csv", "ergast_lap_times", ","

# Data columns as (name, type, mode), expected in the CSV's column order.
# The names keep the source file's spaced, capitalised headers.
schema = [
    bigquery.SchemaField(name, field_type, mode=mode)
    for name, field_type, mode in (
        ("Year", "INTEGER", "REQUIRED"),
        ("Round", "INTEGER", "REQUIRED"),
        ("Race Name", "STRING", "REQUIRED"),
        ("Circuit Name", "STRING", "REQUIRED"),
        ("Location", "STRING", "REQUIRED"),
        ("Country", "STRING", "REQUIRED"),
        ("Race Date", "DATE", "REQUIRED"),
        ("Lap Number", "INTEGER", "REQUIRED"),
        ("Driver ID", "STRING", "REQUIRED"),
        ("Position", "INTEGER", "REQUIRED"),
        ("Lap Time", "STRING", "REQUIRED"),
    )
]
# Lineage columns: absent from the CSV, populated from default_value_expression.
schema += [
    bigquery.SchemaField("_data_source", "STRING", mode="REQUIRED", default_value_expression="'ergast'"),
    bigquery.SchemaField("_load_time", "TIMESTAMP", mode="REQUIRED", default_value_expression="CURRENT_TIMESTAMP"),
]

create_load_table_from_csv(folder, file_name, table, schema, delimiter)


Created table ergast_lap_times
[SchemaField('Year', 'INTEGER', 'REQUIRED', None, None, (), None), SchemaField('Round', 'INTEGER', 'REQUIRED', None, None, (), None), SchemaField('Race Name', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('Circuit Name', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('Location', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('Country', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('Race Date', 'DATE', 'REQUIRED', None, None, (), None), SchemaField('Lap Number', 'INTEGER', 'REQUIRED', None, None, (), None), SchemaField('Driver ID', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('Position', 'INTEGER', 'REQUIRED', None, None, (), None), SchemaField('Lap Time', 'STRING', 'REQUIRED', None, None, (), None)]
Loaded 25596 rows.


In [None]:
folder = "ergast"
file_name = "race_results.csv"
table = "race_results"
delimiter = ","

# NOTE(review): if race_results already exists with the previous schema (REQUIRED /
# INTEGER points), create_table(exists_ok=True) keeps it; drop the table before re-running.
schema = [
    bigquery.SchemaField("season", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("round", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("race_name", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("date", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("time", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("circuit", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("circuit_id", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("location", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("country", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("number", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("position", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("position_text", "STRING", mode="REQUIRED"),
    # F1 has awarded fractional points (e.g. half points for shortened races), so INTEGER can reject rows.
    bigquery.SchemaField("points", "FLOAT", mode="REQUIRED"),
    bigquery.SchemaField("laps", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("status", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("driver", "STRING", mode="REQUIRED"),
    # Nullable, matching the ergast_drivers schema: not every driver has a permanent number or code.
    bigquery.SchemaField("permanent_number", "INTEGER", mode="NULLABLE"),
    bigquery.SchemaField("code", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("given_name", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("family_name", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("date_of_birth", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("nationality", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("constructor", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("constructor_name", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("constructor_nationality", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("grid", "INTEGER", mode="REQUIRED"),
    # These columns have missing values; REQUIRED would make the load fail on the NULLs.
    bigquery.SchemaField("total_time", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("fastest_lap_rank", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("fastest_lap_lap", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("fastest_lap_time", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("fastest_lap_speed", "STRING", mode="NULLABLE"),
    # Lineage columns: absent from the CSV, populated from default_value_expression.
    bigquery.SchemaField("_data_source", "STRING", mode="REQUIRED", default_value_expression="'ergast'"),
    bigquery.SchemaField("_load_time", "TIMESTAMP", mode="REQUIRED", default_value_expression="CURRENT_TIMESTAMP"),
]

create_load_table_from_csv(folder, file_name, table, schema, delimiter)


Created table race_results
[SchemaField('season', 'INTEGER', 'REQUIRED', None, None, (), None), SchemaField('round', 'INTEGER', 'REQUIRED', None, None, (), None), SchemaField('race_name', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('date', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('time', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('circuit', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('circuit_id', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('location', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('country', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('number', 'INTEGER', 'REQUIRED', None, None, (), None), SchemaField('position', 'INTEGER', 'REQUIRED', None, None, (), None), SchemaField('position_text', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('points', 'INTEGER', 'REQUIRED', None, None, (), None), SchemaField('laps', 'INTEGER', 'REQUIRED', None, None, (), None), SchemaField('st

In [None]:
folder, file_name, table, delimiter = "ergast", "drivers.csv", "ergast_drivers", ","

# Data columns as (name, type, mode), expected in the CSV's column order.
schema = [
    bigquery.SchemaField(name, field_type, mode=mode)
    for name, field_type, mode in (
        ("year", "INTEGER", "REQUIRED"),
        ("driver_id", "STRING", "REQUIRED"),
        ("permanent_number", "INTEGER", "NULLABLE"),
        ("code", "STRING", "NULLABLE"),
        ("given_name", "STRING", "REQUIRED"),
        ("family_name", "STRING", "REQUIRED"),
        ("date_of_birth", "DATE", "REQUIRED"),
        ("nationality", "STRING", "REQUIRED"),
        ("wikipedia_url", "STRING", "NULLABLE"),
    )
]
# Lineage columns: absent from the CSV, populated from default_value_expression.
schema += [
    bigquery.SchemaField("_data_source", "STRING", mode="REQUIRED", default_value_expression="'ergast'"),
    bigquery.SchemaField("_load_time", "TIMESTAMP", mode="REQUIRED", default_value_expression="CURRENT_TIMESTAMP"),
]

create_load_table_from_csv(folder, file_name, table, schema, delimiter)



Created table ergast_drivers
[SchemaField('year', 'INTEGER', 'REQUIRED', None, None, (), None), SchemaField('driver_id', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('permanent_number', 'INTEGER', 'NULLABLE', None, None, (), None), SchemaField('code', 'STRING', 'NULLABLE', None, None, (), None), SchemaField('given_name', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('family_name', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('date_of_birth', 'DATE', 'REQUIRED', None, None, (), None), SchemaField('nationality', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('wikipedia_url', 'STRING', 'NULLABLE', None, None, (), None)]
Loaded 276 rows.


In [None]:
folder = "ergast"
file_name = "qualifying_results.csv"
table = "qualifying_results"
delimiter = ","

schema = [
    bigquery.SchemaField("year", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("round", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("race_name", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("circuit_name", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("date", "DATE", mode="REQUIRED"),
    bigquery.SchemaField("position", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("driver_name", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("driver_nationality", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("constructor_name", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("constructor_nationality", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("q1_time", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("q2_time", "STRING", mode="NULLABLE"),
    bigquery.SchemaField("q3_time", "STRING", mode="NULLABLE"),
    # This file comes from the "ergast" folder, so tag rows as 'ergast'; the previous
    # 'openf1' default mislabelled the lineage of every row.
    bigquery.SchemaField("_data_source", "STRING", mode="REQUIRED", default_value_expression="'ergast'"),
    bigquery.SchemaField("_load_time", "TIMESTAMP", mode="REQUIRED", default_value_expression="CURRENT_TIMESTAMP"),
]

create_load_table_from_csv(folder, file_name, table, schema, delimiter)



### Create and load the historical cars table

In [None]:
folder, file_name, table, delimiter = "historical_car", "historical_car.csv", "historical_cars", ","

# Data columns as (name, type, mode), expected in the CSV's column order.
schema = [
    bigquery.SchemaField(name, field_type, mode=mode)
    for name, field_type, mode in (
        ("pos", "STRING", "REQUIRED"),
        ("driver", "STRING", "REQUIRED"),
        ("nationality", "STRING", "REQUIRED"),
        ("car", "STRING", "NULLABLE"),
        ("pts", "FLOAT", "REQUIRED"),
        ("year", "INTEGER", "REQUIRED"),
        ("code", "STRING", "REQUIRED"),
    )
]
# Lineage columns: absent from the CSV, populated from default_value_expression.
schema += [
    bigquery.SchemaField("_data_source", "STRING", mode="REQUIRED", default_value_expression="'historical_car'"),
    bigquery.SchemaField("_load_time", "TIMESTAMP", mode="REQUIRED", default_value_expression="CURRENT_TIMESTAMP"),
]

create_load_table_from_csv(folder, file_name, table, schema, delimiter)

Created table historical_cars
[SchemaField('pos', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('driver', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('nationality', 'STRING', 'REQUIRED', None, None, (), None), SchemaField('car', 'STRING', 'NULLABLE', None, None, (), None), SchemaField('pts', 'FLOAT', 'REQUIRED', None, None, (), None), SchemaField('year', 'INTEGER', 'REQUIRED', None, None, (), None), SchemaField('code', 'STRING', 'REQUIRED', None, None, (), None)]
Loaded 1660 rows.


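### Create and load the circuits table from unstructured JSON data

The load cell below reads `initial-loads/racerpedia/llm_text_fixed/`, which is written by the JSON-conversion cell that follows it. On a fresh bucket, run the conversion cell first.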

In [None]:
from google.cloud import bigquery, storage
import json
import tempfile
import time

bq_client = bigquery.Client()
storage_client = storage.Client()

# Circuits table columns as (name, type); mode is left at SchemaField's default (NULLABLE).
schema = [
    bigquery.SchemaField(name, field_type)
    for name, field_type in (
        ("circuit_name", "STRING"),
        ("city", "STRING"),
        ("country", "STRING"),
        ("latitude", "FLOAT"),
        ("longitude", "FLOAT"),
        ("capacity", "INTEGER"),
        ("fia_grade", "STRING"),
        ("circuit_status", "STRING"),
    )
]

def load_json_batch(bucket_name, folder, table_id, schema, batch_size=10):
    """Append every `.json` object under gs://<bucket_name>/<folder> to `table_id`.

    Objects are downloaded as text and grouped into lists of `batch_size`; each
    group is handed to upload_to_bigquery() as one load. After every full group
    the loop sleeps 10 seconds. Returns None and does nothing when the prefix
    lists no objects.
    """
    gcs_bucket = storage_client.bucket(bucket_name)
    all_blobs = list(gcs_bucket.list_blobs(prefix=folder))
    if not all_blobs:
        return

    pending = []
    for blob in all_blobs:
        if not blob.name.endswith(".json"):
            continue
        pending.append(blob.download_as_text())
        if len(pending) < batch_size:
            continue
        upload_to_bigquery(pending, table_id, schema)
        pending = []
        # NOTE(review): presumably throttles consecutive load jobs on the same table.
        time.sleep(10)

    # Flush the final, partially filled group.
    if pending:
        upload_to_bigquery(pending, table_id, schema)

def upload_to_bigquery(json_batch, table_id, schema):
    """Append a batch of JSON documents to `table_id` as one NDJSON load job.

    Args:
        json_batch: list of JSON text strings; each is stripped and terminated
            with a newline, so each must already be newline-delimited JSON.
        table_id: destination table id (e.g. "formula1_raw.circuits").
        schema: list of bigquery.SchemaField used for the load.
    """
    import io

    # Build the payload in memory and encode it explicitly as UTF-8. The earlier
    # NamedTemporaryFile(delete=False) was never removed (one leaked file per batch),
    # and its text-mode write used the platform's default encoding.
    payload = "".join(json_string.strip() + "\n" for json_string in json_batch)

    job_config = bigquery.LoadJobConfig(
        schema=schema,
        source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    )

    load_job = bq_client.load_table_from_file(
        io.BytesIO(payload.encode("utf-8")), table_id, job_config=job_config
    )
    load_job.result()  # wait for the load to finish; job errors are raised here

# Reads the NDJSON files that the conversion cell below writes (llm_text/ -> llm_text_fixed/).
load_json_batch(
    bucket_name="formula1-ss",
    folder="initial-loads/racerpedia/llm_text_fixed/",
    table_id="formula1_raw.circuits",
    schema=schema,
)

In [None]:
import json
from google.cloud import storage

# Rewrite each raw racerpedia JSON file (a single object or an array of objects)
# as newline-delimited JSON so BigQuery can load it.
storage_client = storage.Client()

# GCS bucket and folder paths
bucket_name = "formula1-ss"
input_folder = "initial-loads/racerpedia/llm_text/"
output_folder = "initial-loads/racerpedia/llm_text_fixed/"

# Use a distinct name. `bucket` is the bucket-name string that
# create_load_table_from_csv/json interpolate into gs:// URIs. Rebinding it to a
# Bucket object here broke any later re-run of those loaders.
gcs_bucket = storage_client.bucket(bucket_name)
blobs = list(gcs_bucket.list_blobs(prefix=input_folder))

for blob in blobs:
    if not blob.name.endswith(".json"):
        continue
    print(f"Processing {blob.name}")

    try:
        parsed_data = json.loads(blob.download_as_text())
    except json.JSONDecodeError:
        print(f"Invalid JSON format in {blob.name}, skipping")
        continue

    if isinstance(parsed_data, dict):
        ndjson_data = json.dumps(parsed_data)
    elif isinstance(parsed_data, list):
        if not parsed_data:
            # An empty array has no rows; uploading it would only create an empty file.
            print(f"Skipping {blob.name}, empty array")
            continue
        ndjson_data = "\n".join(json.dumps(obj) for obj in parsed_data)
    else:
        print(f"Skipping {blob.name}, invalid structure")
        continue

    # Same file name, written under the output folder.
    new_blob = gcs_bucket.blob(output_folder + blob.name.rsplit("/", 1)[-1])
    new_blob.upload_from_string(ndjson_data, content_type="application/json")
    print(f"Uploaded corrected file: {new_blob.name}")


Processing initial-loads/racerpedia/llm_text/Albert_Park_Circuit_1_9.json
Uploaded corrected file: initial-loads/racerpedia/llm_text_fixed/Albert_Park_Circuit_1_9.json
Processing initial-loads/racerpedia/llm_text/Algarve_International_Circuit_1_13.json
Uploaded corrected file: initial-loads/racerpedia/llm_text_fixed/Algarve_International_Circuit_1_13.json
Processing initial-loads/racerpedia/llm_text/Australian_Grand_Prix_1_29.json
Uploaded corrected file: initial-loads/racerpedia/llm_text_fixed/Australian_Grand_Prix_1_29.json
Processing initial-loads/racerpedia/llm_text/Autódromo_Hermanos_Rodríguez_1_13.json
Uploaded corrected file: initial-loads/racerpedia/llm_text_fixed/Autódromo_Hermanos_Rodríguez_1_13.json
Processing initial-loads/racerpedia/llm_text/Bahrain_International_Circuit_1_13.json
Uploaded corrected file: initial-loads/racerpedia/llm_text_fixed/Bahrain_International_Circuit_1_13.json
Processing initial-loads/racerpedia/llm_text/Baku_City_Circuit_1_5.json
Uploaded corrected

In [None]:
%%bigquery
  -- Add the lineage columns the other raw tables declare in their schema.
  -- IF NOT EXISTS keeps the cell re-runnable; without it a second run fails
  -- because the columns already exist.
  ALTER TABLE formula1_raw.circuits
  ADD COLUMN IF NOT EXISTS _data_source STRING,
  ADD COLUMN IF NOT EXISTS _load_time TIMESTAMP


Query is running:   0%|          |

In [None]:
%%bigquery
  -- Fill the lineage columns on rows where either is still NULL
  -- (the circuits load schema does not include them).
  UPDATE formula1_raw.circuits
  SET
    _load_time = CURRENT_TIMESTAMP(),
    _data_source = 'racerpedia'
  WHERE _load_time IS NULL
     OR _data_source IS NULL

Query is running:   0%|          |