In [0]:
!pip install googlesearch-python tqdm duckduckgo_search

In [0]:
!pip install openpyxl

In [0]:
import json

In [0]:
from duckduckgo_search import DDGS
from googlesearch import search
from tqdm.notebook import tqdm
from urllib.parse import unquote_plus
from pyspark.sql import SparkSession
import boto3
import io
import json
import pandas as pd
import time


In [0]:

# Metadata describing this notebook; it is serialized to JSON and sent to
# Snowflake as the "query_tag" connection option below.
query_tags = {
    "team": "latam_BI",
    "project": "snowflake_latam_test",
    "notebook": "snowflake_latam_test",
    "databricks_username": "rocio.ramos@disney.com",
    "references_metric_store": "FALSE",
}

# Options passed to the Snowflake Spark connector for every read in this notebook.
# The password is fetched from a Databricks secret scope rather than hard-coded.
connection_latam_query = {
    "sfUrl": "research_latam_prd.us-east-1.snowflakecomputing.com",
    "sfUser": "latam_disney_etl@disney.com",
    # "sfUser": "rocio.ramos@disney.com",
    "sfPassword": dbutils.secrets.get(scope = "latam_bi", key = "snowflake_latam_key"),
    "sfDatabase": "SNOWFLAKE_PRD",
    "query_tag": json.dumps(query_tags),
}

# Data source name given to spark.read.format(...) for Snowflake reads.
snowflake_source_name = "net.snowflake.spark.snowflake"

In [0]:
# Put the query here.
#
# sf_query1: left-joins STREAMING_HISTORICAL (digital) to IMDB_TITLE_ESSENTIAL
# on IMDB_ID_STR = TITLEID and keeps only the rows whose IMDB_ID_STR is null,
# i.e. the streaming titles that have no IMDb id yet.
# NOTE(review): the rows that survive the filter have a null join key, so the
# join cannot match them and every IMDB.* column will be NULL for them —
# confirm the join is still needed.

sf_query1 = """
    
with check_1 as ( select digital.*,  IMDB.ORIGINALTITLE, 
  (TRY_PARSE_JSON(TO_VARCHAR(IMDB.EPISODEINFO))):episodeNumber::INT    AS episode_number,
  (TRY_PARSE_JSON(TO_VARCHAR(IMDB.EPISODEINFO))):seasonNumber::INT     AS season_number,
  (TRY_PARSE_JSON(TO_VARCHAR(IMDB.EPISODEINFO))):seriesTitleId::STRING AS series_title_id,
  REPLACE(REPLACE(REPLACE(IMDB.COUNTRIES,'[',''),'"',''),']','') AS COUNTRIES,
  IMDB.YEAR,
    IMDB.TITLEID, 
from 
SNOWFLAKE_PRD.STG_CI_DIGITAL_I.STREAMING_HISTORICAL as digital

left join  SNOWFLAKE_PRD.STG_RECOMMENDER_SYSTEM.IMDB_TITLE_ESSENTIAL as IMDB
on digital. IMDB_ID_STR =  IMDB.TITLEID
)

select * from check_1
where IMDB_ID_STR is null



;"""


# sf_query2: every row and column of the TITLE_AKAS table (no filter).
sf_query2 = """select *   from  SNOWFLAKE_PRD.STG_IMDB_OPEN_DATASETS.TITLE_AKAS
;
"""


In [0]:
# The queries are executed here.

def _load_snowflake_query(query_text):
    """Return the Spark DataFrame produced by running ``query_text`` on Snowflake."""
    reader = spark.read.format(snowflake_source_name)
    reader = reader.options(**connection_latam_query)
    return reader.option("query", query_text).load()


sf_data1 = _load_snowflake_query(sf_query1)
sf_data2 = _load_snowflake_query(sf_query2)

In [0]:
# Render the result of sf_query1 in the notebook output.
display(sf_data1)

In [0]:
# To work in pandas, the Spark tables have to be converted to pandas DataFrames.
# NOTE(review): toPandas() brings the full result into local memory, and
# sf_query2 selects every row of TITLE_AKAS — confirm it fits.
imdb_regional_df=sf_data2.toPandas()
sf_data = sf_data1.toPandas()

In [0]:
# Show the pandas copy of the sf_query2 result.
imdb_regional_df

In [0]:
# Show the pandas copy of the sf_query1 result.
sf_data

In [0]:
# Convert the pandas frame back into a Spark DataFrame.
# NOTE(review): spark_df is not referenced again in the visible cells —
# confirm it is still needed.
spark_df = spark.createDataFrame(sf_data)


In [0]:
# Keep only the columns that identify a title, then drop repeated rows.
sf_data = sf_data.loc[
    :,
    ["TITLE_NAME_STR", "SEASON_INT", "RELEASE_YEAR_INT", "TITLE_TYPE_STR"],
].drop_duplicates()

In [0]:
# Non-null count per column of the de-duplicated frame.
sf_data.count()  # 114725 with duplicates  # series have several release years 9425

In [0]:
# Spot-check the rows of a single title.
sf_data[sf_data['TITLE_NAME_STR']=='George Gently']

In [0]:
# In the notebook: replace the duckduckgo-search package with ddgs.
# NOTE(review): on Databricks, %pip commands may reset the notebook's Python
# state — confirm the variables created above are still available afterwards,
# or move these installs to the first cell.
%pip uninstall -y duckduckgo-search
%pip install -U ddgs googlesearch-python


In [0]:

# TODO: add the season to the search query


from ddgs import DDGS
from googlesearch import search  # opcional
import pandas as pd, re, time

# Captures an IMDb title id ("tt" followed by 7 or 8 digits) anywhere in a string.
TT_RE = re.compile(r"(tt\d{7,8})")

# Extra English search terms per title type. _queries looks the type up here
# after normalizing it with _norm (trimmed, upper-cased).
# NOTE(review): the lookup is an exact match, so accented or differently
# spelled type values get no synonyms — confirm against the values actually
# present in TITLE_TYPE_STR.
TYPE_SYNONYMS = {
    "SERIE": ["tv series", "series"],
    "TV SERIES": ["tv series", "series"],
    "MINISERIE": ["miniseries"],
    "EPISODIO": ["tv episode", "episode"],
    "PELICULA": ["feature film", "movie"],
    "FILM": ["feature film", "movie"],
    "DOCUMENTAL": ["documentary"],
}

def _norm(s): return str(s or "").strip().upper()

def _year_str(y):
    try: return "" if pd.isna(y) else str(int(float(y)))
    except: return ""

def _queries(s: pd.Series):
    """Build the ordered list of web-search queries for one title row.

    Parameters
    ----------
    s : pd.Series
        Row with ``TITLE_NAME_STR`` (required key) and, optionally,
        ``RELEASE_YEAR_INT`` and ``TITLE_TYPE_STR``.

    Returns
    -------
    list[str]
        Unique queries in the order they should be tried: the quoted title
        with year restricted to ``imdb.com/title``, then variants with each
        type term (with and without year), then a comma-separated
        "type, title, year, IMDB" query. Empty when the row has no usable title.
    """
    def _text(value):
        # None / NaN / pd.NA must become "" instead of the literal text
        # "None" or "nan", which would otherwise be searched for.
        try:
            if value is None or pd.isna(value):
                return ""
        except (TypeError, ValueError):
            pass  # non-scalar value: fall through and stringify it
        return " ".join(str(value).split())

    # Embedded double quotes would break the quoted phrase used below.
    title = " ".join(_text(s["TITLE_NAME_STR"]).replace('"', " ").split())
    if not title:
        # Without a title the search would match arbitrary IMDb pages and a
        # wrong id would be assigned, so produce no queries at all.
        return []

    year  = _year_str(s.get("RELEASE_YEAR_INT", ""))
    ttype = _text(s.get("TITLE_TYPE_STR", ""))
    syns  = TYPE_SYNONYMS.get(_norm(ttype), [])
    tt_terms = [ttype, *syns] if ttype else list(syns)

    # With / without type and year, restricted to IMDb title pages.
    candidates = [f'"{title}" {year} site:imdb.com/title']
    for t in tt_terms:
        candidates.append(f'"{title}" {year} {t} site:imdb.com/title')
        candidates.append(f'"{title}" {t} site:imdb.com/title')

    # "prod"-style query: comma-separated fields, no site filter.
    candidates.append(", ".join([x for x in [ttype, title, year] if x]) + ", IMDB")

    # Collapse every whitespace run (an empty year leaves gaps) and drop
    # duplicates while keeping the first occurrence of each query.
    unique = {}
    for q in candidates:
        q = " ".join(q.split())
        if q:
            unique.setdefault(q, None)
    return list(unique)

def imdb_search_ddgs(s: pd.Series) -> str:
    """Look up the IMDb title id for one row through DuckDuckGo (ddgs).

    Parameters
    ----------
    s : pd.Series
        Row accepted by ``_queries``.

    Returns
    -------
    str
        The first id matched by ``TT_RE`` in a result URL, or the sentinel
        ``"ERROR"`` when nothing was found or the lookup failed.
    """
    import logging  # local import: the notebook's import cells do not provide it

    log = logging.getLogger(__name__)
    time.sleep(0.8)  # pause before each row's lookup
    try:
        with DDGS() as d:
            for q in _queries(s):
                try:
                    res = list(d.text(q, max_results=10))
                except Exception as exc:
                    # A failing query (the client may raise, e.g. when a query
                    # has no results — TODO confirm) must not cancel the
                    # remaining fallback queries for this row.
                    log.debug("DDGS query %r failed: %s", q, exc)
                    continue
                for r in res:
                    url = r.get("href") or r.get("url") or r.get("link") or ""
                    m = TT_RE.search(url)
                    if m:
                        return m.group(1)
    # `Exception`, not a bare except: KeyboardInterrupt must still be able to
    # stop a long DataFrame.apply run.
    except Exception as exc:
        log.warning("DDGS lookup failed: %s", exc)
    return "ERROR"

def imdb_search_google(s: pd.Series) -> str:
    """Look up the IMDb title id for one row through Google (googlesearch).

    Only the first two queries from ``_queries`` are tried, with three
    results each.

    Parameters
    ----------
    s : pd.Series
        Row accepted by ``_queries``.

    Returns
    -------
    str
        The first id matched by ``TT_RE`` in a result URL, or the sentinel
        ``"ERROR"`` when nothing was found or the lookup failed.
    """
    import logging  # local import: the notebook's import cells do not provide it

    log = logging.getLogger(__name__)
    time.sleep(0.8)  # pause before each row's lookup
    try:
        queries = _queries(s)[:2]
    except Exception as exc:
        log.warning("Could not build Google queries: %s", exc)
        return "ERROR"

    for q in queries:
        # `Exception`, not a bare except: KeyboardInterrupt must still be able
        # to stop a long DataFrame.apply run. Handling it per query lets the
        # second query run when the first one fails.
        try:
            for url in search(q, num_results=3):
                m = TT_RE.search(url)
                if m:
                    return m.group(1)
        except Exception as exc:
            log.debug("Google query %r failed: %s", q, exc)
    return "ERROR"


In [0]:

#TEST
def imdb_tt_probe(title, year, ttype, use_google=False):
    """Run the IMDb id lookup for a single hand-written title.

    DuckDuckGo is tried first; Google is consulted only when ``use_google``
    is set and DuckDuckGo returned the "ERROR" sentinel.
    """
    row = pd.Series(
        {
            "TITLE_TYPE_STR": ttype,
            "TITLE_NAME_STR": title,
            "RELEASE_YEAR_INT": year,
        }
    )
    found = imdb_search_ddgs(row)
    if found != "ERROR" or not use_google:
        return found
    return imdb_search_google(row)

# Manual smoke test: both calls go through the live DuckDuckGo search.
print(imdb_tt_probe("Avatar", 2009, "PELICULA"))         # -> tt0499549
print(imdb_tt_probe("The Last of Us", 2023, "SERIE"))    # -> tt3581920 (with DDGS)


In [0]:
# Take an explicit copy before the next cell adds and modifies columns.
sf_data = sf_data.copy()
sf_data

In [0]:
# Columns each search function reads from a row to build its queries.
cols = ["TITLE_TYPE_STR","TITLE_NAME_STR","RELEASE_YEAR_INT"]
sf_data[cols] = sf_data[cols].fillna("")

if "IMDB_ID" not in sf_data.columns:
    sf_data["IMDB_ID"] = None

# Pass 1 - DuckDuckGo, only for rows that have no id yet.
# Guarded like the Google pass below: on an empty selection pandas may still
# call the function once on a placeholder row, i.e. run a pointless web search.
mask = sf_data["IMDB_ID"].isna()
if mask.any():
    sf_data.loc[mask, "IMDB_ID"] = sf_data.loc[mask, cols].apply(imdb_search_ddgs, axis=1)

# Pass 2 - Google, only for rows where DuckDuckGo returned the "ERROR" sentinel.
mask = sf_data["IMDB_ID"].eq("ERROR")
if mask.any():
    sf_data.loc[mask, "IMDB_ID"] = sf_data.loc[mask, cols].apply(imdb_search_google, axis=1)

# Rows that failed both passes end up with a missing id instead of "ERROR".
sf_data["IMDB_ID"] = sf_data["IMDB_ID"].replace({"ERROR": None})


In [0]:
# Show the frame with the IMDB_ID column filled in by the previous cell.
sf_data

In [0]:
import pandas as pd

In [0]:
# Persist the lookup results; with to_excel's defaults the DataFrame index is
# written as the first column of the sheet.
sf_data.to_excel("sf_data_imdbid.xlsx", engine='openpyxl')

In [0]:
# Reload the saved results. The file was written with its index as the first
# column, so read that column back as the index instead of letting it show up
# as a spurious "Unnamed: 0" data column.
df_again=pd.read_excel("sf_data_imdbid.xlsx", index_col=0)

In [0]:
# Keep only the rows whose IMDB_ID is still missing, for a second attempt.
df_again=df_again[df_again['IMDB_ID'].isnull()]

In [0]:
# Work on an independent copy so the filtered frame above is left untouched.
df_again1=df_again.copy()

In [0]:
# Show the rows that will be retried.
df_again1

In [0]:

# Columns each search function reads from a row to build its queries.
cols = ["TITLE_TYPE_STR","TITLE_NAME_STR","RELEASE_YEAR_INT"]
df_again1[cols] = df_again1[cols].fillna("")

if "IMDB_ID" not in df_again1.columns:
    df_again1["IMDB_ID"] = None

# An all-empty IMDB_ID column read back from Excel is numeric (NaN); make it
# object-typed first so the string ids can be stored without an
# incompatible-dtype assignment, which newer pandas versions warn about.
df_again1["IMDB_ID"] = df_again1["IMDB_ID"].astype(object)

# Pass 1 - DuckDuckGo, only for rows that have no id yet.
# Guarded like the Google pass below: on an empty selection pandas may still
# call the function once on a placeholder row, i.e. run a pointless web search.
mask = df_again1["IMDB_ID"].isna()
if mask.any():
    df_again1.loc[mask, "IMDB_ID"] = df_again1.loc[mask, cols].apply(imdb_search_ddgs, axis=1)

# Pass 2 - Google, only for rows where DuckDuckGo returned the "ERROR" sentinel.
mask = df_again1["IMDB_ID"].eq("ERROR")
if mask.any():
    df_again1.loc[mask, "IMDB_ID"] = df_again1.loc[mask, cols].apply(imdb_search_google, axis=1)

# Rows that failed both passes end up with a missing id instead of "ERROR".
df_again1["IMDB_ID"] = df_again1["IMDB_ID"].replace({"ERROR": None})

# TODO: rows still without an id should later be matched against the regional IMDb data.

In [0]:
# Save the retry results; with to_excel's defaults the DataFrame index is
# written as the first column of the sheet.
df_again1.to_excel("df_again1.xlsx", engine='openpyxl')

In [0]:
# Reload the retry results for inspection. The file was written with its index
# as the first column, so read that column back as the index rather than as an
# extra "Unnamed: 0" data column.
pd.read_excel("df_again1.xlsx", index_col=0)