In [53]:
import os

from cassandra.cluster import Cluster
from cassandra.query import BatchStatement
from cassandra.auth import PlainTextAuthProvider
from cassandra import WriteTimeout

from pyspark.sql import SparkSession
from pyspark.sql.functions import col

import csv

from fredapi import Fred

import pandas as pd

import yfinance as yf
from datetime import datetime, timedelta
import time
from tqdm import tqdm

In [2]:
# Load ticker symbols from nasdaq screener csv file
def load_tickers(filepath):
    df = pd.read_csv(filepath)
    return df.iloc[:, 0].tolist()

In [3]:
# Fetch historical data for a given ticker
def fetch_stock_data(ticker):
    # Calculate the date 5 years ago from today
    end_date = datetime.now()
    start_date = end_date - timedelta(days=365*5)
    
    # Fetch historical data
    data = yf.download(ticker, start=start_date.strftime('%Y-%m-%d'), end=end_date.strftime('%Y-%m-%d'))
    return data

In [None]:
# Load ticker symbols
tickers = load_tickers('nasdaq_screener.csv')

# Create a subfolder for the financial data
output_dir = 'fin_data'
os.makedirs(output_dir, exist_ok=True)

# Dictionary to hold data for all tickers
all_data = {}

# Loop through each ticker and fetch its data
index = 0
for ticker in tickers:
    try:
        data = fetch_stock_data(ticker)
        all_data[ticker] = data
        # Save data
        data.to_csv(os.path.join(output_dir, f'{ticker}_5_years_data.csv'))
    except Exception as e:
        print(f"Failed to fetch data for {ticker}: {e}")

In [None]:
# Define the directory containing the CSV files
source_dir = os.path.expanduser('fin_data/')

# List all files in the directory
files = os.listdir(source_dir)

# Loop through each file
for file in files:
    file_path = os.path.join(source_dir, file)
    
    # Check if the file is a CSV file
    if file.endswith('.csv'):
        try:
            # Load the CSV file
            df = pd.read_csv(file_path)
            
            # Check if the file is empty or contains NA values
            if df.empty or df.isna().any().any():
                os.remove(file_path)  # Remove the file if it's empty or contains NA
                print(f"Deleted '{file}' because it was empty or contained NA values.")
        except Exception as e:
            print(f"Error processing {file}: {e}")

print("Cleanup complete.")


In [2]:
def create_schema(session):
    # CQL to create a keyspace
    session.execute("""
        CREATE KEYSPACE IF NOT EXISTS fin_data WITH replication = {
            'class': 'SimpleStrategy', 
            'replication_factor': 1
        }
    """)

    # CQL to use the keyspace
    session.execute("USE fin_data")

    # CQL to create a table
    session.execute("""
        CREATE TABLE IF NOT EXISTS ohlcv (
            symbol text,
            date timestamp,
            open double,
            high double,
            low double,
            close double,
            adj_close double,
            volume bigint,
            PRIMARY KEY (symbol, date)
        )
    """)

In [3]:
def parse_date(date_str):
    return datetime.strptime(date_str, '%Y-%m-%d')

In [4]:
def execute_with_retries(session, batch, max_retries=5):
    retries = 0
    backoff_time = 2  # Start with 2 seconds backoff time
    while retries < max_retries:
        try:
            session.execute(batch)
            break  # Break the loop if execution is successful
        except WriteTimeout:
            print(f"WriteTimeout: retrying {retries+1}/{max_retries} after {backoff_time}s...")
            time.sleep(backoff_time)  # Sleep for backoff_time seconds before retrying
            backoff_time *= 2  # Double the backoff time for the next retry
            retries += 1
    if retries == max_retries:
        print("Failed to execute batch after several retries.")

In [5]:
def load_data(session, file_path, symbol):
    batch = BatchStatement()
    batch_size = 0  # Track the size of the batch

    with open(file_path, newline='') as csvfile:
        reader = csv.DictReader(csvfile)
        for row in reader:
            batch.add(
                """
                INSERT INTO fin_data.ohlcv (symbol, date, open, high, low, close, adj_close, volume)
                VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
                """,
                (symbol, parse_date(row['Date']), float(row['Open']), float(row['High']),
                 float(row['Low']), float(row['Close']), float(row['Adj Close']), int(row['Volume']))
            )
            batch_size += 1
            if batch_size >= 100:  # Execute batch after collecting 50 statements
                execute_with_retries(session, batch)
                batch = BatchStatement()  # Reset batch after execution
                batch_size = 0  # Reset batch size

        if batch_size > 0:  # Ensure any remaining statements are executed
            execute_with_retries(session, batch)

        print(f"Data from {symbol} loaded successfully.")

In [7]:
def drop_table(keyspace_name, table_name):
    """ Drop a table in a given keyspace """
    # Set up connection parameters
    auth_provider = PlainTextAuthProvider(username='cassandra', password='cassandra')  # Update credentials if needed
    cluster = Cluster(['127.0.0.1'], auth_provider=auth_provider)  # Adjust IP if Cassandra is hosted elsewhere
    session = cluster.connect(keyspace_name)  # Connect to the specified keyspace

    # Drop table
    try:
        session.execute(f"DROP TABLE IF EXISTS {table_name};")
        print(f"Table {table_name} has been dropped successfully.")
    except Exception as e:
        print(f"An error occurred while dropping the table: {e}")
    finally:
        # Clean up, close the session and cluster connection
        session.shutdown()
        cluster.shutdown()

In [8]:
def query_symbol_data(symbol, start_date, end_date):
    
    cluster = Cluster(['127.0.0.1'])  # Adjust if Cassandra is hosted elsewhere
    session = cluster.connect('fin_data')

    # Prepare the query
    query = """
        SELECT * FROM ohlcv WHERE symbol = %s AND date >= %s AND date <= %s;
    """
    # Convert string dates to datetime objects
    start_dt = datetime.strptime(start_date, '%Y-%m-%d')
    end_dt = datetime.strptime(end_date, '%Y-%m-%d')

    # Execute the query
    try:
        rows = session.execute(query, (symbol, start_dt, end_dt))
        for row in rows:
            print(row)
    except Exception as e:
        print(f"An error occurred: {e}")
    finally:
        # Clean up, close the session and cluster connection
        session.shutdown()
        cluster.shutdown()

In [9]:
def create_table():
    # Connect to Cassandra
    # Adjust the connection settings as needed for your setup
    auth_provider = PlainTextAuthProvider(username='cassandra', password='cassandra')  # Update with actual credentials
    cluster = Cluster(['127.0.0.1'], auth_provider=auth_provider)
    session = cluster.connect()

    # Create the schema
    create_schema(session)

    print("Keyspace and table created successfully.")

    # Clean up
    session.shutdown()
    cluster.shutdown()

In [57]:
create_table()



Keyspace and table created successfully.


In [58]:
cluster = Cluster(['127.0.0.1'])  # Adjust if Cassandra is hosted elsewhere
session = cluster.connect()

i = 0
data_dir = '/home/xca7/Desktop/Storage-Solutions-for-Big-Data-CA1/fin_data'
for filename in tqdm(os.listdir(data_dir)):
    if i > 100:
        break
    i += 1
    if filename.endswith('.csv'):
        symbol = filename.split('_')[0]  # Extract the symbol from the filename
        load_data(session, os.path.join(data_dir, filename), symbol)

session.shutdown()
cluster.shutdown()

  0%|          | 1/6638 [00:00<1:31:24,  1.21it/s]

Data from PEBK loaded successfully.


  0%|          | 2/6638 [00:01<1:28:25,  1.25it/s]

Data from ARR loaded successfully.


  0%|          | 3/6638 [00:01<1:07:15,  1.64it/s]

Data from GCTS loaded successfully.


  0%|          | 4/6638 [00:02<1:04:35,  1.71it/s]

Data from HYFM loaded successfully.


  0%|          | 5/6638 [00:03<1:07:54,  1.63it/s]

Data from ALVR loaded successfully.


  0%|          | 6/6638 [00:03<1:05:02,  1.70it/s]

Data from CLBT loaded successfully.


  0%|          | 8/6638 [00:04<47:20,  2.33it/s]  

Data from FA loaded successfully.
Data from ZURA loaded successfully.


  0%|          | 9/6638 [00:05<1:00:28,  1.83it/s]

Data from ANTE loaded successfully.


  0%|          | 10/6638 [00:05<1:08:37,  1.61it/s]

Data from GORO loaded successfully.


  0%|          | 11/6638 [00:06<1:14:18,  1.49it/s]

Data from SEEL loaded successfully.


  0%|          | 12/6638 [00:07<1:18:41,  1.40it/s]

Data from LDOS loaded successfully.


  0%|          | 13/6638 [00:08<1:21:07,  1.36it/s]

Data from MGA loaded successfully.


  0%|          | 14/6638 [00:08<1:14:09,  1.49it/s]

Data from SNTI loaded successfully.


  0%|          | 15/6638 [00:09<1:18:20,  1.41it/s]

Data from PXS loaded successfully.


  0%|          | 16/6638 [00:10<1:20:47,  1.37it/s]

Data from PRMW loaded successfully.


  0%|          | 17/6638 [00:10<1:05:27,  1.69it/s]

Data from YOSH loaded successfully.


  0%|          | 18/6638 [00:11<1:12:03,  1.53it/s]

Data from OSUR loaded successfully.
Data from GROMW loaded successfully.


  0%|          | 20/6638 [00:12<58:27,  1.89it/s]  

Data from FNLC loaded successfully.


  0%|          | 21/6638 [00:15<2:03:44,  1.12s/it]

Data from PKG loaded successfully.


  0%|          | 22/6638 [00:15<1:44:36,  1.05it/s]

Data from KRNL loaded successfully.


  0%|          | 23/6638 [00:16<1:36:08,  1.15it/s]

Data from BILL loaded successfully.


  0%|          | 24/6638 [00:17<1:32:38,  1.19it/s]

Data from AFBI loaded successfully.


  0%|          | 25/6638 [00:17<1:30:36,  1.22it/s]

Data from JNJ loaded successfully.


  0%|          | 26/6638 [00:18<1:29:13,  1.24it/s]

Data from CHRS loaded successfully.


  0%|          | 27/6638 [00:19<1:27:34,  1.26it/s]

Data from TRAW loaded successfully.


  0%|          | 28/6638 [00:20<1:26:59,  1.27it/s]

Data from APLS loaded successfully.


  0%|          | 29/6638 [00:20<1:26:19,  1.28it/s]

Data from TDC loaded successfully.


  0%|          | 30/6638 [00:21<1:25:23,  1.29it/s]

Data from NYMT loaded successfully.


  0%|          | 31/6638 [00:22<1:24:05,  1.31it/s]

Data from OMAB loaded successfully.


  0%|          | 32/6638 [00:22<1:10:25,  1.56it/s]

Data from ALVO loaded successfully.


  0%|          | 33/6638 [00:23<1:12:04,  1.53it/s]

Data from SMTC loaded successfully.


  1%|          | 34/6638 [00:24<1:21:23,  1.35it/s]

Data from GROM loaded successfully.


  1%|          | 35/6638 [00:25<1:22:02,  1.34it/s]

Data from BBVA loaded successfully.


  1%|          | 36/6638 [00:25<1:06:57,  1.64it/s]

Data from TNON loaded successfully.


  1%|          | 38/6638 [00:26<48:24,  2.27it/s]  

Data from HTOO loaded successfully.
Data from VSTS loaded successfully.


  1%|          | 39/6638 [00:26<57:43,  1.91it/s]

Data from WU loaded successfully.
Data from GMFIW loaded successfully.


  1%|          | 41/6638 [00:27<49:00,  2.24it/s]

Data from NTNX loaded successfully.
Data from MNDR loaded successfully.


  1%|          | 43/6638 [00:27<37:56,  2.90it/s]

Data from JTAI loaded successfully.


  1%|          | 44/6638 [00:28<46:42,  2.35it/s]

Data from GAME loaded successfully.


  1%|          | 45/6638 [00:29<54:01,  2.03it/s]

Data from PZZA loaded successfully.


  1%|          | 47/6638 [00:30<47:29,  2.31it/s]

Data from MSN loaded successfully.
Data from GLADZ loaded successfully.


  1%|          | 48/6638 [00:30<55:45,  1.97it/s]

Data from BCSF loaded successfully.


  1%|          | 49/6638 [00:31<1:02:22,  1.76it/s]

Data from LOMA loaded successfully.


  1%|          | 50/6638 [00:32<1:02:40,  1.75it/s]

Data from FRES loaded successfully.


  1%|          | 51/6638 [00:33<1:13:53,  1.49it/s]

Data from WMPN loaded successfully.
Data from AIMDW loaded successfully.


  1%|          | 53/6638 [00:33<59:37,  1.84it/s]  

Data from WKC loaded successfully.


  1%|          | 54/6638 [00:34<1:04:38,  1.70it/s]

Data from JEF loaded successfully.


  1%|          | 55/6638 [00:35<1:07:48,  1.62it/s]

Data from GTE loaded successfully.


  1%|          | 56/6638 [00:35<1:10:43,  1.55it/s]

Data from WFCF loaded successfully.


  1%|          | 57/6638 [00:36<1:12:18,  1.52it/s]

Data from FFIV loaded successfully.


  1%|          | 58/6638 [00:37<1:13:08,  1.50it/s]

Data from NDAQ loaded successfully.


  1%|          | 60/6638 [00:38<57:45,  1.90it/s]  

Data from CPT loaded successfully.
Data from CWD loaded successfully.


  1%|          | 61/6638 [00:38<1:03:01,  1.74it/s]

Data from SXTC loaded successfully.


  1%|          | 62/6638 [00:39<1:06:44,  1.64it/s]

Data from RENB loaded successfully.


  1%|          | 63/6638 [00:40<1:09:54,  1.57it/s]

Data from EFSC loaded successfully.
Data from ONFOW loaded successfully.


  1%|          | 65/6638 [00:41<55:52,  1.96it/s]  

Data from MIN loaded successfully.


  1%|          | 66/6638 [00:41<52:35,  2.08it/s]

Data from HTZ loaded successfully.
Data from OCSAW loaded successfully.


  1%|          | 68/6638 [00:42<51:39,  2.12it/s]

Data from NERV loaded successfully.


  1%|          | 69/6638 [00:43<57:22,  1.91it/s]

Data from FIVN loaded successfully.


  1%|          | 70/6638 [00:43<1:03:48,  1.72it/s]

Data from XPL loaded successfully.


  1%|          | 71/6638 [00:44<1:06:57,  1.63it/s]

Data from SCL loaded successfully.


  1%|          | 72/6638 [00:45<1:10:30,  1.55it/s]

Data from EURN loaded successfully.


  1%|          | 73/6638 [00:45<1:04:04,  1.71it/s]

Data from JZXN loaded successfully.


  1%|          | 74/6638 [00:46<56:02,  1.95it/s]  

Data from DRCT loaded successfully.


  1%|          | 75/6638 [00:46<56:46,  1.93it/s]

Data from LI loaded successfully.


  1%|          | 76/6638 [00:47<1:02:47,  1.74it/s]

Data from MTRN loaded successfully.


  1%|          | 77/6638 [00:47<1:00:00,  1.82it/s]

Data from OPT loaded successfully.


  1%|          | 78/6638 [00:48<1:03:36,  1.72it/s]

Data from CHPT loaded successfully.


  1%|          | 80/6638 [00:49<52:39,  2.08it/s]  

Data from INLX loaded successfully.
Data from CLCO loaded successfully.
Data from SDAWW loaded successfully.


  1%|          | 82/6638 [00:50<47:06,  2.32it/s]

Data from PIM loaded successfully.


  1%|▏         | 83/6638 [00:50<1:00:16,  1.81it/s]

Data from TXT loaded successfully.


  1%|▏         | 84/6638 [00:51<52:15,  2.09it/s]  

Data from SCCG loaded successfully.


  1%|▏         | 85/6638 [00:51<51:04,  2.14it/s]

Data from LDTC loaded successfully.


  1%|▏         | 86/6638 [00:51<45:12,  2.42it/s]

Data from ILAG loaded successfully.


  1%|▏         | 87/6638 [00:52<54:08,  2.02it/s]

Data from BGR loaded successfully.


  1%|▏         | 88/6638 [00:53<59:50,  1.82it/s]

Data from OPXS loaded successfully.


  1%|▏         | 89/6638 [00:54<1:05:36,  1.66it/s]

Data from BROG loaded successfully.
Data from STSSW loaded successfully.


  1%|▏         | 91/6638 [00:54<53:30,  2.04it/s]  

Data from PAAS loaded successfully.


  1%|▏         | 92/6638 [00:55<59:09,  1.84it/s]

Data from KMDA loaded successfully.


  1%|▏         | 93/6638 [00:56<1:03:07,  1.73it/s]

Data from VEV loaded successfully.


  1%|▏         | 94/6638 [00:56<1:06:50,  1.63it/s]

Data from THRY loaded successfully.


  1%|▏         | 95/6638 [00:57<58:19,  1.87it/s]  

Data from MTEK loaded successfully.


  1%|▏         | 96/6638 [00:57<1:04:00,  1.70it/s]

Data from CECO loaded successfully.


  1%|▏         | 97/6638 [00:58<1:07:33,  1.61it/s]

Data from UBX loaded successfully.


  1%|▏         | 98/6638 [00:59<1:09:59,  1.56it/s]

Data from FHTX loaded successfully.


  1%|▏         | 99/6638 [00:59<1:11:52,  1.52it/s]

Data from ADUS loaded successfully.


  2%|▏         | 100/6638 [01:00<1:13:07,  1.49it/s]

Data from YUM loaded successfully.


  2%|▏         | 101/6638 [01:01<1:06:11,  1.65it/s]

Data from EMKR loaded successfully.





In [55]:
#drop_table('fin_data', 'ohlcv')



Table ohlcv has been dropped successfully.


In [54]:
def create_spark_session():
    spark = SparkSession.builder \
        .appName("Cassandra Integration Example") \
        .config("spark.cassandra.connection.host", "localhost") \
        .config("spark.jars.packages", "com.datastax.spark:spark-cassandra-connector_2.12:3.5.0") \
        .getOrCreate()
    return spark

In [None]:
# Initialize Spark session
spark = create_spark_session()

In [26]:
def list_unique_symbols(spark, keyspace, table):
    # Load the data from Cassandra into a DataFrame
    df = spark.read \
        .format("org.apache.spark.sql.cassandra") \
        .options(keyspace=keyspace, table=table) \
        .load()
    
    # Select distinct symbols and collect them into a list
    symbols = df.select("symbol").distinct().collect()
    
    # Extract symbols from rows
    symbol_list = [row['symbol'] for row in symbols]
    print("Unique symbols:", symbol_list)
    return symbol_list

In [39]:
keyspace = 'fin_data'
table = 'ohlcv'
list_unique_symbols(spark, keyspace, table)
print()




Unique symbols: ['ARR', 'PEBK', 'HYFM', 'ZURA', 'CLBT', 'GORO', 'ANTE', 'GCTS', 'ALVR', 'SEEL', 'FA']



                                                                                

In [41]:
def query_symbol_data(spark, keyspace, table, symbol, start_date, end_date):
    # Load data from the specified table and keyspace
    df = spark.read \
        .format("org.apache.spark.sql.cassandra") \
        .options(keyspace=keyspace, table=table) \
        .load()

    # Filter data for the specified symbol and date range
    filtered_df = df.filter(
        (col("symbol") == symbol) & 
        (col("date") >= start_date) & 
        (col("date") <= end_date)
    )

    # Convert to Pandas DataFrame
    pandas_df = filtered_df.toPandas()
    return pandas_df

In [48]:
# Example usage

keyspace = 'fin_data'
table = 'ohlcv'
symbol = 'PEBK'  # Example symbol
start_date = '2019-01-01'  # Example start date
end_date = '2024-01-31'  # Example end date
    
query_symbol_data(spark, keyspace, table, symbol, start_date, end_date)


Unnamed: 0,symbol,date,adj_close,close,high,low,open,volume
0,PEBK,2019-04-23 01:00:00,25.188534,27.450001,27.450001,27.309999,27.309999,1600
1,PEBK,2019-04-24 01:00:00,24.885725,27.120001,27.459999,26.750000,27.450001,2400
2,PEBK,2019-04-25 01:00:00,24.968306,27.209999,27.750000,27.209999,27.750000,2300
3,PEBK,2019-04-26 01:00:00,24.922428,27.160000,27.790001,26.900000,27.309999,138100
4,PEBK,2019-04-29 01:00:00,25.142654,27.400000,27.730000,27.070000,27.070000,18200
...,...,...,...,...,...,...,...,...
1198,PEBK,2024-01-25 00:00:00,29.670000,29.670000,29.750000,29.540001,29.650000,3200
1199,PEBK,2024-01-26 00:00:00,29.799999,29.799999,29.959999,29.799999,29.959999,3700
1200,PEBK,2024-01-29 00:00:00,29.670000,29.670000,29.799999,29.500000,29.590000,6600
1201,PEBK,2024-01-30 00:00:00,29.570000,29.570000,29.879999,29.570000,29.610001,1600


In [None]:
# Lets construct some portfolios and see how they perform!