In [1]:
import os
from google.cloud import bigquery
import pandas as pd
import json

In [2]:
# Point the Google client libraries at the service-account key file
# (placeholder path) and keep the same value in `credentials`.
credentials = "path to key"
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = credentials

In [3]:

def load_csv_to_bigquery(csv_path, project_id, table_name, schema_file_path):
    """Load a local CSV file into a new table of the ``staging`` dataset.

    The file is read with pandas (ISO-8859-1 encoding); column names are
    lower-cased and spaces/dashes become underscores. The destination table
    is created from the JSON schema file, whose field names get the same
    space/dash replacement, and the DataFrame is then uploaded into it.

    Args:
        csv_path (str): Path of the CSV file to upload.
        project_id (str): Google Cloud project used for the BigQuery client.
        table_name (str): Table to create inside the ``staging`` dataset.
        schema_file_path (str): Path of a JSON file containing a list of
            schema fields in BigQuery API representation.

    Returns:
        None
    """
    dataset_name = 'staging'
    client = bigquery.Client(project=project_id)

    # Read the CSV file into a Pandas dataframe
    df = pd.read_csv(csv_path, encoding='ISO-8859-1')

    # Replace spaces and dashes with underscores in column names
    df.columns = df.columns.str.lower().str.replace(' ', '_').str.replace('-', '_')

    # Create the dataset only when it is missing. exists_ok=True ignores the
    # "already exists" conflict alone, so authentication, permission or
    # network errors are no longer swallowed by a bare except.
    dataset_ref = bigquery.DatasetReference(client.project, dataset_name)
    client.create_dataset(bigquery.Dataset(dataset_ref), exists_ok=True)
    print("Dataset {} is ready".format(dataset_name))

    # Set the destination table for the data
    table_ref = dataset_ref.table(table_name)

    # Define the schema of the table
    with open(schema_file_path) as schema_file:
        schema_json = json.load(schema_file)

    # Replace spaces and hyphens with underscores in field names
    for field in schema_json:
        field['name'] = field['name'].replace(' ', '_').replace('-', '_')

    # Create the schema field objects
    schema = [bigquery.SchemaField.from_api_repr(field) for field in schema_json]

    # Create the table in BigQuery
    table = bigquery.Table(table_ref, schema=schema)
    client.create_table(table)

    # Load the data into the table using the schema defined above.
    # skip_leading_rows is deliberately left unset: the client serializes the
    # DataFrame without a header row, so skipping one line would silently
    # drop the first data record.
    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.autodetect = False
    job = client.load_table_from_dataframe(df, table_ref, job_config=job_config)
    job.result()

    print("Data uploaded to BigQuery successfully.")


In [4]:
#load data from local machine to bigquery using schema autodetect - staging area
def load_csv_to_bigquery_autodetect(csv_path, project_id, table_name):
    """Load a local CSV file into the ``staging`` dataset with schema autodetection.

    The raw file is streamed to BigQuery as-is; the load job skips its first
    line and lets BigQuery infer the table schema.

    Args:
        csv_path (str): Path of the CSV file to upload.
        project_id (str): Google Cloud project used for the BigQuery client.
        table_name (str): Destination table inside the ``staging`` dataset.

    Returns:
        None
    """
    dataset_name = 'staging'
    # Create a BigQuery client
    bq_client = bigquery.Client(project=project_id)

    # Create the dataset only when it is missing. exists_ok=True ignores the
    # "already exists" conflict alone, so authentication, permission or
    # network errors are no longer swallowed by a bare except.
    dataset_ref = bigquery.DatasetReference(bq_client.project, dataset_name)
    bq_client.create_dataset(bigquery.Dataset(dataset_ref), exists_ok=True)
    print("Dataset {} is ready".format(dataset_name))

    # Set the destination table for the data
    table_ref = dataset_ref.table(table_name)

    # Configure a CSV load job with schema autodetection
    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.skip_leading_rows = 1  # first line of the file is assumed to be the header
    job_config.autodetect = True

    with open(csv_path, "rb") as source_file:
        job = bq_client.load_table_from_file(
            source_file,
            table_ref,
            job_config=job_config
        )
        job.result()  # Wait for the job to complete while the file is still open.

    print("Data uploaded to BigQuery successfully.")


In [None]:

def load_xlsx_to_bigquery(xlsx_path, project_id, table_name, schema_file_path):
    """Load a local Excel workbook into a new table of the ``staging`` dataset.

    The workbook is read with pandas (openpyxl engine). Datetime columns are
    reduced to plain dates, column names are lower-cased and spaces/dashes
    become underscores. The destination table is created from the JSON schema
    file, whose field names get the same space/dash replacement, and the
    DataFrame is then uploaded into it.

    Args:
        xlsx_path (str): Path of the .xlsx file to upload.
        project_id (str): Google Cloud project used for the BigQuery client.
        table_name (str): Table to create inside the ``staging`` dataset.
        schema_file_path (str): Path of a JSON file containing a list of
            schema fields in BigQuery API representation.

    Returns:
        None
    """
    dataset_name = 'staging'
    client = bigquery.Client(project=project_id)

    # Read the xlsx file into a Pandas dataframe
    df = pd.read_excel(xlsx_path, engine='openpyxl')

    # Remove time part from datetime columns (any datetime64 flavour, not
    # only the exact 'datetime64[ns]' dtype)
    for column in df.columns:
        if pd.api.types.is_datetime64_any_dtype(df[column]):
            df[column] = df[column].dt.date

    # Replace spaces and dashes with underscores in column names
    df.columns = df.columns.str.lower().str.replace(' ', '_').str.replace('-', '_')

    # Create the dataset only when it is missing. exists_ok=True ignores the
    # "already exists" conflict alone, so authentication, permission or
    # network errors are no longer swallowed by a bare except.
    dataset_ref = bigquery.DatasetReference(client.project, dataset_name)
    client.create_dataset(bigquery.Dataset(dataset_ref), exists_ok=True)
    print("Dataset {} is ready".format(dataset_name))

    # Set the destination table for the data
    table_ref = dataset_ref.table(table_name)

    # Define the schema of the table
    with open(schema_file_path) as schema_file:
        schema_json = json.load(schema_file)

    # Replace spaces and hyphens with underscores in schema field names so
    # they follow the same rule as the DataFrame columns above
    for field in schema_json:
        field['name'] = field['name'].replace(' ', '_').replace('-', '_')

    # Create the schema field objects
    schema = [bigquery.SchemaField.from_api_repr(field) for field in schema_json]

    # Create the table in BigQuery
    table = bigquery.Table(table_ref, schema=schema)
    client.create_table(table)

    # Load the data into the table using the schema defined above.
    # skip_leading_rows is deliberately left unset: the client serializes the
    # DataFrame without a header row, so skipping one line would silently
    # drop the first data record.
    job_config = bigquery.LoadJobConfig()
    job_config.source_format = bigquery.SourceFormat.CSV
    job_config.autodetect = False
    job = client.load_table_from_dataframe(df, table_ref, job_config=job_config)
    job.result()

    print("Data uploaded to BigQuery successfully.")


In [5]:
def clean_bigquery_table(project_id, table_id, remove_nulls=False, remove_duplicates=False, date_columns=None, columns_to_check=None, float_rounding_decimal=2):
    """
    Clean a BigQuery table and save the result as ``<table>_cleaned``.

    Float columns are rounded, the listed date columns are cast to DATE (and
    get extra ``_year`` / ``_month`` / ``_day`` columns), all output column
    names are lower-cased, and rows with nulls and/or duplicate rows are
    optionally removed.

    Args:
        project_id (str): The Google Cloud Project ID used for the client and
            for the destination table.
        table_id (str): The BigQuery table ID of the table to clean.
        remove_nulls (bool, optional): Whether to remove rows with null values
            in ``columns_to_check``. Defaults to False.
        remove_duplicates (bool, optional): Whether to keep a single row per
            distinct combination of ``columns_to_check``. Defaults to False.
        date_columns (list, optional): Columns to convert to DATE; matched
            case-insensitively. Defaults to None (no date columns).
        columns_to_check (list, optional): Columns to check for null values or
            duplicates. Defaults to None (all columns); an empty list is
            treated the same way.
        float_rounding_decimal (int, optional): Number of decimal places to
            round float columns. Defaults to 2.

    Returns:
        None
    """
    # Use the caller's project instead of whatever the environment defaults to
    client = bigquery.Client(project=project_id)
    table = client.get_table(table_id)

    all_columns = [field.name for field in table.schema]
    if not columns_to_check:
        columns_to_check = all_columns

    # BigQuery column names are case-insensitive, so compare them lower-cased
    date_keys = {name.lower() for name in (date_columns or [])}
    # int() keeps anything but a whole number out of the generated SQL
    decimals = int(float_rounding_decimal)

    # Handle date column transformation, float column rounding, and make all columns lower case
    select_columns = []
    for column in table.schema:
        source = column.name
        alias = source.lower()
        if column.field_type in ("FLOAT", "FLOAT64"):
            select_columns.append(f"ROUND({source}, {decimals}) AS {alias}")
        elif alias in date_keys:
            as_date = f"CAST({source} AS DATE)"
            select_columns.append(f"{as_date} AS {alias}")
            select_columns.extend(
                f"EXTRACT({part} FROM {as_date}) AS {alias}_{part.lower()}"
                for part in ("YEAR", "MONTH", "DAY")
            )
        else:
            select_columns.append(alias)

    where_clause = ""
    if remove_nulls:
        where_clause = " WHERE " + " AND ".join(f"{column} IS NOT NULL" for column in columns_to_check)

    if remove_duplicates:
        # Number the rows inside each group of identical key values and keep
        # the first one; the helper column is dropped from the final result.
        row_number_clause = (
            f", ROW_NUMBER() OVER (PARTITION BY {', '.join(columns_to_check)} "
            f"ORDER BY {', '.join(all_columns)}) AS row_number"
        )
        inner_sql = f"SELECT {', '.join(select_columns)}{row_number_clause} FROM `{table_id}`{where_clause}"
        sql = f"SELECT * EXCEPT(row_number) FROM ({inner_sql}) WHERE row_number = 1"
    else:
        sql = f"SELECT {', '.join(select_columns)} FROM `{table_id}`{where_clause}"

    # Execute the query and save the results to a new table
    new_table_id = f"{client.project}.{table.dataset_id}.{table.table_id}_cleaned"
    job_config = bigquery.QueryJobConfig(destination=new_table_id)
    query_job = client.query(sql, job_config=job_config)
    query_job.result()

    print(f"Cleaned table saved as {new_table_id}.")


In [6]:
#creating a warehouse schema from json file
def create_warehouse_schema(project_id, json_path):
    """Create the ``warehouse`` dataset and its tables from a JSON description.

    The JSON file must provide:
      * ``fact_table_name``: name of the fact table,
      * ``fact_table_columns``: list of ``{"name", "type", "mode"?}`` fields,
      * ``dimension_tables``: mapping of table name to such a field list,
      * ``fact_dimension_key_map``: mapping of fact column to
        ``{dimension column: dimension table name}``; one two-column STRING
        mapping table is created per entry.

    Dataset and tables are created only when missing, so the function can be
    re-run (for example after a partial failure) without raising on objects
    that already exist.

    Args:
        project_id (str): Google Cloud project used for the BigQuery client.
        json_path (str): Path of the JSON schema description.

    Returns:
        None
    """
    # Initialize BigQuery client
    client = bigquery.Client(project=project_id)

    # Load the schema information from the JSON file
    with open(json_path, 'r') as f:
        schema_info = json.load(f)

    # Create a dataset named "warehouse" (if it doesn't already exist).
    # exists_ok=True ignores only the "already exists" conflict; other errors
    # (authentication, permissions, network) still surface.
    dataset_ref = bigquery.DatasetReference(client.project, "warehouse")
    dataset = bigquery.Dataset(dataset_ref)
    dataset.location = "US"
    client.create_dataset(dataset, exists_ok=True)

    def build_schema(fields):
        # Turn a list of JSON field descriptions into SchemaField objects
        return [
            bigquery.SchemaField(field['name'], field['type'], mode=field.get('mode', 'NULLABLE'))
            for field in fields
        ]

    def ensure_table(table_name, schema):
        # Create the table in the warehouse dataset unless it already exists
        table = bigquery.Table(dataset_ref.table(table_name), schema=schema)
        client.create_table(table, exists_ok=True)  # API request

    # Create the fact table
    fact_table_name = schema_info['fact_table_name']
    ensure_table(fact_table_name, build_schema(schema_info['fact_table_columns']))

    # Create dimension tables
    for dimension_table_name, dimension_table_info in schema_info['dimension_tables'].items():
        ensure_table(dimension_table_name, build_schema(dimension_table_info))

    # Create fact-dimension mapping tables
    for fact_column, dimension_map in schema_info['fact_dimension_key_map'].items():
        for dimension_column, dimension_table_name in dimension_map.items():
            mapping_table_name = f"{fact_table_name}_{dimension_table_name}_{dimension_column}"
            # A table cannot hold two columns with the same name, so a key
            # that is named identically on both sides yields a single column.
            key_columns = list(dict.fromkeys([fact_column, dimension_column]))
            mapping_table_columns = [
                bigquery.SchemaField(key_column, 'STRING', mode='REQUIRED')
                for key_column in key_columns
            ]
            ensure_table(mapping_table_name, mapping_table_columns)

    print("Warehouse schema created successfully.")


In [14]:
#loading data from staging to warehouse
def load_data_from_staging_to_warehouse(project_id, dataset_warehouse, dataset_staging, staging_table_id, warehouse_table_names):
    """Fill warehouse tables with the distinct rows of a staging table.

    For every warehouse table, the columns named in its schema are selected
    from the staging table, cast to the warehouse column types, de-duplicated
    with SELECT DISTINCT and written to the warehouse table, replacing its
    previous content (WRITE_TRUNCATE).

    Args:
        project_id (str): Google Cloud project holding both datasets.
        dataset_warehouse (str): Name of the warehouse dataset.
        dataset_staging (str): Name of the staging dataset.
        staging_table_id (str): Name of the staging table to read from.
        warehouse_table_names (list): Names of the warehouse tables to fill.

    Returns:
        None
    """
    # Initialize BigQuery client
    client = bigquery.Client(project=project_id)

    # Legacy type names that a table schema may report, mapped to the names
    # used in CAST. Exact matching leaves an already-standard "FLOAT64"
    # untouched (a substring replace would turn it into "FLOAT6464").
    standard_type_names = {"FLOAT": "FLOAT64", "INTEGER": "INT64", "BOOLEAN": "BOOL"}

    staging_table = f"{project_id}.{dataset_staging}.{staging_table_id}"

    # Iterate through the warehouse table names
    for warehouse_table_name in warehouse_table_names:
        destination_table_ref = f"{project_id}.{dataset_warehouse}.{warehouse_table_name}"

        # Get the schema of the warehouse table
        warehouse_table = client.get_table(destination_table_ref)

        # Select and cast the warehouse columns from the staging table;
        # backticks keep column names that clash with SQL keywords valid.
        source_columns = ', '.join(
            f"CAST(`{field.name}` AS {standard_type_names.get(field.field_type, field.field_type)}) AS `{field.name}`"
            for field in warehouse_table.schema
        )
        sql = f"""
            SELECT DISTINCT {source_columns}
            FROM `{staging_table}`
        """

        # Write the query result over the warehouse table
        job_config = bigquery.QueryJobConfig()
        job_config.destination = destination_table_ref
        job_config.write_disposition = bigquery.WriteDisposition.WRITE_TRUNCATE

        # Run the query job
        query_job = client.query(sql, job_config=job_config)
        query_job.result()

        print(f"Data transfered from {dataset_staging}.{staging_table_id} to {dataset_warehouse}.{warehouse_table_name}")


In [None]:
def join_staging_with_warehouse_tables(project_id, dataset_staging, staging_table_name, warehouse_dataset, warehouse_table_names, key_column="order_id", update_column="returned"):
    """Copy one column from a staging table into several warehouse tables.

    For every warehouse table an UPDATE ... FROM statement is run that sets
    ``update_column`` to the staging value on rows whose ``key_column``
    matches a staging row.

    Args:
        project_id (str): Google Cloud project holding both datasets.
        dataset_staging (str): Name of the staging dataset.
        staging_table_name (str): Name of the staging table to read from.
        warehouse_dataset (str): Name of the warehouse dataset.
        warehouse_table_names (list): Names of the warehouse tables to update.
        key_column (str, optional): Column used to match warehouse and staging
            rows. Defaults to "order_id".
        update_column (str, optional): Column copied from staging into the
            warehouse table. Defaults to "returned".

    Returns:
        None
    """
    # Initialize BigQuery client
    client = bigquery.Client(project=project_id)

    staging_table = f"{project_id}.{dataset_staging}.{staging_table_name}"

    # Iterate through the warehouse table names
    for warehouse_table_name in warehouse_table_names:
        # Create a query to update the warehouse table with the column from the staging table
        sql = f"""
            UPDATE `{project_id}.{warehouse_dataset}.{warehouse_table_name}` AS T
            SET T.{update_column} = S.{update_column}
            FROM `{staging_table}` AS S
            WHERE T.{key_column} = S.{key_column}
        """

        # Run the query
        query_job = client.query(sql)
        query_job.result()

        print(f"Updated {update_column} column in {warehouse_dataset}.{warehouse_table_name} using staging data.")

In [None]:
# Point the Google client libraries at the service-account key (placeholder path)
credentials = os.environ["GOOGLE_APPLICATION_CREDENTIALS"] ="path to key"
#loading data
csv_path = "path to csv file"
# load_csv_to_bigquery requires the JSON schema of the staging table
schema_file_path = "path to schema json file"
project_id = "your project id"
table_name = "superstore"
load_csv_to_bigquery(csv_path = csv_path, project_id = project_id, table_name = table_name, schema_file_path = schema_file_path)
#clean the table
# Derived from the values above so it cannot drift from the table just loaded
table_id = f"{project_id}.staging.{table_name}"
date_columns=["Order_Date", "Ship_Date"]
columns_to_check=["Customer_ID", "Order_Date", "Order_ID", "Product_ID"]
clean_bigquery_table(project_id = project_id, table_id = table_id, remove_nulls=True, remove_duplicates=True, date_columns=date_columns, columns_to_check=columns_to_check)

#creating warehouse schema from json file
json_path = "path to json file"
create_warehouse_schema(project_id = project_id, json_path = json_path)

#loading data from staging to warehouse
dataset_warehouse = "warehouse"
dataset_staging = "staging"
# clean_bigquery_table writes its result to "<table>_cleaned"
staging_table_id = f"{table_name}_cleaned"
warehouse_table_names = ["date_dim", "customer_dim", "product_dim", "sales_fact","sales_fact_customer_dim_customer_id", 
                         "sales_fact_date_dim_order_date", "sales_fact_product_dim_product_id"]
load_data_from_staging_to_warehouse(project_id, dataset_warehouse, dataset_staging, staging_table_id, warehouse_table_names)

