# Google Analytics Extractions
- Extracts page info/overall info for CAO website.
- To be run daily, with the most recent outputs replacing older ones. 
- Output files follow the "(Type)_Data_(Period)_YYYY-MM-DD.gzip" naming format.
    - For example, "Page_Data_Daily_2025-11-11.gzip" is a daily page extraction.

In [None]:
# Standard modules 
import os 
import pandas as pd 
import sys 
import requests
from datetime import datetime, date, timedelta
from dotenv import load_dotenv
import numpy as np 
from functools import reduce


# Analytics modules
from google.analytics.data_v1beta import BetaAnalyticsDataClient, Filter, FilterExpression, DateRange, Metric, Dimension, RunReportRequest
from google.auth.transport.requests import Request 
import math

# Progress bar
from tqdm import tqdm
import time 
import calendar

## Fiscal Year/Date Functions

In [None]:
def compare_to_end(current_date, end_date) -> bool:
    """Return True when ``current_date`` falls strictly before ``end_date``.

    Used to pick the right end date for a report: the current date while the
    period is still running, otherwise the period's own last day.

    Args:
        current_date: The date the extraction is run for.
        end_date: The last date of the period being reported on.

    Returns:
        True if ``current_date < end_date``, otherwise False.

    Raises:
        TypeError: If either argument is not a ``datetime.date`` instance.
    """
    # Logical ``and`` states the intent and short-circuits; bitwise ``&`` only
    # happened to work because both operands were plain bools.
    if not (isinstance(current_date, date) and isinstance(end_date, date)):
        raise TypeError("Arguments must be of type date.")
    return current_date < end_date

def determine_fiscal_year(current_date) -> tuple:
    """Return the fiscal-year reporting range that contains ``current_date``.

    The fiscal year runs from 1 April to 31 March, so January-March belong to
    the fiscal year that started in the previous calendar year. The end of the
    range never goes past ``current_date``.

    Args:
        current_date: A ``datetime.date``. A ``datetime.datetime`` is also
            accepted; only its calendar date is used.

    Returns:
        A ``(fiscal_start, fiscal_end)`` tuple of "YYYY-MM-DD" strings.

    Raises:
        TypeError: If ``current_date`` is not a ``datetime.date`` instance.
    """
    if not isinstance(current_date, date):
        raise TypeError("Argument must be of type date.")

    # datetime is a subclass of date, so it passes the check above, but it
    # cannot be ordered against a plain date; reduce it to its calendar date.
    if isinstance(current_date, datetime):
        current_date = current_date.date()

    # Months 1, 2, 3 are part of the fiscal year that began last April.
    start_year = current_date.year - 1 if current_date.month < 4 else current_date.year

    fiscal_start = date(start_year, 4, 1)
    # Stop at today while the fiscal year is still in progress.
    fiscal_end = min(current_date, date(start_year + 1, 3, 31))

    return (fiscal_start.strftime("%Y-%m-%d"), fiscal_end.strftime("%Y-%m-%d"))

def determine_fiscal_quarter(current_date) -> tuple:
    """Return the fiscal-quarter reporting range that contains ``current_date``.

    Quarters are Q1 = April-June, Q2 = July-September, Q3 = October-December
    and Q4 = January-March. Each one lies inside a single calendar year, so
    the range always uses ``current_date.year``. The end of the range never
    goes past ``current_date``.

    Args:
        current_date: A ``datetime.date``. A ``datetime.datetime`` is also
            accepted; only its calendar date is used.

    Returns:
        A ``(quarter_start, quarter_end)`` tuple of "YYYY-MM-DD" strings.

    Raises:
        TypeError: If ``current_date`` is not a ``datetime.date`` instance.
    """
    if not isinstance(current_date, date):
        raise TypeError("Arguments must be of type date.")

    # datetime is a subclass of date, so it passes the check above, but it
    # cannot be ordered against a plain date; reduce it to its calendar date.
    if isinstance(current_date, datetime):
        current_date = current_date.date()

    # Last day of the closing month of each quarter (March, June, Sept, Dec).
    closing_day = {3: 31, 6: 30, 9: 30, 12: 31}

    # Months 1-3 -> 1, 4-6 -> 4, 7-9 -> 7, 10-12 -> 10.
    first_month = 3 * ((current_date.month - 1) // 3) + 1
    last_month = first_month + 2

    quarter_start = date(current_date.year, first_month, 1)
    # Stop at today while the quarter is still in progress.
    quarter_end = min(
        current_date,
        date(current_date.year, last_month, closing_day[last_month]),
    )

    return (quarter_start.strftime("%Y-%m-%d"), quarter_end.strftime("%Y-%m-%d"))

# Pages individually
def fetch_metrics_per_page(property_id, start_date, run_date):
    """Fetch per-page metrics from the Google Analytics Data API.

    Runs one report broken down by ``unifiedPagePathScreen`` and keeps
    requesting further pages of rows until the row count reported by the
    first response has been covered.

    Args:
        property_id: Analytics property id, sent as
            ``properties/{property_id}``.
        start_date: Start of the date range (passed to
            ``DateRange.start_date``); also written to the ``date`` column.
        run_date: End of the date range (passed to ``DateRange.end_date``).

    Returns:
        A ``pandas.DataFrame`` with one row per report row. Columns are
        ``unifiedPagePathScreen``, one column per requested metric, and
        ``date`` (set to ``start_date`` on every row). Dimension and metric
        values are stored exactly as found on the response, with no numeric
        conversion (unlike ``fetch_metrics_aggregate``).
    """
    client = BetaAnalyticsDataClient()
    date_range = DateRange(start_date=start_date, end_date=run_date)
    limit = 250000  # rows requested per call

    metrics = [Metric(name='screenPageViews'),
               Metric(name="newUsers"),
               Metric(name="totalUsers"),
               Metric(name="screenPageViewsPerUser"),
               Metric(name="screenPageViewsPerSession"),
               Metric(name="userEngagementDuration")]
    dimensions = [Dimension(name="unifiedPagePathScreen")]

    def run_page(offset):
        """Request one page of the report, starting at row ``offset``."""
        return client.run_report(RunReportRequest(
            property=f"properties/{property_id}",
            date_ranges=[date_range],
            metrics=metrics,
            dimensions=dimensions,
            limit=limit,
            offset=offset))

    def to_records(response):
        """Turn each response row into a ``{column name: value}`` dict."""
        records = []
        for row in response.rows:
            record = {dim.name: cell.value
                      for dim, cell in zip(dimensions, row.dimension_values)}
            record.update({met.name: cell.value
                           for met, cell in zip(metrics, row.metric_values)})
            records.append(record)
        return records

    first_response = run_page(0)
    data = to_records(first_response)
    total_rows = first_response.row_count

    # Only page further when the first call could not return everything.
    if total_rows > limit:
        # strip the tqdm function if you'd like --> for offset in range(limit, total_rows, limit):
        for offset in tqdm(range(limit, total_rows, limit), desc="Downloading..."):
            data.extend(to_records(run_page(offset)))

    # A single list of records gives one clean 0..n-1 index for all pages.
    results_df = pd.DataFrame(data)
    results_df["date"] = start_date
    return results_df
            

# fetch aggregated data
def fetch_metrics_aggregate(property_id, start_date, run_date):
    """Fetch site-wide (no dimension) metrics from the Google Analytics Data API.

    Runs one report without any dimensions and keeps requesting further pages
    of rows if the row count reported by the first response exceeds the
    per-call limit.

    Args:
        property_id: Analytics property id, sent as
            ``properties/{property_id}``.
        start_date: Start of the date range (passed to
            ``DateRange.start_date``); also written to the ``date`` column.
        run_date: End of the date range (passed to ``DateRange.end_date``).

    Returns:
        A ``pandas.DataFrame`` with one row per report row. Each metric is a
        float column named after the response's metric header, plus a
        ``date`` column set to ``start_date``. If the report has no rows the
        frame is empty and has no columns.
    """
    client = BetaAnalyticsDataClient()
    date_range = DateRange(start_date=start_date, end_date=run_date)
    limit = 250000  # rows requested per call

    metrics = [
        Metric(name='screenPageViews'),
        Metric(name='newUsers'),
        Metric(name="totalUsers"),
        Metric(name='screenPageViewsPerUser'),
        Metric(name='screenPageViewsPerSession'),
        Metric(name='userEngagementDuration')
    ]

    def run_page(offset):
        """Request one page of the report, starting at row ``offset``."""
        return client.run_report(RunReportRequest(
            property=f"properties/{property_id}",
            date_ranges=[date_range],
            metrics=metrics,
            limit=limit,
            offset=offset
        ))

    def to_records(response):
        """Turn each response row into a dict of float metrics plus the date."""
        records = []
        for row in response.rows:
            record = {header.name: float(cell.value)
                      for header, cell in zip(response.metric_headers, row.metric_values)}
            record["date"] = start_date
            records.append(record)
        return records

    first_response = run_page(0)
    data = to_records(first_response)
    total_rows = first_response.row_count

    # Handle pagination if needed
    if total_rows > limit:
        for offset in tqdm(range(limit, total_rows, limit), desc="Downloading..."):
            data.extend(to_records(run_page(offset)))

    return pd.DataFrame(data)

# Variables

In [None]:
# Reference dates for this run: today and the day before it.
current_date = date.today()
previous_day = current_date - timedelta(days=1)

# (start, end) ranges for the fiscal year and fiscal quarter containing today.
fiscal_year = determine_fiscal_year(current_date)
fiscal_quarter = determine_fiscal_quarter(current_date)

# NOTE(review): both values below look like placeholders - confirm they are
# replaced with the real property id and credentials file path before running.
property_id = "PROPERTY_ID"
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = "APPLICATION_CREDENTIALS"

# Daily Data

In [None]:
# Both daily reports cover a single day (yesterday), so start and end match.
report_day = previous_day.strftime("%Y-%m-%d")

# NOTE(review): pandas' to_parquet compresses with snappy unless told
# otherwise; the ".gzip" suffix is only a file name. Confirm whether
# compression="gzip" was intended.

# individual pages
daily_all_pages = fetch_metrics_per_page(property_id, report_day, report_day)
daily_all_pages.to_parquet(
    f"ga_reports/page_data/daily/Page_Data_Daily_{previous_day}.gzip",
    index=False,
)

# aggregated pages
daily_overall = fetch_metrics_aggregate(property_id, report_day, report_day)
daily_overall.to_parquet(
    f"ga_reports/aggregate_data/daily/Overall_Data_Daily_{previous_day}.gzip",
    index=False,
)

# Monthly Data


In [None]:
# date prep
month_last_day = calendar.monthrange(current_date.year, current_date.month)[1]

# Format both bounds with strftime so month and day are zero-padded
# ("2025-03-01", not "2025-3-01"), matching the YYYY-MM-DD strings used for
# every other period and in the output file names.
month_start = current_date.replace(day=1).strftime("%Y-%m-%d")

# Stop at today while the month is still running, otherwise at its last day.
month_end = min(current_date, current_date.replace(day=month_last_day)).strftime("%Y-%m-%d")

# individual pages
monthly_all_pages = fetch_metrics_per_page(property_id, month_start, month_end)

monthly_all_pages.to_parquet(f"ga_reports/page_data/monthly/Page_Data_Monthly_{month_start}.gzip", index=False) # export to gzip

# Common issues
# Every sheet comes from the same reference workbook, so the path is defined
# once and reused for each read.
# NOTE(review): some reads previously used a bare "MostCommonIssues.xlsx"
# relative to the working directory - confirm this is the intended copy.
issues_workbook = "ga_reports/reference_files/MostCommonIssues.xlsx"

commonissues = pd.read_excel(issues_workbook)
other_issues = pd.read_excel(issues_workbook, sheet_name="OtherIssues") # issues that are not considered most common

# The 3A / 3B link sheets are shared by both issue groups, so read them once.
a3_links = pd.read_excel(issues_workbook, sheet_name="3A").dropna()
b3_links = pd.read_excel(issues_workbook, sheet_name="3B").dropna()

a3_links["type"] = "3A"
b3_links["type"] = "3B"

a3_links.columns = ["unifiedPagePathScreen", "type"]
b3_links.columns = ["unifiedPagePathScreen", "type"]


def _label_issue_pages(pages, issue_links, issue_type):
    """Select the pages listed in ``issue_links`` and tag them.

    Adds a ``type`` column ("3A", "3B", "3A3B" or "" depending on which link
    sheets contain the page) and an ``issueType`` column set to ``issue_type``.
    """
    selected = pages[pages.unifiedPagePathScreen.isin(issue_links)].sort_values("unifiedPagePathScreen")

    # Left-merge the 3A then the 3B tags onto the selected pages; the second
    # merge is what produces the "type_x" / "type_y" pair.
    merged = reduce(
        lambda left, right: pd.merge(left, right, on=["unifiedPagePathScreen"], how="left"),
        [selected, a3_links, b3_links],
    )

    # Pages missing from a sheet have no tag after the merge; treat as empty.
    merged["type"] = merged["type_x"].fillna("") + merged["type_y"].fillna("")
    merged = merged.drop(["type_x", "type_y"], axis=1) # getting rid of unnecessary columns
    merged["issueType"] = issue_type
    return merged


most_common_issues = _label_issue_pages(monthly_all_pages, commonissues.link, "Common")
other_common_issues = _label_issue_pages(monthly_all_pages, other_issues.link, "Uncommon")
all_common_issues = pd.concat([most_common_issues, other_common_issues]) #combination of all

all_common_issues.to_parquet(f"ga_reports/guided_steps_data/CommonIssues_Data_Monthly_{month_start}.gzip", index=False) # export

# Overall
monthly_overall = fetch_metrics_aggregate(property_id, month_start, month_end)
monthly_overall.to_parquet(f"ga_reports/aggregate_data/monthly/Overall_Data_Monthly_{month_start}.gzip", index=False)

# Weekly Data


In [None]:
# weekday() is 0 for Monday, so stepping back weekday() + 1 days lands on a
# Sunday; the reporting week then runs Sunday through Saturday.
# NOTE(review): when run on a Sunday this selects the week that ended
# yesterday rather than the week starting today - confirm that is intended.
week_first_day = current_date - timedelta(days=current_date.weekday() + 1)
week_last_day = week_first_day + timedelta(days=6)

# Stop at today while the week is still in progress.
week_report_end = current_date if compare_to_end(current_date, week_last_day) else week_last_day

start_of_week = week_first_day.strftime("%Y-%m-%d")
end_of_week = week_report_end.strftime("%Y-%m-%d")

# individual
weekly_pages = fetch_metrics_per_page(property_id, start_of_week, end_of_week)
weekly_pages.to_parquet(
    f"ga_reports/page_data/weekly/Page_Data_Weekly_{start_of_week}.gzip",
    index=False,
)

# Aggregate
weekly_overall = fetch_metrics_aggregate(property_id, start_of_week, end_of_week)
weekly_overall.to_parquet(
    f"ga_reports/aggregate_data/weekly/Overall_Data_Weekly_{start_of_week}.gzip",
    index=False,
)

# Quarterly Data

In [None]:
# fiscal_quarter holds the (start, end) date strings of the current quarter.
quarter_from, quarter_to = fiscal_quarter[0], fiscal_quarter[1]

# individual
quarterly_pages = fetch_metrics_per_page(property_id, quarter_from, quarter_to)
quarterly_pages.to_parquet(
    f"ga_reports/page_data/quarterly/Pages_Data_Quarterly_{fiscal_quarter[0]}.gzip",
    index=False,
)

# aggregate
quarterly_overall = fetch_metrics_aggregate(property_id, quarter_from, quarter_to)
quarterly_overall.to_parquet(
    f"ga_reports/aggregate_data/quarterly/Overall_Data_Quarterly_{fiscal_quarter[0]}.gzip",
    index=False,
)

# Annual Data

In [None]:
# fiscal_year holds the (start, end) date strings of the current fiscal year.
year_from, year_to = fiscal_year[0], fiscal_year[1]

# Individual pages
annual_pages = fetch_metrics_per_page(property_id, year_from, year_to)
annual_pages.to_parquet(
    f"ga_reports/page_data/annually/Pages_Data_Annual_{fiscal_year[0]}.gzip",
    index=False,
)

# overall page
annual_overall = fetch_metrics_aggregate(property_id, year_from, year_to)
annual_overall.to_parquet(
    f"ga_reports/aggregate_data/annually/Overall_Data_Annual_{fiscal_year[0]}.gzip",
    index=False,
)