In [0]:
# --- Widgets for Job Parameters ---
dbutils.widgets.text("symbols", "IBM,AAPL,TSCO.LON", "Comma-separated Stock Symbols")
dbutils.widgets.text("apikey", "", "Alpha Vantage API Key")

# Read parameters.
# Trim whitespace and drop empty entries so inputs such as "IBM, AAPL," (or an
# empty widget) do not produce blank or padded symbols downstream.
symbols = [
    token.strip()
    for token in dbutils.widgets.get("symbols").split(",")
    if token.strip()
]
# Strip stray whitespace that is easy to paste into the widget by accident.
apikey = dbutils.widgets.get("apikey").strip()


In [0]:
import pandas as pd
import requests
from io import StringIO
from statsmodels.tsa.holtwinters import ExponentialSmoothing
import plotly.graph_objects as go

In [0]:
import pandas as pd
import requests
from io import StringIO
from statsmodels.tsa.holtwinters import ExponentialSmoothing
import plotly.graph_objects as go
import time

def _fetch_monthly_prices(symbol, apikey, timeout):
    """Download one symbol's monthly price history from Alpha Vantage.

    Args:
        symbol: Ticker symbol, already stripped of surrounding whitespace.
        apikey: Alpha Vantage API key.
        timeout: Seconds to wait for the HTTP request before giving up.

    Returns:
        A DataFrame sorted by ``date`` with numeric ``open``, ``high``, ``low``,
        ``close`` and ``volume`` columns, or ``None`` when the request fails or
        the response does not look like the expected CSV.
    """
    try:
        response = requests.get(
            "https://www.alphavantage.co/query",
            # Let requests URL-encode the values instead of splicing them into
            # the query string by hand.
            params={
                "function": "TIME_SERIES_MONTHLY",
                "symbol": symbol,
                "apikey": apikey,
                "datatype": "csv",
            },
            timeout=timeout,
        )
        response.raise_for_status()
        data = pd.read_csv(StringIO(response.text))
    except (
        requests.RequestException,
        pd.errors.EmptyDataError,
        pd.errors.ParserError,
    ) as exc:
        # Report only the exception type: requests' messages can embed the
        # full URL, which contains the API key.
        print(f"Request for {symbol} failed ({type(exc).__name__}). Skipping.")
        return None

    # A payload that is not the expected CSV (for example an error message
    # returned in place of data) will not carry these columns.
    required_cols = ['timestamp', 'open', 'high', 'low', 'close', 'volume']
    if data.empty or any(col not in data.columns for col in required_cols):
        print(f"Data for {symbol} is missing or malformed. Skipping.")
        return None

    data = data.rename(columns={'timestamp': 'date'})
    data['date'] = pd.to_datetime(data['date'])
    numeric_cols = ['open', 'high', 'low', 'close', 'volume']
    data[numeric_cols] = data[numeric_cols].apply(pd.to_numeric, errors='coerce')
    return data.sort_values('date')


def run_stock_pipeline(symbol, apikey, request_timeout=30, throttle_seconds=12):
    """Fetch, persist, forecast and plot the monthly close price of one symbol.

    Steps: download the monthly series as CSV, write it to a per-symbol Delta
    path under ``/tmp/stock_data``, fit an additive-trend exponential smoothing
    model on the close price, and show a Plotly chart of the history plus a
    12-month forecast.

    Args:
        symbol: Ticker symbol; surrounding whitespace is ignored.
        apikey: Alpha Vantage API key.
        request_timeout: Seconds to wait for the HTTP request.
        throttle_seconds: Pause after each API request before returning
            (presumably sized for the API's rate limit - confirm against the
            plan in use).

    Returns:
        None. Problems with the download or the data are reported with
        ``print`` and the symbol is skipped.
    """
    symbol = symbol.strip()
    if not symbol:
        # No request is made for a blank symbol, so no throttling is needed.
        print("Empty symbol. Skipping.")
        return
    print(f"Processing {symbol}...")

    data = _fetch_monthly_prices(symbol, apikey, request_timeout)
    if data is None:
        # A request was still issued, so keep spacing calls out.
        time.sleep(throttle_seconds)
        return

    spark_df = spark.createDataFrame(data)
    path = f"/tmp/stock_data/{symbol}_monthly"
    # Every run downloads the full history, so appending would duplicate all
    # existing rows on each execution; replace the table contents instead.
    spark_df.write.format("delta").mode("overwrite").save(path)

    # Rows whose close could not be parsed were coerced to NaN above; the
    # model cannot be fitted on missing values.
    ts = data.set_index('date')['close'].dropna()
    if len(ts) < 2:
        # An additive trend cannot be estimated from fewer than two points.
        print(f"Not enough price history for {symbol} to forecast. Skipping.")
        time.sleep(throttle_seconds)
        return

    model = ExponentialSmoothing(ts, trend='add', seasonal=None)
    fit = model.fit()
    forecast_12m = fit.forecast(12)

    # Start at the first month-end strictly after the month of the last
    # observation. Jumping to the first day of the next month first avoids
    # placing the first forecast point in the same month as the last
    # observation when that observation is not dated on a calendar month-end.
    next_month_start = ts.index.max() + pd.offsets.MonthBegin(1)
    forecast_dates = pd.date_range(
        next_month_start,
        periods=12,
        # Offset object rather than the 'M' string alias, which newer pandas
        # releases deprecate.
        freq=pd.offsets.MonthEnd(),
    )

    fig = go.Figure()
    fig.add_trace(go.Scatter(
        x=data['date'], y=data['close'],
        mode='lines+markers', name='Historical Close',
        line=dict(color='blue')
    ))
    fig.add_trace(go.Scatter(
        x=forecast_dates, y=forecast_12m,
        mode='lines+markers', name='12-Month Forecast',
        line=dict(color='red', dash='dash')
    ))
    fig.update_layout(
        title=f"{symbol} Historical Close + 12-Month Forecast",
        xaxis_title="Date", yaxis_title="Close Price", hovermode="x unified"
    )
    fig.show()

    time.sleep(throttle_seconds)

In [0]:
# Ad-hoc fetch of the MSFT monthly series as CSV into `data`.
symbol = "MSFT"
# Read the key from the job widget instead of embedding it in the notebook.
# NOTE(review): a literal key was previously committed on this line; it should
# be revoked and replaced, since it remains in the notebook's history.
apikey = dbutils.widgets.get("apikey").strip()
if not apikey:
    raise ValueError("Set the 'apikey' widget before running this cell.")
datatype = "csv"
url = f"https://www.alphavantage.co/query?function=TIME_SERIES_MONTHLY&symbol={symbol}&apikey={apikey}&datatype={datatype}"
# Bound the wait and fail fast on HTTP errors rather than parsing an error page.
r = requests.get(url, timeout=30)
r.raise_for_status()
data = pd.read_csv(StringIO(r.text))
