In [31]:
!pip install cassandra-driver pandas
import pandas as pd
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import SimpleStatement
import json
from datetime import datetime




In [2]:
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import json

# Open a session against the Astra database:
#   1. read the client id / secret from the downloaded token JSON,
#   2. connect through the secure connect bundle with plain-text auth,
#   3. switch to the `tickers` keyspace and print the server version as a sanity check.
secure_connect_bundle_path = "secure-connect-stockdata.zip"
token_file_path = "StockData-token.json"

with open(token_file_path) as token_file:
    secrets = json.load(token_file)

CLIENT_ID, CLIENT_SECRET = secrets["clientId"], secrets["secret"]

auth_provider = PlainTextAuthProvider(CLIENT_ID, CLIENT_SECRET)
cloud_config = {'secure_connect_bundle': secure_connect_bundle_path}

cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
session = cluster.connect()
session.set_keyspace('tickers')

# Optional connectivity check: read the release version from system.local.
version_row = session.execute("select release_version from system.local").one()
print(f"Cassandra Version: {version_row[0]}" if version_row else "An error occurred.")




Cassandra Version: 4.0.0.6816


In [3]:
import os

import pandas as pd

# Location of the raw tickers CSV. Set the TICKERS_CSV_PATH environment variable to
# run the notebook on another machine; the original author's local path is the fallback.
file_path = os.environ.get(
    "TICKERS_CSV_PATH",
    r'C:\Users\rysad\OneDrive\Desktop\Data Science\Fall 2024\Big Data Tools & Techniques\Final Assignment\Stocktickersdata\tickers_data.csv',
)
data = pd.read_csv(file_path)

# Show the first few rows and check the column names
print("Sample Data:")
print(data.head())

# Confirm the total rows and columns
print(f"\nNumber of Rows: {data.shape[0]}, Number of Columns: {data.shape[1]}")


Sample Data:
       ticker                             name  market  locale  active  \
0     X:00USD  00 Token - United States dollar  crypto  global    True   
1  X:1INCHUSD     1inch - United States dollar  crypto  global    True   
2     X:A8USD  Ancient8 - United States dollar  crypto  global    True   
3   X:AAVEUSD      Aave - United States dollar  crypto  global    True   
4    X:ABTUSD  Arcblock - United States dollar  crypto  global    True   

  currency_symbol         currency_name base_currency_symbol  \
0             USD  United States dollar                   00   
1             USD  United States dollar                1INCH   
2             USD  United States dollar                   A8   
3             USD  United States dollar                 AAVE   
4             USD  United States dollar                  ABT   

  base_currency_name      last_updated_utc  
0           00 Token  2017-01-01T00:00:00Z  
1              1inch  2017-01-01T00:00:00Z  
2           Ancient8  

In [4]:
# Inspect the dtypes pandas inferred for each CSV column, then eyeball a few rows.
print("Column names and data types:")
print(data.dtypes)

preview = data.head()
print("Sample Data:")
print(preview)


Column names and data types:
ticker                  object
name                    object
market                  object
locale                  object
active                    bool
currency_symbol         object
currency_name           object
base_currency_symbol    object
base_currency_name      object
last_updated_utc        object
dtype: object
Sample Data:
       ticker                             name  market  locale  active  \
0     X:00USD  00 Token - United States dollar  crypto  global    True   
1  X:1INCHUSD     1inch - United States dollar  crypto  global    True   
2     X:A8USD  Ancient8 - United States dollar  crypto  global    True   
3   X:AAVEUSD      Aave - United States dollar  crypto  global    True   
4    X:ABTUSD  Arcblock - United States dollar  crypto  global    True   

  currency_symbol         currency_name base_currency_symbol  \
0             USD  United States dollar                   00   
1             USD  United States dollar                1INCH 

In [5]:
# DDL for the landing table that receives the raw CSV rows: unique_id (UUID) is the
# sole primary key and every CSV column is stored as a regular column.
create_table_query = """
    CREATE TABLE IF NOT EXISTS stock_data (
        unique_id UUID PRIMARY KEY,
        ticker TEXT,
        name TEXT,
        market TEXT,
        locale TEXT,
        active BOOLEAN,
        currency_symbol TEXT,
        currency_name TEXT,
        base_currency_symbol TEXT,
        base_currency_name TEXT,
        last_updated_utc TEXT
    );
"""

# Best-effort: report a failure instead of raising, announce success only if it ran.
try:
    session.execute(create_table_query)
except Exception as e:
    print(f"Error creating table: {e}")
else:
    print("Table 'stock_data' created successfully!")


Table 'stock_data' created successfully!


In [17]:
import os
import pandas as pd
import numpy as np
from cassandra.query import BatchStatement
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
import uuid
from datetime import datetime

# Bulk-load the raw tickers CSV into `stock_data` using batched prepared INSERTs.
#
# Changes over the first version of this cell:
# * unique_id is derived deterministically (uuid5) from the CSV row position and ticker.
#   With uuid4 every re-run appended another full copy of the file under new keys;
#   now a re-run of the same file overwrites the same rows.
#   NOTE(review): rows written by earlier uuid4-based runs are not removed by this;
#   TRUNCATE stock_data once if it already holds duplicates.
# * Missing timestamps are written as NULL. A NaT value is truthy, so if NaT survived the
#   NaN->None replacement the old code stored the literal string 'NaT'.

# Raw CSV location; override with TICKERS_CSV_PATH on other machines.
csv_file_path = os.environ.get(
    "TICKERS_CSV_PATH",
    r"C:\Users\rysad\OneDrive\Desktop\Data Science\Fall 2024\Big Data Tools & Techniques\Final Assignment\Stocktickersdata\tickers_data.csv",
)
data = pd.read_csv(csv_file_path)

# Parse timestamps; values pandas cannot parse become NaT.
data['last_updated_utc'] = pd.to_datetime(data['last_updated_utc'], errors='coerce')

# Replace NaN with None, which the driver sends as CQL NULL.
data = data.replace({np.nan: None})

# Prepare the INSERT query for Cassandra
insert_query = session.prepare("""
    INSERT INTO stock_data (
        unique_id, ticker, name, market, locale, active, 
        currency_symbol, currency_name, base_currency_symbol, 
        base_currency_name, last_updated_utc
    ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""")

# Fixed namespace for row keys; any constant works as long as it never changes.
ROW_ID_NAMESPACE = uuid.uuid5(uuid.NAMESPACE_URL, "tickers/stock_data")


def _row_uuid(row_number, ticker):
    """Return a deterministic primary key for one CSV row (same file -> same keys)."""
    return uuid.uuid5(ROW_ID_NAMESPACE, f"{row_number}:{ticker}")


def _timestamp_text(value):
    """Render a parsed timestamp as text, or None (NULL) when it is None/NaT."""
    return None if pd.isna(value) else str(value)


# Batch insert
batch_size = 1000
counter = 0
batch = BatchStatement()

# iterrows yields the RangeIndex label from read_csv, i.e. the row's position in the file.
for row_number, row in data.iterrows():
    batch.add(insert_query, (
        _row_uuid(row_number, row['ticker']),
        row['ticker'],
        row['name'],
        row['market'],
        row['locale'],
        bool(row['active']),  # missing (None) -> False, as before
        row['currency_symbol'],
        row['currency_name'],
        row['base_currency_symbol'],
        row['base_currency_name'],
        _timestamp_text(row['last_updated_utc']),
    ))

    counter += 1
    if counter % batch_size == 0:  # Execute the batch after `batch_size` rows
        session.execute(batch)
        print(f"Inserted {counter} rows")
        batch.clear()

# Insert any remaining rows
if counter % batch_size != 0:
    session.execute(batch)
    print(f"Inserted remaining {counter % batch_size} rows")

# Final message
print(f"Bulk insert completed! Total rows inserted: {counter}")


Inserted 1000 rows
Inserted 2000 rows
Inserted 3000 rows
Inserted 4000 rows
Inserted 5000 rows
Inserted 6000 rows
Inserted 7000 rows
Inserted 8000 rows
Inserted 9000 rows
Inserted 10000 rows
Inserted 11000 rows
Inserted 12000 rows
Inserted 13000 rows
Inserted 14000 rows
Inserted 15000 rows
Inserted 16000 rows
Inserted 17000 rows
Inserted 18000 rows
Inserted 19000 rows
Inserted 20000 rows
Inserted 21000 rows
Inserted 22000 rows
Inserted 23000 rows
Inserted 24000 rows
Inserted 25000 rows
Inserted 26000 rows
Inserted 27000 rows
Inserted 28000 rows
Inserted 29000 rows
Inserted 30000 rows
Inserted 31000 rows
Inserted 32000 rows
Inserted 33000 rows
Inserted 34000 rows
Inserted 35000 rows
Inserted 36000 rows
Inserted 37000 rows
Inserted 38000 rows
Inserted 39000 rows
Inserted 40000 rows
Inserted 41000 rows
Inserted 42000 rows
Inserted 43000 rows
Inserted 44000 rows
Inserted 45000 rows
Inserted 46000 rows
Inserted 47000 rows
Inserted 48000 rows
Inserted 49000 rows
Inserted 50000 rows
Inserted 

In [18]:
import numpy as np

# Rewrite NaN cells of the in-memory `data` frame as empty strings.
# Nothing is written to Cassandra here; this only affects `data`.
blank_for_nan = {np.nan: ''}
data = data.replace(blank_for_nan)

print("NaN values replaced. Data sample:")
print(data.head())

# Per-column count of values pandas still considers missing.
nan_counts = data.isna().sum()
print(f"Check for NaN values after replacement: {nan_counts}")


NaN values replaced. Data sample:
       ticker                             name  market  locale  active  \
0     X:00USD  00 Token - United States dollar  crypto  global    True   
1  X:1INCHUSD     1inch - United States dollar  crypto  global    True   
2     X:A8USD  Ancient8 - United States dollar  crypto  global    True   
3   X:AAVEUSD      Aave - United States dollar  crypto  global    True   
4    X:ABTUSD  Arcblock - United States dollar  crypto  global    True   

  currency_symbol         currency_name base_currency_symbol  \
0             USD  United States dollar                   00   
1             USD  United States dollar                1INCH   
2             USD  United States dollar                   A8   
3             USD  United States dollar                 AAVE   
4             USD  United States dollar                  ABT   

  base_currency_name          last_updated_utc  
0           00 Token 2017-01-01 00:00:00+00:00  
1              1inch 2017-01-01 00:00:

In [65]:
# Start the bronze layer from a clean slate by removing any previous bronze_stock_data table.
session.execute("DROP TABLE IF EXISTS bronze_stock_data")

print("Existing 'bronze_stock_data' table dropped successfully.")


Existing 'bronze_stock_data' table dropped successfully.


In [66]:


# Bronze table: same column set as stock_data, keyed by unique_id.
bronze_table_ddl = """
CREATE TABLE IF NOT EXISTS bronze_stock_data (
    unique_id UUID PRIMARY KEY,
    ticker TEXT,
    name TEXT,
    market TEXT,
    locale TEXT,
    active BOOLEAN,
    currency_symbol TEXT,
    currency_name TEXT,
    base_currency_symbol TEXT,
    base_currency_name TEXT,
    last_updated_utc TEXT
);
"""

session.execute(bronze_table_ddl)
print("Table 'bronze_stock_data' created successfully.")


Table 'bronze_stock_data' created successfully.


In [67]:
from cassandra.query import BatchStatement
import uuid

# Copy every row of stock_data into bronze_stock_data.
# unique_id is carried over from stock_data instead of being regenerated with uuid4.
# A bronze row can then be traced back to its source row, and re-running this cell
# (without dropping the table) overwrites the same keys instead of adding another copy.
select_query = "SELECT unique_id, ticker, name, market, locale, active, currency_symbol, currency_name, base_currency_symbol, base_currency_name, last_updated_utc FROM stock_data"

rows = session.execute(select_query)

# Define the insert query for 'bronze_stock_data' table
insert_query = """
    INSERT INTO bronze_stock_data (
        unique_id, ticker, name, market, locale, active, 
        currency_symbol, currency_name, base_currency_symbol, 
        base_currency_name, last_updated_utc
    )
    VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
"""

# Prepared once, bound per row.
prepared_statement = session.prepare(insert_query)

batch = BatchStatement()

# Number of statements per batch (adjust based on your use case)
batch_size = 50

# Total number of rows sent to bronze_stock_data
total_inserted = 0

for row in rows:
    batch.add(prepared_statement.bind((
        row.unique_id,  # keep the source key for lineage / idempotent re-runs
        row.ticker,
        row.name,
        row.market,
        row.locale,
        row.active,
        row.currency_symbol,
        row.currency_name,
        row.base_currency_symbol,
        row.base_currency_name,
        row.last_updated_utc
    )))

    # Flush once the batch is full, counting what was actually sent.
    if len(batch) >= batch_size:
        session.execute(batch)
        total_inserted += len(batch)
        batch.clear()

# Flush the final partial batch, if any
if len(batch) > 0:
    session.execute(batch)
    total_inserted += len(batch)

print(f"Total rows successfully inserted into bronze_stock_data: {total_inserted}")


Total rows successfully inserted into bronze_stock_data: 100000


In [32]:
# Drop the existing silver_stock_data table (if any) before the silver layer is rebuilt.
silver_drop_query = "DROP TABLE IF EXISTS silver_stock_data"
session.execute(silver_drop_query)

print("Existing 'silver_stock_data' table dropped successfully.")


Existing 'silver_stock_data' table dropped successfully.


In [33]:
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider

# Silver table: the bronze columns minus last_updated_utc, still keyed by unique_id.
silver_table_ddl = """
CREATE TABLE IF NOT EXISTS silver_stock_data (
    unique_id UUID PRIMARY KEY,
    active BOOLEAN,
    base_currency_name TEXT,
    base_currency_symbol TEXT,
    currency_name TEXT,
    currency_symbol TEXT,
    locale TEXT,
    market TEXT,
    name TEXT,
    ticker TEXT
);
"""

session.execute(silver_table_ddl)
print("Silver table created successfully without the timestamp column.")




Silver table created successfully without the timestamp column.


In [35]:
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider
from cassandra.query import BatchStatement

# Silver layer: copy bronze rows into silver_stock_data without last_updated_utc.
# unique_id is kept so a silver row traces back to its bronze row.
select_query = "SELECT unique_id, active, base_currency_name, base_currency_symbol, currency_name, currency_symbol, locale, market, name, ticker FROM bronze_stock_data;"

rows = session.execute(select_query)

# The INSERT is prepared once and bound per row, as in the bronze copy. The driver then
# serialises each value by its column type instead of formatting it into the CQL text
# for every statement.
insert_query = """
INSERT INTO silver_stock_data (unique_id, active, base_currency_name, base_currency_symbol, currency_name, currency_symbol, locale, market, name, ticker)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
"""
prepared_insert = session.prepare(insert_query)

# Statements per batch; adjust based on your system and requirements
batch_size_limit = 100

batch = BatchStatement()
batch_count = 0
total_inserted = 0

for row in rows:
    batch.add(prepared_insert, (
        row.unique_id,
        row.active,
        row.base_currency_name,
        row.base_currency_symbol,
        row.currency_name,
        row.currency_symbol,
        row.locale,
        row.market,
        row.name,
        row.ticker
    ))
    batch_count += 1

    # Flush a full batch and start a fresh one
    if batch_count >= batch_size_limit:
        session.execute(batch)
        total_inserted += batch_count
        batch = BatchStatement()
        batch_count = 0

# Flush the final partial batch
if batch_count > 0:
    session.execute(batch)
    total_inserted += batch_count

print("Data successfully inserted into the silver table.")
print(f"Total rows inserted into silver_stock_data: {total_inserted}")




Data successfully inserted into the silver table.


In [43]:
from cassandra.cluster import Cluster
from collections import defaultdict

# gold1: number of silver rows per (base_currency_name, currency_name, market).
create_table_query = """
CREATE TABLE IF NOT EXISTS gold1_currency_pairs_summary (
    base_currency_name TEXT,
    currency_name TEXT,
    market TEXT,
    total_pairs INT,
    PRIMARY KEY (base_currency_name, currency_name, market)
);
"""
session.execute(create_table_query)

# Step 1: fetch the raw key columns (CQL has no GROUP BY over a non-key column here)
select_query = """
SELECT base_currency_name, currency_name, market
FROM silver_stock_data;
"""

rows = session.execute(select_query)

# Step 2: aggregate in Python.
# All three values form the summary table's PRIMARY KEY. Cassandra rejects NULL for any
# primary-key column and an empty partition key (base_currency_name), so such rows are
# counted and skipped here rather than aborting the insert loop half-way.
count_data = defaultdict(int)
skipped_rows = 0

for row in rows:
    key = (row.base_currency_name, row.currency_name, row.market)
    if any(part is None for part in key) or key[0] == '':
        skipped_rows += 1
        continue
    count_data[key] += 1

# Step 3: upsert one summary row per key (prepared once, bound per row)
insert_statement = session.prepare("""
INSERT INTO gold1_currency_pairs_summary (base_currency_name, currency_name, market, total_pairs)
VALUES (?, ?, ?, ?);
""")

for (base_currency_name, currency_name, market), total_pairs in count_data.items():
    session.execute(insert_statement, (base_currency_name, currency_name, market, total_pairs))

if skipped_rows:
    print(f"Skipped {skipped_rows} rows with a missing primary-key value.")
print("Data aggregation and insertion completed.")


Data aggregation and insertion completed.


In [49]:
from cassandra.cluster import Cluster
from cassandra import ConsistencyLevel
from cassandra.query import BatchStatement

# gold2: details of every active ticker, partitioned by market.
#
# The previous key, PRIMARY KEY (market, currency_name), gave all active tickers with the
# same market and quote currency a single row. Each INSERT silently overwrote the last one,
# so only one ticker per (market, currency_name) survived. With ticker as an extra
# clustering column there is one row per ticker; duplicate copies of the same ticker
# collapse into that one row.
# NOTE: CREATE TABLE IF NOT EXISTS never alters an existing table. A
# gold2_active_currency_details created with the old key must be dropped first.
create_table_query = """
CREATE TABLE IF NOT EXISTS gold2_active_currency_details (
    unique_id UUID,
    base_currency_name TEXT,
    base_currency_symbol TEXT,
    currency_name TEXT,
    currency_symbol TEXT,
    market TEXT,
    name TEXT,
    ticker TEXT,
    PRIMARY KEY (market, currency_name, ticker)
);
"""
session.execute(create_table_query)

# Step 1: fetch active currency pairs from silver_stock_data.
# `active` is not a key column, hence ALLOW FILTERING (full scan).
select_query = """
SELECT unique_id, base_currency_name, base_currency_symbol, currency_name, 
       currency_symbol, market, name, ticker
FROM silver_stock_data
WHERE active = True ALLOW FILTERING;
"""

rows = session.execute(select_query)

# Step 2: insert into gold2_active_currency_details in batches
batch_size = 100
insert_query = """
INSERT INTO gold2_active_currency_details (unique_id, base_currency_name, 
                                            base_currency_symbol, currency_name, 
                                            currency_symbol, market, name, ticker)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s);
"""

batch = BatchStatement(consistency_level=ConsistencyLevel.QUORUM)
skipped_rows = 0

for row in rows:
    # market is the partition key (NULL/empty rejected); currency_name and ticker are
    # clustering columns (NULL rejected). Skip such rows instead of failing the batch.
    if row.market in (None, '') or row.currency_name is None or row.ticker is None:
        skipped_rows += 1
        continue

    batch.add(insert_query, (row.unique_id, row.base_currency_name,
                             row.base_currency_symbol, row.currency_name,
                             row.currency_symbol, row.market,
                             row.name, row.ticker))

    # Flush once the batch holds `batch_size` statements
    if len(batch) >= batch_size:
        session.execute(batch)
        batch.clear()

# Flush the final partial batch (if not empty)
if len(batch) > 0:
    session.execute(batch)

if skipped_rows:
    print(f"Skipped {skipped_rows} rows with a missing primary-key value.")
print("Data insertion completed.")


Data insertion completed.


In [48]:
# Drop gold2_active_currency_details so the next CREATE TABLE uses the current schema
# (CREATE TABLE IF NOT EXISTS never alters an existing table).
# The name used to read "bgold2_..." (typo). That dropped nothing, yet the message
# below still reported success.
drop_query = "DROP TABLE IF EXISTS gold2_active_currency_details"
session.execute(drop_query)

print("Existing 'gold2_active_currency_details' table dropped successfully.")


Existing 'gold2_active_currency_details' table dropped successfully.


In [50]:
from cassandra.cluster import Cluster
from cassandra.query import BatchStatement
from cassandra import ConsistencyLevel

# gold3: one descriptive row per (base_currency_name, currency_name) pair.
create_table_query = """
CREATE TABLE IF NOT EXISTS gold3_currency_pair_info (
    base_currency_name TEXT,
    base_currency_symbol TEXT,
    currency_name TEXT,
    currency_symbol TEXT,
    pair_description TEXT,
    PRIMARY KEY (base_currency_name, currency_name)
);
"""
session.execute(create_table_query)

# Step 1: fetch the currency columns from silver_stock_data
select_query = """
SELECT base_currency_name, base_currency_symbol, currency_name, currency_symbol
FROM silver_stock_data;
"""

rows = session.execute(select_query)

# Step 2: upsert into gold3_currency_pair_info in batches; repeated pairs overwrite one row
insert_query = """
INSERT INTO gold3_currency_pair_info (base_currency_name, base_currency_symbol, 
                                 currency_name, currency_symbol, pair_description)
VALUES (%s, %s, %s, %s, %s);
"""

batch_size = 100
batch = BatchStatement(consistency_level=ConsistencyLevel.QUORUM)
skipped_rows = 0

for row in rows:
    # base_currency_name is the partition key (NULL/empty rejected) and currency_name the
    # clustering column (NULL rejected). A None here also made the old string
    # concatenation raise TypeError, so such rows are skipped.
    if not row.base_currency_name or row.currency_name is None:
        skipped_rows += 1
        continue

    pair_description = f"{row.base_currency_name} - {row.currency_name}"
    batch.add(insert_query, (row.base_currency_name, row.base_currency_symbol,
                             row.currency_name, row.currency_symbol, pair_description))

    # Flush once the batch holds `batch_size` statements
    if len(batch) >= batch_size:
        session.execute(batch)
        batch.clear()

# Flush the final partial batch (if not empty)
if len(batch) > 0:
    session.execute(batch)

if skipped_rows:
    print(f"Skipped {skipped_rows} rows with a missing primary-key value.")
print("Data insertion completed.")


Data insertion completed.
