In [36]:
import pandas as pd
from google.cloud import bigquery
from google.cloud import storage
#from google.cloud import dataproc_v1 as dataproc
#from pyspark.sql import SparkSession
import os
import io
import datetime

In [12]:
storage_client = storage.Client()
bucket = storage_client.bucket('gmaps_data2')

# Obtener la URI del archivo en Cloud Storage
input_uri = 'gs://gmaps_data2/sitios/hechos_homicidios.csv'

# Configurar el cliente de Cloud Dataproc
project_id = os.environ.get('GCP_PROJECT')  # Obtener el ID del proyecto desde las variables de entorno
region = 'southamerica-east1'  # Reemplaza con tu región de preferencia
cluster_name = 'cluster-test'  # Reemplaza con el nombre de tu clúster de Cloud Dataproc
job_name = 'test-job'  # Nombre del trabajo que se ejecutará en Cloud Dataproc
job_type = 'pyspark'  # Tipo de trabajo: pyspark, hadoop, etc.
main_class = None  # Clase principal para trabajos Java
jar_file_uris = None  # URI del archivo JAR para trabajos Java
archive_uris = None  # URIs de archivos adicionales
properties = None  # Propiedades del trabajo
args = [input_uri]  # Argumentos para el trabajo

In [13]:
 client = dataproc.ClusterControllerClient()

In [None]:
def procesar_datos(event, context):
    # Obtener información sobre el archivo desde el evento
    bucket_name = event['bucket']
    file_name = event['name']

    # Configurar el cliente de Cloud Storage
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)

    # Obtener la URI del archivo en Cloud Storage
    input_uri = f'gs://{bucket_name}/{file_name}'

    # Configurar el cliente de Cloud Dataproc
    project_id = os.environ.get('GCP_PROJECT')  # Obtener el ID del proyecto desde las variables de entorno
    region = 'us-central1'  # Reemplaza con tu región de preferencia
    cluster_name = 'tu-cluster'  # Reemplaza con el nombre de tu clúster de Cloud Dataproc
    job_name = 'procesamiento-job'  # Nombre del trabajo que se ejecutará en Cloud Dataproc
    job_type = 'pyspark'  # Tipo de trabajo: pyspark, hadoop, etc.
    main_class = None  # Clase principal para trabajos Java
    jar_file_uris = None  # URI del archivo JAR para trabajos Java
    archive_uris = None  # URIs de archivos adicionales
    properties = None  # Propiedades del trabajo
    args = [input_uri]  # Argumentos para el trabajo

    # Iniciar el trabajo en Cloud Dataproc
    client = dataproc.ClusterControllerClient()
    job = {
        'placement': {
            'cluster_name': cluster_name
        },
        'pyspark_job': {
            'main_python_file_uri': 'gs://path/to/your/python/script.py',  # URI del script Python en Cloud Storage
            'args': args
        }
    }
    operation = client.submit_job(
        project_id=project_id,
        region=region,
        job=job,
        job_id=job_name
    )
    response = operation.result()

    print(f'Procesamiento iniciado en el clúster {cluster_name}.')

# Ejemplo de función que se activa cuando se carga un archivo en el bucket
# Reemplaza 'nombre-del-bucket' por el nombre de tu bucket
# Reemplaza 'procesar_datos' por el nombre de tu función
# GCP_PROJECT es el ID de tu proyecto de Google Cloud

In [15]:
df_sitios = pd.read_json('data/1.json', lines=True)
#Borrar Columnas
df_sitios= df_sitios.drop(['relative_results','address', 'num_of_reviews', 'description', 'url','category', 'MISC', 'hours'], axis=1) 
df_sitios= df_sitios.dropna(subset=['name']) # Elimina filas con address nulas
df_sitios['name']= df_sitios['name'].astype('string')
# Ordena el orden de las columnas
df_sitios= df_sitios[['gmap_id','name', 'latitude', 'longitude', 'avg_rating', 'price', 'state']]

In [23]:
df_sitios.info()

<class 'pandas.core.frame.DataFrame'>
Index: 274994 entries, 0 to 275000
Data columns (total 7 columns):
 #   Column      Non-Null Count   Dtype  
---  ------      --------------   -----  
 0   gmap_id     274994 non-null  object 
 1   name        274994 non-null  string 
 2   latitude    274994 non-null  float64
 3   longitude   274994 non-null  float64
 4   avg_rating  274994 non-null  float64
 5   price       13450 non-null   object 
 6   state       195523 non-null  object 
dtypes: float64(3), object(3), string(1)
memory usage: 22.6+ MB


In [4]:
def upload_to_gcs(bucket_name, source_file_name, destination_blob_name):
    storage_client = storage.Client()
    bucket = storage_client.bucket(bucket_name)
    blob = bucket.blob(destination_blob_name)

    blob.upload_from_filename(source_file_name)

    print(f'ARCHIVO {source_file_name} CARGADO EN {destination_blob_name} EN BUCKET {bucket_name}.')


In [5]:
bucket_name = 'gmaps_data2'
source_file_name = 'data/hechos_homicidios.csv'
destination_blob_name = 'sitios/hechos_homicidios.csv'
#destination_blob_name = 'sitios/1.json'

# Upload the CSV file to GCS
upload_to_gcs(bucket_name, source_file_name, destination_blob_name)

ARCHIVO data/1.json CARGADO EN sitios/1.json EN BUCKET gmaps_data2.


In [4]:
def cargar_data(data):
    df_sitios= data.dropna(subset=['address']) # Elimina filas con address nulas
    df_sitios= df_sitios.dropna(subset=['category']) # Elimina filas con category nulas
    #Borrar Columnas
    df_sitios= df_sitios.drop(['relative_results','address', 'num_of_reviews', 'description', 'url','category', 'MISC', 'hours'], axis=1) 
    # Ordena el orden de las columnas
    df_sitios= df_sitios[['gmap_id','name', 'latitude', 'longitude', 'avg_rating', 'price', 'state']]
    return df_sitios

In [5]:
df = pd.read_json('data/1.json', lines=True)
df2= cargar_data(df)

In [6]:
df2

Unnamed: 0,gmap_id,name,latitude,longitude,avg_rating,price,state
0,0x88f16e41928ff687:0x883dad4fd048e8f8,Porter Pharmacy,32.388300,-83.357100,4.9,,Open ⋅ Closes 6PM
1,0x80c2c98c0e3c16fd:0x29ec8a728764fdf9,City Textile,34.018891,-118.215290,4.5,,Open now
2,0x80c2c778e3b73d33:0xbdc58662a4a97d49,San Soo Dang,34.058092,-118.292130,4.4,,Open ⋅ Closes 6PM
3,0x80c2c89923b27a41:0x32041559418d447,Nova Fabrics,34.023669,-118.232930,3.3,,Open ⋅ Closes 5PM
4,0x80c2c632f933b073:0xc31785961fe826a6,Nobel Textile Co,34.036694,-118.249421,4.3,,Open ⋅ Closes 5PM
...,...,...,...,...,...,...,...
274996,0x88fe64e7daceaaab:0x101c046dcadcc9c3,Maven Realty,32.857737,-79.985839,4.9,,Open 24 hours
274997,0x88fe59f4602c95fd:0x57e2b1904d9e6949,Stone Castle Fence,33.089951,-80.078021,4.0,,Closed ⋅ Opens 7AM Mon
274998,0x88fe71f9617c15e1:0x89518c1d251706bf,Health Insurance Solutions Team,32.918858,-80.027952,5.0,,Closed ⋅ Opens 8AM Mon
274999,0x4d34c7517d6e6e29:0x7fe553ee2f090cf0,Bernard Building Center,45.056691,-83.894423,4.7,,Closed ⋅ Opens 7:30AM Mon


In [8]:
storage_client = storage.Client()

# Nombre del bucket y archivo CSV
bucket_name = 'gmaps_data'
blob_name = 'sitios/1.json'

# Descargar el archivo CSV desde GCS
bucket = storage_client.bucket(bucket_name)
blob = bucket.blob(blob_name)
content = blob.download_as_string()

# Crear un DataFrame de Pandas a partir del contenido del archivo CSV
df = pd.read_json(io.BytesIO(content),lines=True)
df= cargar_data(df)

In [10]:
df.info()

<class 'pandas.core.frame.DataFrame'>
Index: 262678 entries, 0 to 275000
Data columns (total 7 columns):
 #   Column      Non-Null Count   Dtype  
---  ------      --------------   -----  
 0   gmap_id     262678 non-null  object 
 1   name        262678 non-null  object 
 2   latitude    262678 non-null  float64
 3   longitude   262678 non-null  float64
 4   avg_rating  262678 non-null  float64
 5   price       13319 non-null   object 
 6   state       186168 non-null  object 
dtypes: float64(3), object(4)
memory usage: 16.0+ MB


In [None]:
bigquery_client = bigquery.Client()

#project_id = 'tu-proyecto-id'
dataset_id = 'google_maps'
table_id = 'sitios_gmaps'
schema = [
        bigquery.SchemaField("gmap_id", bigquery.enums.SqlTypeNames.STRING),
        bigquery.SchemaField("name", bigquery.enums.SqlTypeNames.STRING),
        bigquery.SchemaField("latitude", bigquery.enums.SqlTypeNames.FLOAT64),
        bigquery.SchemaField("longitude", bigquery.enums.SqlTypeNames.FLOAT64),
        bigquery.SchemaField("avg_rating", bigquery.enums.SqlTypeNames.FLOAT64),
        bigquery.SchemaField("price", bigquery.enums.SqlTypeNames.STRING),
        bigquery.SchemaField("state", bigquery.enums.SqlTypeNames.STRING),
    ]
table_ref = bigquery_client.dataset(dataset_id).table(table_id)
try:
    tabla = bigquery_client.get_table(table_ref)
    tabla_existe = True
except:
    tabla_existe = False

if tabla_existe:
    print(f'La tabla {table_id} ya existe en el dataset {dataset_id}.')
else:
    print(f'La tabla {table_id} no existe en el dataset {dataset_id}.')
    # Crear la tabla si no existe
    tabla = bigquery.Table(table_ref, schema=schema)
    tabla = bigquery_client.create_table(tabla)
    print(f'Se ha creado la tabla {table_id} en el dataset {dataset_id}.')


#df.to_gbq(table_ref,if_exists="replace")
# Agregar los registros de df a la tabla existente o recién creada
job_config = bigquery.LoadJobConfig()
job_config.source_format = bigquery.SourceFormat.CSV
job_config.write_disposition = bigquery.WriteDisposition.WRITE_APPEND if tabla_existe else bigquery.WriteDisposition.WRITE_TRUNCATE
job = bigquery_client.load_table_from_dataframe(df, table_ref, job_config=job_config)
job.result()
print('Registros agregados correctamente.')