# INSY336 Social Speculation for Harnessing Reddit to Forecast Bitcoin Fluctuations

## ETL Implementation

In [1]:
# helpers functions
import requests
import requests.auth
import datetime


def convert_date(date: float)->str:
    """Render a Unix timestamp as a YYYY-MM-DD calendar date.

    The timestamp is interpreted in the local timezone of the machine running
    the code (no ``tz`` is passed), so the same timestamp can map to a
    different calendar day on machines in different timezones.

    Args:
        date (float): seconds since the Unix epoch

    Returns:
        str: the local calendar date formatted as YYYY-MM-DD
    """
    local_day = datetime.date.fromtimestamp(date)
    return local_day.strftime("%Y-%m-%d")

def get_authheaders(client_id: str, client_secret: str, username: str, password: str, timeout: float = 30)->dict:
    """Get authorization headers from Reddit API

    Requests an OAuth access token using the "password" grant: the app
    credentials travel as HTTP basic auth, the account credentials in the
    form body. The returned headers are meant to be passed to every
    subsequent oauth.reddit.com request.

    Args:
        client_id (str): client id from Reddit API
        client_secret (str): client secret from Reddit API
        username (str): Reddit username
        password (str): Reddit password
        timeout (float): seconds to wait for Reddit before giving up

    Returns:
        dict: ``Authorization`` ("<token_type> <access_token>") and
        ``User-Agent`` headers

    Raises:
        requests.HTTPError: if Reddit answers with an HTTP error status
        KeyError: if the response body carries no token (e.g. an error payload)
    """
    # The same User-Agent is sent for the token request and returned for later calls.
    user_agent = f"ChangeMeClient/0.1 by {username}"
    client_auth = requests.auth.HTTPBasicAuth(client_id, client_secret)
    post_data = {"grant_type": "password", "username": username, "password": password}

    response = requests.post(
        "https://www.reddit.com/api/v1/access_token",
        auth=client_auth,
        data=post_data,
        headers={"User-Agent": user_agent},
        timeout=timeout,
    )
    # Fail here with a clear HTTP error instead of a confusing KeyError below.
    response.raise_for_status()

    payload = response.json()  # parse the body once
    token = payload['access_token']
    token_type = payload['token_type']

    return {"Authorization": f"{token_type} {token}", "User-Agent": user_agent}


In [2]:
# importing needed packages
import requests
from textblob import TextBlob
import sqlite3
import pandas as pd
import plotly.express as px

In [4]:
# get the authentication headers
# NOTE(review): assumes a local credentials.py defines these four names; keep it out of version control.
from credentials import client_id, client_secret, username, password

# Request the OAuth token once; `headers` is passed to reddit_etl in the run cell further down.
headers = get_authheaders(client_id, client_secret, username, password)

In [5]:
# create a database connection to the SQLite database file reddit.db using the sqlite3 library
# (relative path: reddit.db is resolved against the notebook's working directory)
conn = sqlite3.connect('reddit.db')

In [6]:
def create_table(conn: sqlite3.Connection, table_name: str, schema: str)->None:
    """(Re)create ``table_name`` with the given column definitions.

    Any existing table of that name is dropped first, so its rows are lost.
    The change is committed before returning.

    Note: ``table_name`` and ``schema`` are spliced directly into the SQL text
    (DDL statements cannot use bound parameters), so pass only trusted values.

    Difficulty: Easy

    Args:
        conn (sqlite3.Connection): connection to database
        table_name (str): table name
        schema (str): column definitions placed inside ``create table name(...)``

    Returns:
        None
    """
    ddl_statements = (
        f"drop table if exists {table_name}",
        f"create table {table_name}({schema})",
    )
    for statement in ddl_statements:
        conn.execute(statement)
    conn.commit()


In [7]:
# Column layout of the posts table. Order matters: load_reddit inserts with
# "insert into posts values (...)" (no column list), so placeholders bind by position.
# `created` holds the YYYY-MM-DD string produced by convert_date in transform_reddit,
# hence a text type (same as crypto.date) rather than float.
schema_posts = """
    id varchar(10),
    title text,
    score int,
    ups int,
    downs int,
    upvote_ratio float,
    url text,
    num_comments int,
    created varchar(10),
    body text,
    sentiment float,
    subjectivity float,
    primary key (id)
"""

create_table(conn, "posts", schema_posts)

In [8]:
def extract_reddit(headers: dict, subreddit: str, after: str=None, limit: int=100, timeout: float=30)->dict:
    """Get new posts from a subreddit using Reddit API

    Fetches one page of the subreddit's /new listing.

    Difficulty: Medium

    Args:
        headers (dict): authorization headers (must be passed along to API)
        subreddit (str): subreddit name to get posts from
        after (str): listing cursor, i.e. the ``['data']['after']`` value of the
            previous page; None requests the first page
        limit (int): number of posts to get (max 100)
        timeout (float): seconds to wait for Reddit before giving up

    Returns:
        json response from Reddit API (dict); ``['data']['children']`` holds the
        posts and ``['data']['after']`` the cursor for the next page

    Raises:
        requests.HTTPError: on an HTTP error status (e.g. expired token, rate limit)
    """
    url = f'https://oauth.reddit.com/r/{subreddit}/new'

    # requests drops params whose value is None, so after=None fetches page one.
    params = {'after': after, 'limit': limit}

    response = requests.get(url, headers=headers, params=params, timeout=timeout)
    # Surface API failures here rather than as a KeyError in transform_reddit.
    response.raise_for_status()

    return response.json()

In [9]:
def get_sentiment(text: str)->float:
    """Get sentiment score from text.
    The function uses TextBlob to calculate the sentiment (polarity) score of the
    text if the text is not empty.

    Difficulty: Medium

    Args:
        text (str): text to analyze; may be empty or None

    Returns:
        sentiment score (float), or None when there is no text to score
        (None is stored as NULL in the database)
    """
    if not text:
        # Nothing to analyse (e.g. a post whose selftext is empty).
        return None
    return TextBlob(text).sentiment.polarity

In [10]:
def get_subjectivity(text: str)->float:
    """Get subjectivity score from text
    The function uses TextBlob to calculate the subjectivity score of the text
    if the text is not empty.

    Difficulty: Medium

    Args:
        text (str): text to analyze; may be empty or None

    Returns:
        subjectivity score (float), or None when there is no text to score
        (None is stored as NULL in the database)
    """
    if not text:
        # Nothing to analyse (e.g. a post whose selftext is empty).
        return None
    return TextBlob(text).sentiment.subjectivity

In [11]:
def transform_reddit(response: dict)->list:
    """Prepare posts for insert into database from Reddit API response and return list of dicts.

    Each dict is keyed by the posts-table column names. Most fields are copied
    verbatim from the API; ``created`` is converted to YYYY-MM-DD with
    convert_date, ``body`` is the post's ``selftext``, and ``sentiment`` /
    ``subjectivity`` are computed from that body with get_sentiment /
    get_subjectivity (both defined earlier in this notebook).

    Difficulty: Medium

    Args:
        response (dict): json response from Reddit API (as returned by extract_reddit)

    Returns:
        list of dicts, one per post, in listing order
    """
    # API fields stored unchanged under columns of the same name.
    passthrough_fields = ('id', 'title', 'score', 'ups', 'downs',
                          'upvote_ratio', 'url', 'num_comments')

    rows = []
    for child in response['data']['children']:
        post = child['data']
        record = {field: post[field] for field in passthrough_fields}
        record['created'] = convert_date(post['created'])
        body = post['selftext']
        record['body'] = body
        record['sentiment'] = get_sentiment(body)
        record['subjectivity'] = get_subjectivity(body)
        rows.append(record)

    return rows

In [12]:
def load_reddit(conn: sqlite3.Connection, data: list)->None:
    """Insert data into database and commit.
    The function takes a list of dicts as an argument and inserts them with a
    single parameterized statement. The target table posts must already exist
    in the database.

    Difficulty: Easy

    Args:
        conn (sqlite3.Connection): connection to database
        data (list): list of dicts to insert into database (keys as produced by transform_reddit)

    Returns:
        None

    Raises:
        sqlite3.IntegrityError: if a post id is already present (id is the primary key)
    """
    # Named placeholders bind by dict key, but because no column list is given
    # their order must follow the column order of the posts table.
    query = ("insert into posts values (:id, :title, :score, :ups, :downs, :upvote_ratio, "
             ":url, :num_comments, :created, :body, :sentiment, :subjectivity)")
    conn.executemany(query, data)
    # Persist the rows: sqlite3 discards uncommitted changes when the connection closes.
    conn.commit()

In [13]:
def reddit_etl(conn: sqlite3.Connection, headers: dict, subreddit: str, n: int=6, limit: int=100)->None:
    """Extract, transform, and load reddit data into database
    The reddit API returns a maximum of 100 posts per request. To get more than 100 posts,
    multiple requests are made, each passing as ``after`` the cursor returned by the previous one.
    Paging stops early once Reddit returns no cursor (``after`` is None): requesting again with
    after=None would restart at the first page and re-insert ids already stored.
    Do not set n to a value greater than 6 as the Reddit API may return duplicate posts.

    Difficulty: Hard

    Args:
        conn (sqlite3.Connection): connection to database
        headers (dict): authorization headers (must be passed along to API)
        subreddit (str): subreddit name to get posts from
        n (int): maximum number of requests to make to Reddit API
        limit (int): number of posts to get per request (max 100)

    Returns:
        None
    """
    after = None  # None -> first page of the listing
    for _ in range(n):
        response = extract_reddit(headers, subreddit, after, limit)
        load_reddit(conn, transform_reddit(response))

        after = response['data']['after']
        if after is None:
            # Listing exhausted; stop instead of looping back to page one.
            break

In [14]:
# ETL pipeline for the crypto data
# One row per (symbol, date) closing price. `date` is the YYYY-MM-DD key of the
# AlphaVantage daily time series (e.g. '2024-10-24' in the sample output below).
schema_crypto = """
    symbol varchar(10),
    date varchar(10),
    close float,
    primary key(symbol, date)
"""

# drop (if present) and recreate the crypto table
create_table(conn, 'crypto', schema_crypto)

In [15]:
def extract_crypto(symbol: str, key: str, market='EUR', timeout: float=30)->dict:
    """Get crypto prices from AlphaVantage API

    Calls the DIGITAL_CURRENCY_DAILY function for ``symbol`` priced in ``market``.

    Difficulty: Easy

    Args:
        symbol (str): crypto symbol (e.g. BTC)
        key (str): API key
        market (str): market (quote currency, e.g. EUR)
        timeout (float): seconds to wait for AlphaVantage before giving up

    Returns:
        json response from AlphaVantage API (dict); on success it contains
        'Meta Data' and 'Time Series (Digital Currency Daily)'.
        NOTE(review): API-level problems (bad key, rate limit) may still arrive
        as a successful HTTP response without those keys; confirm with the API docs.

    Raises:
        requests.HTTPError: on an HTTP error status
    """
    url = 'https://www.alphavantage.co/query'

    params = {
        'function': 'DIGITAL_CURRENCY_DAILY',
        'symbol': symbol,
        'market': market,
        'apikey': key,
    }

    response = requests.get(url, params=params, timeout=timeout)
    response.raise_for_status()

    return response.json()

In [16]:
# Exploratory check of the AlphaVantage response shape (top-level keys are shown in the output below).
from credentials import key

response = extract_crypto("BTC", key, market='EUR')
response.keys()


dict_keys(['Meta Data', 'Time Series (Digital Currency Daily)'])

In [17]:
# Inspect one day's entry; prices come back as strings. Assumes 2024-10-24 is in the returned history.
response['Time Series (Digital Currency Daily)']['2024-10-24']

{'1. open': '61803.03000000',
 '2. high': '63574.05000000',
 '3. low': '61629.62000000',
 '4. close': '62976.58000000',
 '5. volume': '313.65572182'}

In [18]:
def transform_crypto(response: dict)->list:
    """Prepare crypto response for insert into database and return list of dicts.
    The output must match the schema of the table: one dict per day with keys
    ``symbol``, ``date`` (YYYY-MM-DD key of the time series) and ``close``.

    Difficulty: Easy

    Args:
        response (dict): json response from AlphaVantage API

    Returns:
        list of dicts, one per day, in the order the API returned them;
        ``close`` is a float

    Raises:
        KeyError: if the response lacks 'Meta Data' or the daily time series
        ValueError: if a close price is not numeric
    """
    symbol = response['Meta Data']['2. Digital Currency Code']
    series = response['Time Series (Digital Currency Daily)']

    # AlphaVantage sends prices as strings (e.g. '62976.58000000'); convert so
    # the value matches the float type of the close column.
    return [
        {'symbol': symbol, 'date': day, 'close': float(prices['4. close'])}
        for day, prices in series.items()
    ]

In [19]:
def load_crypto(conn: sqlite3.Connection, data: list)->None:
    """Insert data into database and commit.
    The function takes a list of dicts as an argument and inserts them with a
    single parameterized statement. The target table crypto must already exist
    in the database.

    Difficulty: Easy

    Args:
        conn (sqlite3.Connection): connection to database
        data (list): list of dicts with keys symbol, date, close

    Returns:
        None

    Raises:
        sqlite3.IntegrityError: if a (symbol, date) pair is already present
    """
    query = 'insert into crypto values (:symbol, :date, :close)'
    conn.executemany(query, data)
    # Persist the rows: sqlite3 discards uncommitted changes when the connection closes.
    conn.commit()

In [20]:
def crypto_etl(conn: sqlite3.Connection, symbol: str, key: str, market: str='EUR')->None:
    """Extract, transform, and load crypto data into database

    Fetches the daily history with extract_crypto, flattens it with
    transform_crypto and inserts it with load_crypto. An empty response is
    treated as "nothing to load".

    Difficulty: Easy

    Args:
        conn (sqlite3.Connection): connection to database
        symbol (str): crypto symbol (e.g. BTC)
        key (str): API key
        market (str): market

    Returns:
        None

    Raises:
        ValueError: if the response has no 'Time Series (Digital Currency Daily)'
            entry (presumably an API error or rate-limit message)
    """
    response = extract_crypto(symbol, key, market)
    if not response:
        # Nothing came back, so there is nothing to load.
        return

    if 'Time Series (Digital Currency Daily)' not in response:
        # Report the API's own message instead of an opaque KeyError in transform_crypto.
        raise ValueError(
            f"AlphaVantage returned no daily time series for {symbol}/{market}: {response}"
        )

    load_crypto(conn, transform_crypto(response))

In [21]:
# Close connection
# (the cells below reopen reddit.db and recreate both tables before loading data)
conn.close()

In [22]:
# Connect to the database
# (same reddit.db file as before; the next cell drops and recreates its tables)
conn = sqlite3.connect("reddit.db")

In [23]:
# create table (drop and recreate both, so the run starts from empty tables)
create_table(conn, "posts", schema_posts)
create_table(conn, 'crypto', schema_crypto)

# run etl to insert data
subreddit = 'btc'
# reddit_etl documents n <= 6: larger values may return duplicate posts,
# which would violate the posts primary key on insert.
reddit_etl(conn, headers, subreddit, n=6)
symbol = 'BTC'
crypto_etl(conn, symbol, key)

In [None]:
# Persist pending inserts, then close the connection.
# sqlite3's close() does not commit, so uncommitted rows would be discarded.
conn.commit()
conn.close()