In [227]:
# Module Imports
import io
import os
import re
import subprocess
from sys import platform, exit

import pandas as pd
from google.cloud import bigquery
from google.oauth2 import service_account

from errors import ValidationError, UnsupportedPlatformError

# Loading Wrapper Classes
from csv_parser import CsvParser
from screaming_frog_automation import ScreamingFrogAnalyser

# Utility Functions
from utils import config_setup_check, dataframe_checker, dataframe_row_checker, YamlParser
# Load the project configuration once; every constant below is read from it.
config = YamlParser()

# Setup Variables - You will need to change these depending upon your Mac + Google Cloud Platform Setup!
# The 'environment-variables' section is looked up once and then unpacked key by key.
_environment_variables = config.data['environment-variables']
OUTPUTFOLDER = _environment_variables['OUTPUTFOLDER']
SERVICE_ACCOUNT_KEY_LOCATION = _environment_variables['SERVICE_ACCOUNT_KEY_LOCATION']
GOOGLE_CLOUD_PROJECT_ID = _environment_variables['GOOGLE_CLOUD_PROJECT_ID']
GOOGLE_CLOUD_BIGQUERY_DATASET_ID = _environment_variables['GOOGLE_CLOUD_BIGQUERY_DATASET_ID']

# Mapping section used further down to validate and rename the available-data keys.
BIGQUERY_TABLE_ID_MAPPINGS = config.data['bigquery_table_id_mappings']

In [228]:
# Build service-account credentials from the key file at SERVICE_ACCOUNT_KEY_LOCATION.
credentials = service_account.Credentials.from_service_account_file(SERVICE_ACCOUNT_KEY_LOCATION)
# BigQuery client for the configured project; it is handed to BigQueryAutomation further down.
client = bigquery.Client(credentials=credentials, project=GOOGLE_CLOUD_PROJECT_ID)

In [252]:
# Notebook-level switch read by the key-renaming step further down: when it is False,
# the available-data keys are replaced by their BIGQUERY_TABLE_ID_MAPPINGS values.
create_bigquery_table = False

----------------------------------------------------------------------------------------------------------------

## How To Create A Single Table With A Specific Schema:

In [233]:
# # TODO(developer): Set table_id to the ID of the table to create.
# # table_id = "your-project.your_dataset.your_table_name"

# schema = [
#     bigquery.SchemaField("full_name", "STRING", mode="REQUIRED"),
#     bigquery.SchemaField("age", "INTEGER", mode="REQUIRED"),
# ]

# table = bigquery.Table(table_id, schema=schema)
# table = client.create_table(table)  # Make an API request.
# print(
#     "Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id)
# )

---------------------------------------------------------------------------------------------------------

### Create a dynamic table generated from the parser class:

In [234]:
# Setting Some Variables From The Mock-Test Data Folder:
# NOTE(review): this is a machine-specific absolute path - point it at your own copy of the test data.
outputfolder = '/Users/jamesaphoenix/Desktop/Imran_And_James/Python_For_SEO/Course/11_data_wrangling_screaming_frog/src/test_data'

# Single File Paths (each wrapped in a one-element list):
csv_single_file_path = [f"{outputfolder}/2020.07.04.19.48.59"]
seo_spider_single_file_path = [f"{outputfolder}/2020-no-csv-exports-1"]

# Multiple File Paths: every folder name is prefixed with the test-data folder.
csv_multiple_file_paths = [
    f"{outputfolder}/{folder_name}"
    for folder_name in ('2020.07.04.19.48.59', '2020.07.04.19.49.38')
]
seo_spider_multiple_file_paths = [
    f"{outputfolder}/{folder_name}"
    for folder_name in ('2020-no-csv-exports-1', '2020-no-csv-exports-2')
]

# Two URL entries, the same length as the multiple-file-path lists above.
website_urls = [
    'https://phoenixandpartners.co.uk/',
    'https://phoenixandpartners.co.uk/',
]

In [235]:
# Build the CSV parser over the two mock export folders and the website URL list defined above.
parser = CsvParser(outputfolder=outputfolder,
                  file_paths=csv_multiple_file_paths,
                  website_urls=website_urls)

In [236]:
# Warn when dataframe_checker reports nothing truthy for the parser, i.e. there is no .csv data to upload.
# NOTE(review): the message says "Existing the program" (presumably "Exiting"), and with exit()
# commented out below the notebook keeps running after printing it.
if not any(dataframe_checker(parser)):
        print('''Finished crawling and saved the output to your desired folder/folders. It's impossible to save to BigQuery because you have no .csv data.
        Re-run the script with export_tabs, export_reports, or export_bulk_exports if you would like to upload to BigQuery!

        Existing the program.
        ''')
        # exit() <-- Disabling this whilst running tests.

In [237]:
# 2.1 Data checking - Compile the dataframes that have both rows and columns.
# NOTE(review): the result is used below as a mapping (.keys() / .items()) whose values are
# DataFrames - confirm against dataframe_row_checker.
available_data = dataframe_row_checker(parser)

In [253]:
class BigQueryAutomation():
    """Derive BigQuery schemas from pandas DataFrames and load the data into BigQuery.

    On construction every DataFrame in ``available_data_dict`` is copied, its column
    names are sanitised, its ``Date`` column is normalised and a matching list of
    ``bigquery.SchemaField`` objects is generated. The results are stored in
    ``self.master_schema_data`` as dicts with the keys ``name``, ``data`` and ``schema``.

    Args:
        client: A ``bigquery.Client`` used for every API request.
        project_id: Google Cloud project that owns the dataset.
        dataset_id: BigQuery dataset that the tables live in.
        available_data_dict: Mapping of table name -> DataFrame. Each DataFrame
            must contain a ``Date`` column.
        create_bigquery_table: When truthy, ``automate_bq_reports`` creates the
            tables before uploading into them.
    """

    def __init__(self, client, project_id, dataset_id,
                 available_data_dict, create_bigquery_table):
        self.client = client
        self.project_id = project_id
        self.dataset_id = dataset_id
        self.available_data_dict = available_data_dict
        self.create_bigquery_table_boolean = create_bigquery_table

        # Automatically executed functions;
        self._create_schema_dictionary()

    # Helper Functions
    def extract_schema_data(self, df, date_column_name):
        """Return one BigQuery type name per column of ``df``, in column order.

        Args:
            df: DataFrame whose dtypes are inspected.
            date_column_name: Name of the column that is always typed ``DATETIME``.

        Returns:
            list[str]: ``DATETIME`` for the date column, ``STRING`` for object/string
            columns, ``INT64`` / ``FLOAT64`` for any integer / float width, and the
            upper-cased dtype name for everything else (e.g. ``BOOL``).
        """
        table_schema = []
        for name, dtype in zip(df.columns, df.dtypes):
            if name == date_column_name:
                table_schema.append('DATETIME')
            elif dtype.name == 'object' or pd.api.types.is_string_dtype(dtype):
                table_schema.append('STRING')
            elif pd.api.types.is_integer_dtype(dtype):
                # BigQuery only has a 64-bit integer type; int8/16/32 would otherwise
                # become the invalid names INT8/INT16/INT32.
                table_schema.append('INT64')
            elif pd.api.types.is_float_dtype(dtype):
                # Same reasoning for float16/float32.
                table_schema.append('FLOAT64')
            else:
                table_schema.append(str(dtype).upper())
        return table_schema

    # Automatically create the required schema
    def _create_schema_dictionary(self):
        """Populate ``self.master_schema_data`` from ``self.available_data_dict``.

        Raises:
            KeyError: If a DataFrame has no ``Date`` column.
        """
        self.master_schema_data = []
        for key, value in self.available_data_dict.items():
            # Work on a copy so the caller's DataFrames are not modified as a side effect.
            frame = value.copy()

            # Replace "(", ")" and spaces in the column names with "_".
            # regex=True is required: since pandas 2.0 the default is a literal match,
            # which would silently leave the names untouched.
            frame.columns = frame.columns.str.replace(r"[() ]", "_", regex=True)
            bq_column_names = list(frame.columns)

            # Converting the date column to an ISO-style string that BigQuery parses as DATETIME:
            frame['Date'] = pd.to_datetime(frame['Date']).dt.strftime("%Y-%m-%dT%H:%M:%S")

            # Dynamically creating the schema from the column names and their types.
            table_schema = self.extract_schema_data(frame, 'Date')
            schema_results = [
                bigquery.SchemaField(column_name, column_type, mode='NULLABLE')
                for column_name, column_type in zip(bq_column_names, table_schema)
            ]

            self.master_schema_data.append(
                {'name': key, 'data': frame, 'schema': schema_results}
            )

    def _create_single_bq_table(self, item):
        """Create the BigQuery table described by ``item`` (a ``master_schema_data`` entry).

        The table name is ``item['name']`` up to its first ``.``.
        """
        name = item['name'].split('.')[0]
        table_id = f"{self.project_id}.{self.dataset_id}.{name}"
        table = bigquery.Table(table_id, schema=item['schema'])
        self.client.create_table(table)  # Make an API request.

    # Upload BigQuery Tables
    def _upload_single_bg_table(self, item):
        """Load ``item['data']`` into its BigQuery table as newline-delimited JSON.

        Blocks until the load job has finished; ``job.result()`` raises if the job failed.
        """
        # Assigning the references. The project is taken from self.project_id so that
        # uploads target the same project as _create_single_bq_table.
        table_name = item['name'].split('.')[0]
        table_ref = bigquery.DatasetReference(self.project_id, self.dataset_id).table(table_name)

        # Customise the Jobconfig setup
        job_config = bigquery.LoadJobConfig()
        job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
        job_config.schema = item['schema']

        # load_table_from_file expects a binary file object, so the JSON lines are
        # encoded to UTF-8 bytes before being streamed.
        payload = item['data'].to_json(orient="records", lines=True)
        with io.BytesIO(payload.encode("utf-8")) as source_file:
            job = self.client.load_table_from_file(source_file, table_ref, job_config=job_config)
        job.result()

    # Execution Functions:
    def create_all_bq_tables(self):
        """Create one BigQuery table per entry in ``self.master_schema_data``."""
        for result in self.master_schema_data:
            self._create_single_bq_table(result)

    def upload_all_bq_tables(self):
        """Upload the data of every entry in ``self.master_schema_data``."""
        for result in self.master_schema_data:
            self._upload_single_bg_table(result)

    def automate_bq_reports(self):
        """Optionally create the tables, then push every DataFrame to BigQuery."""
        if self.create_bigquery_table_boolean:
            self.create_all_bq_tables()
        self.upload_all_bq_tables()

In [254]:
# 1. Check that every key of the available data has an entry in the BigQuery table-ID mappings.
# Any key left over after removing the mapped names is unmapped and aborts the run.
checking_data = set(available_data).difference(BIGQUERY_TABLE_ID_MAPPINGS)

if len(checking_data) > 0:
    raise ValidationError(
        "All of the BigQuery mappings haven't been matched against the available data",
        "BIGQUERY_TABLE_ID_MAPPINGS = config.data['bigquery_table_id_mappings']",
    )

In [256]:
# 2. Change the name on the key to be the mapped BigQuery table ID instead of the standard one.
# "(", ")" and spaces in the mapped name are replaced with "_". re.sub is required here:
# str.replace("[() ]", "_") only replaces that literal five-character text, so it never matched.
# NOTE: this rebinds available_data, so the cell is meant to be run once per kernel session.
if create_bigquery_table is False:
    available_data = {
        re.sub(r"[() ]", "_", BIGQUERY_TABLE_ID_MAPPINGS[key]): value
        for key, value in available_data.items()
    }

In [261]:
# 3. Build the BigQuery automation object (this also generates the schemas for every dataframe).
# The notebook-level create_bigquery_table flag is passed through instead of a hard-coded False,
# so flipping the flag in one place controls both the key renaming above and table creation.
bg_automation = BigQueryAutomation(
    client=client,
    project_id=GOOGLE_CLOUD_PROJECT_ID,
    dataset_id=GOOGLE_CLOUD_BIGQUERY_DATASET_ID,
    available_data_dict=available_data,
    create_bigquery_table=create_bigquery_table,
)

In [262]:
# Run the pipeline: create the tables first when the instance was built with a truthy
# create_bigquery_table, then upload every prepared dataframe.
bg_automation.automate_bq_reports()