# ETL for Bitcoin Data Past 5

Code for extracting bitcoin data and storing it to DB

## Step 1: Extracting and Parsing CSV Data

### Download and Import Dependencies

In [155]:
%pip install pandas
%pip install psycopg2-binary
%pip install dotenv

Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Collecting dotenv
  Downloading dotenv-0.9.9-py2.py3-none-any.whl.metadata (279 bytes)
Collecting python-dotenv (from dotenv)
  Using cached python_dotenv-1.1.1-py3-none-any.whl.metadata (24 kB)
Downloading dotenv-0.9.9-py2.py3-none-any.whl (1.9 kB)
Using cached python_dotenv-1.1.1-py3-none-any.whl (20 kB)
Installing collected packages: python-dotenv, dotenv
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2/2[0m [dotenv]
[1A[2KSuccessfully installed dotenv-0.9.9 python-dotenv-1.1.1
Note: you may need to restart the kernel to use updated packages.


In [71]:
import pandas as pd
import os
import psycopg2
from io import StringIO
import re
from datetime import datetime, time, timezone

FILENAME_PATTERN = re.compile(r"^btcusd-(\d{4})-(\d{2})-(\d{2})")



In [None]:
def parse_file_name(file_name):
    stem, ext = os.path.splitext(file_name)

    if (ext != ".csv"):
        raise Exception(f"{file_name} Input must be CSV.")
    
    if (not FILENAME_PATTERN.match(stem)):
        raise Exception(f"{file_name} must be a valid file name (btcusd-YYYY-MM-DD.csv).")

    return '-'.join(stem.split('-')[1:])


2012-01-13


In [None]:
def extract_csv_contents(folder_path, file_name):

    file_path = os.path.join(folder_path, file_name)

    df = None
    if (os.path.isfile(file_path)):
        try:
            date = parse_file_name(file_name)
            df = pd.read_csv(file_path).dropna(axis=0, how="any")
            df["timestamp"] = pd.to_datetime(date + " " + df["Time"])
            df = df[['timestamp', "Open", "High", "Low", "Close", "Volume_(BTC)", "Volume_(Currency)", "Weighted_Price"]]
        except Exception as e:
            print(f"Problem reading file {file_path} with error {e}.")
    return df

    
df = extract_csv_contents("dataset", "btcusd-2012-10-02.csv")

In [139]:

conn = psycopg2.connect(database="postgres",
                        host="localhost",
                        user="postgres",
                        password="password",
                        port="5432")
cursor = conn.cursor()

In [None]:

def insert_df_to_db(df, cursor):
    
    if (df is None or df.empty):
        raise Exception("Dataframe is empty or None.")
    try:
        buffer = StringIO()
        df.to_csv(buffer, index=False, header=False)
        buffer.seek(0)

        cursor.copy_expert("""
            COPY bitcoin_prices(
                timestamp, open, high, low, close, volume_btc, volume_usd, weighted_price
            )
            FROM STDIN WITH CSV
            ON CONFLICT DO NOTHING
        """, buffer)

    except Exception as e:
        print("Problem copying CSV data into DB")

In [None]:
print(cursor.fetchall())
conn.commit()
conn.close()
cursor.close()

[]


## Sort filenames by date

In [35]:
def sort_files_by_date(folder_path):
    files = [f for f in os.listdir(folder_path)]
    files.sort()
    return files

print(sort_files_by_date("dataset"))


['btcusd-2011-12-31.csv', 'btcusd-2012-01-01.csv', 'btcusd-2012-01-02.csv', 'btcusd-2012-01-03.csv', 'btcusd-2012-01-04.csv', 'btcusd-2012-01-05.csv', 'btcusd-2012-01-06.csv', 'btcusd-2012-01-07.csv', 'btcusd-2012-01-08.csv', 'btcusd-2012-01-09.csv', 'btcusd-2012-01-10.csv', 'btcusd-2012-01-11.csv', 'btcusd-2012-01-12.csv', 'btcusd-2012-01-13.csv', 'btcusd-2012-01-14.csv', 'btcusd-2012-01-15.csv', 'btcusd-2012-01-16.csv', 'btcusd-2012-01-17.csv', 'btcusd-2012-01-18.csv', 'btcusd-2012-01-19.csv', 'btcusd-2012-01-20.csv', 'btcusd-2012-01-21.csv', 'btcusd-2012-01-22.csv', 'btcusd-2012-01-23.csv', 'btcusd-2012-01-24.csv', 'btcusd-2012-01-25.csv', 'btcusd-2012-01-26.csv', 'btcusd-2012-01-27.csv', 'btcusd-2012-01-28.csv', 'btcusd-2012-01-29.csv', 'btcusd-2012-01-30.csv', 'btcusd-2012-01-31.csv', 'btcusd-2012-02-01.csv', 'btcusd-2012-02-02.csv', 'btcusd-2012-02-03.csv', 'btcusd-2012-02-04.csv', 'btcusd-2012-02-05.csv', 'btcusd-2012-02-06.csv', 'btcusd-2012-02-07.csv', 'btcusd-2012-02-08.csv',

In [None]:
def get_max_timestamp(cursor):
    timestamp = cursor.execute("SELECT MAX(timestamp) FROM bitcoin_prices;")
    if (timestamp is not None and timestamp.tzinfo is None):
        timestamp = timestamp.replace(tzinfo=timezone.utc)
    return cursor.fetchone()[0]

## Iteratively Extract files

In [102]:
def insert_all_data_to_db(folder_path, conn, cursor):
    if (conn.closed == 1):
        raise Exception("DB connection is closed")
    
    if (cursor.closed == 1):
        raise Exception("DB cursor is closed")
    
    file_names = sort_files_by_date(folder_path)
    max_timestamp = get_max_timestamp(cursor) or datetime.min.replace(tzinfo=timezone.utc)

    for file_name in file_names:
        try:
            date = parse_file_name(file_name)
            curr_file_timestamp = datetime.combine(datetime.strptime(date, '%Y-%m-%d').date(), time.max).replace(tzinfo=timezone.utc)
            if (max_timestamp >= curr_file_timestamp):
                print(f"Skipping {file_name}")
                continue
            print(f"Parsing {file_name}")
            df = extract_csv_contents(folder_path, file_name)
            insert_df_to_db(df, cursor)
            conn.commit()
        except Exception as e:
            print(f"Problem reading file {file_name} with error {e}.")

## Put it all together

In [None]:
insert_all_data_to_db("dataset", conn, cursor)


In [138]:
conn.close()
cursor.close()

## Pasting Directly into DB

In [None]:
create_temp_table = f"""CREATE TEMP TABLE temp_bitcoin_prices (
  time_text       text PRIMARY KEY,
  open            NUMERIC,
  high            NUMERIC,
  low             NUMERIC,
  close           NUMERIC,
  volume_btc      NUMERIC,
  volume_usd      NUMERIC,
  weighted_price  NUMERIC
) ON COMMIT DROP;"""
cursor.execute(create_temp_table)



In [None]:
with open("dataset/btcusd-2011-12-31.csv", 'r') as f:
                cursor.copy_expert(f"""
                    COPY temp_bitcoin_prices("time_text", "open", "high", "low", "close", "volume_btc", "volume_usd", "weighted_price")
                    FROM STDIN WITH (FORMAT csv, HEADER, NULL '');
                    """, f)

In [142]:
date_str = "2011-12-31"
insert_sql = f"""
    INSERT INTO bitcoin_prices (timestamp, "open", "high", "low", "close", "volume_btc", "volume_usd", "weighted_price")
    SELECT
        ('{date_str}'::date + (time_text::interval))::timestamptz,
        "open",
        "high",
        "low",
        "close",
        "volume_btc",
        "volume_usd",
        "weighted_price"
    FROM
        temp_bitcoin_prices
    WHERE
        "open" IS NOT NULL AND
        "high" IS NOT NULL AND
        "low" IS NOT NULL AND
        "close" IS NOT NULL AND
        "volume_btc" IS NOT NULL AND
        "volume_usd" IS NOT NULL AND
        "weighted_price" IS NOT NULL
    ON CONFLICT (timestamp) DO NOTHING;
    """
cursor.execute(insert_sql)

In [152]:
def copy_csv_into_db(file_name, date_str, cursor):
    # temp table make
    create_temp_table = f"""CREATE TEMP TABLE temp_bitcoin_prices (
        time_text       text PRIMARY KEY,
        open            NUMERIC,
        high            NUMERIC,
        low             NUMERIC,
        close           NUMERIC,
        volume_btc      NUMERIC,
        volume_usd      NUMERIC,
        weighted_price  NUMERIC
        ) ON COMMIT DROP;"""
    cursor.execute(create_temp_table)

    # copy csv to temp
    with open(f"dataset/{file_name}", 'r') as f:
        cursor.copy_expert(f"""
            COPY temp_bitcoin_prices("time_text", "open", "high", "low", "close", "volume_btc", "volume_usd", "weighted_price")
            FROM STDIN WITH (FORMAT csv, HEADER, NULL '');
            """, f)
    
    # paste modified temp to db
    insert_sql = f"""
    INSERT INTO bitcoin_prices (timestamp, "open", "high", "low", "close", "volume_btc", "volume_usd", "weighted_price")
    SELECT
        ('{date_str}'::date + (time_text::interval))::timestamptz,
        "open",
        "high",
        "low",
        "close",
        "volume_btc",
        "volume_usd",
        "weighted_price"
    FROM
        temp_bitcoin_prices
    WHERE
        "open" IS NOT NULL AND
        "high" IS NOT NULL AND
        "low" IS NOT NULL AND
        "close" IS NOT NULL AND
        "volume_btc" IS NOT NULL AND
        "volume_usd" IS NOT NULL AND
        "weighted_price" IS NOT NULL
    ON CONFLICT (timestamp) DO NOTHING;
    """

    cursor.execute(insert_sql)

In [144]:
conn.commit()

In [153]:
def copy_all_data_to_db(folder_path, conn, cursor):
    if (conn.closed == 1):
        raise Exception("DB connection is closed")
    
    if (cursor.closed == 1):
        raise Exception("DB cursor is closed")
    
    file_names = sort_files_by_date(folder_path)
    max_timestamp = get_max_timestamp(cursor) or datetime.min.replace(tzinfo=timezone.utc)

    for file_name in file_names:
        try:
            date = parse_file_name(file_name)
            curr_file_timestamp = datetime.combine(datetime.strptime(date, '%Y-%m-%d').date(), time.max).replace(tzinfo=timezone.utc)
            if (max_timestamp >= curr_file_timestamp):
                print(f"Skipping {file_name}")
                continue
            print(f"Parsing {file_name}")
            copy_csv_into_db(file_name, date, cursor)
            conn.commit()
        except Exception as e:
            print(f"Problem reading file {file_name} with error {e}.")
        break

In [154]:
copy_all_data_to_db("dataset", conn, cursor)

Parsing btcusd-2011-12-31.csv
