In [21]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import requests
import datetime
import json

# Reuse the active Spark session if one exists, otherwise create it.
spark = SparkSession.builder.getOrCreate()
# Run date (local clock) as YYYY-MM-DD; used to name the landing folder.
today = datetime.date.today().isoformat()

# Fake Store API resources to ingest: resource name -> URL.
# Insertion order determines the order in which resources are fetched.
endpoints = dict(
    (
        ("products", "https://fakestoreapi.com/products"),
        ("users", "https://fakestoreapi.com/users"),
        ("carts", "https://fakestoreapi.com/carts"),
    )
)

# Explicit Spark schemas, one per endpoint, applied when the JSON payloads are
# parsed. Every field is nullable (the default for StructType.add and for
# ArrayType elements).
schemas = {
    "products": (
        StructType()
        .add("id", IntegerType())
        .add("title", StringType())
        .add("price", DoubleType())
        .add("description", StringType())
        .add("category", StringType())
        .add("image", StringType())
        .add(
            "rating",
            StructType()
            .add("rate", DoubleType())
            .add("count", IntegerType()),
        )
    ),
    "users": (
        StructType()
        .add("id", IntegerType())
        .add("email", StringType())
        .add("username", StringType())
        .add("password", StringType())
        .add(
            "name",
            StructType()
            .add("firstname", StringType())
            .add("lastname", StringType()),
        )
        .add(
            "address",
            StructType()
            .add("city", StringType())
            .add("street", StringType())
            .add("number", IntegerType())
            .add("zipcode", StringType())
            # Coordinates are declared as strings, not numbers.
            .add(
                "geolocation",
                StructType()
                .add("lat", StringType())
                .add("long", StringType()),
            ),
        )
        .add("phone", StringType())
        .add("__v", IntegerType())
    ),
    "carts": (
        StructType()
        .add("id", IntegerType())
        .add("userId", IntegerType())
        # Kept as a raw string; no timestamp parsing happens at this stage.
        .add("date", StringType())
        .add(
            "products",
            ArrayType(
                StructType()
                .add("productId", IntegerType())
                .add("quantity", IntegerType())
            ),
        )
        .add("__v", IntegerType())
    ),
}

# Ingest and save
# For each endpoint: fetch the JSON payload over HTTP, parse it against the
# endpoint's explicit schema, and write it as JSON under
# Files/Landing/<name>/<run date>, replacing any previous output for that date.
for name, url in endpoints.items():
    # A timeout keeps an unresponsive API from hanging the notebook forever.
    response = requests.get(url, timeout=30)
    # Fail loudly on 4xx/5xx instead of parsing and persisting an error body.
    response.raise_for_status()
    data = response.json()
    # Each endpoint is expected to return a JSON array of records. Iterating
    # anything else (e.g. an error object) would silently yield junk rows.
    if not isinstance(data, list):
        raise ValueError(
            f"Expected a JSON array from {url}, got {type(data).__name__}"
        )

    # Re-serialise each record so Spark's JSON reader can apply the schema.
    rdd = spark.sparkContext.parallelize([json.dumps(record) for record in data])
    df = spark.read.schema(schemas[name]).json(rdd)

    path = f'Files/Landing/{name}/{today}'
    df.write.mode("overwrite").json(path)
    print(f"✅ {name} saved to: {path}")


StatementMeta(, 7a3d255f-d078-41dc-8e83-c15aeff00bfd, 23, Finished, Available, Finished)

✅ products saved to: Files/Landing/products/2025-05-15
✅ users saved to: Files/Landing/users/2025-05-15
✅ carts saved to: Files/Landing/carts/2025-05-15
