<a href="https://colab.research.google.com/github/luisosmx/Python_exercises/blob/main/class_ExtractLoad.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
!pip install google-cloud-storage
!pip install pandas google-cloud-storage

In [None]:
from google.cloud import storage
from google.cloud import bigquery
import os
import pandas as pd


class ExtractLoad:
    """Download a CSV from Cloud Storage, filter its columns and load it into BigQuery.

    The three steps are meant to be called in order: ``download_file()``,
    ``clean_file()`` and ``load_file()``. Every method reads its settings from
    the instance attributes set in ``__init__`` so the class does not depend on
    same-named module/notebook globals.
    """

    def __init__(self, bucket_name, object_name, destination, path_json, all_column_names, filter_column, path_json_load, table_id, schema):
        """Store the pipeline configuration.

        Args:
            bucket_name: Name of the Cloud Storage bucket to read from.
            object_name: Name of the object (blob) inside the bucket.
            destination: Local path the object is downloaded to and later read from.
            path_json: Path to the credentials JSON used for the download step.
            all_column_names: Candidate column names, in the order they should be kept.
            filter_column: Names that a candidate must also appear in to be kept.
            path_json_load: Path to the credentials JSON used for the load step.
            table_id: Destination BigQuery table identifier.
            schema: Schema passed to the BigQuery load job configuration.
        """
        self.bucket_name = bucket_name
        self.object_name = object_name
        self.destination = destination
        self.path_json = path_json
        self.all_column_names = all_column_names
        self.filter_column = filter_column
        self.path_json_load = path_json_load
        self.table_id = table_id
        self.schema = schema

    def download_file(self):
        """Download ``object_name`` from ``bucket_name`` to ``destination``.

        Sets ``GOOGLE_APPLICATION_CREDENTIALS`` to ``path_json`` before the
        storage client is created.
        """
        os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = self.path_json
        storage_client = storage.Client()
        bucket = storage_client.bucket(self.bucket_name)
        blob = bucket.blob(self.object_name)
        # Write the object's content straight to the local destination path.
        blob.download_to_filename(self.destination)
        print("Archivo descargado como 'file.csv'")

    def clean_file(self):
        """Read the downloaded CSV and keep only the selected columns.

        A column is kept when its name is listed in ``all_column_names`` and
        also appears in ``filter_column``; the order of ``all_column_names``
        is preserved. The result is stored on ``self.filtered_df``.

        Returns:
            The filtered ``pandas.DataFrame``.
        """
        # Read from the instance's own destination, not a global of the same name.
        df = pd.read_csv(self.destination, encoding='latin-1')
        filtered_column_names = [col for col in self.all_column_names if col in self.filter_column]
        self.filtered_df = df.loc[:, filtered_column_names]
        print('DataFrame is clean')
        return self.filtered_df

    def load_file(self):
        """Load ``self.filtered_df`` into the BigQuery table ``table_id``.

        Requires ``clean_file()`` to have been called first, since it reads
        ``self.filtered_df``. Sets ``GOOGLE_APPLICATION_CREDENTIALS`` to
        ``path_json_load`` before the BigQuery client is created, and uses the
        ``WRITE_TRUNCATE`` write disposition.
        """
        os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = self.path_json_load
        client = bigquery.Client()
        job_config = bigquery.LoadJobConfig(schema=self.schema, write_disposition="WRITE_TRUNCATE")
        job = client.load_table_from_dataframe(self.filtered_df, self.table_id, job_config=job_config)
        # Block until the load job finishes (raises if the job failed).
        job.result()
        table = client.get_table(self.table_id)
        print("Loaded {} rows and {} columns to {}".format(table.num_rows, len(table.schema), self.table_id))




In [None]:
# Configuration for one ExtractLoad run: Cloud Storage source, local download
# path, credential files, column selection and BigQuery destination.
bucket_name = 'data_test_storage'
# NOTE(review): this object name starts with '/data_test_storage/', while the
# later driver cell uses the bare file name for the same bucket -- confirm
# which one matches the object actually stored.
object_name = '/data_test_storage/afluenciastc_simple_01_2023.csv'
destination = '/content/afluentastic.csv'
path_json = '/content/data-project-395314-5a78f72435a4.json'
# NOTE(review): clean_file() keeps the names from all_column_names that also
# appear in filter_column; these two lists share no name, so zero columns are
# selected as configured -- confirm the intended values.
all_column_names = ['fecha', 'anio', 'mes', 'linea', 'estacion', 'afluencia']
filter_column = ['date', 'year', 'month', 'line', 'station', 'influx']
path_json_load = '/content/path.json'
table_id = 'data-project-395314.data_project.influx_cdmx'
# Every destination column is declared as STRING.
schema = [
    bigquery.SchemaField("date", "STRING"),
    bigquery.SchemaField("year", "STRING"),
    bigquery.SchemaField("month", "STRING"),
    bigquery.SchemaField("line", "STRING"),
    bigquery.SchemaField("station", "STRING"),
    bigquery.SchemaField("influx", "STRING"),
]

extract_load = ExtractLoad(bucket_name,
                           object_name,
                           destination,
                           path_json,
                           all_column_names,
                           filter_column,
                           path_json_load,
                           table_id,
                           schema)

# Run the pipeline steps in order: download -> clean -> load.
extract_load.download_file()
extract_load.clean_file()
extract_load.load_file()

In [None]:
from google.cloud import storage, bigquery
import os
import csv
import json
import pandas as pd

class ExtractLoadStorageDF:
    """Cloud Storage CSV -> filtered pandas DataFrame -> BigQuery table.

    Call ``download_file()``, ``clean_file()`` and ``load_file()`` in that
    order. ``dataset_id`` is stored on the instance but not used by the
    methods of this class.
    """

    def __init__(self, bucket_name, object_name, destination, path_json, all_column_names, filter_column, table_id, schema, dataset_id):
        """Keep every configuration value on the instance under its own name."""
        # Cloud Storage source and local download target.
        self.bucket_name = bucket_name
        self.object_name = object_name
        self.destination = destination
        # Credentials file used by both the storage and the BigQuery client.
        self.path_json = path_json
        # Column selection.
        self.all_column_names = all_column_names
        self.filter_column = filter_column
        # BigQuery destination.
        self.table_id = table_id
        self.schema = schema
        self.dataset_id = dataset_id

    def download_file(self):
        """Fetch the configured object from the bucket into ``destination``."""
        os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = self.path_json
        source_blob = (
            storage.Client()
            .bucket(self.bucket_name)
            .blob(self.object_name)
        )
        source_blob.download_to_filename(self.destination)
        print("Archivo descargado como 'file.csv'")

    def clean_file(self):
        """Load the downloaded CSV and keep only the selected columns.

        A name from ``all_column_names`` is kept when it is also found in
        ``filter_column``. The resulting frame is stored on
        ``self.filtered_df`` and returned.
        """
        raw_frame = pd.read_csv(self.destination, encoding='latin-1')
        kept_columns = []
        for name in self.all_column_names:
            if name in self.filter_column:
                kept_columns.append(name)
        self.filtered_df = raw_frame.loc[:, kept_columns]
        print('DataFrame is clean')
        return self.filtered_df

    def load_file(self):
        """Load ``self.filtered_df`` into ``table_id``, truncating existing rows."""
        os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = self.path_json
        bq_client = bigquery.Client()
        load_job = bq_client.load_table_from_dataframe(
            self.filtered_df,
            self.table_id,
            job_config=bigquery.LoadJobConfig(
                schema=self.schema,
                write_disposition="WRITE_TRUNCATE",
            ),
        )
        # Wait for the job to complete before reading the table metadata.
        load_job.result()
        loaded_table = bq_client.get_table(self.table_id)
        summary = "Loaded {} rows and {} columns to {}"
        print(summary.format(loaded_table.num_rows, len(loaded_table.schema), self.table_id))


class ExtractLoadStorageJson(ExtractLoadStorageDF):
    """Variant of the pipeline that loads the CSV rows into BigQuery as JSON records.

    ``download_file()`` is inherited; ``clean_file()`` and ``load_file()`` are
    overridden to rename the row keys via ``keys_json`` and to append the rows
    to ``dataset_id.table_id``.
    """

    def __init__(self, bucket_name, object_name, destination, path_json, all_column_names, filter_column, table_id, schema, dataset_id, keys_json):
        """Store the base configuration plus the key-renaming map.

        Args:
            keys_json: Dict mapping an existing row key (CSV header) to the
                key name it should have in the JSON that is loaded.
            (All other arguments are forwarded unchanged to the base class.)
        """
        super().__init__(bucket_name, object_name, destination, path_json, all_column_names, filter_column, table_id, schema, dataset_id)
        self.keys_json = keys_json

    def clean_file(self):
        """Read the downloaded CSV, rename row keys and store the rows as JSON text.

        The result is kept on ``self.modified_json`` (a JSON array of objects);
        keys not present in ``keys_json`` are left unchanged.
        """
        # Read from the instance's own destination (not a global), and open
        # with newline='' as the csv module recommends for reader input.
        with open(self.destination, 'r', encoding='ISO-8859-1', newline='') as csv_file:
            rows = list(csv.DictReader(csv_file))

        # Rename in a single pass over each row so one rename can never
        # overwrite a key that still has to be renamed (e.g. a swap mapping),
        # and the original column order is preserved.
        renamed_rows = [
            {self.keys_json.get(key, key): value for key, value in row.items()}
            for row in rows
        ]

        self.modified_json = json.dumps(renamed_rows, indent=4)

    def load_file(self):
        """Append the rows from ``self.modified_json`` to ``dataset_id.table_id``.

        Requires ``clean_file()`` to have been called first. Sets
        ``GOOGLE_APPLICATION_CREDENTIALS`` to ``path_json`` before the
        BigQuery client is created.
        """
        os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = self.path_json
        data = json.loads(self.modified_json)
        client = bigquery.Client()
        # NOTE(review): Client.dataset() is believed to be deprecated in recent
        # google-cloud-bigquery releases -- confirm against the installed version.
        table_ref = client.dataset(self.dataset_id).table(self.table_id)
        job_config = bigquery.LoadJobConfig()
        job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND  # Adjust to your needs
        job_config.source_format = bigquery.SourceFormat.NEWLINE_DELIMITED_JSON
        load_job = client.load_table_from_json(data, table_ref, job_config=job_config)
        # Block until the load job finishes (raises if the job failed).
        load_job.result()
        # Report the instance's own target, not same-named globals.
        print("Carga completada en la tabla: {}.{}".format(self.dataset_id, self.table_id))




In [None]:
# Main code: configure and run the JSON-based extract/load pipeline.
bucket_name = 'data_test_storage'
object_name = 'afluenciastc_simple_01_2023.csv'
destination = '/content/afluentastic.csv'
path_json = '/content/data-project-395314-5a78f72435a4.json'
all_column_names = ['fecha', 'anio', 'mes', 'linea', 'estacion', 'afluencia']
filter_column = ['date', 'year', 'month', 'line', 'station', 'influx']
table_id = 'influx_cdmx'
dataset_id = 'data_project'
# NOTE(review): every key maps to itself, so the key renaming done in
# clean_file() is a no-op as configured -- confirm whether the source headers
# were meant to appear on the left-hand side.
keys_json = {'date': 'date',
             'year': 'year',
             'month': 'month',
             'line': 'line',
             'station': 'station',
             'influx': 'influx'}
# Every destination column is declared as STRING.
schema = [
    bigquery.SchemaField("date", "STRING"),
    bigquery.SchemaField("year", "STRING"),
    bigquery.SchemaField("month", "STRING"),
    bigquery.SchemaField("line", "STRING"),
    bigquery.SchemaField("station", "STRING"),
    bigquery.SchemaField("influx", "STRING"),
]


extract_load_instance = ExtractLoadStorageJson(bucket_name,
                                               object_name,
                                               destination,
                                               path_json,
                                               all_column_names,
                                               filter_column,
                                               table_id,
                                               schema,
                                               dataset_id,
                                               keys_json
                                               )
# Run the pipeline steps in order: download -> clean -> load.
extract_load_instance.download_file()
extract_load_instance.clean_file()
extract_load_instance.load_file()

Archivo descargado como 'file.csv'
Carga completada en la tabla: data_project.influx_cdmx


La clase ExtractLoadStorageDF:

    Se inicializa con varios parámetros: el nombre del bucket y del objeto en Cloud Storage, el destino local para la descarga del archivo, la ruta al archivo JSON de credenciales, la lista de nombres de columnas, la lista de columnas de filtro, el ID de la tabla, el esquema y el ID del conjunto de datos en BigQuery.
    El método download_file utiliza las credenciales proporcionadas para descargar un archivo desde el bucket especificado en Cloud Storage.
    El método clean_file lee el archivo CSV descargado, conserva únicamente las columnas de all_column_names que también aparecen en filter_column, y devuelve el DataFrame resultante.
    El método load_file carga el DataFrame limpio en BigQuery usando el esquema indicado y la opción WRITE_TRUNCATE, de modo que, si la tabla ya existe, su contenido se reemplaza.

La clase ExtractLoadStorageJson (que hereda de ExtractLoadStorageDF):

    Agrega un parámetro adicional, keys_json, un diccionario que mapea los nombres de clave originales (los encabezados del CSV) a los nombres de clave deseados en el resultado final.
    El método clean_file en esta clase lee un archivo CSV, realiza la transformación de las claves según el mapeo en keys_json, y genera un JSON modificado.
    El método load_file carga el JSON modificado en BigQuery con la opción WRITE_APPEND, es decir, anexa las filas a la tabla existente en lugar de reemplazar su contenido.