<a href="https://colab.research.google.com/github/tomomitanaka00/Blog-SQL/blob/main/Revenue_Prediction_Cloud.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
!pip install google-cloud-bigquery
!pip install google-auth




In [3]:
from google.colab import files
uploaded = files.upload()

Saving predictive-behavior-analytics-b509bad93e58.json to predictive-behavior-analytics-b509bad93e58.json


In [4]:
!pip install "dask[dataframe]"


Collecting dask-expr<1.2,>=1.1 (from dask[dataframe])
  Downloading dask_expr-1.1.11-py3-none-any.whl.metadata (2.5 kB)
INFO: pip is looking at multiple versions of dask-expr to determine which version is compatible with other requirements. This could take a while.
  Downloading dask_expr-1.1.10-py3-none-any.whl.metadata (2.5 kB)
  Downloading dask_expr-1.1.9-py3-none-any.whl.metadata (2.5 kB)
Downloading dask_expr-1.1.9-py3-none-any.whl (241 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m241.9/241.9 kB[0m [31m16.4 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: dask-expr
Successfully installed dask-expr-1.1.9


In [5]:
import pandas as pd
import dask.dataframe as dd
import numpy as np
from google.cloud import bigquery
from google.oauth2 import service_account
from sklearn.model_selection import train_test_split, RandomizedSearchCV
from sklearn.ensemble import RandomForestRegressor, GradientBoostingRegressor, VotingRegressor
from sklearn.linear_model import ElasticNet
from sklearn.metrics import mean_squared_error, r2_score, mean_absolute_error
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.impute import SimpleImputer
from sklearn.feature_selection import RFE, RFECV
import lightgbm as lgb
import matplotlib.pyplot as plt
import logging
import os
from datetime import datetime, timedelta
import gc

# Set up logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)

# Set up credentials and BigQuery client
credentials = service_account.Credentials.from_service_account_file('predictive-behavior-analytics-b509bad93e58.json')
project_id = "predictive-behavior-analytics"
client = bigquery.Client(credentials=credentials, project=project_id)

# Function to fetch data in chunks, process it, and save it to disk
def fetch_and_process_data(start_date, end_date, chunk_size=7, output_dir='processed_data'):
    start = datetime.strptime(start_date, '%Y%m%d')
    end = datetime.strptime(end_date, '%Y%m%d')

    os.makedirs(output_dir, exist_ok=True)

    current_start = start
    chunk_number = 1

    while current_start <= end:
        current_end = min(current_start + timedelta(days=chunk_size - 1), end)

        query = f"""
        SELECT
          CONCAT(fullVisitorId, CAST(visitId AS STRING)) AS session_id,
          date,
          totals.timeOnSite,
          totals.pageviews,
          totals.transactions,
          totals.transactionRevenue,
          trafficSource.source,
          trafficSource.medium,
          device.deviceCategory,
          geoNetwork.country,
          hits
        FROM
          `bigquery-public-data.google_analytics_sample.ga_sessions_*`
        WHERE
          _TABLE_SUFFIX BETWEEN '{current_start.strftime('%Y%m%d')}' AND '{current_end.strftime('%Y%m%d')}'
        """

        logger.info(f"Fetching data from {current_start.strftime('%Y-%m-%d')} to {current_end.strftime('%Y-%m-%d')}")

        df_chunk = client.query(query).to_dataframe()
        logger.info(f"Fetched chunk with shape: {df_chunk.shape}")

        # Reduce the dataset to 40%
        df_chunk = df_chunk.sample(frac=0.1, random_state=42)

        # Save processed chunk to disk
        output_file = os.path.join(output_dir, f'processed_chunk_{chunk_number}.parquet')
        df_chunk.to_parquet(output_file)
        logger.info(f"Saved processed chunk to {output_file}")

        current_start = current_end + timedelta(days=1)
        chunk_number += 1

# Function to load and combine processed chunks using Dask
def load_and_combine_chunks(directory='processed_data'):
    # Read all the Parquet files at once using Dask
    ddf = dd.read_parquet(os.path.join(directory, '*.parquet'))
    return ddf

def safe_json_loads(x):
    try:
        return json.loads(x) if isinstance(x, str) else x
    except json.JSONDecodeError:
        return {}

def flatten_nested_columns(df):
    nested_columns = ['totals', 'trafficSource', 'device', 'geoNetwork']
    for col in nested_columns:
        if col in df.columns:
            try:
                flattened = pd.json_normalize(df[col].apply(safe_json_loads))
                flattened.columns = [f'{col}_{subcol}' for subcol in flattened.columns]
                df = pd.concat([df.drop(col, axis=1), flattened], axis=1)
            except Exception as e:
                logger.warning(f"Error flattening column {col}: {str(e)}")
    return df

def optimize_dtypes(df):
    for col in df.columns:
        if df[col].dtype == 'object':
            try:
                df[col] = df[col].astype('category')
            except TypeError:
                pass
        elif df[col].dtype == 'float64':
            df[col] = df[col].astype('float32')
        elif df[col].dtype == 'int64':
            df[col] = df[col].astype('int32')
    return df




In [6]:
def clean_and_engineer_data(df):
    logger.info("Starting data cleaning and feature engineering...")

    # Flatten any nested columns
    df_cleaned = flatten_nested_columns(df)
    logger.info(f"Flattened DataFrame shape: {df_cleaned.shape}")

    # Identify columns with complex data types (arrays, lists, etc.)
    for col in df_cleaned.columns:
        # Compute whether the column contains any complex data types
        has_complex_types = df_cleaned[col].apply(lambda x: isinstance(x, (list, np.ndarray))).compute().any()
        if has_complex_types:
            logger.warning(f"Column '{col}' contains complex data types. Dropping or handling this column is necessary.")
            # Handle the column as needed, either by flattening or removing
            df_cleaned = df_cleaned.drop(columns=[col])

    # Optimize data types to reduce memory usage
    df_cleaned = optimize_dtypes(df_cleaned)

    df_cleaned['date'] = dd.to_datetime(df_cleaned['date'], format='%Y%m%d')
    numeric_columns = df_cleaned.select_dtypes(include=[np.number]).columns
    categorical_columns = df_cleaned.select_dtypes(exclude=[np.number, 'datetime64']).columns

    all_nan_columns = df_cleaned.columns[df_cleaned.isna().all()].tolist()
    if all_nan_columns:
        df_cleaned = df_cleaned.drop(columns=all_nan_columns)
        numeric_columns = [col for col in numeric_columns if col not in all_nan_columns]
        categorical_columns = [col for col in categorical_columns if col not in all_nan_columns]

    numeric_imputer = SimpleImputer(strategy='median')
    categorical_imputer = SimpleImputer(strategy='most_frequent')
    df_cleaned[numeric_columns] = df_cleaned[numeric_columns].apply(lambda col: numeric_imputer.fit_transform(col.compute()))
    df_cleaned[categorical_columns] = df_cleaned[categorical_columns].apply(lambda col: categorical_imputer.fit_transform(col.compute()))

    # Feature engineering
    df_cleaned['day_of_week'] = df_cleaned['date'].dt.dayofweek
    df_cleaned['is_weekend'] = df_cleaned['day_of_week'].isin([5, 6]).astype(int)
    df_cleaned['month'] = df_cleaned['date'].dt.month
    df_cleaned['quarter'] = df_cleaned['date'].dt.quarter

    logger.info(f"Final cleaned and engineered DataFrame shape: {df_cleaned.shape}")
    return df_cleaned


In [7]:
def prepare_data_for_modeling(df):
    logger.info("Preparing data for revenue prediction...")

    # Define potential feature columns
    numeric_columns = df.select_dtypes(include=[np.number]).columns.tolist()
    categorical_columns = df.select_dtypes(include=['object', 'category']).columns.tolist()

    # Exclude certain columns
    exclude_columns = ['session_id', 'date', 'totals_transactionRevenue', 'log_revenue', 'totals_totalTransactionRevenue']
    feature_columns = [col for col in numeric_columns + categorical_columns if col not in exclude_columns]

    logger.info(f"Using the following features: {feature_columns}")

    if not feature_columns:
        raise ValueError("No valid features found in the DataFrame")

    X = df[feature_columns]

    # Check if the 'log_revenue' column exists
    if 'log_revenue' not in df.columns:
        logger.warning("The column 'log_revenue' is missing from the DataFrame. Attempting to create it.")
        if 'totals_transactionRevenue' in df.columns:
            df['totals_transactionRevenue'] = df['totals_transactionRevenue'].replace(0, 1e-6)
            df['log_revenue'] = np.log1p(df['totals_transactionRevenue'])
        else:
            logger.error("Both 'log_revenue' and 'totals_transactionRevenue' columns are missing. Cannot proceed with modeling.")
            raise KeyError("Required revenue columns not found.")

    y = df['log_revenue']

    # Handle missing values
    numeric_features = X.select_dtypes(include=[np.number]).columns
    categorical_features = X.select_dtypes(include=['object', 'category']).columns

    if len(numeric_features) > 0:
        numeric_imputer = SimpleImputer(strategy='median')
        X[numeric_features] = numeric_imputer.fit_transform(X[numeric_features])

    if len(categorical_features) > 0:
        categorical_imputer = SimpleImputer(strategy='constant', fill_value='missing')
        X[categorical_features] = categorical_imputer.fit_transform(X[categorical_features])

    # Encode categorical variables
    for col in categorical_features:
        X[col] = pd.Categorical(X[col]).codes

    # Perform feature selection using RFE
    n_features_to_select = min(30, X.shape[1])
    rfe_selector = RFECV(estimator=RandomForestRegressor(n_estimators=30, random_state=42), step=1, cv=3, scoring='neg_mean_squared_error')
    X_selected = rfe_selector.fit_transform(X, y)

    # Update feature_columns to reflect only the selected features
    selected_columns = pd.Series(feature_columns)[rfe_selector.get_support()].tolist()

    logger.info(f"Final feature matrix shape: {X_selected.shape}")

    return X_selected, y, selected_columns


In [8]:
def train_and_evaluate_models(X, y):
    logger.info("Training and evaluating multiple models...")

    # Split the data
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=42)

    # Define models
    models = {
        'RandomForest': RandomForestRegressor(random_state=42),
        'GradientBoosting': GradientBoostingRegressor(random_state=42),
        'ElasticNet': ElasticNet(random_state=42),
        'LightGBM': lgb.LGBMRegressor(random_state=42)
    }

    # Train and evaluate each model
    results = {}
    for name, model in models.items():
        logger.info(f"Training {name}...")

        # Hyperparameter tuning
        param_dist = {}
        if name == 'RandomForest':
            param_dist = {
                'n_estimators': [30, 40, 50],
                'max_depth': [3, 5, 7, None],
                'min_samples_split': [2, 5, 10],
                'min_samples_leaf': [1, 2, 4]
            }
        elif name == 'GradientBoosting':
            param_dist = {
                'n_estimators': [30, 40, 50],
                'max_depth': [3, 5, 7],
                'learning_rate': [0.01, 0.05, 0.1],
                'min_samples_split': [2, 5, 10],
                'min_samples_leaf': [1, 2, 4]
            }
            model.set_params(n_iter_no_change=5, validation_fraction=0.1)  # Early stopping
        elif name == 'ElasticNet':
            param_dist = {
                'alpha': [0.1, 0.5, 1.0],
                'l1_ratio': [0.1, 0.5, 0.9]
            }
        elif name == 'LightGBM':
            param_dist = {
                'num_leaves': [31, 50, 100],
                'max_depth': [3, 5, 7, -1],
                'learning_rate': [0.01, 0.05, 0.1],
                'n_estimators': [30, 40, 50],
                'min_child_samples': [20, 50, 100]
            }

        random_search = RandomizedSearchCV(model, param_distributions=param_dist,
                                           n_iter=10, cv=5, random_state=42, n_jobs=-1, verbose=2)
        random_search.fit(X_train, y_train)

        # Use best model
        best_model = random_search.best_estimator_
        y_pred = best_model.predict(X_test)

        # Calculate metrics
        mse = mean_squared_error(y_test, y_pred)
        rmse = np.sqrt(mse)
        mae = mean_absolute_error(y_test, y_pred)
        r2 = r2_score(y_test, y_pred)

        results[name] = {
            'model': best_model,
            'mse': mse,
            'rmse': rmse,
            'mae': mae,
            'r2': r2
        }

        logger.info(f"{name} - RMSE: {rmse:.3f}, R²: {r2:.3f}")

    # Create an ensemble model
    ensemble_model = VotingRegressor(estimators=[
        ('rf', models['RandomForest']),
        ('gb', models['GradientBoosting']),
        ('en', models['ElasticNet']),
        ('lgb', models['LightGBM'])
    ])
    ensemble_model.fit(X_train, y_train)
    ensemble_pred = ensemble_model.predict(X_test)

    ensemble_mse = mean_squared_error(y_test, ensemble_pred)
    ensemble_rmse = np.sqrt(ensemble_mse)
    ensemble_r2 = r2_score(y_test, ensemble_pred)

    results['Ensemble'] = {
        'model': ensemble_model,
        'mse': ensemble_mse,
        'rmse': ensemble_rmse,
        'r2': ensemble_r2
    }

    logger.info(f"Ensemble - RMSE: {ensemble_rmse:.3f}, R²: {ensemble_r2:.3f}")

    return results



In [9]:
# Main execution
if __name__ == "__main__":
    start_date = '20160801'
    end_date = '20170731'
    chunk_size = 1  # Fetch 1 day of data at a time to reduce memory usage

    logger.info("Starting data fetch and processing...")
    fetch_and_process_data(start_date, end_date, chunk_size)
    logger.info("Finished fetching and processing all data.")

    logger.info("Loading and processing data using Dask...")
    ddf = load_and_combine_chunks()  # Load all chunks at once with Dask

    # Perform cleaning and feature engineering on the entire dataset
    cleaned_and_engineered_df = clean_and_engineer_data(ddf)

    # Persist the processed data to disk (optional)
    cleaned_and_engineered_df.to_parquet('cleaned_engineered_data.parquet', write_index=False)

    # Convert Dask DataFrame to Pandas DataFrame for modeling
    final_df = cleaned_and_engineered_df.compute()

    # Prepare data for modeling
    X, y, selected_columns = prepare_data_for_modeling(final_df)

    # Train and evaluate models
    results = train_and_evaluate_models(X, y)

    logger.info("Revenue prediction analysis completed successfully.")


You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.
To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the map or apply function that you are using.
  Before: .apply(func)
  After:  .apply(func, meta=('session_id', 'bool'))

You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.
To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the map or apply function that you are using.
  Before: .apply(func)
  After:  .apply(func, meta=('date', 'bool'))

You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.
To provide an explicit output types or to silence this message, plea

NotImplementedError: Dask DataFrame.apply only supports axis=1
  Try: df.apply(func, axis=1)