In [None]:
from pymongo.mongo_client import MongoClient

import os

# The connection string usually embeds credentials, so it is read from the
# MONGO_URI environment variable instead of being pasted into the notebook.
# Falls back to the original empty placeholder when the variable is not set.
uri = os.environ.get("MONGO_URI", "")

client = None
try:
    # Create the client inside the try block: MongoClient() can raise on a
    # malformed/empty URI, and that failure should be reported like any other.
    client = MongoClient(uri)

    # Send a ping to confirm a successful connection
    client.admin.command('ping')
    print("Pinged your deployment. You successfully connected to MongoDB!")

    # Database that holds the 'grades' collection queried below.
    db = client.sample_training

    # Count every document in 'grades' (the empty filter matches all documents).
    # NOTE: despite its name, `collection_names` holds that integer count; the
    # name is kept in case other cells read it.
    collection_names = db['grades'].count_documents({})

    print(f"Total documents in the collection: {collection_names}")
except Exception as e:
    print(e)
finally:
    # Always release the connection, including when the ping or the count fails.
    if client is not None:
        client.close()

In [None]:
import snowflake.connector

import os

# Snowflake connection parameters.
# Values are read from environment variables so that credentials are never saved
# with the notebook; each one falls back to the original empty placeholder.
SNOWFLAKE_USER = os.environ.get("SNOWFLAKE_USER", "")
SNOWFLAKE_PASSWORD = os.environ.get("SNOWFLAKE_PASSWORD", "")
SNOWFLAKE_ACCOUNT = os.environ.get("SNOWFLAKE_ACCOUNT", "")
SNOWFLAKE_WAREHOUSE = os.environ.get("SNOWFLAKE_WAREHOUSE", "")
SNOWFLAKE_DATABASE = os.environ.get("SNOWFLAKE_DATABASE", "")
SNOWFLAKE_SCHEMA = os.environ.get("SNOWFLAKE_SCHEMA", "")

# Initialised up front so the `finally` clause can tell which resources were
# actually opened before a failure.
conn = None
cursor = None
try:
    # Establish the connection
    conn = snowflake.connector.connect(
        user=SNOWFLAKE_USER,
        password=SNOWFLAKE_PASSWORD,
        account=SNOWFLAKE_ACCOUNT,
        warehouse=SNOWFLAKE_WAREHOUSE,
        database=SNOWFLAKE_DATABASE,
        schema=SNOWFLAKE_SCHEMA
    )

    # Create a cursor object to execute SQL statements
    cursor = conn.cursor()

    # Smoke test: ask the server for its version
    cursor.execute("SELECT CURRENT_VERSION()")
    one_row = cursor.fetchone()
    print(f"Snowflake Version: {one_row[0]}")
    print("Successfully connected to Snowflake and retrieved version.")

except snowflake.connector.errors.ProgrammingError as e:
    print(f"Error connecting to Snowflake: {e}")
except Exception as e:
    print(f"An unexpected error occurred: {e}")
finally:
    # Close the cursor and connection on every path, not only on success, so a
    # failed query does not leave the session open.
    if cursor is not None:
        cursor.close()
    if conn is not None:
        conn.close()

In [None]:
import json
import os

import pymongo
import snowflake.connector

# --- MongoDB Configuration ---
# The URI usually embeds credentials, so it comes from the environment instead of
# the notebook source. Falls back to the original empty placeholder when unset.
MONGO_URI = os.environ.get("MONGO_URI", "")
MONGO_DB_NAME = "sample_training"
MONGO_COLLECTION_NAME = "grades"

# --- Snowflake Configuration ---
# Connection parameters are read from environment variables of the same name so
# secrets are never committed with the notebook; each defaults to "" as before.
SNOWFLAKE_USER = os.environ.get("SNOWFLAKE_USER", "")
SNOWFLAKE_PASSWORD = os.environ.get("SNOWFLAKE_PASSWORD", "")
SNOWFLAKE_ACCOUNT = os.environ.get("SNOWFLAKE_ACCOUNT", "")
SNOWFLAKE_WAREHOUSE = os.environ.get("SNOWFLAKE_WAREHOUSE", "")
SNOWFLAKE_DATABASE = os.environ.get("SNOWFLAKE_DATABASE", "")
SNOWFLAKE_SCHEMA = os.environ.get("SNOWFLAKE_SCHEMA", "")

# Staging table (inside SNOWFLAKE_SCHEMA) that receives the raw JSON documents.
SNOWFLAKE_TABLE = ""

# PSA target schema and table.
# NOTE(review): these two are not referenced by the code visible in this cell,
# which writes to a hard-coded PSA.STUDENT_GRADES -- confirm whether they should be.
SNOWFLAKE_PSA_TABLE = ""
SNOWFLAKE_PSA_SCHEMA = ""


def load_mongo_to_snowflake():
    """Copy the MongoDB collection into Snowflake staging, then flatten it into PSA.

    Reads every document of ``MONGO_DB_NAME.MONGO_COLLECTION_NAME``, inserts each
    one as a JSON string into the ``RAW_DATA`` column of
    ``SNOWFLAKE_SCHEMA.SNOWFLAKE_TABLE`` and, only when the staging row count
    equals the number of documents read, inserts one row per element of each
    document's ``scores`` array into ``PSA.STUDENT_GRADES``.

    Returns:
        None. Progress and errors are reported with ``print``; exceptions are
        caught and printed rather than propagated.

    NOTE(review): the staging table is not cleared by this function, so rows from
    an earlier run stay in it and the count comparison then fails -- confirm
    whether it is truncated elsewhere.
    NOTE(review): the PSA target is hard-coded as PSA.STUDENT_GRADES; the
    SNOWFLAKE_PSA_SCHEMA / SNOWFLAKE_PSA_TABLE constants are not used here.
    """
    # Initialised before the try so cleanup can test them directly instead of
    # probing locals() and relying on the truthiness of client objects.
    mongo_client = None
    conn = None
    cursor = None

    try:
        # --- Connect to MongoDB ---
        mongo_client = pymongo.MongoClient(MONGO_URI)
        mongo_db = mongo_client[MONGO_DB_NAME]
        mongo_collection = mongo_db[MONGO_COLLECTION_NAME]
        print("✅ Connected to MongoDB")

        # Fetch the whole collection into memory (empty filter matches everything).
        mongo_data = list(mongo_collection.find({}))
        mongo_count = len(mongo_data)

        # --- Connect to Snowflake ---
        conn = snowflake.connector.connect(
            user=SNOWFLAKE_USER,
            password=SNOWFLAKE_PASSWORD,
            account=SNOWFLAKE_ACCOUNT,
            warehouse=SNOWFLAKE_WAREHOUSE,
            database=SNOWFLAKE_DATABASE,
            schema=SNOWFLAKE_SCHEMA
        )
        cursor = conn.cursor()
        print("✅ Connected to Snowflake")

        staging_table = f"{SNOWFLAKE_SCHEMA}.{SNOWFLAKE_TABLE}"

        # One single-column row per document. `default=str` stringifies any value
        # json cannot encode natively, so `_id` is written as the same string
        # str(doc['_id']) would give -- without mutating the fetched documents --
        # and other non-JSON values no longer abort the whole load.
        rows_to_insert = [(json.dumps(doc, default=str),) for doc in mongo_data]

        cursor.executemany(f"""
            INSERT INTO {staging_table} (RAW_DATA)
            VALUES (%s)
        """, rows_to_insert)

        # Count rows in staging
        cursor.execute(f"SELECT COUNT(*) FROM {staging_table}")
        stg_count = cursor.fetchone()[0]

        print(f"📊 MongoDB collection count = {mongo_count}, Snowflake Staging count = {stg_count}")

        if mongo_count == stg_count:
            print("✅ Counts match — loading into PSA.STUDENT_GRADES...")
            # FLATTEN emits one output row per element of the `scores` array, so
            # the number of PSA rows can differ from the staging document count.
            cursor.execute(f"""
                INSERT INTO PSA.STUDENT_GRADES (ID, STUDENT_ID, CLASS_ID, TYPE, SCORE)
                SELECT
                    RAW_DATA:_id::STRING        AS ID,
                    RAW_DATA:student_id::FLOAT  AS STUDENT_ID,
                    RAW_DATA:class_id::FLOAT    AS CLASS_ID,
                    s.value:type::STRING        AS TYPE,
                    s.value:score::FLOAT        AS SCORE
                FROM (
                    SELECT PARSE_JSON(RAW_DATA) AS RAW_DATA
                    FROM {staging_table}
                ),
                LATERAL FLATTEN(input => RAW_DATA:scores) s;
            """)
            conn.commit()
            # stg_count is the number of staged documents, not flattened rows.
            print(f"🎉 Successfully loaded {stg_count} records into PSA.STUDENT_GRADES.")
        else:
            print("⚠️ Counts do not match! Skipping PSA load.")

    except Exception as e:
        print(f"❌ An error occurred: {e}")
    finally:
        # Release whatever was opened, on success and on failure alike.
        if mongo_client is not None:
            mongo_client.close()
            print("🔒 MongoDB connection closed.")
        if conn is not None:
            if cursor is not None:
                cursor.close()
            conn.close()
            print("🔒 Snowflake connection closed.")

# Run the load only when this code executes as the main module, not when it is
# imported from elsewhere.
if __name__ == "__main__":
    load_mongo_to_snowflake()