In [5]:
import os
import json
from collections import defaultdict

In [2]:
# List the scraped dump files per country. Each directory is probed on its own
# so that a missing directory still leaves both names defined (as an empty
# list) for the cells below, instead of surfacing later as a NameError.
try:
    poland_videos_files = sorted(os.listdir("../data/videos/last/poland"))
except FileNotFoundError:
    poland_videos_files = []
    print("File not found")
print(f"Poland: {poland_videos_files}")

try:
    us_videos_files = sorted(os.listdir("../data/videos/last/united-states"))
except FileNotFoundError:
    us_videos_files = []
    print("File not found")
print(f"United States: {us_videos_files}")

Poland: ['1 - 50.json', '101 - 150.json', '151 - 200.json', '201 - 250.json', '251 - 300.json', '301 - 350.json', '351 - 400.json', '51 - 100.json', 'progress.json']
United States: []


In [35]:
def _load_channel_entries(directory, file_names):
    """Concatenate the entries of every JSON dump in ``directory`` into one list.

    Args:
        directory: Folder that contains the dump files.
        file_names: File names (relative to ``directory``) to read, in order.

    Returns:
        A flat list holding the items of every file, in the order given.
        ``progress.json`` is skipped by name.
    """
    entries = []
    for file_name in file_names:
        if file_name == "progress.json":
            continue
        # JSON text is UTF-8 by specification; do not depend on the platform's
        # locale encoding when reading it.
        with open(f"{directory}/{file_name}", "r", encoding="utf-8") as f:
            entries.extend(json.load(f))
    return entries


pl_all = _load_channel_entries("../data/videos/last/poland", poland_videos_files)
us_all = _load_channel_entries("../data/videos/last/united-states", us_videos_files)

In [41]:
# sanity check
# Both dumps are expected to hold exactly this many entries, each with a
# distinct channel_id.
EXPECTED_CHANNELS = 1000

# Every assert carries a message so a failure says which check broke and by how much.
assert len(pl_all) == EXPECTED_CHANNELS, (
    f"Poland: expected {EXPECTED_CHANNELS} entries, got {len(pl_all)}"
)
assert len(us_all) == EXPECTED_CHANNELS, (
    f"United States: expected {EXPECTED_CHANNELS} entries, got {len(us_all)}"
)

# Number of entries seen per channel_id, per country.
unique_pl = defaultdict(int)
unique_us = defaultdict(int)

for channel in pl_all:
    unique_pl[channel["channel_id"]] += 1

for channel in us_all:
    unique_us[channel["channel_id"]] += 1

assert len(unique_pl) == EXPECTED_CHANNELS, (
    f"Poland: expected {EXPECTED_CHANNELS} unique channel ids, got {len(unique_pl)}"
)
assert len(unique_us) == EXPECTED_CHANNELS, (
    f"United States: expected {EXPECTED_CHANNELS} unique channel ids, got {len(unique_us)}"
)

AssertionError: 

In [21]:
# Lookup table from category id (stored as a string) to a human-readable
# category name. Presumably these are YouTube video category ids - confirm
# against the scraper that produced the dumps.
yt_categories = {
    str(category_id): category_name
    for category_id, category_name in (
        (1, "Film & Animation"),
        (2, "Autos & Vehicles"),
        (10, "Music"),
        (15, "Pets & Animals"),
        (17, "Sports"),
        (18, "Short Movies"),
        (19, "Travel & Events"),
        (20, "Gaming"),
        (21, "Videoblogging"),
        (22, "People & Blogs"),
        (23, "Comedy"),
        (24, "Entertainment"),
        (25, "News & Politics"),
        (26, "Howto & Style"),
        (27, "Education"),
        (28, "Science & Technology"),
        (29, "Nonprofits & Activism"),
        (30, "Movies"),
        (31, "Anime/Animation"),
        (32, "Action/Adventure"),
        (33, "Classics"),
        (34, "Comedy"),
        (35, "Documentary"),
        (36, "Drama"),
        (37, "Family"),
        (38, "Foreign"),
        (39, "Horror"),
        (40, "Sci-Fi/Fantasy"),
        (41, "Thriller"),
        (42, "Shorts"),
        (43, "Shows"),
        (44, "Trailers"),
    )
}

In [38]:
# Tag every entry of pl_all with its dominant category: the category name that
# occurs most often among the entry's videos.
for channel_idx, channel_entry in enumerate(pl_all):
    video_categories_count = defaultdict(int)
    for video in channel_entry["videos"]:
        category_name = yt_categories[video["category"]]
        video_categories_count[category_name] += 1

    if video_categories_count:
        # Ties go to the category that was encountered first.
        channel_entry["channel"]["category"] = max(video_categories_count, key=video_categories_count.get)
    else:
        # Nothing was counted, so there is no maximum to take.
        print(f"Channel {channel_entry['channel']['name']} has no videos - setting category to Unknown")
        channel_entry["channel"]["category"] = "Unknown"

Channel TheNitroZyniak has no videos - setting category to Unknown
Channel MagdalenaMariaMonika has no videos - setting category to Unknown
Channel ZDROWE i SMACZNE przepisy. has no videos - setting category to Unknown
Channel skkf has no videos - setting category to Unknown


In [39]:
# Tag every entry of us_all with its dominant category: the category name that
# occurs most often among the entry's videos.
for channel_idx, channel_entry in enumerate(us_all):
    video_categories_count = defaultdict(int)
    for video in channel_entry["videos"]:
        category_name = yt_categories[video["category"]]
        video_categories_count[category_name] += 1

    if video_categories_count:
        # Ties go to the category that was encountered first.
        channel_entry["channel"]["category"] = max(video_categories_count, key=video_categories_count.get)
    else:
        # Nothing was counted, so there is no maximum to take.
        print(f"Channel {channel_entry['channel']['name']} has no videos - setting category to Unknown")
        channel_entry["channel"]["category"] = "Unknown"

In [None]:
import re
import string
from nltk.corpus import stopwords
from nltk.tokenize import word_tokenize
import emoji
from num2words import num2words
from langdetect import detect
from transformers import pipeline

In [None]:
def clean_text(text):
    """Delete tag-like spans, links and e-mail addresses from ``text``.

    The substitutions run in a fixed order (tags, then links, then addresses).
    Each match is removed outright, so the whitespace around it is left as is.
    """
    removal_patterns = (
        # Anything between angle brackets (non-greedy; '.' stops at newlines).
        r'<.*?>',
        # http(s) links and bare www. hosts, up to the next whitespace.
        r'https?://\S+|www\.\S+',
        # E-mail addresses.
        r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b',
    )
    for pattern in removal_patterns:
        text = re.sub(pattern, '', text)
    return text

def text_lowercase(text: str):
    """Return ``text`` converted to lower case."""
    lowered = text.lower()
    return lowered

def demojize(text: str):
    """Run ``text`` through ``emoji.demojize`` with its default settings.

    Presumably this swaps emoji characters for their textual names - see the
    ``emoji`` package documentation for the exact output format.
    """
    converted = emoji.demojize(text)
    return converted

def convert_number(text: str) -> str:
    """Spell out whitespace-separated numeric tokens with ``num2words``.

    Args:
        text: Input string; it is split on whitespace.

    Returns:
        The tokens joined by single spaces (so whitespace runs collapse), with
        every token made up solely of decimal digits replaced by the output of
        ``num2words`` called with its defaults.
    """
    # str.isdecimal() instead of str.isdigit(): isdigit() also accepts
    # characters such as "²" or "①", which cannot be parsed as a number and
    # would make the conversion raise.
    return ' '.join(
        num2words(token) if token.isdecimal() else token
        for token in text.split()
    )

def remove_punctuation(text: str) -> str:
    """Return ``text`` without any character listed in ``string.punctuation``."""
    kept_chars = [char for char in text if char not in string.punctuation]
    return ''.join(kept_chars)

def remove_whitespace(text: str) -> str:
    """Collapse each whitespace run to one space and trim both ends."""
    tokens = text.split()
    return " ".join(tokens)

def remove_stopwords(text: str, language: str = 'english') -> str:
    """Drop stop words from ``text``.

    Args:
        text: Input string; it is tokenized with ``word_tokenize``.
        language: Name handed to ``stopwords.words`` to select the stop list
            (e.g. ``'english'``).

    Returns:
        The remaining tokens joined by single spaces.
    """
    stop_words = set(stopwords.words(language))
    # word_tokenize is called with its default language; ``language`` only
    # selects the stop list.
    words = word_tokenize(text)
    # Compare case-insensitively: the stop lists are lower-case, so "The" or
    # "And" would otherwise slip through. Surviving tokens keep their casing.
    return ' '.join(word for word in words if word.lower() not in stop_words)

def translate_text(text: str, target_language: str = 'en') -> str:
    translator = pipeline("translation", model=f"Helsinki-NLP/opus-mt-{detect(text)}-{target_language}")
    return translator(text)[0]['translation_text']

In [None]:
# ISO 639-1 codes (the form ``detect`` returns) mapped to the language names
# the NLTK stop-word corpus is addressed by. Languages missing here are left
# without stop-word removal.
_NLTK_STOPWORD_LANGUAGES = {
    "ar": "arabic",
    "da": "danish",
    "de": "german",
    "el": "greek",
    "en": "english",
    "es": "spanish",
    "fi": "finnish",
    "fr": "french",
    "hu": "hungarian",
    "id": "indonesian",
    "it": "italian",
    "nl": "dutch",
    "no": "norwegian",
    "pt": "portuguese",
    "ro": "romanian",
    "ru": "russian",
    "sv": "swedish",
    "tr": "turkish",
}


def preprocess_text(text: str, target_language: str = 'en') -> str:
    """Normalize ``text``, translate it and strip stop words.

    Steps, in order: clean_text, text_lowercase, demojize, convert_number,
    remove_punctuation, remove_whitespace, translate_text, then a best-effort
    remove_stopwords.

    Args:
        text: Raw input text.
        target_language: Language code passed on to ``translate_text``.

    Returns:
        The processed text. An input that is empty after normalization is
        returned as an empty string without attempting translation.
    """
    text = clean_text(text)
    text = text_lowercase(text)
    text = demojize(text)
    text = convert_number(text)
    text = remove_punctuation(text)
    text = remove_whitespace(text)

    # Nothing left to translate or filter; language detection would have no
    # input to work with.
    if not text:
        return text

    # NOTE(review): the translation output is not normalized again, so it may
    # reintroduce casing and punctuation - confirm this is intended.
    text = translate_text(text, target_language)

    try:
        lang = detect(text)
    except Exception:
        # Detection is best-effort; fall back to the language we translated to.
        lang = target_language

    # remove_stopwords expects an NLTK language name ("english"), not the ISO
    # code that detect() yields ("en"). Passing the code through made every
    # call fail, and the old blanket except hid that.
    nltk_language = _NLTK_STOPWORD_LANGUAGES.get(lang)
    if nltk_language is None:
        return text

    try:
        return remove_stopwords(text, language=nltk_language)
    except (LookupError, OSError):
        # NLTK data (stop-word corpus or tokenizer model) is unavailable;
        # keep the text as is rather than failing the whole pipeline.
        return text