# Project 3 EDA Notebook - Building SQL ETL Pipeline with ML Integrations

In [1]:
# !pip install duckdb==0.8.1
# !pip install yfinance==0.2.31 --quiet
# !pip install jupysql duckdb-engine --quiet
# !pip install toml --quiet



You should consider upgrading via the 'c:\users\owner\documents\anaconda package\python.exe -m pip install --upgrade pip' command.


In [2]:
import duckdb
import pytz
import pandas as pd
import numpy as np
import yfinance as yf
from datetime import datetime, timedelta

# %reload_ext sql

In [3]:
today = datetime.today()
date = today - timedelta(days=729)
print(f"730 days prior to today's date, {today} is the {date}")

730 days prior to today's date, 2023-10-12 20:30:18.417953 is the 2021-10-13 20:30:18.417953


In [4]:
# Getting historical market data
msft = yf.Ticker("MSFT")
hist = msft.history(period="1d", interval="1h", start=date, end=today)
# hist = msft.history(period="max")
print(hist.shape)
hist

(3502, 7)


Unnamed: 0_level_0,Open,High,Low,Close,Volume,Dividends,Stock Splits
Datetime,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
2021-10-14 09:30:00-04:00,299.209991,301.079987,297.829987,300.359985,6985730,0.0,0.0
2021-10-14 10:30:00-04:00,300.369995,300.700012,299.850006,300.619995,2472984,0.0,0.0
2021-10-14 11:30:00-04:00,300.619995,302.000000,300.250000,301.609985,2395688,0.0,0.0
2021-10-14 12:30:00-04:00,301.609985,301.839996,301.320007,301.500000,1919182,0.0,0.0
2021-10-14 13:30:00-04:00,301.510010,301.970001,301.135010,301.969910,1675775,0.0,0.0
...,...,...,...,...,...,...,...
2023-10-12 11:30:00-04:00,332.260010,332.829987,331.709991,332.570007,1645154,0.0,0.0
2023-10-12 12:30:00-04:00,332.589996,332.959991,329.760010,330.890015,2590785,0.0,0.0
2023-10-12 13:30:00-04:00,330.859985,331.070007,328.720001,329.209991,2228010,0.0,0.0
2023-10-12 14:30:00-04:00,329.199890,331.119995,329.029999,330.757111,1431656,0.0,0.0


In [5]:
hist_1986_2021 = msft.history(period="max", interval="1d", end=date)
print(hist_1986_2021.shape)
hist_1986_2021

(8971, 7)


Unnamed: 0_level_0,Open,High,Low,Close,Volume,Dividends,Stock Splits
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
1986-03-13 00:00:00-05:00,0.055004,0.063093,0.055004,0.060396,1031788800,0.0,0.0
1986-03-14 00:00:00-05:00,0.060396,0.063632,0.060396,0.062553,308160000,0.0,0.0
1986-03-17 00:00:00-05:00,0.062553,0.064172,0.062553,0.063632,133171200,0.0,0.0
1986-03-18 00:00:00-05:00,0.063632,0.064172,0.061475,0.062014,67766400,0.0,0.0
1986-03-19 00:00:00-05:00,0.062014,0.062553,0.060396,0.060936,47894400,0.0,0.0
...,...,...,...,...,...,...,...
2021-10-07 00:00:00-04:00,289.927478,291.361520,288.689919,289.603363,20430500,0.0,0.0
2021-10-08 00:00:00-04:00,290.948980,291.361520,288.532762,289.603363,17685700,0.0,0.0
2021-10-11 00:00:00-04:00,287.707667,292.667793,287.540679,288.994354,19298600,0.0,0.0
2021-10-12 00:00:00-04:00,290.084614,290.182840,287.147828,287.668396,17974100,0.0,0.0


In [6]:
msft_history = pd.concat([hist_1986_2021, hist])
print(msft_history.shape)
msft_history

(12473, 7)


Unnamed: 0,Open,High,Low,Close,Volume,Dividends,Stock Splits
1986-03-13 00:00:00-05:00,0.055004,0.063093,0.055004,0.060396,1031788800,0.0,0.0
1986-03-14 00:00:00-05:00,0.060396,0.063632,0.060396,0.062553,308160000,0.0,0.0
1986-03-17 00:00:00-05:00,0.062553,0.064172,0.062553,0.063632,133171200,0.0,0.0
1986-03-18 00:00:00-05:00,0.063632,0.064172,0.061475,0.062014,67766400,0.0,0.0
1986-03-19 00:00:00-05:00,0.062014,0.062553,0.060396,0.060936,47894400,0.0,0.0
...,...,...,...,...,...,...,...
2023-10-12 11:30:00-04:00,332.260010,332.829987,331.709991,332.570007,1645154,0.0,0.0
2023-10-12 12:30:00-04:00,332.589996,332.959991,329.760010,330.890015,2590785,0.0,0.0
2023-10-12 13:30:00-04:00,330.859985,331.070007,328.720001,329.209991,2228010,0.0,0.0
2023-10-12 14:30:00-04:00,329.199890,331.119995,329.029999,330.757111,1431656,0.0,0.0


In [7]:
msft_history = msft_history.rename_axis(["DateTime"]).reset_index().rename(columns={
    "DateTime": "datetime",
     "Open": "open",
     "High": "high",
     "Low": "low",
     "Close": "close",
     "Volume": "volume",
     "Dividends": "dividends",
     "Stock Splits": "stock_splits",
    })

In [8]:
msft_history.tail()

Unnamed: 0,datetime,open,high,low,close,volume,dividends,stock_splits
12465,2023-10-09 11:30:00-04:00,326.839996,328.459991,326.829987,327.820007,1732688,0.0,0.0
12466,2023-10-09 12:30:00-04:00,327.825012,328.75,327.549988,328.75,1236455,0.0,0.0
12467,2023-10-09 13:30:00-04:00,328.769989,330.299988,328.709991,329.959991,1821354,0.0,0.0
12468,2023-10-09 14:30:00-04:00,329.959991,330.149994,329.058502,329.5,1882691,0.0,0.0
12469,2023-10-09 15:30:00-04:00,329.480011,330.040009,329.209991,329.820007,1678565,0.0,0.0


### Trying SQL Queries

### Attaching local database to `MotherDuck` cloud

In [8]:
# Checking connection for next time

con = duckdb.connect('md:?motherduck_token=eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzZXNzaW9uIjoicnVpei5yaXZlcmE5My5nbWFpbC5jb20iLCJlbWFpbCI6InJ1aXoucml2ZXJhOTNAZ21haWwuY29tIiwidXNlcklkIjoiZGJiMmE2ZmYtMmZjNi00YjM2LTkzOGQtYzBmMzI5MWRlMWY4IiwiaWF0IjoxNjk2OTQ3NTYwLCJleHAiOjE3Mjg1MDUxNjB9.tPVEIxeMyT0pw8Qn3HBZrsb-f7RIdsFvTeIZrwmLyvg') 
# con.sql("ATTACH 'stock_data.duckdb'")
con.sql("SHOW DATABASES").show()

┌────────────────┐
│ database_name  │
│    varchar     │
├────────────────┤
│ my_db          │
│ sample_data    │
│ stocks_clouddb │
└────────────────┘



In [17]:
# con.sql("CREATE DATABASE stocks_clouddb FROM 'stock_data.duckdb'")
# con.sql("SHOW DATABASES").show()

### Refreshing data with the most recent data:

In [41]:
token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzZXNzaW9uIjoicnVpei5yaXZlcmE5My5nbWFpbC5jb20iLCJlbWFpbCI6InJ1aXoucml2ZXJhOTNAZ21haWwuY29tIiwidXNlcklkIjoiZGJiMmE2ZmYtMmZjNi00YjM2LTkzOGQtYzBmMzI5MWRlMWY4IiwiaWF0IjoxNjk2OTQ3NTYwLCJleHAiOjE3Mjg1MDUxNjB9.tPVEIxeMyT0pw8Qn3HBZrsb-f7RIdsFvTeIZrwmLyvg"
ticker = "MSFT"

def refresh_data(token, ticker):
    """
    This function refreshes the MotherDuck cloud database by extracting stock prices at 2 min intervals from our latest entry 
    to the datetime value of when this function was activated.

    Parameters
    ----------
    token: str
        The token for connecting to the MotherDuck cloud instance.
    ticker: str
        The ticker symbol for the stock you want to pull data from
    """
    
    con = duckdb.connect(f"md:?motherduck_token={token}") 
    recent_day_query = """
    SELECT datetime
    FROM stocks_clouddb.msft_data
    ORDER BY datetime DESC
    LIMIT 1
    """

    latest_day = con.sql(recent_day_query).fetchall()[0][0]
    today = datetime.today()
    symbol = "MSFT"

    # Extracting the historical market data (i.e. latest_day = (latest_day - timedelta(days=5)))
    ticker = yf.Ticker(symbol)
    
    recent_data = ticker.history(period="1d", interval="2m", start=latest_day, end=today)
    recent_data = recent_data.rename_axis(["DateTime"]).reset_index().rename(columns={
            "DateTime": "datetime",
            "Open": "open_price",
            "High": "day_high",
            "Low": "day_low",
            "Close": "close_price",
            "Volume": "volume",
            "Dividends": "dividends",
            "Stock Splits": "stock_splits"
        })
    
    con.sql("INSERT INTO stocks_clouddb.msft_data SELECT * FROM recent_data")
    con.commit()
    con.close()   

In [23]:
recent_day_query = """
SELECT datetime
FROM stocks_clouddb.msft_data
ORDER BY datetime DESC
LIMIT 1
"""

latest_day = con.sql(recent_day_query).fetchall()[0][0]
print(type(latest_day))
print(latest_day)

<class 'datetime.datetime'>
2023-10-09 12:30:00


In [3]:
today = datetime.today()
symbol = "MSFT"

latest_day = "2000-01-10"
# Extracting the historical market data (i.e. latest_day = (latest_day - timedelta(days=5)))
ticker = yf.Ticker(symbol)
recent_data = ticker.history(period="1d", interval="2m", start=latest_day, end=today)
recent_data = recent_data.rename_axis(["DateTime"]).reset_index().rename(columns={
        "DateTime": "datetime",
        "Open": "open_price",
        "High": "day_high",
        "Low": "day_low",
        "Close": "close_price",
        "Volume": "volume",
        "Dividends": "dividends",
        "Stock Splits": "stock_splits"
    })

print(recent_data.shape)
recent_data

MSFT: 2m data not available for startTime=947480400 and endTime=1697327357. The requested range must be within the last 60 days.


(0, 7)


Unnamed: 0,datetime,open_price,day_high,day_low,close_price,Adj Close,volume


In [40]:
# Loading data into the stocks_clouddb.msft_data table in the Mother Duck cloud
### con.sql("SELECT * FROM recent_data")
### con.sql("SELECT * FROM stocks_clouddb.msft_data LIMIT 5")

# con.sql("INSERT INTO stocks_clouddb.msft_data SELECT * FROM recent_data")
# con.commit()
# con.close()

┌──────────────────────┬────────────────────┬────────────────────┬───┬────────┬───────────┬──────────────┐
│       datetime       │     open_price     │      day_high      │ … │ volume │ dividends │ stock_splits │
│ timestamp with tim…  │       double       │       double       │   │ int64  │  double   │    double    │
├──────────────────────┼────────────────────┼────────────────────┼───┼────────┼───────────┼──────────────┤
│ 2023-10-09 09:30:0…  │ 327.82501220703125 │       328.06640625 │ … │  25883 │       0.0 │          0.0 │
│ 2023-10-09 09:32:0…  │ 328.07000732421875 │  328.2298889160156 │ … │  49058 │       0.0 │          0.0 │
│ 2023-10-09 09:34:0…  │  328.0299987792969 │   328.239990234375 │ … │  35785 │       0.0 │          0.0 │
│ 2023-10-09 09:36:0…  │ 327.97698974609375 │ 328.05999755859375 │ … │  35046 │       0.0 │          0.0 │
│ 2023-10-09 09:38:0…  │  327.9599914550781 │  328.1300048828125 │ … │  47758 │       0.0 │          0.0 │
│ 2023-10-09 09:40:0…  │ 328.09210205

In [7]:
token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJzZXNzaW9uIjoicnVpei5yaXZlcmE5My5nbWFpbC5jb20iLCJlbWFpbCI6InJ1aXoucml2ZXJhOTNAZ21haWwuY29tIiwidXNlcklkIjoiZGJiMmE2ZmYtMmZjNi00YjM2LTkzOGQtYzBmMzI5MWRlMWY4IiwiaWF0IjoxNjk2OTQ3NTYwLCJleHAiOjE3Mjg1MDUxNjB9.tPVEIxeMyT0pw8Qn3HBZrsb-f7RIdsFvTeIZrwmLyvg"
share_token = "1e70690c-4ff5-4979-9354-3f85db380eca"

con = duckdb.connect(f"md:?motherduck_token={token}") 
# con.sql("ATTACH 'stock_data.duckdb'")
con.sql("SHOW DATABASES").show()

┌────────────────┐
│ database_name  │
│    varchar     │
├────────────────┤
│ my_db          │
│ sample_data    │
│ stocks_clouddb │
└────────────────┘



In [9]:
df = con.sql("SELECT * FROM stocks_clouddb.msft_data").fetchdf().copy()
df.tail()

Unnamed: 0,datetime,open_price,day_high,day_low,close_price,volume,dividends,stock_splits
12465,2023-10-09 08:30:00,326.839996,328.459991,326.829987,327.820007,1732688,0.0,0.0
12466,2023-10-09 09:30:00,327.825012,328.75,327.549988,328.75,1236455,0.0,0.0
12467,2023-10-09 10:30:00,328.769989,330.299988,328.709991,329.959991,1821354,0.0,0.0
12468,2023-10-09 11:30:00,329.959991,330.149994,329.058502,329.5,1882691,0.0,0.0
12469,2023-10-09 12:30:00,329.480011,330.040009,329.209991,329.820007,1678565,0.0,0.0


┌───────────────┐
│ database_name │
│    varchar    │
├───────────────┤
│ my_db         │
│ sample_data   │
│ stock_data    │
└───────────────┘



In [12]:
print(con.sql("""
SELECT *
FROM stock_data.msft_data
ORDER BY datetime DESC
LIMIT 5;
"""))

┌─────────────────────┬────────────────────┬────────────────────┬───┬─────────┬───────────┬──────────────┐
│      datetime       │     open_price     │      day_high      │ … │ volume  │ dividends │ stock_splits │
│      timestamp      │       double       │       double       │   │  int32  │  double   │    float     │
├─────────────────────┼────────────────────┼────────────────────┼───┼─────────┼───────────┼──────────────┤
│ 2023-10-09 12:30:00 │  329.4800109863281 │  330.0400085449219 │ … │ 1678565 │       0.0 │          0.0 │
│ 2023-10-09 11:30:00 │  329.9599914550781 │  330.1499938964844 │ … │ 1882691 │       0.0 │          0.0 │
│ 2023-10-09 10:30:00 │  328.7699890136719 │ 330.29998779296875 │ … │ 1821354 │       0.0 │          0.0 │
│ 2023-10-09 09:30:00 │ 327.82501220703125 │             328.75 │ … │ 1236455 │       0.0 │          0.0 │
│ 2023-10-09 08:30:00 │  326.8399963378906 │  328.4599914550781 │ … │ 1732688 │       0.0 │          0.0 │
├─────────────────────┴──────────────

In [13]:
 # open the local db
local_con = duckdb.connect("stock_data.duckdb") 

# load the motherduck extension
local_con.sql("LOAD motherduck")

# The from indicates the file to upload. An empty path indicates the current database 
local_con.sql("CREATE DATABASE stocks_clouddb FROM CURRENT_DATABASE()") 

# local_con = duckdb.connect("md:")
# local_con.sql("CREATE DATABASE cloud_stocks_db FROM 'stock_data.duckdb'")

IOException: IO Error: Cannot open file "c:\users\owner\documents\github\sql-etl-with-ml\src\etl\stock_data.duckdb": The process cannot access the file because it is being used by another process.


In [3]:
print(conn.sql("""
SHOW TABLES;
"""))

┌───────────┐
│   name    │
│  varchar  │
├───────────┤
│ msft_data │
└───────────┘



In [7]:
# %%sql

# SELECT *
# FROM msft_data
# LIMIT 5;

print(conn.sql("""
SELECT *
FROM msft_data
ORDER BY datetime DESC
LIMIT 5;
"""))

┌─────────────────────┬────────────────────┬────────────────────┬───┬─────────┬───────────┬──────────────┐
│      datetime       │     open_price     │      day_high      │ … │ volume  │ dividends │ stock_splits │
│      timestamp      │       double       │       double       │   │  int32  │  double   │    float     │
├─────────────────────┼────────────────────┼────────────────────┼───┼─────────┼───────────┼──────────────┤
│ 2023-10-09 12:30:00 │  329.4800109863281 │  330.0400085449219 │ … │ 1678565 │       0.0 │          0.0 │
│ 2023-10-09 11:30:00 │  329.9599914550781 │  330.1499938964844 │ … │ 1882691 │       0.0 │          0.0 │
│ 2023-10-09 10:30:00 │  328.7699890136719 │ 330.29998779296875 │ … │ 1821354 │       0.0 │          0.0 │
│ 2023-10-09 09:30:00 │ 327.82501220703125 │             328.75 │ … │ 1236455 │       0.0 │          0.0 │
│ 2023-10-09 08:30:00 │  326.8399963378906 │  328.4599914550781 │ … │ 1732688 │       0.0 │          0.0 │
├─────────────────────┴──────────────

### Building a DAG Data Pipeline using Airflow:

In [3]:
import sys
print(sys.version)

3.7.3 (default, Mar 27 2019, 17:13:21) [MSC v.1915 64 bit (AMD64)]


In [1]:
!pip install apache-airflow==2.7.2

ERROR: Ignored the following versions that require a different python version: 2.7.0 Requires-Python ~=3.8; 2.7.0b1 Requires-Python ~=3.8; 2.7.0rc1 Requires-Python ~=3.8; 2.7.0rc2 Requires-Python ~=3.8; 2.7.1 Requires-Python ~=3.8; 2.7.1rc1 Requires-Python ~=3.8; 2.7.1rc2 Requires-Python ~=3.8; 2.7.2 Requires-Python ~=3.8; 2.7.2rc1 Requires-Python ~=3.8
ERROR: Could not find a version that satisfies the requirement apache-airflow==2.7.2 (from versions: 1.10.9-bin, 1.8.1, 1.8.2rc1, 1.8.2, 1.9.0, 1.10.0, 1.10.1b1, 1.10.1rc2, 1.10.1, 1.10.2b2, 1.10.2rc1, 1.10.2rc2, 1.10.2rc3, 1.10.2, 1.10.3b1, 1.10.3b2, 1.10.3rc1, 1.10.3rc2, 1.10.3, 1.10.4b2, 1.10.4rc1, 1.10.4rc2, 1.10.4rc3, 1.10.4rc4, 1.10.4rc5, 1.10.4, 1.10.5rc1, 1.10.5, 1.10.6rc1, 1.10.6rc2, 1.10.6, 1.10.7rc1, 1.10.7rc2, 1.10.7rc3, 1.10.7, 1.10.8rc1, 1.10.8, 1.10.9rc1, 1.10.9, 1.10.10rc1, 1.10.10rc2, 1.10.10rc3, 1.10.10rc4, 1.10.10rc5, 1.10.10, 1.10.11rc1, 1.10.11rc2, 1.10.11, 1.10.12rc1, 1.10.12rc2, 1.10.12rc3, 1.10.12rc4, 1.10.12, 1.

In [16]:
from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta

ModuleNotFoundError: No module named 'airflow'

In [None]:
# Your script content here
script_content = """
import duckdb
import pandas as pd
import yfinance as yf
from datetime import datetime

token = "your_token_here"
ticker = "MSFT"

def refresh_data(token, ticker):
    con = duckdb.connect(f"md:?motherduck_token={token}") 
    recent_day_query = """
    SELECT datetime
    FROM stocks_clouddb.msft_data
    ORDER BY datetime DESC
    LIMIT 1
    """

    latest_day = con.sql(recent_day_query).fetchall()[0][0]
    today = datetime.today()

    # Extracting the historical market data (i.e. latest_day = (latest_day - timedelta(days=5)))
    ticker = yf.Ticker(symbol)
    recent_data = ticker.history(period="1d", interval="2m", start=latest_day, end=today)
    recent_data = recent_data.rename_axis(["DateTime"]).reset_index().rename(columns={
            "DateTime": "datetime",
            "Open": "open_price",
            "High": "day_high",
            "Low": "day_low",
            "Close": "close_price",
            "Volume": "volume",
            "Dividends": "dividends",
            "Stock Splits": "stock_splits"
        })
    
    con.sql("INSERT INTO stocks_clouddb.msft_data SELECT * FROM recent_data")
    con.commit()
    con.close()  

if __name__ == "__main__":
    refresh_data(token, ticker)
"""

# Define default_args dictionary to specify default parameters for the DAG
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,
    'start_date': datetime(2023, 10, 13),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

# Define the DAG
dag = DAG(
    'refresh_msft_data',
    default_args=default_args,
    description='Refresh MSFT data every Friday',
    schedule_interval='@weekly',
)

# Define the PythonOperator task
refresh_task = PythonOperator(
    task_id='refresh_msft_data_task',
    python_callable=lambda: exec(script_content),
    dag=dag,
)

# Set task dependencies
refresh_task

if __name__ == "__main__":
    dag.cli()
