In [1]:
import os
# Point the Google Cloud client libraries at the service-account key file.
# The path is relative, so it is resolved against the kernel's working directory.
os.environ.update(
    {'GOOGLE_APPLICATION_CREDENTIALS': 'Key/pragmatic-bongo-404116-e2d94f71da27.json'}
)

### Data Model
#### Fact Table: 
##### Collisions_Fact
- COLLISION_ID (Primary Key, String)
- DATE_ID (Foreign Key to DimDate, Int)
- LOCATION_ID (Foreign Key to DimLocation, Int)
- NUMBER_OF_PERSONS_INJURED (Int)
- NUMBER_OF_PERSONS_KILLED (Int)
- NUMBER_OF_PEDESTRIANS_INJURED (Int)
- NUMBER_OF_PEDESTRIANS_KILLED (Int)
- NUMBER_OF_CYCLIST_INJURED (Int)
- NUMBER_OF_CYCLIST_KILLED (Int)
- NUMBER_OF_MOTORIST_INJURED (Int)
- NUMBER_OF_MOTORIST_KILLED (Int)
- CONTRIBUTING_FACTOR_VEHICLE_1 (String)
- CONTRIBUTING_FACTOR_VEHICLE_2 (String)
- CONTRIBUTING_FACTOR_VEHICLE_3 (String)
- CONTRIBUTING_FACTOR_VEHICLE_4 (String)
- CONTRIBUTING_FACTOR_VEHICLE_5 (String)
- VEHICLE_TYPE_CODE_1 (String)
- VEHICLE_TYPE_CODE_2 (String)
- VEHICLE_TYPE_CODE_3 (String)
- VEHICLE_TYPE_CODE_4 (String)
- VEHICLE_TYPE_CODE_5 (String)
- INJURIES (Boolean)

#### Dimension Tables:
##### Date Dimension (DimDate)
- DATE_ID (Primary Key, Int)
- YEAR (Int)
- MONTH (Int)
- DAY (Int)
- DATE (Date)
- CRASH_TIME (String)

##### Location Dimension (DimLocation)
- LOCATION_ID (Primary Key, Int)
- ZIP_CODE (Int)
- BOROUGH (String)
- LATITUDE (Float)
- LONGITUDE (Float)
- ON_STREET_NAME (String)
- CROSS_STREET_NAME (String)
- OFF_STREET_NAME (String)

#### Create BigQuery Dataset (Data Warehouse)

In [None]:
from google.cloud import bigquery
def create_bigquery_dataset(project_id, dataset_name, location="US"):
    """Create a BigQuery dataset if it does not exist yet.

    Args:
        project_id: ID of the Google Cloud project that owns the dataset.
        dataset_name: Name of the dataset inside that project.
        location: Geographic location assigned to the dataset. Defaults to
            "US", the value this notebook has always used.

    Returns:
        None. A confirmation line is printed once the call succeeds.
    """
    bigquery_client = bigquery.Client(project=project_id)
    dataset_id = f"{project_id}.{dataset_name}"
    dataset = bigquery.Dataset(dataset_id)
    dataset.location = location
    # exists_ok=True keeps the cell re-runnable: without it a second run
    # raises 409 Conflict because the dataset is already there.
    bigquery_client.create_dataset(dataset, exists_ok=True)
    print(f"Dataset {dataset_id} created or already exists.")

# Project and dataset that hold the collisions warehouse.
project_id = "pragmatic-bongo-404116"
dataset_name = "motor_vehicle_collisions"
create_bigquery_dataset(project_id=project_id, dataset_name=dataset_name)

#### Create Tables in BigQuery

In [38]:
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
from google.oauth2 import service_account

# Path to the service-account key file, taken from the environment variable.
# Indexing (rather than .get) raises a KeyError naming the variable when it is
# unset, instead of passing None on to the credentials loader.
service_account_path = os.environ['GOOGLE_APPLICATION_CREDENTIALS']

# Build explicit credentials from the key file
credentials = service_account.Credentials.from_service_account_file(service_account_path)
# Initialize a BigQuery client bound to the project named in the key file
client = bigquery.Client(credentials=credentials, project=credentials.project_id)

# Define your dataset and table names
dataset_name = 'motor_vehicle_collisions'
fact_table_name = 'Collisions_Fact'
date_dim_table_name = 'DimDate'
location_dim_table_name = 'DimLocation'

# Reference the dataset in the client's project. DatasetReference replaces the
# deprecated Client.dataset() helper and offers the same .table() method.
dataset_ref = bigquery.DatasetReference(client.project, dataset_name)
# This does not create anything: it only checks that the dataset exists and
# raises NotFound if it has not been created yet.
client.get_dataset(dataset_ref)

# Define the schema for the fact table
fact_table_schema = [
    # Keys
    bigquery.SchemaField('COLLISION_ID', 'STRING', mode='REQUIRED'),
    # DATE_ID is INTEGER so the foreign key has the same type as DimDate.DATE_ID;
    # it was previously declared STRING, which disagreed with the dimension key.
    bigquery.SchemaField('DATE_ID', 'INTEGER'),
    bigquery.SchemaField('LOCATION_ID', 'INTEGER'),
    # Injury / fatality counts
    bigquery.SchemaField('NUMBER_OF_PERSONS_INJURED', 'INTEGER'),
    bigquery.SchemaField('NUMBER_OF_PERSONS_KILLED', 'INTEGER'),
    bigquery.SchemaField('NUMBER_OF_PEDESTRIANS_INJURED', 'INTEGER'),
    bigquery.SchemaField('NUMBER_OF_PEDESTRIANS_KILLED', 'INTEGER'),
    bigquery.SchemaField('NUMBER_OF_CYCLIST_INJURED', 'INTEGER'),
    bigquery.SchemaField('NUMBER_OF_CYCLIST_KILLED', 'INTEGER'),
    bigquery.SchemaField('NUMBER_OF_MOTORIST_INJURED', 'INTEGER'),
    bigquery.SchemaField('NUMBER_OF_MOTORIST_KILLED', 'INTEGER'),
    # Contributing factors, one column per vehicle slot
    bigquery.SchemaField('CONTRIBUTING_FACTOR_VEHICLE_1', 'STRING'),
    bigquery.SchemaField('CONTRIBUTING_FACTOR_VEHICLE_2', 'STRING'),
    bigquery.SchemaField('CONTRIBUTING_FACTOR_VEHICLE_3', 'STRING'),
    bigquery.SchemaField('CONTRIBUTING_FACTOR_VEHICLE_4', 'STRING'),
    bigquery.SchemaField('CONTRIBUTING_FACTOR_VEHICLE_5', 'STRING'),
    # Vehicle type codes, one column per vehicle slot
    bigquery.SchemaField('VEHICLE_TYPE_CODE_1', 'STRING'),
    bigquery.SchemaField('VEHICLE_TYPE_CODE_2', 'STRING'),
    bigquery.SchemaField('VEHICLE_TYPE_CODE_3', 'STRING'),
    bigquery.SchemaField('VEHICLE_TYPE_CODE_4', 'STRING'),
    bigquery.SchemaField('VEHICLE_TYPE_CODE_5', 'STRING'),
    # Boolean injuries flag
    bigquery.SchemaField('INJURIES', 'BOOL'),
]

# Schema for the DimDate table: DATE_ID is the only REQUIRED column,
# every other field keeps SchemaField's default mode.
date_dim_table_schema = [
    bigquery.SchemaField('DATE_ID', 'INTEGER', mode='REQUIRED'),
    *(
        bigquery.SchemaField(column_name, column_type)
        for column_name, column_type in (
            ('YEAR', 'INTEGER'),
            ('MONTH', 'INTEGER'),
            ('DAY', 'INTEGER'),
            ('DATE', 'DATE'),
            ('CRASH_TIME', 'STRING'),
        )
    ),
]


# Schema for the DimLocation table: LOCATION_ID is the only REQUIRED column,
# every other field keeps SchemaField's default mode.
location_dim_table_schema = [
    bigquery.SchemaField('LOCATION_ID', 'INTEGER', mode='REQUIRED'),
    *(
        bigquery.SchemaField(column_name, column_type)
        for column_name, column_type in (
            ('ZIP_CODE', 'INTEGER'),
            ('BOROUGH', 'STRING'),
            ('LATITUDE', 'FLOAT'),
            ('LONGITUDE', 'FLOAT'),
            ('ON_STREET_NAME', 'STRING'),
            ('CROSS_STREET_NAME', 'STRING'),
            ('OFF_STREET_NAME', 'STRING'),
        )
    ),
]

# Create the tables (each one only if it does not exist yet)
fact_table_ref = dataset_ref.table(fact_table_name)
date_dim_table_ref = dataset_ref.table(date_dim_table_name)
location_dim_table_ref = dataset_ref.table(location_dim_table_name)

for table_name, table_ref, table_schema in (
    (fact_table_name, fact_table_ref, fact_table_schema),
    (date_dim_table_name, date_dim_table_ref, date_dim_table_schema),
    (location_dim_table_name, location_dim_table_ref, location_dim_table_schema),
):
    try:
        client.get_table(table_ref)
    except NotFound:
        # Only a missing table triggers creation. Any other failure
        # (permissions, network, interrupt) now propagates instead of being
        # mistaken for "table does not exist" by a bare except.
        client.create_table(bigquery.Table(table_ref, schema=table_schema))
        print(f"{table_name} Created")
    else:
        print(f"Table {table_name} already exists in the dataset {dataset_name}.")


Collisions_Fact Created
Table DimDate already exists in the dataset motor_vehicle_collisions.
Table DimTime already exists in the dataset motor_vehicle_collisions.
Table DimLocation already exists in the dataset motor_vehicle_collisions.


#### Load Data into BigQuery Tables

In [39]:
def upload_data_from_dataframe(df, table_ref):
    """Load a DataFrame into a BigQuery table and wait for the job to finish.

    The load job is configured with WRITE_TRUNCATE and schema autodetection,
    and is submitted through the module-level ``client``.

    Args:
        df: DataFrame whose rows are uploaded.
        table_ref: Destination table passed to ``load_table_from_dataframe``.

    Returns:
        None. The call blocks until the load job has completed.
    """
    load_config = bigquery.LoadJobConfig(
        write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
        autodetect=True,
    )
    # .result() blocks until the load job is done
    client.load_table_from_dataframe(df, table_ref, job_config=load_config).result()

def split_df(df):
    """Split the flat collisions DataFrame into fact and dimension frames.

    Args:
        df: DataFrame containing every fact, date and location column
            listed below. It is not modified.

    Returns:
        A ``(fact_df, date_dim_df, location_dim_df)`` tuple. The fact frame
        keeps one row per input row. The two dimension frames have exact
        duplicate rows removed, so a date or location shared by many
        collisions is stored once rather than once per collision.

    Raises:
        KeyError: If any expected column is missing from ``df``.
    """
    fact_cols = [
        "COLLISION_ID", "DATE_ID", "LOCATION_ID",
        "NUMBER_OF_PERSONS_INJURED", "NUMBER_OF_PERSONS_KILLED",
        "NUMBER_OF_PEDESTRIANS_INJURED", "NUMBER_OF_PEDESTRIANS_KILLED",
        "NUMBER_OF_CYCLIST_INJURED", "NUMBER_OF_CYCLIST_KILLED",
        "NUMBER_OF_MOTORIST_INJURED", "NUMBER_OF_MOTORIST_KILLED",
        "CONTRIBUTING_FACTOR_VEHICLE_1", "CONTRIBUTING_FACTOR_VEHICLE_2",
        "CONTRIBUTING_FACTOR_VEHICLE_3", "CONTRIBUTING_FACTOR_VEHICLE_4",
        "CONTRIBUTING_FACTOR_VEHICLE_5", "VEHICLE_TYPE_CODE_1",
        "VEHICLE_TYPE_CODE_2", "VEHICLE_TYPE_CODE_3",
        "VEHICLE_TYPE_CODE_4", "VEHICLE_TYPE_CODE_5", "INJURIES"]

    # NOTE(review): the DimDate schema declares this column as "DATE", while the
    # DataFrame column is "CRASH_DATE" - confirm which name the table should use.
    date_cols = [
        "DATE_ID", "YEAR", "MONTH", "DAY", "CRASH_DATE", "CRASH_TIME"]

    location_cols = [
        "LOCATION_ID", "ZIP_CODE", "BOROUGH", "LATITUDE",
        "LONGITUDE", "ON_STREET_NAME", "CROSS_STREET_NAME", "OFF_STREET_NAME"]

    # .copy() gives the caller an independent frame rather than a slice of df
    fact_df = df[fact_cols].copy()
    # Dimension tables hold each distinct row once. Only rows identical in
    # every column are dropped, so no information is lost.
    date_dim_df = df[date_cols].drop_duplicates().reset_index(drop=True)
    location_dim_df = df[location_cols].drop_duplicates().reset_index(drop=True)

    return fact_df, date_dim_df, location_dim_df

# NOTE(review): `df` is not defined in the cells visible here - presumably an
# earlier cell produces the transformed collisions DataFrame; confirm.
fact_df, date_dim_df, location_dim_df = split_df(df)

# Upload each frame to its BigQuery table: fact first, then the dimensions
for frame, destination in (
    (fact_df, fact_table_ref),
    (date_dim_df, date_dim_table_ref),
    (location_dim_df, location_dim_table_ref),
):
    upload_data_from_dataframe(frame, destination)