In [3]:
import youtube_dl
from youtube_dl.utils import DownloadError

# Shared downloader instance, created with default options; get_video_info()
# uses it to extract video metadata.
ydl = youtube_dl.YoutubeDL()

In [4]:
def get_video_info(url):
    """Fetch metadata for a video without downloading it.

    Args:
        url: Address of a video, or of a playlist / list of videos.

    Returns:
        The info dictionary of the video. When the extractor returns a
        result with an 'entries' key (a playlist or a list of videos), the
        first entry is returned. Returns None when extraction raises
        DownloadError, when the extractor yields no result, or when the
        'entries' list is empty.
    """
    with ydl:
        try:
            result = ydl.extract_info(
                url,
                download=False
            )
        except DownloadError:
            return None

    # Guard against a missing result so the membership test below cannot
    # raise TypeError.
    if not result:
        return None

    if 'entries' in result:
        # Can be a playlist or a list of videos
        entries = result['entries']
        if not entries:
            # Nothing to pick from; avoid IndexError on entries[0].
            return None
        return entries[0]

    # Just a video
    return result


def get_audio_url(video):
    """Return the URL of the first 'm4a' format listed for a video.

    Args:
        video: Info dictionary as returned by get_video_info(), or None
            (get_video_info() returns None when extraction fails).

    Returns:
        The 'url' of the first entry in video['formats'] whose 'ext' is
        'm4a' and that carries a non-empty 'url'; None when `video` is
        empty/None, has no formats, or has no such entry.
    """
    if not video:
        return None

    for fmt in video.get('formats') or []:
        # .get() keeps a format entry without 'ext' or 'url' from raising
        # KeyError; such entries are simply skipped.
        if fmt.get('ext') == 'm4a' and fmt.get('url'):
            return fmt['url']
    return None

In [5]:
# Fetch the metadata of the sample video (nothing is downloaded) and display
# the whole info dictionary as the cell output.
video_info = get_video_info("https://youtu.be/e-kSGNzu0hM")
video_info

[youtube] e-kSGNzu0hM: Downloading webpage


{'id': 'e-kSGNzu0hM',
 'title': 'iPhone 13 Review: Pros and Cons',
 'formats': [{'asr': 48000,
   'filesize': 2601209,
   'format_id': '249',
   'format_note': 'tiny',
   'fps': None,
   'height': None,
   'quality': 0,
   'tbr': 52.388,
   'url': 'https://rr1---sn-3u-20ns.googlevideo.com/videoplayback?expire=1673435235&ei=A0S-Y6O9Fo3a4wK45LTgDA&ip=221.161.221.63&id=o-ADWvtBF3uMSOPLH4wu7rBKZHettlstI3FLuLqbvC91oF&itag=249&source=youtube&requiressl=yes&mh=j-&mm=31%2C29&mn=sn-3u-20ns%2Csn-3u-bh2l6&ms=au%2Crdu&mv=m&mvi=1&pl=16&initcwndbps=861250&vprv=1&mime=audio%2Fwebm&ns=jGuskbmJkpHwFE4mMw1TxYkK&gir=yes&clen=2601209&dur=397.221&lmt=1663473823596248&mt=1673413275&fvip=2&keepalive=yes&fexp=24007246&c=WEB&txp=4532434&n=eecHUmaRAkdM9odt&sparams=expire%2Cei%2Cip%2Cid%2Citag%2Csource%2Crequiressl%2Cvprv%2Cmime%2Cns%2Cgir%2Cclen%2Cdur%2Clmt&sig=AOq0QJ8wRQIhAOaEXfnEdJCFXNyJ4aRzbVGC6wRy9mdCmwbwwSrDE1ymAiBs4kFzouKNpIF3Dqh01yEsSfybmZLJd1hOPat1Y37P3g%3D%3D&lsparams=mh%2Cmm%2Cmn%2Cms%2Cmv%2Cmvi%2Cpl%

In [6]:
# Pick the m4a audio stream of the sample video.
# NOTE(review): the printed URL is a signed link that embeds the requesting
# client's IP address and an expiry timestamp as query parameters; clear this
# output before sharing the notebook.
url = get_audio_url(video_info)
print(url)

https://rr1---sn-3u-20ns.googlevideo.com/videoplayback?expire=1673435235&ei=A0S-Y6O9Fo3a4wK45LTgDA&ip=221.161.221.63&id=o-ADWvtBF3uMSOPLH4wu7rBKZHettlstI3FLuLqbvC91oF&itag=140&source=youtube&requiressl=yes&mh=j-&mm=31%2C29&mn=sn-3u-20ns%2Csn-3u-bh2l6&ms=au%2Crdu&mv=m&mvi=1&pl=16&initcwndbps=861250&vprv=1&mime=audio%2Fmp4&ns=jGuskbmJkpHwFE4mMw1TxYkK&gir=yes&clen=6429687&dur=397.246&lmt=1663473818362112&mt=1673413275&fvip=2&keepalive=yes&fexp=24007246&c=WEB&txp=4532434&n=eecHUmaRAkdM9odt&sparams=expire%2Cei%2Cip%2Cid%2Citag%2Csource%2Crequiressl%2Cvprv%2Cmime%2Cns%2Cgir%2Cclen%2Cdur%2Clmt&sig=AOq0QJ8wRQIgd32QAd3NUJ0OpYVuIsRcrrqgePrs8K5Lby6lb3AxguQCIQDH8Ojc_PIftjd6XyWPI4Z55L0NGhHckhc3p7S_6ruW3w%3D%3D&lsparams=mh%2Cmm%2Cmn%2Cms%2Cmv%2Cmvi%2Cpl%2Cinitcwndbps&lsig=AG3C_xAwRQIgfLC14Ots3kJYCJis2C1KSrIjpXvN8xPxfXaIEH-OgTUCIQCL0mv_IsJroDlTNStN0D2GLosITOw9o0XyYC0fA5DLFQ%3D%3D


In [7]:
import os

# The AssemblyAI key is read from the ASSEMBLYAI_API_KEY environment variable
# instead of being committed with the notebook. The key that used to be
# hard-coded here has been published and should be revoked.
API_KEY_ASSEMBLYAI = os.environ.get('ASSEMBLYAI_API_KEY', '')
if not API_KEY_ASSEMBLYAI:
    print("ASSEMBLYAI_API_KEY is not set; set it before calling the AssemblyAI API")

In [8]:
# AssemblyAI v2 endpoints: one for uploading local audio, one for creating
# and querying transcription jobs.
transcript_endpoint = 'https://api.assemblyai.com/v2/transcript'
upload_endpoint = 'https://api.assemblyai.com/v2/upload'

# Header set carrying only the API key (used for raw uploads).
headers_auth_only = dict(authorization=API_KEY_ASSEMBLYAI)

# Header set for JSON requests: the API key plus the JSON content type.
headers = {**headers_auth_only, "content-type": "application/json"}

# Number of bytes read from a local file per uploaded chunk.
CHUNK_SIZE = 5 * 1024 * 1024  # 5MB

In [None]:
import requests
import json
import time

def upload(filename):
    """Upload a local file to AssemblyAI and return the resulting URL.

    The file is streamed in CHUNK_SIZE pieces so it is never loaded into
    memory as a whole.

    Args:
        filename: Path of the local file to upload.

    Returns:
        The 'upload_url' value from the JSON response.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
            Checking the status first gives a clear HTTP error instead of
            a KeyError on a response that has no 'upload_url'.
    """
    def read_file(path):
        with open(path, 'rb') as f:
            # iter() with a sentinel stops at the first empty read (EOF).
            yield from iter(lambda: f.read(CHUNK_SIZE), b'')

    upload_response = requests.post(upload_endpoint, headers=headers_auth_only, data=read_file(filename))
    upload_response.raise_for_status()
    return upload_response.json()['upload_url']

In [None]:
def transcribe(audio_url, sentiment_analysis):
    """Submit a transcription job and return its id.

    Args:
        audio_url: URL of the audio to transcribe.
        sentiment_analysis: Value sent as the 'sentiment_analysis' field
            of the request.

    Returns:
        The 'id' value from the JSON response.

    Raises:
        requests.HTTPError: if the server answers with a 4xx/5xx status.
            Checking the status first gives a clear HTTP error instead of
            a KeyError on a response that has no 'id'.
    """
    transcript_request = {
        'audio_url': audio_url,
        'sentiment_analysis': sentiment_analysis
    }

    transcript_response = requests.post(transcript_endpoint, json=transcript_request, headers=headers)
    transcript_response.raise_for_status()
    return transcript_response.json()['id']

In [None]:
def poll(transcript_id):
    """Fetch the current state of a transcription job.

    Args:
        transcript_id: Id of the job, as returned by transcribe().

    Returns:
        The decoded JSON body of the GET response.
    """
    job_url = '/'.join([transcript_endpoint, transcript_id])
    return requests.get(job_url, headers=headers).json()

In [None]:
def get_transcription_result_url(url, sentiment_analysis, poll_interval=30):
    """Start a transcription job and block until it finishes.

    Args:
        url: URL of the audio to transcribe.
        sentiment_analysis: Forwarded to transcribe().
        poll_interval: Seconds to sleep between two polls (default 30,
            the value that used to be hard-coded).

    Returns:
        A ``(data, error)`` tuple. ``data`` is always the last polled
        response. ``error`` is None when the job status is 'completed',
        and a truthy error description otherwise.
    """
    transcribe_id = transcribe(url, sentiment_analysis)
    while True:
        data = poll(transcribe_id)
        status = data.get('status')
        if status == 'completed':
            return data, None
        if status == 'error' or status is None:
            # A response without a 'status' key is treated as a failure
            # rather than raising KeyError. The fallback text keeps `error`
            # truthy for callers that test it.
            return data, data.get('error') or 'transcription failed (no error message in response)'

        print(f"waiting for {poll_interval} seconds")
        time.sleep(poll_interval)

In [11]:
def save_transcript(url, title, sentiment_analysis=False):
    """Transcribe an audio URL and write the result to disk.

    Writes ``<title>.txt`` with the transcript text and, when
    `sentiment_analysis` is true, ``<title>_sentiments.json`` with the
    'sentiment_analysis_results' of the response.

    Args:
        url: URL of the audio to transcribe.
        title: Base name (without extension) of the output files.
        sentiment_analysis: Whether to request and save sentiment results.

    Returns:
        True when the files were written, False otherwise.
    """
    data, error = get_transcription_result_url(url, sentiment_analysis)

    # The error must be checked first: on failure the helper returns the
    # (non-empty) response dict together with the error, so testing `data`
    # first would skip the error report and try to write a failed job.
    if error:
        print("Error!!!", error)
        return False
    if not data:
        return False

    filename = title + '.txt'
    # Explicit encoding so the transcript is written the same way
    # regardless of the platform's default encoding.
    with open(filename, 'w', encoding='utf-8') as f:
        f.write(data['text'])

    if sentiment_analysis:
        filename = title + '_sentiments.json'
        with open(filename, 'w', encoding='utf-8') as f:
            sentiments = data['sentiment_analysis_results']
            json.dump(sentiments, f, indent=4)
    print('Transcript saved')
    return True

In [47]:
def save_video_sentiments(url):
    """Transcribe a video's audio track and save transcript plus sentiments.

    The output files are named after the video title, reduced to ASCII
    letters, digits and underscores.

    Args:
        url: Address of the video page.

    Returns:
        None. Nothing is written when the video metadata cannot be fetched
        or when no m4a audio stream is found.
    """
    import re

    video_info = get_video_info(url)
    if video_info is None:
        # get_video_info() returns None when extraction fails; stop here
        # instead of crashing while looking up the formats.
        return

    audio_url = get_audio_url(video_info)
    if audio_url:
        title = video_info['title']
        title = title.strip()                          # drop leading/trailing whitespace
        title = re.sub('[^a-zA-Z0-9_ ]', '', title)    # remove every special character except _
        title = re.sub('[ ]', '_', title)              # replace spaces with _
        save_transcript(audio_url, title, sentiment_analysis=True)

In [48]:
# Run the whole pipeline on the sample video; on success this writes
# <title>.txt and <title>_sentiments.json to the working directory.
save_video_sentiments("https://youtu.be/e-kSGNzu0hM")

[youtube] e-kSGNzu0hM: Downloading webpage
waiting for 30 seconds
waiting for 30 seconds
waiting for 30 seconds
waiting for 30 seconds
waiting for 30 seconds
waiting for 30 seconds
waiting for 30 seconds
waiting for 30 seconds
Transcript saved


In [50]:
# Load the per-sentence sentiment results written by save_transcript().
with open("iPhone_13_Review_Pros_and_Cons_sentiments.json", "r") as f:
    data = json.load(f)

# Bucket the sentence texts by sentiment label; anything that is neither
# POSITIVE nor NEGATIVE is counted as neutral.
positives = []
negatives = []
neutrals = []
for result in data:
    text = result["text"]
    if result["sentiment"] == "POSITIVE":
        positives.append(text)
    elif result["sentiment"] == "NEGATIVE":
        negatives.append(text)
    else:
        neutrals.append(text)

n_pos = len(positives)
n_neg = len(negatives)
n_neut = len(neutrals)

print("Num positives:", n_pos)
print("Num negatives:", n_neg)
print("Num neutrals:", n_neut)

# ignore neutrals here
n_polar = n_pos + n_neg
if n_polar:
    r = n_pos / n_polar
    print(f"Positive ratio: {r:.3f}")
else:
    # No positive or negative sentence: the ratio is undefined, so report
    # that instead of dividing by zero.
    r = None
    print("Positive ratio: n/a (no positive or negative sentences)")

Num positives: 38
Num negatives: 5
Num neutrals: 19
Positive ratio: 0.884
