In [1]:
# Importing Modules

In [2]:
import os
from urllib.parse import quote_plus

import pandas as pd
import pymongo
import snowflake.connector
from dotenv import load_dotenv


In [3]:
# Populate os.environ from a local .env file; override=False means variables
# already set in the environment keep their values. The returned bool is the
# cell's last expression, so Jupyter displays whether a .env file was loaded.
load_dotenv(override=False)

True

In [4]:
# Build the MongoDB Atlas SRV URI from credentials in the environment.
# The username and password are percent-escaped with quote_plus: characters such
# as '@', ':', '/' or '%' would otherwise be parsed as URI delimiters and corrupt
# the connection string.
mongo_connection_string = (
    "mongodb+srv://"
    f"{quote_plus(os.environ['MONGO_USERNAME'])}:"
    f"{quote_plus(os.environ['MONGO_PASS'])}@"
    "test-cluster.wvpnfkn.mongodb.net/"
    "?retryWrites=true&w=majority&appName=test-cluster"
)

In [5]:
# Connect to MongoDB Atlas and verify that the server is actually reachable.
# MongoClient() connects lazily in the background and does not raise on
# construction, so a "ping" round-trip is required before reporting success.
try:
    mongo_client = pymongo.MongoClient(mongo_connection_string)
    mongo_client.admin.command("ping")
    mongo_db = mongo_client[os.environ['MONGO_DBNAME']]

    print("Connected to MongoDB Atlas successfully!")
except (pymongo.errors.ConnectionFailure, pymongo.errors.OperationFailure) as e:
    # ConnectionFailure covers unreachable servers / server-selection timeouts;
    # OperationFailure covers server-side rejections such as bad credentials.
    print(f"Failed to connect to MongoDB Atlas: {e}")
    # Re-raise so "Run All" stops here instead of failing later in the
    # staging cell with a less obvious error.
    raise

Connected to MongoDB Atlas successfully!


In [6]:
# Open the Snowflake connection used by the ingest step; every connection
# setting is read from environment variables.
try:
    snowflake_conn = snowflake.connector.connect(
        user=os.environ["SNOWFLAKE_USER"],
        password=os.environ["SNOWFLAKE_PASSWORD"],
        account=os.environ["SNOWFLAKE_ACCOUNT"],
        warehouse=os.environ["SNOWFLAKE_WAREHOUSE"],
        database=os.environ["SNOWFLAKE_DATABASE"],
        schema=os.environ["SNOWFLAKE_SCHEMA"],
        role=os.environ["SNOWFLAKE_ROLE"],
    )

    print("Connected to Snowflake successfully!")
except snowflake.connector.errors.DatabaseError as e:
    print(f"Failed to connect to Snowflake: {e}")
    # Re-raise: when connect() fails, `snowflake_conn` is never assigned, and the
    # ingest cell would otherwise die later with a confusing NameError.
    raise

Connected to Snowflake successfully!


## Stage MongoDB collections as CSV files

In [8]:
# Dump every MongoDB collection to staging_raw_data/<collection>.csv so the
# ingest step can load it into Snowflake.
os.makedirs("staging_raw_data", exist_ok=True)

for collection_name in mongo_db.list_collection_names():
    csv_file_path = os.path.join("staging_raw_data", f"{collection_name}.csv")

    # Materializes the whole collection in memory at once.
    collection_docs = list(mongo_db[collection_name].find())

    if not collection_docs:
        # An empty collection gives a DataFrame with no columns, whose CSV has
        # no header; pd.read_csv rejects that with EmptyDataError at ingest.
        # Also drop any file from an earlier run so stale rows are not re-loaded.
        if os.path.exists(csv_file_path):
            os.remove(csv_file_path)
        print(f"Collection '{collection_name}' is empty; skipped.")
        continue

    # NOTE(review): nested documents / arrays are likely written via their
    # Python string form rather than JSON — confirm that is acceptable downstream.
    collection_df = pd.DataFrame(collection_docs)
    collection_df.to_csv(csv_file_path, index=False)
    print(f"Data from collection '{collection_name}' written to '{csv_file_path}'")



Data from collection 'projectexperiences' written to 'staging_raw_data/projectexperiences.csv'
Data from collection 'skills' written to 'staging_raw_data/skills.csv'
Data from collection 'certificates' written to 'staging_raw_data/certificates.csv'
Data from collection 'users' written to 'staging_raw_data/users.csv'
Data from collection 'approverdetails' written to 'staging_raw_data/approverdetails.csv'
Data from collection 'userskills' written to 'staging_raw_data/userskills.csv'


In [9]:
# Close the MongoDB client: staging is complete, and the remaining cells only
# read the CSV files and use the Snowflake connection.
mongo_client.close()

## Ingest staged CSVs into Snowflake

In [17]:
# Load every staged CSV into a same-named Snowflake table, storing all columns as
# VARCHAR. The Snowflake connection is closed at the end of this cell, even on error.
STAGING_DIR = "staging_raw_data"


def _quote_identifier(name):
    """Return `name` as a double-quoted, upper-cased Snowflake identifier.

    Snowflake resolves unquoted identifiers in upper case, so upper-casing before
    quoting makes plain names (e.g. `users`, `_id`) refer to the same objects the
    unquoted form would, while also allowing names an unquoted identifier cannot
    hold (spaces, dots, dashes, reserved words). Embedded double quotes are doubled.
    """
    return '"' + str(name).upper().replace('"', '""') + '"'


if os.path.isdir(STAGING_DIR):
    # Sorted for a deterministic load order (os.listdir order is arbitrary).
    csv_filenames = sorted(f for f in os.listdir(STAGING_DIR) if f.endswith(".csv"))
else:
    # Calling exit() here would shut down the Jupyter kernel; skip ingestion instead.
    print("No data to process. Skipping Snowflake ingestion.")
    csv_filenames = []

try:
    for filename in csv_filenames:
        # Table name is the file name without its .csv extension.
        table_name = os.path.splitext(filename)[0]

        # Read values as the exact text that was staged. Without dtype=str,
        # numeric-looking strings are re-parsed (e.g. "0123" -> "123"), and the
        # default NA handling would blank strings like "NA"/"null"/"None".
        # With keep_default_na=False, empty cells still come back as "".
        try:
            df = pd.read_csv(
                os.path.join(STAGING_DIR, filename),
                dtype=str,
                keep_default_na=False,
            )
        except pd.errors.EmptyDataError:
            print(f"'{filename}' has no columns; skipping.")
            continue

        quoted_table = _quote_identifier(table_name)
        quoted_columns = [_quote_identifier(column) for column in df.columns]

        with snowflake_conn.cursor() as snowflake_cursor:
            # NOTE: IF NOT EXISTS leaves an existing table untouched, so fields
            # added to the collection after the first load are not created here
            # (the INSERT below then fails on the unknown column).
            snowflake_cursor.execute(
                f"CREATE TABLE IF NOT EXISTS {quoted_table} ("
                + ", ".join(f"{column} VARCHAR" for column in quoted_columns)
                + ")"
            )

            if df.empty:
                print(f"'{filename}' has no rows; nothing inserted into '{table_name}'.")
                continue

            # Name the target columns so each value lands in the right column even
            # if the existing table's column order differs from the CSV header.
            insert_query = (
                f"INSERT INTO {quoted_table} ({', '.join(quoted_columns)}) "
                f"VALUES ({', '.join(['%s'] * len(quoted_columns))})"
            )
            # NOTE(review): every run appends a full snapshot, so re-running
            # duplicates rows — truncate first if a full refresh is intended.
            snowflake_cursor.executemany(
                insert_query, list(df.itertuples(index=False, name=None))
            )

        print(f"Data from '{filename}' inserted into '{table_name}' table in Snowflake.")

    snowflake_conn.commit()
finally:
    snowflake_conn.close()

Data from 'approverdetails.csv' inserted into 'approverdetails' table in Snowflake.
Data from 'certificates.csv' inserted into 'certificates' table in Snowflake.
Data from 'projectexperiences.csv' inserted into 'projectexperiences' table in Snowflake.
Data from 'skills.csv' inserted into 'skills' table in Snowflake.
Data from 'users.csv' inserted into 'users' table in Snowflake.
Data from 'userskills.csv' inserted into 'userskills' table in Snowflake.
