In [None]:
!pip install pyspark
!pip install dask_ml

# Preprocess

In [None]:
from pathlib import Path
import dask
from dask.distributed import Client
import dask.dataframe as dd
import json
import logging

# Relative path of the raw dataset directory.
# NOTE(review): not referenced by any cell in the reviewed part of the notebook.
TRENDING_VIDEO_DATA = "data/youtube-trending-video-dataset"
# Presumably the region codes whose datasets are not in English — TODO confirm.
# NOTE(review): not referenced by any cell in the reviewed part of the notebook;
# the cells below only handle "CA", "GB" and "US".
FOREIGN_LANGUAGES = ["BR", "DE","FR", "IN", "JP", "KR", "MX", "RU"]

def preprocess(data_path: Path, category_file: Path):
    """Annotate one trending-video CSV with category names and save a processed copy.

    Args:
        data_path: CSV file to read; it must contain ``categoryId`` and
            ``description`` columns.
        category_file: JSON file whose ``"items"`` list holds entries with an
            ``"id"`` and a ``"snippet"`` -> ``"title"``.

    Side effects:
        * Writes ``<stem>_no_translation.svg`` (the Dask task graph) relative to
          the current working directory.
        * Writes ``<stem>_processed.csv`` next to ``data_path`` as a single file
          without the index column.
    """
    with category_file.open() as handle:
        payload = json.load(handle)

    # Category ids are stored as strings in the JSON; the lookup is keyed by int.
    id_to_title = {}
    for entry in payload["items"]:
        id_to_title[int(entry["id"])] = entry["snippet"]["title"]

    videos = dd.read_csv(data_path.as_posix())
    videos["category_name"] = videos["categoryId"].map(id_to_title)
    videos["description"] = videos["description"].fillna("")

    stem = data_path.stem
    videos.dask.visualize(filename=f"{stem}_no_translation.svg")
    output_path = data_path.parent.joinpath(f"{stem}_processed.csv")
    videos.to_csv(output_path.as_posix(), single_file=True, index=False)


In [None]:

from google.colab import drive

# Mount Google Drive; the raw files are read from a folder below the mount point.
drive.mount('/content/drive')
root_path = "/content/drive/MyDrive/NUS/CS5344/"

# Run preprocess once per region: each region has its own data CSV and category-id JSON.
for country in ("CA", "GB", "US"):
    data_path = Path(f"{root_path}{country}_youtube_trending_data.csv")
    category_file = Path(f"{root_path}{country}_category_id.json")
    preprocess(data_path=data_path, category_file=category_file)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

# Load the processed CA file to take a quick look at its columns.
# lineterminator='\n' makes the parser treat only '\n' as the end of a row.
ca_processed_csv = "/content/drive/MyDrive/NUS/CS5344/CA_youtube_trending_data_processed.csv"
df = pd.read_csv(ca_processed_csv, lineterminator='\n')

df.head()

# Task 1


I have a CSV file containing a table with the columns video_id, views, categoryId, category_name, title, trending_date, and description. I want to find the top 10 most frequent words in the description column. However, video_id is not unique, because a video's row may be updated as the date changes. Only data before 1 November 2023 should be used for the calculation; that is, if a video_id is repeated, use the row with the latest "trending_date" before 1 November 2023. Please use PySpark and TF-IDF.


In [9]:
import pandas as pd
import nltk
# Fetch the NLTK resources used by the tokenising cells below.
nltk.download('stopwords')
nltk.download('punkt')
# Recent NLTK releases load the Punkt model used by word_tokenize from
# 'punkt_tab' instead of 'punkt'; download both so either version works.
nltk.download('punkt_tab')
from sklearn.feature_extraction.text import TfidfVectorizer
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


## Sample in pandas

In [None]:
import pandas as pd
from sklearn.feature_extraction.text import TfidfVectorizer
from nltk.tokenize import word_tokenize

root_path = "/content/drive/MyDrive/NUS/CS5344/"

# Read one processed CSV per region into an explicit mapping instead of
# injecting variables through vars().
regional_frames = {}
for country in ["CA", "GB", "US"]:
    data_path = root_path + country + "_youtube_trending_data_processed.csv"
    regional_frames[country] = pd.read_csv(data_path, lineterminator='\n')
CA, GB, US = (regional_frames[code] for code in ("CA", "GB", "US"))

# ignore_index gives the combined frame a unique index.
df = pd.concat([CA, GB, US], axis=0, ignore_index=True)

# Handle missing values in the 'description' column
df['description'] = df['description'].fillna('')

# Filter data before 01, November 2023
df['trending_date'] = pd.to_datetime(df['trending_date'])
df = df[df['trending_date'] < "2023-11-01"]

# Keep exactly one row per video_id: the one with the latest trending_date.
# Filtering on "date == max date" alone would keep several rows when a video
# has the same latest date in more than one region. The stable sort makes the
# choice among such ties deterministic (first in CA, GB, US order).
df = (
    df.sort_values('trending_date', ascending=False, kind='mergesort')
      .drop_duplicates('video_id', keep='first')
)

# English stopword set from NLTK; tokens are lower-cased before the lookup.
stop_words = set(stopwords.words('english'))

def tokenize_and_remove_stopwords(text):
    """Tokenize ``text`` and return the lower-cased tokens that survive filtering.

    A token is dropped when it is not purely alphabetic, when its lower-cased
    form is in ``stop_words``, or when its lower-cased form contains any of the
    fragments listed below as a substring.
    """
    banned_fragments = ("www", "http", "https", 'video', 'youtube', 'facebook', 'new', 'follow', 'like', 'watch', 'subscribe', 'channel')
    kept = []
    for token in word_tokenize(text):
        if not token.isalpha():
            continue
        lowered = token.lower()
        if lowered in stop_words:
            continue
        # Substring match: e.g. any token containing "http" is removed.
        if any(fragment in lowered for fragment in banned_fragments):
            continue
        kept.append(lowered)
    return kept

import numpy as np

TOP_N = 10

# Descriptions and titles are stacked as separate documents.
# fillna('') guards against missing titles, which TfidfVectorizer rejects.
new = pd.concat([df['description'], df['title']], axis=0).fillna('')

# No max_features here: that option prunes the vocabulary by raw term count
# *before* TF-IDF is computed, so the ranking would not really be by TF-IDF.
vectorizer = TfidfVectorizer(stop_words='english', tokenizer=tokenize_and_remove_stopwords)
X = vectorizer.fit_transform(new)

# Sum the TF-IDF scores for each word across all documents.
# X.sum(axis=0) is a 1 x n_terms np.matrix; flatten it to a 1-D array so that
# argsort and slicing act on terms rather than on the single matrix row.
sum_tfidf = np.asarray(X.sum(axis=0)).ravel()

# Indices of the TOP_N highest-scoring words, best first
top_word_indices = sum_tfidf.argsort()[::-1][:TOP_N]

# Get the actual words using the indices (plain Python strings)
top_words = vectorizer.get_feature_names_out()[top_word_indices].tolist()

# Display the result
print("Top 10 words based on TF-IDF:")
print(top_words)

## Dask

In [None]:
import dask.dataframe as dd
from dask.distributed import Client
from sklearn.feature_extraction.text import TfidfVectorizer
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords

# Start a local Dask cluster and connect a client to it
client = Client()

root_path = "/content/drive/MyDrive/NUS/CS5344/"
dfs = []

for country in ["CA", "GB", "US"]:
    data_path = root_path + country + "_youtube_trending_data_processed.csv"
    # blocksize=None reads each file as a single partition. Descriptions can
    # contain quoted line breaks, and splitting a file into byte blocks could
    # cut such a record in half.
    df = dd.read_csv(data_path, lineterminator='\n', blocksize=None)
    dfs.append(df)

# Concatenate without resetting the index
df = dd.concat(dfs, axis=0)

# Handle missing values in the 'description' column
df['description'] = df['description'].fillna('')

# Filter data before 01, November 2023
df['trending_date'] = dd.to_datetime(df['trending_date'])
df = df[df['trending_date'] < "2023-11-01"]

# Get the latest trending date for each video_id
df['latest_trending_date'] = df.groupby('video_id')['trending_date'].transform('max')
df = df[df['trending_date'] == df['latest_trending_date']]

# A video can have the same latest date in several regions; keep one row per
# video_id so it is not counted more than once.
df = df.drop_duplicates(subset=['video_id'])

# English stopword set from NLTK; tokens are lower-cased before the lookup.
stop_words = set(stopwords.words('english'))

def tokenize_and_remove_stopwords(text):
    """Tokenize ``text`` and return the lower-cased tokens that survive filtering.

    A token is dropped when it is not purely alphabetic, when its lower-cased
    form is in ``stop_words``, or when its lower-cased form contains any of the
    fragments listed below as a substring.
    """
    banned_fragments = ("www", "http", "https", 'video', 'youtube', 'facebook', 'new', 'follow', 'like', 'watch', 'subscribe', 'channel')
    kept = []
    for token in word_tokenize(text):
        if not token.isalpha():
            continue
        lowered = token.lower()
        if lowered in stop_words:
            continue
        # Substring match: e.g. any token containing "http" is removed.
        if any(fragment in lowered for fragment in banned_fragments):
            continue
        kept.append(lowered)
    return kept

import numpy as np

TOP_N = 10

# No max_features here: that option prunes the vocabulary by raw term count
# *before* TF-IDF is computed, so the ranking would not really be by TF-IDF.
vectorizer = TfidfVectorizer(stop_words='english', tokenizer=tokenize_and_remove_stopwords)

# Concatenate description and title into one document per video and vectorize
# that combined text (previously the combined column was built but the fit
# used 'description' only). fillna('') keeps a missing title from turning the
# whole combined string into NaN.
df['combined_text'] = df['description'] + ' ' + df['title'].fillna('')
X = vectorizer.fit_transform(df['combined_text'].compute())

# Sum the TF-IDF scores for each word across all documents.
# X.sum(axis=0) is a 1 x n_terms np.matrix; flatten it to a 1-D array so that
# argsort and slicing act on terms rather than on the single matrix row.
sum_tfidf = np.asarray(X.sum(axis=0)).ravel()

# Indices of the TOP_N highest-scoring words, best first
top_word_indices = sum_tfidf.argsort()[::-1][:TOP_N]

# Get the actual words using the indices (plain Python strings)
top_words = vectorizer.get_feature_names_out()[top_word_indices].tolist()

# Display the result
print("Top 10 words based on TF-IDF:")
print(top_words)


## Pyspark

In [None]:
from functools import reduce
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, max
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.sql.window import Window

# Create a Spark session
spark = SparkSession.builder.appName("YouTubeTrendingAnalysis").getOrCreate()

# Specify the root path
root_path = "/content/drive/MyDrive/NUS/CS5344/"

# Read data for each country into PySpark DataFrames
countries = ["CA", "GB", "US"]
dfs = []

for country in countries:
    data_path = root_path + country + "_youtube_trending_data_processed.csv"
    # multiLine: descriptions may contain line breaks inside quoted fields.
    # escape='"': the processed files were written with pandas/Dask to_csv,
    # which escapes a quote by doubling it; Spark's default escape is a backslash.
    df_country = (
        spark.read
        .option("header", "true")
        .csv(data_path, multiLine=True, quote='"', escape='"')
    )
    dfs.append(df_country)

# Concatenate the PySpark DataFrames (columns are matched by position)
df = reduce(lambda x, y: x.union(y), dfs)

# Replace nulls in string columns (including 'description') with ''
df = df.na.fill('')

# Filter data before 01, November 2023
df = df.withColumn("trending_date", df["trending_date"].cast("date"))
df = df.filter(col("trending_date") < "2023-11-01")

# Get the latest trending date for each video_id.
# NOTE: `max` here is pyspark.sql.functions.max (imported at the top of this
# cell), not the Python builtin.
df = df.withColumn("latest_trending_date", max("trending_date").over(Window.partitionBy("video_id")))
df = df.filter(col("trending_date") == col("latest_trending_date")).drop("latest_trending_date")

# A video can have the same latest date in several regions; keep one row per
# video_id so it is not counted more than once.
df = df.dropDuplicates(["video_id"])

from pyspark.ml.feature import CountVectorizer, RegexTokenizer
from pyspark.ml.stat import Summarizer

TOP_N = 10

# Tokenize on runs of whitespace and lower-case. RegexTokenizer's defaults
# (pattern "\\s+", minTokenLength=1) do not emit empty-string tokens.
tokenizer = RegexTokenizer(inputCol="description", outputCol="description_tokens")
df = tokenizer.transform(df)

# Remove stopwords
remover = StopWordsRemover(inputCol="description_tokens", outputCol="filtered_tokens")
df = remover.transform(df)

# Term counts with an explicit vocabulary. HashingTF cannot be used here: it
# maps words to hash buckets, so a feature index cannot be turned back into a
# word (and 10 buckets would merge almost every word together).
count_vectorizer = CountVectorizer(inputCol="filtered_tokens", outputCol="raw_features")
cv_model = count_vectorizer.fit(df)
df = cv_model.transform(df)

idf = IDF(inputCol="raw_features", outputCol="features")
idf_model = idf.fit(df)
df = idf_model.transform(df)

# Feature index i corresponds to cv_model.vocabulary[i]
distinct_words = list(cv_model.vocabulary)
index_to_word = dict(enumerate(distinct_words))

# Sum the TF-IDF vectors over all documents (one total per vocabulary term).
# idf_model.idf alone would only give the IDF weights, not the TF-IDF totals.
sum_tfidf = (
    df.select(Summarizer.sum(col("features")).alias("tfidf_total"))
      .first()["tfidf_total"]
      .toArray()
)

# Get the indices of the top 10 words
top_word_indices = sorted(range(len(sum_tfidf)), key=lambda i: sum_tfidf[i], reverse=True)[:TOP_N]

# Get the actual words using the mapping
top_words = [index_to_word[idx] for idx in top_word_indices]

# Display the result
print("Top 10 words based on TF-IDF:")
print(top_words)


# Task 2

I have a CSV file containing a table with the columns video_id, views, categoryId, category_name, title, trending_date, view_count, and description. video_id is not unique, because a video's row may be updated as the date changes. Only data before 1 November 2023 should be used for the calculation; that is, if a video_id is repeated, use the row with the latest "trending_date" before 1 November 2023.


Group the dataset by view_count quartile, then find and compare the top 30 most frequent words in the description for each quartile.

Please use PySpark and TF-IDF.

## Sample in pandas

In [None]:

# Load the processed CSV of every region into a DataFrame
root_path = "/content/drive/MyDrive/NUS/CS5344/"

# Use an explicit mapping instead of injecting variables through vars().
regional_frames = {}
for country in ["CA", "GB", "US"]:
    data_path = root_path + country + "_youtube_trending_data_processed.csv"
    regional_frames[country] = pd.read_csv(data_path, lineterminator='\n')
CA, GB, US = (regional_frames[code] for code in ("CA", "GB", "US"))

df = pd.concat([CA, GB, US], axis=0)

# Handle missing values in the 'description' column
df['description'] = df['description'].fillna('')

# Filter data before 01, November 2023
df['trending_date'] = pd.to_datetime(df['trending_date'])
df = df[df['trending_date'] < '2023-11-01']

# Keep the row with the latest date for each video_id. mergesort is stable, so
# when a video has the same latest date in several regions the row kept is
# always the first one in CA, GB, US order. .copy() makes the result an
# independent frame before a column is added to it.
df = (
    df.sort_values('trending_date', ascending=False, kind='mergesort')
      .drop_duplicates('video_id')
      .copy()
)

# Group by view_count quartiles (labels 0..3, lowest to highest)
df['view_count_quartile'] = pd.qcut(df['view_count'], q=4, labels=False)

# English stopword set from NLTK; tokens are lower-cased before the lookup.
stop_words = set(stopwords.words('english'))

def tokenize_and_remove_stopwords(text):
    """Tokenize ``text`` and return the lower-cased tokens that survive filtering.

    A token is dropped when it is not purely alphabetic, when its lower-cased
    form is in ``stop_words``, or when its lower-cased form contains any of the
    fragments listed below as a substring.
    """
    banned_fragments = ("www", "http", "https", 'video', 'youtube', 'facebook', 'new', 'follow', 'like', 'watch', 'subscribe', 'channel')
    kept = []
    for token in word_tokenize(text):
        if not token.isalpha():
            continue
        lowered = token.lower()
        if lowered in stop_words:
            continue
        # Substring match: e.g. any token containing "http" is removed.
        if any(fragment in lowered for fragment in banned_fragments):
            continue
        kept.append(lowered)
    return kept


# Store the filtered token list of every description in its own column.
df['filtered_words'] = df['description'].apply(tokenize_and_remove_stopwords)

# Number of words to report per quartile (the task asks for the top 30)
TOP_N = 30

# Calculate TF-IDF for each quartile
for quartile in range(4):
    quartile_df = df[df['view_count_quartile'] == quartile]

    corpus = quartile_df['description'].tolist()
    vectorizer = TfidfVectorizer(stop_words='english', tokenizer=tokenize_and_remove_stopwords)
    tfidf_matrix = vectorizer.fit_transform(corpus)

    # Get feature names and sort by IDF values, ascending: the lowest IDF
    # belongs to the word that occurs in the most descriptions.
    feature_names = vectorizer.get_feature_names_out()
    idf_values = vectorizer.idf_
    sorted_features = [feature for _, feature in sorted(zip(idf_values, feature_names))]

    # Display the top 30 frequent words for each quartile
    top_30_words = sorted_features[:TOP_N]
    print(f"Top {TOP_N} frequent words for Quartile {quartile + 1}: {top_30_words}")



## Dask

In [None]:
import dask.dataframe as dd
from dask import delayed
from dask.diagnostics import ProgressBar
from nltk.corpus import stopwords
import pandas as pd
from nltk.tokenize import word_tokenize
from sklearn.feature_extraction.text import TfidfVectorizer
import csv  # Add this import statement
import numpy as np

# Load the processed CSV of every region into a Dask DataFrame.
# Each file is parsed with pandas first and then split into two partitions.
root_path = "/content/drive/MyDrive/NUS/CS5344/"
dfs = []
for country in ["CA", "GB", "US"]:
    data_path = root_path + country + "_youtube_trending_data_processed.csv"
    df = dd.from_pandas(pd.read_csv(data_path, lineterminator='\n'), npartitions=2)
    dfs.append(df)

ddf = dd.concat(dfs)

# Handle missing values in the 'description' column
ddf['description'] = ddf['description'].fillna('')

# Filter data before 01, November 2023
ddf['trending_date'] = dd.to_datetime(ddf['trending_date'])
ddf = ddf[ddf['trending_date'] < '2023-11-01']

# Get the latest date for each video_id
ddf = ddf.sort_values('trending_date', ascending=False).drop_duplicates('video_id')

# Define quartile boundaries (min, 25%, 50%, 75%, max)
quartile_boundaries = ddf['view_count'].quantile([0, 0.25, 0.5, 0.75, 1]).compute()

# Only the three inner cut points are used for binning. With all five edges
# np.digitize would return labels 0..4 (0 for the minimum only), and the top
# quartile would never match the 0..3 labels used by the loop below.
inner_edges = quartile_boundaries.to_numpy()[1:-1]


def _assign_quartile(view_counts, edges):
    """Return a 0..3 quartile label per row, keeping the partition's index."""
    labels = np.digitize(view_counts.to_numpy(), bins=edges, right=True)
    return pd.Series(labels, index=view_counts.index, name='view_count_quartile')


# Create a new column 'view_count_quartile' based on quartiles.
# map_partitions returns a proper Dask Series here, so it can be assigned directly.
ddf['view_count_quartile'] = ddf['view_count'].map_partitions(
    _assign_quartile, edges=inner_edges, meta=('view_count_quartile', 'i8')
)

# English stopword set from NLTK; tokens are lower-cased before the lookup.
stop_words = set(stopwords.words('english'))

# Deliberately NOT wrapped in @delayed: both Series.map and TfidfVectorizer
# need the function to return the token list itself, not a lazy Delayed object.
def tokenize_and_remove_stopwords(text):
    """Tokenize ``text`` and return the lower-cased tokens that survive filtering.

    A token is dropped when it is not purely alphabetic, when its lower-cased
    form is in ``stop_words``, or when its lower-cased form contains any of the
    fragments listed below as a substring.
    """
    banned_fragments = ("www", "http", "https", 'video', 'youtube', 'facebook', 'new', 'follow', 'like', 'watch', 'subscribe', 'channel')
    kept = []
    for token in word_tokenize(text):
        if not token.isalpha():
            continue
        lowered = token.lower()
        if lowered in stop_words:
            continue
        # Substring match: e.g. any token containing "http" is removed.
        if any(fragment in lowered for fragment in banned_fragments):
            continue
        kept.append(lowered)
    return kept

# Apply the tokenize_and_remove_stopwords function using map (evaluated lazily by Dask)
ddf['filtered_words'] = ddf['description'].map(tokenize_and_remove_stopwords, meta=('x', 'object'))

# Number of words to report per quartile (the task asks for the top 30)
TOP_N = 30

# Materialise the two columns the loop needs once, instead of re-running the
# whole Dask graph (load, filter, sort, de-duplicate) for every quartile.
quartile_text = ddf[['description', 'view_count_quartile']].compute()

# Calculate TF-IDF for each quartile
for quartile in range(4):
    quartile_df = quartile_text[quartile_text['view_count_quartile'] == quartile]

    corpus = quartile_df['description'].tolist()
    vectorizer = TfidfVectorizer(stop_words='english', tokenizer=tokenize_and_remove_stopwords)
    tfidf_matrix = vectorizer.fit_transform(corpus)

    # Get feature names and sort by IDF values, ascending: the lowest IDF
    # belongs to the word that occurs in the most descriptions.
    feature_names = vectorizer.get_feature_names_out()
    idf_values = vectorizer.idf_
    sorted_features = [feature for _, feature in sorted(zip(idf_values, feature_names))]

    # Display the top 30 frequent words for each quartile
    top_30_words = sorted_features[:TOP_N]
    print(f"Top {TOP_N} frequent words for Quartile {quartile + 1}: {top_30_words}")


## Pyspark

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
from pyspark.sql.functions import col, row_number, udf
from pyspark.sql.types import StringType, ArrayType
from pyspark.ml.feature import Tokenizer, StopWordsRemover, IDF
from pyspark.ml.feature import VectorAssembler
from pyspark.ml import Pipeline
from pyspark.sql.functions import ntile

# Initialize a Spark session
spark = SparkSession.builder.appName("YouTubeTrendingAnalysis").getOrCreate()

# Load CSV files into PySpark DataFrames
root_path = "/content/drive/MyDrive/NUS/CS5344/"
countries = ["CA", "GB", "US"]

dfs = []

for country in countries:
    data_path = root_path + country + "_youtube_trending_data_processed.csv"
    # multiLine: descriptions may contain line breaks inside quoted fields.
    # escape='"': the processed files were written with pandas/Dask to_csv,
    # which escapes a quote by doubling it; Spark's default escape is a backslash.
    df_country = spark.read.csv(
        data_path, header=True, inferSchema=True,
        multiLine=True, quote='"', escape='"',
    )
    dfs.append(df_country)

# Concatenate PySpark DataFrames into a single DataFrame (columns matched by position)
df = dfs[0]
for df_country in dfs[1:]:
    df = df.union(df_country)

# Handle missing values in the 'description' column
df = df.na.fill({'description': ''})

# Convert the 'trending_date' column to timestamp
df = df.withColumn('trending_date', col('trending_date').cast('timestamp'))

# Filter data before 01, November 2023
df = df.filter(col('trending_date') < '2023-11-01')

# Get the latest date for each video_id
window_spec = Window.partitionBy('video_id').orderBy(col('trending_date').desc())
df = df.withColumn('rank', row_number().over(window_spec)).filter(col('rank') == 1).drop('rank')

# Make sure view_count is numeric so the quartiles follow numeric order even if
# schema inference typed the column as a string.
df = df.withColumn('view_count', col('view_count').cast('long'))

# Group by view_count quartiles; ntile(4) labels the rows 1..4, lowest to highest
df = df.withColumn('view_count_quartile', ntile(4).over(Window.orderBy('view_count')))

from pyspark.ml.feature import CountVectorizer, RegexTokenizer

# Number of words to report per quartile (the task asks for the top 30)
TOP_N = 30

# Tokenize on runs of whitespace and lower-case. RegexTokenizer's defaults
# (pattern "\\s+", minTokenLength=1) do not emit empty-string tokens.
tokenizer = RegexTokenizer(inputCol='description', outputCol='words')
remover = StopWordsRemover(inputCol='words', outputCol='filtered_words')

# Term counts with an explicit vocabulary, followed by IDF weighting.
# (IDF needs a vector of term counts as input; a string column assembled with
# VectorAssembler cannot provide that.)
count_vectorizer = CountVectorizer(inputCol='filtered_words', outputCol='raw_features')
idf = IDF(inputCol='raw_features', outputCol='tfidf_features')

# One pipeline definition, fitted separately on every quartile
pipeline = Pipeline(stages=[tokenizer, remover, count_vectorizer, idf])

# Calculate TF-IDF for each quartile; the ntile(4) labels are 1..4, not 0..3
for quartile in range(1, 5):
    quartile_df = df.filter(col('view_count_quartile') == quartile)

    model = pipeline.fit(quartile_df)
    vocabulary = model.stages[2].vocabulary      # index -> word, from CountVectorizer
    idf_values = model.stages[3].idf.toArray()   # one IDF weight per vocabulary index

    # Sort by IDF ascending: the lowest IDF belongs to the word that occurs in
    # the most descriptions of this quartile.
    sorted_features = [word for _, word in sorted(zip(idf_values, vocabulary))]

    # Display the top 30 frequent words for each quartile
    top_30_words = sorted_features[:TOP_N]
    print(f"Top {TOP_N} frequent words for Quartile {quartile}: {top_30_words}")

