In [None]:
import requests

In [14]:
def load_json_marseille():
    """
    Fetch the Marseille bike-station dataset from the AMP Metropole open-data API.

    Sends a GET request to the JSON export endpoint of the
    ``gbfs-extract-station-information`` dataset and returns the decoded body.

    Returns:
        The decoded JSON payload of the response.

    Raises:
        requests.exceptions.HTTPError: if the API answers with a 4xx/5xx status.
        requests.exceptions.RequestException: on connection failure or timeout.
    """

    base_url = 'https://data.ampmetropole.fr'
    endpoint = '/api/explore/v2.1/catalog/datasets/gbfs-extract-station-information/exports/json'
    url = base_url + endpoint
    params = {
        'lang':'fr',
        'timezone':'Europe/Berlin'
    }

    # Announce the download before it starts so a slow request is not silent.
    print(f"📥 geting data from {url}...")
    # Without a timeout, requests waits indefinitely on an unresponsive server.
    response = requests.get(url, params=params, timeout=30)
    print(f'status code: {response.status_code}')
    # Fail fast on HTTP errors instead of handing an error body to the
    # extraction step as if it were station data.
    response.raise_for_status()
    return response.json()

In [20]:
# Download the station payload once; later cells reuse `json_response`.
json_response = load_json_marseille()
# NOTE(review): this prints the entire payload; consider showing only the
# first few records to keep the notebook output short.
print(json_response)

📥 geting data from https://data.ampmetropole.fr/api/explore/v2.1/catalog/datasets/gbfs-extract-station-information/exports/json...
status code: 200
[{'station_id': 'ce4bgi12rcd13ifvqb70', 'nom_division': 'Marseille 1er Arrondissement', 'name': 'Cours Jean Ballard', 'capacity': 20, 'is_valet_station': 0, 'num_bikes_available': 7, 'num_docks_available': 16, 'is_installed': 1, 'is_renting': 1, 'is_returning': 1, 'point_geo': {'lon': 5.374584, 'lat': 43.293009}, 'last_reported_tr': '2025-02-24T12:19:55+01:00', 'is_virtual_station': 0, 'message_velo': 'Plus de trois vélo disponibles', 'message_dock_dispo': 'Plus de trois docks disponibles'}, {'station_id': 'ce4bgi92rcd13ifvqb90', 'nom_division': 'Marseille 1er Arrondissement', 'name': 'Canebière Saint-Ferreol', 'capacity': 20, 'is_valet_station': 0, 'num_bikes_available': 3, 'num_docks_available': 17, 'is_installed': 1, 'is_renting': 1, 'is_returning': 1, 'point_geo': {'lon': 5.37712, 'lat': 43.296278}, 'last_reported_tr': '2025-02-24T12:19

In [16]:
def extract_json_marseille(json_response):
    """
    Flatten the Marseille stations JSON into a column-oriented dictionary.

    Each output key maps to a list holding one value per station, in input
    order. A key is read from the station record itself when present there;
    otherwise it is looked up in the nested ``point_geo`` object (this is how
    ``lon`` and ``lat`` are obtained). A value found in neither place is
    recorded as ``None`` so that all lists keep the same length.

    Args:
        json_response: iterable of station dicts as returned by the API.

    Returns:
        dict mapping each extracted field name to a list of values.
    """

    station_dic = {
        "station_id": [],
        "nom_division": [],
        "name": [],
        "capacity": [],
        "is_valet_station": [],
        "num_bikes_available": [],
        "num_docks_available": [],
        "is_installed": [],
        "is_renting": [],
        "is_returning": [],
        "lon": [],
        "lat": [],
        "last_reported_tr": [],
        "is_virtual_station": [],
        "message_velo": [],
        "message_dock_dispo": []
    }

    print('🛠️ extracting Marseille JSON data...')
    for station in json_response:
        # A station may come without coordinates (missing or null `point_geo`);
        # fall back to an empty mapping so the lookup yields None instead of
        # raising AttributeError.
        point_geo = station.get('point_geo')
        if not isinstance(point_geo, dict):
            point_geo = {}

        for key, values in station_dic.items():
            # Some keys are directly accessible while others live inside the
            # nested `point_geo` object.
            if key in station:
                values.append(station[key])
            else:
                values.append(point_geo.get(key))
    print('✅ Marseille stations JSON data extracted')
    return station_dic

In [21]:
# Flatten the API payload into a dict of lists (one list per extracted field).
dic = extract_json_marseille(json_response)
# NOTE(review): this prints every extracted value; a shorter preview would
# keep the notebook output readable.
print(dic)

🛠️ extracting Marseille JSON data...
✅ Marseille stations JSON data extracted
{'station_id': ['ce4bgi12rcd13ifvqb70', 'ce4bgi92rcd13ifvqb90', 'ce4bgj92rcd13ifvqbd0', 'ce4bgj92rcd13ifvqbe0', 'ce4bgjh2rcd13ifvqbf0', 'ce4bgjh2rcd13ifvqbg0', 'ce4bgk12rcd13ifvqbi0', 'ce4bgkh2rcd13ifvqbk0', 'ce4bgkh2rcd13ifvqbl0', 'ce4bgn92rcd13ifvqbvg', 'ce4bgop2rcd13ifvqc1g', 'ce4bgr92rcd13ifvqc6g', 'ce4bgr92rcd13ifvqc7g', 'ce4bgrp2rcd13ifvqc9g', 'ce4bgt12rcd13ifvqcd0', 'ce4bgt92rcd13ifvqcf0', 'ce4bgt92rcd13ifvqcg0', 'ce4bgup2rcd13ifvqcn0', 'ce4bgv12rcd13ifvqcp0', 'ce4bgv92rcd13ifvqcr0', 'ce4bgvp2rcd13ifvqct0', 'ce4bh012rcd13ifvqd00', 'ce4bh0h2rcd13ifvqd40', 'ce4bh0p2rcd13ifvqd80', 'ce4bh112rcd13ifvqd90', 'ce4bh112rcd13ifvqda0', 'ce4bh192rcd13ifvqdb0', 'ce4bh292rcd13ifvqdg0', 'ce4bh3p2rcd13ifvqdq0', 'ce4bh412rcd13ifvqds0', 'ce4bh492rcd13ifvqdvg', 'ce4bh4p2rcd13ifvqe0g', 'ce4bh512rcd13ifvqe3g', 'ce4bh692rcd13ifvqea0', 'ce4bh9p2rcd13ifvqec0', 'ce4bha12rcd13ifvqef0', 'ce4bhbh2rcd13ifvqeo0', 'ce4bhbp2rcd13ifvq

In [9]:
import time
import os
import io
from google.cloud import storage
from google.cloud import bigquery
from google.api_core.exceptions import NotFound
import pandas as pd

In [23]:
# Take a single clock reading so that every derived string below describes
# the exact same instant (local time).
current_timestamp = time.time()
current_time_struct = time.localtime(current_timestamp)

# Three renderings of that instant:
#   - date only, used in folder / table names
#   - "YYYY-MM-DD HH:MM:SS", stored as the load timestamp column
#   - compact date_time, used in file names
current_date_str, current_timestamp_for_bq, current_timestamp_str = (
    time.strftime(pattern, current_time_struct)
    for pattern in ("%Y%m%d", "%Y-%m-%d %H:%M:%S", "%Y%m%d_%H%M%S")
)

print(current_date_str, current_timestamp_for_bq, current_timestamp_str, sep="\n")

20250224
2025-02-24 12:31:08
20250224_123108


In [24]:
def load_csv_to_bucket(json_extract, blob_name, BUCKET_NAME = os.getenv('BUCKET_NAME')):
    """
    Upload the stations data dictionary as a CSV file to a GCS bucket.

    The object is written to
    ``{blob_name}/{blob_name}_{date}/{blob_name}_{date_time}.csv`` where the
    date and time parts come from the module-level ``current_date_str`` and
    ``current_timestamp_str`` (computed once in an earlier cell, not at call
    time). A ``GCS_loaded_at`` column holding ``current_timestamp_for_bq`` is
    added to every row.

    Args:
        json_extract: data accepted by ``pd.DataFrame`` (here a dict of lists).
        blob_name: base name used for the folder, sub-folder and file name.
        BUCKET_NAME: target bucket; when None, the ``BUCKET_NAME`` environment
            variable is read again at call time.

    Raises:
        ValueError: if no bucket name is given and the environment variable
            is not set.
    """
    # The signature default is evaluated once, when the cell defining this
    # function runs; re-read the environment so a variable exported afterwards
    # is still picked up.
    if BUCKET_NAME is None:
        BUCKET_NAME = os.getenv('BUCKET_NAME')
    if not BUCKET_NAME:
        raise ValueError(
            "No bucket name: pass BUCKET_NAME or set the BUCKET_NAME environment variable"
        )

    # Build the object path from the shared module-level timestamp strings.
    blob_full_name = f'{blob_name}/{blob_name}_{current_date_str}/{blob_name}_{current_timestamp_str}.csv'

    df = pd.DataFrame(json_extract)
    df['GCS_loaded_at'] = current_timestamp_for_bq
    # Initialize a GCS client
    client = storage.Client()
    # Get the bucket
    bucket = client.bucket(BUCKET_NAME)
    # Create a new blob (object) in the bucket
    blob = bucket.blob(blob_full_name)
    # to_csv() returns the CSV text directly when no target is given.
    csv_text = df.to_csv(index=False)
    # Upload the CSV string to the blob
    blob.upload_from_string(csv_text, content_type='text/csv')
    print(f"✅ DataFrame uploaded to {blob_full_name} in bucket {BUCKET_NAME}")

In [25]:
# Upload the extracted stations as a CSV; the bucket comes from the
# BUCKET_NAME environment variable (default argument of the function).
load_csv_to_bucket(dic, 'example_extract_marseille')

✅ DataFrame uploaded to example_extract_marseille/example_extract_marseille_20250224/example_extract_marseille_20250224_123108.csv in bucket my-velib


In [26]:
def load_csv_to_bigquery(
    data_folder,
    BUCKET_NAME= os.getenv('BUCKET_NAME'),
    PROJECT_ID= os.getenv('PROJECT_ID'), 
):
    """
    Load all CSV files of the day from a bucket folder into BigQuery.

    The GCS folder and the BigQuery dataset share the same name
    (``data_folder``). Files matching
    ``gs://{BUCKET_NAME}/{data_folder}/{data_folder}_{date}/*.csv`` are loaded
    into the table ``{data_folder}_{date}``, where ``date`` is the
    module-level ``current_date_str``. The dataset is created in the EU
    location when it does not exist yet.

    Args:
        data_folder: name of the GCS folder, also used as the dataset name.
        BUCKET_NAME: source bucket; when None, the ``BUCKET_NAME`` environment
            variable is read again at call time.
        PROJECT_ID: GCP project; when None, the ``PROJECT_ID`` environment
            variable is read again, and failing that the project resolved by
            the BigQuery client is used.

    Raises:
        ValueError: if no bucket name is given and the environment variable
            is not set.
    """
    # Signature defaults are evaluated once, when this cell is run; re-read the
    # environment so variables exported afterwards are still honoured.
    if BUCKET_NAME is None:
        BUCKET_NAME = os.getenv('BUCKET_NAME')
    if not BUCKET_NAME:
        raise ValueError(
            "No bucket name: pass BUCKET_NAME or set the BUCKET_NAME environment variable"
        )
    if PROJECT_ID is None:
        PROJECT_ID = os.getenv('PROJECT_ID')

    gcs_uri = f'gs://{BUCKET_NAME}/{data_folder}/{data_folder}_{current_date_str}/*.csv'

    # NOTE(review): no write disposition is set here, so rows are presumably
    # appended on every run; since the URI matches every CSV of the day,
    # re-running within the same day may duplicate rows — confirm the intent.
    job_config = bigquery.LoadJobConfig(
        source_format=bigquery.SourceFormat.CSV,
        skip_leading_rows=1,  # Skip header row if present
        autodetect=True,  # Automatically detect schema
    )

    # Initialize a BigQuery client
    client = bigquery.Client(project=PROJECT_ID)
    # Use the project the client actually resolved: with PROJECT_ID unset the
    # table id would otherwise be built as "None.<dataset>.<table>".
    project = client.project
    # dataset id in Big Query will have the same name of data folder in GCS
    # (explicit DatasetReference instead of the deprecated client.dataset()).
    dataset_ref = bigquery.DatasetReference(project, data_folder)
    # test if the dataset exists in BQ, create it if not
    try:
        client.get_dataset(dataset_ref)
    except NotFound:
        dataset = bigquery.Dataset(dataset_ref)
        dataset.location = "EU"  # Another region can be specified if needed
        dataset = client.create_dataset(dataset)
        print(f"dataset '{data_folder}' has been created")

    table_id = f'{data_folder}_{current_date_str}'

    # Load data into BigQuery
    load_job = client.load_table_from_uri(
        gcs_uri,
        f'{project}.{data_folder}.{table_id}',
        job_config=job_config
    )

    # Wait for the job to complete
    load_job.result()

    print(f'✅ Loaded {load_job.output_rows} rows into {project}:{data_folder}.{table_id}.')

In [27]:
# Load today's CSV files from the bucket folder into the BigQuery dataset of
# the same name (bucket and project come from environment variables).
load_csv_to_bigquery('example_extract_marseille')

dataset 'example_extract_marseille' has been created
✅ Loaded 396 rows into fleet-petal-448410-u6:example_extract_marseille.example_extract_marseille_20250224.
