# Massive AAPL Daily Bars Processor
- Loads raw table from Copy activity
- Adds Date from timestamp
- Renames columns to clean names
- Dedups on Date
- Overwrites table

In [None]:
from pyspark.sql.functions import col, to_date

# Load the existing final table (if it exists)
try:
    existing_df = spark.read.table("aapl_daily_massive")
except:
    existing_df = spark.createDataFrame([], schema="ticker STRING, volume LONG, vwap DOUBLE, open_price DOUBLE, close_price DOUBLE, high_price DOUBLE, low_price DOUBLE, transactions LONG, Date DATE")  # empty if table doesn't exist yet

# Load the new batch from Notebook 1 (flattened/cleaned)
new_df = spark.read.table("DailyStockData_Clean")

# Add Date if missing (from timestamp)
new_df = new_df.withColumn("Date", to_date(col("timestamp"))) \
               .drop("timestamp")

# Rename columns in new batch (to match existing table)
new_df = new_df.withColumnRenamed("v", "volume") \
               .withColumnRenamed("vw", "vwap") \
               .withColumnRenamed("o", "open_price") \
               .withColumnRenamed("c", "close_price") \
               .withColumnRenamed("h", "high_price") \
               .withColumnRenamed("l", "low_price") \
               .withColumnRenamed("n", "transactions")

# Combine existing + new data
combined_df = existing_df.unionByName(new_df, allowMissingColumns=True)

# Dedup on Date (keep latest version if duplicates)
combined_df = combined_df.dropDuplicates(["Date"])

# Show preview of latest data
combined_df.orderBy("Date", ascending=False).show(10, truncate=False)

# Overwrite the final table with combined, deduped data
combined_df.write.mode("overwrite") \
  .option("overwriteSchema", "true") \
  .saveAsTable("aapl_daily_massive")

print("Daily cleanup complete: new data appended, deduped on Date, table updated. Row count:", combined_df.count())

StatementMeta(, , -1, SessionError, , SessionError)

InvalidHttpRequestToLivy: [TooManyRequestsForCapacity] This spark job can't be run because you have hit a spark compute or API rate limit. To run this spark job, cancel an active Spark job through the Monitoring hub, choose a larger capacity SKU, or try again later. HTTP status code: 430 {Learn more} HTTP status code: 430.

In [5]:
import notebookutils
import requests

# Hard-coded from your screenshots
dataset_name = "MassiveReport1"
workspace_name = "MassiveProject"

def final_refresh_solution():
    try:
        # Use notebookutils (the universal Fabric tool) to get your token
        token = notebookutils.credentials.getToken("pbi")
        headers = {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
        
        # Get the workspace ID from the notebook context
        ws_id = notebookutils.runtime.context["currentWorkspaceId"]
        
        # Trigger the refresh using the direct name URL
        # This is the most stable 'God Mode' path in Fabric
        refresh_url = f"https://api.powerbi.com/v1.0/myorg/groups/{ws_id}/datasets/{dataset_name}/refreshes"
        
        response = requests.post(refresh_url, headers=headers)
        
        if response.status_code == 202:
            print(f"✅ SUCCESS! {dataset_name} is now refreshing.")
            print("Automation is fixed. You can safely delete the old pipeline activities.")
        else:
            # If the name doesn't work, it might need the ID, but name usually works in Fabric
            print(f"❌ Status {response.status_code}. Checking for ID instead...")
            ds_url = f"https://api.powerbi.com/v1.0/myorg/groups/{ws_id}/datasets"
            ds_resp = requests.get(ds_url, headers=headers)
            datasets = ds_resp.json().get("value", [])
            target_id = next((d["id"] for d in datasets if d["name"] == dataset_name), None)
            
            if target_id:
                final_url = f"https://api.powerbi.com/v1.0/myorg/groups/{ws_id}/datasets/{target_id}/refreshes"
                requests.post(final_url, headers=headers)
                print(f"✅ SUCCESS! Refreshed via ID: {target_id}")
            else:
                print("❌ Could not find the model. Verify the name 'MassiveReport1'.")

    except Exception as e:
        print(f"❌ Error: {e}")

final_refresh_solution()

StatementMeta(, 71f24e21-a1cc-4dbe-8767-d4c84e5f1dfd, 7, Finished, Available, Finished)

❌ Status 404. Checking for ID instead...
✅ SUCCESS! Refreshed via ID: e0d11723-2a5a-4305-b27c-35603cc134ca


Last tested: 2026-01-21
Columns: Ticker, Open, High, Low, Close, Volume, Date