In [1]:
import requests
import json
import os
from dotenv import load_dotenv
import pandas as pd
import datetime
import time
import csv
import psycopg2
import sqlalchemy
from sqlalchemy import create_engine
import schedule
from google.cloud.sql.connector import Connector
import pg8000.native

In [2]:
connector = Connector()

# function to return the database connection
def getconn() -> pg8000.native.Connection:
    conn: pg8000.connections.Connection = connector.connect(
        os.environ.get("google_cloud_project"),
        "pg8000",
        user=os.environ.get("google_cloud_user"),
        password=os.environ.get("google_cloud_pass"),
        db="bicimad_worker"
    )
    return conn

# create connection pool
pool = sqlalchemy.create_engine(
    "postgresql+pg8000://",
    creator=getconn,
)

In [3]:
def get_token():
    load_dotenv('./.env')
    email = os.environ.get("email")
    password = os.environ.get("password")
    client_id = os.environ.get("X-ClientId")
    pass_key = os.environ.get("passKey")
    url = "https://openapi.emtmadrid.es/v3/mobilitylabs/user/login/"
    headers = {"email": email, "password" : password}
    response = requests.get(url, headers=headers)
    return response.content

In [4]:
def update_estaciones(): #Esta función hace UPDATE de los datos que no cambian de las estaciones en la base de datos
    connector = Connector()

    # function to return the database connection
    def getconn() -> pg8000.native.Connection:
        conn: pg8000.connections.Connection = connector.connect(
            os.environ.get("google_cloud_project"),
            "pg8000",
            user=os.environ.get("google_cloud_user"),
            password=os.environ.get("google_cloud_pass"),
            db="bicimad_worker"
        )
        return conn

    # create connection pool
    pool = sqlalchemy.create_engine(
        "postgresql+pg8000://",
        creator=getconn,
    )

    load_dotenv('./.env')
    token = os.environ.get("access_token")
    url = "https://openapi.emtmadrid.es/v3/transport/bicimad/stations/"
    headers = {"accessToken" : token}
    stations = requests.get(url, headers = headers).json()
    date_and_time = datetime.datetime.now()
    date_and_time_formated = date_and_time.strftime("%Y-%m-%d %H:%M:%S")
    date_and_time_formated2 = date_and_time.strftime("%Y%m%d%H%M%S")
    estaciones = pd.DataFrame(stations["data"])
    estaciones["last_updated"] = date_and_time_formated
    estaciones[["longitude", "latitude"]] = estaciones["geometry"].apply(lambda x: pd.Series(x["coordinates"]))
    estaciones = estaciones.drop(["activate", "virtualDelete", "tipo_estacionPBSC", "geofence", "activate", "geometry", "integrator", "virtual_bikes", "virtual_bikes_num", "geofenced_capacity", "bikesGo"], axis=1)
    estaciones['coordinates'] = list(zip(estaciones['longitude'], estaciones['latitude']))
    update_estaciones = estaciones[["address", "code_district", "code_suburb", "id", "number", "total_bases", "last_updated", "longitude", "latitude", "coordinates"]]
    update_estaciones.to_csv(f'../data_csv/estaciones_{date_and_time_formated2}.csv')
    with pool.connect() as conn:
        update_estaciones.to_sql('estaciones', con=conn, if_exists='replace', index=False)
    return update_estaciones

In [5]:
connector = Connector()

# function to return the database connection
def getconn() -> pg8000.native.Connection:
    conn: pg8000.connections.Connection = connector.connect(
        os.environ.get("google_cloud_project"),
        "pg8000",
        user=os.environ.get("google_cloud_user"),
        password=os.environ.get("google_cloud_pass"),
        db="bicimad_worker"
    )
    return conn

# create connection pool
pool = sqlalchemy.create_engine(
    "postgresql+pg8000://",
    creator=getconn,
)

In [6]:
def get_disponibilidad(): #Esta función hace append en la base de datos de aquellas columnas que sí cambian a lo largo del día
    connector = Connector()

    # function to return the database connection
    def getconn() -> pg8000.native.Connection:
        conn: pg8000.connections.Connection = connector.connect(
            os.environ.get("google_cloud_project"),
            "pg8000",
            user=os.environ.get("google_cloud_user"),
            password=os.environ.get("google_cloud_pass"),
            db="bicimad_worker"
        )
        return conn

    # create connection pool
    pool = sqlalchemy.create_engine(
        "postgresql+pg8000://",
        creator=getconn,
    )
    
    load_dotenv('./.env')
    token = os.environ.get("access_token")
    url = "https://openapi.emtmadrid.es/v3/transport/bicimad/stations/"
    headers = {"accessToken" : token}
    stations = requests.get(url, headers = headers).json()
    date_and_time = datetime.datetime.now()
    date_and_time_formated = date_and_time.strftime("%Y-%m-%d %H:%M:%S")
    date_and_time_formated2 = date_and_time.strftime("%Y%m%d%H%M%S")
    estaciones = pd.DataFrame(stations["data"])
    estaciones["last_updated"] = date_and_time_formated
    estaciones[["longitude", "latitude"]] = estaciones["geometry"].apply(lambda x: pd.Series(x["coordinates"]))
    estaciones = estaciones.drop(["virtualDelete", "tipo_estacionPBSC", "geofence", "geometry", "integrator", "virtual_bikes", "virtual_bikes_num", "geofenced_capacity", "bikesGo"], axis=1)
    estaciones['coordinates'] = list(zip(estaciones['longitude'], estaciones['latitude']))
    disponibilidad = estaciones[["activate","dock_bikes", "free_bases", "id", "light", "no_available", "reservations_count", "last_updated"]]
    disponibilidad.to_csv(f'../data_csv/disponibilidad_{date_and_time_formated2}.csv')
    with pool.connect() as conn:
        disponibilidad.to_sql('disponibilidad', con=conn, if_exists='append', index=False)
    return disponibilidad

In [7]:
update_estaciones()

Unnamed: 0,address,code_district,code_suburb,id,number,total_bases,last_updated,longitude,latitude,coordinates
0,"Calle Miguel Moya nº 1,",01,015,1406,2,27,2024-03-15 08:58:01,-3.705690,40.420400,"(-3.70569, 40.4204)"
1,"Plaza Conde Surchill, 4 ,",07,072,1407,3,19,2024-03-15 08:58:01,-3.707254,40.430322,"(-3.7072537, 40.4303223)"
2,"Calle Fuencarral nº 106,",01,014,1409,5,27,2024-03-15 08:58:01,-3.702135,40.428521,"(-3.7021354, 40.4285212)"
3,"Calle Hortaleza nº 63,",01,014,1410,6,19,2024-03-15 08:58:01,-3.698447,40.424148,"(-3.698447, 40.424148)"
4,"Calle Hortaleza nº 75,",01,014,1411,7,19,2024-03-15 08:58:01,-3.697771,40.425191,"(-3.6977715, 40.4251906)"
...,...,...,...,...,...,...,...,...,...,...
607,"Campuzano, 1,",15,152,1965,482,23,2024-03-15 08:58:01,-3.642339,40.435687,"(-3.6423386, 40.4356873)"
608,"Calle de las Delicias, 41 ,",02,026,2117,273,19,2024-03-15 08:58:01,-3.690918,40.403819,"(-3.69091831, 40.40381914)"
609,"Calle Camilo José Cela, 27,",04,044,1647,241,23,2024-03-15 08:58:01,-3.667790,40.437800,"(-3.66779, 40.4378)"
610,"Ronda de Atocha nº 34,",01,012,1453,49,27,2024-03-15 08:58:01,-3.699277,40.406053,"(-3.6992774, 40.4060533)"


In [8]:
get_disponibilidad()

Unnamed: 0,activate,dock_bikes,free_bases,id,light,no_available,reservations_count,last_updated
0,1,17,10,1406,2,0,0,2024-03-15 08:58:23
1,1,9,10,1407,2,0,0,2024-03-15 08:58:23
2,1,13,14,1409,2,0,0,2024-03-15 08:58:23
3,1,8,11,1410,2,0,0,2024-03-15 08:58:23
4,1,7,12,1411,2,0,0,2024-03-15 08:58:23
...,...,...,...,...,...,...,...,...
607,1,0,23,1965,0,0,0,2024-03-15 08:58:23
608,1,15,4,2117,1,0,0,2024-03-15 08:58:23
609,1,4,18,1647,0,0,0,2024-03-15 08:58:23
610,1,10,17,1453,2,0,0,2024-03-15 08:58:23


In [9]:
#Esto hace que se ejecute la función cada hora y se carguen los datos en mi base de datos
schedule.every().hour.do(get_disponibilidad)

while True:
    schedule.run_pending()
    time.sleep(1)

schedule.every().month.do(update_estaciones)

while True:
    schedule.run_pending()
    time.sleep(1)