In [1]:
# Required Installs

# %sudo cp /home/jovyan/work/jars/neo4j-connector-apache-spark_2.12-4.1.0_for_spark_3.jar /usr/local/spark/jars/neo4j-connector-apache-spark_2.12-4.1.0_for_spark_3.jar
# %pip install --upgrade google-api-python-client -q
# %pip install pymongo cassandra-driver -q

# Imports and Spark Session Setup

In [2]:
import os
import pyspark
import googleapiclient.discovery
from pyspark.sql import SparkSession
from cassandra.cluster import Cluster
from pyspark.sql import functions as F
from pyspark.sql.window import Window
import time


# MONGO CONFIGURATION
# Each endpoint can be overridden through an environment variable so that real
# credentials never have to be committed with the notebook; the fallbacks are
# the demo-stack defaults this notebook was written against.
mongo_uri = os.environ.get(
    "MONGO_URI",
    "mongodb://admin:mongopw@mongo:27017/demo.feedback?authSource=admin",
)

# CASSANDRA CONFIGURATION
cassandra_host = os.environ.get("CASSANDRA_HOST", "cassandra")

# NEO4J CONFIGURATION
bolt_url = os.environ.get("NEO4J_BOLT_URL", "bolt://neo4j:7687")


def setup_spark_session(mongo_uri, cassandra_host, bolt_url):
    """Create (or reuse) a local SparkSession configured for MongoDB and Cassandra.

    Args:
        mongo_uri: MongoDB connection URI, used for both the connector's
            input and output URI settings.
        cassandra_host: Host name assigned to ``spark.cassandra.connection.host``.
        bolt_url: Neo4j Bolt URL. Kept for interface compatibility; it is not
            used while building the session.

    Returns:
        The SparkSession, or None when setup fails (the error is printed).
    """
    # spark.jars.packages holds ONE comma-separated list of Maven coordinates.
    # Calling .config() twice with the same key keeps only the last value,
    # which would silently drop the Mongo connector.
    connector_packages = ",".join([
        "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1",
        "com.datastax.spark:spark-cassandra-connector-assembly_2.12:3.1.0",
    ])

    try:
        # Spark init
        spark = SparkSession.builder \
            .master("local") \
            .appName('jupyter-pyspark') \
            .config("spark.mongodb.input.uri", mongo_uri) \
            .config("spark.mongodb.output.uri", mongo_uri) \
            .config("spark.cassandra.connection.host", cassandra_host) \
            .config("spark.jars.packages", connector_packages) \
            .getOrCreate()

        # Keep notebook output readable: only errors are logged.
        spark.sparkContext.setLogLevel("ERROR")

        print("Spark is setup")

        return spark

    except Exception as e:
        print(f"An error occurred during Spark setup: {e}")
        return None

## Mongo Functions

In [3]:
# Look up a video's title through the Youtube API, given its video id
def get_video_title(video_id, youtube_config):
    """Return the title of ``video_id``, or None if the lookup fails.

    Args:
        video_id: Youtube video id to look up.
        youtube_config: API client object exposing ``videos().list(...).execute()``.

    Returns:
        The title string taken from the first item's snippet. Any exception
        (including a response with no items) is printed and yields None.
    """
    try:
        lookup = youtube_config.videos().list(part='snippet', id=video_id)
        payload = lookup.execute()

        # The title lives in the snippet of the first returned item
        first_item = payload['items'][0]
        return first_item['snippet']['title']

    except Exception as err:
        print(f"Error fetching video title for {video_id}: {str(err)}")
        return None


# API calls to Youtube to get up to 100 top-level comments (and related data) from a video
def get_youtube_comment_data(video_id, youtube_config, max_results=100):
    """Fetch up to ``max_results`` top-level comments for a video.

    Args:
        video_id: Youtube video id whose comment threads are requested.
        youtube_config: API client object exposing
            ``commentThreads().list(...).execute()``.
        max_results: Upper bound on the number of comments collected
            (defaults to 100, the previous hard-coded value).

    Returns:
        A list of dicts, one per comment thread, with the keys ``_id``,
        ``video_id``, ``video_title``, ``author_display_name``,
        ``text_original``, ``like_count``, ``repliesCount`` and
        ``datetime_posted``. If a request fails, the error is printed and
        whatever was collected so far is returned.
    """
    comment_list = []
    # The continuation token comes from each *response*, not from the
    # comment records, so it is tracked separately here.
    next_page_token = None

    try:
        video_title = get_video_title(video_id, youtube_config)

        # Page through the results until enough comments are collected
        while len(comment_list) < max_results:
            request = youtube_config.commentThreads().list(
                part="snippet,replies",
                maxResults=min(100, max_results - len(comment_list)),
                textFormat="plainText",
                videoId=video_id,
                pageToken=next_page_token,
                prettyPrint=True
            )

            # Send request
            response = request.execute()
            items = response.get("items", [])

            # Iterate through comments and extract relevant information
            comment_list.extend(
                {
                    "_id": item["id"],
                    "video_id": item["snippet"]["videoId"],
                    "video_title": video_title,
                    "author_display_name": item["snippet"]["topLevelComment"]["snippet"]["authorDisplayName"],
                    "text_original": item["snippet"]["topLevelComment"]["snippet"]["textOriginal"],
                    "like_count": item["snippet"]["topLevelComment"]["snippet"]["likeCount"],
                    "repliesCount": item["snippet"]["totalReplyCount"],
                    "datetime_posted": item["snippet"]["topLevelComment"]["snippet"]["publishedAt"],
                }
                for item in items
            )

            # Stop when the video has no further pages; otherwise the first
            # page would be fetched again (duplicates) or, for a video with
            # no comments, the loop would never end.
            next_page_token = response.get("nextPageToken")
            if not items or not next_page_token:
                break

    except Exception as e:
        print(f"An error occurred: {str(e)}")

    return comment_list


# Fetch one video's comments and append them to MongoDB.
# Written as a plain function (rather than a named lambda) so it can carry a
# docstring and guard against an empty result.
def process_video(video_id, spark, youtube_config):
    """Download the comments for ``video_id`` and append them to MongoDB.

    Args:
        video_id: Youtube video id to process.
        spark: SparkSession used to build and write the DataFrame.
        youtube_config: Youtube API client passed through to the fetch helper.

    Returns:
        None. Rows are appended to ``youtube_comments.video_comments``.
    """
    comments = get_youtube_comment_data(video_id, youtube_config)

    if not comments:
        # createDataFrame cannot infer a schema from an empty list, so there
        # is nothing useful to write for this video.
        print(f"No comments retrieved for {video_id}; nothing written to Mongo DB")
        return None

    return spark.createDataFrame(comments) \
        .write.format("mongo") \
        .mode("append") \
        .option("replaceDocument", "false") \
        .option("database", "youtube_comments") \
        .option("collection", "video_comments") \
        .save()


def write_comments_to_mongo(spark, video_ids=None, api_key=None):
    """Fetch comments for each video and append them to MongoDB.

    Args:
        spark: SparkSession used for the MongoDB writes.
        video_ids: Optional iterable of Youtube video ids. Defaults to the
            three videos this notebook was written for.
        api_key: Optional Youtube Data API key. When omitted, the key is read
            from the ``YOUTUBE_API_KEY`` environment variable.

    Raises:
        ValueError: If no API key is supplied by either route.
    """
    # NOTE(review): the key used to be hard-coded here. A key that has been
    # committed to source should be treated as exposed and rotated.
    api_key = api_key or os.environ.get("YOUTUBE_API_KEY")
    if not api_key:
        raise ValueError(
            "No YouTube API key supplied: pass api_key or set the "
            "YOUTUBE_API_KEY environment variable"
        )

    if video_ids is None:
        # Youtube IDs of videos we want to get comments from
        video_ids = ["gir8BEqAutk", "mvVBuG4IOW4", "lUvBk4owRNU"]

    youtube_config = googleapiclient.discovery.build("youtube", "v3", developerKey=api_key)

    # A plain loop: the calls are made purely for their side effect.
    for video_id in video_ids:
        process_video(video_id, spark, youtube_config)

    print("Youtube comment data written to Mongo DB")


def read_mongo(spark):
    """Load the stored comments from MongoDB and print a short summary.

    Args:
        spark: SparkSession used to read the ``youtube_comments.video_comments``
            collection.

    Returns:
        The loaded DataFrame, or None if anything fails (the error is printed).
    """
    reader_settings = (
        ("uri", "mongodb://admin:mongopw@mongo:27017/"),
        ("database", "youtube_comments"),
        ("collection", "video_comments"),
        ("authSource", "admin"),
    )

    try:
        # Configure the Mongo reader one option at a time, then load
        reader = spark.read.format("mongo")
        for setting, value in reader_settings:
            reader = reader.option(setting, value)
        comments_df = reader.load()

        # Summary figures: distinct videos first, then total comments
        video_total = comments_df.select("video_id").distinct().count()
        comment_total = comments_df.count()

        print(f"Mongo read: {comment_total} comments with {video_total} unique video_ids.")

        return comments_df

    except Exception as err:
        print(f"An error occurred during MongoDB read: {err}")
        return None


## CASSANDRA Functions

In [4]:
def write_to_cassandra(df, cassandra_options):
    """Create the Cassandra keyspace/table if needed and append ``df`` to it.

    Args:
        df: Spark DataFrame of comments carrying the columns ``_id``,
            ``author_display_name``, ``datetime_posted``, ``like_count``,
            ``repliesCount``, ``text_original``, ``video_id`` and
            ``video_title``.
        cassandra_options: Dict of connector options passed to the Spark
            writer. Its ``"cluster"`` entry is also used as the host for the
            driver connection (``"localhost"`` when absent).

    Returns:
        None. Failures are printed rather than raised.
    """
    if df is None:
        # Nothing to write (e.g. the upstream read failed); skip the DDL too.
        print("An error occurred: no DataFrame was provided to write to Cassandra")
        return

    try:
        cassandra_host = cassandra_options.get("cluster", "localhost")  # Default to localhost if not specified

        # CQL statements
        create_keyspace_cassandra_sql = """
            CREATE KEYSPACE IF NOT EXISTS youtube_comments
            WITH REPLICATION = {'class': 'SimpleStrategy', 'replication_factor': 1};
            """

        # NOTE(review): rows sharing (video_title, datetime_posted) share a
        # primary key, so a later row replaces an earlier one.
        create_table_cassandra_sql = """
            CREATE TABLE IF NOT EXISTS youtube_comments.video_comments
            (
                id text,
                author_display_name text,
                datetime_posted timestamp,
                like_count bigint,
                repliescount bigint,
                text_original text,
                video_id text,
                video_title text,
                PRIMARY KEY (video_title, datetime_posted)
            );
            """

        # Cassandra connection setup
        with Cluster([cassandra_host]) as cluster:
            session = cluster.connect()

            session.execute(create_keyspace_cassandra_sql)
            session.execute("USE youtube_comments;")
            session.execute(create_table_cassandra_sql)

        # Give Cassandra some time to reflect table creation if no tables existed.
        # The write below returns a no-table error if there is no delay.
        time.sleep(3)

        # Cassandra will not accept a field starting with an underscore, and the
        # table uses a lower-case "repliescount". Rename BY NAME so a change in
        # the incoming column order can never mislabel the data.
        df_cassandra = (
            df.withColumnRenamed("_id", "id")
            .withColumnRenamed("repliesCount", "repliescount")
            .select(
                "id",
                "author_display_name",
                "datetime_posted",
                "like_count",
                "repliescount",
                "text_original",
                "video_id",
                "video_title",
            )
        )

        # Write data from Spark DataFrame to Cassandra table
        df_cassandra.write \
            .format("org.apache.spark.sql.cassandra") \
            .mode("append") \
            .options(**cassandra_options) \
            .save()

        print("Comments written to Cassandra")

    except Exception as e:
        print(f"An error occurred: {e}")

# Connector options shared by the Cassandra write and read helpers.
cassandra_options = dict(
    table="video_comments",
    keyspace="youtube_comments",
    cluster=cassandra_host,
)


def define_cassandra_queries(spark, cassandra_options):
    """Build the two "Blank Space" queries over the Cassandra comments table.

    The table is loaded through the Spark Cassandra connector and registered
    as the temp view ``blank_space_comments``. Two lazy result DataFrames are
    built on it, and the physical plan of the first one is printed.

    Args:
        spark: SparkSession used for the read and the SQL queries.
        cassandra_options: Dict of connector options (table, keyspace, ...).

    Returns:
        ``(most_recent, most_liked)`` DataFrames, ordered by
        ``datetime_posted`` and ``like_count`` descending respectively, or
        ``(None, None)`` if anything fails (the error is printed).
    """
    video_title = "Taylor Swift - Blank Space (Taylor's Version) (Lyric Video)"

    def build_query(ranking_column):
        # Both queries differ only in the column that is selected and sorted on.
        return f"""
            SELECT 
                video_title,
                {ranking_column},
                text_original
            FROM blank_space_comments
            WHERE video_title = "{video_title}"
            ORDER BY {ranking_column} DESC;
            """

    try:
        cassandra_comments = spark.read \
            .format("org.apache.spark.sql.cassandra") \
            .options(**cassandra_options) \
            .load()

        cassandra_comments.createOrReplaceTempView("blank_space_comments")

        blank_space_result_1 = spark.sql(build_query("datetime_posted"))

        blank_space_result_2 = spark.sql(build_query("like_count"))

        # Print the plan: the video_title filter is pushed down to Cassandra,
        # while the sort is still performed by Spark. Reuse the existing
        # DataFrame rather than parsing the query a second time.
        blank_space_result_1.explain()

        return blank_space_result_1, blank_space_result_2

    except Exception as e:
        print(f"An error occurred: {e}")
        return None, None

## NEO4J Data Preparation <br>
At the time of writing, the full dataset contained 541 rows, which is too many to display legibly in Neo4j.  <br>
The next section filters the full Spark DataFrame, `df`, to include only authors who have commented on at least 2 of the 3 videos. 

In [5]:
def filter_to_2_vid_comment_authors(df):
    """Keep only comments whose author commented on at least two videos.

    Args:
        df: Spark DataFrame of comments with ``author_display_name`` and
            ``video_title`` columns.

    Returns:
        ``df`` inner-joined to the qualifying authors on
        ``author_display_name``, or None if anything fails (the error is
        printed).
    """
    try:
        # One row per (author, video) pair, however many comments were left
        author_video_pairs = df.select('author_display_name', 'video_title').distinct()

        # Counting the de-duplicated pairs per author gives the number of
        # different videos that author commented on.
        multi_video_authors = (
            author_video_pairs
            .groupBy('author_display_name')
            .count()
            .filter("count >= 2")
            .select('author_display_name')
        )

        # Filter original df based on selected comment authors
        df_2vid_comment_authors = df.join(multi_video_authors, 'author_display_name', 'inner')

        return df_2vid_comment_authors

    except Exception as e:
        print(f"An error occurred: {e}")
        return None
    
    
def _run_neo4j_query(df, url, cql_query):
    """Run ``cql_query`` once per row of ``df`` through the Neo4j Spark connector."""
    df.write.format("org.neo4j.spark.DataSource") \
        .mode("Overwrite") \
        .option("url", url) \
        .option("query", cql_query) \
        .save()


def write_neo4j(df, url=None):
    """Write authors, videos and COMMENTS_ON relationships to Neo4j.

    Args:
        df: Spark DataFrame with ``video_title``, ``author_display_name`` and
            ``text_original`` columns.
        url: Optional Bolt URL. Defaults to the module-level ``bolt_url``.

    Returns:
        None. Failures are printed rather than raised.
    """
    try:
        target_url = bolt_url if url is None else url

        df_n4j = df.select("video_title", "author_display_name", "text_original")

        # Writing Author node. MERGE gives the same graph whether a name is
        # sent once or many times, so only distinct names are sent.
        cql_query = """
        MERGE (a:Author {author: event.author_display_name})
        """
        _run_neo4j_query(df_n4j.select("author_display_name").distinct(), target_url, cql_query)

        print("Author nodes written.")

        # Writing Video node (distinct titles only, for the same reason)
        cql_query = """
        MERGE (v:Video {title: event.video_title})
        """
        _run_neo4j_query(df_n4j.select("video_title").distinct(), target_url, cql_query)

        print("Video nodes written.")

        # Creating relationship: one per comment row, so all rows are sent
        cql_query = """
        MATCH (v:Video {title: event.video_title}), (a:Author {author: event.author_display_name})
        MERGE (a)-[:COMMENTS_ON {comment: event.text_original}]->(v)
        """
        _run_neo4j_query(df_n4j, target_url, cql_query)
        print("Comment relationships written.")

        print("Neo4J Success! Checkout the web browser")

    except Exception as e:
        print(f"An error occurred: {str(e)}")

# Run Program

In [10]:
# Create the Spark session used by the steps below (None if setup failed).
spark = setup_spark_session(mongo_uri, cassandra_host, bolt_url)

# Pull comments from the Youtube API and append them to MongoDB.
write_comments_to_mongo(spark)

# Load the stored comments back; read_mongo returns None if the read fails.
df = read_mongo(spark)

# Create the keyspace/table if needed, then append the comments to Cassandra.
write_to_cassandra(df, cassandra_options)

# Both results are None if the Cassandra read or query setup fails.
blank_space_result_1, blank_space_result_2 = define_cassandra_queries(spark, cassandra_options)

Spark is setup
Youtube comment data written to Mongo DB


                                                                                

Mongo read: 615 comments with 3 unique video_ids.


                                                                                

Comments written to Cassandra
== Physical Plan ==
*(1) Sort [datetime_posted#502 DESC NULLS LAST], true, 0
+- *(1) Project [video_title#501, datetime_posted#502, text_original#507]
   +- BatchScan[video_title#501, datetime_posted#502, text_original#507] Cassandra Scan: youtube_comments.video_comments
 - Cassandra Filters: [["video_title" = ?, Taylor Swift - Blank Space (Taylor's Version) (Lyric Video)]]
 - Requested Columns: [video_title,datetime_posted,text_original]




### Show most recent comments on the Blank Space video

In [7]:
# Limit in Spark first so only five rows are collected to the driver.
blank_space_result_1.limit(5).toPandas()

                                                                                

Unnamed: 0,video_title,datetime_posted,text_original
0,Taylor Swift - Blank Space (Taylor's Version) ...,2023-12-03 13:02:53,Cherry kios I'm your queen
1,Taylor Swift - Blank Space (Taylor's Version) ...,2023-12-03 13:02:06,New money I can read you I I can see a bad guy
2,Taylor Swift - Blank Space (Taylor's Version) ...,2023-12-03 08:04:51,"Taylor i love you,you song are the best🎉❤"
3,Taylor Swift - Blank Space (Taylor's Version) ...,2023-12-03 05:45:50,nice
4,Taylor Swift - Blank Space (Taylor's Version) ...,2023-12-03 03:03:20,"So elegant, so beautiful u just look like a W..."


### Show top-liked comments on the Blank Space video

In [8]:
# Limit in Spark first so only five rows are collected to the driver.
blank_space_result_2.limit(5).toPandas()

Unnamed: 0,video_title,like_count,text_original
0,Taylor Swift - Blank Space (Taylor's Version) ...,10,"A VOZ DESSA MULHER É VICIANTE, AS MÚSICAS SÃO!..."
1,Taylor Swift - Blank Space (Taylor's Version) ...,4,Top show gostei linda voz linda ❤😮😊
2,Taylor Swift - Blank Space (Taylor's Version) ...,4,Me encantaaaa❤
3,Taylor Swift - Blank Space (Taylor's Version) ...,3,"Haha she changed the "" Starbucks lovers"" 😂❤"
4,Taylor Swift - Blank Space (Taylor's Version) ...,3,I love the spareness of this version. Taylor's...


In [9]:
# Reduce the comments to authors seen on at least two videos, then load that
# subset into Neo4j. The filter returns None on failure, in which case
# write_neo4j prints an error instead of writing.
df_2vid_comment_authors = filter_to_2_vid_comment_authors(df)
write_neo4j(df_2vid_comment_authors)

                                                                                

Author nodes written.


                                                                                

Video nodes written.


                                                                                

Comment relationships written.
Neo4J Success! Checkout the web browser




Run this query in the Neo4j web UI to see the graph:

`MATCH p = (a:Author)-[:COMMENTS_ON]->(v:Video) RETURN p;`