In [11]:
from datetime import datetime, timedelta
import numpy as np
import pandas as pd
import seaborn as sns
import pandas_gbq
import pydata_google_auth
from concurrent.futures import ThreadPoolExecutor, as_completed
from google.cloud import bigquery
from tqdm.notebook import tqdm
from dateutil import tz
from BigQueryTools import QueryTool
import re

# OAuth scopes requested for the interactive user login below.
SCOPES = [
    'https://www.googleapis.com/auth/cloud-platform',
    'https://www.googleapis.com/auth/drive',
]
# NOTE(review): `credentials` is not passed to the bigquery.Client() calls in the
# Compile Results cells of this notebook, so those clients fall back to the
# library's default credential discovery — confirm this is intended (and whether
# QueryTool uses these credentials at all).
credentials = pydata_google_auth.get_user_credentials(
    SCOPES,
    # Set auth_local_webserver to True for a slightly more convenient
    # authorization flow. Note: this doesn't work if you're running the
    # notebook on a remote server, such as over SSH or in Google Colab.
    auth_local_webserver=True,
)

# Register the BigQuery IPython magics and export GCLOUD_PROJECT, presumably so
# google-cloud clients created without an explicit project pick it up — confirm.
%load_ext google.cloud.bigquery
%env GCLOUD_PROJECT=nbcu-ds-sandbox-a-001

The google.cloud.bigquery extension is already loaded. To reload it, use:
  %reload_ext google.cloud.bigquery
env: GCLOUD_PROJECT=nbcu-ds-sandbox-a-001


# How to use

### Before running
- Set the `start_date` variable to the first day of the analysis window (format `YYYY-MM-DD`).
- Set the `end_date` variable to the last day of the analysis window (format `YYYY-MM-DD`).
### Monthly and Quarterly Staging Tables
- Be aware that each job overwrites its destination table; it does not append new data to the existing table.
- `bq.add()` adds jobs to the queue. If a job with the same name is already running, `bq.add()` skips that job. Use `bq.cancel()` to drop currently running jobs.
- `bq.start()` runs all jobs in the queue.
- `bq.update()` refreshes the status of all jobs in the queue. Finished jobs are listed as DONE.
- `bq.cancel()` drops all running jobs in the queue.
- **Wait until every job is listed as DONE (check with `bq.update()`) before proceeding to Compile Results.**
### Compiling Results
- The "Compile Results" section combines (UNION ALL) the per-period viewing tables into one monthly table and one quarterly table.

In [12]:
# Inclusive analysis window ('YYYY-MM-DD'); drives both the monthly and quarterly job lists.
start_date, end_date = '2022-01-01', '2023-12-31'

In [13]:
def get_monthly_report_dates(start_date, end_date):
    """Split a date range into calendar-month reporting windows.

    One entry is produced for every calendar month overlapping
    [start_date, end_date]. Each window spans the whole month (first to last
    day) even when a range boundary falls mid-month, the same way
    get_quarterly_report_dates treats quarters.

    Args:
        start_date: inclusive range start, 'YYYY-MM-DD' string.
        end_date: inclusive range end, 'YYYY-MM-DD' string.

    Returns:
        dict in chronological order keyed by '%b_%Y' (e.g. 'Jan_2022'; the month
        abbreviation follows the current locale), each value
        {'report_start_date': 'YYYY-MM-DD', 'report_end_date': 'YYYY-MM-DD'}.
        Empty when start_date is after end_date.
    """
    start = datetime.strptime(start_date, "%Y-%m-%d")
    end = datetime.strptime(end_date, "%Y-%m-%d")
    monthly_reports = {}
    if start > end:
        return monthly_reports

    # Advance on first-of-month dates. Advancing to an arbitrary day of the next
    # month (e.g. Feb 28 + 4 days = Mar 4) would skip that month whenever
    # end_date falls before that day (e.g. a range ending Mar 2).
    month_start = start.replace(day=1)
    while month_start <= end:
        # Day 28 exists in every month, and +4 days always lands in the next month.
        next_month_start = (month_start.replace(day=28) + timedelta(days=4)).replace(day=1)
        report_end_date = next_month_start - timedelta(days=1)
        monthly_reports[month_start.strftime("%b_%Y")] = {
            "report_start_date": month_start.strftime("%Y-%m-%d"),
            "report_end_date": report_end_date.strftime("%Y-%m-%d"),
        }
        month_start = next_month_start

    return monthly_reports

def get_quarter(p_date) -> int:
    """Return the calendar quarter (1-4) containing ``p_date`` (anything with a ``.month``)."""
    # Months 1-3 -> 1, 4-6 -> 2, 7-9 -> 3, 10-12 -> 4.
    return (p_date.month + 2) // 3

def get_quarterly_report_dates(start_date_str, end_date_str):
    """Split a date range into calendar-quarter reporting windows.

    One entry is produced for every calendar quarter overlapping
    [start_date_str, end_date_str]; each window spans the whole quarter even
    when a range boundary falls mid-quarter.

    Args:
        start_date_str: inclusive range start, 'YYYY-MM-DD'.
        end_date_str: inclusive range end, 'YYYY-MM-DD'.

    Returns:
        dict in chronological order keyed by 'Q<n>_<YYYY>' (e.g. 'Q1_2022'), each
        value {'report_start_date': 'YYYY-MM-DD', 'report_end_date': 'YYYY-MM-DD'}.
        Empty when start_date_str is after end_date_str.
    """
    cursor = datetime.strptime(start_date_str, '%Y-%m-%d')
    stop = datetime.strptime(end_date_str, '%Y-%m-%d')
    windows = {}

    while cursor <= stop:
        q = get_quarter(cursor)
        first_month = 3 * q - 2  # 1, 4, 7 or 10
        window_start = datetime(cursor.year, first_month, 1)
        # First day of the following quarter; Q4 rolls over into January of next year.
        if q == 4:
            following_start = datetime(cursor.year + 1, 1, 1)
        else:
            following_start = datetime(cursor.year, first_month + 3, 1)
        window_end = following_start - timedelta(days=1)
        windows[f'Q{q}_{cursor.year}'] = {
            'report_start_date': window_start.strftime('%Y-%m-%d'),
            'report_end_date': window_end.strftime('%Y-%m-%d'),
        }
        cursor = following_start

    return windows

def generate_report_variants(period_dict, suffix="report"):
    """Build one job spec per reporting period.

    Args:
        period_dict: mapping of period label (e.g. 'Jan_2022', 'Q1_2022') to the
            params for that period.
        suffix: joined to each label with '_' to form the job name.

    Returns:
        dict keyed by '<period>_<suffix>' in the input's order; each value is
        {'string_format': {'report': <period>}, 'params': <params>} where
        <params> is the same object found in period_dict (not a copy).
    """
    variants = {}
    for period, params in period_dict.items():
        job_name = "_".join((period, suffix))
        variants[job_name] = {'string_format': {'report': period}, 'params': params}
    return variants

In [14]:
# Reporting periods covering [start_date, end_date].
months = get_monthly_report_dates(start_date, end_date)
quarters = get_quarterly_report_dates(start_date, end_date)

# Job specs keyed "<period>_viewing"; these are queued with CRM_Viewing.sql below.
monthly_viewing_tables, quarterly_viewing_tables = [
    generate_report_variants(periods, 'viewing') for periods in (months, quarters)
]
# Job specs keyed "<period>_report" (default suffix); not used by the cells in this section.
month_tables, quarter_tables = [generate_report_variants(periods) for periods in (months, quarters)]

bq = QueryTool()

# Viewing Tables

In [15]:
# Queue one CRM_Viewing.sql job per month and per quarter. Per the notes above,
# bq.add() skips a job whose name is already running, and nothing executes until
# bq.start(). Read with an explicit encoding so the SQL text does not depend on
# the machine's locale, and close the file before queueing.
with open('CRM_Viewing.sql', encoding='utf-8') as file:
    sql_file = file.read()

bq.add(sql_file, monthly_viewing_tables)
bq.add(sql_file, quarterly_viewing_tables)

Added: Jan_2022_viewing	 | 	Awaiting start
Added: Feb_2022_viewing	 | 	Awaiting start
Added: Mar_2022_viewing	 | 	Awaiting start
Added: Apr_2022_viewing	 | 	Awaiting start
Added: May_2022_viewing	 | 	Awaiting start
Added: Jun_2022_viewing	 | 	Awaiting start
Added: Jul_2022_viewing	 | 	Awaiting start
Added: Aug_2022_viewing	 | 	Awaiting start
Added: Sep_2022_viewing	 | 	Awaiting start
Added: Oct_2022_viewing	 | 	Awaiting start
Added: Nov_2022_viewing	 | 	Awaiting start
Added: Dec_2022_viewing	 | 	Awaiting start
Added: Jan_2023_viewing	 | 	Awaiting start
Added: Feb_2023_viewing	 | 	Awaiting start
Added: Mar_2023_viewing	 | 	Awaiting start
Added: Apr_2023_viewing	 | 	Awaiting start
Added: May_2023_viewing	 | 	Awaiting start
Added: Jun_2023_viewing	 | 	Awaiting start
Added: Jul_2023_viewing	 | 	Awaiting start
Added: Aug_2023_viewing	 | 	Awaiting start
Added: Sep_2023_viewing	 | 	Awaiting start
Added: Oct_2023_viewing	 | 	Awaiting start
Added: Nov_2023_viewing	 | 	Awaiting start
Added: Dec_

In [16]:
# Start every queued job; prints one "start <job id>" line per job launched.
bq.start()

start 3acdfaa7-d611-4e05-94a9-87be2a8eb018
start 5f324d3a-6c42-40ca-9b3f-2faf0df33f3f
start 3a380521-efa6-42f8-98c3-5838a3c8686a
start a2c1c308-18f8-4e07-909b-d67fa9d4a9f7
start 104c3318-81ae-498f-a664-f53ba5b87222
start edef0a36-6fe5-4ba8-99d6-0401c949f151
start 15b67a7a-282d-4605-9985-dc3425d3af72
start f2426b92-7c08-4eb2-b726-c8d72f41cc92
start 355ef375-7a69-4891-899a-f0803e710a05
start 69e47648-a0cd-4413-aa11-069f875dc481
start 024be6cb-a374-424e-aca8-8575269de4bc
start 8b9ab80b-2160-4e78-bb72-d8fab70dba7e
start 53dcd572-e47a-4b58-9b28-7eb2829d17ed
start 92216f0b-4e2d-4f1f-89a1-c5b75f4272ff
start 4463837c-eddd-4cb9-916b-133f07bd04b2
start 9e881393-d419-4347-a781-c1ca9fbf8d33
start 734d22ec-2986-48ae-921a-8da1db0434e8
start b5c8e694-f242-47c5-92a4-36331910872a
start 4d7bb54a-bd60-4a2d-9f36-1151bfdd2892
start 279f6eea-5c28-4937-b204-48ab1112e6bc
start ec659718-54bc-409a-b3f4-9cf4e43a7ee7
start d25b0c46-678e-4ddc-b37e-8551bc4401ba
start 52cb60c2-0a28-4797-af7a-bd06198a2835
start 8fb7b

In [17]:
# Refresh and list job statuses (RUNNING / DONE). Re-run until every job shows
# DONE before running the Compile Results cells.
bq.update()

Active jobs: 
Q4_2022_viewing	 | 	09241799-38a7-40cc-9920-5491ba6eddba	 | 	Created: Jan 09, 2024 09:20 AM	 | 	Last Checked: Jan 09, 2024 10:19 AM	 | 	RUNNING
Q1_2023_viewing	 | 	52cb60c2-0a28-4797-af7a-bd06198a2835	 | 	Created: Jan 09, 2024 09:20 AM	 | 	Last Checked: Jan 09, 2024 10:19 AM	 | 	RUNNING
Q2_2023_viewing	 | 	15b67a7a-282d-4605-9985-dc3425d3af72	 | 	Created: Jan 09, 2024 09:20 AM	 | 	Last Checked: Jan 09, 2024 10:19 AM	 | 	RUNNING
Q3_2023_viewing	 | 	8fb7b7d5-c302-4fda-959d-08ef152846dd	 | 	Created: Jan 09, 2024 09:20 AM	 | 	Last Checked: Jan 09, 2024 10:19 AM	 | 	RUNNING
Q4_2023_viewing	 | 	2423994b-147e-407c-956a-c3dce27e6960	 | 	Created: Jan 09, 2024 09:20 AM	 | 	Last Checked: Jan 09, 2024 10:19 AM	 | 	RUNNING

Finished jobs: 
Jan_2022_viewing	 | 	9e881393-d419-4347-a781-c1ca9fbf8d33	 | 	Created: Jan 09, 2024 09:20 AM	 | 	Last Checked: Jan 09, 2024 10:19 AM	 | 	DONE
Feb_2022_viewing	 | 	5f324d3a-6c42-40ca-9b3f-2faf0df33f3f	 | 	Created: Jan 09, 2024 09:20 AM	 | 	Last Check

# Compile Results

In [18]:
# Combine every per-month viewing table (one per key of `months`) into a single
# monthly table with UNION ALL; no grouping or aggregation happens here.
# CREATE OR REPLACE rebuilds the target from scratch on every run.
# Assumes every Video_Viewing_<month> table (presumably written by the
# CRM_Viewing.sql jobs above) has the same columns in the same order, since
# UNION ALL pairs columns by position.
if not months:
    # An empty list would otherwise produce "SELECT * FROM ( )", a BigQuery syntax error.
    raise ValueError("No monthly periods to compile; check start_date/end_date.")

monthly_sources = "\n        UNION ALL\n        ".join(
    f"(SELECT * FROM `nbcu-ds-sandbox-a-001.SLi_sandbox.Video_Viewing_{report}`)"
    for report in months
)
query = f"""
        CREATE OR REPLACE TABLE `nbcu-ds-sandbox-a-001.SLi_sandbox.SILVER_VIDEO_VIEWING_MONTHLY` AS

        SELECT *
        FROM (
        {monthly_sources}
        )
        """

with bigquery.Client() as client:
    client.query(query).result()

In [19]:
# Combine every per-quarter viewing table (one per key of `quarters`) into a
# single quarterly table with UNION ALL; no grouping or aggregation happens here.
# CREATE OR REPLACE rebuilds the target from scratch on every run.
# Assumes every Video_Viewing_<quarter> table (presumably written by the
# CRM_Viewing.sql jobs above) has the same columns in the same order, since
# UNION ALL pairs columns by position.
if not quarters:
    # An empty list would otherwise produce "SELECT * FROM ( )", a BigQuery syntax error.
    raise ValueError("No quarterly periods to compile; check start_date/end_date.")

quarterly_sources = "\n        UNION ALL\n        ".join(
    f"(SELECT * FROM `nbcu-ds-sandbox-a-001.SLi_sandbox.Video_Viewing_{report}`)"
    for report in quarters
)
query = f"""
        CREATE OR REPLACE TABLE `nbcu-ds-sandbox-a-001.SLi_sandbox.SILVER_VIDEO_VIEWING_QUARTERLY` AS

        SELECT *
        FROM (
        {quarterly_sources}
        )
        """

with bigquery.Client() as client:
    client.query(query).result()