In [None]:
# FastAPI GCP ETL API v3
!pip install fastapi uvicorn google-cloud-bigquery pandas pyspark==3.5.0

import nest_asyncio
nest_asyncio.apply()  # FIXES Colab event loop

# Test distA top 3
result = await get_sales_ranking("distA", 3)
print("distA Top 3 Sales:")
for row in result["rankings"]:
    print(f"Date: {row.date}, Sales: {row.sales}, Rank: {row.rank}")




  from email.headerregistry import HeaderRegistry as HeaderRegistry


distA Top 3 Sales:
Date: 2026-02-12, Sales: 744000, Rank: 1
Date: 2026-02-11, Sales: 720000, Rank: 2
Date: 2026-02-10, Sales: 700000, Rank: 3


  result = await get_sales_ranking("distA", 3)


In [None]:
import nest_asyncio
nest_asyncio.apply()

async def test_all():
    print("=== distA Top 3 ===")
    result_a = await get_sales_ranking("distA", 3)
    for row in result_a["rankings"]:
        print(f"  {row.date}: {row.sales} (Rank {row.rank})")

    print("\n=== distB Top 2 ===")
    result_b = await get_sales_ranking("distB", 2)
    for row in result_b["rankings"]:
        print(f"  {row.date}: {row.sales} (Rank {row.rank})")

await test_all()


=== distA Top 3 ===
  2026-02-12: 744000 (Rank 1)
  2026-02-11: 720000 (Rank 2)
  2026-02-10: 700000 (Rank 3)

=== distB Top 2 ===
  2026-02-12: 588000 (Rank 1)
  2026-02-11: 550000 (Rank 2)


In [None]:
import asyncio

import nest_asyncio

# Inside Jupyter/Colab an event loop is already running, so asyncio.run() raises
# "cannot be called from a running event loop" unless the loop is patched.
# Applying the patch here keeps this cell from depending on earlier cells.
nest_asyncio.apply()


async def test_all(cases=(("distA", 3), ("distB", 2))):
    """Print the ranked sales rows for each (distributor, top_n) pair in `cases`.

    The default reproduces the original checks: distA top 3, then distB top 2.
    Sections after the first are preceded by a blank line.
    NOTE(review): relies on get_sales_ranking defined in an earlier cell, whose
    result rows expose .date / .sales / .rank attributes.
    """
    for idx, (distributor, top_n) in enumerate(cases):
        prefix = "\n" if idx else ""
        print(f"{prefix}=== {distributor} Top {top_n} ===")
        result = await get_sales_ranking(distributor, top_n)
        for row in result["rankings"]:
            print(f"  {row.date}: {row.sales} (Rank {row.rank})")


asyncio.run(test_all())


=== distA Top 3 ===
  2026-02-12: 744000 (Rank 1)
  2026-02-11: 720000 (Rank 2)
  2026-02-10: 700000 (Rank 3)

=== distB Top 2 ===
  2026-02-12: 588000 (Rank 1)
  2026-02-11: 550000 (Rank 2)


In [None]:
# PRODUCTION FastAPI app -- this cell writes fastapi_app/main.py and
# fastapi_app/requirements.txt (in Colab, download them from the Files pane).
from pathlib import Path

fastapi_code = '''
"""GCP PySpark Sales Ranking API.

Run with:  uvicorn main:app --host 0.0.0.0 --port 8080
Configuration (environment variables):
    GCP_PROJECT  - BigQuery project (default: spheric-wonder-469917-i9)
    SALES_TABLE  - fully-qualified sales table
                   (default: spheric-wonder-469917-i9.paul_gcp_etl_demo.gcp_sales)
Credentials come from Application Default Credentials, never from this file.
PySpark additionally needs a Java runtime on the host.
"""
import os
from functools import lru_cache

from fastapi import FastAPI, Query
from fastapi.concurrency import run_in_threadpool
from google.cloud import bigquery
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

PROJECT_ID = os.environ.get("GCP_PROJECT", "spheric-wonder-469917-i9")
SALES_TABLE = os.environ.get(
    "SALES_TABLE", "spheric-wonder-469917-i9.paul_gcp_etl_demo.gcp_sales"
)
# Number of top-sales rows pulled from BigQuery before ranking.
MAX_CANDIDATES = 100

app = FastAPI(title="GCP PySpark Sales Ranking API")


@lru_cache(maxsize=1)
def _bq_client():
    """Create the BigQuery client once per process, on first use."""
    return bigquery.Client(project=PROJECT_ID)


def _rank_sales(distributor, top_n):
    """Blocking worker: fetch a distributor's top sales rows and rank them in Spark."""
    # The distributor comes straight from the URL path, so it is bound as a query
    # parameter rather than formatted into the SQL text (prevents SQL injection).
    job_config = bigquery.QueryJobConfig(
        query_parameters=[
            bigquery.ScalarQueryParameter("distributor", "STRING", distributor)
        ]
    )
    query = f"""
    SELECT date, dist_id, sales, dpu
    FROM `{SALES_TABLE}`
    WHERE dist_id = @distributor
    ORDER BY sales DESC
    LIMIT {MAX_CANDIDATES}
    """
    df_pandas = _bq_client().query(query, job_config=job_config).to_dataframe()
    if df_pandas.empty:
        # Nothing to rank; Spark may also fail to infer a schema from an empty frame.
        return []

    spark = SparkSession.builder.appName("RankingAPI").getOrCreate()
    df = spark.createDataFrame(df_pandas)
    window_spec = Window.orderBy(col("sales").desc())
    df_ranked = df.withColumn("rank", row_number().over(window_spec))

    # Select by rank and sort explicitly: a bare limit() does not guarantee
    # which rows come back, nor their order.
    top_rows = df_ranked.filter(col("rank") <= top_n).orderBy("rank").collect()
    return [
        {"date": r["date"], "sales": int(r["sales"]), "rank": int(r["rank"])}
        for r in top_rows
    ]


@app.get("/rankings/{distributor}")
async def get_sales_ranking(distributor: str, top_n: int = Query(3, ge=0)):
    """Return the top_n sales rows for one distributor, ranked 1..top_n by sales."""
    # BigQuery and Spark calls block; run them off the event loop.
    rankings = await run_in_threadpool(_rank_sales, distributor, top_n)
    return {"rankings": rankings}


@app.get("/")
async def root():
    return {"message": "GCP PySpark Sales Ranking API", "endpoints": ["/rankings/distA?top_n=3"]}
'''

requirements_txt = "\n".join([
    "fastapi",
    "uvicorn",
    "google-cloud-bigquery[pandas]",  # the [pandas] extra provides what to_dataframe() needs
    "pandas",
    "pyspark==3.5.0",
]) + "\n"

# Write into a dedicated folder so a deployable main.py + requirements.txt pair
# exists on disk (and nothing else named main.py in the working dir is clobbered).
DEPLOY_DIR = Path("fastapi_app")
DEPLOY_DIR.mkdir(exist_ok=True)
(DEPLOY_DIR / "main.py").write_text(fastapi_code, encoding="utf-8")
(DEPLOY_DIR / "requirements.txt").write_text(requirements_txt, encoding="utf-8")

print("✅ PRODUCTION main.py ready!")
print(f"Written to {DEPLOY_DIR / 'main.py'} (download it from the Files pane)")
print("requirements.txt: " + " ".join(requirements_txt.split()))


✅ PRODUCTION main.py ready!
File → Download → main.py
requirements.txt: fastapi uvicorn google-cloud-bigquery pyspark pandas
