### Part 1: Retrieving Params from Airflow
Using **dbutils.widgets.text**(name, default_value), we can retrieve the params that Airflow pushes through the DatabricksRunNowOperator. <br>
The params are passed via the operator's **notebook_params** parameter.

In [0]:
dbutils.widgets.text("stocks", "")
portfolio = dbutils.widgets.get("stocks").split(" ")
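
The widget value arrives as a single string, so the way it is split decides what ends up in `portfolio`. Below is a minimal sketch of a more defensive parse, assuming the tickers are separated by whitespace. `parse_tickers` is a hypothetical helper introduced here for illustration; it is not part of dbutils or of this notebook.

```python
from typing import List


def parse_tickers(raw: str) -> List[str]:
    """Split a whitespace-separated ticker string into a clean list.

    Hypothetical helper: upper-cases each symbol and drops empty entries
    and duplicates while keeping first-seen order.
    """
    seen = set()
    tickers = []
    # split() with no argument collapses repeated whitespace and
    # returns an empty list for an empty string
    for symbol in raw.split():
        symbol = symbol.upper()
        if symbol not in seen:
            seen.add(symbol)
            tickers.append(symbol)
    return tickers


print(parse_tickers("aapl  MSFT aapl"))
print(parse_tickers(""))
```

Note that `"".split(" ")` returns `['']`, so with the widget's empty default the plain split yields a list holding one empty string rather than an empty list.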

In [0]:
# https://docs.google.com/document/d/1e9oakyEftdHp4zGz2F0WP-_X7GEzEKYGaPvu1SguuCE/edit

import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from datetime import datetime
from datetime import date as dt
from datetime import timedelta as td
import yfinance as yf # https://pypi.org/project/yfinance/#description
from delta.tables import DeltaTable

In [0]:
today = dt.today().strftime("%m/%d/%Y")

Pull data (API)
The yfinance Python package (https://pypi.org/project/yfinance/) is used to extract the data from Yahoo Finance.
Data pulled will contain 6 features, including the day's closing stock price.

In [0]:
"""
Pulling data from Yahoo Finance using the yfinance Python package (https://pypi.org/project/yfinance/).
Credit: https://github.com/ranaroussi/yfinance
Data pulled will contain 6 features, including the day's closing stock price.
The requested data will be saved into a dictionary.
"""

company_info = {i: yf.Ticker(i).info for i in portfolio} # API call
industry_cap_dict = {i: [company_info[i]['sector'], company_info[i]['marketCap']] for i in portfolio}

industry_cap_dict

### Part 1: Aggregating Market Capitalization (Value) by Industry
Retrieve the industry and market capitalization of each company, then aggregate the market capitalization by industry. <br>

In [0]:
# transform dictionary into dataframe
industry_cap_df = pd.DataFrame.from_dict(industry_cap_dict,orient='index',columns=['SECTOR', 'MARKET_CAP'])

# aggregate industries
industry_agg = industry_cap_df.groupby(['SECTOR'], as_index=False).sum()

# add column for date
industry_agg["PERIOD"] = pd.to_datetime(today)

# add column combining sector and period (For unique ID)
industry_agg["UNIQUE_ID"] = industry_agg['SECTOR'] + '_' + str(today)

industry_agg
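
To make the aggregation step concrete, here is a plain-Python equivalent of the group-by-and-sum, offered as a sketch rather than a replacement for the pandas code. The tickers, sectors, figures and date in `sample_caps` are made up for illustration, and `aggregate_by_sector` and `add_unique_id` are hypothetical helper names.

```python
from collections import defaultdict

# Hypothetical sample in the same shape as industry_cap_dict:
# ticker -> [sector, market cap]. All values are invented.
sample_caps = {
    "AAA": ["Technology", 100],
    "BBB": ["Technology", 250],
    "CCC": ["Energy", 75],
}


def aggregate_by_sector(caps):
    """Sum market capitalization per sector."""
    totals = defaultdict(int)
    for sector, market_cap in caps.values():
        totals[sector] += market_cap
    return dict(totals)


def add_unique_id(totals, period):
    """Build one row per sector, keyed as '<sector>_<period>'."""
    return [
        {"SECTOR": s, "MARKET_CAP": c, "PERIOD": period, "UNIQUE_ID": f"{s}_{period}"}
        for s, c in sorted(totals.items())
    ]


sector_totals = aggregate_by_sector(sample_caps)
rows = add_unique_id(sector_totals, "01/31/2024")
print(rows)
```

Because the ID combines the sector with the run date, each sector gets exactly one row per day.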

### Part 2: Delta Table
Transform the pandas dataframe into a Spark dataframe, then write it to a Delta table.

In [0]:
# transform pandas dataframe to spark dataframe
industry_agg_spdf = spark.createDataFrame(industry_agg)

# save the spark dataframe as a delta table
industry_agg_spdf.write.format("delta").mode("overwrite").save("/user/hive/warehouse/industry_aggregation_update")

# load the data back from the Delta table using spark (to visualize the data)
industry_agg_delta_update = spark.read.format("delta").load("/user/hive/warehouse/industry_aggregation_update")

display(industry_agg_delta_update)

### Part 3: Upsert
Upsert the update table written in the previous step into the existing Delta table. The existing table contains prior days' data.

In [0]:
industry_agg_delta = DeltaTable.forPath(spark, "/user/hive/warehouse/industry_aggregation")
industry_agg_delta_update = DeltaTable.forPath(spark, "/user/hive/warehouse/industry_aggregation_update")

dfUpdates = industry_agg_delta_update.toDF()

industry_agg_delta.alias('data') \
  .merge(
    dfUpdates.alias('updates'),
    'data.UNIQUE_ID = updates.UNIQUE_ID'
  ) \
  .whenMatchedUpdate(set =
    {
      "SECTOR": "updates.SECTOR",
      "MARKET_CAP": "updates.MARKET_CAP",
      "PERIOD": "updates.PERIOD",
      "UNIQUE_ID": "updates.UNIQUE_ID"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "SECTOR": "updates.SECTOR",
      "MARKET_CAP": "updates.MARKET_CAP",
      "PERIOD": "updates.PERIOD",
      "UNIQUE_ID": "updates.UNIQUE_ID"
    }
  ) \
  .execute()
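
The merge follows the usual upsert rule: a row whose key already exists in the target is updated, and a row with a new key is inserted. The sketch below mimics that rule with plain dictionaries so it can be tried without a Spark cluster. It illustrates the semantics only, not how Delta Lake implements them, and `upsert` and the sample rows are hypothetical.

```python
def upsert(target, updates, key="UNIQUE_ID"):
    """Return the target rows with the updates merged in by key.

    Matched keys are overwritten in place (update); unmatched keys are
    appended at the end (insert). Neither input list is modified.
    """
    merged = {row[key]: dict(row) for row in target}
    for row in updates:
        merged[row[key]] = dict(row)
    return list(merged.values())


# Invented rows: one key already exists in the target, one is new.
existing = [
    {"UNIQUE_ID": "Energy_01/30/2024", "MARKET_CAP": 70},
    {"UNIQUE_ID": "Energy_01/31/2024", "MARKET_CAP": 75},
]
incoming = [
    {"UNIQUE_ID": "Energy_01/31/2024", "MARKET_CAP": 80},
    {"UNIQUE_ID": "Technology_01/31/2024", "MARKET_CAP": 350},
]

result = upsert(existing, incoming)
print(result)
```

Since `UNIQUE_ID` includes the date, rerunning the notebook on the same day overwrites that day's rows instead of duplicating them.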

# Data to be used for notification purposes:

In [0]:
current_stock_price = yf.download(portfolio, dt.today(), group_by='Ticker')
current_stock_price = current_stock_price.stack(level=0).rename_axis(['Date', 'Ticker']).reset_index()

current_stock_price

In [0]:
previous_stock_price = spark.sql("""
          SELECT * FROM default.financial_info_spdf 
          WHERE Date = (
              SELECT MAX(Date)
              FROM default.financial_info_spdf)""")

previous_stock_price = previous_stock_price.toPandas()

previous_stock_price

In [0]:

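# join the most recent stored prices (Adj_Close) with today's prices (Adj Close) on ticker
stock_notification = pd.merge(
    left=previous_stock_price, right=current_stock_price, how='outer', left_on='Ticker', right_on='Ticker')

# relative change between today's adjusted close and the stored one, as a fraction of today's value
stock_notification['Percent_Diff'] = round(
  (stock_notification['Adj Close'] - stock_notification['Adj_Close']) / stock_notification['Adj Close'], 5)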

In [0]:
# Adding Financial info to Delta Table

current_stock_price.rename(columns={"Adj Close": "Adj_Close"}, inplace = True)
current_stock_price["ID"] = current_stock_price['Ticker'] + "_" + current_stock_price['Date'].dt.strftime('%Y-%m-%d')

financial_info_spdf_update = spark.createDataFrame(current_stock_price)
financial_info_spdf_update.write.format("delta").mode("overwrite").save("/user/hive/warehouse/financial_info_spdf_update")


financial_info_delta = DeltaTable.forPath(spark, '/user/hive/warehouse/financial_info_spdf')
financial_info_delta_update = DeltaTable.forPath(spark, "/user/hive/warehouse/financial_info_spdf_update")

dfUpdates = financial_info_delta_update.toDF()

financial_info_delta.alias('data') \
  .merge(
    dfUpdates.alias('updates'),
    'data.ID = updates.ID'
  ) \
  .whenMatchedUpdate(set =
    {
      "Date": "updates.Date",
      "Ticker": "updates.Ticker",
      "Adj_Close": "updates.Adj_Close",
      "Close": "updates.Close",
      "High": "updates.High",
      "Low": "updates.Low",
      "Open": "updates.Open",
      "Volume": "updates.Volume",
      "ID": "updates.ID"
    }
  ) \
  .whenNotMatchedInsert(values =
    {
      "Date": "updates.Date",
      "Ticker": "updates.Ticker",
      "Adj_Close": "updates.Adj_Close",
      "Close": "updates.Close",
      "High": "updates.High",
      "Low": "updates.Low",
      "Open": "updates.Open",
      "Volume": "updates.Volume",
      "ID": "updates.ID"
    }
  ) \
  .execute()

In [0]:
big_movers = stock_notification[abs(stock_notification['Percent_Diff']) > 0.05][['Ticker','Percent_Diff']].values.tolist()
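
The filter keeps tickers whose relative change exceeds 5% in either direction. The same logic can be sketched without pandas as follows, assuming prices are held in plain dictionaries keyed by ticker. `percent_diff`, `find_big_movers` and the sample prices are hypothetical and only mirror the formula used above.

```python
def percent_diff(previous_close, current_close):
    """Relative change using the notebook's formula:
    (current - previous) / current, rounded to 5 decimals."""
    return round((current_close - previous_close) / current_close, 5)


def find_big_movers(previous, current, threshold=0.05):
    """Return [ticker, diff] pairs whose absolute change exceeds threshold."""
    movers = []
    for ticker, current_close in current.items():
        if ticker not in previous:
            continue  # no earlier price to compare against
        diff = percent_diff(previous[ticker], current_close)
        if abs(diff) > threshold:
            movers.append([ticker, diff])
    return movers


# Invented prices for illustration.
previous = {"AAA": 100.0, "BBB": 50.0}
current = {"AAA": 110.0, "BBB": 50.5, "CCC": 10.0}
print(find_big_movers(previous, current))
```

Skipping tickers with no earlier price matches the pandas behaviour: after the outer merge such rows have a missing `Percent_Diff`, and a comparison against a missing value is false, so they are filtered out.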

In [0]:
dbutils.notebook.exit(big_movers)
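
`dbutils.notebook.exit` is documented as taking a string value, so one option is to serialize the list explicitly before exiting and parse it again on the Airflow side. The sketch below shows only the round trip with the standard `json` module. The sample list is invented, and this is a suggestion rather than what the notebook currently does.

```python
import json

# Invented sample in the same shape as big_movers: [ticker, percent diff].
big_movers_example = [["AAA", 0.09091], ["BBB", -0.06]]

# Serialize to a string before handing the value to the notebook exit call.
payload = json.dumps(big_movers_example)

# The consumer can restore the original nested list from the string.
restored = json.loads(payload)
print(payload)
print(restored == big_movers_example)
```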