In [None]:
pip install google-cloud-bigquery
pip install pyarrow

In [None]:
pip show pyarrow

In [None]:
import pandas as pd
from google.cloud import bigquery

# Initialize the BigQuery client
def get_bq_client():
    """Build a fresh BigQuery client and hand it back to the caller.

    The client is constructed with no explicit arguments, so the project and
    credentials are whatever the library resolves on its own.
    """
    client = bigquery.Client()
    return client

# Reading a CSV file 
def read_and_preprocess_csv(file_path):
    """Load a CSV into a DataFrame and swap missing cells for ``None``.

    No dtype conversion happens here; the frame is returned as parsed by
    ``pd.read_csv`` apart from the missing-value replacement.

    Args:
        file_path: Path (or anything ``pd.read_csv`` accepts) of the CSV.

    Returns:
        A new DataFrame in which cells that were missing are replaced by
        ``None``.
        NOTE(review): on numeric columns pandas may coerce ``None`` back to
        NaN depending on its version -- confirm if this matters downstream.
    """
    raw = pd.read_csv(file_path)

    # Keep every non-missing cell as is; everything else becomes None (NULL)
    has_value = raw.notna()
    return raw.where(has_value, None)

# Convert columns to datetime 
def convert_columns_to_datetime(df, datetime_columns):
    """Parse the listed columns of ``df`` as datetimes, in place.

    Args:
        df: DataFrame to update. It is modified in place.
        datetime_columns: Column names to parse. Names that are not columns
            of ``df`` are skipped.

    Returns:
        The same DataFrame object. Values that cannot be parsed become NaT
        because parsing uses ``errors='coerce'``.
    """
    present = [name for name in datetime_columns if name in df.columns]
    for name in present:
        parsed = pd.to_datetime(df[name], errors='coerce')
        df[name] = parsed
    return df

# Apply schema conversions
def apply_schema_conversions(df, schema_dict):
    """Cast the columns of ``df`` that appear in ``schema_dict`` to their target dtypes.

    Entries of ``schema_dict`` whose column is absent from ``df`` are ignored,
    and columns of ``df`` that are not named in ``schema_dict`` are left
    untouched.

    Args:
        df: DataFrame to convert. It is modified in place.
        schema_dict: Mapping of column name to target dtype. ``int`` and
            ``str`` targets get null-preserving handling; any other value is
            passed to ``Series.astype`` unchanged.

    Returns:
        The same DataFrame object, with the conversions applied.
    """
    for col, dtype in schema_dict.items():
        if col not in df.columns:
            continue

        if dtype == int:
            # A plain int column cannot hold missing values; nullable Int64 can.
            df[col] = df[col].astype('Int64')
        elif dtype == str:
            # astype(str) alone would turn missing values into the literal
            # text 'None' / 'nan', which would then be stored as real strings
            # instead of NULL. Remember where the gaps were and restore them.
            has_value = df[col].notna()
            df[col] = df[col].astype(str).where(has_value, None)
        else:
            df[col] = df[col].astype(dtype)

    return df

# Load the DataFrame into BigQuery
def load_data_to_bq(df, project_id, dataset_id, table_name):
    """Upload ``df`` to the BigQuery table ``project_id.dataset_id.table_name``.

    Blocks until the load job has finished, then prints how many rows the job
    reports as written.

    Args:
        df: DataFrame to upload.
        project_id: Project part of the destination table id.
        dataset_id: Dataset part of the destination table id.
        table_name: Table part of the destination table id.

    NOTE(review): no job configuration is passed, so schema detection and
    write behaviour are the client library's defaults -- confirm they are
    what is wanted when re-running the load.
    """
    destination_table = f"{project_id}.{dataset_id}.{table_name}"

    # Start the load with a freshly created client and wait for it to finish
    job = get_bq_client().load_table_from_dataframe(df, destination_table)
    job.result()

    print(f"Loaded {job.output_rows} rows into {destination_table}")

# Map CSV files to their BigQuery table names
def map_file_to_table(csv_files):
    """Look up the BigQuery table name for each of the given CSV paths.

    Args:
        csv_files: Iterable of CSV paths; each must be one of the paths known
            to this function.

    Returns:
        A dict from each given path to its table name, in the order given.

    Raises:
        KeyError: If a path is not one of the known CSV paths.
    """
    known_tables = {
        '../data/olist_customers_dataset.csv': 'customers',
        '../data/olist_geolocation_dataset.csv': 'geolocation',
        '../data/olist_order_items_dataset.csv': 'order_items',
        '../data/olist_order_payments_dataset.csv': 'order_payments',
        '../data/olist_order_reviews_dataset.csv': 'order_reviews',
        '../data/olist_orders_dataset.csv': 'orders',
        '../data/olist_products_dataset.csv': 'products',
        '../data/olist_sellers_dataset.csv': 'sellers',
        '../data/product_category_name_translation.csv': 'product_category_name_translation',
    }

    resolved = {}
    for path in csv_files:
        resolved[path] = known_tables[path]
    return resolved

# Process each CSV file
def process_csv_files(csv_files, schema_dict, project_id, dataset_id):
    """Read, convert and upload each CSV file to its BigQuery table.

    For every path the file is read, its datetime columns are parsed, the
    schema conversions are applied, and the result is loaded into the table
    that ``map_file_to_table`` assigns to that path.

    Args:
        csv_files: Iterable of CSV paths to process, in order.
        schema_dict: Mapping of column name to target dtype, forwarded to
            ``apply_schema_conversions``.
        project_id: BigQuery project that receives the tables.
        dataset_id: BigQuery dataset that receives the tables.

    Raises:
        KeyError: If any path has no table mapping. This is raised before
            any file is read or loaded.
    """
    # Materialise once so a one-shot iterable can be both mapped and looped over
    csv_files = list(csv_files)

    # Columns that hold timestamps in any of the files; absent ones are skipped
    datetime_columns = [
        'shipping_limit_date', 'review_creation_date', 'review_answer_timestamp',
        'order_purchase_timestamp', 'order_approved_at', 'order_delivered_carrier_date',
        'order_delivered_customer_date', 'order_estimated_delivery_date'
    ]

    # Resolve all destination tables up front: the mapping does not change
    # between files, and an unmapped path should fail before any work starts
    table_names = map_file_to_table(csv_files)

    for file_path in csv_files:
        # Read the CSV into a pandas DataFrame
        df = read_and_preprocess_csv(file_path)

        # Parse timestamps, then cast the remaining columns to their dtypes
        df = convert_columns_to_datetime(df, datetime_columns)
        df = apply_schema_conversions(df, schema_dict)

        # Load data into BigQuery
        load_data_to_bq(df, project_id, dataset_id, table_names[file_path])

# BigQuery destination: the project and the dataset that receive the tables
project_id = 'project-brazilian-ecommerce'
dataset_id = 'ecommerce_data'

# Target dtype for every known column across all CSV files, grouped by dtype.
# Zip-code prefixes are listed as text rather than numbers.
schema_dict = {
    # Text columns (ids, names, codes)
    **dict.fromkeys(
        [
            'customer_id',
            'customer_unique_id',
            'customer_zip_code_prefix',
            'customer_city',
            'customer_state',
            'geolocation_zip_code_prefix',
            'geolocation_city',
            'geolocation_state',
            'order_id',
            'product_id',
            'seller_id',
            'payment_type',
            'review_id',
            'review_comment_title',
            'review_comment_message',
            'order_status',
            'product_category_name',
            'seller_zip_code_prefix',
            'seller_city',
            'seller_state',
            'product_category_name_english',
        ],
        str,
    ),
    # Whole-number columns (column names are spelled as in the CSV headers)
    **dict.fromkeys(
        [
            'order_item_id',
            'payment_sequential',
            'payment_installments',
            'review_score',
            'product_name_lenght',
            'product_description_lenght',
            'product_photos_qty',
            'product_weight_g',
            'product_length_cm',
            'product_height_cm',
            'product_width_cm',
        ],
        int,
    ),
    # Decimal columns (coordinates and money amounts)
    **dict.fromkeys(
        [
            'geolocation_lat',
            'geolocation_lng',
            'price',
            'freight_value',
            'payment_value',
        ],
        float,
    ),
    # Timestamp columns
    **dict.fromkeys(
        [
            'shipping_limit_date',
            'review_creation_date',
            'review_answer_timestamp',
            'order_purchase_timestamp',
            'order_approved_at',
            'order_delivered_carrier_date',
            'order_delivered_customer_date',
            'order_estimated_delivery_date',
        ],
        'datetime64[ns]',
    ),
}

# CSV files to load, in processing order
csv_files = [
    '../data/olist_customers_dataset.csv',
    '../data/olist_geolocation_dataset.csv',
    '../data/olist_order_items_dataset.csv',
    '../data/olist_order_payments_dataset.csv',
    '../data/olist_order_reviews_dataset.csv',
    '../data/olist_orders_dataset.csv',
    '../data/olist_products_dataset.csv',
    '../data/olist_sellers_dataset.csv',
    '../data/product_category_name_translation.csv',
]

# Run the load: every listed CSV is converted and uploaded to its table
process_csv_files(
    csv_files=csv_files,
    schema_dict=schema_dict,
    project_id=project_id,
    dataset_id=dataset_id,
)

Loaded 99441 rows into project-brazilian-ecommerce.ecommerce_data.customers
Loaded 1000163 rows into project-brazilian-ecommerce.ecommerce_data.geolocation
Loaded 112650 rows into project-brazilian-ecommerce.ecommerce_data.order_items
Loaded 103886 rows into project-brazilian-ecommerce.ecommerce_data.order_payments
Loaded 99224 rows into project-brazilian-ecommerce.ecommerce_data.order_reviews
Loaded 99441 rows into project-brazilian-ecommerce.ecommerce_data.orders
Loaded 32951 rows into project-brazilian-ecommerce.ecommerce_data.products
Loaded 3095 rows into project-brazilian-ecommerce.ecommerce_data.sellers
Loaded 71 rows into project-brazilian-ecommerce.ecommerce_data.product_category_name_translation
