<a href="https://colab.research.google.com/github/professorholowczak/Data_Warehousing/blob/main/etl_generate_date_dimension.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# ETL Code Example for the Date Dimension in NYC 311 Data

This example code creates a date dimension table in Google BigQuery.


In [None]:
# Date Dimension
# If using the native Google BigQuery API module:
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
import pandas as pd
import os
import pyarrow
import logging
from datetime import datetime

In [None]:
# If using a service account key file, save the path to that file in credentials.py and import credentials
import credentials

# If using Google Colab, uncomment the following lines and authenticate using Colab
# from google.colab import auth
# auth.authenticate_user()
# print('Authenticated')
# Google Colab load modules for BigQuery
# %load_ext google.cloud.bigquery
# %load_ext google.colab.data_table

In [None]:
# ---- Dimension naming -------------------------------------------------------
# Every derived name below is built from the dimension name.
dimension_name = 'date'

# Surrogate key column (warehouse-generated integer id), e.g. date_dim_id
surrogate_key = f"{dimension_name}_dim_id"

# Business key column (identifier from the source data), e.g. date_id
business_key = f"{dimension_name}_id"

# ---- BigQuery target --------------------------------------------------------
gcp_project = 'put your GCP project name here'
bq_dataset = 'nyc_311_complaints_dw'
table_name = f"{dimension_name}_dimension"
# Fully qualified table path in the form project.dataset.table
dimension_table_path = f"{gcp_project}.{bq_dataset}.{table_name}"

# ---- Source data files ------------------------------------------------------
# Windows paths need a doubled backslash inside the string:  C:\\myfolder
# Linux uses forward slashes:                                /home/username/python_etl
# Mac uses forward slashes:                                  /users/username/python_etl
file_source_path = 'c:\\Python_ETL'
# NOTE: this second assignment overrides the one above; edit it to match your machine.
file_source_path = 'C:\\Users\\rholo\\OneDrive\\Documents\\classes\\4400\\311'


In [None]:
# Set up logging: one log file per dimension per calendar day,
# named etl_<dimension>_<YYYYMMDD>.log
current_date = datetime.today().strftime('%Y%m%d')
log_filename = f"etl_{dimension_name}_{current_date}.log"
logging.basicConfig(
    filename=log_filename,
    encoding='utf-8',
    format='%(asctime)s %(message)s',
    level=logging.DEBUG,
)
# Visual separator so consecutive runs are easy to tell apart in the same file
logging.info("=========================================================================")
logging.info("Starting ETL Run for dimension %s on date %s", dimension_name, current_date)


In [None]:
def generate_date_dimension(start='2016-01-01', end='2026-12-31'):
    """
    generate_date_dimension
    Builds a calendar with one row per day from 'start' through 'end'
    and derives descriptive day, month, quarter and year columns from each date.
    All columns produced with strftime hold text (for example day_of_month is
    '01', not 1); quarter comes from the pandas .dt.quarter accessor.
    Format codes: https://docs.python.org/3/library/datetime.html#strftime-and-strptime-format-codes
    Returns a new dataframe
    """
    calendar = pd.DataFrame({"full_date": pd.date_range(start, end)})
    day = calendar["full_date"].dt

    # (column name, strftime code) pairs, listed in the column order of the table
    text_columns = [
        ("weekday_name", "%A"),    # full weekday name
        ("day_of_week", "%w"),     # weekday as a number, Sunday is 0
        ("month_name", "%B"),      # full month name
        ("day_of_month", "%d"),    # zero-padded day of the month
        ("month_of_year", "%m"),   # zero-padded month number
    ]
    for column, code in text_columns:
        calendar[column] = day.strftime(code)

    calendar["quarter"] = day.quarter
    calendar["year"] = day.strftime("%Y")
    return calendar


In [None]:
def create_bigquery_client(logging):
    """
    create_bigquery_client
    Creates a BigQuery client using the service account key file whose path is
    stored in credentials.path_to_service_account_key_file.
    'logging' is the logging module (or any object with info/error methods).
    Returns the BigQuery client object.
    Raises SystemExit with exit code -1 if the client cannot be created;
    the underlying error is written to the log first.
    """
    try:
        # If using a service account key file create the client this way:
        bqclient = bigquery.Client.from_service_account_json(credentials.path_to_service_account_key_file)
        # If using Google Colab authentication then create the client this way:
        # bqclient = bigquery.Client(gcp_project)
    except Exception as err:
        # The %s placeholder is required: without it the logging module cannot
        # format the record and the real cause never reaches the log file.
        logging.error("Failed to create BigQuery Client. %s", err)
        # Stop the run, but through the normal interpreter shutdown path so log
        # handlers are flushed and a notebook kernel is not killed outright.
        raise SystemExit(-1) from err
    logging.info("Created BigQuery Client: %s", bqclient)
    return bqclient


In [None]:
def upload_bigquery_table(logging, bqclient, table_path, write_disposition, df):
    """
    upload_bigquery_table
    Loads the rows of dataframe 'df' into the BigQuery table at 'table_path'
    using the BigQuery client 'bqclient', and waits for the load job to finish.
    The write disposition is either
    write_disposition="WRITE_TRUNCATE"  Erase the target data and load all new data.
    write_disposition="WRITE_APPEND"    Append to the existing table
    Any failure is written to the log and NOT re-raised, so the caller continues.
    Returns nothing.
    """
    try:
        logging.info("Creating BigQuery Job configuration with write_disposition=%s", write_disposition)
        # The job configuration carries the write disposition to BigQuery
        load_config = bigquery.LoadJobConfig(write_disposition=write_disposition)
        logging.info("Submitting the BigQuery job")
        load_job = bqclient.load_table_from_dataframe(df, table_path, job_config=load_config)
        # result() blocks until the job completes
        outcome = load_job.result()
        logging.info("Job  results: %s", outcome)
    except Exception as err:
        logging.error("Failed to load BigQuery Table. %s", err)


In [None]:
def bigquery_table_exists(bqclient, table_path):
    """
    bigquery_table_exists
    Asks BigQuery for the table at 'table_path' using client 'bqclient'.
    Returns True when the lookup succeeds and False when it raises NotFound.
    Any other error from the lookup is passed on to the caller.
    """
    try:
        bqclient.get_table(table_path)  # Makes an API request.
    except NotFound:
        found = False
    else:
        found = True
    return found

In [None]:
def query_bigquery_table(logging, table_path, bqclient, surrogate_key):
    """
    query_bigquery_table
    Accepts a path to a BigQuery table and the name of the surrogate key
    Queries the BigQuery table but leaves out the update_timestamp and surrogate key columns
    Returns the query result as a dataframe.
    If the query fails the error is logged and then re-raised. Returning a
    placeholder instead would let an incremental load mistake "query failed"
    for "table is empty" and append every row again.
    """
    # table_path and surrogate_key come from the notebook's own configuration,
    # not from user input, so they are placed directly into the SQL text.
    sql_query = 'SELECT * EXCEPT ( update_timestamp, '+surrogate_key+') FROM `' + table_path + '`'
    logging.info("Running query: %s", sql_query)
    try:
        return bqclient.query(sql_query).to_dataframe()
    except Exception as err:
        logging.error("Error querying the table. %s", err)
        raise

In [None]:
def add_surrogate_key(df, dimension_name='customers', offset=1):
    """
    add_surrogate_key
    Adds an integer identifier column named <dimension_name>_dim_id as the
    first column of the data frame. The first row gets 'offset' and each
    following row gets the next integer.
    The data frame is changed in place (its index is also reset to 0..n-1)
    and the same data frame is returned.
    """
    key_column = dimension_name + '_dim_id'
    # Renumber the rows from 0 so that index + offset gives consecutive keys
    df.reset_index(drop=True, inplace=True)
    key_values = df.index + offset
    df.insert(loc=0, column=key_column, value=key_values)
    return df

In [None]:
def add_update_date(df, current_date):
    """
    add_update_date
    Parses 'current_date' with pandas.to_datetime and stores the result in
    every row of a column named update_date.
    The data frame is changed in place and the same data frame is returned.
    """
    parsed_date = pd.to_datetime(current_date)
    df['update_date'] = parsed_date
    return df

In [None]:
def add_update_timestamp(df):
    """
    add_update_timestamp
    Stores the current UTC date and time, with the microseconds set to zero,
    in every row of a column named update_timestamp.
    The data frame is changed in place and the same data frame is returned.
    """
    utc_now = pd.to_datetime('now', utc=True)
    # Zero the microseconds so the stored value is on a whole second
    stamp = utc_now.replace(microsecond=0)
    df['update_timestamp'] = stamp
    return df

In [None]:
def build_new_table(logging, bqclient, dimension_table_path, dimension_name, df):
    """
    build_new_table
    First-time load of a dimension table.
    Adds a surrogate key starting at 1 and an update timestamp to the data frame,
    then uploads it to 'dimension_table_path' with WRITE_TRUNCATE.
    Returns nothing.
    """
    logging.info("Target dimension table %s does not exit", dimension_table_path)
    # Surrogate keys for a brand new table start at 1
    keyed_df = add_surrogate_key(df, dimension_name, 1)
    stamped_df = add_update_timestamp(keyed_df)
    upload_bigquery_table(
        logging,
        bqclient,
        dimension_table_path,
        "WRITE_TRUNCATE",
        stamped_df,
    )

In [None]:
def insert_existing_table(logging, bqclient, dimension_table_path, dimension_name, surrogate_key, df):
    """
    insert_existing_table
    Accepts a path to a dimensional table, the dimension name, the surrogate key
    column name and a data frame holding the freshly generated rows.
    Fetches the rows already in the table and keeps only the rows of 'df' that
    are not present there (compared on the columns the two have in common).
    Those rows get surrogate keys that continue after the existing row count,
    an update timestamp, and are appended to the table with WRITE_APPEND.
    Rows that exist only in the table are left alone; they are never re-inserted.
    Returns nothing.
    """
    logging.info("Target dimension table %s exits. Checking for differences.", dimension_table_path)
    # Fetch the existing table
    existing_df = query_bigquery_table(logging, dimension_table_path, bqclient, surrogate_key)
    # Guard against a query helper that hands back something unusable on failure;
    # appending without knowing what is already stored would duplicate rows.
    if not isinstance(existing_df, pd.DataFrame):
        logging.error("Could not read existing rows from %s. Skipping the incremental load.", dimension_table_path)
        return

    candidates_df = df.drop_duplicates()
    shared_columns = [col for col in candidates_df.columns if col in existing_df.columns]
    if existing_df.empty or not shared_columns:
        # Nothing stored yet (or nothing comparable): every candidate row is new
        new_records_df = candidates_df.copy()
    else:
        # Left anti-join: keep candidate rows with no identical row in the table.
        # A concat + drop_duplicates(keep=False) would also return rows found
        # only in the table, which would then be appended a second time.
        known_rows = existing_df[shared_columns].drop_duplicates()
        flagged = candidates_df.merge(known_rows, on=shared_columns, how="left", indicator=True)
        is_new = flagged["_merge"] == "left_only"
        new_records_df = flagged.loc[is_new, list(candidates_df.columns)].copy()

    logging.info("Found %d new records.", new_records_df.shape[0])
    if new_records_df.shape[0] > 0:
        # Set the surrogate key for the new records. existing_df.shape[0] is number of records already in the database
        new_surrogate_key_value = existing_df.shape[0]+1
        new_records_df = add_surrogate_key(new_records_df, dimension_name, new_surrogate_key_value)
        # Add the current timestamp for the new records
        new_records_df = add_update_timestamp(new_records_df)
        # Upload the new records into the dimension table
        upload_bigquery_table(logging, bqclient, dimension_table_path, "WRITE_APPEND", new_records_df)

In [None]:
if __name__ == "__main__":
    # Generate the date dimension - change the date range to match your data
    df = generate_date_dimension(start='2016-01-01', end='2026-12-31')
    # Create the BigQuery Client
    bqclient = create_bigquery_client(logging)
    # See if the target dimensional table exists
    target_table_exists = bigquery_table_exists(bqclient, dimension_table_path)
    if target_table_exists:
        # An existing date dimension is left untouched
        print("Date dimension already exists. Will not overwrite it")
    else:
        # No table yet: load all of the generated dates into a new table
        build_new_table(logging, bqclient, dimension_table_path, dimension_name, df)