# In[ ]:
import logging

import apache_beam as beam
from apache_beam.io.gcp.pubsub import ReadFromPubSub, WriteToPubSub
from apache_beam.options.pipeline_options import (
    GoogleCloudOptions,
    PipelineOptions,
    StandardOptions,
)
from google.cloud import bigquery

# Fetch data from the red.cl REST API.
def obtener_datos_diarios(
    url="https://www.red.cl/restservice_v2/rest/getservicios/all",
    timeout=30,
):
    """Download ``url`` and return its decoded JSON body.

    Args:
        url: Endpoint to query. Defaults to the ``getservicios/all`` endpoint
            this helper originally had hard-coded.
        timeout: Seconds to wait for the connection/response before giving
            up. Without a timeout ``requests`` may block forever on a
            stalled server.

    Returns:
        The parsed JSON document, in whatever structure the API returns.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
        requests.Timeout: if the request exceeds ``timeout``.
        ValueError: if the response body is not valid JSON.
    """
    # Imported locally (as originally written) so ``requests`` is only
    # required when this helper is actually called.
    import requests

    response = requests.get(url, timeout=timeout)
    # Fail loudly on error statuses instead of trying to parse an error page.
    response.raise_for_status()
    return response.json()

# Convert each Pub/Sub payload into the row format BigQuery expects.
def transformar_datos(element):
    """Turn a Pub/Sub message payload into a BigQuery-ready row.

    ``ReadFromPubSub`` (without ``with_attributes``) yields each message body
    as ``bytes``. ``insert_rows_json`` needs a JSON-serializable mapping, so
    raw payloads are decoded here.

    Args:
        element: A message payload as ``bytes``/``bytearray``/``str``
            containing JSON, or an already-decoded object.

    Returns:
        The decoded JSON value for bytes/str input. This is presumably a dict
        per row; confirm against the publisher. Any other input is returned
        unchanged.

    Raises:
        ValueError: if a bytes/str payload is not valid JSON
            (``json.JSONDecodeError`` and ``UnicodeDecodeError`` are both
            ``ValueError`` subclasses).
    """
    # Local import keeps the function self-contained when Beam serializes it
    # for remote workers.
    import json

    if isinstance(element, (bytes, bytearray, str)):
        # json.loads accepts bytes directly and detects UTF-8/16/32.
        return json.loads(element)
    return element

# BigQuery sink: writes each pipeline element as one row via streaming inserts.
class CargarEnBigQuery(beam.DoFn):
    """Insert every incoming element into a BigQuery table as a JSON row.

    Args:
        project: GCP project ID. It is passed to ``bigquery.Client`` and used
            as the project part of the destination table ID.
        dataset_id: Dataset that contains the destination table.
        table_id: Destination table name.

    The client is created in ``setup`` (once per DoFn instance on the worker),
    not in ``__init__``. Beam serializes the DoFn to ship it to workers, and
    a ``bigquery.Client`` holds credentials and an HTTP session that are not
    meant to be pickled.
    """

    def __init__(self, project, dataset_id, table_id):
        super().__init__()
        self.project = project
        self.dataset_id = dataset_id
        self.table_id = table_id
        # Populated on the worker by setup(); left as None so the DoFn
        # serializes without a live client inside it.
        self.client = None
        self._table_path = None

    def setup(self):
        """Create the BigQuery client and resolve the destination table ID."""
        self.client = bigquery.Client(project=self.project)
        # Fully qualified "project.dataset.table" string. This avoids the
        # deprecated Client.dataset() helper, and it is built once rather
        # than for every element.
        self._table_path = f"{self.project}.{self.dataset_id}.{self.table_id}"

    def process(self, element):
        """Stream ``element`` into the table as a single row.

        ``element`` must be JSON-serializable. It is presumably a dict keyed
        by column name; confirm against the table schema. Errors returned by
        BigQuery are logged, not raised, and nothing is emitted downstream.
        """
        errors = self.client.insert_rows_json(self._table_path, [element])
        if errors:
            # Lazy %-formatting: the message is only built if it is emitted.
            logging.error("Error al insertar filas en BigQuery: %s", errors)

    def teardown(self):
        """Close the client's transport when the worker discards this DoFn."""
        if self.client is not None:
            self.client.close()
            self.client = None

# Pipeline configuration.
# Placeholder values: replace them with real resources before running.
PROJECT_ID = 'your-project-id'
TEMP_LOCATION = 'gs://your-bucket/temp'
SUBSCRIPTION = f'projects/{PROJECT_ID}/subscriptions/transportes-sub'
DATASET_ID = 'tu_dataset'
TABLE_ID = 'tu_tabla'

options = PipelineOptions()

standard_options = options.view_as(StandardOptions)
standard_options.runner = 'DataflowRunner'
# Pub/Sub is an unbounded source, so the job runs in streaming mode.
standard_options.streaming = True

# project and temp_location belong to GoogleCloudOptions. StandardOptions
# does not declare them, and setting them on that view raises AttributeError.
gcp_options = options.view_as(GoogleCloudOptions)
gcp_options.project = PROJECT_ID
gcp_options.temp_location = TEMP_LOCATION

p = beam.Pipeline(options=options)

# Define the pipeline: Pub/Sub -> decode/transform -> BigQuery.
# Step labels are kept unchanged; Dataflow uses them to identify steps.
(
    p
    | 'Leer de Pub/Sub' >> ReadFromPubSub(subscription=SUBSCRIPTION)
    | 'Transformar Datos' >> beam.Map(transformar_datos)
    | 'Cargar a BigQuery' >> beam.ParDo(
        CargarEnBigQuery(PROJECT_ID, DATASET_ID, TABLE_ID)
    )
)

result = p.run()
# Blocks until the job reaches a terminal state. For a streaming job, that
# means until it is cancelled or drained.
result.wait_until_finish()
