# In [None]: (notebook cell marker left over from the Colab export)
from google.colab import auth, files
import pandas as pd
import re
from google.cloud import storage
import os
from datetime import datetime
import pytz
import numpy as np
from google.cloud import bigquery

# Authenticate the Colab user; this runs before any Google Cloud client is created below.
auth.authenticate_user()

# Name of the GCS bucket that every source/destination folder in this script lives in,
# and the project id used to build fully-qualified BigQuery table ids.
bucket_name = 'prd-marketshare'
project_id = 'bq-rf5039'

# Set up the GCS client and resolve the bucket once; `bucket` is shared by the
# helper functions and the processing loop further down.
client = storage.Client()
bucket = client.get_bucket(bucket_name)

# 1.1 Function to replace cells with a single "#" with empty

def replace_single_symbol_with_na(df, *column_names):
    """Blank out cells whose text form is exactly ``#`` in the given columns.

    Columns with ``object`` dtype get an empty string in place of ``#``;
    every other dtype gets ``np.nan``. The DataFrame is modified in place
    and also returned so calls can be chained.
    """
    def _blank_out(series, filler):
        # Compare on the string form so non-string cells are handled uniformly.
        return series.apply(lambda cell: filler if str(cell) == '#' else cell)

    for name in column_names:
        is_text_column = df[name].dtype == 'object'
        filler = "" if is_text_column else np.nan
        df[name] = _blank_out(df[name], filler)

    return df

# 1.2 Function to replace commas by dots and convert the column to float

def replace_commas_and_convert_to_float(df, *column_names):
    """Convert decimal-comma columns (e.g. ``"1,5"``) to float columns.

    Every cell is stringified before the comma is swapped for a dot, so a
    column mixing strings and real numbers keeps all of its values. Missing
    cells (``None``/``NaN``) stay missing, and an empty column is converted
    without error. The DataFrame is modified in place and also returned.

    Raises:
        ValueError: if a non-missing cell cannot be parsed as a float.
    """
    for column_name in column_names:
        column = df[column_name]

        # Stringify the whole column rather than inspecting only the first
        # cell: `.str.replace` would otherwise turn non-string cells into NaN.
        as_text = column.astype(str).str.replace(',', '.', regex=False)

        # Restore the original missing cells so "None"/"nan" text is never parsed.
        df[column_name] = as_text.where(column.notna()).astype(float)
    return df

# 1.3 Function to convert dates into proper format

def convert_date(value):
    """Normalise a dotted date string to ``YYYY-MM-DD``.

    * ``MM.YYYY``    -> last calendar day of that month.
    * ``DD.MM.YYYY`` -> that exact day.

    Anything else — including non-string input and strings that look like
    one of the formats but are not valid dates — is returned unchanged, so
    the function is safe to apply to a whole column.
    """
    if not isinstance(value, str):
        return value

    dot_count = value.count('.')

    if dot_count == 1:
        month, year = value.split('.')
        try:
            first_day = pd.Timestamp(year=int(year), month=int(month), day=1)
            # MonthEnd(0) rolls forward to the month's last day (no-op if already there).
            last_day = first_day + pd.offsets.MonthEnd(0)
            return last_day.strftime('%Y-%m-%d')
        except (ValueError, OverflowError):
            # Not a real month/year (or outside the Timestamp range): best effort, keep as-is.
            return value

    if dot_count == 2:
        try:
            return pd.to_datetime(value, format='%d.%m.%Y').strftime('%Y-%m-%d')
        except (ValueError, OverflowError):
            return value

    return value

def process_dates(df, *column_names):
    """Normalise the given date columns to ``YYYY-MM-DD`` strings.

    Each cell is handled according to its type:

    * strings are passed to ``convert_date``;
    * missing cells (``None``/``NaN``/``NaT``) are left untouched instead of
      being turned into the text ``"nan"`` (or crashing on ``None``);
    * date-like objects (anything exposing ``strftime``) are formatted
      directly — applying a ``.4f`` format spec to them would produce the
      literal text ``".4f"``;
    * remaining values are assumed to be numbers holding ``MM.YYYY``; they are
      rendered with four decimals so that e.g. ``1.202`` becomes ``"1.2020"``
      before being passed to ``convert_date``.

    The DataFrame is modified in place and also returned.
    """
    def _normalise(cell):
        if isinstance(cell, str):
            return convert_date(cell)
        if pd.isna(cell):
            return cell
        if hasattr(cell, 'strftime'):
            return cell.strftime('%Y-%m-%d')
        return convert_date("{:.4f}".format(cell))

    for column in column_names:
        df[column] = df[column].apply(_normalise)
    return df

# Example usage
#df = pd.DataFrame({'date_column': ['12.2022', '15.12.2022']})
#processed_df = process_dates(df, 'date_column')
#print(processed_df)

# 1.4 Function to convert values of columns to int

def columns_to_int(df, *column_names):
    """Cast the given columns to pandas' nullable integer dtype.

    The nullable ``Int64`` dtype is used so that missing values can stay in
    the column. The DataFrame is modified in place and also returned.
    """
    nullable_int = 'Int64'
    for target in column_names:
        converted = df[target].astype(nullable_int)
        df[target] = converted
    return df

# 1.5 Function to lowercase all string columns

def lowercase_string_columns(df):
    """Lower-case every string cell in the DataFrame's ``object`` columns.

    Non-string cells inside those columns (numbers, ``None``, ...) are left
    as they are, and columns of any other dtype are not touched. The
    DataFrame is modified in place and also returned.
    """
    def _lower_if_text(cell):
        return cell.lower() if isinstance(cell, str) else cell

    # Only object-dtype columns are candidates for holding Python strings.
    for name in df.select_dtypes(include=['object']).columns:
        df[name] = df[name].apply(_lower_if_text)

    return df


# 1.6 Function to get the latest file from the bucket folder

def get_latest_file(bucket, folder_path):
    """Return the most recently created blob under ``folder_path``.

    Args:
        bucket: object exposing ``list_blobs(prefix=...)`` whose items have
            ``name`` and ``time_created`` attributes.
        folder_path: prefix to search under.

    Returns:
        The newest blob, or ``None`` when the prefix holds no files.

    Blobs whose name ends with ``/`` are skipped: they are folder placeholder
    objects rather than data files, and returning one would make the caller
    try to parse an empty object instead of reporting "no files found".
    """
    candidates = (
        blob
        for blob in bucket.list_blobs(prefix=folder_path)
        if not blob.name.endswith('/')
    )
    # max() needs a single pass; `default` covers the empty-folder case.
    return max(candidates, key=lambda blob: blob.time_created, default=None)

# 1.7 Function to save the file to proper folder in the bucket

def save_to_bucket(df, bucket, destination_folder, file_name):
    """Write ``df`` as CSV (without the index) to ``destination_folder`` in ``bucket``.

    The CSV is staged in a private temporary directory that is removed once
    the upload finishes, so repeated runs do not accumulate files on local
    disk and the function does not depend on a hard-coded ``/tmp`` path.

    Args:
        df: DataFrame to export.
        bucket: object exposing ``blob(name)``; the returned blob must expose
            ``upload_from_filename(path)``.
        destination_folder: folder (object-name prefix) inside the bucket.
        file_name: name of the CSV object to create.
    """
    import tempfile

    with tempfile.TemporaryDirectory() as staging_dir:
        # Keep the real file name on disk so the uploaded object is named after it.
        local_file_path = os.path.join(staging_dir, file_name)
        df.to_csv(local_file_path, index=False)

        # Upload the file to the specific folder in GCS
        blob = bucket.blob(f'{destination_folder}/{file_name}')
        blob.upload_from_filename(local_file_path)

        # To also download a local copy in Colab, call
        # files.download(local_file_path) here, before the directory is removed.

# 1.8 Format column names for BigQuery

def format_column_name(column_name):
    """Turn a raw column header into a lowercase, underscore-separated name.

    Steps, in order: lowercase; drop periods; turn runs of colons, hyphens and
    spaces into ``_``; drop closing parentheses; collapse runs of ``_``,
    whitespace, ``/``, ``|`` and parentheses into a single ``_``; strip
    leading/trailing ``_``; and prefix ``volumes_`` when the result starts
    with a digit.

    Non-string labels (e.g. an integer year header) are stringified first, and
    a header that reduces to nothing yields ``''`` instead of raising
    ``IndexError``.
    """
    # Convert to lowercase (stringify first so numeric headers are accepted)
    formatted_name = str(column_name).lower()
    # Remove periods
    formatted_name = formatted_name.replace('.', '')
    # Replace colons, hyphens, and spaces with underscore
    formatted_name = re.sub(r'[:\- ]+', '_', formatted_name)
    # Delete closing parentheses
    formatted_name = formatted_name.replace(')', '')
    # Collapse runs of separators (now including underscores) into a single underscore
    formatted_name = re.sub(r'[_\s/|()]+', '_', formatted_name)
    # Remove any leading/trailing underscores
    formatted_name = formatted_name.strip('_')
    # Prefix with 'volumes_' if the name starts with a number; slicing keeps
    # the check safe when the name is empty.
    if formatted_name[:1].isdigit():
        formatted_name = 'volumes_' + formatted_name
    return formatted_name

# 1.9 Delete nan

def convert_nan_to_none(df):
    """Map every missing cell (as reported by ``pd.isna``) to ``None``.

    Each column is rebuilt cell by cell; the DataFrame is modified in place
    and also returned.

    NOTE(review): pandas may coerce ``None`` back to ``NaN`` when it rebuilds
    a numeric column, so the effect is presumably limited to object columns —
    confirm if real ``None`` values are required downstream.
    """
    def _none_if_missing(cell):
        return None if pd.isna(cell) else cell

    for name in df.columns:
        df[name] = df[name].apply(_none_if_missing)
    return df

# 2 Define a list of configurations
#
# One dict per source folder; the processing loop at the bottom of the file
# handles them in order. Keys read by that loop:
#   'source_folder'      - prefix passed to get_latest_file(); only the newest blob is processed
#   'destination_folder' - folder the cleaned CSV is written to by save_to_bucket()
#   'file_format'        - 'excel' (read with pd.read_excel, sheet 'Data') or 'csv' (pd.read_csv)
#   'processing_steps'   - list of (function, columns) pairs applied in order:
#                            * columns is a list -> function(df, *columns)
#                            * columns is None   -> function(df)
#                            * format_column_name is special-cased and applied to the column labels
#   'dtypes'             - passed as `dtype` to the pandas reader, or None to let pandas infer
#   'load_to_bigquery'   - when True the cleaned CSV is also loaded into BigQuery
#   'bigquery_dataset'   - dataset to load into; read only when 'load_to_bigquery' is True
file_configs = [
    {
        'source_folder': '01_pihs/01_pihs_raw/',
        'destination_folder': '01_pihs/02_pihs_cleaned',
        'file_format': 'excel',
        'processing_steps': [
            (replace_single_symbol_with_na, ['Transmission Manufacturer', 'Model Code', 'Creation Date|Calendar Year']),
            (replace_commas_and_convert_to_float, ['Electric Motor Power (kW)', 'System Voltage (V)', 'AP: Battery Capacity (kWh)', 'AP: System Torque (Nm)', 'Electric Motor Torque (Nm)', 'T: Torque (N.m)']),
            (process_dates, ['Engine EOP', 'Vehicle EOP (End of Production)', 'Vehicle SOP (Start of Production)','Creation Date|Calendar Year']),
            (columns_to_int, ['2020', '2021', '2022', '2023', '2024', '2025', '2026', '2027', '2028', '2029', '2030', '2031', '2032', '2033', '2034', '2035','Transmission Forward Speed', 'Powertrain ID','Vehicle ID']),
            (lowercase_string_columns, None),  # None: the function takes the whole DataFrame
            (format_column_name, None)  # Runs last: the steps above refer to the original column labels
        ],
        'dtypes': {'Global Nameplate': str, 'Transmission Program': str, 'Vehicle Platform': str, 'Vehicle Program': str, 'Engine Platform Value': str, 'T: Program_2': str, 'Mnemonic Vehicle ID': str},
        'load_to_bigquery': False  # Cleaned CSV is only written to GCS
    },
    {
        'source_folder': '02_value_list/01_vl_raw/',
        'destination_folder': '02_value_list/02_vl_cleaned',
        'file_format': 'csv',
        'processing_steps': [
            (lowercase_string_columns, None)  # None: the function takes the whole DataFrame
        ],
        'dtypes': None,
        'load_to_bigquery': False  # Cleaned CSV is only written to GCS
    },
    {
        'source_folder': '03_power_schedule/01_power_schedule_raw/',
        'destination_folder': '03_power_schedule/02_power_schedule_cleaned',
        'file_format': 'csv',
        'processing_steps': [
            (lowercase_string_columns, None)  # None: the function takes the whole DataFrame
        ],
        'dtypes': None,
        'load_to_bigquery': False  # Cleaned CSV is only written to GCS
    },
    {
        'source_folder': '04_dictionary/01_emot/01_emot_raw/',
        'destination_folder': '04_dictionary/01_emot/02_emot_cleaned',
        'file_format': 'csv',
        'processing_steps': [
            (lowercase_string_columns, None),
            (convert_nan_to_none, None) # None: the function takes the whole DataFrame

        ],
        'dtypes': {'mnemonic_vehicle_id': str},
        'load_to_bigquery': False  # Cleaned CSV is only written to GCS
    },
    {
        'source_folder': '04_dictionary/02_obcdcdc/01_obcdcdc_raw/',
        'destination_folder': '04_dictionary/02_obcdcdc/02_obcdcdc_cleaned',
        'file_format': 'csv',
        'processing_steps': [
            (lowercase_string_columns, None),
            (convert_nan_to_none, None) # None: the function takes the whole DataFrame

        ],
        'dtypes': None,
        'load_to_bigquery': False  # Cleaned CSV is only written to GCS
    },
    {
        'source_folder': '04_dictionary/03_addressability/01_addressability_raw/',
        'destination_folder': '04_dictionary/03_addressability/02_addressability_cleaned',
        'file_format': 'csv',
        'processing_steps': [
            (lowercase_string_columns, None),
            (convert_nan_to_none, None) # None: the function takes the whole DataFrame

        ],
        'dtypes': None,
        'load_to_bigquery': True,  # Cleaned CSV is also loaded into BigQuery
        'bigquery_dataset': 'ADDRESSABILITY'  # BigQuery dataset that receives the table
    },
    {
        'source_folder': '06_prices/01_prices_raw/',
        'destination_folder': '06_prices/02_prices_cleaned',
        'file_format': 'csv',
        'processing_steps': [
            (lowercase_string_columns, None),
            (convert_nan_to_none, None) # None: the function takes the whole DataFrame

        ],
        'dtypes': {'row': int, 'year': int,
                   'system_voltage_v_lower_bound': float, 'system_voltage_v_upper_bound': float,
                   'electric_motor_power_kw_lower_bound': float, 'electric_motor_power_kw_upper_bound': float
                   },
        'load_to_bigquery': True,  # Cleaned CSV is also loaded into BigQuery
        'bigquery_dataset': 'PRICES'  # BigQuery dataset that receives the table
    },
]


# Function to load CSV from GCS to BigQuery
def load_csv_to_bigquery(bucket_name, folder_name, file_name, project_id, dataset_name):
    """Load a CSV object from GCS into a BigQuery table and wait for completion.

    The table is named after ``file_name`` without its extension and lives at
    ``{project_id}.{dataset_name}.{table_name}``. The schema is auto-detected,
    the header row is skipped, and rows are appended if the table exists.

    Args:
        bucket_name: GCS bucket holding the CSV.
        folder_name: folder (object-name prefix) of the CSV inside the bucket.
        file_name: CSV object name; also determines the table name.
        project_id: project that runs the load job and owns the table.
        dataset_name: BigQuery dataset receiving the table.
    """
    # Bind the client to the caller's project instead of relying on an ambient
    # default project, which may be unset or differ from `project_id`.
    client = bigquery.Client(project=project_id)
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,  # header row
        autodetect=True,
        write_disposition=bigquery.WriteDisposition.WRITE_APPEND,
    )
    file_uri = f"gs://{bucket_name}/{folder_name}/{file_name}"
    table_name = file_name.rsplit('.', 1)[0]
    table_id = f"{project_id}.{dataset_name}.{table_name}"
    load_job = client.load_table_from_uri(file_uri, table_id, job_config=job_config)
    load_job.result()  # block until the job finishes; raises if the load failed
    print(f"Loaded {load_job.output_rows} rows into {table_id}.")

# Process files and load to BigQuery
# Resolve the timezone once; it is only used to timestamp output file names.
paris_tz = pytz.timezone('Europe/Paris')

for config in file_configs:
    latest_blob = get_latest_file(bucket, config['source_folder'])
    if not latest_blob:
        print(f"No files found in the source folder: {config['source_folder']}")
        continue

    gcs_path = f'gs://{bucket_name}/{latest_blob.name}'
    print(f"Processing file: {gcs_path}")

    # Read the file according to its declared format. An unknown format must
    # fail loudly: falling through would silently reuse the DataFrame left
    # over from the previous configuration.
    file_format = config['file_format']
    dtypes = config.get('dtypes') or None
    if file_format == 'excel':
        df = pd.read_excel(gcs_path, sheet_name='Data', dtype=dtypes)
    elif file_format == 'csv':
        df = pd.read_csv(gcs_path, dtype=dtypes)
    else:
        raise ValueError(
            f"Unsupported file_format {file_format!r} for source folder "
            f"{config['source_folder']!r}; expected 'excel' or 'csv'"
        )

    # Apply the configured processing functions in order
    for func, columns in config['processing_steps']:
        if func is format_column_name:
            # Operates on column labels, not on the DataFrame itself
            df.columns = [func(col) for col in df.columns]
        elif columns:
            df = func(df, *columns)
        else:
            df = func(df)

    # Generate file name for the processed file
    current_time = datetime.now(paris_tz).strftime("%Y%m%d_%H%M%S")
    original_file_name = os.path.splitext(os.path.basename(latest_blob.name))[0]
    processed_file_name = f'{original_file_name}_cleaned_{current_time}.csv'

    # Save the processed file to GCS
    save_to_bucket(df, bucket, config['destination_folder'], processed_file_name)

    # Load the processed file to BigQuery if specified
    if config.get('load_to_bigquery', False):
        load_csv_to_bigquery(
            bucket_name,
            config['destination_folder'],
            processed_file_name,
            project_id,
            config['bigquery_dataset'],
        )