In [None]:
!pip install googletrans==4.0.0-rc1

In [None]:
from googletrans import Translator

In [None]:
import asyncio
import concurrent.futures
import csv
import dataset
import gzip
import re
import requests
from bs4 import BeautifulSoup
import konlpy.tag
import nest_asyncio

# Patch asyncio so an event loop can be re-entered. This file looks like a
# notebook export, where a loop is presumably already running -- TODO confirm.
nest_asyncio.apply()

# HTTP headers sent with every melon.com request made by the helpers below.
melon_headers = {'User-Agent': 'Student'}

def get_melon_lyrics(songid, timeout=10):
    """Fetch the lyrics of one song from Melon's web-player endpoint.

    Args:
        songid: Melon song identifier, interpolated into the request URL.
        timeout: Seconds to wait for the server before ``requests`` gives up.
            Without a timeout a stalled connection would block the calling
            thread indefinitely.

    Returns:
        The ``text`` of every entry under the response's ``lyrics`` key,
        joined with newlines, or ``None`` when the response body is not JSON
        of that shape.

    Raises:
        requests.RequestException: If the request itself fails or times out.
    """
    url = f'https://www.melon.com/webplayer/getLyrics.json?songId={songid}'
    res = requests.get(url, headers=melon_headers, timeout=timeout)

    try:
        captions = res.json()['lyrics']
        return '\n'.join([caption['text'] for caption in captions])
    except (ValueError, KeyError, TypeError):
        # ValueError: body is not JSON. KeyError/TypeError: JSON of an
        # unexpected shape (no 'lyrics' key, null value, non-dict entries).
        return None

def get_top_songs(year=2018, offset=1, verbose=False, timeout=10):
    """Scrape one page of Melon's yearly K-pop chart.

    Args:
        year: Chart year, sent as the ``chartDate`` query parameter.
        offset: Value of the ``idx`` query parameter. Ranks on the returned
            page are numbered starting at ``(offset - 1) * 50 + 1``.
        verbose: When true, print a progress line after the page is parsed.
        timeout: Seconds to wait for the server before ``requests`` gives up.

    Returns:
        A list with one dict per ``wrap_song_info`` element, holding the keys
        ``id``, ``songid``, ``rank``, ``year``, ``title``, ``artist``,
        ``album`` and ``lyrics``. ``lyrics`` is always ``None`` here; it is
        filled in later by ``populate_lyrics``.

    Raises:
        requests.RequestException: If the request fails or times out.
    """
    url = f'https://www.melon.com/chart/age/list.htm?idx={offset}&chartType=YE&chartGenre=KPOP&chartDate={year}&moved=Y'
    res = requests.get(url, headers=melon_headers, timeout=timeout)
    soup = BeautifulSoup(res.text, 'html.parser')

    results = []
    first_rank = (offset - 1) * 50 + 1

    for rank, song in enumerate(soup.find_all(class_='wrap_song_info'), start=first_rank):
        # Entries without a <strong><a> title link carry no song ID.
        link = song.strong.a if song.strong is not None else None

        if link is None:
            id_match = None
            title = song.find(class_='ellipsis rank01').text.strip()
        else:
            id_match = re.search(r"(\d+?)'\)", link.get('href', ''))
            title = link.text.strip()

        # Fall back to a rank-based placeholder instead of crashing when the
        # link is missing or its href does not contain a song ID.
        songid = id_match.group(1) if id_match else f'unknown{rank:02d}'

        artist = song.find(class_='ellipsis rank02').a.text.strip()
        album = song.find(class_='ellipsis rank03').a.text.strip()

        results.append({
            'id': f'{year}_{songid}',
            'songid': songid,
            'rank': rank,
            'year': year,
            'title': title,
            'artist': artist,
            'album': album,
            'lyrics': None,
        })

    if verbose:
        print('Songs for year', year, 'fetched.')

    return results

async def populate_lyrics(songs, verbose=False):
    """Download lyrics for every song whose ``lyrics`` entry is ``None``.

    The blocking ``get_melon_lyrics`` calls run on a pool of ten worker
    threads. Each distinct ``songid`` is requested once, and the result is
    written back into every pending song dict with that ID. Songs that
    already have lyrics are left untouched.

    Args:
        songs: List of song dicts with at least the keys ``songid`` and
            ``lyrics``; the dicts are modified in place.
        verbose: When true, print a progress line once all downloads finish.

    Returns:
        None.
    """
    pending = [song for song in songs if song['lyrics'] is None]
    # dict.fromkeys removes duplicate IDs while keeping first-seen order.
    songids = list(dict.fromkeys(song['songid'] for song in pending))

    # Inside a coroutine the running loop is the one to schedule work on.
    loop = asyncio.get_running_loop()

    with concurrent.futures.ThreadPoolExecutor(max_workers=10) as executor:
        fetched = await asyncio.gather(*(
            loop.run_in_executor(executor, get_melon_lyrics, songid)
            for songid in songids
        ))

    if verbose:
        print('Lyrics fetched.')

    # gather() returns results in argument order, so zip pairs them with IDs.
    lyrics_by_id = dict(zip(songids, fetched))
    for song in pending:
        song['lyrics'] = lyrics_by_id[song['songid']]

# Connect to the SQLite database file lyrics.db (relative path).
db = dataset.connect('sqlite:///lyrics.db')
# Handle to the 'lyrics' table, keyed by a string 'id' column.
# NOTE(review): ids are built as f'{year}_{songid}' in get_top_songs; the
# 'unknownNN' placeholder ids come to 14+ characters, more than string(13).
# Presumably harmless on SQLite -- confirm before switching backends.
table = db.get_table('lyrics', primary_id='id', primary_type=db.types.string(13))

async def populate_db(years=range(2019, 2020)):
    """Scrape chart songs and store the ones not yet in the ``lyrics`` table.

    For each year the first chart page is fetched with ``get_top_songs``.
    Songs whose ``id`` is already stored are skipped before any lyrics are
    downloaded, so re-running does not repeat requests for known songs.

    Args:
        years: Iterable of chart years to process. The default covers 2019
            only.

    Returns:
        None.
    """
    for year in years:
        songs = get_top_songs(year=year, verbose=True)

        # Keep the first occurrence of each id that is not in the table yet.
        seen_ids = set()
        new_songs = []
        for song in songs:
            song_id = song['id']
            if song_id in seen_ids or table.find_one(id=song_id) is not None:
                continue
            seen_ids.add(song_id)
            new_songs.append(song)

        await populate_lyrics(new_songs, verbose=True)

        for song in new_songs:
            table.insert(song)

        print('Year', year, 'committed to database.')

def dump_data():
    """Translate every stored song that has lyrics and return CSV-ready rows.

    Title, artist and lyrics are each translated from Korean to English with
    googletrans. Songs whose ``lyrics`` value is ``None`` produce no row and
    trigger no translation requests. A progress line is printed for every
    song in the table, whether or not it produced a row.

    Returns:
        A list of rows ``[songid, artist, title, year, rank, lyrics]`` where
        artist, title and lyrics are the translated texts.
    """
    csv_data = []
    cnt = len(table)

    translator = Translator(service_urls=['translate.google.com'])

    for i, song in enumerate(table.all()):
        lyrics = song['lyrics']

        # Check for lyrics first: rows without them are dropped, so
        # translating their title and artist would be wasted requests.
        if lyrics is not None:
            title_translation = translator.translate(song['title'], src='ko', dest='en')
            artist_translation = translator.translate(song['artist'], src='ko', dest='en')
            lyrics_translation = translator.translate(lyrics, src='ko', dest='en')

            csv_data.append([
                song['songid'],
                artist_translation.text,
                title_translation.text,
                song['year'],
                song['rank'],
                lyrics_translation.text,
            ])

        print(f"{i + 1} of {cnt} songs processed.")

    return csv_data


async def main():
    """Refresh the database, translate its contents and export them as CSV.

    Writes ``kpop.csv`` with a header row followed by the rows returned by
    ``dump_data``, then prints a confirmation message.
    """
    await populate_db()
    rows = dump_data()

    out_path = 'kpop.csv'
    header = ['Song ID', 'Artist', 'Title', 'Year', 'Rank', 'Lyrics']

    # newline='' lets the csv module control line endings itself.
    with open(out_path, 'w', newline='', encoding='utf-8') as out_file:
        csv.writer(out_file).writerows([header, *rows])

    print("Data has been saved to kpop.csv")

# Entry point: run the whole scrape/translate/export pipeline.
# The nest_asyncio.apply() call near the top of the file presumably exists so
# that asyncio.run() also works when a loop is already running (e.g. in a
# notebook) -- TODO confirm.
if __name__ == '__main__':
    asyncio.run(main())

In [None]:
!pip install konlpy