In [1]:
import warnings
warnings.simplefilter(action='ignore', category=FutureWarning)

In [2]:
from google.cloud import bigquery
import pandas as pd
import requests as rq
from datetime import datetime, timedelta
import time


# URL de API de bencinas
URL = 'https://api.cne.cl/v3/combustibles/vehicular/estaciones?token=LI6qhEbKop'

# Solicitud a la API
res = rq.get(URL)
data = res.json()['data']

# Creación del dataframe a partir del json recibido
df = pd.json_normalize(data)



In [None]:
def get_data_from_api():  
    for i in range(5):
        error = True
        try:
            res = rq.get(URL)
        except rq.exceptions.RequestException:  # This is the correct syntax
            print('Error al intentar conectarse a la API, intentando nuevamente...')
            time.sleep(3)
        else:
            error = False
            break
    if error:
        raise Exception('No se puede conectar a la API')
    return res.json()['data']


In [None]:
dt = get_data_from_api()
print(dt)

In [5]:
TABLA_BENCINERAS = 'acquired-winter-316123.testing.bencinera'
TABLA_FECHA = 'acquired-winter-316123.testing.fecha'
TABLA_PRECIO_COMBUSTIBLE = 'acquired-winter-316123.testing.precio_combustible'
TABLA_UBICACION = 'acquired-winter-316123.testing.ubicacion'

In [None]:
df.shape

In [None]:
pd.set_option('display.max_columns', None)
pd.set_option('display.max_rows', None)

# LIMPIEZA DE DATOS

In [3]:
# Reemplazando caracteres en los nombres de las columnas para
# que sean aceptados por bigquery
df.columns = df.columns.astype(str).str.replace('.', '_')
df.columns = df.columns.astype(str).str.replace(' ', '_')

# Formato uniforme para los datos en columnas 'razon_social' y 'direccion_calle'
df['razon_social'] = df['razon_social'].str.capitalize()
df['direccion_calle'] = df['direccion_calle'].str.capitalize()

# Eliminando palabras redundantes de los nombres de las columnas
for i in ['precios_', 'metodos_de_', 'servicios_', 'ubicacion_']:
    df.columns = df.columns.astype(str).str.replace(i, '')

# Cambiando el tipo de dato a INTEGER
for i in ['gasolina_93', 'gasolina_95', 'gasolina_97', 'petroleo_diesel', 'glp_vehicular', 'gnc']:
    df[i] = df[i].apply(pd.to_numeric).round(0).astype('Int64')

# Renombrando algunas columnas
df.rename(columns={'fecha_hora_actualizacion': 'fecha_actualizacion', 
                    'id': 'id_bencinera'}, inplace=True)

# En vez de considerar la última actualización como fecha, su usará la
# fecha actual para registrar cuando se consultaron los datos, 
# y se asume que ese es el valor actual de la bencina
fecha = datetime.now().strftime('%Y-%m-%d')
df['fecha_actualizacion'] = fecha
df['fecha_actualizacion'] = pd.to_datetime(df['fecha_actualizacion'])

# Eliminando columnas que no se usarán
df.drop(['id_comuna', 'id_region', 'distribuidor_logo', 'distribuidor_logo_horizontal_svg'], axis=1, inplace=True)

df['id_ubicacion'] = df.index + 1


In [4]:
def insert(client, dataframe, table_id):
    # Insertando los datos de hoy
    job_config = bigquery.LoadJobConfig(
    )

    job = client.load_table_from_dataframe(
        df, table_id, job_config=job_config
    )

    job.result()  # Wait for the job to complete.

    table = client.get_table(table_id)  # Make an API request.
    print(
        "Se cargaron {} filas y {} columnas a la tabla {}".format(
            table.num_rows, len(table.schema), table_id
        )
    )

In [6]:
def check_alredy_executed(client):
    fecha = datetime.now()
    fecha = fecha.strftime('%Y-%m-%d')
    tabla_fecha = 'acquired-winter-316123.datawarehouse.fecha'
    query = f"SELECT id_fecha from `{tabla_fecha}` WHERE fecha = '{fecha}'"
    res = client.query(query)
    n_rows = res.result().total_rows
    if n_rows > 0:
        print('Hay fecha de hoy en tabla fecha')
        last_id_fecha = list(res)[0][0]
        tabla_comb = 'acquired-winter-316123.datawarehouse.precio_combustible'
        q = f"SELECT id_precio_combustible from `{tabla_comb}` WHERE id_fecha = {last_id_fecha}"
        res_comb = client.query(q)
        n_rows_comb = res_comb.result().total_rows
        if n_rows_comb > 0:
            print('Hay registros de hoy en tabla precios')
            print(f'Eliminando registros con id_fecha {last_id_fecha}')
            dlt_stm = f"DELETE from `{tabla_comb}` WHERE id_fecha = {last_id_fecha}"
            query_resp = client.query(dlt_stm)
    

In [None]:
check_alredy_executed(client)

In [7]:
client = bigquery.Client.from_service_account_json("acquired-winter-316123-d6459421edc5.json")
# check_alredy_executed(client, TABLA)

# insert(client, df, TABLA)

## TABLA FECHAS

Función que consulta si hay fechas guardadas. En caso de no haber, guarda la fecha de hoy y parte el id correlativo en 1. En caso de haber filas, busca el último registro y verifica si es igual a la fecha actual (ya se ejecutó el trabajo hoy). De ser igual, devuelve ese mismo id; pero de ser diferente, guarda un nuevo registro con el id siguiente en la correlación

In [8]:
def insert_fecha():
    id_tabla_fecha = 'acquired-winter-316123.datawarehouse.fecha'
    q = f"SELECT * FROM `{id_tabla_fecha}`"
    fecha = datetime.now().strftime('%Y-%m-%d')
    res = client.query(q)
    n_rows = res.result().total_rows
    res = res.to_dataframe()
    if n_rows > 0:
        if res.iloc[-1].fecha.strftime('%Y-%m-%d') == fecha:
            return res.iloc[-1].id_fecha
        else:
            next_id_fecha = int(res['id_fecha'].max() + 1)
            to_insert = {'id_fecha': next_id_fecha, 'fecha': fecha}
            client.insert_rows_json(id_tabla_fecha, [to_insert])
            return next_id_fecha
    to_insert = {'id_fecha': 1, 'fecha': fecha}
    client.insert_rows_json(id_tabla_fecha, [to_insert])
    return 1

# DIVIDE DF

In [None]:
df.T

## TABLA DE HECHO

In [None]:
df['id_ubicacion'] = df.index + 1

In [9]:
comb_tipos = ['gasolina_93', 'gasolina_95', 'gasolina_97', 'petroleo_diesel', 'glp_vehicular', 'gnc']

In [None]:
df_2 = df.copy()

fut_df = []
for index, row in df_2.iterrows():
    for tipo in comb_tipos:
        new_row = [row.id_bencinera, row.id_ubicacion, tipo, row[tipo]]
        fut_df.append(new_row)
    

In [None]:
hecho_df = pd.DataFrame(data=fut_df, columns=['id_bencinera', 'id_ubicacion', 'tipo_combustible', 'precio'])
hecho_df.head(20)

In [16]:
def set_id(df):
    # q = "SELECT * FROM `acquired-winter-316123.datawarehouse.precio_combustible`"
    # res = client.query(q)
    # n_rows = res.result().total_rows
    n_rows = 0
    if n_rows > 0:
        last_id_q = "SELECT id_precio_combustible FROM `datawarehouse.precio_combustible` ORDER BY id_precio_combustible DESC LIMIT 1"
        res = client.query(last_id_q)
        last_id = list(res)[0][0] + 1
        df['id_precio_combustible'] = df.index + last_id
    else:
        df['id_precio_combustible'] = df.index + 1
    return df

In [40]:
def main_table(df):
    comb_tipos = ['gasolina_93', 'gasolina_95', 'gasolina_97', 'petroleo_diesel', 'glp_vehicular', 'gnc']
    df_2 = df.copy()
    fut_df = []
    for index, row in df_2.iterrows():
        print(row)
        for tipo in comb_tipos:
            precio = row[tipo]
            if precio is pd.NA:
                continue
            new_row = [row.id_bencinera, row.id_ubicacion, tipo, precio]
            fut_df.append(new_row)
        break
    df_hecho = pd.DataFrame(data=fut_df, columns=['id_bencinera', 'id_ubicacion', 'tipo_combustible', 'precio'])
    df_hecho['id_fecha'] = 2 #insert_fecha()
    df_hecho = set_id(df_hecho)
    return df_hecho
    

In [41]:
df_hecho = main_table(df)
df_hecho.head()

id_bencinera                                               co110101
fecha_actualizacion                             2022-12-06 00:00:00
razon_social                        Iracabal otth henri edward jean
direccion_calle                                               Vivar
direccion_numero                                                402
nombre_comuna                                               Iquique
nombre_region                                              Tarapacá
horario_atencion                                           24 horas
distribuidor_nombre                                           Copec
distribuidor_logo_svg            http://api.cne.cl/brands/copec.svg
gasolina_93                                                    1344
gasolina_97                                                    1420
petroleo_diesel                                                1224
gasolina_95                                                    1383
pago_efectivo                                   

Unnamed: 0,id_bencinera,id_ubicacion,tipo_combustible,precio,id_fecha,id_precio_combustible
0,co110101,1,gasolina_93,1344,2,1
1,co110101,1,gasolina_95,1383,2,2
2,co110101,1,gasolina_97,1420,2,3
3,co110101,1,petroleo_diesel,1224,2,4


In [None]:
df_hecho.shape

### Insertando tabla de hecho

In [None]:
id_fecha = insert_fecha()
id_fecha

In [None]:
hecho_df['id_fecha'] = id_fecha
hecho_df

In [None]:
hecho_df = set_index(hecho_df)
hecho_df.head()

In [None]:
job_config = bigquery.LoadJobConfig(
)

job = client.load_table_from_dataframe(
    df_hecho, 'acquired-winter-316123.datawarehouse.precio_combustible', job_config=job_config
)

job.result() 

## TABLA DE BENCINERAS

In [None]:
df_3 = df.copy()

df_bencin = df_3[['id_bencinera', 'razon_social', 'distribuidor_nombre',
                  'distribuidor_logo_svg', 'tienda', 'farmacia', 'mantencion',
                  'autoservicio', 'pago_efectivo', 'pago_cheque', 
                   'pago_tarjetas_bancarias', 'pago_tarjetas_grandes_tiendas']]
df_bencin

In [None]:
job_config = bigquery.LoadJobConfig()

job = client.load_table_from_dataframe(
    df_bencin, 'acquired-winter-316123.datawarehouse.bencinera', job_config=job_config
)

job.result()


In [None]:
def is_empty(id_tabla):
    q = f"SELECT * FROM `{id_tabla}`"
    res = client.query(q)
    n_rows = res.result().total_rows
    if n_rows > 0:
        False
    else:
        return True

In [None]:
def bencinas(df):
    tabla_bencina = 'acquired-winter-316123.datawarehouse.bencinera'
    if is_empty(tabla_bencina):
        df_3 = df.copy()

        df_bencin = df_3[['id_bencinera', 'razon_social', 'distribuidor_nombre',
                          'distribuidor_logo_svg', 'tienda', 'farmacia', 'mantencion',
                          'autoservicio', 'pago_efectivo', 'pago_cheque', 
                           'pago_tarjetas_bancarias', 'pago_tarjetas_grandes_tiendas']]
        job_config = bigquery.LoadJobConfig()

        job = client.load_table_from_dataframe(
            df_bencin, tabla_bencina, job_config=job_config
        )

        job.result()


In [None]:
bencinas(df)

## TABLA UBICACIONES

In [None]:
df_ubicacion = df[['id_ubicacion', 'nombre_comuna', 'nombre_region',
                    'latitud', 'longitud', 'id_bencinera']].copy()
df_ubicacion['direccion'] = df['direccion_calle'] + ', ' + df['direccion_numero']
df_ubicacion.head()

In [None]:
def ubicacion(df):
    ubicaciones_id = 'acquired-winter-316123.datawarehouse.ubicacion'
    if is_empty(ubicaciones_id):
        df_copy = df.copy()

        df_ubicacion = df_copy[['id_ubicacion', 'nombre_comuna', 'nombre_region',
                            'latitud', 'longitud', 'id_bencinera']]
        df_ubicacion['direccion'] = df_copy.loc[:,'direccion_calle'] + ', ' + df_copy.loc[:,'direccion_numero']
        job_config = bigquery.LoadJobConfig()

        job = client.load_table_from_dataframe(
            df_ubicacion, ubicaciones_id, job_config=job_config
        )

        job.result()
    

In [None]:
ubicacion(df)

# ACTUALIZAR DIMENSIONES

## BENCINERAS

In [None]:
benc_table = client.query(f'SELECT * FROM {TABLA_BENCINERAS}')
benc_df = benc_table.to_dataframe()

In [None]:
benc_df.head(1)

In [None]:
df_bencin_2 = df_bencin.copy()

In [None]:
df_bencin_2.loc[df_bencin_2['id_bencinera'] == 'co110101', 'razon_social'] = 'mati'

In [None]:
def build_updt_stm(table_id, fields_dict, where_field, where_value, where_field_string):
    update_stm = f'UPDATE `{table_id}` SET '
    dict_len = len(fields_dict)
    counter = 0
    for key, value in fields_dict.items():
        counter += 1
        if type(value) == str:
            update_stm += f"{key} = '{value}'"
        else:
            update_stm += f"{key} = {value}"
        if counter < dict_len:
            update_stm += ', ' 
        else:
            update_stm += ' '
    if where_field_string:
        update_stm += f"WHERE {where_field} = '{where_value}'"
    else:
        update_stm += f"WHERE {where_field} = {where_value}"
    print(update_stm)
    client.query(update_stm)

In [None]:
rows_to_insert = []
for index, row in df_bencin_2.iterrows():
    row_to_compare = benc_df.loc[benc_df['id_bencinera'] == row.id_bencinera, :]
    if len(row_to_compare.index) == 1:
        print('ID encontrado')
        to_update = {}
        for col in row_to_compare:
            if row[col] != row_to_compare[col].values[0]:
                to_update[col] = row[col]
        build_updt_stm(TABLA_BENCINERAS, to_update, 'id_bencinera', row.id_bencinera, True)
    else:
        print('Nueva Bencinera!')
        rows_to_insert.append(row.to_dict())
    break
if len(rows_to_insert) > 0:
    client.insert_rows_json(TABLA_BENCINERASLA, rows_to_insert)

In [None]:
def updated_bencin(df_bencin):
    benc_table = client.query(f'SELECT * FROM {TABLA_BENCINERAS}')
    benc_df = benc_table.to_dataframe()
    rows_to_insert = []
    for index, row in df_bencin.iterrows():
        row_to_compare = benc_df.loc[benc_df['id_bencinera'] == row.id_bencinera, :]
        if len(row_to_compare.index) == 1:
            to_update = {}
            for col in row_to_compare:
                if row[col] != row_to_compare[col].values[0]:
                    to_update[col] = row[col]
            if len(to_update) > 0:
                print('Fila actualizada')
                print(to_update)
                build_updt_stm(TABLA_BENCINERAS, to_update, 'id_bencinera', row.id_bencinera, True)
        else:
            print('Nueva Bencinera!')
            rows_to_insert.append(row.to_dict())
    if len(rows_to_insert) > 0:
        print(rows_to_insert)
        client.insert_rows_json(TABLA_BENCINERAS, rows_to_insert)

In [None]:
df_bencin_2.loc[df_bencin_2['id_bencinera'] == 'co110101', 'farmacia'] = True

In [None]:
df_bencin_2.head()

In [None]:
mock_row = df_bencin_2.iloc[2].to_dict()
mock_row['id_bencinera'] = 'nueva_bencinera_2'
mock_row

In [None]:
df_bencin_2 = df_bencin_2.append(mock_row, ignore_index=True)
df_bencin_2.tail()

In [None]:
updated_bencin(df_bencin)

## UBICACIONES

In [None]:
def update_ubic(df_ubic):
    ubic_table = client.query(f'SELECT * FROM {TABLA_UBICACION}')
    ubic_df = ubic_table.to_dataframe()
    rows_to_insert = []
    for index, row in df_ubic.iterrows():
        row_to_compare = ubic_df.loc[ubic_df['id_bencinera'] == row.id_bencinera, :]
        if len(row_to_compare.index) == 1:
            to_update = {}
            for col in row_to_compare:
                if col == 'id_ubicacion':
                    continue
                elif row[col] != row_to_compare[col].values[0]:
                    to_update[col] = row[col]
            if len(to_update) > 0:
                print(to_update)
                build_updt_stm(TABLA_UBICACION, to_update, 'id_bencinera', row.id_bencinera, True)
        else:
            print('Nueva Bencinera!')
            new_row = row.to_dict()
            last_id_q = f"SELECT id_ubicacion FROM `{TABLA_UBICACION}` ORDER BY id_ubicacion DESC LIMIT 1"
            res = client.query(last_id_q)
            last_id = list(res)[0][0] + 1
            print('Siguiente ID:', last_id)
            new_row['id_ubicacion'] = last_id
            rows_to_insert.append(new_row)
    if len(rows_to_insert) > 0:
        client.insert_rows_json(TABLA_UBICACION, rows_to_insert)

In [None]:
df_ubi_2 = df_ubicacion.copy()

In [None]:
df_ubi_2.loc[df_ubi_2['id_bencinera'] == 'co110101', 'nombre_comuna'] = 'mati'
mock_row = df_ubi_2.iloc[5]
mock_row = mock_row.to_dict()
mock_row['nombre_region'] = 'nueva_region_2'
mock_row['id_bencinera'] = 'hola'
df_ubi_2 = df_ubi_2.append(mock_row, ignore_index=True)
df_ubi_2.head()

In [None]:
df_ubi.tail()

In [None]:
update_ubic(df_ubicacion)