In [None]:
title: "DSAN-5500 Final Project: Polygon.io API ETL Pipeline"
author: "Matt Carswell, Jude Moukarzel, Alex Pattarini"
format:
    html:
        embed-resources: true

**STOCK MARKET ANALYSIS**  
*By Alex Pattarini, Matt Carswell, Jude Moukarzel*

**Problem Outline**  


*   **Inefficiency in Data Access and Processing:** Manually retrieving and processing daily stock data is time-consuming and prone to errors. This inefficiency affects the timeliness and reliability of financial analysis and trading strategies.
*   **Difficulty in Tracking Progress and Ensuring Data Integrity:** Without a streamlined process, tracking the status of data retrieval and transformations across multiple stocks and markets is challenging. Ensuring the integrity and accuracy of financial data is crucial for making informed trading decisions.



**Proposed Solution: Implementation of an ETL Pipeline**  
An ETL (Extract, Transform, Load) pipeline can address these challenges by automating the data workflow, ensuring data integrity, and improving data accessibility and tracking.

**Step 1: Extract**  
Retrieve daily stock data (open price, close price, high, low, and number of transactions) for the top companies within each major industry using Polygon.io.  
Pydantic is used at this stage to validate the data schema as soon as data is extracted, ensuring the data types and required fields are correct and complete. This prevents errors early in the pipeline.  
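
To illustrate the idea of validating at extraction time, here is a minimal sketch assuming Pydantic v2. The `DailyBar` model, the `validate_bar` helper, and the sample values are hypothetical and only stand in for the project's real schema; the point is that a missing or negative field is caught before the record moves further down the pipeline.

```python
from pydantic import BaseModel, ValidationError, field_validator


class DailyBar(BaseModel):
    """Hypothetical record for one ticker's daily aggregate."""
    ticker: str
    open: float
    close: float

    @field_validator("open", "close")
    @classmethod
    def must_be_nonnegative(cls, value: float) -> float:
        if value < 0:
            raise ValueError("prices must be non-negative")
        return value


def validate_bar(raw: dict) -> tuple[bool, str]:
    """Return (True, "") if the record is valid, else (False, failing field names)."""
    try:
        DailyBar(**raw)
        return True, ""
    except ValidationError as exc:
        # Collect only the names of the fields that failed validation
        bad_fields = sorted({str(err["loc"][0]) for err in exc.errors()})
        return False, ", ".join(bad_fields)


# Made-up values for illustration only
good = validate_bar({"ticker": "AAPL", "open": 170.0, "close": 172.5})
missing = validate_bar({"ticker": "AAPL", "open": 170.0})
negative = validate_bar({"ticker": "AAPL", "open": -1.0, "close": 172.5})
```

In this sketch a bad record is reported rather than raised, which is one possible design; raising immediately, as a Prefect task would surface it, is equally reasonable.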

**Step 2: Transform**  
Structure the data for analysis, creating a dataframe with all relevant values as well as engineering any new features needed for additional insights.
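
As a rough illustration of this step, the sketch below builds a small dataframe and derives two features, an absolute price change and a percentage return. The tickers and prices are made up, and the column names `price_change` and `return_pct` are assumptions chosen for the example.

```python
import pandas as pd

# Made-up prices for illustration only
records = [
    {"ticker": "AAA", "open": 100.0, "close": 110.0},
    {"ticker": "BBB", "open": 50.0, "close": 45.0},
]
bars = pd.DataFrame(records)

# Engineered features: absolute change and percentage return over the day
bars["price_change"] = bars["close"] - bars["open"]
bars["return_pct"] = bars["price_change"] / bars["open"] * 100
```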

**Step 3: Load**  
Store the transformed and validated data in a JSON Lines file (one JSON record per line) for easy access and analysis.  
Data integrity and schema adherence are maintained during the loading phase, with logs and errors managed through Prefect, which also tracks the ETL process for performance and error handling.    
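
A minimal sketch of the append-and-read-back pattern for a line-delimited JSON store, using only the standard library. The file name, helper names, and values are hypothetical; the real pipeline serializes validated Pydantic objects rather than plain dicts.

```python
import json
import os
import tempfile


def append_record(db_path: str, record: dict) -> None:
    """Append one record to the file as a single JSON line."""
    with open(db_path, "a", encoding="utf-8") as fh:
        fh.write(json.dumps(record) + "\n")


def read_records(db_path: str) -> list[dict]:
    """Read every non-empty line back into a dict."""
    with open(db_path, encoding="utf-8") as fh:
        return [json.loads(line) for line in fh if line.strip()]


# Hypothetical file name and made-up values
db_path = os.path.join(tempfile.mkdtemp(), "example_stocks.jsonl")
append_record(db_path, {"ticker": "AAA", "open": 100.0, "close": 110.0})
append_record(db_path, {"ticker": "BBB", "open": 50.0, "close": 45.0})
loaded = read_records(db_path)
```

Appending keeps each daily run cheap, at the cost that re-running a flow for the same day adds duplicate lines unless they are filtered when reading.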

By leveraging this ETL pipeline, we can address the inefficiencies related to data management in stock market analysis, ensuring timely and accurate data processing with improved oversight and error handling.  

**Setting Up Prefect and Polygon**

In [1]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
from google.colab import userdata
my_api_key = userdata.get('PREFECT_API_KEY')

In [3]:
%%capture
!pip install prefect

In [4]:
!prefect cloud login -k "$my_api_key" -w "matt-carswell/default"

Authenticated with Prefect Cloud! Using workspace 'matt-carswell/default'.


In [5]:
!pip install -U polygon-api-client

Collecting polygon-api-client
  Downloading polygon_api_client-1.13.7-py3-none-any.whl (40 kB)
Collecting urllib3<2.0.0,>=1.26.9 (from polygon-api-client)
  Downloading urllib3-1.26.18-py2.py3-none-any.whl (143 kB)
Installing collected packages: urllib3, polygon-api-client
  Attempting uninstall: urllib3
    Found existing installation: urllib3 2.0.7
    Uninstalling urllib3-2.0.7:
      Successfully uninstalled urllib3-2.0.7
ERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
spacy 3.7.4 requires typer<0.10.0,>=0.3.0, but you have typer 0.12.3 which is incompatible.
weasel 0.3.4 requires typer<0.10.0,>=0.3.0, but you ha

In [6]:
from polygon import RESTClient
client = RESTClient(api_key="T0QpOfpp3phUHlnzbXCbzP2r0Z6Hvsj2")

## Individual Stocks

**Creating a List of Stock Tickers**

In [34]:
import pandas as pd
data = {
    'Ticker': [
        "AAPL", "MSFT", "GOOG", "META", "INTC", "NVDA", "AMD", "ORCL", "IBM", "QCOM",
        "JNJ", "PFE", "UNH", "MRK", "ABT", "ABBV", "AMGN", "TMO", "GILD", "BMY",
        "JPM", "BAC", "WFC", "C", "GS", "MS", "AXP", "BLK", "TFC", "SPGI",
        "XOM", "CVX", "COP", "SLB", "EOG", "PSX", "VLO", "KMI", "WMB",
        "PG", "KO", "PEP", "NKE", "MO", "EL", "PM", "CL", "KMB", "GIS",
        "AMZN", "WMT", "HD", "COST", "TGT", "LOW", "DG", "SBUX", "CVS", "WBA",
        "TSLA", "GM", "F", "TM", "HMC", "RACE", "NSANY", "HYMTF",
        "BP"  # XOM and CVX are already listed with the energy tickers above
    ]}
# Create the DataFrame
df = pd.DataFrame(data)

# Print the DataFrame
df.head()

Unnamed: 0,Ticker
0,AAPL
1,MSFT
2,GOOG
3,META
4,INTC


**Creating our first ETL pipeline for Individual Stock Price Performance**

In [None]:
%%writefile stock_scraper.py
from datetime import datetime, timedelta

from prefect import flow, task
from pydantic import BaseModel, FilePath, field_validator

from polygon import RESTClient
client = RESTClient(api_key="T0QpOfpp3phUHlnzbXCbzP2r0Z6Hvsj2")

from google.colab import userdata
my_api_key = userdata.get('PREFECT_API_KEY')

class Stock(BaseModel):
  ticker: str
  open: float
  close: float
  high: float
  low: float
  volume: float
  transactions: float


  @field_validator('ticker')
  @classmethod
  def ticker_must_be_alphabetical(cls, value):
      if not value.isalpha():
          raise ValueError(f'Ticker must be alphabetical, got {value}')
      return value

  @field_validator('open', 'close', 'high', 'low', 'volume', 'transactions')
  @classmethod
  def must_be_nonnegative(cls, value):
      if value < 0:
          raise ValueError("Financial figures must be non-negative.")
      return value


@task
def fetch_aggregates(ticker, from_date, to_date):
    # First, fetch the daily aggregate bars for the ticker
    aggs = []
    for a in client.list_aggs(ticker=ticker, multiplier=1, timespan="day", from_=from_date, to=to_date, limit=50000):
        aggs.append(a)

    # Fail with a clear message when no bars come back (e.g. weekends or holidays)
    if not aggs:
        raise ValueError(f"No aggregates returned for {ticker} between {from_date} and {to_date}")

    # Use the first bar in the requested range
    bar = aggs[0]

    # Finally, build the validated Stock object
    stock_info = Stock(
        ticker=ticker,
        open=bar.open,
        close=bar.close,
        high=bar.high,
        low=bar.low,
        volume=bar.volume,
        transactions=bar.transactions
    )
    return stock_info



@task
def load_stock_into_db(
    stock_obj: Stock,
    db_fpath: str
) -> None:
    # Serialize the Stock object to a JSON string (Pydantic v2 API)
    stock_json = stock_obj.model_dump_json()

    # Append the JSON string to the file, one record per line
    with open(db_fpath, 'a') as file:
        file.write(stock_json + '\n')


@flow
def scrape_ticker(ticker, from_date, to_date, db_filename: str = "extracted_stocks.jsonl"):
    extracted_aggs = fetch_aggregates(ticker, from_date, to_date)
    db_result = load_stock_into_db(extracted_aggs, db_filename)
    return extracted_aggs

def define_date_range():
    today = datetime.today()
    from_date = (today - timedelta(days=1)).strftime('%Y-%m-%d')
    to_date = today.strftime('%Y-%m-%d')
    return from_date, to_date

if __name__ == "__main__":
    # Loop over the list of tickers.
    # NOTE: `df` is defined in the notebook, not in this file, so run the
    # script with `%run -i stock_scraper.py` to share the notebook namespace.
    from_date, to_date = define_date_range()
    for ticker in df['Ticker']:
        scrape_ticker(ticker=ticker, from_date=from_date, to_date=to_date)

    # Serve the stock flow on a daily schedule.
    # NOTE: scrape_ticker has no defaults for ticker, from_date and to_date,
    # so scheduled runs need those values supplied via `parameters`.
    scrape_ticker.serve(name="report-deployment",
                        tags=["onboarding"],
                        parameters={},
                        interval=86400)  # Set interval to 86400 seconds (24 hours)
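
The scraper above requests a window from yesterday to today, which can come back empty on weekends. One possible refinement is to target the most recent weekday instead. The sketch below is a hedged illustration using only the standard library: `previous_weekday` and `single_day_range` are hypothetical helper names, market holidays are not handled, and it assumes the aggregates endpoint accepts identical start and end dates.

```python
from datetime import date, timedelta


def previous_weekday(day: date) -> date:
    """Return the most recent Monday-Friday date strictly before `day`."""
    candidate = day - timedelta(days=1)
    while candidate.weekday() >= 5:  # 5 = Saturday, 6 = Sunday
        candidate -= timedelta(days=1)
    return candidate


def single_day_range(today: date) -> tuple[str, str]:
    """Build (from_date, to_date) strings covering only the previous weekday."""
    target = previous_weekday(today).strftime("%Y-%m-%d")
    return target, target


# Example: a Monday maps back to the preceding Friday
monday_range = single_day_range(date(2024, 5, 6))
```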

**Creating a new ETL for Market Indices performance**  
Slight caveat: our API plan limits which indices can be pulled without a very expensive subscription, so these are all ETFs, which are still highly indicative of the indices themselves.

In [37]:
indicies = {
    "index_name" : [
    "VTI","SPY","DIA","QQQ","USO","GLD","USRT","VDE","XHE","SOXQ","XLK"
    ]}

In [80]:
%%writefile index_scraper.py
from datetime import datetime, timedelta

from prefect import flow, task
from pydantic import BaseModel, FilePath

from polygon import RESTClient
client = RESTClient(api_key="T0QpOfpp3phUHlnzbXCbzP2r0Z6Hvsj2")

from google.colab import userdata
my_api_key = userdata.get('PREFECT_API_KEY')


indicies = {
    "index_name" : [
    "VTI","SPY","DIA","QQQ","USO","GLD","USRT","VDE","XHE","SOXQ","XLK"
    ]}


class Index(BaseModel):
  ticker: str
  open: float
  close: float
  high: float
  low: float
  volume: float
  transactions: float

@task
def fetch_index_aggregates(index_name: str, from_date, to_date):
    idx_aggs = []

    # TODO: generalize the dates and include a timestamp in the stored object
    for a in client.list_aggs(ticker=index_name, multiplier=1, timespan="day", from_=from_date, to=to_date, limit=50000):
        idx_aggs.append(a)

    # Fail with a clear message when no bars come back (e.g. weekends or holidays)
    if not idx_aggs:
        raise ValueError(f"No aggregates returned for {index_name} between {from_date} and {to_date}")

    # Use the first bar in the requested range
    bar = idx_aggs[0]

    # Create an Index object
    index_info = Index(
        ticker=index_name,
        open=bar.open,
        close=bar.close,
        high=bar.high,
        low=bar.low,
        volume=bar.volume,
        transactions=bar.transactions
    )
    return index_info

# Task to load index data into the database
@task
def load_index_into_db(
    index_obj: Index,
    db_fpath: str
) -> None:
    # Serialize the Index object to a JSON string (Pydantic v2 API)
    index_json = index_obj.model_dump_json()

    # Append the JSON string to the file, one record per line
    with open(db_fpath, 'a') as file:
        file.write(index_json + '\n')

@flow
def scrape_index(index_name, from_date,to_date,db_filename: str = "extracted_indices.jsonl"):
    extracted_aggs = fetch_index_aggregates(index_name,from_date,to_date)
    db_result = load_index_into_db(extracted_aggs, db_filename)
    return extracted_aggs

def define_date_range():
    today = datetime.today()
    from_date = (today - timedelta(days=1)).strftime('%Y-%m-%d')
    to_date = today.strftime('%Y-%m-%d')
    return from_date, to_date

if __name__ == "__main__":
    # Loop over the list of index ETF tickers
    from_date, to_date = define_date_range()
    for index_name in indicies['index_name']:
        scrape_index(index_name=index_name, from_date=from_date, to_date=to_date)

    # Serve the index flow on a daily schedule.
    # NOTE: scrape_index has no defaults for index_name, from_date and to_date,
    # so scheduled runs need those values supplied via `parameters`.
    scrape_index.serve(name="report-deployment",
                       tags=["onboarding"],
                       parameters={},
                       interval=86400)  # Set interval to 86400 seconds (24 hours)

Overwriting index_scraper.py


In [81]:
%run index_scraper.py

KeyboardInterrupt: 

**Combining both Index & Stock Reports into one ETL with one combined artifact**
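
The combined report embeds its charts directly in the markdown artifact by base64-encoding each PNG into a data URI. A minimal standard-library sketch of that encoding step follows; `image_bytes_to_markdown` is a hypothetical helper, and the 8-byte PNG file signature stands in for a real chart image.

```python
import base64


def image_bytes_to_markdown(alt_text: str, png_bytes: bytes) -> str:
    """Return a markdown image tag with the PNG inlined as a base64 data URI."""
    encoded = base64.b64encode(png_bytes).decode("utf-8")
    return f"![{alt_text}](data:image/png;base64,{encoded})"


# The PNG file signature is used here as placeholder image data
png_signature = b"\x89PNG\r\n\x1a\n"
snippet = image_bytes_to_markdown("Example", png_signature)
```

Inlining keeps the report self-contained, though large charts make the markdown string correspondingly large.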

In [54]:
%%writefile market_report_flow.py
# Generate report ETL flow
import base64

import matplotlib.pyplot as plt
import pandas as pd
from prefect import flow, task
from prefect.artifacts import create_markdown_artifact

# Generating Stocks Part of Artifact

# Extract from json file of ticker info
@task
def extract_stocks(db_fpath):
  extract_df = pd.read_json(db_fpath, lines = True)
  # Calculate price change
  extract_df['price_change'] = extract_df['close'] - extract_df['open']
  return extract_df

# Aggregate stock ticker info for the report
@task
def topn_stocks(df):
  # Sort and select top 10 price increases and decreases
  top_10_increases = df.nlargest(10, 'price_change')
  top_10_decreases = df.nsmallest(10, 'price_change')
  # Sort by transactions and select the top 10
  top_10_transactions = df.nlargest(10, 'transactions')

  return top_10_increases, top_10_decreases, top_10_transactions

# Function to add labels to the bars
@task
def add_labels(ax, bars):
    for bar in bars:
        height = bar.get_height()
        ax.annotate(f'{height:.2f}',
                    xy=(bar.get_x() + bar.get_width() / 2, height),
                    xytext=(0, 3),  # 3 points vertical offset
                    textcoords="offset points",
                    ha='center', va='bottom')

def viz_top_10_increase(top_10_increases):
  # Generate visualizations for implementation in report
  fig, ax = plt.subplots(figsize=(10, 6))
  bars = ax.bar(top_10_increases['ticker'], top_10_increases['price_change'], color='green')
  add_labels(ax, bars)
  plt.xlabel('Ticker')
  plt.ylabel('Price Increase')
  plt.title('Top 10 Companies with Biggest Price Increase')
  plt.xticks(rotation=45)
  plt.grid(axis='y', linestyle='--', alpha=0.7)
  plt.savefig('top_10_increases.png')

def viz_top_10_decrease(top_10_decreases):
  # Generate visualizations for implementation in report
  fig, ax = plt.subplots(figsize=(10, 6))
  bars = ax.bar(top_10_decreases['ticker'], top_10_decreases['price_change'], color='red')
  add_labels(ax, bars)
  plt.xlabel('Ticker')
  plt.ylabel('Price Decrease')
  plt.title('Top 10 Companies with Biggest Price Decrease')
  plt.xticks(rotation=45)
  plt.grid(axis='y', linestyle='--', alpha=0.7)
  plt.savefig('top_10_decreases.png')

def viz_top_10_transactions(top_10_transactions):
  # Plotting horizontal bar chart
  plt.figure(figsize=(10, 8))
  plt.barh(top_10_transactions['ticker'], top_10_transactions['transactions'], color='teal')
  plt.xlabel('Transactions')
  plt.title('Top 10 Companies with Highest Transactions')
  plt.gca().invert_yaxis()  # Invert y-axis to have the highest values on top
  plt.grid(axis='x', linestyle='--', alpha=0.7)

  # Adding data labels
  for index, value in enumerate(top_10_transactions['transactions']):
      plt.text(value, index, f'{value:,}')  # Adding a thousands separator for readability
  plt.savefig('top_10_transactions.png')

# Generating Index Part of Artifact

@task
def extract_indices(db_fpath):
    extract_df = pd.read_json(db_fpath, lines=True)
    return extract_df

@task
def calculate_return(df):
    df['return'] = ((df['close'] - df['open']) / df['open']) * 100
    return df

@task
def visualize_returns(df):
    # Compute the mean return per ticker once so bar labels and heights stay aligned
    mean_returns = df.groupby('ticker')['return'].mean()
    plt.figure(figsize=(12, 8))
    plt.bar(mean_returns.index, mean_returns.values, color='blue')
    plt.xlabel('Ticker')
    plt.ylabel('Return (%)')
    plt.title('Return Percentage for Market Indices')
    plt.xticks(rotation=45)
    plt.grid(axis='y', linestyle='--', alpha=0.7)
    plt.tight_layout()
    plt.savefig('index_returns.png')

@task
def visualize_volume(df):
    plt.figure(figsize=(12, 8))
    plt.bar(df['ticker'], df['volume'], color='green')
    plt.xlabel('Ticker')
    plt.ylabel('Volume Traded (in 10s of millions)')
    plt.title('Volume Traded for Market Index ETFs')
    plt.xticks(rotation=45)
    plt.grid(axis='y', linestyle='--', alpha=0.7)
    plt.tight_layout()
    plt.savefig('index_volume.png')

# Helper to read an image file and return it as a base64 string
# (a plain function, since it is called from inside the load_into_report task)
def save_image_base64(image_path):
    with open(image_path, "rb") as img_file:
        return base64.b64encode(img_file.read()).decode('utf-8')

# Create markdown artifact (summary report)
@task
def load_into_report(top_10_increases, top_10_decreases, top_10_transactions, df_returns):

  # Encode images as base64 strings
  top_10_increases_image = save_image_base64('top_10_increases.png')
  top_10_decreases_image = save_image_base64('top_10_decreases.png')
  top_10_transactions_image = save_image_base64('top_10_transactions.png')
  index_returns_image = save_image_base64('index_returns.png')
  index_volume_image = save_image_base64('index_volume.png')

  report_md = f"""
  # Daily Market Report
  ## Market Overview
  ### Index Returns


  ![Index Returns](data:image/png;base64,{index_returns_image})

  {df_returns.to_markdown()}

  ### Index Volume

  ![Index Volume](data:image/png;base64,{index_volume_image})

  ## Individual Stocks
  ### Stock Price Increases

  The top 10 stock increases were:

  {top_10_increases[['ticker', 'price_change']].to_markdown()}

  ![Top 10 Increases](data:image/png;base64,{top_10_increases_image})

  ### Stock Price Decreases

  The top 10 stock decreases were:

  {top_10_decreases[['ticker', 'price_change']].to_markdown()}

  ![Top 10 Decreases](data:image/png;base64,{top_10_decreases_image})

  ### Highest Transactions

  The top 10 companies with the highest transactions were:

  {top_10_transactions[['ticker', 'transactions']].to_markdown()}

  ![Top 10 Transactions](data:image/png;base64,{top_10_transactions_image})
  """
  create_markdown_artifact(
      markdown=report_md,
      key="stock-scrape-report",
      description="Stock Scraping Results")

@flow
def generate_report(db_fpath_stock = "extracted_stocks.jsonl", db_fpath_index = "extracted_indices.jsonl"):
  extracted_stocks = extract_stocks(db_fpath_stock)
  top_10_increases, top_10_decreases, top_10_transactions = topn_stocks(extracted_stocks)
  viz_top_10_increase(top_10_increases)
  viz_top_10_decrease(top_10_decreases)
  viz_top_10_transactions(top_10_transactions)
  extracted_indices = extract_indices(db_fpath_index)
  calculated_returns = calculate_return(extracted_indices)
  visualized_returns = visualize_returns(calculated_returns)
  visualized_volume = visualize_volume(extracted_indices)
  load_into_report(top_10_increases, top_10_decreases, top_10_transactions, calculated_returns)

if __name__ == "__main__":
    # Generate one report immediately, then serve the flow on a daily schedule
    generate_report()
    generate_report.serve(name="report-deployment",
                          tags=["onboarding"],
                          parameters={},
                          interval=86400)  # Set interval to 86400 seconds (24 hours)

Overwriting market_report_flow.py


In [77]:
%run market_report_flow.py

20:54:11.240 | INFO    | Flow run 'fuzzy-goose' - Downloading flow code from storage at '.'

        if __name__ == "__main__":
                generate_report()
20:54:12.741 | INFO    | Flow run 'fuzzy-goose' - Created task run 'extract_stocks-0' for task 'extract_stocks'
20:54:12.743 | INFO    | Flow run 'fuzzy-goose' - Executing 'extract_stocks-0' immediately...
20:54:13.066 | INFO    | Task run 'extract_stocks-0' - Finished in state Completed()
20:54:13.204 | INFO    | Flow run 'fuzzy-goose' - Created task run 'topn_stocks-0' for task 'topn_stocks'
20:54:13.205 | INFO    | Flow run 'fuzzy-goose' - Executing 'topn_stocks-0' immediately...
20:54:13.533 | INFO    | Task run 'topn_stocks-0' - Finished in state Completed()
20:54:13.962 | INFO    | Flow run 'fuzzy-goose' - Created task run 'add_labels-0' for task 'add_labels'
20:54:13.964 | INFO    | Flow run 'fuzzy-goose' - Executing 'add_labels-0' immediately...
20:54:14.235 | INFO    | Task run 'add_labels-0' - Finished in state Compl

KeyboardInterrupt: 

20:54:17.301 | ERROR   | Flow run 'fuzzy-goose' - Crash detected! Execution was aborted by an interrupt signal.
