<a href="https://colab.research.google.com/github/jmccrosky/regrets-reporter/blob/master/analysis/RegretsReporter_API_Pipeline.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install needed libraries
#
# Installs (or upgrades, via -U) the Google API client, the google-auth helper
# packages, the BigQuery and BigQuery Storage clients, and pyarrow.
# NOTE(review): versions are not pinned, so a fresh run may pick up newer
# releases than the ones this notebook was developed against.

!pip install -U google-api-python-client google-auth-oauthlib google-auth-httplib2 google-cloud-bigquery google-cloud-bigquery-storage pyarrow

In [None]:
# Import needed libraries

import os

from apiclient.discovery import build
from google.cloud import bigquery
from google.cloud import bigquery_storage
import pandas as pd
import google.auth
from google.colab import auth

In [None]:
# YouTube API Key

# Prefer the YOUTUBE_API_KEY environment variable so the real key never has to
# be saved in (or committed with) the notebook. When the variable is unset or
# empty, fall back to the placeholder literal that was here before.
api_key = os.environ.get("YOUTUBE_API_KEY") or 'REDACTED'

In [None]:
# Establish connection to YouTube API

YOUTUBE_API_SERVICE_NAME = "youtube"
YOUTUBE_API_VERSION = "v3"

# Service object for the YouTube API, authorised with the developer key.
youtube = build(
    serviceName=YOUTUBE_API_SERVICE_NAME,
    version=YOUTUBE_API_VERSION,
    developerKey=api_key,
)

In [None]:
# Schema for stored YouTube API data

# One (name, type, mode, description) entry per column, in table order.
_SCHEMA_COLUMNS = (
  ("video_id", "STRING", "REQUIRED", "YouTube Video ID"),
  ("language", "STRING", "NULLABLE", "defaultAudioLanguage specified by uploader"),
  ("tags", "STRING", "REPEATED", "tags as specified by uploader"),
  ("comment_count", "INT64", "NULLABLE", "comment count"),
  ("like_count", "INT64", "NULLABLE", "like count"),
  ("dislike_count", "INT64", "NULLABLE", "dislike count"),
  ("takedown", "BOOLEAN", "REQUIRED", "video has been taken down"),
  ("blocked", "STRING", "REPEATED", "countries video is blocked in"),
)

# Expand the column specs into the BigQuery schema objects used by the table
# definition and by the load jobs.
SCHEMA = [
  bigquery.SchemaField(
    column_name, column_type, mode=column_mode,
    description=column_description)
  for column_name, column_type, column_mode, column_description in _SCHEMA_COLUMNS
]

In [None]:
# Set up access to Mozilla BigQuery

# Authenticate the Colab user, then request default credentials carrying the
# cloud-platform scope.
CLOUD_PLATFORM_SCOPE = "https://www.googleapis.com/auth/cloud-platform"

auth.authenticate_user()
credentials, your_project_id = google.auth.default(scopes=[CLOUD_PLATFORM_SCOPE])
print('Authenticated')

# Both clients share the same credentials; queries run under `project_id`.
project_id = "moz-fx-data-bq-regrets-report"
bq_client = bigquery.Client(credentials=credentials, project=project_id)
bq_storage_client = bigquery_storage.BigQueryReadClient(credentials=credentials)

Authenticated


In [None]:
# Define and create table for storing API data

table_ref = "moz-fx-data-shared-prod.regrets_reporter_analysis.yt_api_data_v4"
table = bigquery.Table(table_ref, schema=SCHEMA)
# exists_ok=True keeps this cell re-runnable: without it, create_table raises a
# Conflict error as soon as the table exists, which breaks every run after the
# first even though the rest of the notebook is designed to resume.
table = bq_client.create_table(table, exists_ok=True)

In [None]:
# Specify end date for RegretsReport data to process
#
# Date string in YYYY-MM-DD form. It is interpolated into the report query,
# which keeps rows whose submission date is on or before it (inclusive).

end_date = "2021-05-31"

In [None]:
# Get list of reported and trail videos from RegretsReporter data
#
# Structure of the query below:
#   * deletion_requests_t: the client timestamp and installation UUID of every
#     row that carries a data deletion request.
#   * cleaned_t: anti-join of the update table against those requests. A row is
#     dropped when its installation has a deletion request with a client
#     timestamp at or after the row's own; the remaining rows are limited to
#     submissions dated on or before `end_date`.
#   * final SELECT: for reports whose reported video id is not null, collects
#     that id together with the non-null video ids of the parent navigations,
#     flattens them, and de-duplicates with GROUP BY.
# `end_date` is substituted into the SQL text by the f-string.

report_query = f'''
WITH deletion_requests_t AS (
  SELECT
    event_metadata.client_timestamp AS timestamp,
    data_deletion_request.extension_installation_uuid AS installation_id,
  FROM
    `moz-fx-data-shared-prod.regrets_reporter.regrets_reporter_update`
  WHERE
    date(submission_timestamp) >= "2020-6-1"  -- Filter on submission_timestamp is required by policy.
    AND data_deletion_request IS NOT NULL
),
cleaned_t AS (
  SELECT *
  FROM
    `moz-fx-data-shared-prod.regrets_reporter.regrets_reporter_update` main_t
  LEFT JOIN
    deletion_requests_t
  ON
    main_t.event_metadata.extension_installation_uuid = deletion_requests_t.installation_id
    AND main_t.event_metadata.client_timestamp <= deletion_requests_t.timestamp
  WHERE
    deletion_requests_t.installation_id IS NULL
    AND date(submission_timestamp) <= "{end_date}"
)
SELECT
  id
FROM UNNEST((
  SELECT
    video_ids
  FROM (
    SELECT
      ARRAY_CONCAT_AGG(
        ARRAY_CONCAT(
         [regret_report.report_data.youtube_navigation_metadata.video_metadata.video_id],
         ARRAY((SELECT video_metadata.video_id FROM UNNEST(regret_report.report_data.parent_youtube_navigations_metadata) WHERE video_metadata.video_id IS NOT NULL))
        )
      ) AS video_ids,
    FROM
      cleaned_t
    WHERE
      regret_report.report_data.youtube_navigation_metadata.video_metadata.video_id IS NOT NULL
  )
)) AS id
GROUP BY id
'''

# Run the query, wait for it to finish, and download the result (a single `id`
# column of distinct video ids) as a DataFrame through the storage client.
report_list = bq_client.query(report_query).result().to_dataframe(bqstorage_client=bq_storage_client)

In [None]:
# Get list of already-acquired API data

acquired_query = '''
SELECT
  video_id
FROM
  `moz-fx-data-shared-prod.regrets_reporter_analysis.yt_api_data_v4`
'''

# Submit the query and block until it completes, then pull the `video_id`
# column down as a DataFrame through the storage client.
acquired_rows = bq_client.query(acquired_query).result()
acquired_list = acquired_rows.to_dataframe(bqstorage_client=bq_storage_client)

In [None]:
# Run pipeline to fetch API data, storing to BigQuery after every 10 videos

table_ref = "moz-fx-data-shared-prod.regrets_reporter_analysis.yt_api_data_v4"
table = bigquery.Table(table_ref, schema=SCHEMA)

# Number of rows buffered in memory before they are appended to BigQuery.
BATCH_SIZE = 10


def _video_row(video_id, response):
  """Build one row for the API data table from a `videos().list` response.

  Args:
    video_id: The video ID that was requested.
    response: The response returned by `request.execute()`. Its "items" list
      is empty when the API returned nothing for the ID.

  Returns:
    A dict keyed by the SCHEMA column names. When "items" is empty the row has
    `takedown=True` and every other data column at its empty default. Counts
    are passed through exactly as the API returned them.

  Raises:
    KeyError: If the response has no "items" key at all; such a response is
      not silently recorded as a takedown.
  """
  row = {
    "video_id": video_id,
    "language": None,
    "tags": [],
    "comment_count": None,
    "like_count": None,
    "dislike_count": None,
    "takedown": False,
    "blocked": [],
  }
  items = response["items"]
  if len(items) == 0:
    row["takedown"] = True
    return row

  item = items[0]
  snippet = item.get("snippet", {})
  content_details = item.get("contentDetails", {})
  statistics = item.get("statistics", {})

  row["tags"] = snippet.get("tags", [])
  row["blocked"] = content_details.get("regionRestriction", {}).get("blocked", [])
  # defaultLanguage wins over defaultAudioLanguage when both are present. This
  # is the precedence the existing rows were written with, so it is preserved.
  # NOTE(review): the SCHEMA description says this column holds
  # defaultAudioLanguage, which suggests the opposite precedence was intended.
  # Confirm before changing.
  row["language"] = snippet.get("defaultLanguage", snippet.get("defaultAudioLanguage"))
  row["dislike_count"] = statistics.get("dislikeCount")
  row["like_count"] = statistics.get("likeCount")
  row["comment_count"] = statistics.get("commentCount")
  return row


def _load_rows(rows):
  """Append `rows` to the API data table and wait for the load job to finish."""
  job_config = bigquery.LoadJobConfig(
    write_disposition="WRITE_APPEND",
    schema=SCHEMA,
  )
  load_job = bq_client.load_table_from_json(
    rows,
    table,
    job_config=job_config,
  )
  load_job.result()


# Build the set of stored ids once, so each membership test is O(1) instead of
# re-materialising the whole column as a list for every reported video.
if len(acquired_list) > 0:
  acquired_ids = set(acquired_list.video_id)
else:
  acquired_ids = set()
needed = [v for v in report_list.id if v not in acquired_ids]

pending_rows = []
for v in needed:
  request = youtube.videos().list(
    part="snippet,statistics,contentDetails",
    id=v
  )
  response = request.execute()

  row = _video_row(v, response)
  pending_rows.append(row)
  print("loading row with {} tags".format(len(row["tags"])))
  if len(pending_rows) >= BATCH_SIZE:
    _load_rows(pending_rows)
    pending_rows = []

# Flush the final partial batch. Without this the last (len(needed) % BATCH_SIZE)
# videos are fetched but never stored, and a re-run can never store them either,
# because the remainder never reaches BATCH_SIZE.
if pending_rows:
  _load_rows(pending_rows)
  pending_rows = []