In [None]:
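# Install required packages (uncomment on first run; %pip installs into this kernel's environment)
# %pip install snowflake-connector-python
# %pip install neo4j
# %pip install pandas
# %pip install python-dotenv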

import os
import pandas as pd
import snowflake.connector
from neo4j import GraphDatabase
from dotenv import load_dotenv
import time

# Pull credentials from a local .env file (when one is found) into the process environment.
load_dotenv()

# Snowflake connection: each connect() keyword is read from its own environment variable.
snowflake_conn = snowflake.connector.connect(**{
    keyword: os.getenv(env_var)
    for keyword, env_var in (
        ("user", "SNOWFLAKE_USER"),
        ("password", "SNOWFLAKE_PASSWORD"),
        ("account", "SNOWFLAKE_ACCOUNT"),
        ("warehouse", "SNOWFLAKE_WAREHOUSE"),
        ("database", "SNOWFLAKE_DATABASE"),
        ("schema", "SNOWFLAKE_SCHEMA"),
    )
})

# Neo4j AuraDB connection: URI plus a (username, password) auth tuple.
URI = os.getenv("NEO4J_URI")
AUTH = tuple(os.getenv(env_var) for env_var in ("NEO4J_USERNAME", "NEO4J_PASSWORD"))
driver = GraphDatabase.driver(URI, auth=AUTH)

In [4]:
def clear_database(driver, batch_size=10000):
    """Remove every constraint, index, node and relationship from the graph.

    Args:
        driver: Neo4j driver used to open the session.
        batch_size: number of nodes deleted per inner transaction (must be >= 1).

    Raises:
        ValueError: if ``batch_size`` is smaller than 1.

    Requires the APOC procedure ``apoc.schema.assert``. Node deletion is
    committed in batches via ``CALL { ... } IN TRANSACTIONS``, so the wipe is
    not atomic: if it is interrupted, part of the graph may already be gone.
    """
    batch_size = int(batch_size)
    if batch_size < 1:
        raise ValueError(f"batch_size must be >= 1, got {batch_size}")

    with driver.session() as session:
        # apoc.schema.assert with two empty maps keeps no indexes/constraints,
        # i.e. drops all of them. consume() surfaces any error here, not later.
        print("Dropping all constraints and indexes...")
        session.run("CALL apoc.schema.assert({}, {})").consume()

        # A single `MATCH (n) DETACH DELETE n` holds the whole delete in one
        # transaction and can exhaust transaction memory on a large graph.
        # Committing in fixed-size batches keeps each transaction bounded.
        # CALL ... IN TRANSACTIONS needs an auto-commit query, which session.run is.
        print("Deleting all nodes and relationships...")
        session.run(
            "MATCH (n) CALL { WITH n DETACH DELETE n } "
            f"IN TRANSACTIONS OF {batch_size} ROWS"
        ).consume()

def create_constraints(driver):
    """Create the uniqueness constraints on the loaded node labels.

    Every statement uses ``IF NOT EXISTS``. A failing statement is reported
    and the remaining statements are still attempted; nothing is raised.

    Args:
        driver: Neo4j driver used to open the session.
    """
    statements = (
        "CREATE CONSTRAINT attraction_id IF NOT EXISTS FOR (a:Attraction) REQUIRE a.attraction_id IS UNIQUE",
        "CREATE CONSTRAINT hotel_id IF NOT EXISTS FOR (h:Hotel) REQUIRE h.hotel_id IS UNIQUE",
        "CREATE CONSTRAINT restaurant_id IF NOT EXISTS FOR (r:Restaurant) REQUIRE r.restaurant_id IS UNIQUE",
        "CREATE CONSTRAINT review_id IF NOT EXISTS FOR (r:Review) REQUIRE r.review_id IS UNIQUE",
        "CREATE CONSTRAINT city_state IF NOT EXISTS FOR (c:City) REQUIRE (c.name, c.state) IS UNIQUE",
    )

    with driver.session() as session:
        for statement in statements:
            try:
                session.run(statement)
            except Exception as err:
                print(f"Error creating constraint: {str(err)}")
            else:
                print(f"Created constraint: {statement}")

def load_attractions(driver, snowflake_conn):
    """Copy every row of the Snowflake ATTRACTIONS table into Neo4j.

    For each row, an ``Attraction`` node keyed by ``attraction_id`` (stored as
    a string) is created or updated, the ``City`` node for ``(city, state)`` is
    merged, and the two are linked with ``LOCATED_IN``.

    Rows are fetched 500 at a time and written with one query per row. A row
    with no ``attraction_id``, or one that fails to convert or write, is
    reported and skipped so the rest of the load continues.

    Args:
        driver: Neo4j driver used to open the write session.
        snowflake_conn: open Snowflake connection the source table is read from.
    """
    print("Loading attractions...")

    source_sql = """
    SELECT * FROM TRAVEL_GENIE.TRANSFORMED_DATA_TRANSFORMED.ATTRACTIONS
    """

    # MERGE on the unique key only, then SET the other properties. MERGE on the
    # full property map would try to create a second node whenever any stored
    # attribute differs (e.g. on a re-run after the source data changed), which
    # violates the attraction_id uniqueness constraint.
    cypher_query = """
    MERGE (c:City {name: $city, state: $state})
    MERGE (a:Attraction {attraction_id: toString($attraction_id)})
    SET a.name = $attraction_name,
        a.rating = $rating,
        a.review_count = $review_count,
        a.ranking = $ranking,
        a.phone = $phone,
        a.website = $website,
        a.email = $email,
        a.description = $description,
        a.latitude = $latitude,
        a.longitude = $longitude
    MERGE (a)-[:LOCATED_IN]->(c)
    """

    batch_size = 500
    count = 0

    cursor = snowflake_conn.cursor()
    try:
        cursor.execute(source_sql)
        # Snowflake reports upper-case column names; query parameters are lower case.
        columns = [desc[0].lower() for desc in cursor.description]

        with driver.session() as session:
            while True:
                rows = cursor.fetchmany(batch_size)
                if not rows:
                    break

                for row in rows:
                    # Missing values become "" so every query parameter is bound.
                    params = {k: (v if v is not None else "") for k, v in zip(columns, row)}

                    # With a key-only MERGE, id-less rows would all collapse into one node.
                    if params.get('attraction_id', "") == "":
                        print(f"Skipping attraction with no attraction_id: {params.get('attraction_name', 'Unknown')}")
                        continue

                    try:
                        # Coerce numeric columns; empty/zero values become 0. Done inside
                        # the try so one malformed value skips the row, not the whole load.
                        params['rating'] = float(params['rating']) if params['rating'] else 0.0
                        params['review_count'] = int(float(params['review_count'])) if params['review_count'] else 0
                        params['ranking'] = int(float(params['ranking'])) if params['ranking'] else 0
                        params['latitude'] = float(params['latitude']) if params['latitude'] else 0.0
                        params['longitude'] = float(params['longitude']) if params['longitude'] else 0.0

                        session.run(cypher_query, params)
                    except Exception as e:
                        print(f"Error processing attraction: {params.get('attraction_name', 'Unknown')}")
                        print(f"Error: {str(e)}")
                        continue

                    count += 1
                    if count % 100 == 0:
                        print(f"Processed {count} attractions")
    finally:
        # Release the Snowflake cursor even if the load is interrupted.
        cursor.close()

    print(f"Completed loading {count} attractions")

def load_hotels(driver, snowflake_conn):
    """Copy every row of the Snowflake HOTELS table into Neo4j.

    For each row, a ``Hotel`` node keyed by ``hotel_id`` (stored as a string)
    is created or updated, the ``City`` node for ``(city, state)`` is merged,
    and the two are linked with ``LOCATED_IN``.

    Rows are fetched 500 at a time and written with one query per row. A row
    with no ``hotel_id``, or one that fails to convert or write, is reported
    and skipped so the rest of the load continues.

    Args:
        driver: Neo4j driver used to open the write session.
        snowflake_conn: open Snowflake connection the source table is read from.
    """
    print("Loading hotels...")

    source_sql = """
    SELECT * FROM TRAVEL_GENIE.TRANSFORMED_DATA_TRANSFORMED.HOTELS
    """

    # MERGE on the unique key only, then SET the other properties; merging on
    # the full property map would attempt a duplicate node (violating the
    # hotel_id uniqueness constraint) whenever any stored attribute differs.
    cypher_query = """
    MERGE (c:City {name: $city, state: $state})
    MERGE (h:Hotel {hotel_id: toString($hotel_id)})
    SET h.name = $hotel_name,
        h.rating = $rating,
        h.review_count = $review_count,
        h.price_range = $price_range,
        h.ranking = $ranking,
        h.phone = $phone,
        h.address = $address,
        h.website = $website,
        h.email = $email,
        h.hotel_class = $hotel_class,
        h.number_of_rooms = $number_of_rooms,
        h.latitude = $latitude,
        h.longitude = $longitude
    MERGE (h)-[:LOCATED_IN]->(c)
    """

    batch_size = 500
    count = 0

    cursor = snowflake_conn.cursor()
    try:
        cursor.execute(source_sql)
        # Snowflake reports upper-case column names; query parameters are lower case.
        columns = [desc[0].lower() for desc in cursor.description]

        with driver.session() as session:
            while True:
                rows = cursor.fetchmany(batch_size)
                if not rows:
                    break

                for row in rows:
                    # Missing values become "" so every query parameter is bound.
                    params = {k: (v if v is not None else "") for k, v in zip(columns, row)}

                    # With a key-only MERGE, id-less rows would all collapse into one node.
                    if params.get('hotel_id', "") == "":
                        print(f"Skipping hotel with no hotel_id: {params.get('hotel_name', 'Unknown')}")
                        continue

                    try:
                        # Coerce numeric columns; empty/zero values become 0. Inside the
                        # try so one malformed value skips the row, not the whole load.
                        params['rating'] = float(params['rating']) if params['rating'] else 0.0
                        params['review_count'] = int(float(params['review_count'])) if params['review_count'] else 0
                        params['ranking'] = int(float(params['ranking'])) if params['ranking'] else 0
                        params['latitude'] = float(params['latitude']) if params['latitude'] else 0.0
                        params['longitude'] = float(params['longitude']) if params['longitude'] else 0.0

                        session.run(cypher_query, params)
                    except Exception as e:
                        print(f"Error processing hotel: {params.get('hotel_name', 'Unknown')}")
                        print(f"Error: {str(e)}")
                        continue

                    count += 1
                    if count % 100 == 0:
                        print(f"Processed {count} hotels")
    finally:
        # Release the Snowflake cursor even if the load is interrupted.
        cursor.close()

    print(f"Completed loading {count} hotels")

def load_restaurants(driver, snowflake_conn):
    """Copy every row of the Snowflake RESTAURANTS table into Neo4j.

    For each row, a ``Restaurant`` node keyed by ``restaurant_id`` (stored as a
    string) is created or updated, the ``City`` node for ``(city, state)`` is
    merged, and the two are linked with ``LOCATED_IN``.

    Rows are fetched 500 at a time and written with one query per row. A row
    with no ``restaurant_id``, or one that fails to convert or write, is
    reported and skipped so the rest of the load continues.

    Args:
        driver: Neo4j driver used to open the write session.
        snowflake_conn: open Snowflake connection the source table is read from.
    """
    print("Loading restaurants...")

    source_sql = """
    SELECT * FROM TRAVEL_GENIE.TRANSFORMED_DATA_TRANSFORMED.RESTAURANTS
    """

    # MERGE on the unique key only, then SET the other properties; merging on
    # the full property map would attempt a duplicate node (violating the
    # restaurant_id uniqueness constraint) whenever any stored attribute differs.
    cypher_query = """
    MERGE (c:City {name: $city, state: $state})
    MERGE (r:Restaurant {restaurant_id: toString($restaurant_id)})
    SET r.name = $restaurant_name,
        r.rating = $rating,
        r.review_count = $review_count,
        r.price_category = $price_category,
        r.ranking = $ranking,
        r.phone = $phone,
        r.website = $website,
        r.email = $email,
        r.cuisines = $cuisines,
        r.latitude = $latitude,
        r.longitude = $longitude
    MERGE (r)-[:LOCATED_IN]->(c)
    """

    batch_size = 500
    count = 0

    cursor = snowflake_conn.cursor()
    try:
        cursor.execute(source_sql)
        # Snowflake reports upper-case column names; query parameters are lower case.
        columns = [desc[0].lower() for desc in cursor.description]

        with driver.session() as session:
            while True:
                rows = cursor.fetchmany(batch_size)
                if not rows:
                    break

                for row in rows:
                    # Missing values become "" so every query parameter is bound.
                    params = {k: (v if v is not None else "") for k, v in zip(columns, row)}

                    # With a key-only MERGE, id-less rows would all collapse into one node.
                    if params.get('restaurant_id', "") == "":
                        print(f"Skipping restaurant with no restaurant_id: {params.get('restaurant_name', 'Unknown')}")
                        continue

                    try:
                        # Coerce numeric columns; empty/zero values become 0. Inside the
                        # try so one malformed value skips the row, not the whole load.
                        params['rating'] = float(params['rating']) if params['rating'] else 0.0
                        params['review_count'] = int(float(params['review_count'])) if params['review_count'] else 0
                        params['ranking'] = int(float(params['ranking'])) if params['ranking'] else 0
                        params['latitude'] = float(params['latitude']) if params['latitude'] else 0.0
                        params['longitude'] = float(params['longitude']) if params['longitude'] else 0.0

                        session.run(cypher_query, params)
                    except Exception as e:
                        print(f"Error processing restaurant: {params.get('restaurant_name', 'Unknown')}")
                        print(f"Error: {str(e)}")
                        continue

                    count += 1
                    if count % 100 == 0:
                        print(f"Processed {count} restaurants")
    finally:
        # Release the Snowflake cursor even if the load is interrupted.
        cursor.close()

    print(f"Completed loading {count} restaurants")

def load_reviews(driver, snowflake_conn):
    """Copy attraction reviews from Snowflake into Neo4j.

    Each row becomes a ``Review`` node keyed by ``review_id`` (stored as a
    string), linked to its attraction with ``(Review)-[:REVIEWS]->(Attraction)``.
    A review whose ``attraction_id`` matches no existing ``Attraction`` node is
    not written; those rows are counted and reported separately, so the
    attractions must be loaded first.

    Rows are fetched 1000 at a time and written with one query per row. A row
    with no ``review_id``, or one that fails to convert or write, is reported
    and skipped so the rest of the load continues.

    Args:
        driver: Neo4j driver used to open the write session.
        snowflake_conn: open Snowflake connection the source table is read from.
    """
    print("Loading reviews...")

    source_sql = """
    SELECT * FROM TRAVEL_GENIE.TRANSFORMED_DATA_TRANSFORMED.ATTRACTION_REVIEWS_VECTORIZED
    """

    # Attraction nodes store attraction_id as a string (load_attractions writes it
    # with toString), so the lookup must apply toString too: a numeric id would
    # otherwise never equal the stored string, and the review would be dropped.
    # The Review is MERGEd on its unique key and the other properties are SET, so a
    # changed attribute updates the node instead of violating the constraint.
    # `linked` comes back 0 when the MATCH found no attraction (nothing was written).
    cypher_query = """
    MATCH (a:Attraction {attraction_id: toString($attraction_id)})
    MERGE (r:Review {review_id: toString($review_id)})
    SET r.rating = $rating,
        r.review_text = $review_text,
        r.review_title = $review_title,
        r.travel_date = $traveldate,
        r.trip_type = $trip_type,
        r.user_name = $user_name,
        r.user_id = $userid
    MERGE (r)-[:REVIEWS]->(a)
    RETURN count(r) AS linked
    """

    batch_size = 1000
    count = 0
    unmatched = 0

    cursor = snowflake_conn.cursor()
    try:
        cursor.execute(source_sql)
        # Snowflake reports upper-case column names; query parameters are lower case.
        columns = [desc[0].lower() for desc in cursor.description]

        with driver.session() as session:
            while True:
                rows = cursor.fetchmany(batch_size)
                if not rows:
                    break

                for row in rows:
                    # Missing values become "" so every query parameter is bound.
                    params = {k: (v if v is not None else "") for k, v in zip(columns, row)}

                    # With a key-only MERGE, id-less rows would all collapse into one node.
                    if params.get('review_id', "") == "":
                        print(f"Skipping review with no review_id (attraction_id: {params.get('attraction_id', 'Unknown')})")
                        continue

                    try:
                        # Inside the try so a malformed rating skips the row, not the load.
                        params['rating'] = float(params['rating']) if params['rating'] else 0.0
                        record = session.run(cypher_query, params).single()
                    except Exception as e:
                        print(f"Error processing review: {params.get('review_id', 'Unknown')}")
                        print(f"Error: {str(e)}")
                        continue

                    if record is None or record["linked"] == 0:
                        unmatched += 1
                        continue

                    count += 1
                    if count % 1000 == 0:
                        print(f"Processed {count} reviews")
    finally:
        # Release the Snowflake cursor even if the load is interrupted.
        cursor.close()

    print(f"Completed loading {count} reviews")
    if unmatched:
        print(f"Skipped {unmatched} reviews whose attraction_id matched no Attraction node")


def verify_loading(driver):
    """Print how many nodes exist for each label the loaders write.

    Each printed line has the form ``<cypher query>: <count>``.

    Args:
        driver: Neo4j driver used to open the read session.
    """
    count_queries = (
        "MATCH (a:Attraction) RETURN count(a) as count",
        "MATCH (h:Hotel) RETURN count(h) as count",
        "MATCH (r:Restaurant) RETURN count(r) as count",
        "MATCH (rv:Review) RETURN count(rv) as count",
        "MATCH (c:City) RETURN count(c) as count",
    )

    with driver.session() as session:
        for cypher in count_queries:
            total = session.run(cypher).single()['count']
            print(f"{cypher}: {total}")

def main(rebuild=False, load=False):
    """Run the Snowflake-to-Neo4j pipeline, then print node counts.

    Args:
        rebuild: when True, wipe the graph (constraints, indexes, nodes and
            relationships) and recreate the uniqueness constraints first.
        load: when True, load attractions, hotels, restaurants and reviews in
            that order; reviews are attached to already-loaded attractions.

    With the defaults only the verification step runs. Any exception is
    printed rather than raised. The module-level ``driver`` and
    ``snowflake_conn`` are closed on exit, so the connection cell must be
    re-run before calling this again in the same kernel.
    """
    try:
        if rebuild:
            print("Clearing database...")
            clear_database(driver)

            print("Creating constraints...")
            create_constraints(driver)

        if load:
            print("Loading data...")
            load_attractions(driver, snowflake_conn)
            load_hotels(driver, snowflake_conn)
            load_restaurants(driver, snowflake_conn)
            load_reviews(driver, snowflake_conn)

        print("Verifying data...")
        verify_loading(driver)

    except Exception as e:
        print(f"Error: {str(e)}")
    finally:
        # Close connections
        driver.close()
        snowflake_conn.close()

# A Jupyter kernel executes cell code with __name__ == "__main__", so main()
# runs every time this cell is executed (not only when run as a script).
if __name__ == "__main__":
    main()

Clearing database...
Dropping all constraints and indexes...
Deleting all nodes and relationships...
Creating constraints...
Created constraint: CREATE CONSTRAINT attraction_id IF NOT EXISTS FOR (a:Attraction) REQUIRE a.attraction_id IS UNIQUE
Created constraint: CREATE CONSTRAINT hotel_id IF NOT EXISTS FOR (h:Hotel) REQUIRE h.hotel_id IS UNIQUE
Created constraint: CREATE CONSTRAINT restaurant_id IF NOT EXISTS FOR (r:Restaurant) REQUIRE r.restaurant_id IS UNIQUE
Created constraint: CREATE CONSTRAINT review_id IF NOT EXISTS FOR (r:Review) REQUIRE r.review_id IS UNIQUE
Created constraint: CREATE CONSTRAINT city_state IF NOT EXISTS FOR (c:City) REQUIRE (c.name, c.state) IS UNIQUE
Loading data...
Loading attractions...
Processed 100 attractions
Processed 200 attractions
Processed 300 attractions
Processed 400 attractions
Processed 500 attractions
Processed 600 attractions
Processed 700 attractions
Processed 800 attractions
Processed 900 attractions
Processed 1000 attractions
Processed 1100