In [15]:
pip install confluent-kafka


Note: you may need to restart the kernel to use updated packages.


In [165]:
import json
import os

from cassandra.auth import PlainTextAuthProvider
from cassandra.cluster import Cluster

# Credential locations. The original hard-coded values remain the defaults, but
# they can be overridden through environment variables so the notebook also
# runs on machines that do not share this directory layout.
TOKEN_PATH = os.environ.get(
    "ASTRA_TOKEN_PATH",
    "/Users/sudhir/Desktop/Fall2024/BIGDATA/Project/Sudhir-token (1).json",
)
SECURE_BUNDLE_PATH = os.environ.get(
    "ASTRA_SECURE_BUNDLE",
    'secure-connect-sudhir (3).zip',
)
KEYSPACE = 'diabetes'

# Load secrets from the token file
with open(TOKEN_PATH) as f:
    secrets = json.load(f)

CLIENT_ID = secrets["clientId"]
CLIENT_SECRET = secrets["secret"]

cloud_config = {
    'secure_connect_bundle': SECURE_BUNDLE_PATH}

auth_provider = PlainTextAuthProvider(CLIENT_ID, CLIENT_SECRET)

cluster = Cluster(cloud=cloud_config, auth_provider=auth_provider)
session = cluster.connect()

# Check connection.
# connect() raises when no host can be reached and the returned Session object
# is always truthy, so testing `if session:` can never report a failure.
# Issue a trivial query instead so the check reflects a real round trip.
try:
    session.execute("SELECT release_version FROM system.local")
except Exception:
    print("Connection is not active")
    raise
else:
    print("Connection is active")


# Set the keyspace
session.set_keyspace(KEYSPACE)

Connection is active


In [89]:
# Set the keyspace
session.set_keyspace('diabetes')

# Bronze layer: landing table for the raw records, keyed by a UUID.
# IF NOT EXISTS keeps this cell re-runnable (a plain CREATE TABLE raises once
# the table is there) and matches the silver/gold DDL in this notebook.
create_table_query = """
    CREATE TABLE IF NOT EXISTS bronze_diabetes (
    id UUID PRIMARY KEY,
    year INT,
    gender TEXT,
    age INT,
    location TEXT,
    race_africanamerican INT,
    race_asian INT,
    race_caucasian INT,
    race_hispanic INT,
    race_other INT,
    hypertension INT,
    heart_disease INT,
    smoking_history TEXT,
    bmi FLOAT,
    hba1c_level FLOAT,
    blood_glucose_level INT,
    diabetes INT
);
"""

# Execute the CREATE TABLE query
session.execute(create_table_query)
print("Table 'bronze_diabetes' created successfully.")


Table 'bronze_diabetes' created successfully.


In [None]:
import os
import uuid

import pandas as pd
from cassandra.cluster import Cluster
from cassandra.concurrent import execute_concurrent_with_args
from cassandra.query import BatchStatement
from cassandra.query import SimpleStatement

# CSV location: the original absolute path stays the default but can be
# overridden with the DIABETES_CSV_PATH environment variable.
csv_file = os.environ.get(
    'DIABETES_CSV_PATH',
    '/Users/sudhir/Desktop/Fall2024/BIGDATA/Project/received_diabetes_data.csv',
)


# Read CSV into a DataFrame
df = pd.read_csv(csv_file)

# Prepare the CQL query for inserting data (one bind marker per column).
insert_query = """
    INSERT INTO bronze_diabetes (
        id, year, gender, age, location, race_africanamerican, race_asian,
        race_caucasian, race_hispanic, race_other, hypertension, heart_disease,
        smoking_history, bmi, hba1c_level, blood_glucose_level, diabetes
    ) VALUES (
        ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
    )
"""
prepared = session.prepare(insert_query)


def _bronze_row_params(row):
    """Build the bind parameters for one CSV record.

    `row` is a namedtuple produced by `DataFrame.itertuples(index=False)`.
    Numeric fields are cast to built-in int/float so the driver receives plain
    Python values instead of numpy scalars. A fresh UUID is generated per call,
    which means re-running the load inserts the same records again under new
    ids.
    """
    return (
        uuid.uuid4(),             # Generate a unique UUID
        int(row.year),
        row.gender,
        int(row.age),
        row.location,
        int(row.race_africanamerican),
        int(row.race_asian),
        int(row.race_caucasian),
        int(row.race_hispanic),
        int(row.race_other),
        int(row.hypertension),
        int(row.heart_disease),
        row.smoking_history,
        float(row.bmi),
        float(row.hba1c_level),
        int(row.blood_glucose_level),
        int(row.diabetes),
    )


# Rows are written in chunks so progress can be reported per chunk.
batch_size = 1000
batches = [df.iloc[i:i + batch_size] for i in range(0, len(df), batch_size)]

for batch in batches:
    params = [_bronze_row_params(row) for row in batch.itertuples(index=False)]
    # Every row has its own partition key (a new UUID), so grouping 1000 of
    # them into a single BatchStatement funnels them through one coordinator
    # as a multi-partition batch. Independent concurrent writes are the
    # driver's recommended bulk-load path; with the default
    # raise_on_first_error=True a failed write raises instead of passing
    # silently.
    execute_concurrent_with_args(session, prepared, params, concurrency=50)
    print(f"Inserted {len(batch)} records.")

print("Data insertion completed.")


Inserted 1000 records.
Inserted 1000 records.
Inserted 1000 records.
Inserted 1000 records.
Inserted 1000 records.
Inserted 1000 records.
Inserted 1000 records.
Inserted 1000 records.
Inserted 1000 records.
Inserted 1000 records.
Inserted 1000 records.
Inserted 1000 records.
Inserted 1000 records.
Inserted 1000 records.
Inserted 1000 records.
Inserted 1000 records.
Inserted 1000 records.
Inserted 1000 records.
Inserted 1000 records.
Inserted 1000 records.
Inserted 1000 records.
Inserted 1000 records.
Inserted 1000 records.
Inserted 1000 records.
Inserted 1000 records.
Inserted 1000 records.
Inserted 1000 records.
Inserted 1000 records.
Inserted 1000 records.
Inserted 1000 records.
Inserted 1000 records.
Inserted 1000 records.
Inserted 1000 records.
Inserted 1000 records.
Inserted 1000 records.
Inserted 1000 records.
Inserted 1000 records.
Inserted 1000 records.
Inserted 1000 records.
Inserted 1000 records.
Inserted 1000 records.
Inserted 1000 records.
Inserted 1000 records.
Inserted 10

In [None]:
import uuid

import pandas as pd
from cassandra.cluster import Cluster
from cassandra.concurrent import execute_concurrent_with_args
from cassandra.query import BatchStatement
from cassandra.query import SimpleStatement

# Query data from the bronze table and load it into a DataFrame
query_bronze_data = "SELECT * FROM bronze_diabetes;"
rows = session.execute(query_bronze_data)
bronze_df = pd.DataFrame(rows)

# Clean the data.
# Each bronze row carries its own UUID in `id`, so comparing whole rows can
# never find a duplicate. Compare on every column except `id` so that records
# with identical measurements are actually collapsed (the first one is kept).
bronze_value_columns = [col for col in bronze_df.columns if col != 'id']
bronze_df = bronze_df.drop_duplicates(subset=bronze_value_columns)  # Drop duplicate records
bronze_df = bronze_df.dropna()                                      # Drop rows with null values

# Drop the columns 'race_other' and 'smoking_history'
cleaned_df = bronze_df.drop(columns=['race_other', 'smoking_history'])

# Create the silver table if it doesn't exist
create_silver_table_query = """
    CREATE TABLE IF NOT EXISTS silver_diabetes (
        id UUID PRIMARY KEY,
        year INT,
        gender TEXT,
        age INT,
        location TEXT,
        race_africanamerican INT,
        race_asian INT,
        race_caucasian INT,
        race_hispanic INT,
        hypertension INT,
        heart_disease INT,
        bmi FLOAT,
        hba1c_level FLOAT,
        blood_glucose_level INT,
        diabetes INT
    );
"""
session.execute(create_silver_table_query)
print("Table 'silver_diabetes' created successfully.")

# Prepare the CQL insert (one bind marker per silver column)
insert_query = """
    INSERT INTO silver_diabetes (
        id, year, gender, age, location, race_africanamerican, race_asian,
        race_caucasian, race_hispanic, hypertension, heart_disease,
        bmi, hba1c_level, blood_glucose_level, diabetes
    ) VALUES (
        ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?
    )
"""
prepared = session.prepare(insert_query)


def _silver_row_params(row):
    """Build the bind parameters for one cleaned record.

    `row` is a namedtuple produced by `DataFrame.itertuples(index=False)`.
    The bronze UUID is reused as the silver primary key, so re-running this
    cell overwrites the same silver rows instead of adding new ones.
    """
    return (
        row.id,               # Use the existing UUID from bronze
        int(row.year),
        row.gender,
        int(row.age),
        row.location,
        int(row.race_africanamerican),
        int(row.race_asian),
        int(row.race_caucasian),
        int(row.race_hispanic),
        int(row.hypertension),
        int(row.heart_disease),
        float(row.bmi),
        float(row.hba1c_level),
        int(row.blood_glucose_level),
        int(row.diabetes),
    )


# Define chunk size (rows written per progress message)
batch_size = 1000

# Insert data chunk by chunk. Each row lives in its own partition (UUID key),
# so the rows are sent as independent concurrent writes rather than as one
# large multi-partition BatchStatement; a failed write raises
# (raise_on_first_error defaults to True).
for i in range(0, len(cleaned_df), batch_size):
    batch = cleaned_df.iloc[i:i + batch_size]
    params = [_silver_row_params(row) for row in batch.itertuples(index=False)]
    execute_concurrent_with_args(session, prepared, params, concurrency=50)
    print(f"Inserted {len(batch)} records into silver_diabetes.")

# The session is deliberately left open: later cells keep using it, so the
# message no longer claims that the connection was closed.
print("Data inserted into 'silver_diabetes'.")


Table 'silver_diabetes' created successfully.
Inserted 1000 records into silver_diabetes.
Inserted 1000 records into silver_diabetes.
Inserted 1000 records into silver_diabetes.
Inserted 1000 records into silver_diabetes.
Inserted 1000 records into silver_diabetes.
Inserted 1000 records into silver_diabetes.
Inserted 1000 records into silver_diabetes.
Inserted 1000 records into silver_diabetes.
Inserted 1000 records into silver_diabetes.
Inserted 1000 records into silver_diabetes.
Inserted 1000 records into silver_diabetes.
Inserted 1000 records into silver_diabetes.
Inserted 1000 records into silver_diabetes.
Inserted 1000 records into silver_diabetes.
Inserted 1000 records into silver_diabetes.
Inserted 1000 records into silver_diabetes.
Inserted 1000 records into silver_diabetes.
Inserted 1000 records into silver_diabetes.
Inserted 1000 records into silver_diabetes.
Inserted 1000 records into silver_diabetes.
Inserted 1000 records into silver_diabetes.
Inserted 1000 records into sil

In [None]:
from cassandra.cluster import Cluster
from cassandra.query import BatchStatement
import pandas as pd

session.set_keyspace('diabetes')

# Query data from the silver table and load it into a DataFrame
query_silver_data = "SELECT * FROM silver_diabetes;"
rows = session.execute(query_silver_data)

# Convert the query result to a DataFrame
silver_df = pd.DataFrame(rows)

# Clean the data: drop duplicate records and rows with null values.
# `id` is the table's primary key and therefore unique per row, so duplicates
# have to be detected on the remaining columns; comparing whole rows would
# never drop anything.
silver_value_columns = [col for col in silver_df.columns if col != 'id']
silver_df = silver_df.drop_duplicates(subset=silver_value_columns)
silver_df = silver_df.dropna()

print("Data cleaned. Dropped duplicates and null values.")


Data cleaned. Dropped duplicates and null values.


In [166]:
# Gold-layer table definitions. The column lists match the gold INSERT
# statements used later in this notebook.

# Golden_Layer1: blood glucose level keyed by (year, gender)
create_gold_table_1_query = """
    CREATE TABLE IF NOT EXISTS Golden_Layer1 (
        year INT,
        gender TEXT,
        blood_glucose_level INT,
        PRIMARY KEY (year, gender)
    );
"""

# Golden_Layer2: diabetes flag (stored as text) keyed by (age, gender)
create_gold_table_2_query = """
    CREATE TABLE IF NOT EXISTS Golden_Layer2 (
        age INT,
        gender TEXT,
        diabetes TEXT,
        PRIMARY KEY (age, gender)
    );
"""

# Golden_Layer3: bmi and heart disease keyed by location
create_gold_table_3_query = """
    CREATE TABLE IF NOT EXISTS Golden_Layer3 (
        location TEXT,
        bmi FLOAT,
        heart_disease INT,
        PRIMARY KEY (location)
    );
"""

# Actually recreate each table: CREATE TABLE IF NOT EXISTS on its own keeps
# whatever table already exists under that name, even when its columns differ,
# and would still report success. Dropping first guarantees the schema above.
for gold_table_name, gold_table_ddl in (
    ("Golden_Layer1", create_gold_table_1_query),
    ("Golden_Layer2", create_gold_table_2_query),
    ("Golden_Layer3", create_gold_table_3_query),
):
    session.execute(f"DROP TABLE IF EXISTS {gold_table_name};")
    session.execute(gold_table_ddl)
    print(f"Table '{gold_table_name}' created successfully.")


Table 'Golden_Layer1' created successfully.
Table 'Golden_Layer2' created successfully.
Table 'Golden_Layer3' created successfully.


In [None]:

# Drop and recreate the three gold tables.
#
# The schemas previously defined in this cell did not contain the columns that
# the gold INSERT statements in this notebook bind:
#   Golden_Layer1 (year, gender, blood_glucose_level)
#   Golden_Layer2 (age, gender, diabetes)
#   Golden_Layer3 (location, bmi, heart_disease)
# so preparing those inserts after running this cell would fail on unknown
# columns. The definitions below are aligned with the inserts.
# NOTE(review): confirm these are the intended gold schemas.

session.execute("DROP TABLE IF EXISTS Golden_Layer1;")

create_gold_table_1_query = """
    CREATE TABLE IF NOT EXISTS Golden_Layer1 (
        year INT,
        gender TEXT,
        blood_glucose_level INT,
        PRIMARY KEY (year, gender)
    );
"""
session.execute(create_gold_table_1_query)
print("Table 'Golden_Layer1' created successfully.")

session.execute("DROP TABLE IF EXISTS Golden_Layer2;")


create_gold_table_2_query = """
    CREATE TABLE IF NOT EXISTS Golden_Layer2 (
        age INT,
        gender TEXT,
        diabetes TEXT,
        PRIMARY KEY (age, gender)
    );
"""
session.execute(create_gold_table_2_query)
print("Table 'Golden_Layer2' created successfully.")

session.execute("DROP TABLE IF EXISTS Golden_Layer3;")

create_gold_table_3_query = """
    CREATE TABLE IF NOT EXISTS Golden_Layer3 (
        location TEXT,
        bmi FLOAT,
        heart_disease INT,
        PRIMARY KEY (location)
    );
"""
session.execute(create_gold_table_3_query)
print("Table 'Golden_Layer3' created successfully.")



Table 'Golden_Layer1' created successfully.
Table 'Golden_Layer2' created successfully.
Table 'Golden_Layer3' created successfully.


In [None]:
from cassandra.query import BatchStatement
import pandas as pd

# CQL insert statements for the three gold tables; every "?" is a bind marker
# that is filled in per row when the statement is executed.
insert_gold_1_query = """
    INSERT INTO Golden_Layer1 (year, gender, blood_glucose_level) VALUES (?, ?, ?)
"""
insert_gold_2_query = """
    INSERT INTO Golden_Layer2 (age, gender, diabetes) VALUES (?, ?, ?)
"""
insert_gold_3_query = """
    INSERT INTO Golden_Layer3 (location, bmi, heart_disease) VALUES (?, ?, ?)
"""

# Prepare the three statements once, in table order, so they can be reused
# for every row.
prepared_gold_1, prepared_gold_2, prepared_gold_3 = [
    session.prepare(cql)
    for cql in (insert_gold_1_query, insert_gold_2_query, insert_gold_3_query)
]

# Number of rows sent per batch when loading the gold tables
batch_size = 500

# Function to insert data into each gold table in smaller batches
def insert_data_in_batches(df, batch_size, prepared_query, table_type):
    """Write the rows of `df` to a gold table in batches of `batch_size`.

    Parameters
    ----------
    df : pandas.DataFrame
        Must contain the columns required by `table_type`:
        'gold_1' -> year, gender, blood_glucose_level;
        'gold_2' -> age, gender, diabetes;
        'gold_3' -> location, bmi, heart_disease.
    batch_size : int
        Number of rows added to each BatchStatement.
    prepared_query
        Prepared INSERT statement whose bind markers match the column order
        listed above for `table_type`.
    table_type : str
        One of 'gold_1', 'gold_2', 'gold_3'.

    Returns
    -------
    int
        Number of rows belonging to batches that executed without error.

    Raises
    ------
    ValueError
        If `table_type` is not one of the supported values. Previously an
        unknown value produced empty batches that were still reported as
        inserted.

    Missing values are replaced by 0, 0.0 or '' depending on the column type.
    A batch that fails is reported on stdout and skipped (best effort); the
    remaining batches are still attempted.
    """
    def _as_int(value):
        return int(value) if pd.notna(value) else 0

    def _as_float(value):
        return float(value) if pd.notna(value) else 0.0

    def _as_text(value):
        return str(value) if pd.notna(value) else ''

    # One row -> bind-parameter converter per target table, selected once
    # instead of re-testing table_type for every row.
    row_converters = {
        'gold_1': lambda row: (
            _as_int(row['year']),
            _as_text(row['gender']),
            _as_int(row['blood_glucose_level']),
        ),
        'gold_2': lambda row: (
            _as_int(row['age']),
            _as_text(row['gender']),
            _as_text(row['diabetes']),
        ),
        'gold_3': lambda row: (
            _as_text(row['location']),
            _as_float(row['bmi']),
            _as_int(row['heart_disease']),
        ),
    }
    if table_type not in row_converters:
        raise ValueError(
            f"Unknown table_type {table_type!r}; expected one of {sorted(row_converters)}"
        )
    to_params = row_converters[table_type]

    inserted = 0
    for i in range(0, len(df), batch_size):
        batch = df.iloc[i:i + batch_size]
        batch_statement = BatchStatement()
        for _, row in batch.iterrows():
            batch_statement.add(prepared_query, to_params(row))
        try:
            session.execute(batch_statement)
        except Exception as e:
            # Best effort: report the failure and continue with the next batch.
            print(f"Error executing batch for {table_type}: {e}")
        else:
            inserted += len(batch)
            print(f"Inserted {len(batch)} records into {table_type}.")
    return inserted

# silver_df is expected to have been loaded by an earlier cell.
# Each entry names the silver columns to project, the prepared statement to
# bind them to, and the label understood by insert_data_in_batches:
#   gold_1 -> (year, gender, blood_glucose_level)
#   gold_2 -> (age, gender, diabetes)
#   gold_3 -> (location, bmi, heart_disease)
# NOTE(review): rows that share a gold table's primary key overwrite one
# another on insert — confirm whether aggregated values were intended here.
gold_targets = (
    (['year', 'gender', 'blood_glucose_level'], prepared_gold_1, 'gold_1'),
    (['age', 'gender', 'diabetes'], prepared_gold_2, 'gold_2'),
    (['location', 'bmi', 'heart_disease'], prepared_gold_3, 'gold_3'),
)

for gold_columns, gold_statement, gold_label in gold_targets:
    insert_data_in_batches(silver_df[gold_columns], batch_size, gold_statement, gold_label)

print("Data inserted into gold tables.")


Inserted 500 records into gold_1.
Inserted 500 records into gold_1.
Inserted 500 records into gold_1.
Inserted 500 records into gold_1.
Inserted 500 records into gold_1.
Inserted 500 records into gold_1.
Inserted 500 records into gold_1.
Inserted 500 records into gold_1.
Inserted 500 records into gold_1.
Inserted 500 records into gold_1.
Inserted 500 records into gold_1.
Inserted 500 records into gold_1.
Inserted 500 records into gold_1.
Inserted 500 records into gold_1.
Inserted 500 records into gold_1.
Inserted 500 records into gold_1.
Inserted 500 records into gold_1.
Inserted 500 records into gold_1.
Inserted 500 records into gold_1.
Inserted 500 records into gold_1.
Inserted 500 records into gold_1.
Inserted 500 records into gold_1.
Inserted 500 records into gold_1.
Inserted 500 records into gold_1.
Inserted 500 records into gold_1.
Inserted 500 records into gold_1.
Inserted 500 records into gold_1.
Inserted 500 records into gold_1.
Inserted 500 records into gold_1.
Inserted 500 r