In [None]:
# Databricks notebook
import os

import requests
from pyspark.sql.functions import current_timestamp, from_json, explode
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, TimestampType

# Base URL of the HTTP API that serves the ingestion endpoints. Despite the
# constant's name, the default host is a `.convex.site` deployment. Set the
# FASTAPI_BASE_URL environment variable to target a different deployment
# without editing this cell. A trailing slash is stripped so the endpoint
# paths below never contain "//".
FASTAPI_BASE_URL = os.environ.get(
    "FASTAPI_BASE_URL", "https://hushed-dolphin-894.convex.site"
).rstrip("/")
EMAILS_ENDPOINT = f"{FASTAPI_BASE_URL}/email"
USER_IDS_ENDPOINT = f"{FASTAPI_BASE_URL}/list"
USER_DETAILS_ENDPOINT = f"{FASTAPI_BASE_URL}/emailuserlist"
MESSAGES_ENDPOINT = f"{FASTAPI_BASE_URL}/messages"

# Fetch data from the HTTP API endpoints
def fetch_data(url, timeout=30):
    """GET ``url`` and return its decoded JSON body.

    Args:
        url: Endpoint to request.
        timeout: Seconds ``requests`` waits for the server to connect and to
            send data before giving up. Without a timeout, ``requests.get``
            can block forever on an unresponsive server.

    Returns:
        The parsed JSON payload, of whatever type the endpoint returns.

    Raises:
        requests.HTTPError: if the response has a 4xx/5xx status, so an
            error body is never mistaken for real data.
        requests.RequestException: on connection errors or timeouts.
        ValueError: if the body is not valid JSON.
    """
    response = requests.get(url, timeout=timeout)
    response.raise_for_status()
    return response.json()

def _fetch_list(url):
    """Fetch ``url`` with ``fetch_data`` and require the payload to be a JSON array.

    Raises:
        TypeError: if the decoded payload is not a list. This prevents
            something like an error object from being iterated and ingested
            as if it were rows.
    """
    payload = fetch_data(url)
    if not isinstance(payload, list):
        raise TypeError(
            f"Expected a JSON array from {url}, got {type(payload).__name__}"
        )
    return payload


def _save_with_timestamp(df, table_name, mode):
    """Add an ``ingestion_timestamp`` column to ``df`` and save it as Delta table ``table_name``."""
    (
        df.withColumn("ingestion_timestamp", current_timestamp())
        .write.format("delta")
        .mode(mode)
        .saveAsTable(table_name)
    )


# Create DataFrames and save as Delta tables
def ingest_data():
    """Fetch every endpoint and persist each payload as a Delta table.

    Tables written, in order:
      * ``emails``       - one ``email`` column (overwrite)
      * ``user_ids``     - one ``user_id`` column (overwrite)
      * ``user_details`` - ``email``, ``username``, ``id`` (overwrite)
      * ``messages``     - ``_id``, ``_creationTime``, ``meetupChatId``,
                           ``senderId``, ``message`` (append)

    Each table also gets an ``ingestion_timestamp`` column. Relies on the
    ``spark`` session global, which is not defined in this file; presumably
    the Databricks runtime provides it.

    Explicit schemas are used everywhere. Spark cannot infer a schema from an
    empty list, so an endpoint returning ``[]`` would otherwise crash the run.

    Raises:
        TypeError: if an endpoint's JSON payload is not a list.
        Any exception from ``fetch_data`` or the Spark write propagates.
        Tables written before the failure remain written.
    """
    # Emails. StringType matches the ``email`` field declared for user_details
    # below; assumes the endpoint returns plain strings.
    emails_schema = StructType([StructField("email", StringType(), True)])
    emails_data = _fetch_list(EMAILS_ENDPOINT)
    emails_df = spark.createDataFrame(
        [(email,) for email in emails_data], schema=emails_schema
    )
    _save_with_timestamp(emails_df, "emails", "overwrite")

    # User IDs. StringType matches the ``id`` field declared for user_details;
    # assumes the endpoint returns string ids.
    user_ids_schema = StructType([StructField("user_id", StringType(), True)])
    user_ids_data = _fetch_list(USER_IDS_ENDPOINT)
    user_ids_df = spark.createDataFrame(
        [(user_id,) for user_id in user_ids_data], schema=user_ids_schema
    )
    _save_with_timestamp(user_ids_df, "user_ids", "overwrite")

    # User Details
    user_details_schema = StructType([
        StructField("email", StringType(), True),
        StructField("username", StringType(), True),
        StructField("id", StringType(), True),
    ])
    user_details_data = _fetch_list(USER_DETAILS_ENDPOINT)
    user_details_df = spark.createDataFrame(
        user_details_data, schema=user_details_schema
    )
    _save_with_timestamp(user_details_df, "user_details", "overwrite")

    # Messages
    # NOTE(review): `_creationTime` is declared StringType. If the API sends it
    # as a JSON number, Spark's schema verification will reject the rows.
    # Confirm the wire type.
    message_schema = StructType([
        StructField("_id", StringType(), True),
        StructField("_creationTime", StringType(), True),
        StructField("meetupChatId", StringType(), True),
        StructField("senderId", StringType(), True),
        StructField("message", StringType(), True),
    ])
    messages_data = _fetch_list(MESSAGES_ENDPOINT)
    messages_df = spark.createDataFrame(messages_data, schema=message_schema)
    # NOTE(review): unlike the other tables this one is appended. If the
    # endpoint returns the full message history each time, every re-run
    # duplicates rows; confirm this snapshot-append behavior is intended.
    _save_with_timestamp(messages_df, "messages", "append")

# Run the ingestion as a side effect of executing this cell/module.
ingest_data()

# Only reached if ingest_data() returned without raising.
print("Data ingestion completed successfully.")