In [1]:
# !pip install --upgrade google-cloud-bigquery
# !pip install pyarrow
# !pip install pandas-gbq

In [1]:
# import google
from google.cloud import bigquery
from google.oauth2 import service_account
from airflow.models import Variable

import pandas as pd
from bs4 import BeautifulSoup
import requests
import re
import numpy as np

In [2]:
credentials = service_account.Credentials.from_service_account_file(
    filename='dags/data/credentials.json',
    scopes = ["https://www.googleapis.com/auth/cloud-platform"]
)

In [3]:
# FUNCTIONS
def find_text(element):
  # Looking for text nested
  text = ''
  for tag in element.children:
    text += clean_string(str(tag.string))
  return text

def clean_string(text):
  # Removes spaces and replacing non-breaking spaces (\xa0)
  return re.sub(r'(?<!\w)\s(?!\w)|\n', '', text.replace(u'\xa0', ' '))

def scrape_rotten(url):
  res = requests.get(url)
  
  if res.ok:
    soup = BeautifulSoup(res.text, 'html.parser')

    score_board = soup.find('score-board')
    tomatometer = score_board['tomatometerscore']
    audience = score_board['audiencescore']
    tomatometer_reviews, audience_reviews = [int(re.sub(r'[^0-9]', '', tag.string) or 0) for tag in score_board.find_all('a')]
    
    movie_info_data = soup.find('section', {'id': 'movie-info'}).find_all('li')
    movie_info = {clean_string(str(tag.find('b').string[:-1])): find_text(tag.find('span')) for tag in movie_info_data}
    movie_info.update({
      'tomattometer_score': tomatometer, 
      'audience_score': audience, 
      'tomattometer_reviews': tomatometer_reviews, 
      'audience_reviews': audience_reviews
    })

    return movie_info
  
  return {}

def rottenize(title):
        reaplace_dict = {':': '', "'": '', ".":"", ",":"", "-":"_", " ":"_"}
        return title.translate(str.maketrans(reaplace_dict)).lower()

def get_rotten_data(movie_titles, ids, keys):
  data = []
  for tmdb_id, title in zip(ids, movie_titles):
    url = f'https://www.rottentomatoes.com/m/{rottenize(title)}'
    try:
      rotten_data = scrape_rotten(url)
    except:
      continue
    if rotten_data:
      final_dict = dict.fromkeys(keys)
      final_dict['tmdb_id'] = tmdb_id
      final_dict['original_title'] = title
      final_dict.update(rotten_data)
      # Tratando o Box Office
      if 'Box Office (Gross USA)' in final_dict.keys():
        K = 1000
        M = 1000000
        original_box_office = final_dict['Box Office (Gross USA)']
        if original_box_office:
          box_office = float(re.sub(r'[^0-9\.]', '', str(original_box_office)))
          final_dict['Box Office (Gross USA)'] = box_office * K if original_box_office[-1] == 'K' else box_office * M
        else:
          final_dict['Box Office (Gross USA)'] = np.nan

      data.append(final_dict)
  return data

def get_rotten_dataframe(movies_dataframe):
  """recebe um dataframe com os filmes"""
  keys = ['tmdb_id', 'original_title', 'Box Office (Gross USA)', 
          'Release Date (Theaters)', 'Release Date (Streaming)', 'tomattometer_score', 
          'audience_score', 'tomatometer_reviews', 'audience_reviews', 
          'Distributor', 'Aspect Ratio', 'Runtime', 
          'Writer', 'Original Language', 'Rating', 
          'Director', 'Genre', 'Producer', 
          'Sound Mix', 'Aspect Ratio']
  data = get_rotten_data(movies_dataframe['original_title'], movies_dataframe['id'], keys)
  df_out = pd.DataFrame.from_dict(data).set_index('tmdb_id')
  df_out = df_out.reset_index().astype(str)
  # rename the columns to match the table schema
  df_out.columns = [
          'tmdb_id', 'original_title', 'Box Office',
        'Release Date Theaters', 'Release Date Streaming', 'tomattometer_score', 
        'audience_score', 'tomatometer_reviews', 'audience_reviews', 
        'Distributor', 'Aspect Ratio', 'Runtime', 
        'Writer', 'Original Language', 'Rating', 
        'Director', 'Genre', 'Producer',
        'Sound Mix', 'tomattometer_reviews', 'Production Co'
  ]
  
  return df_out

def get_bigquery_tmdb_df(begin_date, end_date):
    """this function gets the tmdb_id and original_title 
    from the discover table in bigquery"""

    # credentials = service_account.Credentials.from_service_account_file(
    #     filename=Variable.get('credentials_path'),
    #     scopes = ["https://www.googleapis.com/auth/cloud-platform"]
    # )

    # Construct a BigQuery client object.
    client = bigquery.Client(credentials= credentials)

    query = """
        SELECT id, original_title FROM `turingdb.data_warehouse.discover` 
        where release_date between '{}' and '{}' limit 100;
    """.format(begin_date, end_date)
    query_job = client.query(query)  # Make an API request.

    print("The query data:")

    tmdb_id = []
    original_titles = []
    for row in query_job:
        # Row values can be accessed by field name or index.
        # store the data in the lists
        tmdb_id.append(row[0])
        original_titles.append(row[1])

    df = pd.DataFrame(list(zip(tmdb_id, original_titles)), columns =['id', 'original_title'])
    return df

# insert the data into the bigquery table using insert into
def insert_into_bigquery(df_rotten):
    """this function inserts the data into the bigquery table"""
    client = bigquery.Client()
    table_id = "turingdb.data_warehouse.rotten_tomatoes"

    job = client.load_table_from_dataframe(
        df_rotten, 
        table_id
        # , job_config=job_config    ### Uncomment to create the table for the first time
    )  # Make an API request.

    job.result()  # Wait for the job to complete.

# FUNCTIONS END

In [4]:
df = get_bigquery_tmdb_df('2023-01', '2023-02')
df_rotten = get_rotten_dataframe(df)
insert_into_bigquery(df_rotten)

The query data:


In [14]:
# ### UNCOMMENT THE CODE BELOW TO CREATE THE TABLE IN BIGQUERY FOR THE FIRST TIME
# # job_config = bigquery.LoadJobConfig(
# schema=[
#         bigquery.SchemaField("tmdb_id", "STRING"),
#         bigquery.SchemaField("original_title", "STRING"),
#         bigquery.SchemaField("Box Office", "STRING"),
#         bigquery.SchemaField("Release Date Theaters", "STRING"),
#         bigquery.SchemaField("Release Date Streaming", "STRING"),
#         bigquery.SchemaField("tomattometer_score", "STRING"),
#         bigquery.SchemaField("audience_score", "STRING"),
#         bigquery.SchemaField("tomatometer_reviews", "STRING"),
#         bigquery.SchemaField("audience_reviews", "STRING"),
#         bigquery.SchemaField("Distributor", "STRING"),
#         bigquery.SchemaField("Aspect Ratio", "STRING"),
#         bigquery.SchemaField("Runtime", "STRING"),
#         bigquery.SchemaField("Writer", "STRING"),
#         bigquery.SchemaField("Original Language", "STRING"),
#         bigquery.SchemaField("Rating", "STRING"),
#         bigquery.SchemaField("Director", "STRING"),
#         bigquery.SchemaField("Genre", "STRING"),
#         bigquery.SchemaField("Producer", "STRING"),
#         bigquery.SchemaField("Sound Mix", "STRING"),
#         bigquery.SchemaField("tomattometer_reviews", "STRING"),
#         bigquery.SchemaField("Production Co", "STRING")
# ]
# # ,
# #     write_disposition="WRITE_TRUNCATE",
# # )

# # Construct a BigQuery client object.
# client = bigquery.Client(credentials=credentials)

# table_id = "turingdb.data_warehouse.rotten_tomatoes"

# table = bigquery.Table(table_id, schema=schema)
# table = client.create_table(table)  # API request