In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkFiles

# Create (or reuse) the Spark session that backs every cell in this notebook.
spark = (
    SparkSession.builder
    .appName("Python Spark SQL wikipedia pageview")
    .getOrCreate()
)

# An arbitrary hourly pageview dump, used as the development sample.
url = "https://dumps.wikimedia.org/other/pageviews/2020/2020-10/pageviews-20201001-020000.gz"
blacklist_location_url = "https://s3.amazonaws.com/dd-interview-data/data_engineer/wikipedia/blacklist_domains_and_pages"

# Register both remote files with the SparkContext, pageviews first and
# then the blacklist, so they can later be looked up by file name.
for remote_file in (url, blacklist_location_url):
    spark.sparkContext.addFile(remote_file)


In [2]:
# Column layout of the pageview dumps, as described at
# https://wikitech.wikimedia.org/wiki/Analytics/Data_Lake/Traffic/Pageviews
#   domain_code page_title count_views total_response_size
from pyspark.sql.types import LongType, StringType, StructType

# Schema applied to the pageview file: two string keys, two integer metrics.
schema = (
    StructType()
    .add("domain_code", StringType(), nullable=True)
    .add("page_title", StringType(), nullable=True)
    .add("count_views", LongType(), nullable=True)
    .add("total_response_size", LongType(), nullable=True)
)

# Schema applied to the blacklist file: only the (domain_code, page_title) pair.
blacklist_schema = (
    StructType()
    .add("domain_code", StringType(), nullable=True)
    .add("page_title", StringType(), nullable=True)
)

In [3]:
# Load the two space-delimited inputs that were registered via addFile().
# An explicit schema is passed to .schema() for each read, so the redundant
# "inferSchema" option has been dropped: inference is not performed when a
# schema is supplied.
#
# NOTE(review): the CSV reader treats '"' as a quote character by default.
# If page titles can contain raw double quotes, such titles would be parsed
# as quoted fields -- consider .option("quote", "") after checking the data.
blacklist_df = (
    spark.read
    .option("delimiter", " ")
    .schema(blacklist_schema)
    .csv("file://" + SparkFiles.get("blacklist_domains_and_pages"))
)
blacklist_df.show()

# The .gz pageview dump is passed to the CSV reader as-is.
df = (
    spark.read
    .option("delimiter", " ")
    .schema(schema)
    .csv("file://" + SparkFiles.get("pageviews-20201001-020000.gz"))
)

df.show()

+-----------+--------------------+
|domain_code|          page_title|
+-----------+--------------------+
|         ab|%D0%91%D0%BE%D1%8...|
|        ace|Beureukaih:Nuvola...|
|        ace|               Japan|
|        ace|Kusuih:Hubong_gis...|
|        ace|Kusuih:Neuubah_me...|
|        ace|Marit_Ureu%C3%ABn...|
|      ace.m|Kusuih:MobileLang...|
|         af|                 .sy|
|         af|                2009|
|         af|         Apostelskap|
|         af|                 DNS|
|         af|    Franse_Rewolusie|
|         af|            Galicies|
|         af|      Hector_Berlioz|
|         af|Kategorie:Trentin...|
|         af|L%C3%AAer:Allah-g...|
|         af|L%C3%AAer:Boerfam...|
|         af|L%C3%AAer:Flag_of...|
|         af|         Moeitevolle|
|         af|           Onderling|
+-----------+--------------------+
only showing top 20 rows

+-----------+--------------------+-----------+-------------------+
|domain_code|          page_title|count_views|total_response_size|


In [4]:
"""
let's use spark sql!!! https://spark.apache.org/sql/
https://spark.apache.org/docs/latest/sql-programming-guide.html
"""
# Expose the pageview frame to SQL under the name "wiki_pageviews".
df.createOrReplaceTempView("wiki_pageviews")

# Sanity check: how many distinct domain codes appear in the raw sample?
distinct_domains_query = "SELECT COUNT(DISTINCT(domain_code)) FROM wiki_pageviews"
spark.sql(distinct_domains_query).show()

+---------------------------+
|count(DISTINCT domain_code)|
+---------------------------+
|                       1505|
+---------------------------+



In [5]:
# Make the blacklist queryable from SQL.
blacklist_df.createOrReplaceTempView("blacklists")

# Alternative formulation (anti-join expressed as LEFT JOIN + IS NULL):
#   SELECT wiki_pageviews.* FROM wiki_pageviews
#   LEFT JOIN blacklists b USING(domain_code,page_title)
#   WHERE b.domain_code IS NULL AND b.page_title is NULL
# Depending on the query plan Spark SQL produces, this may be better
# optimized; worth comparing against the NOT EXISTS form used below.
#
# NOTE(review): the blacklist sample shows percent-encoded titles while
# pageview titles appear decoded in the output -- confirm that the plain
# equality match below catches non-ASCII blacklisted pages.
filter_query = """
SELECT *
FROM   wiki_pageviews w
WHERE NOT EXISTS (
   SELECT *
   FROM   blacklists
   WHERE  domain_code = w.domain_code and page_title = w.page_title
   );
"""
transformed_df = spark.sql(filter_query)
transformed_df.show()

# Row counts, in order: filtered pageviews, blacklist entries, raw pageviews.
for frame in (transformed_df, blacklist_df, df):
    print(frame.count())


+-----------+--------------------+-----------+-------------------+
|domain_code|          page_title|count_views|total_response_size|
+-----------+--------------------+-----------+-------------------+
|         aa|                   -|         11|                  0|
|         aa|           Main_Page|         13|                  0|
|         aa|           Wikipedia|          1|                  0|
|         aa|Wikipedia:Communi...|          3|                  0|
|         aa|   Wikipedia:Sandbox|          1|                  0|
|       aa.b|           Main_Page|          1|                  0|
|       aa.b|MediaWiki:Ipb_alr...|          1|                  0|
|       aa.b|Special:CreateAcc...|          1|                  0|
|       aa.b|   Special:UserLogin|          1|                  0|
|       aa.d|           Main_Page|          2|                  0|
|     aa.m.d|           Main_Page|          1|                  0|
|         ab|                   -|         14|                

In [6]:

# Expose the blacklist-filtered pageviews to SQL.
transformed_df.createOrReplaceTempView("filtered_wiki_pageviews")

# Top-N-per-group via a window function; background reading:
# http://www.silota.com/docs/recipes/sql-top-n-group.html
#
# row_number() keeps exactly one row per domain_code; rank() or dense_rank()
# would instead keep every page tied for first place.
# page_title is added as a secondary sort key so that ties on count_views are
# broken by title rather than arbitrarily, which keeps the selected page
# stable from run to run.
rank_query = """
select * from (
    select domain_code, 
           count_views,
           page_title, 
           row_number() over (partition by domain_code order by count_views desc, page_title) as domain_rank 
    from filtered_wiki_pageviews) ranks
where domain_rank <= 1;
"""
final_df = spark.sql(rank_query)
final_df.show()

# Number of distinct domains remaining after the blacklist filter.
spark.sql("SELECT COUNT(DISTINCT(domain_code)) FROM filtered_wiki_pageviews").show()


+--------------+-----------+--------------------+-----------+
|   domain_code|count_views|          page_title|domain_rank|
+--------------+-----------+--------------------+-----------+
|       cbk-zam|         11|   El_Primero_Pagina|          1|
|        co.m.d|          1|Discussioni_utent...|          1|
|      en.m.voy|         50|           Main_Page|          1|
|  foundation.m|        672|      Privacy_policy|          1|
|foundation.m.m|        318|      Privacy_policy|          1|
|          ro.b|          6|   Pagina_principală|          1|
|          zu.m|          4|           Amaphupho|          1|
|         als.m|          6| Wikipedia:Houptsyte|          1|
|         diq.m|          2|                2022|          1|
|      es.m.voy|         28|             Jalisco|          1|
|        ms.m.b|          2|Solat_cara_Nabi_M...|          1|
|        pt.m.q|         21|     Estatuto_do_PCC|          1|
|          sd.d|          5|            مُک_صفحو|          1|
|       

In [9]:
# Persist the per-domain top pages as comma-separated text with a header row.
# repartition(1) collapses the frame to a single partition before writing;
# Spark writes 'rank.csv' as an output directory rather than a single file.
# mode("overwrite") replaces any previous output at that path.
#
# The former "inferSchema" option was removed: it is a read-side CSV option
# and has no effect on a DataFrameWriter.
#
# DataFrameWriter.csv() returns None, so x is None; the assignment is kept
# only because a later cell prints x.
x = (
    final_df.repartition(1)
    .write
    .option("header", True)
    .option("delimiter", ",")
    .mode("overwrite")
    .csv('rank.csv')
)

In [8]:
# x holds the return value of the .csv(...) write call; the recorded output
# of this cell is None. NOTE(review): looks like a leftover debug print.
print(x)

None
