In [1]:
import sys
import os
import time
from typing import List, Dict
import json
import pandas as pd
# sys.path.append("/Users/alexandergilmore/Documents/GitHub/Agent-Neo-field/src/main/python/scrape/scraper")
from langchain.embeddings import VertexAIEmbeddings
from scrape import WebContentChunker
from scrape.scraper.embedding import TextEmbeddingService, LangchainEmbeddingService
from scrape.scraper.preprocessing import fix_neo4j_spelling, remove_filler_words
from neo4jwriter.neo4jwriter import Neo4jWriter
from google.cloud import aiplatform, storage
from google.oauth2 import service_account
from uuid import uuid4
import backoff
import grpc




## Define variables and methods

In [2]:
google_credentials = service_account.Credentials.from_service_account_file(
    os.environ.get("GCP_SERVICE_ACCOUNT_KEY_PATH")
)

In [3]:
# embedding_service = TextEmbeddingService('textembedding-gecko@001', 
#                                             aiplatform_client=aiplatform.init(project=os.environ.get('GCP_PROJECT'), 
#                                             location=os.environ.get('GCP_REGION'), 
#                                             credentials=google_credentials))

In [4]:
vertex = VertexAIEmbeddings(credentials=google_credentials, 
                            project=os.environ.get('GCP_PROJECT'), 
                            location=os.environ.get('GCP_REGION'),
                            model_name="textembedding-gecko@001")

embedding_service = LangchainEmbeddingService(vertex)

In [5]:
def batch_method(lst, batch_size=500):
    for i in range(0, len(lst), batch_size):
        yield lst[i:i + batch_size]


def prepare_new_nodes(data: List, playlist_id: str = "") -> List[Dict]:
    """
    format chunked data to be uploaded into neo4j graph.
    embedding must abide by rate limits: 60 requests / minute
    """

    total = len(data)
    new_nodes = data.copy()

    i = 0
    # print("total chunks to process: ", total)
    for chunk in new_nodes:
        

            # try:
        start = time.time()

        # make request
        # print("batch percent: ", str(round((i+1) / total, 4)*100)[:4], "%", " request", i+1, end="\r")
        chunk.update({  "index": str(uuid4()),
                        "playlist_id": playlist_id,
                        "embedding": embedding_service.generate_embedding(chunk['transcript'])})
        
        stop = time.time()
        # abide by rate limit (1 per sec)
        while stop - start < 1:
            stop = time.time()

        i+=1

            # except Exception as e:

                # if i < try_limit:
                #     i+=1
                #     time.sleep(i+1)
                #     continue
                # else:
                # print(e)
                # return new_nodes, i
            
    return new_nodes, i

In [6]:
with open("scrape/scraper/resources/youtube_playlist_ids.json") as json_file:
    playlists = json.load(json_file)

In [7]:
list(playlists.keys())[0]

'Going Meta'

In [8]:
chunker = WebContentChunker()

In [9]:
# push to prod cause this seems to work
writer = Neo4jWriter(neo4j_password="nV-6qhEkZOXfNAcOOD0eUTXLeu-tzEy7bNjTDyv5niY", 
                     neo4j_url="neo4j+s://9eb79ecf.databases.neo4j.io")

# writer = Neo4jWriter()

In [10]:
query = """
UNWIND $params AS param

MERGE (d:Document {index: param.index})
MERGE (s:Source {url: param.url})
MERGE (t:Type {type: "YouTube transcript"})
SET
    d.createTime = datetime(),
    d.text = param.transcript,
    d.url = param.url,
    d.embedding = param.embedding,
    
    s.title = param.title,
    s.playlist_id = param.playlist_id,
    s.video_id = param.video_id,
    s.publish_date = param.publish_date
    
MERGE (d)-[:HAS_SOURCE]->(s)
MERGE (s)-[:HAS_TYPE]->(t)
"""

In [11]:
trouble_playlists = {'Going Meta': 'PL9Hl4pk2FsvX-5QPvwChB-ni_mFF97rCE',
 'Ask a Data Scientist: Season III': 'PL9Hl4pk2FsvVkj5ZrwtYiiwQVzDB7ZhcC',
}

## Load Code

In [12]:
for playlist, pl_id in playlists.items():

    chunker._chunked_documents = []
    chunker.chunk_youtube_transcripts(playlist_title=playlist,
                                  cleaning_functions=[fix_neo4j_spelling, remove_filler_words])
    
    result = []
    failed_idx = None
    playlist_total = len(chunker.youtube_chunks_as_list)

    print(playlist, playlist_total)
    print("grabbing embeddings...")
    for idx, batch in enumerate(batch_method(chunker.youtube_chunks_as_list, 20)):
        new_nodes, failed_idx = prepare_new_nodes(data=batch, playlist_id=pl_id)
        result+=new_nodes
        print("total percent: ", str(round(((20*idx)+1) / playlist_total, 4)*100)[:4], "%", " batch", idx+1, end="\r")

    print("chunks processed: ", len(result))
    if failed_idx is not None:
        print("playlist     : ", playlist)
        print("failed index : ", failed_idx)

    print("loading to graph...")
    writer.batch_write(cypher_query=query, params=result, batch_size=100)

    print("playlist upload complete!")
    

Going Meta 667
grabbing embeddings...
chunks processed:  667  batch 34
playlist     :  Going Meta
failed index :  7
loading to graph...
playlist upload complete!
Nodes 2023 GenAI 116
grabbing embeddings...
chunks processed:  116  batch 6
playlist     :  Nodes 2023 GenAI
failed index :  16
loading to graph...
playlist upload complete!
Ask a Data Scientist: Season III 9
grabbing embeddings...
chunks processed:  9 %  batch 1
playlist     :  Ask a Data Scientist: Season III
failed index :  9
loading to graph...
playlist upload complete!
Error loading document with id: youtube/transcripts/Ask a Data Scientist: Season III/2X4ZZGjOtXo
Error: 'NoneType' object has no attribute 'download_as_text'
Error loading document with id: youtube/transcripts/Ask a Data Scientist: Season III/5O7wEUrBSLY
Error: 'NoneType' object has no attribute 'download_as_text'
Error loading document with id: youtube/transcripts/Ask a Data Scientist: Season III/9zk5hcHV3-I
Error: 'NoneType' object has no attribute 'downl