<a href="https://colab.research.google.com/github/rabulhasan/CIS4400/blob/main/Untitled2.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:


# If using the native Google BigQuery API module:
from google.cloud import bigquery
from google.cloud.exceptions import NotFound
import pandas as pd
import os
import pyarrow
import logging
from datetime import datetime
import credentials
from google.oauth2 import service_account
# If using a service account key file, save the path to that file in credentials.py and import credentials
path_to_service_account_key_file = "path_to_service_key.json"
# Notebook install hint: !pip install credentials

# ---- Dimension naming -------------------------------------------------------
# Everything below is derived from the dimension name.
dimension_name = 'agency'
# Integer key column added to the dimension table by add_surrogate_key
surrogate_key = dimension_name + "_dim_id"
# Business (natural) key column name for this dimension
business_key = dimension_name + '_id'

# ---- BigQuery target --------------------------------------------------------
# Replace the placeholder project/dataset names with real ones before running.
gcp_project = 'gcp_project_name'
bq_dataset = 'gcp_project_dataset'
table_name = dimension_name + "_dimension"
# Fully-qualified table id in the form project.dataset.table
dimension_table_path = ".".join((gcp_project, bq_dataset, table_name))

# ---- Source data ------------------------------------------------------------
# Path to the source data. Path style depends on the OS, e.g.:
#   Windows (escape backslashes): 'C:\\Python_ETL'
#   Linux:                        '/home/username/python_etl'
#   macOS:                        '/Users/username/python_etl'
# NOTE(review): '/Contnet' looks like a typo for Colab's '/content' directory,
# and the main block opens this path as a file, so it presumably should name
# the CSV file itself -- confirm before running.
file_source_path = '/Contnet'
def transform_data(df: pd.DataFrame):
    """
    transform_data
    Reduces a source dataframe to the columns of the agency dimension
    ('agency' and 'agency_name') and drops duplicate rows.
    Returns the resulting dataframe. Edit this function to change the
    cleaning/transformation rules for the dimension.
    """
    dimension_columns = ['agency', 'agency_name']
    # Selecting by list raises KeyError if either column is missing upstream;
    # drop_duplicates keeps the first occurrence and its original index label.
    return df[dimension_columns].drop_duplicates()
def create_bigquery_client():
    """
    create_bigquery_client
    Creates a BigQuery client bound to the configured project (gcp_project),
    using whatever default credentials the environment provides (the script
    assumes Google Colab authentication has already been completed).
    To authenticate with a service account key file instead, use the
    commented-out from_service_account_json call below.
    Returns the BigQuery client object.
    Raises the underlying exception if the client cannot be created.
    """
    try:
        # If authenticating using a service account key file, use the following code:
        # return bigquery.Client.from_service_account_json(credentials.path_to_service_account_key_file)
        return bigquery.Client(project=gcp_project)
    except Exception as err:
        # There is no client to hand back, so report the real cause and
        # propagate it rather than failing later on an unbound variable.
        print(f"Error creating BigQuery client: {err}")
        raise
def upload_bigquery_table(bqclient, table_path, write_disposition, df):
    """
    upload_bigquery_table
    Accepts a BigQuery client, a path to a BigQuery table, the write
    disposition and a dataframe.
    Loads the data from the dataframe into the BigQuery table and waits for
    the load job to finish.
    The write disposition is either
    write_disposition="WRITE_TRUNCATE"  Erase the target data and load all new data.
    write_disposition="WRITE_APPEND"    Append to the existing table
    Errors (submission or load failures) are printed, not raised.
    """
    try:
        # Set up a BigQuery job configuration with the write_disposition.
        job_config = bigquery.LoadJobConfig(write_disposition=write_disposition)

        # Submit the load job
        job = bqclient.load_table_from_dataframe(df, table_path, job_config=job_config)
        # Block until the job completes so load errors are raised here
        # (and reported below) instead of being silently lost.
        job.result()
        # Show the job results
        print(f"Loaded {job.output_rows} rows into {table_path}")
    except Exception as err:
        print(err)
def bigquery_table_exists(bqclient, table_path):
    """
    bigquery_table_exists
    Asks BigQuery for the metadata of table_path (one API request).
    Returns True when the lookup succeeds and False when BigQuery reports
    the table as NotFound; any other error propagates to the caller.
    """
    try:
        bqclient.get_table(table_path)
    except NotFound:
        return False
    else:
        return True
def query_bigquery_table(table_path, bqclient, surrogate_key):
    """
    query_bigquery_table
    Accepts a path to a BigQuery table, a BigQuery client and the name of the
    surrogate key.
    Queries the BigQuery table but leaves out the update_timestamp and
    surrogate key columns.
    Returns the result as a dataframe, or an empty dataframe if the query
    fails (the error is printed).
    """
    # table_path and surrogate_key are interpolated as identifiers, not bound
    # as parameters, so they must come from trusted configuration.
    sql_query = f"SELECT * EXCEPT ( update_timestamp, {surrogate_key}) FROM `{table_path}`"
    try:
        return bqclient.query(sql_query).to_dataframe()
    except Exception as err:
        print(f"Error querying {table_path}: {err}")
        # Return an actual (empty) DataFrame instance, not the DataFrame class
        return pd.DataFrame()
def add_surrogate_key(df, dimension_name='customers', offset=1):
    """
    add_surrogate_key
    Renumbers the dataframe's index from 0 and inserts a new first column,
    '<dimension_name>_dim_id', holding consecutive integers starting at offset.
    The dataframe is modified in place; the same object is also returned.
    """
    key_column = dimension_name + '_dim_id'
    # Discard the old index so row positions become 0..n-1
    df.reset_index(drop=True, inplace=True)
    key_values = df.index + offset
    df.insert(0, key_column, key_values)
    return df
def build_new_table(bqclient, dimension_table_path, dimension_name, df):
    """
    build_new_table
    Accepts a BigQuery client, a path to a dimensional table, the dimension
    name and a data frame.
    Adds the surrogate key ('<dimension_name>_dim_id', numbered from 1) and an
    'update_timestamp' column recording when this load ran, then replaces the
    contents of the dimensional table with the dataframe (WRITE_TRUNCATE).
    Note: the dataframe passed in is modified in place.
    """
    # Add a surrogate key, numbered from 1, as the first column
    df = add_surrogate_key(df, dimension_name, 1)
    # Add the update timestamp. query_bigquery_table excludes this column by
    # name, so the table must contain it; UTC keeps load times comparable.
    df['update_timestamp'] = pd.Timestamp.now(tz='UTC')
    # Upload the dataframe to the BigQuery table
    upload_bigquery_table(bqclient, dimension_table_path, "WRITE_TRUNCATE", df)
# Program main
# Load the CSV file into a dataframe
# Transform the dataframe
# Create a BigQuery client
# See if the target dimension table exists
#    If not exists, load the data into a new table
#    If exists, insert new records into the table (incremental load: not implemented yet)
if __name__ == "__main__":
    # Load the source data file (file_source_path must name the CSV file itself)
    df = pd.read_csv(file_source_path)
    # Set all of the column names to lower case letters so that
    # transform_data can select them by their lower-case names
    df = df.rename(columns=str.lower)

    # Transform the data
    df = transform_data(df)

    # Point the Google client libraries at the service account key file only
    # when that file actually exists. If GOOGLE_APPLICATION_CREDENTIALS names a
    # missing file, default credential discovery fails instead of falling back
    # to the Colab / gcloud user credentials.
    if os.path.isfile(path_to_service_account_key_file):
        os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = path_to_service_account_key_file

    # Construct a BigQuery client object for the configured project
    bqclient = create_bigquery_client()

    # See if the target dimensional table exists
    target_table_exists = bigquery_table_exists(bqclient, dimension_table_path)

    if not target_table_exists:
        # The target dimension table does not exist: load all of the data into a new table
        build_new_table(bqclient, dimension_table_path, dimension_name, df)
    else:
        # TODO: perform an incremental load into the existing table.
        # Until then, say so explicitly instead of silently loading nothing.
        print(f"Table {dimension_table_path} already exists; "
              "incremental load is not implemented, so no rows were loaded.")

