In [55]:
from google.colab import auth
auth.authenticate_user()
print('Authenticated')


# Google Colab load modules for BigQuery
%load_ext google.cloud.bigquery
%load_ext google.colab.data_table


Authenticated
The google.cloud.bigquery extension is already loaded. To reload it, use:
  %reload_ext google.cloud.bigquery
The google.colab.data_table extension is already loaded. To reload it, use:
  %reload_ext google.colab.data_table


In [56]:
# ETL Weather  Facts
# If using the native Google BigQuery API module:
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
# import credentials
import pandas as pd
import os
import pyarrow
import logging
from datetime import datetime

In [57]:
df = pd.DataFrame
# Set the name of the fact table
fact_name = 'weather'

# Set the GCP Project, dataset and table name
gcp_project = 'cis4400project-384418'
bq_dataset = 'weather_dw'
table_name = fact_name + '_fact'
# Construct the full BigQuery path to the table
fact_table_path = ".".join([gcp_project,bq_dataset,table_name])
file_source_path = '/content'

In [58]:
# Set up logging
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)
current_date = datetime.today().strftime('%Y%m%d')
log_filename = "_".join(["etl",table_name,current_date])+".log"
logging.basicConfig(filename=log_filename, encoding='utf-8', format='%(asctime)s %(message)s', level=logging.DEBUG)
logging.info("=========================================================================")
logging.info(f"Starting ETL Run for {table_name} on date {current_date}")

In [59]:
def load_csv_data_file(logging, file_source_path, file_name, df):
    """
    load_csv_data_file
    Accepts a file source path and a file name
    Loads the file into a data frame
    Exits the program on error
    Returns the dataframe
    """
    file_source = os.path.join(file_source_path, file_name)
    logging.info("Reading source data file: %s",file_source)
    # Read in the source data file for the customers data
    try:
        df = pd.read_csv(file_source)
        # Set all of the column names to lower case letters
        df = df.rename(columns=str.lower)
        logging.info("Read %d records from source data file: %s",df.shape[0],file_source)
        return df
    except:
        logging.error(f"Failed to read file: {file_source}")
        # os._exit(-1)
    return df

In [60]:
def transform_data(logging, df):
    """
    transform_data
    Accepts a data frame
    Performs any specific cleaning and transformation steps on the dataframe
    Returns the modified dataframe
    """    
    # Convert the date_of_birth to a datetime64 data type. 2012-08-21 04:12:16.827
    logging.info("Managing data types.")
    df['date_of_birth'] = pd.to_datetime(df['date_of_birth'], format='%m/%d/%Y')
    # Convert the postal code into a string
    df['postal_code'] =  df['postal_code'].astype(str)
    return df

In [61]:
def create_bigquery_client(logging):
    """
    create_bigquery_client
    Creates a BigQuery client using the path to the service account key file
    for credentials.
    Returns the BigQuery client object
    """
    try:
        # bqclient = bigquery.Client.from_service_account_json(credentials.path_to_service_account_key_file)
        bqclient = bigquery.Client(gcp_project)        
        logging.info(f"Created BigQuery Client: {bqclient}")
        return bqclient
    except Exception as err:
        logging.error("Failed to create BigQuery Client.", err)
        # os._exit(-1)
    return bqclient

In [62]:
def upload_bigquery_table(logging, bqclient, table_path, write_disposition, df):
    """
    upload_bigquery_table
    Accepts a path to a BigQuery table, the write disposition and a dataframe
    Loads the data into the BigQuery table from the dataframe.
    for credentials.
    The write disposition is either
    write_disposition="WRITE_TRUNCATE"  Erase the target data and load all new data.   
    write_disposition="WRITE_APPEND"    Append to the existing table
    """
    try:
        job_config = bigquery.LoadJobConfig(write_disposition=write_disposition)
        # Submit the job
        job = bqclient.load_table_from_dataframe(df, table_path, job_config=job_config)  
        # Show the job results
        job.result()
    except Exception as err:
        logging.error("Failed to load BigQuery Table.", err)
        # os._exit(-1)


In [63]:
def bigquery_table_exists(table_path, bqclient):
    """
    bigquery_table_exists
    Accepts a path to a BigQuery table
    Checks if the BigQuery table exists.
    Returns True or False
    """    
    try:
        bqclient.get_table(table_path)  # Make an API request.
        return True
    except NotFound:
        # print("Table {} is not found.".format(table_id))
        return False

In [64]:
def query_bigquery_table(logging, table_path, bqclient, surrogate_key):
    """
    query_bigquery_table
    Accepts a path to a BigQuery table and the name of the surrogate key
    Queries the BigQuery table but leaves out the update_timestamp and surrogate key columns
    Returns the dataframe
    """    
    bq_df = pd.DataFrame
    # sql_query = 'SELECT * EXCEPT ( update_timestamp, '+surrogate_key+') FROM `' + table_path + '`'
    sql_query = 'SELECT * FROM `' + table_path + '`'
    logging.info("Running query: %s", sql_query)
    bq_df = bqclient.query(sql_query).to_dataframe()
    return bq_df

In [65]:
def add_surrogate_key(df, dimension_name='customers', offset=1):
    """
    add_surrogate_key  
    Accepts a data frame and inserts an integer identifier as the first column
    Returns the modified dataframe
    """
    # Reset the index
    df.reset_index(drop=True, inplace=True)
    # Add the new surrogate key starting from offset
    df.insert(0, dimension_name+'_dim_id', df.index+offset)
    return df

In [66]:
def add_update_date(df, current_date):
    """
    add_update_date
    Accepts a data frame and inserts the current date as a new field
    Returns the modified dataframe
    """
    df['update_date'] = pd.to_datetime(current_date)
    return df

In [67]:
def add_update_timestamp(df):
    """
    add_update_timestamp
    Accepts a data frame and inserts the current datetime as a new field
    Returns the modified dataframe
    """
    df['update_timestamp'] = pd.to_datetime('now', utc=True).replace(microsecond=0)
    return df

In [68]:
def build_new_table(logging, bqclient, table_path, df):
    """
    build_new_table
    Accepts a path to a dimensional table, the dimension name and a data frame 
    Add the surrogate key and a record timestamp to the data frame
    Inserts the contents of the dataframe to the dimensional table.
    """
    logging.info("Target table %s does not exit", table_path)
    upload_bigquery_table(logging, bqclient, table_path, "WRITE_TRUNCATE", df)

In [69]:
def insert_existing_table(logging, bqclient, table_path, df):
    """
    insert_existing_table
    Accepts a path to a dimensional table, the dimension name and a data frame 
    Compares the new data to the existing data in the table.
    Inserts the new/modified records to the existing table
    """
    logging.info("Target table %s exits. Appending records.", table_path)
    upload_bigquery_table(logging, bqclient, table_path, "WRITE_APPEND", df)

In [70]:
def dimension_lookup(logging, dimension_name='date', lookup_columns=['wdate'], df=df):
    """
    dimension_lookup
    Lookup the lookup_columns in the dimension_name and return the associated surrogate keys
    Returns dataframe augmented with the surrogate keys
    """
    bq_df = pd.DataFrame
    logging.info("Lookup dimension %s.", dimension_name)
    surrogate_key = dimension_name+"_dim_id"
    dimension_table_path = ".".join([gcp_project,bq_dataset,dimension_name+"_dimension"])
    # Fetch the existing table
    bq_df = query_bigquery_table(logging, dimension_table_path, bqclient, surrogate_key)
    print(bq_df)
    # Melt the dimension dataframe into an index with the lookup columns
    m = bq_df.melt(id_vars=lookup_columns, value_vars=surrogate_key)
    # print(m)
    # Rename the "value" column to the surrogate key column name
    m=m.rename(columns={"value":surrogate_key})
    # Merge with the fact table record
    df = df.merge(m, on=lookup_columns, how='left')
    # Drop the "variable" column and the lookup columns
    df = df.drop(columns=lookup_columns)
    df = df.drop(columns="variable")
    return df

In [71]:
def date_dimension_lookup(logging, dimension_name='date', lookup_column='create_date', df=df):
    """
    dimension_lookup
    Lookup the lookup_columns in the dimension_name and return the associated surrogate keys
    Returns dataframe augmented with the surrogate keys
    """
    bq_df = pd.DataFrame
    logging.info("Lookup date dimension on column %s.", lookup_column)
    surrogate_key = dimension_name+"_dim_id"
    dimension_table_path = ".".join([gcp_project,bq_dataset,dimension_name+"_dimension"])
    # Fetch the existing table
    bq_df = query_bigquery_table(logging, dimension_table_path, bqclient, surrogate_key)
    bq_df["full_date"] = pd.to_datetime(bq_df.full_date, format="%m/%d/%Y")
    # Return just the date portion
    bq_df["full_date"] = bq_df.full_date.dt.date
    
    # Extract the date from 'created_date' column
    df[lookup_column] = pd.to_datetime(df[lookup_column], format="%m/%d/%Y")
    # Strip off the time portion
    df[lookup_column+"_time"] = df[lookup_column].dt.strftime("%H:%M:%S")
    # Return just the date portion
    df[lookup_column] = df[lookup_column].dt.date
    
    # Melt the dimension dataframe into an index with the lookup columns
    m = bq_df.melt(id_vars='full_date', value_vars=surrogate_key)
    # Rename the "value" column to the surrogate key column name
    m=m.rename(columns={"value":lookup_column+"_dim_id"})
    
    # Merge with the fact table record on the created_date
    df = df.merge(m, left_on=lookup_column, right_on='full_date', how='left')

    # Drop the "variable" column and the lookup columns
    df = df.drop(columns=lookup_column)
    df = df.drop(columns="variable")
    df = df.drop(columns="full_date")
    return df

In [72]:
if __name__ == "__main__": 
    df = pd.DataFrame
    # Create the BigQuery Client
    bqclient = create_bigquery_client(logging)
    # Load in the data file
    # df = load_csv_data_file(logging, file_source_path, "weather_data_5_boroughs_daily.csv", df)
    df = load_csv_data_file(logging, file_source_path, "weather_data_5_boroughs_daily_2.csv", df)
    # print(df)

    # If city is empty, fill it in with NEW YORK
    ##df.city = df.city.fillna('NEW YORK')
    # Consider removing columns that we will never use  df.drop([....])

    # Lookup the agency dimension record  agency_dim_id
    #df = dimension_lookup(logging, dimension_name='agency', lookup_columns=['agency', 'agency_name'], df=df)
    # df = load_csv_data_file(logging, file_source_path, "weather_data_5_boroughs_daily.csv", df)
    # print(df)
    # Lookup the location dimension record  location_dim_id
    df = dimension_lookup(logging, dimension_name='location', lookup_columns=['borough', 'city', 'state', 'zipcode'], df=df)
    df = date_dimension_lookup(logging, dimension_name='date', lookup_column='wdate', df=df)

    # print(df)

    # A list of all of the surrogate keys and columns we will keep
    surrogate_keys=['location_dim_id','wdate_dim_id','temperature_max','temperature_avg','temperature_m']

    # Remove all of the other non-surrogate key columns
    df = df[surrogate_keys]

    # See if the target table exists
    target_table_exists = bigquery_table_exists(fact_table_path, bqclient )
    # If the target table does not exist, load all of the data into a new table
    if not target_table_exists:
        build_new_table(logging, bqclient, fact_table_path, df)
    # If the target table exists, then perform an incremental load    
    if target_table_exists:
        insert_existing_table(logging, bqclient, fact_table_path, df)

   location_dim_id  latitude  longitude        borough              city  \
0                1   40.8616   -73.8809          Bronx  Botanical Garden   
1                2   40.6215   -74.0096       Brooklyn     Dyker Heights   
2                3   40.7638   -73.9918      Manhattan          New York   
3                4   40.7557   -73.8831         Queens   Jackson Heights   
4                5   40.5674   -74.1343  Staten Island      Richmondtown   
5                6   40.5674   -74.1343  Staten Island      Richmondtown   

  state  zipcode          update_timestamp  
0    NY    10458 2023-05-18 16:20:19+00:00  
1    NY    11228 2023-05-18 16:20:19+00:00  
2    NY    10018 2023-05-18 16:20:19+00:00  
3    NY    11372 2023-05-18 16:20:19+00:00  
4    NY    10306 2023-05-18 16:20:19+00:00  
5    NY    10308 2023-05-18 16:20:19+00:00  


In [73]:
!tail  -45  etl__weather_fact_20230515.log

tail: cannot open 'etl__weather_fact_20230515.log' for reading: No such file or directory
