In [1]:
!pip install pyspark requests
!pip install pymongo

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import requests
import json




In [2]:
MONGO_URI = "mongodb+srv://mbipchristian_db_user:christiano@cluster0.scoxfhf.mongodb.net/?appName=Cluster0"

spark = SparkSession.builder \
    .appName("RNB_to_MongoDB") \
    .config("spark.mongodb.write.connection.uri", MONGO_URI) \
    .config("spark.mongodb.read.connection.uri", MONGO_URI) \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.3.0") \
    .getOrCreate()



In [3]:
import time

BASE_URL = "https://rnb-api.beta.gouv.fr/api/alpha/buildings/?insee_code=33063&limit=100"

def fetch_all_buildings_cursor(start_url=BASE_URL, sleep_s=0.05):
    all_results = []
    url = start_url
    count = 0
    while url:
        r = requests.get(url, headers={"accept": "application/json"})
        r.raise_for_status()

        payload = r.json()
        results = payload.get("results", [])
        all_results.extend(results)

        count += 1
        if count % 100 == 0:
            print(f"{count} buildings processed")

        url = payload.get("next")  # cursor pagination
        time.sleep(sleep_s)

    return all_results

all_buildings = fetch_all_buildings_cursor()
print("TOTAL buildings fetched:", len(all_buildings))

dataframe = spark.createDataFrame(all_buildings)

dataframe.printSchema()
dataframe.show(3, truncate=False)


300 buildings processed
600 buildings processed
ðŸŽ‰ TOTAL buildings fetched: 75112
root
 |-- addresses: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)
 |-- ext_ids: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: string (valueContainsNull = true)
 |-- is_active: boolean (nullable = true)
 |-- point: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- rnb_id: string (nullable = true)
 |-- shape: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- status: string (nullable = true)

+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------

In [4]:
df_flat = dataframe.select(
    col("rnb_id").alias("building_id"),
    col("status"),
    col("point"),
    col("shape"),
    col("addresses"),
    col("ext_ids"),
    col("is_active")
)

First, let's load the data that `df` will be based on. Assuming the data comes from a web API, I'll use `requests` to fetch it and then create a Spark DataFrame.

In [5]:
df_flat.write \
    .format("mongodb") \
    .mode("append") \
    .option("database", "rnb") \
    .option("collection", "buildings") \
    .save()
