Skip to content

Commit

Permalink
plugin.tf1: implement authentication
Browse files Browse the repository at this point in the history
Co-Authored-By: bastimeyer <mail@bastimeyer.de>
  • Loading branch information
BellezaEmporium and bastimeyer committed May 17, 2024
1 parent 2156997 commit 7adb5b1
Showing 1 changed file with 111 additions and 8 deletions.
119 changes: 111 additions & 8 deletions src/streamlink/plugins/tf1.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,13 @@
$url lci.fr
$type live
$region France
$account Required on tf1.fr
"""

import logging
import re

from streamlink.plugin import Plugin, PluginError, pluginmatcher
from streamlink.plugin import Plugin, PluginError, pluginargument, pluginmatcher
from streamlink.plugin.api import useragents, validate
from streamlink.stream.hls import HLSStream

Expand All @@ -30,8 +31,90 @@
(?P<lci>tf1info|lci)\.fr/direct/?
)
""", re.VERBOSE))
@pluginargument(
"email",
requires=["password"],
metavar="EMAIL",
help="The email address used to register with tf1.fr.",
)
@pluginargument(
"password",
sensitive=True,
metavar="PASSWORD",
help="A tf1.fr account password to use with --tf1-username.",
)
@pluginargument(
"purge-credentials",
action="store_true",
help="Purge cached tf1.fr credentials to initiate a new session and reauthenticate.",
)
class TF1(Plugin):
_URL_API = "https://mediainfo.tf1.fr/mediainfocombo/{channel_id}"
_URL_LOGIN = "https://compte.tf1.fr/accounts.login"
_URL_TOKEN = "https://www.tf1.fr/token/gigya/web"
_API_KEY = "3_hWgJdARhz_7l1oOp3a8BDLoR9cuWZpUaKG4aqF7gum9_iK3uTZ2VlDBl8ANf8FVk"
# Gigya GDPR consent IDs
_CONSENT_IDS = ["4", "10001", "10003", "10005", "10007", "10009", "10011", "10013", "10015", "10017", "10019"]

_CACHE_KEY_USER_TOKEN = "token"

def _login(self, login_id, password):
status, *login_data = self.session.http.post(
url=self._URL_LOGIN,
data={
"loginID": login_id,
"password": password,
"APIKey": self._API_KEY,
"includeUserInfo": "true",
},
schema=validate.Schema(
validate.parse_json(),
validate.any(
validate.all(
{
"errorCode": 0,
"UID": str,
"UIDSignature": str,
"signatureTimestamp": str,
},
validate.union_get("UID", "UIDSignature", "signatureTimestamp"),
validate.transform(lambda data: ("success", *data)),
),
validate.all(
{
"errorCode": int,
"errorDetails": str,
},
validate.union_get("errorCode", "errorDetails"),
validate.transform(lambda data: ("failure", *data)),
),
),
),
)

if status != "success":
error_code, error_details = login_data
raise PluginError(f"{error_code=} - {error_details or 'Unknown error'}")

uid, uid_signature, signature_timestamp = login_data
log.debug(f"{uid=} {uid_signature=} {signature_timestamp=}")

return self.session.http.post(
url=self._URL_TOKEN,
json={
"uid": uid,
"signature": uid_signature,
"timestamp": int(signature_timestamp),
"consent_ids": self._CONSENT_IDS,
},
schema=validate.Schema(
validate.parse_json(),
{
"token": str,
},
validate.get("token"),
),
)

def _get_channel(self):
if self.match["live"]:
Expand All @@ -48,17 +131,21 @@ def _get_channel(self):

return channel, channel_id

def _api_call(self, channel_id):
def _api_call(self, channel_id, user_token):
headers = {
# forces HLS streams
"User-Agent": useragents.IPHONE,
}
if user_token:
headers["Authorization"] = f"Bearer {user_token}"

return self.session.http.get(
self._URL_API.format(channel_id=channel_id),
params={
"context": "MYTF1",
"pver": "4001000",
},
headers={
# forces HLS streams
"User-Agent": useragents.IPHONE,
"pver": "5015000",
},
headers=headers,
schema=validate.Schema(
validate.parse_json(),
{
Expand Down Expand Up @@ -88,7 +175,23 @@ def _get_streams(self):
channel, channel_id = self._get_channel()
log.debug(f"Found channel {channel} ({channel_id})")

code, data = self._api_call(channel_id)
if self.get_option("purge-credentials"):
log.info("Removing cached user-authentication token...")
self.cache.set(self._CACHE_KEY_USER_TOKEN, None, 0)
user_token = None
else:
user_token = self.cache.get(self._CACHE_KEY_USER_TOKEN)

if (
not user_token
and (login_email := self.get_option("email"))
and (login_password := self.get_option("password"))
):
log.info("Acquiring new user-authentication token...")
user_token = self._login(login_email, login_password)
self.cache.set(self._CACHE_KEY_USER_TOKEN, user_token)

code, data = self._api_call(channel_id, user_token)
if code != 200:
log.error(data)
return
Expand Down

0 comments on commit 7adb5b1

Please sign in to comment.