-
Notifications
You must be signed in to change notification settings - Fork 20
/
execute.py
181 lines (156 loc) · 8.65 KB
/
execute.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
import sys
import time
import contextlib
import selenium
from selenium import webdriver
from . import program
from .download.selenium_webdriver_dependencies import download_all
from .download.windows_info import get_drive_letter
from .download.user_os_info import determine_user_os
from .notifications import Common, ModuleMessage, ScriptMessage
from .custom_logger import log
def logic(channel, channel_type, file_name, log_silently, txt, csv, markdown, reverse_chronological, headless, scroll_pause_time, user_driver, execution_type):
    """Open a Selenium driver on a YouTube channel's videos page and run the scraper.

    Args:
        channel: Channel identifier, interpolated into the URL path.
        channel_type: URL path segment that precedes ``channel``.
        file_name: Base name for the output files, or ``None`` to derive it
            from the channel name shown on the page. One trailing ``.csv``,
            ``.txt`` or ``.md`` extension is removed.
        log_silently: When ``True`` log only to ``<file_name>.log``;
            otherwise log to that file and to stdout.
        txt, csv, markdown: Output-format flags, forwarded to
            ``program.determine_action``. If all three are ``False`` the
            program exits because there is nothing to write.
        reverse_chronological: Selects the default file-name suffix and is
            forwarded to ``program.determine_action``.
        headless: ``False`` opens a visible browser; any other value
            requests the headless setup for the chosen driver.
        scroll_pause_time: Forwarded unchanged to ``program.determine_action``.
        user_driver: Browser name (``firefox``, ``opera``, ``chrome``,
            ``brave``, ``edge`` or ``safari``); ``None`` selects Firefox.
        execution_type: ``'module'`` selects the module-flavoured hints;
            any other value selects the script-flavoured hints.

    Returns:
        The base file name that was used, or ``None`` when no driver could
        be started even after attempting to download the Selenium drivers.

    Raises:
        SystemExit: If no output format is enabled, the driver name is not
            recognised, or the driver is unsupported on this OS.
    """
    common_message = Common()
    module_message = ModuleMessage()
    script_message = ScriptMessage()
    # Hints come in a module flavour and a script flavour; pick the source once.
    hint_message = module_message if execution_type == 'module' else script_message

    def check_user_input():
        """Validate the arguments; return ``(url, driver_factory)`` or exit."""
        nonlocal user_driver
        base_url = 'https://www.youtube.com'
        videos = 'videos'
        url = f'{base_url}/{channel_type}/{channel}/{videos}'
        # markdown is an output format too, so it must take part in this check;
        # otherwise a markdown-only run would exit before doing any work.
        if txt is False and csv is False and markdown is False:
            print(common_message.not_writing_to_any_files)
            print(hint_message.not_writing_to_any_files_hint)
            sys.exit()  # the files already exist and the user doesn't want to overwrite any of them
        if user_driver is None:
            print(hint_message.running_default_driver)
            print(hint_message.show_driver_options)
            user_driver = 'firefox'
        seleniumdriver = check_driver()
        if seleniumdriver == 'invalid':
            sys.exit()
        return url, seleniumdriver

    def check_driver():
        """Map ``user_driver`` to a zero-argument driver factory.

        Matching is by substring. Returns the string ``'invalid'`` (after
        printing a message) when no known browser name is found.
        """
        if 'firefox' in user_driver:
            return webdriver.Firefox
        if 'opera' in user_driver:
            return webdriver.Opera
        if 'chrome' in user_driver:
            return webdriver.Chrome
        if 'brave' in user_driver:
            return configure_brave_driver
        if 'edge' in user_driver:
            return configure_edge_driver
        if 'safari' in user_driver:
            if user_os != 'macos':
                common_message.display_dependency_setup_instructions('safari', user_os)
                sys.exit()
            return webdriver.Safari
        print(common_message.invalid_driver)
        return 'invalid'

    def configure_brave_driver():
        """Start Brave through the Chrome driver class with Brave's binary."""
        options = webdriver.ChromeOptions()
        if user_os == 'windows':
            drive = get_drive_letter()
            options.binary_location = rf'{drive}:\Program Files (x86)\BraveSoftware\Brave-Browser\Application\brave.exe'
            executable_path = rf'{drive}:\Windows\bravedriver.exe'
        else:
            # Every non-Windows OS gets the macOS application path.
            options.binary_location = '/Applications/Brave Browser.app/Contents/MacOS/Brave Browser'
            executable_path = '/usr/local/bin/bravedriver'
        return webdriver.Chrome(options=options, executable_path=executable_path)

    def configure_edge_driver():
        """Start Edge on Windows; print a notice and exit on any other OS."""
        if user_os != 'windows':
            print(common_message.unsupported_edge)
            # Use the same module/script selection as every other hint.
            print(hint_message.show_driver_options)
            sys.exit()
        drive = get_drive_letter()
        executable_path = rf'{drive}:\Windows\msedgedriver.exe'
        return webdriver.Edge(executable_path=executable_path)

    def open_user_driver():
        """Create the driver instance, headless when that was requested."""
        if headless is False:
            return seleniumdriver()
        # Use the same substring test, in the same order, as check_driver so a
        # name that passed validation always reaches a headless setup function
        # instead of falling through and yielding None.
        headless_setups = (
            ('firefox', set_up_headless_firefox_driver),
            ('opera', set_up_headless_opera_driver),
            ('chrome', set_up_headless_chrome_driver),
            ('brave', set_up_headless_brave_driver),
            ('edge', set_up_headless_edge_driver),
            ('safari', set_up_headless_safari_driver),
        )
        for browser, set_up_headless_driver in headless_setups:
            if browser in user_driver:
                return set_up_headless_driver()
        return None  # not reachable for names check_driver accepted

    def set_up_headless_firefox_driver():
        options = selenium.webdriver.firefox.options.Options()
        options.headless = True
        return seleniumdriver(options=options)

    def set_up_headless_opera_driver():
        # Opera driver MRO: WebDriver -> OperaDriver -> selenium.webdriver.chrome.webdriver.WebDriver -> selenium.webdriver.remote.webdriver.WebDriver -> builtins.object
        options = webdriver.ChromeOptions()
        options.add_argument('headless')
        driver = seleniumdriver(options=options)
        print(common_message.unsupported_opera_headless)
        return driver

    def set_up_headless_safari_driver():
        print(common_message.unsupported_safari_headless)
        return seleniumdriver()

    def set_up_headless_chrome_driver():
        options = webdriver.ChromeOptions()
        options.add_argument('headless')
        # `options=` matches the other helpers; `chrome_options=` is the
        # deprecated spelling of the same keyword.
        return seleniumdriver(options=options)

    def set_up_headless_brave_driver():
        print(common_message.unsupported_brave_headless)
        return configure_brave_driver()

    def set_up_headless_edge_driver():
        print(common_message.unsupported_edge_headless)
        return configure_edge_driver()

    def determine_file_name():
        """Return the base name (no extension) for the output and log files."""
        if file_name is not None:
            # str.strip('.csv') removes any of the characters '.', 'c', 's',
            # 'v' from BOTH ends (turning 'videos.csv' into 'ideo'), so remove
            # a real trailing extension instead.
            for extension in ('.csv', '.txt', '.md'):
                if file_name.endswith(extension):
                    return file_name[:-len(extension)]
            return file_name
        channel_name = driver.find_element_by_xpath("//yt-formatted-string[@class='style-scope ytd-channel-name']").text.replace(' ', '')
        suffix = 'reverse_chronological_videos_list' if reverse_chronological else 'chronological_videos_list'
        return f'{channel_name}_{suffix}'

    def show_user_how_to_set_up_selenium():
        """Print manual setup guidance after the automatic download failed."""
        if user_driver != 'safari':
            common_message.tell_user_to_download_driver(user_driver)
        common_message.display_dependency_setup_instructions(user_driver, user_os)

    @contextlib.contextmanager
    def yield_logger(file_name):
        """Yield a tuple of log destinations; the log file is closed on exit."""
        log_file = f'{file_name}.log'
        with open(log_file, 'a', encoding='utf-8') as output_location:
            if log_silently is True:
                yield (output_location,)
            else:
                yield (output_location, sys.stdout)

    user_os = determine_user_os()
    url, seleniumdriver = check_user_input()
    program_start = time.perf_counter()
    try:
        driver = open_user_driver()
    except selenium.common.exceptions.WebDriverException as error_message:
        # selenium.common.exceptions.WebDriverException: Message: 'BROWSERdriver' executable needs to be in PATH. Please see https://................
        # for some reason this also catches selenium.common.exceptions.SessionNotCreatedException: Message: session not created: This version of BROWSERDriver only supports BROWSER version ##
        common_message.display_selenium_dependency_error(error_message)
        try:
            download_all()
            driver = open_user_driver()
        except selenium.common.exceptions.WebDriverException as update_error:  # could not download the correct Selenium driver based on the user's OS and specified driver
            common_message.display_selenium_dependency_update_error(update_error)
            show_user_how_to_set_up_selenium()
            common_message.display_unable_to_update_driver_automatically(user_driver)
            return
    with driver:
        driver.get(url)
        driver.set_window_size(780, 800)
        driver.set_window_position(0, 0)
        file_name = determine_file_name()
        with yield_logger(file_name) as logging_locations:
            log('>' * 50 + 'STARTING PROGRAM' + '<' * 50, logging_locations)
            log(f'Now scraping {url} using the {user_driver}driver:', logging_locations)
            program.determine_action(url, driver, scroll_pause_time, reverse_chronological, file_name, txt, csv, markdown, logging_locations)
            program_end = time.perf_counter()
            total_time = program_end - program_start
            log(f'This program took {total_time} seconds to complete.', logging_locations)
            log('>' * 50 + 'PROGRAM COMPLETE' + '<' * 50, logging_locations)
    return file_name