In [13]:
# ------------------
# Common Imports
# ------------------
import pandas as pd
import os
from dotenv import load_dotenv
import json # For pretty printing MongoDB documents

# ------------------
# MongoDB Imports
# ------------------
from pymongo import MongoClient, ASCENDING, DESCENDING
from pymongo.errors import ConnectionFailure, OperationFailure

# ------------------
# Cassandra Imports
# ------------------
from cassandra.cluster import Cluster
from cassandra.auth import PlainTextAuthProvider # If you have auth for Cassandra
from cassandra.query import SimpleStatement
from cassandra.util import Date
from datetime import datetime
import uuid # For Cassandra UUID type

In [14]:
# Populate the process environment from a local .env file before any setting is read.
load_dotenv()

# --- Source data ---
EXCEL_FILE_PATH = "2_synthetic_grocery_data.xlsx"

# --- MongoDB settings ---
# None when CONNECTION_STRING is not set in the environment.
MONGODB_CONNECTION_STRING = os.environ.get("CONNECTION_STRING")
MONGODB_DATABASE_NAME = "grocery_store_db"

# --- Cassandra settings ---
CASSANDRA_CONTACT_POINTS = ["127.0.0.1"]
CASSANDRA_PORT = 9042
CASSANDRA_KEYSPACE = "day_grocery"

print("Imports and configuration loaded.")
# Report whether a (non-empty) connection string was found; nothing is aborted here.
print(
    "MongoDB Connection String loaded from .env"
    if MONGODB_CONNECTION_STRING
    else "ERROR: MongoDB Connection String not found in .env file!"
)

Imports and configuration loaded.
MongoDB Connection String loaded from .env


In [15]:
# Load the three sheets of the Excel workbook generated previously.
#
# Contract for later cells: df_cabang, df_karyawan and df_transaksi_harian always exist.
# They hold the loaded data only when the whole load succeeded; on any failure all three
# stay empty DataFrames, so downstream cells can simply test `.empty`.
df_cabang = pd.DataFrame()
df_karyawan = pd.DataFrame()
df_transaksi_harian = pd.DataFrame()

try:
    # A list of sheet names makes pandas open the workbook once and return a
    # {sheet_name: DataFrame} dict, instead of re-opening the file for every sheet.
    excel_sheets = pd.read_excel(
        EXCEL_FILE_PATH,
        sheet_name=["Cabang", "Karyawan", "Transaksi_Harian"],
    )
    loaded_cabang = excel_sheets["Cabang"]
    loaded_karyawan = excel_sheets["Karyawan"]
    loaded_transaksi_harian = excel_sheets["Transaksi_Harian"]

    print("--- Cabang Data Sample ---")
    print(loaded_cabang.head(2))
    print(f"\nLoaded {len(loaded_cabang)} cabang records.")

    print("\n--- Karyawan Data Sample ---")
    print(loaded_karyawan.head(2))
    print(f"\nLoaded {len(loaded_karyawan)} karyawan records.")

    print("\n--- Transaksi Harian Data Sample ---")
    print(loaded_transaksi_harian.head(2))
    # Ensure 'tanggal' is a datetime column so later cells can call .date() on its values.
    loaded_transaksi_harian['tanggal'] = pd.to_datetime(loaded_transaksi_harian['tanggal'])
    print(f"\nLoaded {len(loaded_transaksi_harian)} transaksi harian records.")
    print(f"Data types for transaksi_harian:\n{loaded_transaksi_harian.dtypes}")

except FileNotFoundError:
    print(f"ERROR: Excel file '{EXCEL_FILE_PATH}' not found. Please generate it first.")
except Exception as e:
    # Deliberately broad: the notebook should keep running with empty frames.
    print(f"Error loading data from Excel: {e}")
else:
    # Publish the frames only after every step above succeeded, so a failure half-way
    # through can never leave a mix of loaded and empty frames behind.
    df_cabang = loaded_cabang
    df_karyawan = loaded_karyawan
    df_transaksi_harian = loaded_transaksi_harian

--- Cabang Data Sample ---
  id_cabang              nama_cabang  \
0     CB001  Cabang Banda Aceh Utama   
1     CB002      Cabang Padang Utama   

                                              lokasi    kontak_cabang  
0    Gg. Antapani Lama No. 138, Bengkulu, Jawa Barat  +62-54-779-4795  
1  Gang Moch. Toha No. 7, Prabumulih, Sumatera Utara   (010) 934-1587  

Loaded 1000 cabang records.

--- Karyawan Data Sample ---
  id_karyawan          nama_karyawan          jabatan id_cabang
0      KR0001     Sutan Nalar Wijaya  Asisten Manajer     CB495
1      KR0002  T. Perkasa Mangunsong     Staff Gudang     CB310

Loaded 6000 karyawan records.

--- Transaksi Harian Data Sample ---
                    id_transaksi_harian id_transaksi id_cabang id_karyawan  \
0  3627dbf5-fcdf-4168-bb86-5c654cdb048b     TRX14804     CB621      KR2847   
1  7ce15964-2019-4eb2-b00e-3f686cd1a353     TRX27006     CB848      KR1226   

      tanggal      nama_barang  qty  harga_barang  total_transaksi  
0  2024-05-1

# MongoDB Data Ingestion

In [16]:
# ------------------
# MongoDB Operations
# ------------------
# Opens the MongoDB client and selects the working database. `mongo_client` and `db`
# are live handles on success and both None on any failure (or when no connection
# string is configured), so later cells can guard on `db is not None`.

mongo_client = None
db = None

if MONGODB_CONNECTION_STRING:
    try:
        # Never echo the raw URI: its userinfo part carries the username and password,
        # and notebook outputs get saved and shared. Everything up to the last '@' is
        # dropped, then the path and query options are cut off, so only the host list
        # is printed.
        uri_without_scheme = MONGODB_CONNECTION_STRING.partition("://")[2] or MONGODB_CONNECTION_STRING
        mongo_hosts = uri_without_scheme.rpartition("@")[2].split("/", 1)[0].split("?", 1)[0]
        print(f"\nConnecting to MongoDB host(s): {mongo_hosts}")

        mongo_client = MongoClient(MONGODB_CONNECTION_STRING)
        # Force a server round trip now so connection problems surface in this cell.
        # 'ping' replaces the deprecated 'ismaster' command for this purpose.
        mongo_client.admin.command('ping')
        print("Successfully connected to MongoDB!")

        db = mongo_client[MONGODB_DATABASE_NAME]
        print(f"Selected MongoDB database: '{MONGODB_DATABASE_NAME}'")

    except ConnectionFailure as e:
        print(f"MongoDB Connection Failed: {e}")
        db = None
    except Exception as e:
        print(f"An unexpected error occurred with MongoDB connection: {e}")
        db = None

    if db is None and mongo_client is not None:
        # The client was created but the connectivity check failed: release it instead
        # of leaving a half-open client behind.
        mongo_client.close()
        mongo_client = None
else:
    print("MongoDB connection string not available. Skipping MongoDB operations.")


Connecting to MongoDB using: mongodb+srv://<credentials redacted>...ter-experiment-yaffa
Successfully connected to MongoDB!
Selected MongoDB database: 'grocery_store_db'


In [8]:
# Load the branch ("cabang") rows into two MongoDB collections holding the same documents:
# 'cabang' keeps only the default _id index, 'indexed_cabang' gets an extra index on 'lokasi'.
# Guards come first; `db` is compared with None rather than truth-tested.
if df_cabang.empty:
    print("Cabang DataFrame is empty. Skipping MongoDB ingestion for Cabang.")
elif db is None:
    print("MongoDB connection not established (db is None). Skipping Cabang ingestion.")
else:
    # --- 1. 'cabang': id_cabang as _id, no other explicit indexes ---
    try:
        cabang_collection_plain = db["cabang"]
        if "cabang" in db.list_collection_names():
            # Start from a clean collection so re-running the cell does not hit duplicate _id errors.
            print("Dropping existing 'cabang' collection...")
            cabang_collection_plain.drop()

        cabang_data_mongo = df_cabang.to_dict(orient="records")
        for branch_doc in cabang_data_mongo:
            # The business key doubles as the document _id.
            branch_doc.update(_id=branch_doc["id_cabang"])

        result_plain = cabang_collection_plain.insert_many(cabang_data_mongo)
        print(f"Successfully inserted {len(result_plain.inserted_ids)} documents into 'cabang'.")
    except Exception as e:
        print(f"An error occurred during 'cabang' ingestion: {e}")

    # --- 2. 'indexed_cabang': same documents plus an index on 'lokasi' ---
    try:
        cabang_collection_indexed = db["indexed_cabang"]
        if "indexed_cabang" in db.list_collection_names():
            print("Dropping existing 'indexed_cabang' collection...")
            cabang_collection_indexed.drop()

        # Reuses the documents prepared in step 1 (they already carry their _id).
        result_indexed = cabang_collection_indexed.insert_many(cabang_data_mongo)
        print(f"Successfully inserted {len(result_indexed.inserted_ids)} documents into 'indexed_cabang'.")

        print("Creating index on 'lokasi' for 'indexed_cabang'...")
        cabang_collection_indexed.create_index([("lokasi", ASCENDING)], name="lokasi_index")
        print("Index 'lokasi_index' created on 'indexed_cabang'.")

        # List the indexes back as a visual confirmation.
        print("\nIndexes for 'indexed_cabang':")
        for index_spec in cabang_collection_indexed.list_indexes():
            print(index_spec)

    except Exception as e:
        print(f"An error occurred during 'indexed_cabang' ingestion/indexing: {e}")

Successfully inserted 1000 documents into 'cabang'.
Successfully inserted 1000 documents into 'indexed_cabang'.
Creating index on 'lokasi' for 'indexed_cabang'...
Index 'lokasi_index' created on 'indexed_cabang'.

Indexes for 'indexed_cabang':
SON([('v', 2), ('key', SON([('_id', 1)])), ('name', '_id_')])
SON([('v', 2), ('key', SON([('lokasi', 1)])), ('name', 'lokasi_index')])


In [None]:
# Load the employee ("karyawan") rows into two MongoDB collections holding the same documents:
# 'karyawan' keeps only the default _id index, 'indexed_karyawan' additionally gets a
# compound index on (nama_karyawan, jabatan).
# Guards come first; `db` is compared with None rather than truth-tested.
if df_karyawan.empty:
    print("Karyawan DataFrame is empty. Skipping MongoDB ingestion for Karyawan.")
elif db is None:
    print("MongoDB connection not established. Skipping Karyawan ingestion.")
else:
    # --- 1. 'karyawan': id_karyawan as _id, no other explicit indexes ---
    try:
        karyawan_collection_plain = db["karyawan"]
        if "karyawan" in db.list_collection_names():
            # Start from a clean collection so re-running the cell does not hit duplicate _id errors.
            print("Dropping existing 'karyawan' collection...")
            karyawan_collection_plain.drop()

        karyawan_data_mongo = df_karyawan.to_dict(orient="records")
        for employee_doc in karyawan_data_mongo:
            # The business key doubles as the document _id.
            employee_doc.update(_id=employee_doc["id_karyawan"])

        result_plain = karyawan_collection_plain.insert_many(karyawan_data_mongo)
        print(f"Successfully inserted {len(result_plain.inserted_ids)} documents into 'karyawan'.")
    except Exception as e:
        print(f"An error occurred during 'karyawan' ingestion: {e}")

    # --- 2. 'indexed_karyawan': same documents plus a compound index ---
    try:
        karyawan_collection_indexed = db["indexed_karyawan"]
        if "indexed_karyawan" in db.list_collection_names():
            print("Dropping existing 'indexed_karyawan' collection...")
            karyawan_collection_indexed.drop()

        # Reuses the documents prepared in step 1 (they already carry their _id).
        result_indexed = karyawan_collection_indexed.insert_many(karyawan_data_mongo)
        print(f"Successfully inserted {len(result_indexed.inserted_ids)} documents into 'indexed_karyawan'.")

        print("Creating compound index on ('nama_karyawan', 'jabatan') for 'indexed_karyawan'...")
        compound_keys = [("nama_karyawan", ASCENDING), ("jabatan", ASCENDING)]
        karyawan_collection_indexed.create_index(compound_keys, name="nama_jabatan_compound_index")
        print("Index 'nama_jabatan_compound_index' created on 'indexed_karyawan'.")

        # List the indexes back as a visual confirmation.
        print("\nIndexes for 'indexed_karyawan':")
        for index_spec in karyawan_collection_indexed.list_indexes():
            print(index_spec)

    except Exception as e:
        print(f"An error occurred during 'indexed_karyawan' ingestion/indexing: {e}")

NotImplementedError: Database objects do not implement truth value testing or bool(). Please compare with None instead: database is not None

# Cassandra Data Ingestion

In [9]:
# --------------------
# Cassandra Operations
# --------------------
# Connects to the Cassandra cluster, makes sure the keyspace exists, then opens the
# keyspace-bound session used by the following cells. On success `cassandra_cluster`
# and `cassandra_session` are live handles; on any failure both are None.
cassandra_cluster = None
cassandra_session = None

try:
    print(f"\nConnecting to Cassandra cluster at {CASSANDRA_CONTACT_POINTS}:{CASSANDRA_PORT}...")
    cassandra_cluster = Cluster(contact_points=CASSANDRA_CONTACT_POINTS, port=CASSANDRA_PORT)
    # The keyspace may not exist yet, so it is created through a keyspace-less session first.
    temp_session = cassandra_cluster.connect()
    
    print(f"Creating keyspace '{CASSANDRA_KEYSPACE}' if it doesn't exist...")
    temp_session.execute(f"""
        CREATE KEYSPACE IF NOT EXISTS {CASSANDRA_KEYSPACE}
        WITH replication = {{ 'class': 'SimpleStrategy', 'replication_factor': '1' }}
    """)
    print(f"Keyspace '{CASSANDRA_KEYSPACE}' ensured.")
    temp_session.shutdown()

    print(f"Connecting to keyspace '{CASSANDRA_KEYSPACE}'...")
    cassandra_session = cassandra_cluster.connect(CASSANDRA_KEYSPACE)
    print("Successfully connected to Cassandra and keyspace.")
    
except Exception as e:
    print(f"Cassandra connection or keyspace creation failed: {e}")
    if cassandra_cluster is not None:
        cassandra_cluster.shutdown()
    # Do not leave a shut-down cluster bound to the name: later cells (including the
    # cleanup cell) treat a non-None handle as a usable connection.
    cassandra_cluster = None
    cassandra_session = None


Connecting to Cassandra cluster at ['127.0.0.1']:9042...
Creating keyspace 'day_grocery' if it doesn't exist...
Keyspace 'day_grocery' ensured.
Connecting to keyspace 'day_grocery'...
Successfully connected to Cassandra and keyspace.


In [10]:
# Create the two Cassandra tables that receive the daily transactions ("transaksi harian").
# Both hold the same columns and differ only in their primary key:
#   1. 'transaksi_harian'         - id_transaksi_harian alone is the primary key.
#   2. 'indexed_transaksi_harian' - partitioned by (id_cabang, id_karyawan, nama_barang) and
#      clustered by tanggal for ordering; id_transaksi_harian is appended to the clustering
#      key so rows stay unique when the other key components repeat.
# CREATE TABLE IF NOT EXISTS never alters an existing table; to rebuild from scratch,
# run DROP TABLE IF EXISTS for the table first.
if not cassandra_session:
    print("Cassandra session not established. Skipping table creation.")
else:
    table_name_plain = "transaksi_harian"
    create_table_plain_query = f"""
        CREATE TABLE IF NOT EXISTS {table_name_plain} (
            id_transaksi_harian UUID PRIMARY KEY,
            id_transaksi TEXT,
            id_cabang TEXT,
            id_karyawan TEXT,
            tanggal DATE,
            nama_barang TEXT,
            qty INT,
            harga_barang INT,
            total_transaksi INT
        );
        """

    table_name_indexed = "indexed_transaksi_harian"
    create_table_indexed_query = f"""
        CREATE TABLE IF NOT EXISTS {table_name_indexed} (
            id_cabang TEXT,
            id_karyawan TEXT,
            nama_barang TEXT,
            tanggal DATE,
            id_transaksi_harian UUID, 
            id_transaksi TEXT,
            qty INT,
            harga_barang INT,
            total_transaksi INT,
            PRIMARY KEY ((id_cabang, id_karyawan, nama_barang), tanggal, id_transaksi_harian)
        ) WITH CLUSTERING ORDER BY (tanggal ASC, id_transaksi_harian ASC);
        """

    # Each table is attempted independently: a failure on one is reported and the
    # other is still tried.
    for target_table, ddl_statement in (
        (table_name_plain, create_table_plain_query),
        (table_name_indexed, create_table_indexed_query),
    ):
        try:
            print(f"Creating table '{target_table}' in keyspace '{CASSANDRA_KEYSPACE}'...")
            cassandra_session.execute(ddl_statement)
            print(f"Table '{target_table}' created successfully or already exists.")
        except Exception as e:
            print(f"Error creating Cassandra table '{target_table}': {e}")

Creating table 'transaksi_harian' in keyspace 'day_grocery'...
Table 'transaksi_harian' created successfully or already exists.
Creating table 'indexed_transaksi_harian' in keyspace 'day_grocery'...
Table 'indexed_transaksi_harian' created successfully or already exists.


### Ingestion

In [11]:
# --- Configuration for this cell ---
# Upper bound on the number of Transaksi_Harian rows written to Cassandra.
# To ingest everything, set it to len(df_transaksi_harian) or any larger number.
MAX_CASSANDRA_RECORDS_TO_INGEST = 3000
# Ingestion is abandoned once more than this many rows have failed.
MAX_CASSANDRA_INSERT_FAILURES = 20

# Writes the first MAX_CASSANDRA_RECORDS_TO_INGEST rows of df_transaksi_harian into both
# Cassandra tables, one row at a time, then prints a summary and one sample row per table.
# A row counts as inserted only when both inserts succeeded.
if cassandra_session is not None and not df_transaksi_harian.empty:
    table_plain = "transaksi_harian"
    table_indexed = "indexed_transaksi_harian"

    num_total_records = len(df_transaksi_harian)
    # head() already returns every row when the frame is shorter than the limit, so the
    # subset and its real size are derived from it directly.
    df_ingest_subset = df_transaksi_harian.head(MAX_CASSANDRA_RECORDS_TO_INGEST)
    actual_records_to_ingest = len(df_ingest_subset)

    if actual_records_to_ingest < num_total_records:
        print(f"\nPreparing to insert {actual_records_to_ingest} records (limited from {num_total_records} total) into Cassandra tables '{table_plain}' and '{table_indexed}'...")
    else:
        print(f"\nPreparing to insert all {actual_records_to_ingest} records into Cassandra tables '{table_plain}' and '{table_indexed}'...")

    # Prepared once, executed per row with bound values.
    insert_cql_plain = f"""
        INSERT INTO {table_plain} (id_transaksi_harian, id_transaksi, id_cabang, id_karyawan, tanggal, 
            nama_barang, qty, harga_barang, total_transaksi) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """
    prepared_plain = cassandra_session.prepare(insert_cql_plain)

    insert_cql_indexed = f"""
        INSERT INTO {table_indexed} (id_cabang, id_karyawan, nama_barang, tanggal, id_transaksi_harian, 
            id_transaksi, qty, harga_barang, total_transaksi) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?)
        """
    prepared_indexed = cassandra_session.prepare(insert_cql_indexed)

    inserted_count = 0
    failed_count = 0

    for index, row in df_ingest_subset.iterrows():
        try:
            # Convert pandas/numpy values to the Python types bound to the CQL columns.
            id_harian_uuid = uuid.UUID(str(row['id_transaksi_harian']))
            tanggal_date_obj = row['tanggal'].date()  # 'tanggal' was converted to datetime at load time
            qty = int(row['qty'])
            harga_barang = int(row['harga_barang'])
            total_transaksi = int(row['total_transaksi'])

            # Insert into plain table
            cassandra_session.execute(prepared_plain, (
                id_harian_uuid, row['id_transaksi'], row['id_cabang'], row['id_karyawan'],
                tanggal_date_obj, row['nama_barang'], qty, harga_barang, total_transaksi
            ))

            # Insert into indexed table
            cassandra_session.execute(prepared_indexed, (
                row['id_cabang'], row['id_karyawan'], row['nama_barang'], tanggal_date_obj,
                id_harian_uuid, row['id_transaksi'], qty, harga_barang, total_transaksi
            ))

            inserted_count += 1
            # Print progress every 1000 records and once more when the last row is done.
            if inserted_count % 1000 == 0 or inserted_count == actual_records_to_ingest:
                print(f"Processed {inserted_count}/{actual_records_to_ingest} records for Cassandra...")

        except Exception as e:
            print(f"Failed to insert row {index} (id_transaksi_harian: {row.get('id_transaksi_harian', 'N/A')}) into Cassandra: {row.to_dict()}")
            print(f"Error: {e}")
            failed_count += 1
            if failed_count > MAX_CASSANDRA_INSERT_FAILURES:
                print("Too many errors, stopping Cassandra ingestion.")
                break

    # When the loop stopped early, fewer rows were attempted than planned; report both.
    attempted_count = inserted_count + failed_count
    print("\nCassandra Ingestion Summary:")
    print(f"Attempted to process: {attempted_count} of {actual_records_to_ingest} records.")
    print(f"Successfully inserted into both tables: {inserted_count} records.")
    print(f"Failed to insert: {failed_count} records.")

    if inserted_count > 0:
        print(f"\nSample data from Cassandra table '{table_plain}' (limit 1):")
        for r_plain in cassandra_session.execute(f"SELECT * FROM {table_plain} LIMIT 1"):
            print(r_plain)
        print(f"\nSample data from Cassandra table '{table_indexed}' (limit 1):")
        for r_indexed in cassandra_session.execute(f"SELECT * FROM {table_indexed} LIMIT 1"):
            print(r_indexed)

elif df_transaksi_harian.empty:
    print("Transaksi Harian DataFrame is empty. Skipping Cassandra ingestion.")
else: # cassandra_session is None
    print("Cassandra session not established. Skipping Transaksi_Harian ingestion.")


Preparing to insert 100000 records into Cassandra tables 'transaksi_harian' and 'indexed_transaksi_harian'...
Processed 1000/100000 records for Cassandra...
Processed 2000/100000 records for Cassandra...
Processed 3000/100000 records for Cassandra...
Processed 4000/100000 records for Cassandra...
Processed 5000/100000 records for Cassandra...
Processed 6000/100000 records for Cassandra...
Processed 7000/100000 records for Cassandra...
Processed 8000/100000 records for Cassandra...
Processed 9000/100000 records for Cassandra...
Processed 10000/100000 records for Cassandra...
Processed 11000/100000 records for Cassandra...
Processed 12000/100000 records for Cassandra...
Processed 13000/100000 records for Cassandra...
Processed 14000/100000 records for Cassandra...
Processed 15000/100000 records for Cassandra...
Processed 16000/100000 records for Cassandra...
Processed 17000/100000 records for Cassandra...
Processed 18000/100000 records for Cassandra...
Processed 19000/100000 records for

KeyboardInterrupt: 

# Close Connection

In [None]:
# Clean up connections.
# Handles are compared with None (the convention used by the ingestion cells) and reset
# afterwards, so re-running an earlier cell after this one takes its
# "connection not established" branch instead of using a closed client.
try:
    if mongo_client is not None:
        print("\nClosing MongoDB connection...")
        mongo_client.close()
        print("MongoDB connection closed.")
finally:
    mongo_client = None
    db = None
    # Runs even if closing MongoDB raised, so the Cassandra cluster is never left open.
    if cassandra_cluster is not None:
        print("\nClosing Cassandra connection...")
        cassandra_cluster.shutdown()  # also closes the sessions created from this cluster
        print("Cassandra connection closed.")
    cassandra_cluster = None
    cassandra_session = None

print("\nScript finished.")
# alhamdulillah