# Creating and setup bigquery tables

- dim table  : stations
- fact table : records

In [1]:
import pandas as pd
import os
import requests

from google.cloud import bigquery

from config import Config
from dash_app.src.data.models import Station, Record

os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = Config.GOOGLE_CREDENTIALS

# Construct a BigQuery client object.
client = bigquery.Client()

In [2]:
Config.__dict__

mappingproxy({'__module__': 'config',
              'PROJECT_ID': 'vlille-gcp-dash-yzpt',
              'DATASET_ID': 'vlille_dataset_v2',
              'RECORDS_TABLE_ID': 'records',
              'STATIONS_TABLE_ID': 'stations',
              'REGION': 'europe-west9',
              'GOOGLE_CREDENTIALS': 'key-vlille-gcp-dash-yzpt.json',
              'API_URL': 'https://data.lillemetropole.fr/geoserver/wfs?SERVICE=WFS&REQUEST=GetFeature&VERSION=2.0.0&TYPENAMES=dsp_ilevia%3Avlille_temps_reel&OUTPUTFORMAT=application%2Fjson',
              'TIMEZONE': 'Europe/Paris',
              '__dict__': <attribute '__dict__' of 'Config' objects>,
              '__weakref__': <attribute '__weakref__' of 'Config' objects>,
              '__doc__': None})

## Create the dataset

In [4]:
# create a new dataset
dataset = bigquery.Dataset(Config.PROJECT_ID + "." + Config.DATASET_ID)
dataset.location = Config.REGION
dataset = client.create_dataset(dataset, timeout=30)

print("Created dataset {}.{}".format(client.project, dataset.dataset_id))

Created dataset vlille-gcp-dash-yzpt.vlille_dataset_v2


## Create Stations table and populate it

In [5]:
# Create a new table
table = bigquery.Table(Config.PROJECT_ID + "." + Config.DATASET_ID + "." + Config.STATIONS_TABLE_ID)

# Set the schema considering the dataclass below:
# @dataclass
# class Station:
#     id: int
#     name: str
#     adress: str
#     city: str
#     type: str
#     latitude: float
#     longitude: float
    
schema = [
    bigquery.SchemaField("id", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("name", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("adress", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("city", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("type", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("latitude", "FLOAT", mode="REQUIRED"),
    bigquery.SchemaField("longitude", "FLOAT", mode="REQUIRED"),
]

table.schema = schema
table = client.create_table(table)
print("Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id))

Created table vlille-gcp-dash-yzpt.vlille_dataset_v2.stations


In [7]:
## Query the source API to get the stations data

url = Config.API_URL
response = requests.get(url)
data = response.json()

stations = []
for record in data['features']:
    station = Station(
        id=         record['properties']['objectid'],
        name=       record['properties']['nom'],
        adress=     record['properties']['adresse'],
        city=       record['properties']['commune'],
        type=       record['properties']['type'],
        latitude=   record['properties']['x'],
        longitude=  record['properties']['y']
    )
    stations.append(station)
    
df = pd.DataFrame(station.__dict__ for station in stations)
df.sample(5)

# insert into bigquery
table = client.get_table(Config.PROJECT_ID + "." + Config.DATASET_ID + "." + Config.STATIONS_TABLE_ID)

job = client.load_table_from_dataframe(df, table)
job.result()  # Waits for the job to complete.

LoadJob<project=vlille-gcp-dash-yzpt, location=europe-west9, id=9a180f4f-abc5-4473-a21c-486f98daf15c>

In [8]:
# Query the table to check the data
query = f"""
    SELECT *
    FROM `{Config.PROJECT_ID}.{Config.DATASET_ID}.{Config.STATIONS_TABLE_ID}`
"""

query_job = client.query(query)
df = query_job.to_dataframe()
df.sample(5)

Unnamed: 0,id,name,adress,city,type,latitude,longitude
20,22,LOUISE DE BETTIGNIES,2 avenue du Peuple Belge,Lille,AVEC TPE,3.064974,50.64144
60,31,PONT NEUF,38 bis rue du Pont Neuf,Lille,AVEC TPE,3.062935,50.644264
283,185,SAINTE HÉLÈNE,48 rue Sainte-Hélène,Saint Andrè Lez Lille,AVEC TPE,3.056147,50.65722
144,202,PARC BARBIEUX,Avenue de Jussieu,Roubaix,AVEC TPE,3.162436,50.678038
52,20,N.D. DE LA TREILLE,11 place Gilleson,Lille,AVEC TPE,3.062053,50.640385


In [9]:
print(len(df), "stations inserted.")

289 stations inserted.


## Create the records table

In [11]:
# create a new table
table = bigquery.Table(Config.PROJECT_ID + "." + Config.DATASET_ID + "." + Config.RECORDS_TABLE_ID)

# Set the schema considering the dataclass below:
# @dataclass
# class Record:
#     station_id: int
#     state: str
#     available_bikes: int
#     available_places: int
#     connexion_state: str
#     last_update: str
#     record_timestamp: str

schema = [
    bigquery.SchemaField("station_id", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("state", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("available_bikes", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("available_places", "INTEGER", mode="REQUIRED"),
    bigquery.SchemaField("connexion_state", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("last_update", "STRING", mode="REQUIRED"),
    bigquery.SchemaField("record_timestamp", "STRING", mode="REQUIRED"),
]

table.schema = schema
table = client.create_table(table)
print("Created table {}.{}.{}".format(table.project, table.dataset_id, table.table_id))

Created table vlille-gcp-dash-yzpt.vlille_dataset_v2.records


## Request the records table to check data insertion

After deploying the Cloud Function, we can check the records table to see if the data was inserted correctly.

In [6]:
# Query the table to check the data
query = f"""
    SELECT *
    FROM `{Config.PROJECT_ID}.{Config.DATASET_ID}.{Config.RECORDS_TABLE_ID}`
    WHERE state = 'EN SERVICE'
    ORDER BY record_timestamp DESC
    LIMIT 5
"""

query_job = client.query(query)
df = query_job.to_dataframe()
df

Unnamed: 0,station_id,state,available_bikes,available_places,connexion_state,last_update,record_timestamp
0,309,EN SERVICE,4,20,CONNECTÉ,2024-09-09T17:19:24.728Z,2024-09-09 17:20:05
1,273,EN SERVICE,4,10,CONNECTÉ,2024-09-09T17:19:24.720Z,2024-09-09 17:20:05
2,274,EN SERVICE,6,2,CONNECTÉ,2024-09-09T17:19:24.720Z,2024-09-09 17:20:05
3,310,EN SERVICE,10,10,CONNECTÉ,2024-09-09T17:19:24.728Z,2024-09-09 17:20:05
4,276,EN SERVICE,3,17,CONNECTÉ,2024-09-09T17:19:24.722Z,2024-09-09 17:20:05


In [7]:
sample_records = df
sample_records.to_csv('sample_records.csv', index=False)

In [9]:
# Query the stations table
query = f"""
    SELECT *
    FROM `{Config.PROJECT_ID}.{Config.DATASET_ID}.{Config.STATIONS_TABLE_ID}`
"""

query_job = client.query(query)
sample_stations = query_job.to_dataframe()

sample_stations.to_csv('sample_stations.csv', index=False)
sample_stations

Unnamed: 0,id,name,adress,city,type,latitude,longitude
0,274,CALMETTE,323 avenue Laënnec,Hem,AVEC TPE,3.192974,50.667847
1,275,LAËNNEC,31 bis rue des Écoles,Hem,AVEC TPE,3.198101,50.665946
2,176,FACULTÉ DE MÉDECINE,310 Avenue Eugène Avinée,LOOS,AVEC TPE,3.031421,50.604499
3,175,RECHERCHE,977 Avenue Eugène Avinée,Loos,AVEC TPE,3.038779,50.603010
4,177,RUE DE LONDRES,2 Rue du Maréchal Foch,Loos,AVEC TPE,3.023032,50.616763
...,...,...,...,...,...,...,...
284,155,BAILLY,17 rue Albert Bailly,Saint André Lez Lille,AVEC TPE,3.051201,50.655098
285,156,ST ANDRE MAIRIE,91 rue du Général Leclerc,Saint André Lez Lille,AVEC TPE,3.050322,50.661602
286,190,QUAI 22,149 rue Sadi Carnot,Saint André Lez Lille,AVEC TPE,3.057167,50.662941
287,194,LOMMELET,185 rue du Général Leclerc,Saint André Lez Lille,AVEC TPE,3.049462,50.665968
