In [None]:
# Show the NVIDIA GPU(s) visible to this runtime, if any.
!nvidia-smi

In [None]:
# Install the Snowflake connector / SQLAlchemy dialect and the Hugging Face + PyTorch stack imported below.
!pip install snowflake-connector-python transformers accelerate torch huggingface_hub safetensors datasets sqlalchemy snowflake-sqlalchemy tqdm

In [None]:
from google.colab import userdata
# Bind the secret instead of leaving it as the cell's last expression:
# notebooks render that value as cell output, which would expose the token
# in the saved notebook.
hf_token = userdata.get('HF_Token')

In [None]:
import warnings
warnings.filterwarnings('ignore', category=FutureWarning)

import snowflake.connector
from snowflake.connector.pandas_tools import write_pandas
import pandas as pd
import numpy as np
from typing import List, Dict
from tqdm.notebook import tqdm
import torch
from transformers import AutoTokenizer, AutoModel, AutoConfig
import torch.nn.functional as F
from datetime import datetime, timezone
import os
from sqlalchemy import create_engine, text
from sqlalchemy.engine.url import URL

class NVEmbedPipeline:
    """Embed attraction reviews with a Hugging Face embedding model and store them in Snowflake.

    Rows are read from ``<database>.<source_schema>.ATTRACTION_REVIEWS`` (only
    reviews whose LOCATIONID appears in ``ATTRACTIONS``). The review title and
    text are embedded, and the rows plus their vectors are written to
    ``<database>.<dest_schema>.ATTRACTION_REVIEWS_VECTORIZED``.
    """

    def __init__(
        self,
        account: str,
        user: str,
        password: str,
        warehouse: str,
        hf_token: str,
        database: str = "TRAVEL_GENIE",
        source_schema: str = "RAW",
        dest_schema: str = "TRANSFORMED_DATA",
        model_name: str = "nvidia/NV-Embed-v2",
        batch_size: int = 32
    ):
        """Open the Snowflake connections and load the embedding model.

        Args:
            account: Snowflake account identifier.
            user: Snowflake user name.
            password: Snowflake password.
            warehouse: Warehouse used for queries and loads.
            hf_token: Hugging Face access token used to download the model.
            database: Database holding both the source and destination schemas.
            source_schema: Schema containing ATTRACTION_REVIEWS and ATTRACTIONS.
            dest_schema: Schema that receives ATTRACTION_REVIEWS_VECTORIZED.
            model_name: Hugging Face model id to load.
            batch_size: Number of texts sent through the model per forward pass.
        """
        # SQLAlchemy engine: used for DDL and for streaming reads via pd.read_sql.
        connection_url = URL.create(
            drivername="snowflake",
            username=user,
            password=password,
            host=account,
            database=database,
            query={
                "warehouse": warehouse,
                "schema": source_schema
            }
        )
        self.engine = create_engine(connection_url)

        # Native connector connection: used by write_pandas for the bulk load.
        self.conn = snowflake.connector.connect(
            user=user,
            password=password,
            account=account,
            warehouse=warehouse,
            database=database,
            schema=dest_schema
        )

        self.database = database
        self.source_schema = source_schema
        self.dest_schema = dest_schema
        self.batch_size = batch_size
        self.model_name = model_name

        # Set HF token for any hub calls that read it from the environment.
        os.environ["HF_TOKEN"] = hf_token

        # Load model components
        self._load_model(hf_token)

    def _load_model(self, hf_token):
        """Load the config, tokenizer and model, then move the model to the best device.

        float16 weights are only requested on CUDA. Half-precision kernels may
        be missing or very slow on CPU, so the CPU path loads float32 weights.
        """
        print(f"Loading {self.model_name} model...")

        self.config = AutoConfig.from_pretrained(
            self.model_name,
            token=hf_token,
            trust_remote_code=True
        )
        print("Model config:", self.config)

        print("Loading tokenizer...")
        self.tokenizer = AutoTokenizer.from_pretrained(
            self.model_name,
            token=hf_token,
            trust_remote_code=True,
            model_max_length=512
        )

        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        dtype = torch.float16 if self.device.type == "cuda" else torch.float32

        print("Loading model...")
        self.model = AutoModel.from_pretrained(
            self.model_name,
            token=hf_token,
            trust_remote_code=True,
            torch_dtype=dtype,
            config=self.config
        )

        self.model = self.model.to(self.device)
        self.model.eval()
        print(f"Model loaded successfully on {self.device}")

    def create_vectorized_table(self):
        """(Re)create the destination table, dropping any existing copy.

        The schema is created before the drop so the DROP always targets an
        existing schema. ``engine.begin()`` commits on success and rolls back
        on error. It works on SQLAlchemy 1.4 and 2.x, whereas
        ``Connection.commit()`` is not available on legacy 1.4 connections.

        Raises:
            Exception: any database error is printed and re-raised.
        """
        table = f"{self.database}.{self.dest_schema}.ATTRACTION_REVIEWS_VECTORIZED"
        try:
            with self.engine.begin() as conn:
                conn.execute(text(f"CREATE SCHEMA IF NOT EXISTS {self.database}.{self.dest_schema}"))

                # Drop existing table if it exists
                conn.execute(text(f"DROP TABLE IF EXISTS {table}"))

                conn.execute(text(f"""
                CREATE TABLE {table} (
                    REVIEW_ID STRING,
                    ATTRACTION_ID STRING,
                    ATTRACTION_NAME STRING,
                    CITY STRING,
                    COUNTRY STRING,
                    POSTALCODE STRING,
                    STATE STRING,
                    LATITUDE FLOAT,
                    LONGITUDE FLOAT,
                    REVIEW_DATE DATE,
                    RATING INT,
                    REVIEW_TEXT STRING,
                    REVIEW_TITLE STRING,
                    TRAVELDATE DATE,
                    TRIP_TYPE STRING,
                    USER_NAME STRING,
                    USERID STRING,
                    REVIEW_VECTOR ARRAY,
                    MODEL_NAME STRING,
                    EMBEDDING_DIMENSION INT,
                    CREATED_AT TIMESTAMP_NTZ,
                    UPDATED_AT TIMESTAMP_NTZ
                )
                """))
            print("Table created successfully")
        except Exception as e:
            print(f"Error creating table: {str(e)}")
            raise

    @staticmethod
    def _pool_embeddings(hidden: torch.Tensor, attention_mask: torch.Tensor) -> torch.Tensor:
        """Reduce model output to one L2-normalised float32 vector per input.

        ``hidden`` is either already pooled ``(batch, dim)`` or per-token
        ``(batch, seq, dim)``. In the per-token case, only positions where
        ``attention_mask`` is non-zero are averaged. A plain mean over
        ``dim=1`` would also average the padding added to make the batch
        rectangular, making a text's vector depend on its batch-mates.
        """
        hidden = hidden.float()  # accumulate in fp32 so fp16 sums cannot overflow
        if hidden.dim() == 3:
            mask = attention_mask.to(device=hidden.device, dtype=hidden.dtype).unsqueeze(-1)
            summed = (hidden * mask).sum(dim=1)
            counts = mask.sum(dim=1).clamp(min=1.0)  # guard against all-zero masks
            hidden = summed / counts
        return F.normalize(hidden, p=2, dim=1)

    @torch.no_grad()
    def generate_embeddings(self, texts: List[str]) -> np.ndarray:
        """Embed ``texts`` in batches of ``self.batch_size``.

        Returns:
            A ``(len(texts), dim)`` float32 array of L2-normalised vectors, or
            an empty ``(0, 0)`` array when ``texts`` is empty. The first batch
            prints the raw model output for debugging.
        """
        if not texts:
            return np.empty((0, 0), dtype=np.float32)

        embeddings = []
        embedding_dim = None

        for i in tqdm(range(0, len(texts), self.batch_size)):
            batch_texts = texts[i:i + self.batch_size]

            if torch.cuda.is_available():
                torch.cuda.empty_cache()

            encoded = self.tokenizer(
                batch_texts,
                padding=True,
                truncation=True,
                max_length=512,
                return_tensors="pt"
            ).to(self.device)

            outputs = self.model(
                input_ids=encoded['input_ids'],
                attention_mask=encoded['attention_mask']
            )

            if i == 0:
                print("Output type:", type(outputs))
                print("Output structure:", outputs)
                print("Output keys:", outputs.keys())

            # Mask-aware mean pooling + L2 normalisation (padding excluded).
            sentence_embeddings = outputs['sentence_embeddings']
            batch_embeddings = self._pool_embeddings(
                sentence_embeddings, encoded['attention_mask']
            ).cpu().numpy()

            if embedding_dim is None:
                embedding_dim = batch_embeddings.shape[1]
                print(f"Setting embedding dimension to: {embedding_dim}")

            embeddings.append(batch_embeddings)

            if i == 0:
                print(f"Raw sentence embeddings shape: {sentence_embeddings.shape}")
                print(f"Mean pooled embeddings shape: {batch_embeddings.shape}")

        final_embeddings = np.vstack(embeddings)
        print(f"Final embeddings shape: {final_embeddings.shape}")
        return final_embeddings

    @staticmethod
    def _combine_review_texts(df: pd.DataFrame) -> List[str]:
        """Build one ``Title: <title>`` + newline + ``Review: <text>`` string per row.

        Missing titles or texts become empty strings.
        """
        titles = df['review_title'].fillna('').astype(str)
        bodies = df['review_text'].fillna('').astype(str)
        return ("Title: " + titles + "\nReview: " + bodies).tolist()

    def process_reviews(self, chunk_size: int = 1500):
        """Embed every eligible review and load it into the vectorized table.

        Recreates the destination table, then reads ``chunk_size`` rows at a
        time from Snowflake, embeds them and writes each chunk with
        ``write_pandas``. The connector connection is closed when this method
        finishes, whether it succeeds or fails.

        Returns:
            ``(True, total_rows_processed)``.

        Raises:
            Exception: if a chunk fails to write; any other error is printed
            and re-raised.
        """
        print("Creating vectorized table...")
        self.create_vectorized_table()

        print(f"Reading data from Snowflake in chunks of {chunk_size} rows...")
        query = text(f"""
        SELECT
            ID as review_id,
            LOCATIONID as attraction_id,
            NAME as attraction_name,
            CITY as city,
            COUNTRY as country,
            POSTALCODE as postalcode,
            STATE as state,
            LATITUDE as latitude,
            LONGITUDE as longitude,
            PUBLISHEDDATE::DATE as review_date,
            RATING as rating,
            "TEXT" as review_text,
            "TITLE" as review_title,
            TRAVELDATE::DATE as traveldate,
            TRIP_TYPE as trip_type,
            USER_NAME as user_name,
            USERID as userid
        FROM {self.database}.{self.source_schema}.ATTRACTION_REVIEWS
        WHERE LOCATIONID IN (
            SELECT ID
            FROM {self.database}.{self.source_schema}.ATTRACTIONS
        )
        """)

        total_processed = 0
        try:
            for chunk_df in pd.read_sql(query, self.engine, chunksize=chunk_size):
                # Rows in this chunk; a separate name so the parameter is not shadowed.
                n_rows = len(chunk_df)
                total_processed += n_rows
                print(f"Processing chunk of {n_rows} reviews... (Total processed: {total_processed})")

                # Generate embeddings
                combined_texts = self._combine_review_texts(chunk_df)
                embeddings = self.generate_embeddings(combined_texts)
                embedding_arrays = [emb.tolist() for emb in embeddings]

                # Add metadata columns; naive UTC timestamps match TIMESTAMP_NTZ.
                current_time = datetime.now(timezone.utc).replace(tzinfo=None)
                chunk_df['created_at'] = current_time
                chunk_df['updated_at'] = current_time
                chunk_df['review_vector'] = embedding_arrays
                chunk_df['model_name'] = self.model_name
                chunk_df['embedding_dimension'] = embeddings.shape[1]

                # Convert date columns to plain dates (no time / timezone)
                for col in ('review_date', 'traveldate'):
                    if col in chunk_df.columns:
                        chunk_df[col] = pd.to_datetime(chunk_df[col]).dt.date

                # The target table uses unquoted (upper-case) identifiers.
                chunk_df.columns = [col.upper() for col in chunk_df.columns]

                # Write chunk to Snowflake
                print(f"Writing chunk of {len(chunk_df)} rows to Snowflake...")
                success, nchunks, nrows, _ = write_pandas(
                    conn=self.conn,
                    df=chunk_df,
                    table_name='ATTRACTION_REVIEWS_VECTORIZED',
                    database=self.database,
                    schema=self.dest_schema,
                    quote_identifiers=False,
                    auto_create_table=False,
                    chunk_size=n_rows,
                    use_logical_type=True
                )

                if not success:
                    raise Exception(f"Failed to write chunk to Snowflake after {total_processed} records")

                print(f"Successfully wrote {nrows} rows to Snowflake")

            print(f"Processing completed. Total rows processed: {total_processed}")
            return True, total_processed

        except Exception as e:
            print(f"Error processing reviews: {str(e)}")
            raise
        finally:
            self.conn.close()

    def __del__(self):
        """Best-effort cleanup of the connector connection and the engine's pool."""
        # Errors are deliberately ignored: during interpreter shutdown the
        # modules these objects depend on may already be torn down.
        try:
            if hasattr(self, 'conn'):
                self.conn.close()
            if hasattr(self, 'engine'):
                self.engine.dispose()
        except Exception:
            pass

# Usage
if __name__ == "__main__":
    def _get_secret(env_name: str, colab_name: str = "") -> str:
        """Return a secret from the environment, falling back to Colab's secret store.

        Secrets must never be hard-coded in a notebook: committed or shared
        notebooks are effectively public. Any credentials that previously
        lived in this cell should be rotated.

        Raises:
            RuntimeError: if the variable is unset and Colab is unavailable.
        """
        value = os.environ.get(env_name)
        if value:
            return value
        try:
            from google.colab import userdata  # only importable inside Colab
        except ImportError:
            raise RuntimeError(
                f"Secret {env_name!r} is not set; export it as an environment variable"
            ) from None
        return userdata.get(colab_name or env_name)

    # Non-secret connection settings; secrets are looked up at runtime.
    config = {
        "account": "SFEDU02-URB63596",
        "user": "BULLFROG",
        "password": _get_secret("SNOWFLAKE_PASSWORD"),
        "warehouse": "ANIMAL_TASK_WH",
        "database": "TRAVEL_GENIE",
        "source_schema": "RAW",
        "dest_schema": "TRANSFORMED_DATA_TRANSFORMED",
        "hf_token": _get_secret("HF_TOKEN", colab_name="HF_Token"),
    }

    # Clear CUDA cache if available
    if torch.cuda.is_available():
        torch.cuda.empty_cache()

    try:
        # Initialize pipeline
        pipeline = NVEmbedPipeline(**config)

        # Process reviews
        success, nrows = pipeline.process_reviews()

        if success:
            print(f"Successfully processed {nrows} reviews")
        else:
            print("Processing completed with errors")

    except Exception as e:
        print(f"Error running pipeline: {str(e)}")
        raise