In [1]:
import requests
import json
from pyspark.sql.functions import from_json, col, regexp_replace
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, MapType

In [2]:
# --- Destinations -----------------------------------------------------------
BUCKET_NAME = "paris-opendata-gcp-data"
# Staging location passed to the BigQuery connector as `temporaryGcsBucket`.
TEMP_GCS_BUCKET = BUCKET_NAME + "/temporary/notebooks/"
# Fully qualified target table: <project>.<dataset>.<table>.
BQ_BRONZE_TABLE = "paris-opendata-gcp.bronze.velib_emplacement_stations"

# --- Source API -------------------------------------------------------------
# Opendatasoft Explore v2.1 `records` endpoint for the Vélib' station dataset.
url = "https://opendata.paris.fr/api/explore/v2.1/catalog/datasets/velib-emplacement-des-stations/records?limit=100"

In [3]:
## Expected shape of one API record (only the fields kept downstream).

schema = (
    StructType()
    .add("stationcode", StringType(), True)
    .add("name", StringType(), True)
    .add("capacity", IntegerType(), True)
    # Nested geographic point, flattened into lon/lat columns later on.
    .add(
        "coordonnees_geo",
        StructType().add("lon", DoubleType(), True).add("lat", DoubleType(), True),
        True,
    )
    .add("station_opening_hours", StringType(), True)  # or another type if needed
)

In [4]:


# Download every Vélib' station record from the Opendatasoft Explore v2.1 API.
# The `records` endpoint returns at most 100 rows per call (hence `limit=100`
# in `url`), so a single request only ever loaded the first page of stations.
# Page through the dataset with `offset` until `total_count` rows are collected.
REQUEST_TIMEOUT_S = 30  # seconds; requests never times out unless told to

records = []
while True:
    # `params` is appended to the query string already present in `url`.
    response = requests.get(url, params={"offset": len(records)}, timeout=REQUEST_TIMEOUT_S)
    # Surface HTTP errors here instead of failing later with a KeyError on "results".
    response.raise_for_status()
    data = response.json()
    page = data["results"]
    records.extend(page)
    # Stop on an empty page (defensive) or once the advertised total is reached;
    # a missing `total_count` degrades to the original single-page behaviour.
    if not page or len(records) >= data.get("total_count", 0):
        break

# Distribute the records and let Spark infer a DataFrame from them.
# NOTE(review): spark.read.json turns non-str RDD elements into text with str(),
# i.e. Python repr rather than JSON, which is why rows land in `_corrupt_record`
# and are re-parsed in the next cell.
rdd = spark.sparkContext.parallelize(records)
df = spark.read.json(rdd)

                                                                                

In [5]:
# Re-serialise each API record as real JSON and parse it with the explicit
# `schema`, producing the `parsed` struct column consumed by the next cell.
#
# This replaces the previous workaround, which regex-patched the Python repr
# stored in `_corrupt_record` (' -> ", None -> null). That patch had three flaws:
#   * It broke any value containing an apostrophe. Repr wraps such a string in
#     double quotes (e.g. "Place de l'Eglise"), and the ' -> " swap then closes
#     the string early.
#   * It corrupted any value containing the text "None".
#   * It produced all-null rows for records Spark had already parsed
#     successfully, because their `_corrupt_record` is null.
# json.dumps emits valid JSON, so none of these cases arise.
df_fixed = spark.createDataFrame(
    [(json.dumps(record),) for record in records],
    # Explicit one-column schema so an empty `records` list still works
    # (schema inference needs at least one row).
    schema=StructType([StructField("raw_json", StringType(), True)]),
)

df_parsed_json = df_fixed.withColumn("parsed", from_json(col("raw_json"), schema))


In [6]:
# Flatten the parsed struct into top-level columns; the nested coordinates are
# split into separate `lon` / `lat` columns.
df_final = df_parsed_json.select(*[
    col(f"parsed.{field_path}").alias(output_name)
    for field_path, output_name in (
        ("stationcode", "stationcode"),
        ("name", "name"),
        ("capacity", "capacity"),
        ("coordonnees_geo.lon", "lon"),
        ("coordonnees_geo.lat", "lat"),
        ("station_opening_hours", "station_opening_hours"),
    )
])

### *Chargement des données dans BigQuery (table Bronze)*

In [7]:
# Replace the contents of the bronze BigQuery table with the flattened rows,
# staging files under TEMP_GCS_BUCKET as the connector requires.
(
    df_final.write
    .format("bigquery")
    .mode("overwrite")
    .options(table=BQ_BRONZE_TABLE, temporaryGcsBucket=TEMP_GCS_BUCKET)
    .save()
)

                                                                                