In [1]:
## Import relevant libraries

## pandas: DataFrame manipulation, e.g. when loading the JSON into BigQuery via pandas-gbq
import pandas as pd
## NumPy: numerical arrays, used alongside pandas
import numpy as np
## glob: filename pattern matching (works on the local filesystem, not on GCS blob names)
import glob
# import time
# import datetime
## Directory manipulation
import os
## GCS linking
from google.cloud import storage
## GBQ linking
from google.cloud import bigquery
## string structure comprehension
import re
## We have to parse the json somehow
import json
## Import logging so we can see what's going on
import logging

In [16]:
# Install the Google Cloud Storage client library (uncomment and run once per environment)

# %pip install google-cloud-storage

In [5]:
# ---- Run configuration -------------------------------------------------
# GCS bucket that holds the raw JSON exports (and the service-account key).
bucket_name = "de-tech_assessment_data"
# GCP project that owns the BigQuery dataset.
project = "mi-proyecto-prueba-375018"
# BigQuery dataset that receives one table per raw JSON file.
dataset = "json_parse"
# Only objects whose name contains this date string (YYYY-MM-DD) are loaded.
date_interest = "2019-06-01"

# ---- GCS handles -------------------------------------------------------
# get_bucket issues a metadata request, so a missing bucket fails here, early.
storage_client = storage.Client()
bucket = storage_client.get_bucket(bucket_or_name=bucket_name)

## Download the service-account JSON key used to authenticate the BigQuery client.
## NOTE(review): distributing a service-account key through a GCS bucket and writing
## it to the working directory is a credential-handling risk; prefer Application
## Default Credentials or a secrets manager, and never commit the downloaded file.
sa_blobs = [
    blob
    for blob in bucket.list_blobs()
    # Restrict to .json objects so e.g. a folder containing the project id is not matched.
    if 'mi-proyecto-prueba' in blob.name and blob.name.endswith('.json')
]
sa_key = [blob.name for blob in sa_blobs]
if not sa_key:
    # Fail here with a clear message instead of a NameError on `filepath` further down.
    raise FileNotFoundError(
        f"No service-account key containing 'mi-proyecto-prueba' found in gs://{bucket_name}"
    )
if len(sa_key) > 1:
    logging.warning(f"Several candidate keys found {sa_key}; the last one will be used")

for blob in sa_blobs:
    # Save under the object's base name in the current directory, so a folder
    # prefix such as 'cual zip/' does not need to exist locally.
    filepath = os.path.join(os.getcwd(), os.path.basename(blob.name))
    # The listed Blob is already bound to the bucket: no extra get_blob() round-trip,
    # and download_to_filename opens and closes the local file itself.
    blob.download_to_filename(filepath)
    print(blob.name)

## Authenticate against BigQuery with the downloaded service-account key.
from google.oauth2 import service_account

# Scopes attached to the credentials. Per the author's note, explicit scopes were
# added to avoid a 404 "Dataset not found" error.
# NOTE(review): the Drive scope is presumably only needed for Drive-backed tables — confirm.
bq_scopes = [
    "https://www.googleapis.com/auth/drive",
    "https://www.googleapis.com/auth/cloud-platform",
    "https://www.googleapis.com/auth/bigquery",
]
credentials = (
    service_account.Credentials
    .from_service_account_file(filepath)
    .with_scopes(bq_scopes)
)

# BigQuery client bound explicitly to the configured project and scoped credentials.
client = bigquery.Client(credentials=credentials, project=project)

# Reference to the target dataset. JSON load jobs require the dataset to exist
# beforehand, so look it up and create it only if BigQuery reports it missing.
# (bigquery.DatasetReference replaces the deprecated Client.dataset() helper.)
dataset_name = bigquery.DatasetReference(project, dataset)
try:
    client.get_dataset(dataset_name)
    print(f"Dataset exists: {dataset_name}")
except Exception as exc:
    # Only a 404 (google.api_core NotFound, whose `.code` is 404) means "missing".
    # Any other failure (permissions, network, bad credentials) is re-raised
    # instead of being masked by an attempt to create the dataset.
    if getattr(exc, "code", None) != 404:
        raise
    logging.warning(f"Dataset didn't exist, creating dataset: {dataset}")
    # exists_ok tolerates another process creating it between the check and here.
    client.create_dataset(dataset_name, exists_ok=True)

# Names of the JSON objects in the bucket for the date of interest.
# The suffix check uses endswith (not a substring test), so objects such as
# 'x.json.gz' or 'x.jsonl' are not picked up as newline-delimited JSON.
filter_dir = ".json"
blobs = bucket.list_blobs()
file_name = [
    blob.name
    for blob in blobs
    if blob.name.endswith(filter_dir) and date_interest in blob.name
]

# Load every matching JSON file into its own "<file stem>_raw" table.
# One table per raw file (rather than one combined table per date) keeps each
# load independent; analysts can combine the tables they need by name pattern
# or by inspecting table schemas.
#
# The job configuration is identical for every file, so it is built once.
# Values are passed as keyword arguments: the previous attribute assignments
# ended in trailing commas, which silently turned each value into a 1-tuple
# such as ('WRITE_TRUNCATE',).
job_config = bigquery.LoadJobConfig(
    autodetect=True,  # infer the schema from the JSON records
    create_disposition=bigquery.CreateDisposition.CREATE_IF_NEEDED,
    source_format=bigquery.SourceFormat.NEWLINE_DELIMITED_JSON,
    write_disposition=bigquery.WriteDisposition.WRITE_TRUNCATE,
)

submitted_jobs = []
for f in file_name:
    # Build the URI from bucket_name rather than a hard-coded bucket.
    gsutil_uri = f"gs://{bucket_name}/{f}"
    print(gsutil_uri)
    # Table name = object base name without the .json extension, e.g.
    # '.../2019-06-01-15-17-10-events.json' -> '2019-06-01-15-17-10-events_raw'.
    table_stem = os.path.splitext(os.path.basename(f))[0]
    table_id = f"{project}.{dataset}.{table_stem}_raw"
    try:
        # Submission is asynchronous: this returns once the job has been created.
        load_job = client.load_table_from_uri(gsutil_uri, table_id, job_config=job_config)
    except Exception as e:
        logging.error(f"Error submitting load job for {gsutil_uri}: {e}")
        continue
    print(f"GBQ load job for {load_job}: Table {table_id}")
    submitted_jobs.append((load_job, table_id))

# Wait for every job so that failures (bad JSON, schema conflicts, permissions)
# are actually reported. The jobs already run concurrently on BigQuery; this
# loop only collects their outcomes in order.
for job, target_table in submitted_jobs:
    try:
        job.result()
        print(f"Loaded {job.output_rows} rows into {target_table}")
    except Exception as e:
        logging.error(f"Load job {job.job_id} for {target_table} failed: {e}")

mi-proyecto-prueba-375018-2ad9319721ec.json
Dataset exists: DatasetReference('mi-proyecto-prueba-375018', 'json_parse')
gs://de-tech_assessment_data/DE-Tech_Assessment_data/data/2019-06-01-15-17-10-events.json
GBQ load job for LoadJob<project=mi-proyecto-prueba-375018, location=US, id=f2c9ad67-f31d-4234-9053-9170a74006c0>: Table mi-proyecto-prueba-375018.json_parse.2019-06-01-15-17-10-events_raw
gs://de-tech_assessment_data/DE-Tech_Assessment_data/data/2019-06-01-15-17-11-events.json
GBQ load job for LoadJob<project=mi-proyecto-prueba-375018, location=US, id=a972ba0b-7630-4564-9712-309fa60b6490>: Table mi-proyecto-prueba-375018.json_parse.2019-06-01-15-17-11-events_raw
gs://de-tech_assessment_data/DE-Tech_Assessment_data/data/2019-06-01-15-17-12-events.json
GBQ load job for LoadJob<project=mi-proyecto-prueba-375018, location=US, id=108eec9c-6fb0-486a-aa08-a3b85499d512>: Table mi-proyecto-prueba-375018.json_parse.2019-06-01-15-17-12-events_raw
gs://de-tech_assessment_data/DE-Tech_Assess

KeyboardInterrupt: 