## DEV Project


### Structure

ETL 1:
- Download all files from Google Drive
- From those files, keep only .csv files
- Download the BigQuery table that lists the files already uploaded
- Compare the Drive files against the files already uploaded to BigQuery, matching by file name
- Keep only the new files
- Upload the new files to Cloud Storage (CS)
- Update the tracker_registry table with the files uploaded

ETL 2: Same as batch-telecom-data
- Once a file lands in Cloud Storage, it triggers the Cloud Function:
    - It takes the csv file and applies the pandas transformation!!!
    - It updates the tracker/registry files table with the file_name and its number of rows
    - It uploads the data of the file to the BQ table

ETL 3: Checking function between Storage and BQ

BQ table structures:
    - BQ main table: columns needed / path_file / timestamp
    - BQ tracker table: file_name / num_lines / tmstmp (timestamp)

### Code

COMMENTS:
- It is necessary to give access to the service_account to the drive_folder where the data is stored (now in mgaleramunoz drive)
- Change project name: e3escoles-dev => e3escoles


In [None]:
# TODO:
#    - change cs_client
#    - change bq_client
#    - change destination_file for building drive client
#    - change local_file_path for saving files in /tmp/


# Perhaps we need to erase files when downloaded to save memory

In [1]:
import os
import json
import datetime
import io
import csv
from google.oauth2.service_account import Credentials
from googleapiclient.discovery import build
from googleapiclient.errors import HttpError
from googleapiclient.http import MediaIoBaseDownload
from google.cloud import storage, bigquery
import pandas as pd


from utils import keep_csv_files, get_drive_files, find_index_of_substring, blob_to_filename, download_file_from_drive, count_csv_length
from utils_gcloud import load_json_from_cs, _storage_connection, download_storage_file, _bigquery_connection, insert_data_to_bq, list_blobs, create_folder, upload_storage_file, insert_row_to_bq

# Local testing var:
# Path of the JSON key file handed to _storage_connection and
# _bigquery_connection when the clients are built in this notebook
# (presumably a service-account key -- confirm before sharing the path).
json_key_path = 'config/e3escoles-aed88f04ce6d.json'

In [2]:
# General settings, read from the environment with local-testing fallbacks:
BUCKET_NAME = os.getenv('BUCKET_NAME', 'ordino')
CONFIG_FILE_NAME = os.getenv('CONFIG_FILE_NAME', 'config/config.json')

# Build the Cloud Storage and BigQuery clients from json_key_path:
cs_client = _storage_connection(json_key_path)  # cs_client = storage.Client()
bq_client = _bigquery_connection(json_key_path)  # bq_client = bigquery.Client()

# Read the configuration object stored in the bucket:
config = load_json_from_cs(cs_client, BUCKET_NAME, CONFIG_FILE_NAME)

# Settings taken straight from the configuration:
project_id = config['project_id']
service_account_file_name = config['service_account_file_name']
bq_dataset = config['bq_dataset']
bq_tracker_table = config['bq_tracker_table']
schema_tracker_path = config['schema_file_names']['schema_tracker_file']
data_blob = config['storage_blobs']['data_blob']
tracker_table_headers = config['tracker_table_headers']

# Values derived from the settings above:
destination_file = './config/key_file.json'  # destination_file = '/tmp/key_file.json'
# Table path in the form <project>.<dataset>.<table>
tracker_table_path = '.'.join([project_id, bq_dataset, bq_tracker_table])


# Build Google Drive Client:
# Create (or truncate) the local key file and close the handle right away, so
# no open file object is left dangling while the key is downloaded into it.
with open(destination_file, 'wt', encoding='utf-8'):
    pass
# Copy the key from the bucket to destination_file, then build the Drive v3
# client from the credentials stored in that file.
service_account_json = download_storage_file(cs_client, BUCKET_NAME, service_account_file_name, destination_file)
creds = Credentials.from_service_account_file(destination_file)
drive_client = build('drive', 'v3', credentials=creds)

# Get Drive files and keep only the CSV ones:
files = get_drive_files(drive_client)
csv_files = keep_csv_files(files)

In [3]:
# Read the file names and line counts already registered in `tracker_registry`:
query = "select file_name, num_lines from  " + tracker_table_path
query_job = bq_client.query(query)
rows = query_job.result()

# Materialise the result once so both collections are built from the same rows.
tracker_rows = list(rows)
# File names in the order returned by the query.
bq_tracker_files = [tracker_row.file_name for tracker_row in tracker_rows]
# file_name -> [num_lines]; a later row with the same name overrides an earlier one.
bq_tracker_results = {
    tracker_row.file_name: [tracker_row.num_lines] for tracker_row in tracker_rows
}

# Compare both lists and filter already uploaded files.
# The filtered list is built in one pass instead of popping entries while
# enumerating: popping shifts the remaining items, so the element right after
# each removal was never checked (e.g. two Drive files sharing one name).
# The slice assignment keeps `csv_files` the same list object, updated in place.
already_uploaded = set(bq_tracker_files)
csv_files[:] = [
    drive_file for drive_file in csv_files
    if drive_file['name'] not in already_uploaded
]

# Build the '_<year>-' markers for the current and the previous year
current_year = datetime.date.today().year
previous_year = current_year - 1
current_year_str = f'_{current_year}-'
previous_year_str = f'_{previous_year}-'

In [4]:
# Uploads all the new files into the data/.../ folder in Cloud Storage and
# registers each successful upload in the tracker table.
for drive_file in csv_files:
    file_name = drive_file['name']

    # Calculate the index of the '_<year>-' marker, trying the current year
    # first and falling back to the previous year:
    index_string = find_index_of_substring(file_name, current_year_str)
    if index_string == -1:
        index_string = find_index_of_substring(file_name, previous_year_str)
    if index_string == -1:
        # No marker found: slicing from -1 + 1 would take the first ten
        # characters of the name as the "date", creating a bogus folder.
        print('Skipping ' + file_name + ': no date of the current or previous year in its name')
        continue

    # Extract the date of the file: the ten characters after the underscore
    date_file = file_name[index_string + 1:index_string + 11]

    # Download file from Drive:
    local_file_path = './data_samples/' + file_name  # local_file_path = '/tmp/' + file_name
    download_file_from_drive(drive_client, drive_file['id'], local_file_path)

    # Count rows of csv file and record the upload time:
    len_file = count_csv_length(local_file_path)
    current_tmstmp = datetime.datetime.now().strftime('%Y-%m-%d %H:%M:%S')

    # Destination blob: <data_blob><date>/<file name>
    storage_blob_path = data_blob + date_file + '/' + file_name
    response = upload_storage_file(cs_client, BUCKET_NAME, storage_blob_path, local_file_path)

    # Only register the file in the tracker table when the upload succeeded
    # (the literal must match the message returned by upload_storage_file).
    if response == "Upload done succesfully":
        tracker_data = [file_name, len_file, current_tmstmp]
        row_data = dict(zip(tracker_table_headers, tracker_data))
        insert_row_to_bq(client=bq_client, table_id=tracker_table_path, row=row_data)


### Feat: Add values to `tracker_registry` BQ table

In [None]:
#### Build example rows for the bq tracker_registry table:
ts_format = '%Y-%m-%d %H:%M:%S'
one_day = datetime.timedelta(days=1)

# Three timestamps: now, one day earlier and one day later.
current_time = datetime.datetime.now()
tmstmp1, tmstmp2, tmstmp3 = (
    moment.strftime(ts_format)
    for moment in (current_time, current_time - one_day, current_time + one_day)
)

# One list per column; position i across the lists forms row i.
dict_data = {
    'file_name': [
        '03AAB_1rE_2022-11-07T14_47_08+0100.csv',
        '03A52_biblioteca_2022-11-07T14_41_42+0100.csv',
        '03A49_seminari3_2022-11-07T14_53_21+0100.csv',
    ],
    'num_lines': [43, 1003, 99988],
    'tmstmp': [tmstmp1, tmstmp2, tmstmp3],
}
dataframe_test = pd.DataFrame(data=dict_data)
dataframe_test

#### Load the example dataframe into the BQ tracker_registry table:
bq_client = _bigquery_connection(json_key_path)  # bq_client = bigquery.Client()
schema_tracker = load_json_from_cs(cs_client, BUCKET_NAME, schema_tracker_path)

# Turn each field description (name / type / mode) into a BigQuery SchemaField.
schema = []
for field_spec in schema_tracker:
    schema.append(
        bigquery.SchemaField(
            name=field_spec['name'],
            field_type=field_spec['type'],
            mode=field_spec['mode'],
        )
    )

num_rows_added = insert_data_to_bq(bq_client, schema, dataframe_test, bq_dataset, bq_tracker_table)