Skip to content

Commit

Permalink
[py] Firefox Profile Fixes and Deprecations (#13477)
Browse files Browse the repository at this point in the history
* [py] do not change profile preferences of an existing directory by default

* [py] deprecate firefox profile methods that are not supported
  • Loading branch information
titusfortner committed Jan 21, 2024
1 parent 562c359 commit ac9d52f
Showing 1 changed file with 62 additions and 66 deletions.
128 changes: 62 additions & 66 deletions py/selenium/webdriver/firefox/firefox_profile.py
Expand Up @@ -28,18 +28,19 @@
from io import BytesIO
from xml.dom import minidom

from typing_extensions import deprecated

from selenium.common.exceptions import WebDriverException

WEBDRIVER_PREFERENCES = "webdriver_prefs.json"
EXTENSION_NAME = "fxdriver@googlecode.com"


@deprecated("Addons must be added after starting the session")
class AddonFormatError(Exception):
"""Exception for not well-formed add-on manifest files."""


class FirefoxProfile:
ANONYMOUS_PROFILE_NAME = "WEBDRIVER_ANONYMOUS_PROFILE"
DEFAULT_PREFERENCES = None

def __init__(self, profile_directory=None):
Expand All @@ -52,61 +53,60 @@ def __init__(self, profile_directory=None):
This defaults to None and will create a new
directory when object is created.
"""
if not FirefoxProfile.DEFAULT_PREFERENCES:
with open(
os.path.join(os.path.dirname(__file__), WEBDRIVER_PREFERENCES), encoding="utf-8"
) as default_prefs:
FirefoxProfile.DEFAULT_PREFERENCES = json.load(default_prefs)

self.default_preferences = copy.deepcopy(FirefoxProfile.DEFAULT_PREFERENCES["mutable"])
self.profile_dir = profile_directory
self.tempfolder = None
if not self.profile_dir:
self.profile_dir = self._create_tempfolder()
else:
self.tempfolder = tempfile.mkdtemp()
newprof = os.path.join(self.tempfolder, "webdriver-py-profilecopy")
self._desired_preferences = {}
if profile_directory:
newprof = os.path.join(tempfile.mkdtemp(), "webdriver-py-profilecopy")
shutil.copytree(
self.profile_dir, newprof, ignore=shutil.ignore_patterns("parent.lock", "lock", ".parentlock")
profile_directory, newprof, ignore=shutil.ignore_patterns("parent.lock", "lock", ".parentlock")
)
self.profile_dir = newprof
os.chmod(self.profile_dir, 0o755)
self._read_existing_userjs(os.path.join(self.profile_dir, "user.js"))
self.extensionsDir = os.path.join(self.profile_dir, "extensions")
self.userPrefs = os.path.join(self.profile_dir, "user.js")
if os.path.isfile(self.userPrefs):
os.chmod(self.userPrefs, 0o644)
self._profile_dir = newprof
os.chmod(self._profile_dir, 0o755)
else:
self._profile_dir = tempfile.mkdtemp()
if not FirefoxProfile.DEFAULT_PREFERENCES:
with open(
os.path.join(os.path.dirname(__file__), WEBDRIVER_PREFERENCES), encoding="utf-8"
) as default_prefs:
FirefoxProfile.DEFAULT_PREFERENCES = json.load(default_prefs)

self._desired_preferences = copy.deepcopy(FirefoxProfile.DEFAULT_PREFERENCES["mutable"])
for key, value in FirefoxProfile.DEFAULT_PREFERENCES["frozen"].items():
self._desired_preferences[key] = value

# Public Methods
def set_preference(self, key, value):
"""Sets the preference that we want in the profile."""
self.default_preferences[key] = value
self._desired_preferences[key] = value

def add_extension(self, extension):
@deprecated("Addons must be added after starting the session")
def add_extension(self, extension=None):
self._install_extension(extension)

def update_preferences(self):
for key, value in FirefoxProfile.DEFAULT_PREFERENCES["frozen"].items():
# Do not update key that is being set by the user using
# set_preference as users are unaware of the freeze properties
# and it leads to an inconsistent behavior
if key not in self.default_preferences:
self.default_preferences[key] = value
self._write_user_prefs(self.default_preferences)
"""Writes the desired user prefs to disk."""
user_prefs = os.path.join(self._profile_dir, "user.js")
if os.path.isfile(user_prefs):
os.chmod(user_prefs, 0o644)
self._read_existing_userjs(user_prefs)
with open(user_prefs, "w", encoding="utf-8") as f:
for key, value in self._desired_preferences.items():
f.write(f'user_pref("{key}", {json.dumps(value)});\n')

# Properties

@property
def path(self):
"""Gets the profile directory that is currently being used."""
return self.profile_dir
return self._profile_dir

@property
@deprecated("The port is stored in the Service class")
def port(self):
"""Gets the port that WebDriver is working on."""
return self._port

@port.setter
@deprecated("The port is stored in the Service class")
def port(self, port) -> None:
"""Sets the port that WebDriver will be running on."""
if not isinstance(port, int):
Expand All @@ -121,20 +121,24 @@ def port(self, port) -> None:
self.set_preference("webdriver_firefox_port", self._port)

@property
@deprecated("Allowing untrusted certs is toggled in the Options class")
def accept_untrusted_certs(self):
return self.default_preferences["webdriver_accept_untrusted_certs"]
return self._desired_preferences["webdriver_accept_untrusted_certs"]

@accept_untrusted_certs.setter
@deprecated("Allowing untrusted certs is toggled in the Options class")
def accept_untrusted_certs(self, value) -> None:
if not isinstance(value, bool):
raise WebDriverException("Please pass in a Boolean to this call")
self.set_preference("webdriver_accept_untrusted_certs", value)

@property
@deprecated("Allowing untrusted certs is toggled in the Options class")
def assume_untrusted_cert_issuer(self):
return self.default_preferences["webdriver_assume_untrusted_issuer"]
return self._desired_preferences["webdriver_assume_untrusted_issuer"]

@assume_untrusted_cert_issuer.setter
@deprecated("Allowing untrusted certs is toggled in the Options class")
def assume_untrusted_cert_issuer(self, value) -> None:
if not isinstance(value, bool):
raise WebDriverException("Please pass in a Boolean to this call")
Expand All @@ -143,9 +147,10 @@ def assume_untrusted_cert_issuer(self, value) -> None:

@property
def encoded(self) -> str:
"""A zipped, base64 encoded string of profile directory for use with
remote WebDriver JSON wire protocol."""
self.update_preferences()
"""Updates preferences and creates a zipped, base64 encoded string of
profile directory."""
if self._desired_preferences:
self.update_preferences()
fp = BytesIO()
with zipfile.ZipFile(fp, "w", zipfile.ZIP_DEFLATED) as zipped:
path_root = len(self.path) + 1 # account for trailing slash
Expand All @@ -155,32 +160,21 @@ def encoded(self) -> str:
zipped.write(filename, filename[path_root:])
return base64.b64encode(fp.getvalue()).decode("UTF-8")

def _create_tempfolder(self):
"""Creates a temp folder to store User.js and the extension."""
return tempfile.mkdtemp()

def _write_user_prefs(self, user_prefs):
"""Writes the current user prefs dictionary to disk."""
with open(self.userPrefs, "w", encoding="utf-8") as f:
for key, value in user_prefs.items():
f.write(f'user_pref("{key}", {json.dumps(value)});\n')

def _read_existing_userjs(self, userjs):
"""Reads existing preferences and adds them to desired preference
dictionary."""
pref_pattern = re.compile(r'user_pref\("(.*)",\s(.*)\)')
try:
with open(userjs, encoding="utf-8") as f:
for usr in f:
matches = pref_pattern.search(usr)
try:
self.default_preferences[matches.group(1)] = json.loads(matches.group(2))
except Exception:
warnings.warn(
f"(skipping) failed to json.loads existing preference: {matches.group(1) + matches.group(2)}"
)
except Exception:
# The profile given hasn't had any changes made, i.e no users.js
pass
with open(userjs, encoding="utf-8") as f:
for usr in f:
matches = pref_pattern.search(usr)
try:
self._desired_preferences[matches.group(1)] = json.loads(matches.group(2))
except Exception:
warnings.warn(
f"(skipping) failed to json.loads existing preference: {matches.group(1) + matches.group(2)}"
)

@deprecated("Addons must be added after starting the session")
def _install_extension(self, addon, unpack=True):
"""Installs addon from a filepath, url or directory of addons in the
profile.
Expand Down Expand Up @@ -212,11 +206,12 @@ def _install_extension(self, addon, unpack=True):
assert addon_id, f"The addon id could not be found: {addon}"

# copy the addon to the profile
addon_path = os.path.join(self.extensionsDir, addon_id)
extensions_dir = os.path.join(self._profile_dir, "extensions")
addon_path = os.path.join(extensions_dir, addon_id)
if not unpack and not addon_details["unpack"] and xpifile:
if not os.path.exists(self.extensionsDir):
os.makedirs(self.extensionsDir)
os.chmod(self.extensionsDir, 0o755)
if not os.path.exists(extensions_dir):
os.makedirs(extensions_dir)
os.chmod(extensions_dir, 0o755)
shutil.copy(xpifile, addon_path + ".xpi")
else:
if not os.path.exists(addon_path):
Expand All @@ -226,6 +221,7 @@ def _install_extension(self, addon, unpack=True):
if tmpdir:
shutil.rmtree(tmpdir)

@deprecated("Addons must be added after starting the session")
def _addon_details(self, addon_path):
"""Returns a dictionary of details about the addon.
Expand Down

0 comments on commit ac9d52f

Please sign in to comment.