In [0]:
%sh

pip install pytrends

Collecting pytrends
  Downloading pytrends-4.9.2-py3-none-any.whl (15 kB)
Collecting lxml
  Downloading lxml-4.9.3-cp39-cp39-manylinux_2_28_x86_64.whl (8.0 MB)
Installing collected packages: lxml, pytrends
Successfully installed lxml-4.9.3 pytrends-4.9.2
You should consider upgrading via the '/local_disk0/.ephemeral_nfs/envs/pythonEnv-ec15c21d-0a1c-4275-b8fc-a097128ebc3a/bin/python -m pip install --upgrade pip' command.


In [0]:
from pytrends.request import TrendReq
from pyspark.sql.functions import *
from pyspark.sql.window import *
from pyspark.sql.types import *

pytrends = TrendReq(hl='en-IN')

In [0]:
pytrends.trending_searches(pn='india').head(10)

Unnamed: 0,0
0,MLC
1,Man United vs Dortmund
2,Water
3,National Girlfriend Day
4,NTPC share price
5,Max Verstappen
6,Prime Minister Narendra Modi
7,Jasprit Bumrah
8,Vakkom Purushothaman
9,Lloyd Steel share price


In [0]:
%fs

rm -r dbfs:/user/hive/warehouse/trending_searches.db/

In [0]:
%sql

create database if not exists trending_searches;
show databases;
use trending_searches;

In [0]:
%sql

create table if not exists countries_trending_searches(search_term varchar(50), country string, rank int, data_as_of_date date);
desc countries_trending_searches;

col_name,data_type,comment
search_term,varchar(50),
country,string,
rank,int,
data_as_of_date,date,


In [0]:
%sql

create table if not exists top_trending(search_term varchar(75), trend_cntry_count int,avg_cntry_trend_rank double , ranking int, country varchar(25), data_as_of_date date);
desc top_trending;

col_name,data_type,comment
search_term,varchar(75),
trend_cntry_count,int,
avg_cntry_trend_rank,double,
ranking,int,
country,varchar(25),
data_as_of_date,date,


In [0]:
#Can't store this data in file or table as data is lost in Community Edition after clusture is terminated

dst_coutries = ["Argentina","Australia","Austria","Belgium","Brazil","Canada","Chile","Colombia","Denmark","Egypt","Finland","France","Germany","Greece","Hong_Kong","Hungary","India","Indonesia","Ireland","Israel","Italy","Japan","Kenya","Malaysia","Mexico","Netherlands","New Zealand","Nigeria","Norway","Philippines","Poland","Portugal","Romania","Russia","Saudi_Arabia","Singapore","South_Africa","South_Korea","Sweden","Switzerland","Taiwan","Thailand","Turkey","Ukraine","United_Kingdom","United_States","Vietnam"]

In [0]:
schema = StructType([
  StructField('search_term', StringType(), True),
  StructField('country', StringType(), True),
  StructField('rank', IntegerType(), True),
  StructField('data_as_of_date', DateType(), True)
])

comb_df1 = spark.createDataFrame([], schema)

In [0]:
#Seperate notebook to run jobs in parallel (Not supported in communtity edition) -
#https://databricks-prod-cloudfront.cloud.databricks.com/public/4027ec902e239c93eaaa8714f173bcfc/8881205802088401/2387488636523229/1939591732392236/latest.html

for i in dst_coutries:
    top_5_trend = pytrends.trending_searches(pn=i.lower()).head(10)
    spark_top_5 = spark.createDataFrame(top_5_trend)\
    .withColumnRenamed('0', 'search_term')\
    .withColumn("country", lit(i))\
    .withColumn("rank", (monotonically_increasing_id() + 1).cast('int'))\
    .withColumn("data_as_of_date", current_date())
    comb_df1 = comb_df1.unionAll(spark_top_5)
display(comb_df1)

search_term,country,rank,data_as_of_date
Manchester United,Argentina,1,2023-07-31
Caña con ruda,Argentina,2,2023-07-31
Paro docente,Argentina,3,2023-07-31
Dembele,Argentina,4,2023-07-31
Elecciones Villa Allende,Argentina,5,2023-07-31
Racing de córdoba vs Independiente,Argentina,6,2023-07-31
Huracan,Argentina,7,2023-07-31
Manchester City vs Atlético Madrid,Argentina,8,2023-07-31
Velez,Argentina,9,2023-07-31
Colon,Argentina,10,2023-07-31


In [0]:
comb_df1.write.mode('overwrite').insertInto('trending_searches.countries_trending_searches')

In [0]:
%sql
--New columns:
--Adding column with count of countries where term is trending
--Average of trending term rank in each country
--Ranking each term based on previous 2 columns

select search_term, trend_cntry_count, avg_cntry_trend_rank, dense_rank() over (order by trend_cntry_count desc, search_term desc) as ranking, country, data_as_of_date 
from (select search_term, 
count(country) over (partition by search_term) as trend_cntry_count, 
round(avg(rank) over (partition by search_term), 2) as avg_cntry_trend_rank, 
country, data_as_of_date
from trending_searches.countries_trending_searches order by trend_cntry_count desc)
where trend_cntry_count > 1

search_term,trend_cntry_count,avg_cntry_trend_rank,ranking,country,data_as_of_date
Man United vs Dortmund,13,3.46,1,South_Africa,2023-07-31
Man United vs Dortmund,13,3.46,1,Philippines,2023-07-31
Man United vs Dortmund,13,3.46,1,Kenya,2023-07-31
Man United vs Dortmund,13,3.46,1,Nigeria,2023-07-31
Man United vs Dortmund,13,3.46,1,United_Kingdom,2023-07-31
Man United vs Dortmund,13,3.46,1,Romania,2023-07-31
Man United vs Dortmund,13,3.46,1,New Zealand,2023-07-31
Man United vs Dortmund,13,3.46,1,Australia,2023-07-31
Man United vs Dortmund,13,3.46,1,Canada,2023-07-31
Man United vs Dortmund,13,3.46,1,Malaysia,2023-07-31


In [0]:
%sql

--Removing country col to focus on trending search

select distinct search_term, trend_cntry_count, avg_cntry_trend_rank, dense_rank() over (order by trend_cntry_count desc,search_term asc) as ranking, data_as_of_date 
from (select search_term, 
count(country) over (partition by search_term) as trend_cntry_count, 
round(avg(rank) over (partition by search_term), 2) as avg_cntry_trend_rank, 
country, data_as_of_date
from trending_searches.countries_trending_searches order by trend_cntry_count desc)
where trend_cntry_count > 1

search_term,trend_cntry_count,avg_cntry_trend_rank,ranking,data_as_of_date
Man United vs Dortmund,13,3.46,1,2023-07-31
Chelsea,11,3.0,2,2023-07-31
Dembele,8,4.13,3,2023-07-31
Canada vs Australia,5,5.0,4,2023-07-31
Manchester United,5,2.0,5,2023-07-31
Chelsea vs Fulham,4,5.75,6,2023-07-31
Japan vs Spain,4,6.5,7,2023-07-31
Liverpool vs Leicester City,4,8.0,8,2023-07-31
Niger,4,3.0,9,2023-07-31
UFC,4,9.5,10,2023-07-31


In [0]:
df_transf1 = comb_df1.withColumn('trend_cntry_count',count('country').over(Window.partitionBy('search_term'))).withColumn('avg_cntry_trend_rank',round(avg('rank').over(Window.partitionBy('search_term')),2)).withColumn('ranking', dense_rank().over(Window.orderBy(desc("trend_cntry_count"),desc("search_term")))).orderBy(desc('trend_cntry_count'))

#Rearranging columns & removing unnecessary column(rank as it's average data is stored in ranking)
df_transf2 = df_transf1.select('search_term','trend_cntry_count','avg_cntry_trend_rank','ranking','country','data_as_of_date')
display(df_transf2.where(col('trend_cntry_count') > 1))

search_term,trend_cntry_count,avg_cntry_trend_rank,ranking,country,data_as_of_date
Man United vs Dortmund,13,3.46,1,Australia,2023-07-31
Man United vs Dortmund,13,3.46,1,Canada,2023-07-31
Man United vs Dortmund,13,3.46,1,India,2023-07-31
Man United vs Dortmund,13,3.46,1,Ireland,2023-07-31
Man United vs Dortmund,13,3.46,1,Kenya,2023-07-31
Man United vs Dortmund,13,3.46,1,Malaysia,2023-07-31
Man United vs Dortmund,13,3.46,1,New Zealand,2023-07-31
Man United vs Dortmund,13,3.46,1,Nigeria,2023-07-31
Man United vs Dortmund,13,3.46,1,Philippines,2023-07-31
Man United vs Dortmund,13,3.46,1,Romania,2023-07-31


In [0]:
#Removing country col & using distinct to focus on trending search

display(df_transf2.select('search_term','trend_cntry_count','avg_cntry_trend_rank','ranking','data_as_of_date').distinct().where(col('trend_cntry_count') > 1))

search_term,trend_cntry_count,avg_cntry_trend_rank,ranking,data_as_of_date
Man United vs Dortmund,13,3.46,1,2023-07-31
Chelsea,11,3.0,2,2023-07-31
Dembele,8,4.13,3,2023-07-31
Manchester United,5,2.0,4,2023-07-31
Canada vs Australia,5,5.0,5,2023-07-31
UFC,4,9.5,6,2023-07-31
Niger,4,3.0,7,2023-07-31
Liverpool vs Leicester City,4,8.0,8,2023-07-31
Japan vs Spain,4,6.5,9,2023-07-31
Chelsea vs Fulham,4,5.75,10,2023-07-31


In [0]:
df_transf2.write.mode('append').insertInto('trending_searches.top_trending')