In [1]:
from google.cloud import bigquery
from google.cloud import storage
import concurrent.futures
import re
import os
import pandas as pd

project_id = 'iter-data-storage-pv-uat'
dataset_id = 'acsele_data'
bucket_name='interseguro-datalake-alloy-uat-new'
path_insert='Desarrollo/Replicacion_insert/'
dataset_output='acsele_temp'
dataset_input='acsele_data'
path_ddl='Desarrollo/Replicacion_ddl/'
path_truncate='Desarrollo/Replicacion_truncate/'

In [32]:
sql =f"""
SELECT 
TABLE_CATALOG
,TABLE_SCHEMA
,TABLE_NAME
,REPLACE(REPLACE(REPLACE(REPLACE(DDL,'{dataset_id}','acsele_temp'),');',CONCAT(')OPTIONS (format="PARQUET",URIS=["gs://{bucket_name}/{dataset_id}/',TABLE_NAME,'/*.parquet"]);')),'CREATE TABLE','CREATE OR REPLACE EXTERNAL TABLE'),'PARTITION BY DATE(AUDITDATE)','') AS DDL
FROM {project_id}.{dataset_id}.INFORMATION_SCHEMA.TABLES
"""

In [3]:
def generate_blobs(dataset,table_name):
    client_storage = storage.Client(project=project_id)
    bucket = client_storage.bucket(bucket_name)
    blobs = bucket.list_blobs(prefix=f'{dataset}/{table_name}')
    return blobs

In [4]:
def delete_blob(blob):
    blob.delete()
    print(f'Archivo {blob.name} eliminado.')

In [5]:
def clean_storage(dataset_id,project_id):
    client = bigquery.Client(project_id)
    tables = client.list_tables(dataset_id)
    for table in tables:
        table_id = table.table_id
        blobs = generate_blobs(dataset_id,table_id)
        with concurrent.futures.ThreadPoolExecutor() as executor:
            futures = [executor.submit(delete_blob, blob) for blob in blobs]
            concurrent.futures.wait(futures)  
    return f"Limpieza ruta: {dataset_id}/{table_id}"

In [6]:
def export_table_storage(dataset_id,project_id):
    client = bigquery.Client(project_id)
    dataset_ref = client.dataset(dataset_id)
    tables = client.list_tables(dataset_ref)
    for table in tables:
        table_id = table.table_id
        table_ref = dataset_ref.table(table_id)
        uri = f'gs://{bucket_name}/{dataset_id}/{table_id}/*.parquet'
        extract_job = client.extract_table(
            table_ref,
            uri,
            location="US",
            job_config=bigquery.ExtractJobConfig(destination_format="PARQUET")
            )
        print(f'Tabla Exportada: {table_ref}')

In [7]:
def read_table_bq(query):
  df = pd.read_gbq(query, project_id=project_id, dialect='standard')
  return df

In [8]:
def sqlbigquery(file):
    with open(file,"r") as file:
        statement = file.readlines()
        statement = " ".join(statement)
    return statement

In [26]:
def execute_job(query,project_id):
    client = bigquery.Client(project_id)
    query_job = client.query(query)
    results = query_job.result()
    for row in results:
        print(row)
        table_name = row['TABLE_NAME']
        ddl_statement = row['DDL']
        client.query(ddl_statement).result()
        print(f'Tabla creada: {table_name}')
    return f'Ejectuado Correctamente'

In [10]:
def store_base(path,df):
  for index, row in df.iterrows():
    nombre_archivo = row['TABLE_NAME']
    contenido_archivo = row['DDL']
    with open(path + nombre_archivo+'.sql', 'w', newline='') as archivo:
      archivo.write(contenido_archivo)
  return 'Se exporto correctamente los DDL'

In [11]:
def external_to_table(path,dataset_id,dataset_output,dataset_input):
    client = bigquery.Client(project_id)
    dataset_ref = client.dataset(dataset_id)
    tables = client.list_tables(dataset_ref)
    for table in tables:
        table_id = table.table_id
        table_ref = dataset_ref.table(table_id)
        with open(path+table_id+'.sql','w') as file:
            file.write('INSERT INTO '+project_id+'.'+dataset_input+'.'+table_id+'\n')
            file.write('SELECT * FROM '+project_id+'.'+dataset_output+'.'+table_id)

In [30]:
def execute_job_bq(path,type):
    client = bigquery.Client(project_id)
    list_sql = os.listdir(path)
    for file in list_sql:
        print(f" Query {type} a BQ: {file}")
        query = sqlbigquery(path+file)
        query_job = client.query(query)
        query_job.result()

In [41]:
def generate_truncate(path,dataset_id):
    client = bigquery.Client(project_id)
    dataset_ref = client.dataset(dataset_id)
    tables = client.list_tables(dataset_ref)
    for table in tables:
        table_id = table.table_id
        table_ref = dataset_ref.table(table_id)
        with open(path+table_id+'.sql','w') as file:
            file.write('TRUNCATE TABLE '+project_id+'.'+dataset_id+'.'+table_id)

In [42]:
generate_truncate(path_truncate,dataset_input)

### Main que realiza el exportado a cloud storage

In [16]:
def main_export_storage():
    #clean_storage(dataset_id,project_id)
    export_table_storage(dataset_id,project_id)
    return "Ejecutado Correctamente"

In [17]:
main_export_storage()

Tabla Exportada: iter-data-storage-pv-uat.acsele_data.ACCIDENTADO_landing
Tabla Exportada: iter-data-storage-pv-uat.acsele_data.ACCIDENTADO_raw
Tabla Exportada: iter-data-storage-pv-uat.acsele_data.AGREGATEDINSURANCEOBJECT_raw
Tabla Exportada: iter-data-storage-pv-uat.acsele_data.AGREGATEDPOLICYTYPE_landing
Tabla Exportada: iter-data-storage-pv-uat.acsele_data.AGREGATEDPOLICYTYPE_raw
Tabla Exportada: iter-data-storage-pv-uat.acsele_data.AGREGATEDPOLICY_raw
Tabla Exportada: iter-data-storage-pv-uat.acsele_data.AGREGINSOBJECTTYPE_landing
Tabla Exportada: iter-data-storage-pv-uat.acsele_data.AGREGINSOBJECTTYPE_raw
Tabla Exportada: iter-data-storage-pv-uat.acsele_data.ATROPELLOSIN_raw
Tabla Exportada: iter-data-storage-pv-uat.acsele_data.CDTR_TRANSFORMERTRANSLATION_raw
Tabla Exportada: iter-data-storage-pv-uat.acsele_data.CLAIMINSURANCEOBJECT_raw
Tabla Exportada: iter-data-storage-pv-uat.acsele_data.CLAIMNORMALRESERVE_raw
Tabla Exportada: iter-data-storage-pv-uat.acsele_data.CLAIMPAYMENT_r

'Ejecutado Correctamente'

### Main que crea los ddl para las tablas externas en bq

In [20]:
def main_create_external_table():
    store_base(path_ddl,read_table_bq(sql))
    #execute_job(sql,project_id)
    return "Ejecutado Correctamente"

In [None]:
main_create_external_table()

In [38]:
execute_job_bq(path_ddl,'DDL')

 Query DDL a BQ: ACCIDENTADO_landing.sql
 Query DDL a BQ: ACCIDENTADO_raw.sql
 Query DDL a BQ: AGREGATEDINSURANCEOBJECT_raw.sql
 Query DDL a BQ: AGREGATEDPOLICYTYPE_landing.sql
 Query DDL a BQ: AGREGATEDPOLICYTYPE_raw.sql
 Query DDL a BQ: AGREGATEDPOLICY_raw.sql
 Query DDL a BQ: AGREGINSOBJECTTYPE_landing.sql
 Query DDL a BQ: AGREGINSOBJECTTYPE_raw.sql
 Query DDL a BQ: ATROPELLOSIN_raw.sql
 Query DDL a BQ: CDTR_TRANSFORMERTRANSLATION_raw.sql
 Query DDL a BQ: claiminsuranceobject.sql
 Query DDL a BQ: CLAIMINSURANCEOBJECT_raw.sql
 Query DDL a BQ: claimnormalreserve.sql
 Query DDL a BQ: CLAIMNORMALRESERVE_raw.sql
 Query DDL a BQ: CLAIMPAYMENT_raw.sql
 Query DDL a BQ: CLAIMRESERVEADJUST_raw.sql
 Query DDL a BQ: claimriskunit.sql
 Query DDL a BQ: CLAIMRISKUNIT_raw.sql
 Query DDL a BQ: CLAIM_raw.sql
 Query DDL a BQ: COBERTURA_raw.sql
 Query DDL a BQ: CONFIGURABLEOBJECTTYPE_landing.sql
 Query DDL a BQ: CONFIGURABLEOBJECTTYPE_raw.sql
 Query DDL a BQ: CONFIGURATEDCOVERAGE_landing.sql
 Query DDL

### Genera los archivos sql con los insert

In [39]:
external_to_table(path_insert,dataset_id,dataset_output,dataset_input)

In [83]:
#execute_job_bq(path_insert,'Insert')

 Query insert a BQ: CLAIM_raw.sql
 Query insert a BQ: COBERTURA_raw.sql


### Ejecutar

In [None]:
# Ejecuta la exportacion a cloud storage del todo un dataset entero 
main_export_storage()
# Crea los DDL para las tablas externas y los ejecuta en bq
main_create_external_table()
# genera los insert de las tablas externas a tabla en bq
external_to_table(path_insert,dataset_id,dataset_output,dataset_input)
# Aqui falta sumar el truncate table pero es riesgoso (evaluar)

# Ejecuta todos los insert 
execute_job_bq(path_insert)