In [1]:
import os
import time
import json
import pandas as pd
pd.options.display.max_rows = 999
pd.options.display.max_columns = 999
#pip install google-cloud-storage
from google.cloud import storage

#pip install -U google-api-python-client
from googleapiclient import discovery

#pip install -U oauth2client
from oauth2client.client import GoogleCredentials
from google.oauth2 import service_account

#pip install pandas-gbq -U
from google.cloud import bigquery




# Wall-clock start of the run; the timing cell at the end of the notebook
# reports time.time() - st.
st = time.time()

# Setup Application Default Credentials -- https://cloud.google.com/docs/authentication/client-libraries#python
# os.path.join picks the right separator for the host OS; the previous
# hard-coded "\\" only produced a valid path on Windows.
os.environ["GOOGLE_APPLICATION_CREDENTIALS"] = os.path.join(
    os.getcwd(), "application_default_credentials.json"
)
credentials = GoogleCredentials.get_application_default()

service = discovery.build('cloudresourcemanager', 'v1', credentials=credentials)

# Page through projects.list() until list_next() returns None, collecting the
# raw project dicts. They are flattened once below instead of building one
# single-row DataFrame per project.
request = service.projects().list()
project_list = []

while request is not None:
    response = request.execute()
    project_list.extend(response.get('projects', []))
    request = service.projects().list_next(previous_request=request, previous_response=response)

# json_normalize flattens nested keys with "_" (e.g. labels -> labels_<key>).
# An empty project_list yields an empty frame instead of the ValueError that
# pd.concat([]) would raise.
project_df = (
    pd.json_normalize(project_list, sep='_')
    # Hyphenated label key renamed to match the "labels_support_group" field
    # declared in the BigQuery load schema further down.
    .rename(columns={"labels_support-group": "labels_support_group"})
    .reset_index(drop=True)
)
project_df['insert_datetime'] = pd.Timestamp.today()



In [2]:
# Inventory every bucket in the project and every object inside each bucket.
# Rows are collected as plain dicts and turned into a DataFrame once at the
# end; creating a one-row DataFrame per object and concatenating them is
# extremely slow when there are millions of objects.
bucket_list = []
item_list = []
storage_client = storage.Client(project='gcp-wow-rwds-ai-bi-prod')

for bucket in storage_client.list_buckets():
    bucket_list.append({'bucket_id': bucket.id,
                        'bucket_name': bucket.name,
                        'bucket_location': bucket.location,
                        'bucket_location_type': bucket.location_type,
                        'bucket_storage_class': bucket.storage_class,
                        'bucket_owner': bucket.owner,
                        'bucket_user_project': bucket.user_project,
                        'bucket_project_number': bucket.project_number,
                        # Nested dict; flattened to bucket_label_<key> below.
                        'bucket_label': bucket.labels
                        })
    for item in bucket.list_blobs():
        item_list.append({'bucket_id': bucket.id,
                          'item_name': item.name,
                          'item_size': item.size,
                          'item_content_type': item.content_type,
                          'item_storage_class': item.storage_class,
                          'item_owner': item.owner,
                          'item_time_created': item.time_created,
                          'item_updated': item.updated
                          })

# Explicit column list keeps the object frame's layout stable even when no
# objects were found (pd.concat on an empty list would raise instead).
item_columns = ['bucket_id', 'item_name', 'item_size', 'item_content_type',
                'item_storage_class', 'item_owner', 'item_time_created',
                'item_updated']

# Hyphenated label keys are renamed to underscore form, matching the field
# names declared in the bucket load schema further down.
bucket_df = pd.json_normalize(bucket_list, sep='_').rename(
    columns={"bucket_label_goog-composer-location": "bucket_label_goog_composer_location",
             "bucket_label_goog-composer-environment": "bucket_label_goog_composer_environment",
             "bucket_label_goog-composer-version": "bucket_label_goog_composer_version"
             })
item_df = pd.DataFrame(item_list, columns=item_columns)

# One timestamp shared by both frames so rows from the same run carry the
# same insert_datetime.
insert_ts = pd.Timestamp.today()
bucket_df['insert_datetime'] = insert_ts
item_df['insert_datetime'] = insert_ts
bucket_df = bucket_df.reset_index(drop=True)
item_df = item_df.reset_index(drop=True)





In [3]:

client = bigquery.Client(project='gcp-wow-rwds-ai-bi-prod')

# Load Project Table
project_table_id = "gcp-wow-rwds-ai-bi-prod.airflow_logs.gcp_project_list"

# Every project attribute is declared as STRING; only insert_datetime is a
# DATETIME. "parent_type" used to be listed twice (copy-paste duplicate); each
# column is now declared exactly once.
project_string_columns = [
    "projectNumber",
    "projectId",
    "lifecycleState",
    "name",
    "createTime",
    "labels_business_unit",
    "labels_cost_centre",
    "labels_env",
    "labels_environment",
    "labels_name",
    "labels_owner",
    "labels_project_code",
    "labels_squad",
    "labels_support_contact",
    "parent_type",
    "parent_id",
    "labels_product",
    "labels_exemption_id1",
    "labels_exemption_id2",
    "labels_support_group",
]

job_config_project = bigquery.LoadJobConfig(
    schema=[
        bigquery.SchemaField(column, bigquery.enums.SqlTypeNames.STRING)
        for column in project_string_columns
    ] + [
        bigquery.SchemaField("insert_datetime", bigquery.enums.SqlTypeNames.DATETIME)
    ],
    #write_disposition="WRITE_TRUNCATE",
)

job_project = client.load_table_from_dataframe(
    project_df, project_table_id, job_config=job_config_project
)  # Make an API request.
job_project.result()  # Wait for the job to complete.

# The counts below are read from the table's metadata after the load.
# NOTE(review): with write_disposition left unset this presumably appends, so
# num_rows would be the table total rather than the rows loaded by this run --
# confirm.
project_table = client.get_table(project_table_id)  # Make an API request.
print(
    "Loaded {} rows and {} columns to {}".format(
        project_table.num_rows, len(project_table.schema), project_table_id
    )
)




Loaded 241 rows and 21 columns to gcp-wow-rwds-ai-bi-prod.airflow_logs.gcp_project_list


In [4]:

bucket_table_id = "gcp-wow-rwds-ai-bi-prod.airflow_logs.gcs_bucket_details"

# (column name, BigQuery type) pairs, in the order they are declared to the
# load job.
# NOTE(review): bucket_df also carries a bucket_location_type column that is
# not declared here -- confirm that is intentional.
bucket_schema_spec = [
    ("bucket_id", bigquery.enums.SqlTypeNames.STRING),
    ("bucket_name", bigquery.enums.SqlTypeNames.STRING),
    ("bucket_location", bigquery.enums.SqlTypeNames.STRING),
    ("bucket_storage_class", bigquery.enums.SqlTypeNames.STRING),
    ("bucket_owner", bigquery.enums.SqlTypeNames.STRING),
    ("bucket_user_project", bigquery.enums.SqlTypeNames.STRING),
    ("bucket_project_number", bigquery.enums.SqlTypeNames.INT64),
    ("bucket_label_goog_composer_environment", bigquery.enums.SqlTypeNames.STRING),
    ("bucket_label_goog_composer_version", bigquery.enums.SqlTypeNames.STRING),
    ("bucket_label_goog_composer_location", bigquery.enums.SqlTypeNames.STRING),
    ("bucket_label_name", bigquery.enums.SqlTypeNames.STRING),
    ("insert_datetime", bigquery.enums.SqlTypeNames.DATETIME),
]

# Options tried earlier and left disabled:
#   schema_update_options=['ALLOW_FIELD_ADDITION']
#   write_disposition="WRITE_TRUNCATE"
job_config_bucket = bigquery.LoadJobConfig(
    autodetect=True,
    schema=[
        bigquery.SchemaField(column, column_type)
        for column, column_type in bucket_schema_spec
    ],
)

# Submit the load job and block until it has finished.
job_bucket = client.load_table_from_dataframe(
    bucket_df,
    bucket_table_id,
    job_config=job_config_bucket,
)
job_bucket.result()

# Fetch the table metadata again and report its row and column counts.
bucket_table = client.get_table(bucket_table_id)
bucket_summary = "Loaded {} rows and {} columns to {}".format(
    bucket_table.num_rows, len(bucket_table.schema), bucket_table_id
)
print(bucket_summary)



Loaded 63 rows and 13 columns to gcp-wow-rwds-ai-bi-prod.airflow_logs.gcs_bucket_details


In [5]:
item_table_id = "gcp-wow-rwds-ai-bi-prod.airflow_logs.gcs_object_details"

# No explicit schema is passed for the object table; the job config only sets
# autodetect. (An earlier commented-out schema here listed the bucket table's
# columns, and write_disposition / schema_update_options were likewise left
# disabled.)
job_config_item = bigquery.LoadJobConfig(autodetect=True)

# Submit the load job and block until it has finished.
job_item = client.load_table_from_dataframe(
    item_df,
    item_table_id,
    job_config=job_config_item,
)
job_item.result()

# Fetch the table metadata again and report its row and column counts.
item_table = client.get_table(item_table_id)
item_summary = "Loaded {} rows and {} columns to {}".format(
    item_table.num_rows, len(item_table.schema), item_table_id
)
print(item_summary)



Loaded 4136977 rows and 9 columns to gcp-wow-rwds-ai-bi-prod.airflow_logs.gcs_object_details


In [6]:
# Report the wall-clock time elapsed since `st` was captured with time.time()
# in the first cell. On a fresh kernel that cell must have run first,
# otherwise `st` is undefined here.
et = time.time()
elapsed_time = et - st  # seconds, as a float
print('Execution time:', elapsed_time, 'seconds')

Execution time: 1075.484159231186 seconds


In [7]:
import concurrent.futures
import urllib.request
# Pages fetched by the thread-pool demo below. The last host is named as a
# made-up domain -- presumably there to exercise the failure path.
URLS = [
    'http://www.foxnews.com/',
    'http://www.cnn.com/',
    'http://europe.wsj.com/',
    'http://www.bbc.co.uk/',
    'http://some-made-up-domain.com/',
]

# load_url appends each URL here once its connection has been opened.
list_url = []

def load_url(url, timeout):
    """Open ``url`` and return the response body as bytes.

    Once the connection is open, ``url`` is appended to the module-level
    ``list_url`` before the body is read. ``timeout`` is passed straight
    through to ``urllib.request.urlopen``; exceptions raised while opening or
    reading are not caught here and propagate to the caller.
    """
    with urllib.request.urlopen(url, timeout=timeout) as response:
        list_url.append(url)
        payload = response.read()
    return payload

# Fetch every URL on a pool of five worker threads. Leaving the with-block
# shuts the executor down, so the threads are cleaned up promptly.
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Map each submitted future back to the URL it is fetching (60 s timeout).
    future_to_url = {}
    for url in URLS:
        future_to_url[executor.submit(load_url, url, 60)] = url

    # Report results in completion order rather than submission order.
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            # A failed fetch is reported and the remaining futures still run.
            print('%r generated an exception: %s' % (url, exc))
            continue
        print('%r page is %d bytes' % (url, len(data)))

'http://www.foxnews.com/' page is 285993 bytes
'http://www.bbc.co.uk/' page is 318933 bytes
'http://europe.wsj.com/' generated an exception: HTTP Error 403: Forbidden
'http://www.cnn.com/' page is 1144964 bytes
'http://some-made-up-domain.com/' generated an exception: HTTP Error 403: Forbidden


In [8]:
# import time
# st = time.time()
# import concurrent.futures
# import urllib.request
# #get details for a single bucket
# def f_item_detail(item):
#     return (pd.json_normalize(dict({'bucket_id':bucket.id,
#                     'item_name':item.name,
#                     'item_size':item.size,
#                     'item_content_type':item.content_type,
#                     'item_storage_class':item.storage_class,
#                     'item_owner':item.owner,
#                     'item_time_created':item.time_created,
#                     'item_updated':item.updated                    
#                    }),sep='_'))


# bucket_list = []
# item_list = []
# i=0
# data =[]
# storage_client = storage.Client(project='gcp-wow-rwds-ai-bi-prod')
# buckets = storage_client.list_buckets()
# # for bucket in buckets:
# #     bucket_list.append(pd.json_normalize(dict({'bucket_id':bucket.id,
# #                         'bucket_name':bucket.name,
# #                         'bucket_location':bucket.location,
# #                         'bucket_location_type':bucket.location_type,
# #                         'bucket_storage_class':bucket.storage_class,
# #                         'bucket_owner':bucket.owner,
# #                         'bucket_user_project':bucket.user_project,
# #                         'bucket_project_number':bucket.project_number,
# #                         'bucket_label':bucket.labels                     
# #                        }),sep='_'))
#     # We can use a with statement to ensure threads are cleaned up promptly
# for bucket in buckets:
#     blobs = bucket.list_blobs()
#     with concurrent.futures.ThreadPoolExecutor(max_workers=100) as executor:
#         # Start the load operations and mark each future with its URL
#         future_to_item = {executor.submit(f_item_detail, item): item for item in blobs}
#         for future in concurrent.futures.as_completed(future_to_item):
#             try:
#                 #item_list.append(future.result())
#                 data = future.result()
#             except Exception as exc:
#                 print('%r generated an exception: %s' % (item, exc))



        
# # # bucket_df = pd.concat(bucket_list)
# # item_df = pd.concat(item_list)
# # # bucket_df.rename(columns={"bucket_label_goog-composer-location": "bucket_label_goog_composer_location",
# # #                          "bucket_label_goog-composer-environment":"bucket_label_goog_composer_environment",
# # #                           "bucket_label_goog-composer-version":"bucket_label_goog_composer_version"
# # #                          } ,inplace=True)

# # # bucket_df['insert_datetime'] = pd.Timestamp.today()
# # item_df['insert_datetime'] = pd.Timestamp.today()
# # # bucket_df = bucket_df.reset_index(drop=True)
# # item_df = item_df.reset_index(drop=True)
# print(data)
# et = time.time()
# elapsed_time = et - st
# print('Execution time:', elapsed_time, 'seconds')
