<h1>MSIN0166: Data Engineering - Group Coursework</h1>

Title: State of the Crypto: Cryptocurrency Trends, Economic Correlations, and Sentiment<br/>
Authors: Tuhan Sapumanage, Sherlock Pi, Filippo Chen, Alifiya Kanpurwala

## Setting Up

<div style="background-color: #cce5ff; padding: 10px; border-radius: 5px;">
    <strong>Note:</strong> Environment variables are stored securely in the GitHub repository under Codespaces secrets.
</div>

### Libraries

In [7]:
pip install yfinance

Note: you may need to restart the kernel to use updated packages.


In [8]:
from sqlalchemy import create_engine
from pymongo import MongoClient
import pandas as pd
import yfinance as yf
import requests

### Getting Env Variables

In [1]:
pip install python-dotenv

Collecting python-dotenv
  Using cached python_dotenv-1.0.1-py3-none-any.whl.metadata (23 kB)
Using cached python_dotenv-1.0.1-py3-none-any.whl (19 kB)
Installing collected packages: python-dotenv
Successfully installed python-dotenv-1.0.1
Note: you may need to restart the kernel to use updated packages.


In [3]:
from dotenv import load_dotenv
import os

# Load environment variables from .env file
load_dotenv()

### Setting up PostgreSQL

In [9]:
import os
import sqlalchemy
from sqlalchemy import create_engine

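# Create PostgreSQL connection string
# (host kept in its own variable so the f-string has no nested double quotes, which fail before Python 3.12)
pg_host = "uclba-de25v2.cluster-cowglvndjvxv.eu-west-2.rds.amazonaws.com"
pg_conn_string = (
    f"postgresql+psycopg2://{os.getenv('POSTGRES_USERNAME')}:{os.getenv('POSTGRES_PASSWORD')}"
    f"@{pg_host}:5432/postgres"
)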

# Create the SQLAlchemy engine
pg_engine = create_engine(pg_conn_string, connect_args={"options": f"-c search_path={os.getenv('POSTGRES_SCHEMA')}"})
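If the password contains characters such as `@` or `/`, a connection URL assembled by plain string formatting can be parsed incorrectly. A minimal sketch of percent-encoding the credentials first, using only the standard library. The helper name `build_pg_url` and the sample host and credentials are hypothetical and not part of the notebook:

```python
from urllib.parse import quote_plus


def build_pg_url(user, password, host, port=5432, database="postgres"):
    """Build a SQLAlchemy-style PostgreSQL URL with percent-encoded credentials."""
    return (
        f"postgresql+psycopg2://{quote_plus(user)}:{quote_plus(password)}"
        f"@{host}:{port}/{database}"
    )


# Hypothetical values, for illustration only
example_url = build_pg_url("demo_user", "p@ss/word", "db.example.com")
print(example_url)
```

The resulting string can be passed to `create_engine` in the same way as `pg_conn_string`.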

### Setting up MongoDB

In [10]:
from pymongo.mongo_client import MongoClient
from pymongo.server_api import ServerApi

uri = "mongodb+srv://ucl-de:"+os.getenv('MONGODB_PASSWORD')+"@ucl-de.auvdj.mongodb.net/?retryWrites=true&w=majority&appName=ucl-de"

# Create a new client and connect to the server
client = MongoClient(uri, server_api=ServerApi('1'))
db = client['ucl-de']

# Send a ping to confirm a successful connection
try:
    client.admin.command('ping')
    print("Pinged your deployment. You successfully connected to MongoDB!")
except Exception as e:
    print(e)

Pinged your deployment. You successfully connected to MongoDB!


## Ingestion

### Crypto Data - NoSQL/JSON

<div style="background-color: #cce5ff; padding: 10px; border-radius: 5px;">
    <strong>Note:</strong> The API allows only five (5) requests per minute, so the ten coins are fetched in two batches of five. Wait 60 seconds after the first batch before executing the 'coin_names_2' cell.
</div>
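The manual wait between the two batches could also be automated. A minimal sketch, assuming a limit of five calls per 60-second window. The helper `run_in_batches` is hypothetical, and the demonstration uses a stand-in action and a recording sleep so that nothing is fetched or delayed:

```python
import time


def run_in_batches(items, batch_size, pause_seconds, action, sleep=time.sleep):
    """Call action(item) for every item, pausing between batches of batch_size."""
    items = list(items)
    for start in range(0, len(items), batch_size):
        if start > 0:
            sleep(pause_seconds)  # wait for the rate-limit window to reset
        for item in items[start:start + batch_size]:
            action(item)


# Demonstration only: record what would be processed and how long we would wait
coins = ["BTC", "ETH", "XRP", "SOL", "DOGE", "ADA", "TRX", "XLM", "AVAX", "LINK"]
processed, pauses = [], []
run_in_batches(coins, batch_size=5, pause_seconds=60,
               action=processed.append, sleep=pauses.append)
print(processed)
print(pauses)
```

In the notebook, the action would be a call such as `lambda coin: get_coin_data([coin])`, with the default `time.sleep` left in place.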

In [11]:
def get_coin_data(coin_names):
    for coin_name in coin_names:
        # Base URL for Polygon.io API
        url = 'https://api.polygon.io/v2/aggs/ticker/X:'+coin_name+'USD/range/1/day/2024-01-01/2024-12-31'
        
        # Send a GET request to the API
        response = requests.get(url, params={'apiKey': os.getenv('POLYGON_API_KEY')})

        # Check if the request was successful
        if response.status_code == 200:
            data = response.json()
            print(coin_name)
            print(data['results'][0])
            # Extract the 'results' JSON and rename fields
            data_to_save = [{
                'volume': result['v'],
                'volume_weighted': result['vw'],
                'open': result['o'],
                'close': result['c'],
                'high': result['h'],
                'low': result['l'],
                'timestamp': pd.to_datetime(result['t'], unit='ms'),
                'num_trades': result['n'],
                'coin_name': coin_name  # Add the coin_name field
            } for result in data['results']]     

            # Connect to MongoDB and insert the documents
            collection = db['crypto_data']
            collection.insert_many(data_to_save)
            print("Documents inserted successfully.")
        else:
            print(f"Error fetching data: {response.status_code}")
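The field renaming inside `get_coin_data` can be checked in isolation. A small sketch, using the first BTC aggregate printed in the notebook output as input. The names `FIELD_MAP` and `to_document` are hypothetical, and the standard-library `datetime` stands in for `pd.to_datetime`, so the timestamp here is timezone-aware (UTC) rather than naive:

```python
from datetime import datetime, timezone

# Short API keys mapped to the descriptive names used in the collection
FIELD_MAP = {
    "v": "volume", "vw": "volume_weighted", "o": "open", "c": "close",
    "h": "high", "l": "low", "n": "num_trades",
}


def to_document(result, coin_name):
    """Rename one aggregate's fields and convert its millisecond timestamp."""
    doc = {new: result[old] for old, new in FIELD_MAP.items()}
    doc["timestamp"] = datetime.fromtimestamp(result["t"] / 1000, tz=timezone.utc)
    doc["coin_name"] = coin_name
    return doc


sample = {'v': 11571.132272252362, 'vw': 43114.6615, 'o': 42241.1, 'c': 44220.78,
          'h': 44240.8, 'l': 42175.65, 't': 1704067200000, 'n': 219298}
doc = to_document(sample, "BTC")
print(doc["timestamp"].isoformat(), doc["close"])
```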

In [18]:
coin_names_1 = {"BTC", "ETH", "XRP", "SOL", "DOGE"}
get_coin_data(coin_names_1)

SOL
{'v': 1626237.539067202, 'vw': 105.8227, 'o': 101.7, 'c': 110.11, 'h': 110.44, 'l': 101.47, 't': 1704067200000, 'n': 108681}
Documents inserted successfully.
DOGE
{'v': 151978174.98105866, 'vw': 0.0907, 'o': 0.0894535, 'c': 0.09202, 'h': 0.092192, 'l': 0.0885, 't': 1704067200000, 'n': 22997}
Documents inserted successfully.
XRP
{'v': 37059909.13826048, 'vw': 0.6202, 'o': 0.6154, 'c': 0.6299, 'h': 0.63151, 'l': 0.60817, 't': 1704067200000, 'n': 40192}
Documents inserted successfully.
BTC
{'v': 11571.132272252362, 'vw': 43114.6615, 'o': 42241.1, 'c': 44220.78, 'h': 44240.8, 'l': 42175.65, 't': 1704067200000, 'n': 219298}
Documents inserted successfully.
ETH
{'v': 83717.80414219313, 'vw': 2308.4244, 'o': 2281.03, 'c': 2352.76, 'h': 2353.33, 'l': 2264.86, 't': 1704067200000, 'n': 138648}
Documents inserted successfully.


In [13]:
coin_names_2 = {"ADA", "TRX", "XLM", "AVAX", "LINK"}
get_coin_data(coin_names_2)

LINK
{'v': 1729847.2418694491, 'vw': 15.2438, 'o': 14.934, 'c': 15.56, 'h': 15.585, 'l': 14.812, 't': 1704067200000, 'n': 35351}
Documents inserted successfully.


### Stock Market Data - SQL/DF

In [21]:
# Getting Stock Data for S&P ETF & Top 10 Companies for 2024-01-01 to 2024-12-31
# Define the ticker symbols
tickers = ['SPY', 'AAPL', 'MSFT', 'NVDA', 'AMZN', 'META', 'GOOGL', 'TSLA', 'AVGO', 'GOOG', 'BRK-B']

# Download historical data for 2024
df_stock = yf.download(tickers, start="2024-01-01", end="2024-12-31")

# Reset the index to make the date a column and flatten the MultiIndex columns
df_stock.columns = ['_'.join(col).strip() for col in df_stock.columns.to_flat_index()]
df_stock.reset_index(inplace=True)

# Unpivot the DataFrame while keeping columns for 'Date' and 'Symbol'
df_stock = df_stock.melt(id_vars=['Date'], var_name='Symbol_Price', value_name='Value')

# Extract 'Symbol' and 'Price' from the 'Symbol_Price' column
df_stock[['Price', 'Symbol']] = df_stock['Symbol_Price'].str.split('_', expand=True)

# Drop the 'Symbol_Price' column
df_stock.drop(columns=['Symbol_Price'], inplace=True)

# Pivot the table to create a wide format with columns for High, Low, Open, Close, Volume
df_stock = df_stock.pivot_table(index=['Date', 'Symbol'], columns='Price', values='Value', aggfunc='first')

# Reset the index to flatten the result
df_stock.reset_index(inplace=True)

# Save the DataFrame to PostgreSQL as 'stock_data' table
df_stock.to_sql('stock_data', pg_engine, if_exists='replace', index=False)

# Display the first few rows
df_stock.head()

YF.download() has changed argument auto_adjust default to True


[*********************100%***********************]  11 of 11 completed


|   | Date | Symbol | Close | High | Low | Open | Volume |
|---|------|--------|-------|------|-----|------|--------|
| 0 | 2024-01-02 | AAPL | 184.532089 | 187.315382 | 182.792533 | 186.033072 | 82488700.0 |
| 1 | 2024-01-02 | AMZN | 149.929993 | 152.380005 | 148.389999 | 151.539993 | 47339400.0 |
| 2 | 2024-01-02 | AVGO | 107.095505 | 108.735411 | 106.27752 | 107.760543 | 28831000.0 |
| 3 | 2024-01-02 | BRK-B | 362.459991 | 362.570007 | 355.940002 | 356.320007 | 4737000.0 |
| 4 | 2024-01-02 | GOOG | 138.902084 | 139.952119 | 137.090672 | 138.941904 | 20071900.0 |
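The reshape from one wide row per date to one row per date and symbol can be illustrated without pandas. A minimal sketch of the idea only, not the notebook's melt and pivot code. The function `wide_to_long` is hypothetical, and the sample row reuses the AAPL and AMZN values shown in the output:

```python
# One wide row after flattening the MultiIndex: labels look like "<Price>_<Symbol>"
wide_row = {
    "Date": "2024-01-02",
    "Close_AAPL": 184.532089, "Open_AAPL": 186.033072,
    "Close_AMZN": 149.929993, "Open_AMZN": 151.539993,
}


def wide_to_long(row):
    """Turn one wide row into one record per symbol, keyed by price field."""
    by_symbol = {}
    for label, value in row.items():
        if label == "Date":
            continue
        price, symbol = label.split("_", 1)  # e.g. "Close_AAPL" -> ("Close", "AAPL")
        record = by_symbol.setdefault(symbol, {"Date": row["Date"], "Symbol": symbol})
        record[price] = value
    return [by_symbol[symbol] for symbol in sorted(by_symbol)]


long_rows = wide_to_long(wide_row)
for record in long_rows:
    print(record)
```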


### Crypto Posts - Web Scraping

<div style="background-color: #cce5ff; padding: 10px; border-radius: 5px;">
    <strong>Note:</strong> The code block below takes a considerable amount of time to execute (approximately 10–15 minutes). To streamline the workflow, we pre-executed the code and saved a snapshot of the results as a JSON file (method 2, below). This snapshot can be loaded and used directly; however, if re-execution is required, the code can simply be uncommented and run.<br/><br/>Both a modular and an automated version of this process are available in the GitHub repository, titled web_scrape.ipynb and web_scrape_100_most_recent.ipynb, respectively (with further context provided in Section 2.3, Scalability and Automation Consideration, of this report).
</div>

#### Method 1 - Get New

In [None]:
# pip install selenium

In [None]:
# pip install webdriver_manager

In [30]:
# import time
# import pandas as pd
# from datetime import datetime
# from selenium import webdriver
# from selenium.webdriver.chrome.options import Options
# from selenium.webdriver.chrome.service import Service
# from webdriver_manager.chrome import ChromeDriverManager
# from selenium.webdriver.common.by import By
# from selenium.common.exceptions import NoSuchElementException

In [31]:
# def parse_timestamp(timestamp_str):
#     """
#     Parse a timestamp string (e.g., "January 02, 2024, 02:36:13 PM")
#     into a datetime object. Adjust the format string if needed.
#     """
#     try:
#         return datetime.strptime(timestamp_str, "%B %d, %Y, %I:%M:%S %p")
#     except Exception:
#         return None
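The format string can be checked against the example from the docstring. A short sketch that mirrors `parse_timestamp` (catching `ValueError` specifically); the second input is a hypothetical string that does not match the format, and month and AM/PM parsing assume an English locale:

```python
from datetime import datetime


def parse_timestamp(timestamp_str):
    """Parse e.g. 'January 02, 2024, 02:36:13 PM'; return None if the format differs."""
    try:
        return datetime.strptime(timestamp_str, "%B %d, %Y, %I:%M:%S %p")
    except ValueError:
        return None


parsed = parse_timestamp("January 02, 2024, 02:36:13 PM")
unparsed = parse_timestamp("Today at 02:36:13 PM")  # hypothetical non-matching input
print(parsed, unparsed)
```

Rows whose timestamp comes back as `None` are skipped by the board scraper, so any timestamp in a different format silently drops that topic.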

In [32]:
# def scrape_topic_posts(driver, topic_url, subject_title, subsection):
#     """
#     Given a topic URL, visits the page (with ;all appended) and extracts all posts.
#     For each post, extracts the timestamp and post content using the provided Relative XPath.
#     Returns a list of dictionaries with keys: 'timestamp', 'post', 'subject', 'subsection'.
#     """
#     full_url = topic_url
#     driver.get(full_url)
#     time.sleep(3)  # Allow time for the page to load

#     topic_posts = []
#     # Locate all post containers within #quickModForm
#     posts = driver.find_elements(
#         "xpath", "//*[@id='quickModForm']/table[contains(@class,'bordercolor')]/tbody/tr"
#     )

#     if not posts:
#         print("No posts found on topic page:", full_url)

#     for p in posts:
#         try:
#             # Extract timestamp using the provided Relative XPath
#             timestamp_text = p.find_element(
#                 "xpath", ".//td[2]/table/tbody/tr/td[2]/div[2]"
#             ).text.strip()
#             timestamp_obj = datetime.strptime(timestamp_text, "%B %d, %Y, %I:%M:%S %p")
#             timestamp_text = timestamp_obj.strftime("%Y-%m-%d")
#         except NoSuchElementException:
#             timestamp_text = "No timestamp"

#         try:
#             # Extract post content using a more precise XPath
#             post_text = p.find_element(
#                 "xpath", ".//td[2]/div[@class='post']"
#             ).text.strip()
#         except NoSuchElementException:
#             post_text = "No content"

#         topic_posts.append({
#             "timestamp": timestamp_text,
#             "post": post_text,
#             "subject": subject_title,
#             "subsection": subsection
#         })

#     return topic_posts

In [33]:
# def scrape_subsection_topics(num_pages, board_prefix, start_subject_num, start_date_str, 
#                              end_date_str, keywords):
#     """
#     Scrapes topics from a specific board (subsection) page that meet criteria:
#       - The topic's last post timestamp is within the specified date range.
#       - The topic title contains the given keyword (case-insensitive).
    
#     For each matching topic, the function visits the topic's URL (with ;all appended)
#     and extracts all posts (timestamp and post content).
#     """
#     start_date = datetime.strptime(start_date_str, "%Y-%m-%d")
#     end_date = datetime.strptime(end_date_str, "%Y-%m-%d")
    
#     # Configure headless Chrome browser
#     chrome_options = Options()
#     chrome_options.add_argument("--headless")
#     driver = webdriver.Chrome(
#         service=Service(ChromeDriverManager().install()),
#         options=chrome_options
#     )
    
#     all_posts = []
    
#     # Loop over board pages. Each page URL is formed using the subject_start_num (which increments by 40).
#     for page in range(num_pages):
#         subject_start_num = start_subject_num + page * 40
#         board_url = f"{board_prefix}{subject_start_num}"
#         print(f"Accessing board page: {board_url}")
#         driver.get(board_url)
#         time.sleep(3)
        
#         # Extract topic rows from the board page.
#         # The CSS selectors here assume that each topic is in a table row within a table with class "bordercolor".
#         topic_rows = driver.find_elements(By.CSS_SELECTOR, "#bodyarea table.bordercolor tr")
        
#         for row in topic_rows:
#             try:
#                 # Locate the topic link element
#                 link = row.find_element(By.CSS_SELECTOR, "td a[href*='topic=']")
#                 title = link.text.strip()
#                 topic_url = link.get_attribute("href")
                
#                 # Locate the element that holds the last post timestamp.
#                 last_post_elem = row.find_element(By.CSS_SELECTOR, "td.windowbg2.lastpostcol span")
#                 # Extract the timestamp string (taking only the first line)
#                 last_post_text = last_post_elem.text.strip().split("\n")[0]
#                 topic_last_post = parse_timestamp(last_post_text)
#                 if topic_last_post is None:
#                     continue
                
#                 # Filter topics by date range
#                 if not (start_date <= topic_last_post <= end_date):
#                     continue
                
#                 # Filter by keyword (case-insensitive)
#                 #if keyword.lower() not in title.lower():
#                 #    subsection = keyword
#                 #    continue
#                 matched_coin = None
#                 for abbr, full_name in keywords.items():
#                     if abbr.lower() in title.lower() or full_name.lower() in title.lower():
#                         matched_coin = abbr  # Assign the abbreviation (e.g., "BTC")
#                         break  # Stop checking after the first match
                
#                 # If no keyword match, skip this topic
#                 if not matched_coin:
#                     continue
                
#                 subsection = matched_coin
                
#                 # Here, we set the subsection name. You might extract this differently.
#                 print(f"Scraping topic: {title} | Last post: {last_post_text}")
                
#                 # Scrape all posts from the topic
#                 topic_posts = scrape_topic_posts(driver, topic_url, title, subsection)
#                 all_posts.extend(topic_posts)
#             except Exception as e:
#                 # If any error occurs in processing a row, skip it.
#                 continue
    
#     driver.quit()
#     df = pd.DataFrame(all_posts)
#     return df
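The keyword filter above uses plain substring matching, so a short ticker such as "ETH" also matches inside unrelated words like "method". A sketch of a whole-word variant, offered as a possible alternative rather than the notebook's approach; the function `match_coin` and the sample titles are hypothetical:

```python
import re


def match_coin(title, keywords):
    """Return the first ticker whose abbreviation or full name appears as a whole word."""
    for abbr, full_name in keywords.items():
        pattern = rf"\b({re.escape(abbr)}|{re.escape(full_name)})\b"
        if re.search(pattern, title, flags=re.IGNORECASE):
            return abbr
    return None


keywords = {"ETH": "Ethereum", "SOL": "Solana"}
hit = match_coin("Is Ethereum undervalued?", keywords)
miss = match_coin("A new method for staking", keywords)
# Substring matching, as in the scraper, would accept the second title
substring_hit = "eth" in "A new method for staking".lower()
print(hit, miss, substring_hit)
```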

In [None]:
# bitcoin = {
#     "BTC": "Bitcoin"
# }

# df_bitcoin_posts = scrape_subsection_topics(num_pages=50,
#                                            board_prefix="https://bitcointalk.org/index.php?board=1.",
#                                            start_subject_num=380, 
#                                            start_date_str="2024-01-01", 
#                                            end_date_str="2024-12-31", 
#                                            keywords=bitcoin)

In [None]:
# df_bitcoin_posts.info()
# df_bitcoin_posts.head()

In [None]:
# altcoin_namelist = {
#     "ETH": "Ethereum",
#     "XRP": "Ripple",
#     "SOL": "Solana",
#     "DOGE": "Dogecoin",
#     "ADA": "Cardano",
#     "TRX": "TRON",
#     "XLM": "Stellar",
#     "AVAX": "Avalanche",
#     "SHIB": "Shiba Inu"
# }

# df_altcoin_posts = scrape_subsection_topics(num_pages=50, 
#                                            board_prefix="https://bitcointalk.org/index.php?board=67.",
#                                            start_subject_num=400, 
#                                            start_date_str="2024-01-01", 
#                                            end_date_str="2024-12-31", 
#                                            keywords=altcoin_namelist)  # Pass as a dictionary

In [None]:
# df_altcoin_posts.info()
# df_altcoin_posts.head()

In [None]:
# # combine dataframes
# df_crypto_posts = pd.concat([df_bitcoin_posts, df_altcoin_posts], ignore_index=True)

In [None]:
# pip install textblob
# ('re' is part of the Python standard library and does not need to be installed)

In [None]:
# import pandas as pd
# import re
# from textblob import TextBlob

# # Function to clean text
# def clean_text(text):
#     # Remove quoted messages (common in forum posts)
#     text = re.sub(r'Quote from: .*?\n', '', text, flags=re.DOTALL)
    
#     # Remove extra newlines and multiple spaces
#     text = re.sub(r'\s+', ' ', text).strip()
    
#     return text

# # Function to compute sentiment polarity and subjectivity
# def compute_sentiment(text):
#     blob = TextBlob(text)
#     return blob.sentiment.polarity, blob.sentiment.subjectivity
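The effect of `clean_text` is easiest to see on a small input. A sketch using the same two regular expressions on a hypothetical forum post: the first pattern removes only the "Quote from: ..." header line, so the quoted text itself remains in the cleaned post and still contributes to the sentiment score.

```python
import re


def clean_text(text):
    """Drop 'Quote from:' header lines, then collapse whitespace."""
    text = re.sub(r'Quote from: .*?\n', '', text, flags=re.DOTALL)
    text = re.sub(r'\s+', ' ', text).strip()
    return text


# Hypothetical post: a quote header, the quoted text, then the reply
raw = "Quote from: alice on January 02, 2024, 02:36:13 PM\nBTC will dip.\n\nI   disagree."
cleaned = clean_text(raw)
print(cleaned)
```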

In [None]:
# # Load DataFrame
# df_crypto_posts['cleaned_post'] = df_crypto_posts['post'].apply(clean_text)

# # Apply sentiment analysis on cleaned text
# df_crypto_posts[['polarity', 'subjectivity']] = df_crypto_posts['cleaned_post'].apply(
#     lambda txt: pd.Series(compute_sentiment(txt))
# )

In [None]:
# # Display the updated DataFrame
# df_crypto_posts.info()
# print(df_crypto_posts.head())

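# # Save to JSON using the default column orientation, which the snapshot loader in Method 2 expects
# # (index=False is not accepted with this orientation, so it is omitted)
# df_crypto_posts.to_json("crypto_posts.json")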

#### Method 2 - Use Snapshot in JSON

In [35]:
import json

# Open and read the JSON file
with open('crypto_posts.json', 'r') as file:
    data = json.load(file)

# Initialize an empty list to store the formatted data
crypto_posts = []

# Get the keys of the timestamp and post data dynamically
timestamp_keys = data.get('timestamp', {}).keys()
post_keys = data.get('post', {}).keys()

# We will assume both 'timestamp' and 'post' have the same keys
# Loop through all available keys
for key in timestamp_keys:
    # Dynamically create a dictionary combining values from timestamp and post for each key
    crypto_posts.append({
        **{field: data[field].get(key, '') for field in data if isinstance(data[field], dict)},
    })

# Print the structured output
# print(json.dumps(crypto_posts, indent=4))
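The loop above converts a column-oriented JSON object (field, then row key, then value) into a list of per-row records. A compact sketch of the same conversion on a small hypothetical snapshot, so the shape of the input and output is visible:

```python
# Hypothetical snapshot in column orientation: {field: {row_key: value}}
data = {
    "timestamp": {"0": "2024-11-05", "1": "2024-12-13"},
    "post": {"0": "first post", "1": "second post"},
    "subsection": {"0": "BTC", "1": "BTC"},
}

# One record per row key, taking that key's value from every column
records = [
    {field: column.get(key, "") for field, column in data.items() if isinstance(column, dict)}
    for key in data.get("timestamp", {})
]

for record in records:
    print(record)
```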

#### Loading results to MongoDB

In [38]:
# Set up MongoDB connection
# Other parts already set above
collection = db['crypto_posts']  # Replace with your collection name

# Insert the data into the collection
collection.insert_many(crypto_posts)

# Check if data is saved by querying the collection
saved_data = collection.find().limit(5)  # Get the first 5 documents for verification

# Print out the saved data
print("Data successfully saved. Sample:")
print(saved_data[1])

{'_id': ObjectId('67d2be0acf26c16f9920ded5'), 'timestamp': '2024-11-05', 'post': "Bitcoin has indeed exceeded my expectations. Not only its price has reached too far, but also its popularity globally amazes everyone. No one would thought in the first place that bitcoin will be this hyped and trending, but we all know the excellent potentials of bitcoin, so it's not surprising at all. There's still a lot of positive updates await for bitcoin, and we as bitcoiners, should always look forward for a brighter future waiting ahead of us.", 'subject': 'Has Bitcoin met your expectations from the time you bought your Bitcoin till now', 'subsection': 'BTC', 'cleaned_post': "Bitcoin has indeed exceeded my expectations. Not only its price has reached too far, but also its popularity globally amazes everyone. No one would thought in the first place that bitcoin will be this hyped and trending, but we all know the excellent potentials of bitcoin, so it's not surprising at all. There's still a lot of

## Transformation & Loading - Setting Up

### Setting up PySpark

In [4]:
import os
import pandas as pd
from pyspark.sql import SparkSession

mongo_package_name = "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1"
postgress_package_name = "org.postgresql:postgresql:42.3.2"

In [5]:
spark = SparkSession.builder \
    .config("spark.jars.packages", f"{postgress_package_name},{mongo_package_name}") \
    .appName("PostgresMongo") \
    .getOrCreate()

:: loading settings :: url = jar:file:/usr/local/spark-3.5.4-bin-hadoop3/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/jovyan/.ivy2/cache
The jars for the packages stored in: /home/jovyan/.ivy2/jars
org.postgresql#postgresql added as a dependency
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-0b7406a4-ae5f-4890-8257-e71490d46712;1.0
	confs: [default]
	found org.postgresql#postgresql;42.3.2 in central
	found org.checkerframework#checker-qual;3.5.0 in central
	found org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 in central
	found org.mongodb#mongodb-driver-sync;4.0.5 in central
	found org.mongodb#bson;4.0.5 in central
	found org.mongodb#mongodb-driver-core;4.0.5 in central
:: resolution report :: resolve 706ms :: artifacts dl 76ms
	:: modules in use:
	org.checkerframework#checker-qual;3.5.0 from central in [default]
	org.mongodb#bson;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-core;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-sync;4.0.5 from central in [

### Getting from MongoDB - Crypto Data

In [6]:
df_crypto = spark.read.format("mongo").option("uri", "mongodb+srv://ucl-de:"+os.getenv('MONGODB_PASSWORD')+"@ucl-de.auvdj.mongodb.net/ucl-de.crypto_data?retryWrites=true&w=majority").load()
df_crypto = df_crypto.drop("_id")

# Remove duplicate rows
df_crypto = df_crypto.dropDuplicates()

df_crypto.show(5)

+--------+---------+--------+--------+----------+--------+-------------------+--------------------+---------------+
|   close|coin_name|    high|     low|num_trades|    open|          timestamp|              volume|volume_weighted|
+--------+---------+--------+--------+----------+--------+-------------------+--------------------+---------------+
| 0.14146|      TRX|0.141525|0.140002|      2382|0.140538|2024-03-01 00:00:00|1.1768382651784228E7|          0.141|
|0.114479|      TRX| 0.11455| 0.11311|      1224| 0.11344|2024-06-04 01:00:00|   2260126.626597458|         0.1139|
|   25.69|     AVAX|   25.91|   24.01|     31005|    25.7|2024-08-01 01:00:00|   578591.4245804062|          25.11|
|  0.0953|      XLM|0.097527|  0.0947|     17095|0.097196|2024-10-21 01:00:00| 4.985202882246723E7|         0.0963|
|  0.5231|      ADA|  0.5435|  0.5073|     41020| 0.54192|2024-01-06 00:00:00| 3.461921054990041E7|          0.523|
+--------+---------+--------+--------+----------+--------+-------------------+--------------------+---------------+

### Getting from PostgreSQL - Stock Data

In [7]:
# PostgreSQL connection details
pg_url = f"jdbc:postgresql://uclba-de25v2.cluster-cowglvndjvxv.eu-west-2.rds.amazonaws.com:5432/postgres"
pg_properties = {
    "user": os.getenv("POSTGRES_USERNAME"),
    "password": os.getenv("POSTGRES_PASSWORD"),
    "driver": "org.postgresql.Driver",
    "searchpath": os.getenv("POSTGRES_SCHEMA")  # Optional, specify schema if necessary
}

# Define the table name
table_name = "stock_data"  # Replace with your table name

# Form the SQL query
query = f"SELECT * FROM {os.getenv('POSTGRES_SCHEMA')}.{table_name}"

# Load data from PostgreSQL into Spark DataFrame
df_stocks = spark.read.jdbc(url=pg_url, table=f"({query}) as query", properties=pg_properties)

# Remove duplicate rows
df_stocks = df_stocks.dropDuplicates()

# Show the first 5 rows of the dataframe
df_stocks.show(5)

+-------------------+------+------------------+------------------+------------------+------------------+---------+
|               Date|Symbol|             Close|              High|               Low|              Open|   Volume|
+-------------------+------+------------------+------------------+------------------+------------------+---------+
|2024-01-29 00:00:00|  AMZN|161.25999450683594| 161.2899932861328|158.89999389648438|159.33999633789062|4.52704E7|
|2024-02-05 00:00:00|  GOOG| 144.2467803955078|145.97858335516779| 143.2315996965748|143.36097653848321|2.92544E7|
|2024-03-12 00:00:00|  AVGO| 127.4710464477539| 129.0409048326896|124.25437096442687|128.96294489563127| 4.2789E7|
|2024-03-19 00:00:00|  AMZN|175.89999389648438|176.08999633789062|173.52000427246094|174.22000122070312|2.68809E7|
|2024-03-20 00:00:00|  AVGO|126.44035339355469| 127.6284542487443|121.54425595762277|122.77596322516155| 4.0946E7|
+-------------------+------+------------------+------------------+------------------+------------------+---------+

### Getting from MongoDB - Crypto Posts

In [8]:
df_posts = spark.read.format("mongo").option("uri", "mongodb+srv://ucl-de:"+os.getenv('MONGODB_PASSWORD')+"@ucl-de.auvdj.mongodb.net/ucl-de.crypto_posts?retryWrites=true&w=majority").load()
df_posts = df_posts.drop("_id")

# Remove duplicate rows
df_posts = df_posts.dropDuplicates()

df_posts.show(5)

+--------------------+------------+--------------------+--------------------+------------+----------+----------+
|        cleaned_post|    polarity|                post|             subject|subjectivity|subsection| timestamp|
+--------------------+------------+--------------------+--------------------+------------+----------+----------+
|I'm into crypto s...|0.1349403122|I'm into crypto s...|Is bitcoin slowly...|0.4847337006|       BTC|2024-12-13|
|Don't forget, gol...|   0.0090625|Quote from: slama...|Bitcoin now more ...|     0.63625|       BTC|2024-11-15|
|The excitement wi...| 0.096969697|The excitement wi...|Hiccups that can ...|0.4901515152|       BTC|2024-10-10|
|It depends on you...|0.1594238683|Quote from: uneng...|Suggest Practical...|0.4605349794|       BTC|2024-07-14|
|Where do you stan...|      0.1875|Quote from: IceLi...|Regulating Bitcoi...|0.6356818182|       BTC|2024-07-02|
+--------------------+------------+--------------------+--------------------+------------+----------+----------+

In [22]:
from pyspark.sql.functions import col

# Cast 'timestamp' column to TIMESTAMP type if it's in string format
df_posts = df_posts.withColumn("timestamp", col("timestamp").cast("timestamp"))

# Now show the updated dataframe
df_posts.show(5)

+--------------------+------------+--------------------+--------------------+------------+----------+-------------------+
|        cleaned_post|    polarity|                post|             subject|subjectivity|subsection|          timestamp|
+--------------------+------------+--------------------+--------------------+------------+----------+-------------------+
|I'm into crypto s...|0.1349403122|I'm into crypto s...|Is bitcoin slowly...|0.4847337006|       BTC|2024-12-13 00:00:00|
|Don't forget, gol...|   0.0090625|Quote from: slama...|Bitcoin now more ...|     0.63625|       BTC|2024-11-15 00:00:00|
|The excitement wi...| 0.096969697|The excitement wi...|Hiccups that can ...|0.4901515152|       BTC|2024-10-10 00:00:00|
|For now, we can a...|     0.14875|Quote from: Wakat...|All eyes On Trump...|0.5068055556|       BTC|2024-07-25 00:00:00|
|It depends on you...|0.1594238683|Quote from: uneng...|Suggest Practical...|0.4605349794|       BTC|2024-07-14 00:00:00|
+--------------------+------------+--------------------+--------------------+------------+----------+-------------------+

### Setting Up S3 for Loading

In [None]:
# Needed for connecting to AWS S3
!pip install --user boto3

In [45]:
import boto3
import json
import os
from dotenv import load_dotenv
from pyspark.sql import SparkSession

# Get S3 credentials stored in Secrets Manager
def get_s3_credentials(secret_name="S3_access", region="eu-north-1"):
    session = boto3.Session(
        aws_access_key_id=os.environ["AWS_ACCESS_KEY_ID"],
        aws_secret_access_key=os.environ["AWS_SECRET_ACCESS_KEY"],
        region_name=os.environ["AWS_REGION"]
    )
    client = session.client("secretsmanager")
    response = client.get_secret_value(SecretId=secret_name)
    secret = json.loads(response["SecretString"])
    return secret["AWS_ACCESS_KEY_ID"], secret["AWS_SECRET_ACCESS_KEY"]

# Run Spark query, save to Parquet, upload to S3
# Note: AWS_REGION and BUCKET_NAME are not defined in this cell; they must exist as
# module-level variables before this function is called.
def execute_query_and_upload_to_s3(query, parquet_file_name):
    # Execute Spark SQL query (reuses the active session)
    spark = SparkSession.builder.getOrCreate()
    df = spark.sql(query)

    # Save result to a local Parquet directory containing a single part file
    local_dir = f"/tmp/{parquet_file_name}_dir"
    df.coalesce(1).write.mode("overwrite").parquet(local_dir)
    print(f"Query results saved locally at: {local_dir}")

    # Find the Parquet part file within the directory and move it to its final name
    final_local_path = None
    for filename in os.listdir(local_dir):
        if filename.endswith(".parquet"):
            final_local_path = f"/tmp/{parquet_file_name}"
            os.rename(os.path.join(local_dir, filename), final_local_path)
            break

    # Fail early instead of hitting an undefined variable further down
    if final_local_path is None:
        raise FileNotFoundError(f"No .parquet file found in {local_dir}")

    print(f"Final parquet file path: {final_local_path}")

    # Retrieve S3 credentials
    s3_access_key, s3_secret_key = get_s3_credentials()

    # Create S3 client
    s3 = boto3.client(
        's3',
        aws_access_key_id=s3_access_key,
        aws_secret_access_key=s3_secret_key,
        region_name=AWS_REGION
    )

    s3_key = f"data/{parquet_file_name}"

    # Upload to S3
    try:
        s3.upload_file(final_local_path, BUCKET_NAME, s3_key)
        print(f"File uploaded successfully: s3://{BUCKET_NAME}/{s3_key}")
    except Exception as e:
        print("Error uploading file to S3:", e)
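Spark writes a directory rather than a single file, which is why the function searches that directory for the `.parquet` part file. A minimal sketch of that lookup in isolation, using a temporary directory and made-up file names that imitate a Spark output folder; the helper `find_part_file` is hypothetical:

```python
import os
import tempfile


def find_part_file(directory, suffix=".parquet"):
    """Return the path of the first file ending in suffix, or raise if none exists."""
    matches = sorted(name for name in os.listdir(directory) if name.endswith(suffix))
    if not matches:
        raise FileNotFoundError(f"No {suffix} file found in {directory}")
    return os.path.join(directory, matches[0])


with tempfile.TemporaryDirectory() as tmp:
    # Illustrative file names only; real Spark part files carry generated identifiers
    for name in ("_SUCCESS", "part-00000-demo.snappy.parquet",
                 ".part-00000-demo.snappy.parquet.crc"):
        open(os.path.join(tmp, name), "w").close()
    found = os.path.basename(find_part_file(tmp))

print(found)
```

Raising when no file is found makes a failed write visible immediately, instead of surfacing later as an upload error.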

## Transformation - PySpark

<div style="background-color: #cce5ff; padding: 10px; border-radius: 5px;">
    <strong>Notes:</strong>
    <ul><li>For efficiency and clarity, we've selected five of the most impactful transformations (2.5, 3.1, 4.1, 8.2 and 8.3), which have been saved as Parquet files and uploaded to S3. The narratives within the attached report correspond directly to these selections. Additionally, provided below is an inventory of over 30 other transformations and extractions, which can similarly be stored and accessed if required.</li>
    <li>As data engineers, it is essential to bridge technical findings with accessible language. To support decision-making, illustrative narratives have been included immediately after each technical section, providing clear, concise summaries for decision-makers.</li></ul>
</div>

### 1. Understanding Data Structure

In [28]:
df_stocks.createOrReplaceTempView("stocks")
df_crypto.createOrReplaceTempView("crypto")
df_posts.createOrReplaceTempView("posts")

In [29]:
spark.sql("""
    SELECT 
        COUNT(*) AS total_rows,
        MIN(Date) AS start_date,
        MAX(Date) AS end_date
    FROM stocks
""").show()

+----------+-------------------+-------------------+
|total_rows|         start_date|           end_date|
+----------+-------------------+-------------------+
|      2761|2024-01-02 00:00:00|2024-12-30 00:00:00|
+----------+-------------------+-------------------+



In [30]:
spark.sql("""
    SELECT 
        COUNT(*) AS total_rows,
        MIN(timestamp) AS start_date,
        MAX(timestamp) AS end_date
    FROM crypto
""").show()

+----------+-------------------+-------------------+
|total_rows|         start_date|           end_date|
+----------+-------------------+-------------------+
|     44286|2024-01-01 00:00:00|2024-12-31 00:00:00|
+----------+-------------------+-------------------+



                                                                                

In [31]:
spark.sql("""
    SELECT 
        COUNT(*) AS total_rows,
        MIN(CAST(timestamp AS TIMESTAMP)) AS start_date,
        MAX(CAST(timestamp AS TIMESTAMP)) AS end_date
    FROM posts
    WHERE timestamp IS NOT NULL
""").show()

+----------+-------------------+-------------------+
|total_rows|         start_date|           end_date|
+----------+-------------------+-------------------+
|      3052|2010-10-19 00:00:00|2024-12-30 00:00:00|
+----------+-------------------+-------------------+



We begin by inspecting the time range and volume of records across our three datasets:

- **Crypto**: 44,286 records from **2024-01-01** to **2024-12-31**. There are more rows than in the stock table, in part because crypto exchanges do not close on weekends or holidays.
- **Stocks**: 2,761 records from **2024-01-02** to **2024-12-30**, which confirms daily entries for each of the 11 tickers across the year's trading days (weekends and market holidays excluded).
- **Posts**: 3,052 posts with a non-null timestamp, ranging from **2010-10-19** to **2024-12-30**. The range is historical, so we'll likely focus on 2024 to align with price data.

This confirms all datasets have overlapping coverage for **2024**, making them suitable for correlation and trend analysis.
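
Row counts alone do not reveal whether a table holds one row per asset per day. A minimal plain-Python sketch of a granularity check is shown here; the `rows` list and the helper name `rows_per_asset_day` are hypothetical and use toy values, not the project data.

```python
from collections import Counter
from datetime import datetime

# Toy (asset, timestamp) rows; illustrative values only, not from the project tables.
rows = [
    ("BTC", "2024-01-01 00:00:00"),
    ("BTC", "2024-01-01 12:00:00"),
    ("BTC", "2024-01-02 00:00:00"),
    ("ETH", "2024-01-01 00:00:00"),
]

def rows_per_asset_day(records):
    """Count rows for each (asset, calendar date) pair."""
    counts = Counter()
    for asset, ts in records:
        day = datetime.strptime(ts, "%Y-%m-%d %H:%M:%S").date()
        counts[(asset, day)] += 1
    return counts

counts = rows_per_asset_day(rows)
# Any pair with more than one row means the data is finer than daily or contains duplicates.
multi_row_days = {key: n for key, n in counts.items() if n > 1}
print(multi_row_days)
```

If the dictionary is non-empty for the real table, per-row metrics such as daily return would need a prior aggregation to one row per asset per day.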

### 2. Overviews
#### 2.1 Top traded assets

In [32]:
# Top traded stocks
spark.sql("""
    SELECT Symbol, SUM(Volume) AS total_traded_volume
    FROM stocks
    GROUP BY Symbol
    ORDER BY total_traded_volume DESC
    LIMIT 5
""").show()

+------+-------------------+
|Symbol|total_traded_volume|
+------+-------------------+
|  NVDA|       9.4937251E10|
|  TSLA|      2.38213787E10|
|   SPY|      1.44110064E10|
|  AAPL|      1.43565874E10|
|  AMZN|      1.03029356E10|
+------+-------------------+



In [33]:
# Top traded crypto
spark.sql("""
    SELECT coin_name, SUM(volume) AS total_traded_volume
    FROM crypto
    GROUP BY coin_name
    ORDER BY total_traded_volume DESC
    LIMIT 5
""").show()

+---------+--------------------+
|coin_name| total_traded_volume|
+---------+--------------------+
|     DOGE|3.936139643772202E12|
|      XRP|7.842841752612482E11|
|      ADA|1.159059070260934...|
|      XLM| 7.91642754160789E10|
|      TRX|2.149682109878055...|
+---------+--------------------+



                                                                                

#### 2.2 Daily Price Gap

In [15]:
# In stocks
spark.sql("""
   SELECT Symbol, Date, (High - Low) AS price_gap 
   FROM stocks
""").show()

+------+-------------------+------------------+
|Symbol|               Date|         price_gap|
+------+-------------------+------------------+
|  AAPL|2024-01-02 00:00:00| 4.522848372763065|
|  AMZN|2024-01-02 00:00:00|3.9900054931640625|
|  AVGO|2024-01-02 00:00:00|2.4578913357388643|
| BRK-B|2024-01-02 00:00:00|   6.6300048828125|
|  GOOG|2024-01-02 00:00:00| 2.864706685965018|
| GOOGL|2024-01-02 00:00:00| 2.959273593909728|
|  META|2024-01-02 00:00:00|13.099953782164846|
|  MSFT|2024-01-02 00:00:00| 9.044104012460366|
|  NVDA|2024-01-02 00:00:00|1.6995150988554357|
|   SPY|2024-01-02 00:00:00|3.1397485945930157|
|  TSLA|2024-01-02 00:00:00| 6.839996337890625|
|  AAPL|2024-01-03 00:00:00| 2.435390462707147|
|  AMZN|2024-01-03 00:00:00| 2.720001220703125|
|  AVGO|2024-01-03 00:00:00|1.9428331710988118|
| BRK-B|2024-01-03 00:00:00| 6.779998779296875|
|  GOOG|2024-01-03 00:00:00| 2.650480301344345|
| GOOGL|2024-01-03 00:00:00|2.5407923261034284|
|  META|2024-01-03 00:00:00|  4.75186803

In [16]:
# In Crypto
spark.sql("""
   SELECT coin_name, DATE(timestamp) AS date, (high - low) AS price_gap
   FROM crypto
""").show()

+---------+----------+------------------+
|coin_name|      date|         price_gap|
+---------+----------+------------------+
|      SOL|2024-01-01| 8.969999999999999|
|      SOL|2024-01-02|11.030000000000001|
|      SOL|2024-01-03|39.040000000000006|
|      SOL|2024-01-04|11.590000000000003|
|      SOL|2024-01-05|10.239999999999995|
|      SOL|2024-01-06| 8.820000000000007|
|      SOL|2024-01-07|  9.22999999999999|
|      SOL|2024-01-08|16.049999999999997|
|      SOL|2024-01-09|13.260000000000005|
|      SOL|2024-01-10|13.530000000000001|
|      SOL|2024-01-11|              9.75|
|      SOL|2024-01-12|14.227999999999994|
|      SOL|2024-01-13| 8.010000000000005|
|      SOL|2024-01-14|11.829999999999998|
|      SOL|2024-01-15|3.9680000000000035|
|      SOL|2024-01-16| 4.658000000000001|
|      SOL|2024-01-17|6.6299999999999955|
|      SOL|2024-01-18|12.189999999999998|
|      SOL|2024-01-19| 8.599999999999994|
|      SOL|2024-01-20| 4.879000000000005|
+---------+----------+------------

#### 2.4 Monthly Volatility

In [17]:
#Stocks
spark.sql("""
   SELECT Symbol, DATE_TRUNC('month', Date) AS month, STDDEV(Close) AS monthly_volatility 
   FROM stocks 
   GROUP BY Symbol, month
""").show()

+------+-------------------+------------------+
|Symbol|              month|monthly_volatility|
+------+-------------------+------------------+
|  NVDA|2024-03-01 00:00:00|  3.19225511782661|
|  AMZN|2024-08-01 00:00:00| 6.445405383445364|
|  AVGO|2024-12-01 00:00:00|  31.6445725773839|
|  TSLA|2024-11-01 00:00:00| 34.60325024954025|
|   SPY|2024-03-01 00:00:00|5.3122021266211235|
|  AAPL|2024-04-01 00:00:00|  2.75979884664453|
|  MSFT|2024-10-01 00:00:00| 6.505105737121832|
|   SPY|2024-04-01 00:00:00| 8.037746030590498|
|  AAPL|2024-08-01 00:00:00| 6.894340138490561|
| GOOGL|2024-01-01 00:00:00|5.3201708695985905|
|   SPY|2024-08-01 00:00:00|15.359290788146952|
|   SPY|2024-02-01 00:00:00| 5.617393502720848|
|  AVGO|2024-03-01 00:00:00|5.4289384974893045|
|  META|2024-02-01 00:00:00|20.360763034805082|
|   SPY|2024-05-01 00:00:00| 8.381732485560372|
| BRK-B|2024-09-01 00:00:00| 7.740633561813219|
|  AVGO|2024-07-01 00:00:00|  9.31982569561325|
|  AAPL|2024-03-01 00:00:00|2.8734643351

In [18]:
#crypto
spark.sql("""
   SELECT coin_name, DATE_TRUNC('month', timestamp) AS month, STDDEV(close) AS monthly_volatility
   FROM crypto
   GROUP BY coin_name, month
""").show()

+---------+-------------------+--------------------+
|coin_name|              month|  monthly_volatility|
+---------+-------------------+--------------------+
|     DOGE|2024-12-01 00:00:00| 0.05146356701032425|
|      DOT|2024-09-01 00:00:00|  0.2517486375449671|
|      ETH|2024-11-01 00:00:00|   364.9937401275454|
|      BTC|2024-03-01 00:00:00|   3109.738688745365|
|      ADA|2024-01-01 00:00:00|0.037844254610782065|
|      DOT|2024-07-01 00:00:00|  0.3036426374709631|
|      TRX|2024-07-01 00:00:00|0.004066681153474227|
|      ADA|2024-12-01 00:00:00|  0.1285970226814527|
|      XLM|2024-10-01 00:00:00|0.002259648524964068|
|      ADA|2024-09-01 00:00:00| 0.02518933206245201|
|      BTC|2024-02-01 00:00:00|   5125.139363336347|
|      SOL|2024-06-01 00:00:00|  13.106615748238415|
|      TRX|2024-11-01 00:00:00|0.018150246206198057|
|      DOT|2024-10-01 00:00:00| 0.13797729570125747|
|      ETH|2024-02-01 00:00:00|  333.45818930924503|
|      SOL|2024-04-01 00:00:00|   18.932788491

                                                                                

#### 2.5 Volume Spikes

In [9]:
# Import required functions
from pyspark.sql.functions import avg, col, round as round_
from pyspark.sql.window import Window
# Stocks: trailing window of the 7 previous rows per symbol (current row excluded)
window_stock = Window.partitionBy("Symbol").orderBy("Date").rowsBetween(-7, -1)

df_stocks = df_stocks.withColumn("avg_weekly_vol", avg("Volume").over(window_stock)) \
                     .withColumn("volume_spike", col("Volume") / col("avg_weekly_vol"))

df_stocks.createOrReplaceTempView("stocks")

# Crypto: same trailing 7-row window, partitioned per coin
window_crypto = Window.partitionBy("coin_name").orderBy("timestamp").rowsBetween(-7, -1)

df_crypto = df_crypto.withColumn("avg_weekly_vol", avg("volume").over(window_crypto)) \
                     .withColumn("volume_spike", col("volume") / col("avg_weekly_vol"))

df_crypto.createOrReplaceTempView("crypto")

In [24]:
#stocks
spark.sql("""
    SELECT Symbol AS asset, Date AS date, ROUND(volume_spike, 2) AS volume_spike, 'Stock' AS type
    FROM stocks
    WHERE volume_spike IS NOT NULL
    ORDER BY volume_spike DESC
    LIMIT 10
""").show()

+-----+-------------------+------------+-----+
|asset|               date|volume_spike| type|
+-----+-------------------+------------+-----+
| AAPL|2024-09-20 00:00:00|        6.37|Stock|
| META|2024-02-02 00:00:00|        4.51|Stock|
| META|2024-04-25 00:00:00|        4.39|Stock|
| META|2024-12-20 00:00:00|        4.15|Stock|
| AVGO|2024-12-13 00:00:00|        3.93|Stock|
| GOOG|2024-06-21 00:00:00|        3.84|Stock|
| TSLA|2024-10-24 00:00:00|        3.72|Stock|
| AMZN|2024-08-02 00:00:00|         3.3|Stock|
| AVGO|2024-05-31 00:00:00|        3.29|Stock|
|BRK-B|2024-12-20 00:00:00|        3.18|Stock|
+-----+-------------------+------------+-----+



                                                                                

In [10]:
#crypto
spark.sql("""
    SELECT coin_name AS asset, timestamp AS date, ROUND(volume_spike, 2) AS volume_spike, 'Crypto' AS type
    FROM crypto
    WHERE volume_spike IS NOT NULL
    ORDER BY volume_spike DESC
    LIMIT 10
""").show()


                                                                                

+-----+-------------------+------------+------+
|asset|               date|volume_spike|  type|
+-----+-------------------+------------+------+
|  TRX|2024-12-03 00:00:00|       15.97|Crypto|
|  TRX|2024-08-20 01:00:00|        8.53|Crypto|
|  ETH|2024-08-05 01:00:00|        7.38|Crypto|
| LINK|2024-08-05 01:00:00|        6.71|Crypto|
| AVAX|2024-06-22 01:00:00|        6.28|Crypto|
| DOGE|2024-02-28 00:00:00|         6.0|Crypto|
| AVAX|2024-04-13 01:00:00|        5.94|Crypto|
|  ADA|2024-11-10 00:00:00|        5.93|Crypto|
|  XRP|2024-11-12 00:00:00|         5.6|Crypto|
| LINK|2024-04-12 01:00:00|        5.47|Crypto|
+-----+-------------------+------------+------+



#### Loading - 2.5 Volume Spikes

In [None]:
AWS_REGION = os.environ['AWS_REGION'] 
BUCKET_NAME = os.environ['BUCKET_NAME'] 

query_to_s3 = """
SELECT coin_name AS asset, timestamp AS date, ROUND(volume_spike, 2) AS volume_spike, 'Crypto' AS type
FROM crypto
WHERE volume_spike IS NOT NULL
ORDER BY volume_spike DESC
LIMIT 10
"""

# Define the filename for storing crypto volume spike results as a Parquet file
parquet_file_name = "crypto_top_volume_spikes.parquet"

# Execute the provided SQL query, store results locally as Parquet,
# and then upload the file to AWS S3
execute_query_and_upload_to_s3(query_to_s3, parquet_file_name)

                                                                                

Query results saved locally at: /tmp/crypto_top_volume_spikes.parquet_dir
Final parquet file path: /tmp/crypto_top_volume_spikes.parquet
File uploaded successfully: s3://ucl-de/data/crypto_top_volume_spikes.parquet


**Overviews - Summary**

Our EDA dives into key activity and volatility metrics across both stock and crypto markets for 2024, offering a cross-asset perspective.

**Trading Volume**
- Among stocks, **NVDA**, **TSLA**, and **SPY** dominate in terms of total traded volume — aligning with their popularity and market visibility.
- In crypto, **DOGE** surprisingly leads, followed by **XRP** and **ADA** — highlighting how community-driven or meme-based assets can overshadow even large-cap coins in trading activity.

**Daily Price Gap (High - Low)**
- For stocks, significant price gaps were observed in **META**, **TSLA**, and **MSFT**, indicating reactive price movement — possibly driven by news, earnings, or macro events.
- For crypto, **SOLANA (SOL)** showed large price swings across multiple consecutive days, showing intense volatility and potential for high intraday gains or losses.

**Monthly Volatility**
- Stocks like **TSLA (Nov)** and **AVGO (Dec)** posted extreme standard deviations in closing prices, marking them as high-risk, high-opportunity assets during those windows.
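- Crypto volatility peaked with **ETH**, **BTC**, and **SOL**, particularly in November and December, potentially tied to market speculation or ecosystem events. Because the metric is a standard deviation of closing prices, it scales with price level, so high-priced coins rank highest by construction.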
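
Because `STDDEV(close)` is expressed in price units, assets at very different price levels are hard to compare directly. A minimal sketch, on toy numbers, of a scale-free alternative (standard deviation divided by mean price) is given here; the function names are hypothetical and this is not a metric computed in the notebook.

```python
from statistics import mean, stdev

def price_volatility(closes):
    """Sample standard deviation of closing prices, in price units."""
    return stdev(closes)

def relative_volatility(closes):
    """Standard deviation divided by mean price (coefficient of variation)."""
    return stdev(closes) / mean(closes)

# Toy series with identical percentage moves at very different price levels.
high_priced = [60000.0, 63000.0, 57000.0, 60000.0]
low_priced = [0.10, 0.105, 0.095, 0.10]

print(price_volatility(high_priced), price_volatility(low_priced))
print(relative_volatility(high_priced), relative_volatility(low_priced))
```

The first line prints values that differ by several orders of magnitude, while the relative figures are essentially equal, which is the comparison a cross-asset ranking usually needs.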

**Volume Spikes (Abnormal Activity)**
- For stocks, names like **AAPL**, **META**, and **GOOG** showed days with trading volumes roughly 3x–6x their average over the previous seven trading days, making them good candidates for identifying news or investor sentiment shifts.
- In crypto, **TRX**, **ETH**, and **LINK** showed the largest spikes (TRX peaked at about 16x its trailing seven-row average), usually a sign of breakout trading, whale moves, or news-triggered hype.
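
The spike ratio divides each row's volume by the mean of the preceding rows. A minimal plain-Python sketch of that logic, mirroring a `rowsBetween(-7, -1)` window on a toy list, follows; the function name `volume_spikes` is hypothetical.

```python
def volume_spikes(volumes, lookback=7):
    """Ratio of each volume to the mean of up to `lookback` preceding values.

    The current row is excluded from the average. The first row has no
    history, so its ratio is None (Spark would return NULL there).
    """
    ratios = []
    for i, vol in enumerate(volumes):
        history = volumes[max(0, i - lookback):i]
        if not history:
            ratios.append(None)
            continue
        baseline = sum(history) / len(history)
        # Guard against a zero baseline instead of raising ZeroDivisionError.
        ratios.append(vol / baseline if baseline else None)
    return ratios

toy_volumes = [100.0, 100.0, 100.0, 400.0]
ratios = volume_spikes(toy_volumes)
print(ratios)
```

Excluding the current row keeps a spike from inflating its own baseline, which is why the ratio on the last toy row is a full 4x.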

### 3. Performance Metrics
#### 3.1 Daily Return

In [18]:
from pyspark.sql.functions import lag, col, first, last, weekofyear
from pyspark.sql.window import Window

# Stocks: Daily Return
window_stock = Window.partitionBy("Symbol").orderBy("Date")
df_stocks = df_stocks.withColumn(
    "daily_return",
    (col("Close") - lag("Close").over(window_stock)) / lag("Close").over(window_stock)
)

# Crypto: Daily Return
window_crypto = Window.partitionBy("coin_name").orderBy("timestamp")
df_crypto = df_crypto.withColumn(
    "daily_return",
    (col("close") - lag("close").over(window_crypto)) / lag("close").over(window_crypto)
)

df_stocks.createOrReplaceTempView("stocks")
df_crypto.createOrReplaceTempView("crypto")

In [19]:
# STOCKS
spark.sql("""
   SELECT Symbol, Date, ROUND(daily_return, 4) AS daily_return 
   FROM stocks 
   WHERE daily_return IS NOT NULL 
   ORDER BY Symbol, Date
""").show()

+------+-------------------+------------+
|Symbol|               Date|daily_return|
+------+-------------------+------------+
|  AAPL|2024-01-03 00:00:00|     -0.0075|
|  AAPL|2024-01-04 00:00:00|     -0.0127|
|  AAPL|2024-01-05 00:00:00|      -0.004|
|  AAPL|2024-01-08 00:00:00|      0.0242|
|  AAPL|2024-01-09 00:00:00|     -0.0023|
|  AAPL|2024-01-10 00:00:00|      0.0057|
|  AAPL|2024-01-11 00:00:00|     -0.0032|
|  AAPL|2024-01-12 00:00:00|      0.0018|
|  AAPL|2024-01-16 00:00:00|     -0.0123|
|  AAPL|2024-01-17 00:00:00|     -0.0052|
|  AAPL|2024-01-18 00:00:00|      0.0326|
|  AAPL|2024-01-19 00:00:00|      0.0155|
|  AAPL|2024-01-22 00:00:00|      0.0122|
|  AAPL|2024-01-23 00:00:00|      0.0067|
|  AAPL|2024-01-24 00:00:00|     -0.0035|
|  AAPL|2024-01-25 00:00:00|     -0.0017|
|  AAPL|2024-01-26 00:00:00|      -0.009|
|  AAPL|2024-01-29 00:00:00|     -0.0036|
|  AAPL|2024-01-30 00:00:00|     -0.0192|
|  AAPL|2024-01-31 00:00:00|     -0.0194|
+------+-------------------+------

In [20]:
#CRYPTO
spark.sql("""
   SELECT coin_name, timestamp AS date, ROUND(daily_return, 4) AS daily_return
   FROM crypto
   WHERE daily_return IS NOT NULL
   ORDER BY coin_name, timestamp
""").show()


+---------+-------------------+------------+
|coin_name|               date|daily_return|
+---------+-------------------+------------+
|      ADA|2024-01-02 00:00:00|     -0.0293|
|      ADA|2024-01-03 00:00:00|     -0.0795|
|      ADA|2024-01-04 00:00:00|      0.0237|
|      ADA|2024-01-05 00:00:00|     -0.0493|
|      ADA|2024-01-06 00:00:00|     -0.0354|
|      ADA|2024-01-07 00:00:00|     -0.0543|
|      ADA|2024-01-08 00:00:00|      0.0948|
|      ADA|2024-01-09 00:00:00|     -0.0541|
|      ADA|2024-01-10 00:00:00|      0.1056|
|      ADA|2024-01-11 00:00:00|      0.0275|
|      ADA|2024-01-12 00:00:00|     -0.0593|
|      ADA|2024-01-13 00:00:00|      0.0029|
|      ADA|2024-01-14 00:00:00|     -0.0446|
|      ADA|2024-01-15 00:00:00|      0.0059|
|      ADA|2024-01-16 00:00:00|      0.0145|
|      ADA|2024-01-17 00:00:00|     -0.0139|
|      ADA|2024-01-18 00:00:00|      -0.049|
|      ADA|2024-01-19 00:00:00|       0.004|
|      ADA|2024-01-20 00:00:00|       0.023|
|      ADA

                                                                                

#### Loading - 3.1 Daily Return

In [21]:
query_to_s3 = """
SELECT coin_name, timestamp AS date, ROUND(daily_return, 4) AS daily_return
FROM crypto
WHERE daily_return IS NOT NULL
ORDER BY coin_name, timestamp
"""

# Define the filename for storing crypto daily return results as a Parquet file
parquet_file_name = "crypto_daily_return.parquet"

# Execute the provided SQL query, store results locally as Parquet,
# and then upload the file to AWS S3
execute_query_and_upload_to_s3(query_to_s3, parquet_file_name)

                                                                                

Query results saved locally at: /tmp/crypto_daily_return.parquet_dir
Final parquet file path: /tmp/crypto_daily_return.parquet
File uploaded successfully: s3://ucl-de/data/crypto_daily_return.parquet


#### 3.2 Best and Worst Performers

In [25]:
# Stocks: per-row open-to-close return (one value per trading day),
# stored under the column name total_return
df_stocks = df_stocks.withColumn(
    "total_return",
    (col("Close") - col("Open")) / col("Open")
)

# Crypto: per-row open-to-close return, same definition
df_crypto = df_crypto.withColumn(
    "total_return",
    (col("close") - col("open")) / col("open")
)

df_stocks.createOrReplaceTempView("stocks")
df_crypto.createOrReplaceTempView("crypto")


In [26]:
# Stocks: Best Performer
spark.sql("""
    SELECT Symbol AS asset, ROUND(total_return, 4) AS return, 'Stock' AS type
    FROM stocks
    ORDER BY return DESC
    LIMIT 1
""").show()
# Crypto: Best Performer
spark.sql("""
    SELECT coin_name AS asset, ROUND(total_return, 4) AS return, 'Crypto' AS type
    FROM crypto
    ORDER BY return DESC
    LIMIT 1
""").show()


+-----+------+-----+
|asset|return| type|
+-----+------+-----+
| AVGO|0.0962|Stock|
+-----+------+-----+



+-----+------+------+
|asset|return|  type|
+-----+------+------+
|  TRX|0.9589|Crypto|
+-----+------+------+



                                                                                

In [27]:
# Stocks: Worst Performer
spark.sql("""
    SELECT Symbol AS asset, ROUND(total_return, 4) AS return, 'Stock' AS type
    FROM stocks
    ORDER BY return ASC
    LIMIT 1
""").show()

# Crypto: Worst Performer
spark.sql("""
    SELECT coin_name AS asset, ROUND(total_return, 4) AS return, 'Crypto' AS type
    FROM crypto
    ORDER BY return ASC
    LIMIT 1
""").show()

+-----+------+-----+
|asset|return| type|
+-----+------+-----+
| TSLA|-0.094|Stock|
+-----+------+-----+

+-----+-------+------+
|asset| return|  type|
+-----+-------+------+
|  TRX|-0.2474|Crypto|
+-----+-------+------+



                                                                                

#### 3.3 Custom Period Return

In [28]:
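# Stocks: first close vs. running close per symbol.
# Note: with orderBy and no explicit frame, the window runs from the start of
# the partition to the current row, so last("Close") is the current row's close
# and total_return_custom is the cumulative return up to each date.
# For a strict first-to-last return, extend the frame with
# .rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing).
window_period_stock = Window.partitionBy("Symbol").orderBy("Date")
df_stocks = df_stocks.withColumn("first_close", first("Close").over(window_period_stock)) \
                     .withColumn("last_close", last("Close").over(window_period_stock)) \
                     .withColumn("total_return_custom", (col("last_close") - col("first_close")) / col("first_close"))

# Crypto: same running definition per coin
window_period_crypto = Window.partitionBy("coin_name").orderBy("timestamp")
df_crypto = df_crypto.withColumn("first_close", first("close").over(window_period_crypto)) \
                     .withColumn("last_close", last("close").over(window_period_crypto)) \
                     .withColumn("total_return_custom", (col("last_close") - col("first_close")) / col("first_close"))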

df_stocks.createOrReplaceTempView("stocks")
df_crypto.createOrReplaceTempView("crypto")


In [29]:
# Top performers (Stock + Crypto)
spark.sql("""
    SELECT Symbol AS asset, ROUND(total_return_custom, 4) AS return, 'Stock' AS type
    FROM stocks
    GROUP BY Symbol, total_return_custom
    ORDER BY return DESC
    LIMIT 1
""").show()

spark.sql("""
    SELECT coin_name AS asset, ROUND(total_return_custom, 4) AS return, 'Crypto' AS type
    FROM crypto
    GROUP BY coin_name, total_return_custom
    ORDER BY return DESC
    LIMIT 1
""").show()

                                                                                

+-----+------+-----+
|asset|return| type|
+-----+------+-----+
| NVDA|2.0915|Stock|
+-----+------+-----+



+-----+------+------+
|asset|return|  type|
+-----+------+------+
| DOGE|4.0736|Crypto|
+-----+------+------+



                                                                                

#### 3.4 Weekly return

In [30]:
# Stocks
df_stocks = df_stocks.withColumn("week", weekofyear("Date"))
# Crypto
df_crypto = df_crypto.withColumn("week", weekofyear("timestamp"))

df_stocks.createOrReplaceTempView("stocks")
df_crypto.createOrReplaceTempView("crypto")
    

In [31]:
# Stocks: Weekly return
spark.sql("""
    SELECT Symbol AS asset, week, ROUND(AVG(daily_return), 4) AS weekly_return, 'Stock' AS type
    FROM stocks
    GROUP BY Symbol, week
    ORDER BY Symbol, week
""").show()

# Crypto: Weekly return
spark.sql("""
    SELECT coin_name AS asset, week, ROUND(AVG(daily_return), 4) AS weekly_return, 'Crypto' AS type
    FROM crypto
    GROUP BY coin_name, week
    ORDER BY coin_name, week
""").show()

+-----+----+-------------+-----+
|asset|week|weekly_return| type|
+-----+----+-------------+-----+
| AAPL|   1|      -0.0094|Stock|
| AAPL|   2|       0.0052|Stock|
| AAPL|   3|       0.0077|Stock|
| AAPL|   4|       9.0E-4|Stock|
| AAPL|   5|      -0.0069|Stock|
| AAPL|   6|       0.0035|Stock|
| AAPL|   7|       -0.007|Stock|
| AAPL|   8|       3.0E-4|Stock|
| AAPL|   9|      -0.0031|Stock|
| AAPL|  10|        -0.01|Stock|
| AAPL|  11|       0.0022|Stock|
| AAPL|  12|      -2.0E-4|Stock|
| AAPL|  13|      -0.0011|Stock|
| AAPL|  14|      -0.0022|Stock|
| AAPL|  15|       0.0083|Stock|
| AAPL|  16|      -0.0134|Stock|
| AAPL|  17|       0.0052|Stock|
| AAPL|  18|       0.0165|Stock|
| AAPL|  19|      -1.0E-4|Stock|
| AAPL|  20|       0.0074|Stock|
+-----+----+-------------+-----+
only showing top 20 rows



+-----+----+-------------+------+
|asset|week|weekly_return|  type|
+-----+----+-------------+------+
|  ADA|   1|      -0.0039|Crypto|
|  ADA|   2|       0.0015|Crypto|
|  ADA|   3|      -8.0E-4|Crypto|
|  ADA|   4|      -5.0E-4|Crypto|
|  ADA|   5|       2.0E-4|Crypto|
|  ADA|   6|        0.002|Crypto|
|  ADA|   7|       0.0028|Crypto|
|  ADA|   8|      -9.0E-4|Crypto|
|  ADA|   9|       0.0044|Crypto|
|  ADA|  10|      -1.0E-4|Crypto|
|  ADA|  11|      -9.0E-4|Crypto|
|  ADA|  12|      -8.0E-4|Crypto|
|  ADA|  13|       1.0E-4|Crypto|
|  ADA|  14|      -0.0019|Crypto|
|  ADA|  15|      -0.0043|Crypto|
|  ADA|  16|       0.0014|Crypto|
|  ADA|  17|      -0.0016|Crypto|
|  ADA|  18|      -1.0E-4|Crypto|
|  ADA|  19|      -9.0E-4|Crypto|
|  ADA|  20|       0.0014|Crypto|
+-----+----+-------------+------+
only showing top 20 rows



                                                                                

**Performance Metrics Summary**

This section captures how individual stock and crypto assets performed over time, using return-based metrics to assess both consistency and extremes.

**Daily Return**
- **Stocks** like **AAPL** showed expected day-to-day fluctuations, reflecting natural market cycles. Values range from small negative to moderate positive returns, highlighting both correction and growth phases.
- **Crypto** assets (e.g. **ADA**) showed repeated zero returns, suggesting data granularity issues or periods of low activity — though some days reflect sharp price drops or gains, emphasizing crypto’s jumpy nature.
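
The daily return is the change in close relative to the previous row's close. A minimal plain-Python sketch on toy prices is shown here (the function name `simple_returns` is hypothetical); it also illustrates how two consecutive rows with the same close produce a return of exactly zero, which is one way repeated zero returns can arise.

```python
def simple_returns(closes):
    """(close_t - close_{t-1}) / close_{t-1}; the first row has no prior close."""
    if not closes:
        return []
    returns = [None]
    for prev, cur in zip(closes, closes[1:]):
        returns.append((cur - prev) / prev)
    return returns

# Toy closes: the second value repeats the first, e.g. a duplicated row.
toy_closes = [100.0, 100.0, 110.0]
returns = simple_returns(toy_closes)
print(returns)
```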

**Best & Worst Performing Assets**
- Over the open-to-close window of a single trading day:
  - **Top stock performer:** **AVGO** (+9.62%), signaling strong intraday growth.
  - **Top crypto performer:** **TRX** (+95.89%), showing potential for explosive short-term gains.
  - **Worst stock performer:** **TSLA** (-9.40%), possibly tied to a bearish phase or correction.
  - **Worst crypto performer:** also **TRX** (-24.74%), reinforcing the volatility theme in crypto.
- The same crypto asset can post both the best and the worst single-day result, whereas the stock extremes stay within about ±10%.

**Custom Period Return (First to Last Close)**
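- Relative to each asset's first close in the dataset (the query reports the highest cumulative return reached, because the window's `last` value is the current row's close):
  - **Top stock:** **NVDA**, peaking at about +209% (roughly 3.1x its first close), consistent with its strong 2024 rally.
  - **Top crypto:** **DOGE**, peaking at about +407% (roughly 5.1x its first close); meme coins often outperform in bull cycles.
- Stocks show solid growth; crypto shows outsized returns but with less predictability.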
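
A first-to-last return and the highest running return are different quantities. In Spark, a window with `orderBy` and no explicit frame ends at the current row, so `last(...)` yields the running value. A minimal plain-Python sketch of the distinction on toy prices follows; both function names are hypothetical.

```python
def period_return(closes):
    """Return from the first close to the final close of the series."""
    return (closes[-1] - closes[0]) / closes[0]

def running_returns(closes):
    """Cumulative return from the first close up to each row."""
    first_close = closes[0]
    return [(c - first_close) / first_close for c in closes]

# Toy series that peaks before the end of the period.
toy_closes = [50.0, 100.0, 150.0, 125.0]

full_period = period_return(toy_closes)
peak_running = max(running_returns(toy_closes))
print(full_period, peak_running)
```

Taking the maximum of the running values reports the peak, which exceeds the first-to-last figure whenever the asset ends below its high.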

**Weekly Return**
- The weekly figures are the average of daily returns within each calendar week (`weekofyear`), not compounded weekly returns. For stocks (e.g. **AAPL**) they give a smoother read than daily values: minor swings remain, but trends are easier to see.
- For crypto (e.g. **ADA**) the weekly averages are small in magnitude, suggesting that sharp daily moves largely average out when zoomed out.
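
Averaging daily returns within a week is not the same as the return earned over that week. A minimal sketch comparing the two on toy values is given here, assuming simple (not log) returns; the function names are hypothetical.

```python
from math import prod

def average_return(daily_returns):
    """Mean of the daily returns, as AVG(daily_return) computes per week."""
    return sum(daily_returns) / len(daily_returns)

def compounded_return(daily_returns):
    """Return over the whole period: product of (1 + r) minus 1."""
    return prod(1.0 + r for r in daily_returns) - 1.0

# Toy week of five daily returns.
week = [0.01, -0.02, 0.03, 0.0, 0.01]

print(round(average_return(week), 6))
print(round(compounded_return(week), 6))
```

The compounded figure is roughly the daily average multiplied by the number of days, so the two measures should not be compared on the same scale.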

### 4. Volatility & Risk
#### 4.1 Rolling 7-day volatility

In [23]:
# Import dependencies
from pyspark.sql.functions import stddev, avg, col, year
from pyspark.sql.window import Window

#Rolling 7-Day Volatility

# Stock window
rolling_window_stock = Window.partitionBy("Symbol").orderBy("Date").rowsBetween(-6, 0)
df_stocks = df_stocks.withColumn("7_day_volatility", stddev("daily_return").over(rolling_window_stock))
df_stocks.createOrReplaceTempView("stocks")

# Crypto window
rolling_window_crypto = Window.partitionBy("coin_name").orderBy("timestamp").rowsBetween(-6, 0)
df_crypto = df_crypto.withColumn("7_day_volatility", stddev("daily_return").over(rolling_window_crypto))
df_crypto.createOrReplaceTempView("crypto")


In [24]:
# Query rolling volatility (Stocks)
spark.sql("""
    SELECT Symbol, Date, ROUND(7_day_volatility, 4) AS rolling_volatility
    FROM stocks
    WHERE 7_day_volatility IS NOT NULL
    ORDER BY Symbol, Date
""").show()

+------+-------------------+------------------+
|Symbol|               Date|rolling_volatility|
+------+-------------------+------------------+
|  AAPL|2024-01-04 00:00:00|            0.0037|
|  AAPL|2024-01-05 00:00:00|            0.0044|
|  AAPL|2024-01-08 00:00:00|            0.0165|
|  AAPL|2024-01-09 00:00:00|            0.0143|
|  AAPL|2024-01-10 00:00:00|            0.0131|
|  AAPL|2024-01-11 00:00:00|             0.012|
|  AAPL|2024-01-12 00:00:00|            0.0115|
|  AAPL|2024-01-16 00:00:00|            0.0115|
|  AAPL|2024-01-17 00:00:00|            0.0116|
|  AAPL|2024-01-18 00:00:00|            0.0144|
|  AAPL|2024-01-19 00:00:00|             0.015|
|  AAPL|2024-01-22 00:00:00|            0.0153|
|  AAPL|2024-01-23 00:00:00|            0.0147|
|  AAPL|2024-01-24 00:00:00|            0.0152|
|  AAPL|2024-01-25 00:00:00|            0.0134|
|  AAPL|2024-01-26 00:00:00|            0.0141|
|  AAPL|2024-01-29 00:00:00|            0.0092|
|  AAPL|2024-01-30 00:00:00|            

In [25]:
# Query rolling volatility (Crypto)
spark.sql("""
    SELECT coin_name, timestamp AS date, ROUND(7_day_volatility, 4) AS rolling_volatility
    FROM crypto
    WHERE 7_day_volatility IS NOT NULL
    ORDER BY coin_name, date
""").show()

+---------+-------------------+------------------+
|coin_name|               date|rolling_volatility|
+---------+-------------------+------------------+
|      ADA|2024-01-03 00:00:00|            0.0354|
|      ADA|2024-01-04 00:00:00|            0.0516|
|      ADA|2024-01-05 00:00:00|            0.0434|
|      ADA|2024-01-06 00:00:00|            0.0376|
|      ADA|2024-01-07 00:00:00|            0.0346|
|      ADA|2024-01-08 00:00:00|            0.0591|
|      ADA|2024-01-09 00:00:00|            0.0606|
|      ADA|2024-01-10 00:00:00|            0.0709|
|      ADA|2024-01-11 00:00:00|             0.071|
|      ADA|2024-01-12 00:00:00|            0.0724|
|      ADA|2024-01-13 00:00:00|            0.0704|
|      ADA|2024-01-14 00:00:00|             0.069|
|      ADA|2024-01-15 00:00:00|            0.0582|
|      ADA|2024-01-16 00:00:00|            0.0537|
|      ADA|2024-01-17 00:00:00|            0.0318|
|      ADA|2024-01-18 00:00:00|              0.03|
|      ADA|2024-01-19 00:00:00|

                                                                                

#### Loading - 4.1 Rolling 7-day volatility

In [26]:
query_to_s3 = """
SELECT coin_name, timestamp AS date, ROUND(7_day_volatility, 4) AS rolling_volatility
FROM crypto
WHERE 7_day_volatility IS NOT NULL
ORDER BY coin_name, date
"""

# Define the filename for storing crypto rolling volatility results as a Parquet file
parquet_file_name = "crypto_volatility.parquet"

# Execute the provided SQL query, store results locally as Parquet,
# and then upload the file to AWS S3
execute_query_and_upload_to_s3(query_to_s3, parquet_file_name)

                                                                                

Query results saved locally at: /tmp/crypto_volatility.parquet_dir
Final parquet file path: /tmp/crypto_volatility.parquet
File uploaded successfully: s3://ucl-de/data/crypto_volatility.parquet


#### 4.2 Sharpe Ratio Calculation

In [41]:
# Stocks
spark.sql("""
    SELECT Symbol,
           ROUND(AVG(daily_return), 4) AS avg_ret,
           ROUND(STDDEV(daily_return), 4) AS volatility,
           ROUND((AVG(daily_return) - 0.02) / STDDEV(daily_return), 2) AS sharpe_ratio
    FROM stocks
    GROUP BY Symbol
    ORDER BY sharpe_ratio DESC
""").show()



+------+-------+----------+------------+
|Symbol|avg_ret|volatility|sharpe_ratio|
+------+-------+----------+------------+
|  TSLA| 0.0029|    0.0401|       -0.43|
|  NVDA| 0.0047|    0.0331|       -0.46|
|  AVGO| 0.0037|     0.034|       -0.48|
|  META| 0.0024|     0.023|       -0.76|
|  AMZN| 0.0017|    0.0177|       -1.03|
| GOOGL| 0.0015|    0.0177|       -1.05|
|  GOOG| 0.0015|    0.0175|       -1.06|
|  AAPL| 0.0013|    0.0141|       -1.32|
|  MSFT| 7.0E-4|    0.0126|       -1.54|
| BRK-B| 9.0E-4|    0.0092|       -2.08|
|   SPY|  0.001|    0.0079|        -2.4|
+------+-------+----------+------------+



In [42]:
# Crypto
spark.sql("""
    SELECT coin_name,
           ROUND(AVG(daily_return), 4) AS avg_ret,
           ROUND(STDDEV(daily_return), 4) AS volatility,
           ROUND((AVG(daily_return) - 0.02) / STDDEV(daily_return), 2) AS sharpe_ratio
    FROM crypto
    GROUP BY coin_name
    ORDER BY sharpe_ratio DESC
""").show()

+---------+-------+----------+------------+
|coin_name|avg_ret|volatility|sharpe_ratio|
+---------+-------+----------+------------+
|      DOT| 3.0E-4|    0.0444|       -0.44|
|      XLM| 0.0019|    0.0395|       -0.46|
|      TRX| 4.0E-4|      0.02|       -0.98|
|      ADA| 3.0E-4|    0.0167|       -1.18|
|     DOGE| 3.0E-4|    0.0139|       -1.41|
|     AVAX| 1.0E-4|    0.0132|       -1.51|
|      XRP| 3.0E-4|    0.0111|       -1.78|
|      SOL| 1.0E-4|    0.0104|        -1.9|
|      ETH| 1.0E-4|    0.0088|       -2.27|
|      BTC| 2.0E-4|     0.007|       -2.85|
+---------+-------+----------+------------+



                                                                                

**Volatility & Risk Summary**

This section explores risk dynamics across stocks and crypto assets using **rolling volatility** and the **Sharpe ratio**, offering a cross-asset lens on return consistency and volatility exposure.

**Rolling 7-Day Volatility**
- Among stocks, **AAPL**, **META**, and **GOOG** exhibited rolling volatility of roughly **1%–1.6%**, showing relatively stable yet noticeable day-to-day fluctuations (the metric is the standard deviation of daily returns, not an intraday measure).
- In crypto, **ADA** and **TRX** demonstrated much higher volatility bursts, crossing **3%** on several occasions (ADA reached about 7% in mid-January), reinforcing the market's **short-term unpredictability** and sensitivity to news or volume surges.
- Rolling volatility effectively captures **near-term market choppiness**, helping identify **risk windows** and timing entry/exit points for trades.
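
The rolling figure is a sample standard deviation over the current row and up to six preceding rows. A minimal plain-Python sketch of that window is shown here, using the first three AAPL daily returns from the output above as input; the function name `rolling_stdev` is hypothetical.

```python
from statistics import stdev

def rolling_stdev(values, window=7):
    """Sample standard deviation over the current and up to window-1 prior values.

    None entries (e.g. the first return, which has no prior close) are skipped,
    and frames with fewer than two values yield None.
    """
    out = []
    for i in range(len(values)):
        frame = [v for v in values[max(0, i - window + 1): i + 1] if v is not None]
        out.append(stdev(frame) if len(frame) >= 2 else None)
    return out

# AAPL daily returns for 2024-01-02 (no prior close) through 2024-01-05.
aapl_returns = [None, -0.0075, -0.0127, -0.004]
rolling = rolling_stdev(aapl_returns)
print([None if v is None else round(v, 4) for v in rolling])
```

The two non-null values round to 0.0037 and 0.0044, in line with the first two rows of the stock output.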

**Sharpe Ratio (Risk-Adjusted Return)**
- Every Sharpe ratio is **negative** because the query subtracts the 2% baseline directly from the mean *daily* return (about 0.1%–0.5% for stocks), so the sign reflects the size of the hurdle rather than genuine underperformance. Within that setup, **TSLA**, **NVDA**, and **AVGO** rank highest among stocks (-0.43 to -0.48).
- Crypto ratios span a similar range, from **DOT** at **-0.44** down to **DOGE**, **ETH**, and **BTC** at **-1.41**, **-2.27**, and **-2.85**.
- With a fixed 2% shortfall in the numerator, the ranking largely follows volatility: the most volatile assets score best because the shortfall is divided by a larger standard deviation. Converting the 2% baseline to a daily rate would be needed before drawing conclusions about risk-adjusted performance.
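
The queries above subtract 0.02 directly from the mean daily return. A minimal sketch of the more common convention, in which an annual risk-free rate is first converted to a per-period rate, is given here. The 252-day year, the simple division used for the conversion, and the function names are assumptions for illustration, and the returns are toy values.

```python
from math import sqrt
from statistics import mean, stdev

TRADING_DAYS = 252  # assumed equity convention; crypto trades every calendar day

def daily_sharpe(daily_returns, annual_rf=0.02, periods=TRADING_DAYS):
    """Sharpe ratio on daily data with the annual risk-free rate scaled to one day."""
    rf_daily = annual_rf / periods  # simple approximation, not geometric
    return (mean(daily_returns) - rf_daily) / stdev(daily_returns)

def annualized_sharpe(daily_returns, annual_rf=0.02, periods=TRADING_DAYS):
    """Daily Sharpe ratio scaled by the square root of periods per year."""
    return daily_sharpe(daily_returns, annual_rf, periods) * sqrt(periods)

# Toy daily returns with a mean of 0.2% per day.
toy_returns = [0.01, -0.005, 0.007, 0.002, -0.004]

unscaled = (mean(toy_returns) - 0.02) / stdev(toy_returns)  # 2% treated as a daily rate
print(unscaled, daily_sharpe(toy_returns), annualized_sharpe(toy_returns))
```

On the toy data the unscaled version is negative while the converted version is positive, which shows how much the choice of baseline period drives the sign.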

### 5. Temporal Trends & Patterns
#### 5.1 Quarterly average price

In [43]:
# Stocks
spark.sql("""
    SELECT DATE_TRUNC('quarter', Date) AS quarter, Symbol, 
           ROUND(AVG(Close), 2) AS avg_quarterly_price
    FROM stocks
    GROUP BY quarter, Symbol
    ORDER BY quarter, Symbol
""").show()

+-------------------+------+-------------------+
|            quarter|Symbol|avg_quarterly_price|
+-------------------+------+-------------------+
|2024-01-01 00:00:00|  AAPL|             180.87|
|2024-01-01 00:00:00|  AMZN|             166.93|
|2024-01-01 00:00:00|  AVGO|             122.32|
|2024-01-01 00:00:00| BRK-B|             393.34|
|2024-01-01 00:00:00|  GOOG|             143.83|
|2024-01-01 00:00:00| GOOGL|             142.54|
|2024-01-01 00:00:00|  META|             444.61|
|2024-01-01 00:00:00|  MSFT|              401.4|
|2024-01-01 00:00:00|  NVDA|              72.46|
|2024-01-01 00:00:00|   SPY|             491.81|
|2024-01-01 00:00:00|  TSLA|             195.37|
|2024-04-01 00:00:00|  AAPL|             185.75|
|2024-04-01 00:00:00|  AMZN|              183.7|
|2024-04-01 00:00:00|  AVGO|             138.91|
|2024-04-01 00:00:00| BRK-B|             408.62|
|2024-04-01 00:00:00|  GOOG|             169.54|
|2024-04-01 00:00:00| GOOGL|             168.01|
|2024-04-01 00:00:00

In [44]:
# Crypto
spark.sql("""
    SELECT DATE_TRUNC('quarter', timestamp) AS quarter, coin_name,
           ROUND(AVG(close), 2) AS avg_quarterly_price
    FROM crypto
    GROUP BY quarter, coin_name
    ORDER BY quarter, coin_name
""").show()

+-------------------+---------+-------------------+
|            quarter|coin_name|avg_quarterly_price|
+-------------------+---------+-------------------+
|2024-01-01 00:00:00|      ADA|                0.6|
|2024-01-01 00:00:00|     AVAX|              41.42|
|2024-01-01 00:00:00|      BTC|            53574.8|
|2024-01-01 00:00:00|     DOGE|               0.11|
|2024-01-01 00:00:00|      DOT|               8.21|
|2024-01-01 00:00:00|      ETH|            2920.36|
|2024-01-01 00:00:00|      SOL|             123.64|
|2024-01-01 00:00:00|      TRX|               0.12|
|2024-01-01 00:00:00|      XLM|               0.12|
|2024-01-01 00:00:00|      XRP|               0.58|
|2024-04-01 00:00:00|      ADA|               0.46|
|2024-04-01 00:00:00|     AVAX|              35.47|
|2024-04-01 00:00:00|      BTC|           65679.27|
|2024-04-01 00:00:00|     DOGE|               0.15|
|2024-04-01 00:00:00|      DOT|               6.98|
|2024-04-01 00:00:00|      ETH|            3372.51|
|2024-04-01 

                                                                                

#### 5.2 Yearly performance

In [45]:
# Stocks
spark.sql("""
    SELECT YEAR(Date) AS year, 
           ROUND(AVG((Close - Open)/Open)*100, 2) AS avg_yearly_return
    FROM stocks
    GROUP BY year
    ORDER BY year DESC
""").show()

+----+-----------------+
|year|avg_yearly_return|
+----+-----------------+
|2024|             0.03|
+----+-----------------+



In [46]:
# Crypto
spark.sql("""
    SELECT YEAR(timestamp) AS year, 
           ROUND(AVG((close - open)/open)*100, 2) AS avg_yearly_return
    FROM crypto
    GROUP BY year
    ORDER BY year DESC
""").show()

+----+-----------------+
|year|avg_yearly_return|
+----+-----------------+
|2024|             0.28|
+----+-----------------+



                                                                                

#### 5.3 Moving Averages

In [47]:
# Stocks
window_50_stock = Window.partitionBy("Symbol").orderBy("Date").rowsBetween(-49, 0)
window_200_stock = Window.partitionBy("Symbol").orderBy("Date").rowsBetween(-199, 0)

df_stocks = df_stocks.withColumn("SMA_50", avg("Close").over(window_50_stock)) \
                     .withColumn("SMA_200", avg("Close").over(window_200_stock))
df_stocks.createOrReplaceTempView("stocks")

# Crypto
window_50_crypto = Window.partitionBy("coin_name").orderBy("timestamp").rowsBetween(-49, 0)
window_200_crypto = Window.partitionBy("coin_name").orderBy("timestamp").rowsBetween(-199, 0)

df_crypto = df_crypto.withColumn("SMA_50", avg("close").over(window_50_crypto)) \
                     .withColumn("SMA_200", avg("close").over(window_200_crypto))
df_crypto.createOrReplaceTempView("crypto")


In [48]:
#stocks
spark.sql("""
   SELECT Symbol, Date, ROUND(SMA_50, 2) AS SMA_50, ROUND(SMA_200, 2) AS SMA_200 
   FROM stocks 
   WHERE SMA_50 IS NOT NULL AND SMA_200 IS NOT NULL 
   ORDER BY Symbol, Date
""").show()

+------+-------------------+------+-------+
|Symbol|               Date|SMA_50|SMA_200|
+------+-------------------+------+-------+
|  AAPL|2024-01-02 00:00:00|184.53| 184.53|
|  AAPL|2024-01-03 00:00:00|183.84| 183.84|
|  AAPL|2024-01-04 00:00:00|182.84| 182.84|
|  AAPL|2024-01-05 00:00:00|182.15| 182.15|
|  AAPL|2024-01-08 00:00:00|182.61| 182.61|
|  AAPL|2024-01-09 00:00:00|182.85| 182.85|
|  AAPL|2024-01-10 00:00:00|183.17| 183.17|
|  AAPL|2024-01-11 00:00:00|183.33| 183.33|
|  AAPL|2024-01-12 00:00:00| 183.5|  183.5|
|  AAPL|2024-01-16 00:00:00| 183.4|  183.4|
|  AAPL|2024-01-17 00:00:00|183.24| 183.24|
|  AAPL|2024-01-18 00:00:00|183.59| 183.59|
|  AAPL|2024-01-19 00:00:00|184.12| 184.12|
|  AAPL|2024-01-22 00:00:00|184.73| 184.73|
|  AAPL|2024-01-23 00:00:00|185.35| 185.35|
|  AAPL|2024-01-24 00:00:00|185.85| 185.85|
|  AAPL|2024-01-25 00:00:00|186.27| 186.27|
|  AAPL|2024-01-26 00:00:00|186.55| 186.55|
|  AAPL|2024-01-29 00:00:00|186.76| 186.76|
|  AAPL|2024-01-30 00:00:00|186.

In [49]:
#crypto
spark.sql("""
    SELECT coin_name, timestamp AS date, 
           ROUND(SMA_50, 2) AS SMA_50, ROUND(SMA_200, 2) AS SMA_200
    FROM crypto
    WHERE SMA_50 IS NOT NULL AND SMA_200 IS NOT NULL
    ORDER BY coin_name, date
""").show()

+---------+-------------------+------+-------+
|coin_name|               date|SMA_50|SMA_200|
+---------+-------------------+------+-------+
|      ADA|2024-01-01 00:00:00|  0.62|   0.62|
|      ADA|2024-01-01 00:00:00|  0.62|   0.62|
|      ADA|2024-01-01 00:00:00|  0.62|   0.62|
|      ADA|2024-01-01 00:00:00|  0.62|   0.62|
|      ADA|2024-01-01 00:00:00|  0.62|   0.62|
|      ADA|2024-01-01 00:00:00|  0.62|   0.62|
|      ADA|2024-01-01 00:00:00|  0.62|   0.62|
|      ADA|2024-01-02 00:00:00|  0.62|   0.62|
|      ADA|2024-01-02 00:00:00|  0.62|   0.62|
|      ADA|2024-01-02 00:00:00|  0.62|   0.62|
|      ADA|2024-01-02 00:00:00|  0.62|   0.62|
|      ADA|2024-01-02 00:00:00|  0.62|   0.62|
|      ADA|2024-01-02 00:00:00|  0.62|   0.62|
|      ADA|2024-01-02 00:00:00|  0.61|   0.61|
|      ADA|2024-01-03 00:00:00|  0.61|   0.61|
|      ADA|2024-01-03 00:00:00|  0.61|   0.61|
|      ADA|2024-01-03 00:00:00|   0.6|    0.6|
|      ADA|2024-01-03 00:00:00|   0.6|    0.6|
|      ADA|20

                                                                                

**Temporal Trends & Patterns Summary**

This section examines how stock and crypto assets evolve across different time horizons — quarterly, yearly, and with moving averages — to highlight seasonality, momentum, and macro-level patterns.

**Quarterly Average Price**
- In stocks, **SPY**, **META**, and **MSFT** had the highest quarterly average prices (491.81, 444.61, and 401.4 in Q1). Price levels are not comparable across tickers, so the quarter-over-quarter change is the more informative figure; AAPL, for example, rose from 180.87 in Q1 to 185.75 in Q2.
- For crypto, **BTC** and **ETH** posted the highest quarterly averages (e.g., **BTC > $65k in Q2**), while **ADA**, **XLM**, and **TRX** traded below $1, where small absolute moves translate into large percentage changes (ADA's average fell from 0.6 in Q1 to 0.46 in Q2).

**Yearly Performance (2024)**
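- **Crypto** showed larger average open-to-close moves than stocks in 2024:  
  - Stocks averaged **+0.03%** per row (one row per trading day).
  - Cryptos averaged **+0.28%** per row.
- `avg_yearly_return` is the mean of individual open-to-close moves within the year, not a compounded annual return. A +0.03% average daily move does not imply a flat year, because small daily gains compound over roughly 250 trading days and overnight gaps are excluded.
- The contrast is consistent with crypto's larger swings, but the query does not identify what drove it.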
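
The difference between an average open-to-close move and a return over the whole period can be made concrete with a small sketch. `mean_return` and `compound_return` are hypothetical helpers, and the input is a made-up series rather than the notebook's data.

```python
import math

def mean_return(returns):
    """Arithmetic mean of per-period returns (what AVG((close - open)/open) reports)."""
    return sum(returns) / len(returns)

def compound_return(returns):
    """Total return from chaining the per-period returns together."""
    return math.prod(1 + r for r in returns) - 1

# 252 hypothetical trading days, each with a +0.03% open-to-close move
daily = [0.0003] * 252
avg_pct = mean_return(daily) * 100        # about 0.03 (percent per day)
total_pct = compound_return(daily) * 100  # about 7.85 (percent over the period)
```

Chaining open-to-close moves still ignores overnight gaps, so a close-to-close series would be the more usual input for a yearly figure.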

**Moving Averages (SMA 50 vs. SMA 200)**
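- Both windows are row-based (`rowsBetween`) and average over whatever rows are available, so `SMA_50` and `SMA_200` are identical for the first 50 rows of each series, as the AAPL and ADA rows above show. `SMA_200` covers a full 200 rows only from the 200th row onward, and the `IS NOT NULL` filters remove nothing.
- For stocks like **AAPL**, periods where the 50-day SMA sits below the 200-day SMA signal caution or bearish trend continuation; the comparison is only meaningful once both windows are full.
- In crypto, the table holds several rows per calendar date, so the windows span 50 and 200 rows rather than 50 and 200 days. The closely aligned averages for **ADA** in the output reflect this warm-up period as much as low trend momentum.
- Crossovers between the two averages (or their absence) help flag potential trend reversals or confirm trend continuations.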
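
One way to avoid comparing partially filled windows is to emit an average only once the full window is available. The following is a minimal pure-Python sketch with a hypothetical helper `sma` and illustrative prices; in Spark, a similar effect could be obtained by checking a row count over the same window before keeping the average.

```python
def sma(values, window):
    """Simple moving average; None until `window` values are available."""
    out = []
    running = 0.0
    for i, v in enumerate(values):
        running += v
        if i >= window:
            running -= values[i - window]  # drop the value leaving the window
        out.append(running / window if i >= window - 1 else None)
    return out

closes = [10.0, 11.0, 12.0, 13.0, 14.0, 15.0]  # illustrative prices
sma_3 = sma(closes, 3)   # [None, None, 11.0, 12.0, 13.0, 14.0]
sma_5 = sma(closes, 5)   # [None, None, None, None, 12.0, 13.0]

# Compare the two averages only where both windows are full
signals = [
    None if slow is None else ("above" if fast > slow else "below_or_equal")
    for fast, slow in zip(sma_3, sma_5)
]
```

Here `signals` stays `None` for the first four positions, so no crossover is reported during the warm-up period.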

### 6. Sentiment Analysis

In [50]:
# Label each post by polarity: > 0.1 Positive, < -0.1 Negative, otherwise Neutral
from pyspark.sql.functions import when
df_posts = df_posts.withColumn("sentiment_label",
    when(col("polarity") > 0.1, "Positive")
    .when(col("polarity") < -0.1, "Negative")
    .otherwise("Neutral"))
df_posts.createOrReplaceTempView("posts")


In [51]:
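# Rolling sentiment: mean polarity over the current post and the 6 preceding posts
# (row-based window ordered by timestamp, not a 7-day calendar window)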
rolling = Window.orderBy("timestamp").rowsBetween(-6, 0)
df_posts = df_posts.withColumn("rolling_sentiment", avg("polarity").over(rolling))
df_posts.createOrReplaceTempView("posts")


#### 6.1 Avg daily sentiment

In [52]:
spark.sql("""
   SELECT timestamp, ROUND(AVG(polarity), 4) AS avg_daily_polarity 
   FROM posts 
   GROUP BY timestamp 
   ORDER BY timestamp
""").show()

+-------------------+------------------+
|          timestamp|avg_daily_polarity|
+-------------------+------------------+
|               NULL|               0.0|
|2010-10-19 00:00:00|            0.1292|
|2010-10-21 00:00:00|               0.0|
|2010-10-22 00:00:00|            0.2313|
|2010-10-25 00:00:00|               0.0|
|2010-10-26 00:00:00|            0.3143|
|2010-10-27 00:00:00|              0.23|
|2010-10-31 00:00:00|             0.119|
|2013-04-02 00:00:00|            0.3575|
|2013-04-04 00:00:00|            0.0406|
|2013-04-18 00:00:00|               0.1|
|2020-05-03 00:00:00|             0.188|
|2020-05-13 00:00:00|            0.2069|
|2020-06-12 00:00:00|            0.1014|
|2020-07-08 00:00:00|               0.8|
|2020-09-15 00:00:00|            0.1606|
|2020-09-22 00:00:00|            -0.082|
|2020-09-23 00:00:00|              0.34|
|2020-12-03 00:00:00|             0.112|
|2022-12-22 00:00:00|           -0.0712|
+-------------------+------------------+
only showing top

#### 6.2 Sentiment by topic

In [53]:
spark.sql("""
   SELECT timestamp, subsection, AVG(polarity) AS avg_polarity 
   FROM posts 
   GROUP BY timestamp, subsection
""").show()

+-------------------+----------+-------------------+
|          timestamp|subsection|       avg_polarity|
+-------------------+----------+-------------------+
|2024-01-02 00:00:00|       BTC|     0.169642572845|
|2024-12-24 00:00:00|       BTC|      0.07767320245|
|2024-10-28 00:00:00|      DOGE|0.05538033395714286|
|2024-10-29 00:00:00|      DOGE|0.17081651805384615|
|2024-11-06 00:00:00|       BTC|       0.1938449306|
|2024-11-08 00:00:00|       BTC|0.11535687230000002|
|2024-11-18 00:00:00|       XRP|       0.0732195037|
|2024-07-23 00:00:00|       ETH|       0.0758122896|
|2024-02-08 00:00:00|       BTC|     0.182236925225|
|2020-12-03 00:00:00|       SOL|              0.112|
|2024-06-04 00:00:00|       XRP|0.14836647725000002|
|2010-10-19 00:00:00|       BTC|0.12916666666666668|
|2024-01-03 00:00:00|       BTC| 0.1788957548111111|
|2024-12-05 00:00:00|       TRX|0.09723930480000001|
|2024-10-09 00:00:00|       BTC|      0.19213541665|
|2024-03-20 00:00:00|       BTC|0.139483351299

#### 6.3 Positive & Negative posts

In [54]:
spark.sql("""
   SELECT timestamp, cleaned_post, polarity 
   FROM posts 
   ORDER BY polarity 
   DESC LIMIT 5
""").show()

+-------------------+--------------------+--------+
|          timestamp|        cleaned_post|polarity|
+-------------------+--------------------+--------+
|2024-07-14 00:00:00|OnlyFans An excel...|     1.0|
|2024-03-13 00:00:00|I agree to the ot...|     1.0|
|2020-07-08 00:00:00|Hey guys $SOL is ...|     0.8|
|2024-06-22 00:00:00|Can Bitcoin be a ...|     0.7|
|2024-01-02 00:00:00|I dont know that ...|   0.625|
+-------------------+--------------------+--------+



In [55]:
spark.sql("""
   SELECT timestamp, cleaned_post, polarity 
   FROM posts 
   ORDER BY polarity 
   ASC LIMIT 5
""").show()

+-------------------+--------------------+-------------+
|          timestamp|        cleaned_post|     polarity|
+-------------------+--------------------+-------------+
|2024-07-28 00:00:00|I agree, currency...|         -0.8|
|2024-07-28 00:00:00|I agree, currency...|       -0.345|
|2024-03-11 00:00:00|Kucoin o Coinbase...|-0.3333333333|
|2024-03-15 00:00:00|What wallet are y...|-0.3083333333|
|2010-10-19 00:00:00|-10BTC for forced...|         -0.3|
+-------------------+--------------------+-------------+



#### 6.4 Sentiment label counts

In [56]:
spark.sql("""
   SELECT sentiment_label, COUNT(*) AS count
   FROM posts 
   GROUP BY sentiment_label
""").show()

+---------------+-----+
|sentiment_label|count|
+---------------+-----+
|       Positive|  853|
|        Neutral|  635|
|       Negative|   38|
+---------------+-----+



#### 6.5  Sentiment volume

In [57]:
spark.sql("""
   SELECT timestamp, AVG(polarity) AS avg_polarity, COUNT(post) AS post_count 
   FROM posts 
   GROUP BY timestamp 
   ORDER BY timestamp
""").show()

+-------------------+-------------------+----------+
|          timestamp|       avg_polarity|post_count|
+-------------------+-------------------+----------+
|               NULL|                0.0|        94|
|2010-10-19 00:00:00|0.12916666666666668|         3|
|2010-10-21 00:00:00|                0.0|         1|
|2010-10-22 00:00:00|            0.23125|         5|
|2010-10-25 00:00:00|                0.0|         2|
|2010-10-26 00:00:00|       0.3142857143|         1|
|2010-10-27 00:00:00| 0.2299886621142857|         7|
|2010-10-31 00:00:00|        0.119047619|         1|
|2013-04-02 00:00:00|        0.357521645|         2|
|2013-04-04 00:00:00|           0.040625|         1|
|2013-04-18 00:00:00|                0.1|         1|
|2020-05-03 00:00:00|0.18797348484999998|         6|
|2020-05-13 00:00:00|0.20685714285999998|         5|
|2020-06-12 00:00:00|      0.10139668365|         2|
|2020-07-08 00:00:00|                0.8|         1|
|2020-09-15 00:00:00|     0.160602678575|     

**Sentiment Analysis Summary**

This section explores the tone and emotion behind user-generated content, uncovering how sentiment trends correlate with asset discussions over time.

**Avg Daily Sentiment**
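- Average polarity fluctuated across the timeline, with **positive spikes** in 2013 (0.3575 on 2013-04-02) and 2020 (0.8 on 2020-07-08) and mildly **negative days** in late 2020 (-0.082 on 2020-09-22) and late 2022 (-0.0712 on 2022-12-22).
- Daily sentiment varies, which may reflect **market mood swings** tied to major events or price movements, although many of the dated rows rest on only a handful of posts.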

**Sentiment by Topic**
- **BTC** consistently appeared across timestamps with mild to moderately positive sentiment (e.g., Jan 2 and Nov 6).
- Posts tagged under **DOGE**, **SOL**, and **TRX** also show mostly positive polarity, reaffirming strong community backing.
- **ETH** appeared slightly neutral to positive, while **XRP** sentiment fluctuated.

**Top Positive & Negative Posts**
- The **most positive posts** expressed enthusiasm and optimism toward specific coins like $SOL and BTC adoption.
- The **most negative posts** reflected strong skepticism or frustrations — often hinting at scams or exchange issues (e.g., Kucoin, forced withdrawal mentions).

**Sentiment Label Distribution**
- Most posts were **Positive (853)** or **Neutral (635)** — indicating a predominantly bullish or balanced tone in discussions.
- Very few posts were **Negative (38)**, suggesting either a generally optimistic community or low representation of contrarian opinions.

**Sentiment Volume**
- The largest single group is the **94 posts with no timestamp** (`NULL`), which average exactly 0.0 polarity; among the dated rows shown, daily counts range from 1 to 7 posts.
- The high-polarity days were backed by few posts: **2020-07-08** (+0.8) had a single post and **2020-05-13** (+0.21) had five, so these readings reflect individual posts rather than broad shifts in mood.
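
Because several daily averages rest on one or two posts, it can help to carry the post count alongside the mean and flag thin days. The sketch below assumes this approach; `daily_sentiment` and the `min_posts` threshold of 3 are hypothetical choices, and the rows are illustrative rather than taken from the `posts` table.

```python
from collections import defaultdict

def daily_sentiment(posts, min_posts=3):
    """Group (date, polarity) pairs by date and flag low-volume days.

    Posts with a missing date are skipped rather than pooled together.
    """
    by_day = defaultdict(list)
    for day, polarity in posts:
        if day is None:
            continue
        by_day[day].append(polarity)
    summary = {}
    for day in sorted(by_day):
        values = by_day[day]
        summary[day] = {
            "avg_polarity": sum(values) / len(values),
            "post_count": len(values),
            "reliable": len(values) >= min_posts,
        }
    return summary

# Illustrative input: one thin day, one day with three posts, one undated post
posts = [
    ("2024-05-01", 0.8),
    ("2024-05-02", 0.3), ("2024-05-02", 0.1), ("2024-05-02", 0.2),
    (None, 0.0),
]
result = daily_sentiment(posts)
```

In this example the single-post day is kept in the summary but marked `reliable: False`, so it can be excluded or down-weighted in later comparisons.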

### 7. Merging All Tables

In [28]:
from pyspark.sql.functions import avg, col

# Aggregate stock data
stocks_prepped = df_stocks.groupBy("Date").agg(
    avg("Open").alias("s_open"),
    avg("Close").alias("s_close")
).withColumnRenamed("Date", "DATE")

# Aggregate crypto data
crypto_prepped = df_crypto.groupBy("timestamp").agg(
    avg("open").alias("c_open"),
    avg("close").alias("c_close")
).withColumnRenamed("timestamp", "DATE")

# Aggregate sentiment data
sentiment_prepped = df_posts.groupBy("timestamp").agg(
    avg("polarity").alias("sentiment")
).withColumnRenamed("timestamp", "DATE")


In [29]:
# Join all 3 on DATE
transformed_data = stocks_prepped \
    .join(crypto_prepped, on="DATE", how="inner") \
    .join(sentiment_prepped, on="DATE", how="inner")

transformed_data.createOrReplaceTempView("transformed_data")


### 8. Combined Analysis
#### 8.1 Price Comparison Between Stocks & Crypto

In [30]:
#Daily Change Percentage (Stocks vs Crypto)
spark.sql("""
SELECT 
  DATE,
  ROUND(((s_close - s_open)/s_open)*100, 2) AS stock_pct_change,
  ROUND(((c_close - c_open)/c_open)*100, 2) AS crypto_pct_change
FROM transformed_data
ORDER BY DATE
""").show()

                                                                                

+-------------------+----------------+-----------------+
|               DATE|stock_pct_change|crypto_pct_change|
+-------------------+----------------+-----------------+
|2024-01-02 00:00:00|           -0.32|             1.69|
|2024-01-03 00:00:00|           -0.03|            -4.78|
|2024-01-24 00:00:00|             0.0|             0.44|
|2024-01-25 00:00:00|           -0.14|            -0.37|
|2024-01-26 00:00:00|            0.03|             4.57|
|2024-01-31 00:00:00|           -1.08|            -1.05|
|2024-02-01 00:00:00|            0.63|             1.22|
|2024-02-08 00:00:00|            0.09|             2.12|
|2024-02-09 00:00:00|            0.72|             4.09|
|2024-02-13 00:00:00|            0.21|            -0.45|
|2024-02-14 00:00:00|            0.56|             4.28|
|2024-02-26 00:00:00|            -0.9|             5.22|
|2024-02-27 00:00:00|             0.1|             4.52|
|2024-02-28 00:00:00|            0.03|             9.25|
|2024-03-05 00:00:00|          

#### 8.2 Sentiment vs Daily Returns

In [31]:
spark.sql("""
SELECT 
  DATE,
  ROUND(((s_close - s_open)/s_open)*100, 2) AS stock_pct_change,
  ROUND(((c_close - c_open)/c_open)*100, 2) AS crypto_pct_change,
  ROUND(sentiment, 3) AS avg_sentiment
FROM transformed_data
ORDER BY DATE
""").show()

                                                                                

+-------------------+----------------+-----------------+-------------+
|               DATE|stock_pct_change|crypto_pct_change|avg_sentiment|
+-------------------+----------------+-----------------+-------------+
|2024-01-02 00:00:00|           -0.32|             1.69|         0.17|
|2024-01-03 00:00:00|           -0.03|            -4.78|        0.179|
|2024-01-24 00:00:00|             0.0|             0.44|        0.125|
|2024-01-25 00:00:00|           -0.14|            -0.37|        0.204|
|2024-01-26 00:00:00|            0.03|             4.57|        0.064|
|2024-01-31 00:00:00|           -1.08|            -1.05|        0.097|
|2024-02-01 00:00:00|            0.63|             1.22|       -0.028|
|2024-02-08 00:00:00|            0.09|             2.12|        0.182|
|2024-02-09 00:00:00|            0.72|             4.09|        0.235|
|2024-02-13 00:00:00|            0.21|            -0.45|        0.174|
|2024-02-14 00:00:00|            0.56|             4.28|        0.169|
|2024-

#### Loading - 8.2 Sentiment vs Daily Returns

In [32]:
query_to_s3 = """
SELECT 
  DATE,
  ROUND(((s_close - s_open)/s_open)*100, 2) AS stock_pct_change,
  ROUND(((c_close - c_open)/c_open)*100, 2) AS crypto_pct_change,
  ROUND(sentiment, 3) AS avg_sentiment
FROM transformed_data
ORDER BY DATE
"""

# Define the filename for storing the sentiment vs. daily return results as a Parquet file
parquet_file_name = "sentiment_vs_daily_return.parquet"

# Execute the provided SQL query, store results locally as Parquet,
# and then upload the file to AWS S3
execute_query_and_upload_to_s3(query_to_s3, parquet_file_name)

                                                                                

Query results saved locally at: /tmp/sentiment_vs_daily_return.parquet_dir
Final parquet file path: /tmp/sentiment_vs_daily_return.parquet
File uploaded successfully: s3://ucl-de/data/sentiment_vs_daily_return.parquet


#### 8.3 Days When Sentiment Was High but Prices Dropped

In [70]:
spark.sql("""
SELECT 
  DATE,
  ROUND(sentiment, 3) AS sentiment,
  ROUND(((s_close - s_open)/s_open)*100, 2) AS stock_pct_change,
  ROUND(((c_close - c_open)/c_open)*100, 2) AS crypto_pct_change
FROM transformed_data
WHERE sentiment > 0.3 
  AND (((s_close - s_open)/s_open) < 0 OR ((c_close - c_open)/c_open) < 0)
ORDER BY DATE
""").show()

                                                                                

+-------------------+---------+----------------+-----------------+
|               DATE|sentiment|stock_pct_change|crypto_pct_change|
+-------------------+---------+----------------+-----------------+
|2024-03-13 00:00:00|    0.323|           -0.24|             2.27|
+-------------------+---------+----------------+-----------------+



#### Loading - 8.3 Days When Sentiment Was High but Prices Dropped

In [34]:
query_to_s3 = """
SELECT 
  DATE,
  ROUND(sentiment, 3) AS sentiment,
  ROUND(((s_close - s_open)/s_open)*100, 2) AS stock_pct_change,
  ROUND(((c_close - c_open)/c_open)*100, 2) AS crypto_pct_change
FROM transformed_data
WHERE sentiment > 0.3 
  AND (((s_close - s_open)/s_open) < 0 OR ((c_close - c_open)/c_open) < 0)
ORDER BY DATE
"""

# Define the filename for storing the high-sentiment, falling-price results as a Parquet file
parquet_file_name = "sentiment_high_prices_low.parquet"

# Execute the provided SQL query, store results locally as Parquet,
# and then upload the file to AWS S3
execute_query_and_upload_to_s3(query_to_s3, parquet_file_name)

                                                                                

Query results saved locally at: /tmp/sentiment_high_prices_low.parquet_dir
Final parquet file path: /tmp/sentiment_high_prices_low.parquet
File uploaded successfully: s3://ucl-de/data/sentiment_high_prices_low.parquet


#### 8.4 Days When Sentiment Was Low but Prices Rose

In [71]:
spark.sql("""
SELECT
    DATE,
    ROUND(sentiment, 3) AS sentiment,
    ROUND(((s_close - s_open)/s_open)*100, 2) AS stock_pct_change,
    ROUND(((c_close - c_open)/c_open)*100, 2) AS crypto_pct_change
FROM transformed_data
WHERE sentiment < 0.05
  AND (((s_close - s_open)/s_open) > 0 OR ((c_close - c_open)/c_open) > 0)
ORDER BY DATE
""").show()

                                                                                

+-------------------+---------+----------------+-----------------+
|               DATE|sentiment|stock_pct_change|crypto_pct_change|
+-------------------+---------+----------------+-----------------+
|2024-02-01 00:00:00|   -0.028|            0.63|             1.22|
|2024-02-26 00:00:00|   -0.121|            -0.9|             5.24|
|2024-12-16 00:00:00|    0.025|            1.22|             1.62|
+-------------------+---------+----------------+-----------------+



#### Loading - 8.4 Days When Sentiment Was Low but Prices Rose

In [35]:
query_to_s3 = """
SELECT
    DATE,
    ROUND(sentiment, 3) AS sentiment,
    ROUND(((s_close - s_open)/s_open)*100, 2) AS stock_pct_change,
    ROUND(((c_close - c_open)/c_open)*100, 2) AS crypto_pct_change
FROM transformed_data
WHERE sentiment < 0.05
  AND (((s_close - s_open)/s_open) > 0 OR ((c_close - c_open)/c_open) > 0)
ORDER BY DATE
"""

# Define the filename for storing the low-sentiment, rising-price results as a Parquet file
parquet_file_name = "sentiment_low_prices_high.parquet"

# Execute the provided SQL query, store results locally as Parquet,
# and then upload the file to AWS S3
execute_query_and_upload_to_s3(query_to_s3, parquet_file_name)

                                                                                

Query results saved locally at: /tmp/sentiment_low_prices_high.parquet_dir
Final parquet file path: /tmp/sentiment_low_prices_high.parquet
File uploaded successfully: s3://ucl-de/data/sentiment_low_prices_high.parquet


#### 8.5 Weekly Avg Sentiment and Market Return

In [73]:
spark.sql("""
SELECT 
  WEEKOFYEAR(DATE) AS week_num,
  ROUND(AVG(sentiment), 3) AS avg_weekly_sentiment,
  ROUND(AVG((s_close - s_open)/s_open)*100, 2) AS avg_stock_return,
  ROUND(AVG((c_close - c_open)/c_open)*100, 2) AS avg_crypto_return
FROM transformed_data
GROUP BY WEEKOFYEAR(DATE)
ORDER BY week_num
""").show()

                                                                                

+--------+--------------------+----------------+-----------------+
|week_num|avg_weekly_sentiment|avg_stock_return|avg_crypto_return|
+--------+--------------------+----------------+-----------------+
|       1|                0.15|           -0.07|            -1.34|
|       4|               0.131|           -0.04|             1.55|
|       5|               0.035|           -0.23|             0.09|
|       6|               0.208|            0.41|             3.11|
|       7|               0.171|            0.39|             1.92|
|       9|               0.029|           -0.26|             6.35|
|      10|               0.087|           -0.25|            -0.45|
|      11|               0.158|           -0.06|             0.16|
|      12|                 0.1|            0.36|            -1.26|
|      13|               0.124|           -0.24|             0.54|
|      44|               0.124|           -0.66|             1.28|
|      45|               0.136|            0.96|             3

#### 8.6 Lagged Sentiment vs Future Returns

In [74]:
spark.sql("""
SELECT 
  DATE,
  ROUND(sentiment, 3) AS sentiment,
  LEAD(ROUND(((s_close - s_open)/s_open)*100), 1) OVER (ORDER BY DATE) AS next_day_stock_return,
  LEAD(ROUND(((c_close - c_open)/c_open)*100), 1) OVER (ORDER BY DATE) AS next_day_crypto_return
FROM transformed_data
""").show()

25/03/10 17:26:13 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

+-------------------+---------+---------------------+----------------------+
|               DATE|sentiment|next_day_stock_return|next_day_crypto_return|
+-------------------+---------+---------------------+----------------------+
|2024-01-02 00:00:00|     0.17|                  0.0|                  -5.0|
|2024-01-03 00:00:00|    0.179|                  0.0|                   0.0|
|2024-01-24 00:00:00|    0.125|                  0.0|                   0.0|
|2024-01-25 00:00:00|    0.204|                  0.0|                   5.0|
|2024-01-26 00:00:00|    0.064|                 -1.0|                  -1.0|
|2024-01-31 00:00:00|    0.097|                  1.0|                   1.0|
|2024-02-01 00:00:00|   -0.028|                  0.0|                   2.0|
|2024-02-08 00:00:00|    0.182|                  1.0|                   4.0|
|2024-02-09 00:00:00|    0.235|                  0.0|                   0.0|
|2024-02-13 00:00:00|    0.174|                  1.0|                   4.0|

#### 8.7 Sentiment Leading to Breakouts

In [81]:
# Big Next-Day Jumps After Sentiment Spikes
spark.sql("""
SELECT 
   DATE,
   sentiment,
   LEAD(((c_close - c_open)/c_open)*100, 1) OVER (ORDER BY DATE) AS next_day_crypto_return,
   LEAD(((s_close - s_open)/s_open)*100, 1) OVER (ORDER BY DATE) AS next_day_stock_return
FROM transformed_data
WHERE sentiment > 0.3
ORDER BY DATE
""").show()

25/03/10 17:31:17 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

+-------------------+------------------+----------------------+---------------------+
|               DATE|         sentiment|next_day_crypto_return|next_day_stock_return|
+-------------------+------------------+----------------------+---------------------+
|2024-03-13 00:00:00|0.3234941020785715|                  NULL|                 NULL|
+-------------------+------------------+----------------------+---------------------+





#### 8.8 Big Move Days with Sentiment Neutrality


In [83]:
#Detect Market Movers Without Sentiment Spike
spark.sql("""
SELECT 
  DATE,
  sentiment,
  ((s_close - s_open)/s_open)*100 AS stock_move,
  ((c_close - c_open)/c_open)*100 AS crypto_move
FROM transformed_data
WHERE ABS(sentiment) < 0.05 AND (ABS((s_close - s_open)/s_open) > 3 OR ABS((c_close - c_open)/c_open) > 5)
""").show()

+----+---------+----------+-----------+
|DATE|sentiment|stock_move|crypto_move|
+----+---------+----------+-----------+
+----+---------+----------+-----------+



**Insights from Combined Analysis**


**8.1 Price Comparison Between Stocks & Crypto**
- Crypto markets show **greater daily volatility** compared to stock markets.
- Even on relatively flat stock days, crypto often exhibits **larger swings**, reinforcing the high-risk/high-reward profile of crypto assets.

**8.2 Sentiment vs Daily Returns**
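- By inspection, higher-sentiment days often coincide with **positive crypto returns** (e.g., 2024-02-09: sentiment 0.235, crypto +4.09%), but no correlation coefficient is computed, so the strength of the relationship is not quantified.
- The relationship is **not consistent**: on 2024-01-03 sentiment was 0.179 while crypto fell 4.78%, and stock returns show little visible link to sentiment.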
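
A Pearson correlation coefficient is one way to put a number on this relationship. The sketch below applies it to the eleven rows visible in the 8.2 output; `pearson` is a hypothetical helper, and eleven observations are far too few for a firm conclusion. Spark SQL also provides a `corr` aggregate that could be run over the full `transformed_data` view.

```python
import math

def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length sequences."""
    if len(xs) != len(ys) or len(xs) < 2:
        raise ValueError("need two sequences of equal length >= 2")
    mx, my = sum(xs) / len(xs), sum(ys) / len(ys)
    cov = sum((x - mx) * (y - my) for x, y in zip(xs, ys))
    sx = math.sqrt(sum((x - mx) ** 2 for x in xs))
    sy = math.sqrt(sum((y - my) ** 2 for y in ys))
    return cov / (sx * sy)

# Values copied from the visible rows of the 8.2 output (2024-01-02 to 2024-02-14)
sentiment = [0.17, 0.179, 0.125, 0.204, 0.064, 0.097, -0.028, 0.182, 0.235, 0.174, 0.169]
crypto_pct = [1.69, -4.78, 0.44, -0.37, 4.57, -1.05, 1.22, 2.12, 4.09, -0.45, 4.28]
stock_pct = [-0.32, -0.03, 0.0, -0.14, 0.03, -1.08, 0.63, 0.09, 0.72, 0.21, 0.56]

r_crypto = pearson(sentiment, crypto_pct)
r_stock = pearson(sentiment, stock_pct)
```

Both coefficients lie between -1 and 1; values near 0 would indicate little linear relationship in this sample.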

**8.3 High Sentiment but Prices Dropped**

- On **2024-03-13**, sentiment was high (0.323), yet stock prices **fell** (-0.24%) while crypto rose (+2.27%).
- This shows **optimistic sentiment does not always lead to market gains**, possibly due to overhype or delayed reactions.

**8.4 Low Sentiment but Prices Rose**

- Three dates met the filter. On 2024-02-01 (sentiment -0.028) and 2024-12-16 (0.025) **both markets rose**; on 2024-02-26 (-0.121) crypto gained 5.24% while stocks fell 0.9%.
- This suggests **market rebounds or hidden drivers** that sentiment analysis may miss.

**8.5 Weekly Avg Sentiment & Market Return**

- Some weeks with **positive sentiment** align with higher returns (Week 6, 7, 46).
- But some weeks show **positive sentiment with negative returns**, reinforcing that **sentiment alone isn't sufficient**.
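
As a rough illustration of the weekly roll-up, the sketch below averages daily values per ISO week in plain Python. It assumes ISO week numbering (the notebook's own week definition may differ) and uses the five rows printed by the section 8.2 warehouse query as sample input. `weekly_averages` is a hypothetical helper, not a function from this notebook.

```python
from collections import defaultdict
from datetime import date

# (date, stock_pct_change, crypto_pct_change, avg_sentiment)
# Sample rows copied from the sentiment_vs_daily_return output in this notebook.
daily = [
    ("2024-01-02", -0.32, 1.69, 0.170),
    ("2024-01-03", -0.03, -4.78, 0.179),
    ("2024-01-24", 0.00, 0.44, 0.125),
    ("2024-01-25", -0.14, -0.37, 0.204),
    ("2024-01-26", 0.03, 4.57, 0.064),
]


def weekly_averages(rows):
    """Return {iso_week: (avg_stock, avg_crypto, avg_sentiment)}."""
    buckets = defaultdict(list)
    for day, stock, crypto, sentiment in rows:
        week = date.fromisoformat(day).isocalendar()[1]  # ISO week number
        buckets[week].append((stock, crypto, sentiment))
    result = {}
    for week, values in sorted(buckets.items()):
        n = len(values)
        # zip(*values) transposes the rows so each column is averaged on its own
        result[week] = tuple(sum(column) / n for column in zip(*values))
    return result


weekly = weekly_averages(daily)
for week, (stock, crypto, sentiment) in weekly.items():
    print(f"week {week}: stock={stock:.2f} crypto={crypto:.2f} sentiment={sentiment:.3f}")
```

Keying on the week number alone is only safe within a single calendar year. Data spanning several years would need a `(year, week)` key instead.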

**8.6 Lagged Sentiment vs Future Returns**

- No clear or strong predictive power of sentiment on **next-day returns**.
- Some high-sentiment days are followed by losses (e.g., crypto -5%), suggesting markets may **already price in sentiment**.
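
One way to set up a lagged comparison is to pair each day's sentiment with the return on the following calendar day, and to skip days whose successor is missing. The sketch below does this in plain Python on the five rows printed by the section 8.2 warehouse query. `next_day_pairs` is a hypothetical helper, and requiring strictly consecutive calendar days is an assumption rather than the notebook's confirmed rule.

```python
from datetime import date, timedelta

# (date, crypto_pct_change, avg_sentiment)
# Sample rows copied from the sentiment_vs_daily_return output in this notebook.
daily = [
    ("2024-01-02", 1.69, 0.170),
    ("2024-01-03", -4.78, 0.179),
    ("2024-01-24", 0.44, 0.125),
    ("2024-01-25", -0.37, 0.204),
    ("2024-01-26", 4.57, 0.064),
]


def next_day_pairs(rows):
    """Pair each day's sentiment with the next calendar day's return."""
    by_date = {date.fromisoformat(d): (ret, sent) for d, ret, sent in rows}
    pairs = []
    for day in sorted(by_date):
        following = day + timedelta(days=1)
        if following in by_date:  # skip days with no data on the next day
            sentiment_today = by_date[day][1]
            return_tomorrow = by_date[following][0]
            pairs.append((day.isoformat(), sentiment_today, return_tomorrow))
    return pairs


pairs = next_day_pairs(daily)
for day, sentiment, next_return in pairs:
    print(day, sentiment, next_return)
```

In this sample the gap between 2024-01-03 and 2024-01-24 drops one pairing, which shows how data gaps shrink the set of usable lagged observations.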

**8.7 Sentiment Spikes Leading to Breakouts**

- Limited evidence that **sentiment spikes lead to breakouts**.
- After high-sentiment days, next-day returns were either missing or inconsistent, possibly due to **data gaps** or **weak lag correlation**.

**8.8 Big Move Days with Sentiment Neutrality**

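- The query returned no rows.
- This result is not informative as written: the filter compares fractional returns against 3 and 5, which corresponds to moves above 300% and 500%, so no day could match regardless of sentiment. The thresholds need to be expressed in percent (or as 0.03 and 0.05) before concluding whether **major market moves occur on neutral-sentiment days**.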

**Final Conclusion**

The analysis shows:
- **Crypto markets** are more volatile than stocks.
- **Sentiment is partially correlated** with market returns, especially for crypto, but is **not reliably predictive**.
- There are **exceptions** where sentiment and price movement diverge.
- Sentiment analysis adds **context but not certainty** to market prediction; combining it with technical/volume data may improve models.

Taken together, these queries give a **combined view** of how public mood interacts with price behavior across asset classes.


## Loading

<div style="background-color: #cce5ff; padding: 10px; border-radius: 5px;">
    <strong>Notes:</strong>
    Combined with Transformations for clarity. Setup details can be found under 'Setting up S3 for Loading', while uploads are documented in sections 2.5, 3.1, 4.1, 8.2, 8.3, and 8.4, each titled with the prefix 'Loading - '.
</div>

## Querying the Warehouse

<div style="background-color: #cce5ff; padding: 10px; border-radius: 5px;">
    <strong>Notes:</strong>
    Because the transformed results were written to S3 as Parquet files during the ETL phases, they can be queried directly with DuckDB using short, readable queries, as demonstrated below.
</div>

In [6]:
pip install duckdb

Collecting duckdb
  Using cached duckdb-1.2.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (966 bytes)
Using cached duckdb-1.2.1-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (20.2 MB)
Installing collected packages: duckdb
Successfully installed duckdb-1.2.1
Note: you may need to restart the kernel to use updated packages.


In [7]:
import duckdb

#### Querying - 2.5 Volume Spikes

In [10]:
# Set S3 credentials (skip if public bucket)
duckdb.sql(f"""
SET s3_region='{os.environ["AWS_REGION"]}';
SET s3_access_key_id='{os.environ["AWS_ACCESS_KEY_ID"]}';
SET s3_secret_access_key='{os.environ["AWS_SECRET_ACCESS_KEY"]}';
""")

# Query directly from S3
df = duckdb.sql("""
SELECT *
FROM 's3://ucl-de/data/crypto_top_volume_spikes.parquet'
""").df()

df.head()

Unnamed: 0,asset,date,volume_spike,type
0,TRX,2024-12-03,15.97,Crypto
1,TRX,2024-08-20,8.53,Crypto
2,ETH,2024-08-05,7.38,Crypto
3,LINK,2024-08-05,6.71,Crypto
4,AVAX,2024-06-22,6.28,Crypto
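
The query cells in this section all repeat the same credential setup and `SELECT *` pattern. A minimal sketch of how that could be factored into a helper is shown below. `s3_settings_sql`, `parquet_query`, and `read_result` are hypothetical names, the bucket prefix is taken from the queries in this notebook, and `duckdb` is imported lazily so the string helpers can be checked without it.

```python
import os

DATA_PREFIX = "s3://ucl-de/data"  # bucket path used by the queries in this notebook


def s3_settings_sql(env=None):
    """Build the SET statements for DuckDB's S3 access from environment variables."""
    env = os.environ if env is None else env
    return (
        f"SET s3_region='{env['AWS_REGION']}';\n"
        f"SET s3_access_key_id='{env['AWS_ACCESS_KEY_ID']}';\n"
        f"SET s3_secret_access_key='{env['AWS_SECRET_ACCESS_KEY']}';"
    )


def parquet_query(name):
    """Build a SELECT over one Parquet file under DATA_PREFIX."""
    return f"SELECT * FROM '{DATA_PREFIX}/{name}.parquet'"


def read_result(name):
    """Configure S3 access and load one Parquet result as a pandas DataFrame."""
    import duckdb  # lazy import: only needed when actually querying S3

    duckdb.sql(s3_settings_sql())
    return duckdb.sql(parquet_query(name)).df()
```

With this in place, a cell would reduce to something like `df = read_result("crypto_top_volume_spikes")` followed by `df.head()`. The settings are applied to DuckDB's default connection, so re-applying them on every call is redundant but harmless.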


#### Querying - 3.1 Daily Return

In [11]:
# Set S3 credentials (skip if public bucket)
duckdb.sql(f"""
SET s3_region='{os.environ["AWS_REGION"]}';
SET s3_access_key_id='{os.environ["AWS_ACCESS_KEY_ID"]}';
SET s3_secret_access_key='{os.environ["AWS_SECRET_ACCESS_KEY"]}';
""")

# Query directly from S3
df = duckdb.sql("""
SELECT *
FROM 's3://ucl-de/data/crypto_daily_return.parquet'
""").df()

df.head()

Unnamed: 0,coin_name,date,daily_return
0,ADA,2024-01-02,-0.0293
1,ADA,2024-01-03,-0.0795
2,ADA,2024-01-04,0.0237
3,ADA,2024-01-05,-0.0493
4,ADA,2024-01-06,-0.0354


#### Querying - 4.1 Rolling 7-day volatility

In [12]:
# Set S3 credentials (skip if public bucket)
duckdb.sql(f"""
SET s3_region='{os.environ["AWS_REGION"]}';
SET s3_access_key_id='{os.environ["AWS_ACCESS_KEY_ID"]}';
SET s3_secret_access_key='{os.environ["AWS_SECRET_ACCESS_KEY"]}';
""")

# Query directly from S3
df = duckdb.sql("""
SELECT *
FROM 's3://ucl-de/data/crypto_volatility.parquet'
""").df()

df.head()

Unnamed: 0,coin_name,date,rolling_volatility
0,ADA,2024-01-03,0.0354
1,ADA,2024-01-04,0.0516
2,ADA,2024-01-05,0.0434
3,ADA,2024-01-06,0.0376
4,ADA,2024-01-07,0.0346
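
For reference, the sketch below shows one way a rolling volatility of this shape can be computed: a sample standard deviation over a trailing window of up to seven daily returns, emitted once at least two returns are available. This is an inference from the rows displayed above and in the 3.1 output, not the notebook's confirmed method. The `rolling_volatility` helper and its `min_periods` parameter are assumptions for illustration.

```python
from statistics import stdev

# ADA daily returns for 2024-01-02 .. 2024-01-06, copied from the 3.1 output above
returns = [-0.0293, -0.0795, 0.0237, -0.0493, -0.0354]


def rolling_volatility(values, window=7, min_periods=2):
    """Sample standard deviation over a trailing window; None until enough data."""
    out = []
    for i in range(len(values)):
        chunk = values[max(0, i - window + 1): i + 1]  # last `window` values up to i
        out.append(stdev(chunk) if len(chunk) >= min_periods else None)
    return out


vol = rolling_volatility(returns)
for value in vol:
    print(None if value is None else round(value, 4))
```

Computed from the rounded returns, the values for 2024-01-03 to 2024-01-06 come out close to the displayed 0.0354, 0.0516, 0.0434, and 0.0376, which is consistent with this definition but does not prove it.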


#### Querying - 8.2 Sentiment vs Daily Returns

In [13]:
# Set S3 credentials (skip if public bucket)
duckdb.sql(f"""
SET s3_region='{os.environ["AWS_REGION"]}';
SET s3_access_key_id='{os.environ["AWS_ACCESS_KEY_ID"]}';
SET s3_secret_access_key='{os.environ["AWS_SECRET_ACCESS_KEY"]}';
""")

# Query directly from S3
df = duckdb.sql("""
SELECT *
FROM 's3://ucl-de/data/sentiment_vs_daily_return.parquet'
""").df()

df.head()

Unnamed: 0,DATE,stock_pct_change,crypto_pct_change,avg_sentiment
0,2024-01-02,-0.32,1.69,0.17
1,2024-01-03,-0.03,-4.78,0.179
2,2024-01-24,0.0,0.44,0.125
3,2024-01-25,-0.14,-0.37,0.204
4,2024-01-26,0.03,4.57,0.064
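
To quantify the relationship rather than judge it by eye, a Pearson correlation coefficient can be computed between sentiment and each return series. The sketch below implements it in plain Python and applies it to the five rows shown above purely to demonstrate the mechanics. Five observations are far too few to support any conclusion, and `pearson` is a hypothetical helper rather than code from this notebook.

```python
from math import sqrt

# Columns copied from the five rows displayed above
stock_pct_change = [-0.32, -0.03, 0.0, -0.14, 0.03]
crypto_pct_change = [1.69, -4.78, 0.44, -0.37, 4.57]
avg_sentiment = [0.17, 0.179, 0.125, 0.204, 0.064]


def pearson(xs, ys):
    """Pearson correlation coefficient of two equal-length sequences."""
    n = len(xs)
    mean_x, mean_y = sum(xs) / n, sum(ys) / n
    covariance = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    spread_x = sqrt(sum((x - mean_x) ** 2 for x in xs))
    spread_y = sqrt(sum((y - mean_y) ** 2 for y in ys))
    return covariance / (spread_x * spread_y)


r_stock = pearson(avg_sentiment, stock_pct_change)
r_crypto = pearson(avg_sentiment, crypto_pct_change)
print(f"sentiment vs stock:  {r_stock:.3f}")
print(f"sentiment vs crypto: {r_crypto:.3f}")
```

Running the same function over the full DataFrame columns (for example `pearson(list(df["avg_sentiment"]), list(df["crypto_pct_change"]))`) would give the coefficient for the whole period.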


#### Querying - 8.3 Days When Sentiment Was High but Prices Dropped

In [14]:
# Set S3 credentials (skip if public bucket)
duckdb.sql(f"""
SET s3_region='{os.environ["AWS_REGION"]}';
SET s3_access_key_id='{os.environ["AWS_ACCESS_KEY_ID"]}';
SET s3_secret_access_key='{os.environ["AWS_SECRET_ACCESS_KEY"]}';
""")

# Query directly from S3
df = duckdb.sql("""
SELECT *
FROM 's3://ucl-de/data/sentiment_high_prices_low.parquet'
""").df()

df.head()

Unnamed: 0,DATE,sentiment,stock_pct_change,crypto_pct_change
0,2024-03-13,0.323,-0.24,2.27


#### Querying - 8.4 Days When Sentiment Was Low but Prices Rose

In [15]:
# Set S3 credentials (skip if public bucket)
duckdb.sql(f"""
SET s3_region='{os.environ["AWS_REGION"]}';
SET s3_access_key_id='{os.environ["AWS_ACCESS_KEY_ID"]}';
SET s3_secret_access_key='{os.environ["AWS_SECRET_ACCESS_KEY"]}';
""")

# Query directly from S3
df = duckdb.sql("""
SELECT *
FROM 's3://ucl-de/data/sentiment_low_prices_high.parquet'
""").df()

df.head()

Unnamed: 0,DATE,sentiment,stock_pct_change,crypto_pct_change
0,2024-02-01,-0.028,0.63,1.22
1,2024-02-26,-0.121,-0.9,5.22
2,2024-12-16,0.025,1.22,1.62
