<a href="https://colab.research.google.com/github/ttn9171/Portfolio/blob/main/Modeling/UC2%3A%20NBA/Feature%20Extractions%20and%20Engineer/GA_Sessions_and_UTM_Extractions.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## Connection

In [None]:
# Colab environment setup: install gcsfuse, authenticate the notebook user and
# mount the project's GCS bucket at /content/gcs.
# NOTE(review): lines starting with `!` are IPython/Colab shell escapes, so this
# cell only runs inside a notebook, not as a plain Python script.
!apt-get update
!apt-get install -y -q curl gnupg

# Add Google's apt signing key and the gcsfuse package repository.
# NOTE(review): the repository suite is pinned to `gcsfuse-jammy` (Ubuntu 22.04);
# it needs updating if the Colab image moves to another Ubuntu release.
!curl -fsSL https://packages.cloud.google.com/apt/doc/apt-key.gpg | gpg --dearmor -o /usr/share/keyrings/cloud.google.gpg
!echo "deb [signed-by=/usr/share/keyrings/cloud.google.gpg] https://packages.cloud.google.com/apt gcsfuse-jammy main" | tee /etc/apt/sources.list.d/gcsfuse.list
!apt-get update

!apt-get install -y -q gcsfuse

# Interactive Google sign-in for the Colab user; presumably this is what
# provides credentials to the storage client, gcsfs and gcsfuse used below —
# confirm.
from google.colab import auth
auth.authenticate_user()

import os
project_id = 'capstone-aldo'
# NOTE(review): nothing visible in this notebook reads the `Aldo_Capstone`
# variable. If the intent is to set the default project for Google Cloud client
# libraries, the conventional variable is GOOGLE_CLOUD_PROJECT — confirm.
os.environ['Aldo_Capstone'] = project_id

from google.cloud import storage

# NOTE(review): `client` is not used by any of the cells visible here; the
# later cells reach GCS through gcsfs and gsutil instead.
client = storage.Client(project=project_id)
bucket = 'mma-capstone'

# Mount the bucket so its objects are readable as files under /content/gcs.
# IPython substitutes the Python variable `{bucket}` into the shell command.
os.makedirs('/content/gcs', exist_ok=True)
!gcsfuse {bucket} /content/gcs

0% [Working]            Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,632 B]
            Get:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
0% [Connecting to archive.ubuntu.com (185.125.190.81)] [Connecting to security.ubuntu.com (185.125.10% [Connecting to archive.ubuntu.com (185.125.190.81)] [Connecting to security.ubuntu.com (185.125.1                                                                                                    Hit:3 http://archive.ubuntu.com/ubuntu jammy InRelease
0% [Waiting for headers] [Waiting for headers] [Connected to ppa.launchpadcontent.net (185.125.190.8                                                                                                    Get:4 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Get:5 https://r2u.stat.illinois.edu/ubuntu jammy InRelease [6,555 B]
Get:6 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128

### Extract Number of Web Visits and Interactions

In [None]:
import pandas as pd
import re
import pyarrow.parquet as pq
import gcsfs
import gc
import time
import subprocess
import os

from typing import List

def run_email_traffic_pipeline(year: int, country: str, bucket_name: str):
    """Build per-visitor visit, email-visit and interaction counts for one banner/year.

    Streams the GA parquet export for ``banner=ALDO_{country}`` / ``year`` from
    GCS in batches. It flags hits whose ``page_path`` carries an email-campaign
    marker and rolls hits up to sessions and then to visitors. The result is
    written to a local CSV, which is copied into ``data_preprocessed/`` in the
    bucket with ``gsutil``.

    Args:
        year: Partition year of the GA export (e.g. 2023).
        country: Banner suffix, e.g. ``"CA"`` or ``"US"``.
        bucket_name: GCS bucket that holds the export and receives the output.

    Returns:
        The ``gs://`` path of the uploaded CSV. Returns ``None`` when the input
        had no rows or the upload failed; on a failed upload the local CSV is
        kept for inspection.

    Output columns: fullvisitor_id, total_visits, email_visits,
    total_interactions, avg_interactions_per_session, country.
    """
    base_path = f"gs://{bucket_name}/GA/ga_sample1/banner=ALDO_{country}/year={year}/final/year_{year}.parquet"
    gcs_output_file = f"gs://{bucket_name}/data_preprocessed/visit_counts_{country}_{year}.csv"
    # Include the country so a CSV left behind by a failed CA upload is never
    # overwritten by the US run (or vice versa).
    local_temp_file = f"/tmp/temp_visit_counts_{country}_{year}.csv"

    columns_for_visits = ["fullvisitor_id", "page_path", "unique_session_id"]
    session_keys = ["fullvisitor_id", "unique_session_id"]
    chunksize = 500000

    # A hit counts as email traffic when its page_path contains any of these
    # campaign markers (case-insensitive, anywhere in the string).
    email_traffic_pattern = re.compile(
        r'(utm_medium=email|sc_src=email[\w\d_-]*|cm_mmc=VIPClubEmails)',
        re.IGNORECASE
    )

    def detect_email_traffic(url):
        """Return True when a non-null page_path contains an email marker."""
        return bool(email_traffic_pattern.search(str(url))) if pd.notnull(url) else False

    print(f"\n🚀 Starting {year} Data Processing...")
    fs = gcsfs.GCSFileSystem()

    print(f"\n🔹 Processing year: {year} in batches...")
    session_parts = []
    # `with` closes the GCS file handle once every batch has been read.
    with fs.open(base_path, "rb") as parquet_source:
        parquet_file = pq.ParquetFile(parquet_source)
        batches = parquet_file.iter_batches(batch_size=chunksize, columns=columns_for_visits)
        for chunk_number, batch in enumerate(batches, start=1):
            start_time = time.time()
            chunk = batch.to_pandas()
            print(f"\n✅ Processing Chunk {chunk_number} - Rows: {chunk.shape[0]}")

            chunk["is_email_traffic"] = chunk["page_path"].apply(detect_email_traffic)
            # Reduce the batch to one row per (visitor, session) immediately
            # rather than keeping every hit in memory. groupby drops rows whose
            # visitor or session id is null.
            session_parts.append(
                chunk.groupby(session_keys, as_index=False).agg(
                    is_email_traffic=("is_email_traffic", "max"),
                    session_interactions=("page_path", "count"),
                )
            )
            del chunk, batch

            print(f"✅ Chunk {chunk_number} Processed in {time.time() - start_time:.2f} seconds")
            gc.collect()

    if not session_parts:
        print(f"\n⚠️ No rows found in {base_path}; nothing was written.")
        return None

    # A session can straddle two batches, so merge its partial rows. The max of
    # per-batch maxima and the sum of per-batch counts equal the values over
    # the whole table.
    df_sessions = (
        pd.concat(session_parts, ignore_index=True)
        .groupby(session_keys, as_index=False)
        .agg(
            is_email_traffic=("is_email_traffic", "max"),
            session_interactions=("session_interactions", "sum"),
        )
    )
    del session_parts
    gc.collect()

    df_grouped = df_sessions.groupby("fullvisitor_id", as_index=False).agg({
        "unique_session_id": "nunique",
        "is_email_traffic": "sum",       # sessions with at least one email hit
        "session_interactions": "sum"
    }).rename(columns={
        "unique_session_id": "total_visits",
        "is_email_traffic": "email_visits",
        "session_interactions": "total_interactions"
    })

    # Every visitor row has at least one session; replace(0, 1) is only a
    # guard against division by zero.
    df_grouped["avg_interactions_per_session"] = (
        df_grouped["total_interactions"] / df_grouped["total_visits"].replace(0, 1)
    )
    df_grouped["country"] = f"ALDO_{country}"

    df_grouped = df_grouped.astype({
        "total_visits": int,
        "email_visits": int,
        "total_interactions": int
    })

    df_grouped.to_csv(local_temp_file, index=False)

    print("\n📤 Uploading Final Processed Data to GCS...")
    upload_result = subprocess.run(["gsutil", "cp", local_temp_file, gcs_output_file], capture_output=True, text=True)

    if upload_result.returncode != 0:
        print("Upload Failed! Debugging Output Below:")
        print(upload_result.stderr)
        # Keep the local CSV so it can be inspected or uploaded by hand.
        print("Local copy kept at:", local_temp_file)
        return None

    print("Final Data Successfully Uploaded to GCS!")
    os.remove(local_temp_file)
    print("\n🎉 Processing Completed Successfully! Data saved at:", gcs_output_file)
    return gcs_output_file

if __name__ == "__main__":
    # Produce the 2023 visit summaries for both banners, Canada first.
    for banner_country in ("CA", "US"):
        run_email_traffic_pipeline(
            year=2023,
            country=banner_country,
            bucket_name="mma-capstone",
        )


### Extract Events per Session (view product, add to cart, etc.)



In [None]:
import pandas as pd
import pyarrow.parquet as pq
import gcsfs
import gc
import time
import subprocess
import os

# --- GCS input/output configuration -------------------------------------
YEAR = 2023
COUNTRY = "US"  # Banner to process; switch to "CA" for the Canadian export.
BUCKET_NAME = "mma-capstone"

# Source GA parquet export for the selected banner/year.
BASE_PATH = (
    f"gs://{BUCKET_NAME}/GA/ga_sample1/banner=ALDO_{COUNTRY}"
    f"/year={YEAR}/final/year_{YEAR}.parquet"
)
# Per-session event summary written back to the bucket.
GCS_OUTPUT_FILE = (
    f"gs://{BUCKET_NAME}/data_preprocessed/"
    f"session_event_{COUNTRY}_{YEAR}.csv"
)
# Local staging copy of the CSV before it is pushed to GCS.
LOCAL_TEMP_FILE = "/tmp/session_event_analysis.csv"

# Only these columns are read from the parquet file.
columns_to_load = ["fullvisitor_id", "unique_session_id", "event_action", "date"]

# Keyword sets used to classify GA `event_action` values. Matching is a
# case-insensitive substring test, so a keyword also matches any longer action
# that contains it (e.g. the bare "checkout" already covers "fast-checkout").
checkout_keywords = {
    "checkout with Apple Pay", "checkout checkbox check", "fast-checkout",
    "checkout step displayed", "checkout", "Expand Section Checkout"
}
view_product_keywords = {"view product detail"}
add_to_cart_keywords = {"add to cart"}
purchase_keywords = {"purchase"}


def classify_event(event):
    """Flag which funnel steps a single GA ``event_action`` value matches.

    Args:
        event: Raw event action. Non-string values are stringified, so
            ``None``/NaN become "none"/"nan" and match no keyword.

    Returns:
        Dict with boolean values under the keys ``view_product_detail``,
        ``add_to_cart``, ``checkout`` and ``purchase``. Each is True when any
        keyword of the corresponding set occurs in the event text, ignoring
        case.
    """
    text = str(event).lower()

    def _matches(keywords):
        # Lower-case the keywords too: several contain capitals ("Apple Pay",
        # "Expand Section Checkout") and could never match lower-cased text.
        return any(keyword.lower() in text for keyword in keywords)

    return {
        "view_product_detail": _matches(view_product_keywords),
        "add_to_cart": _matches(add_to_cart_keywords),
        "checkout": _matches(checkout_keywords),
        "purchase": _matches(purchase_keywords),
    }

# Stream the GA export batch by batch, flag funnel events on every hit, then
# roll the hits up to one row per session and upload the CSV to GCS.
fs = gcsfs.GCSFileSystem()  # Initialize GCS filesystem

# Load data in chunks
chunksize = 500000
chunk_number = 0
processed_chunks = []  # Store processed chunks

# Funnel columns produced by classify_event. After the per-session sum below
# they hold hit *counts*, not 0/1 flags.
boolean_columns = ["view_product_detail", "add_to_cart", "checkout", "purchase"]

# `with` closes the GCS file handle once every batch has been read.
with fs.open(BASE_PATH, "rb") as parquet_source:
    parquet_file = pq.ParquetFile(parquet_source)
    for batch in parquet_file.iter_batches(batch_size=chunksize, columns=columns_to_load):
        start_time = time.time()
        chunk_number += 1

        df_chunk = batch.to_pandas()
        print(f"\n Processing Chunk {chunk_number} - Rows: {df_chunk.shape[0]}")

        # Treat IDs as text, but with pandas' nullable "string" dtype. Plain
        # `astype(str)` turns missing IDs into the literal text "None"/"nan".
        # That defeats dropna and merges every ID-less hit into one bogus
        # session.
        df_chunk["fullvisitor_id"] = df_chunk["fullvisitor_id"].astype("string")
        df_chunk["unique_session_id"] = df_chunk["unique_session_id"].astype("string")
        df_chunk = df_chunk.dropna(subset=["unique_session_id"])

        # Build all flag columns in one DataFrame constructor call. This is far
        # faster than `.apply(pd.Series)` and also correct for empty batches.
        classified_data = pd.DataFrame(
            [classify_event(action) for action in df_chunk["event_action"]],
            index=df_chunk.index,
            columns=boolean_columns,
        )
        df_chunk = df_chunk.join(classified_data)

        # Keep only necessary columns
        df_chunk = df_chunk[["date", "fullvisitor_id", "unique_session_id", *boolean_columns]]
        processed_chunks.append(df_chunk)

        # Free up memory
        del df_chunk, classified_data, batch
        gc.collect()

        print(f"Chunk {chunk_number} Processed in {time.time() - start_time:.2f} seconds")

if not processed_chunks:
    raise ValueError(f"No rows were read from {BASE_PATH}; nothing to aggregate.")

# Concatenate all processed chunks into a single DataFrame
df_final = pd.concat(processed_chunks, ignore_index=True)
del processed_chunks
gc.collect()

# Group only after all batches are combined: one session's hits can span
# several batches, so grouping per batch would split sessions and double count.
df_grouped = df_final.groupby("unique_session_id", as_index=False).agg({
    "date": "last",              # last non-null date for the session
    "fullvisitor_id": "first",   # first non-null visitor id for the session
    "view_product_detail": "sum",
    "add_to_cart": "sum",
    "checkout": "sum",
    "purchase": "sum"
})

df_grouped["country"] = f"ALDO_{COUNTRY}"

# Pin the per-session event counts (sums of boolean flags) to a plain int dtype.
df_grouped[boolean_columns] = df_grouped[boolean_columns].astype(int)

# Save processed data locally, then copy it to GCS
df_grouped.to_csv(LOCAL_TEMP_FILE, index=False)

print("\n **Uploading Processed Data to GCS...**")
upload_result = subprocess.run(["gsutil", "cp", LOCAL_TEMP_FILE, GCS_OUTPUT_FILE], capture_output=True, text=True)

if upload_result.returncode == 0:
    print("**Data Successfully Uploaded to GCS!**")
    os.remove(LOCAL_TEMP_FILE)  # Remove temp file after upload
    print("\n Data saved at:", GCS_OUTPUT_FILE)
else:
    print("**Upload Failed! Debugging Output Below:**")
    print(upload_result.stderr)
    # Keep the local CSV so it can be inspected or uploaded by hand.
    print("\n Local copy kept at:", LOCAL_TEMP_FILE)


### Extract Page Path Information

In [None]:
import pandas as pd
import pyarrow.parquet as pq
import time
import re
import gcsfs
import subprocess
import html
import os
import numpy as np
import gc


# ✅ Define Paths
BUCKET_NAME = "mma-capstone"
COUNTRY = "CA"  # Banner to process ("CA" or "US")
YEAR = 2023
# Source GA parquet export for the selected banner/year.
INPUT_FILE = f"gs://{BUCKET_NAME}/GA/ga_sample1/banner=ALDO_{COUNTRY}/year={YEAR}/final/year_{YEAR}.parquet"
# NOTE(review): TEMP_FILE is not read or written by any active code in this cell.
TEMP_FILE = "/tmp/temp_page_paths.csv"
# Local staging file. It is written with `to_parquet`, so it carries a .parquet
# extension (a ".csv" name would mislabel the format).
FINAL_TEMP_FILE = "/tmp/temp_page_paths_final.parquet"
GCS_OUTPUT_FILE = f"gs://{BUCKET_NAME}/data_preprocessed/page_paths_{COUNTRY}_{YEAR}.parquet"

# ✅ Compile regex patterns (applied to the HTML-unescaped page_path)
SOURCE_PATTERN = re.compile(r'(?:[\?&])([^&]*(?:_source|_src|source))=([^&]+)', re.IGNORECASE)
# NOTE(review): unlike the visit-count pipeline earlier in this notebook, this
# pattern does not treat `cm_mmc=VIPClubEmails` as email traffic — confirm.
EMAIL_TRAFFIC_PATTERN = re.compile(r'((?:[\?&]|amp;)utm_medium=email(?:[\?&]|$)|(?:[\?&]|amp;)sc_src=email\S*)', re.IGNORECASE)
UTM_CATEGORY_PATTERN = re.compile(r'(?:[\?&]|amp;)utm_category=([^&]+)', re.IGNORECASE)
PRODUCT_PATTERN = re.compile(r'/p/(\d+)', re.IGNORECASE)
# Path after the /{ca|us}/{en|fr}[_XX]/ locale prefix, up to /p/, "?" or end.
CATEGORY_PATTERN = re.compile(r'/(?:ca|us)/(?:en|fr)(?:[_-][A-Z]{2})?/([^?]+?)(?=/p/|\?|$)', re.IGNORECASE)
SEGMENT_PATTERN = re.compile(r'[\?&]utm_segment\s*=\s*([^&]+)', re.IGNORECASE)
GENDER_PATTERN = re.compile(r'[\?&]utm_gender\s*=\s*([^&]+)', re.IGNORECASE)
GAD_SOURCE_PATTERN = re.compile(r'[\?&]gad_source=', re.IGNORECASE)


def extract_info(url):
    """Parse marketing and catalogue attributes out of a GA ``page_path``.

    Args:
        url: Raw page path / URL. Any non-``str`` value (``None``, NaN,
            ``pd.NA``) is treated as missing.

    Returns:
        A 9-element list in this order: traffic_source, is_email_traffic,
        utm_category, product_id, category_lv1, category_lv2, category_lv3,
        utm_segment, utm_gender. Fields that do not match are ``None``. Every
        field is ``None`` for missing input, or if parsing raises.
    """
    if not isinstance(url, str):  # Handle missing values
        return [None] * 9

    try:
        # Decode HTML entities ("&amp;" -> "&") so the [?&] anchors match.
        url = html.unescape(url)

        # Value of the first *_source / *_src / *source query parameter.
        source_match = SOURCE_PATTERN.search(url)
        traffic_source = source_match.group(2) if source_match else None
        # Any gad_source parameter overrides the generic source value.
        if GAD_SOURCE_PATTERN.search(url):
            traffic_source = "Google Ad"

        is_email_traffic = bool(EMAIL_TRAFFIC_PATTERN.search(url))

        utm_category_match = UTM_CATEGORY_PATTERN.search(url)
        utm_category = utm_category_match.group(1) if utm_category_match else None

        product_match = PRODUCT_PATTERN.search(url)
        product_id = product_match.group(1) if product_match else None

        category_match = CATEGORY_PATTERN.search(url)
        category_path = category_match.group(1) if category_match else None
        # Drop empty segments so a trailing or doubled "/" neither yields ""
        # levels nor shifts the real levels; then pad/trim to exactly 3 levels.
        category_parts = [part for part in category_path.split("/") if part] if category_path else []
        category_lv1, category_lv2, category_lv3 = (category_parts + [None] * 3)[:3]

        utm_segment_match = SEGMENT_PATTERN.search(url)
        utm_segment = utm_segment_match.group(1) if utm_segment_match else None

        gender_match = GENDER_PATTERN.search(url)
        utm_gender = gender_match.group(1) if gender_match else None

        return [
            traffic_source, is_email_traffic, utm_category,
            product_id, category_lv1, category_lv2, category_lv3, utm_segment, utm_gender
        ]
    except Exception as e:
        # Best effort: one malformed URL must not abort a whole batch.
        print(f"❌ Error processing URL: {url} → {e}")
        return [None] * 9

# ✅ Stream the parquet file in batches, parse every page_path, de-duplicate,
# and upload the combined result to GCS as Parquet.
chunksize = 500000
columns_to_load = ["fullvisitor_id", "page_path", "unique_session_id", "date"]  # Load only relevant columns
# Column names for the 9 values returned by extract_info, in order.
extracted_columns = [
    "traffic_source", "is_email_traffic", "utm_category",
    "product_id", "category_lv1", "category_lv2", "category_lv3", "utm_segment", "utm_gender"
]

fs = gcsfs.GCSFileSystem()

chunk_number = 0
df_list = []

# `with` closes the GCS file handle once every batch has been read.
with fs.open(INPUT_FILE, "rb") as parquet_source:
    parquet_file = pq.ParquetFile(parquet_source)
    for batch in parquet_file.iter_batches(batch_size=chunksize, columns=columns_to_load):
        chunk_number += 1
        start_time = time.time()

        chunk = batch.to_pandas()
        # The nullable "string" dtype keeps missing paths as <NA>, which
        # extract_info maps to all-None. Plain `astype(str)` would turn them
        # into the literal text "None"/"nan".
        chunk["page_path"] = chunk["page_path"].astype("string")
        print(f"\n✅ Processing Chunk {chunk_number} - Rows: {chunk.shape[0]}")

        # One DataFrame constructor call instead of `.map(...).apply(pd.Series)`.
        # It is much faster and yields named columns even for an empty batch.
        extracted_values = pd.DataFrame(
            [extract_info(path) for path in chunk["page_path"]],
            index=chunk.index,
            columns=extracted_columns,
        )

        # Merge extracted values with the original columns
        chunk = pd.concat([chunk[columns_to_load], extracted_values], axis=1)
        chunk = chunk.drop_duplicates()
        df_list.append(chunk)

        # Free memory before the next batch
        del chunk, extracted_values, batch
        gc.collect()

        print(f"✅ Chunk {chunk_number} Processed in {time.time() - start_time:.2f} seconds")

if not df_list:
    raise ValueError(f"No rows were read from {INPUT_FILE}; nothing to save.")

# ✅ Combine & save. Rows duplicated across batches survive the per-batch
# drop, so de-duplicate again on the combined frame.
df_final = pd.concat(df_list, ignore_index=True).drop_duplicates()
df_list.clear()  # release the per-batch frames; df_final holds the data now
gc.collect()

df_final.to_parquet(FINAL_TEMP_FILE, index=False)

# ✅ Upload to GCS
print("\n**Uploading Final Deduplicated File to GCS...**")
upload_result = subprocess.run(["gsutil", "cp", FINAL_TEMP_FILE, GCS_OUTPUT_FILE], capture_output=True, text=True)

if upload_result.returncode == 0:
    print("✅ **Final Data Successfully Uploaded to GCS!**")
    os.remove(FINAL_TEMP_FILE)
    print("\n **Processing Completed Successfully!** Data saved at:", GCS_OUTPUT_FILE)
else:
    print("❌ **Upload Failed! Debugging Output Below:**")
    print(upload_result.stderr)
    # Keep the local file so it can be inspected or uploaded by hand.
    print("\n Local copy kept at:", FINAL_TEMP_FILE)
