In [1]:
from pyspark.sql import SparkSession

# Start (or reuse) a SparkSession whose application name is 'learning_spark_sql'.
spark = (
    SparkSession.builder
    .config('spark.app.name', 'learning_spark_sql')
    .getOrCreate()
)

# Load the Wikipedia unique-visitors CSV: the first row is treated as the
# header, fields are comma-separated, and column types are inferred.
wiki_uniq_df = (
    spark.read
    .options(header=True, delimiter=',', inferSchema=True)
    .csv("wiki_uniq_march_2022_w_site_type.csv")
)

In [2]:
# Preview the first five rows without truncating long cell values.
wiki_uniq_df.show(n=5, truncate=False)

+------------------+-------------------+-----------------+-------------------+-------------+---------+
|domain            |uniq_human_visitors|uniq_bot_visitors|total_visitor_count|language_code|site_type|
+------------------+-------------------+-----------------+-------------------+-------------+---------+
|en.m.wikipedia.org|33261399           |8400247          |41661646           |en           |wikipedia|
|en.wikipedia.org  |17009339           |4851741          |21861080           |en           |wikipedia|
|es.m.wikipedia.org|5668575            |1977289          |7645864            |es           |wikipedia|
|ru.m.wikipedia.org|5816762            |1367179          |7183941            |ru           |wikipedia|
|ja.m.wikipedia.org|5396108            |1325212          |6721320            |ja           |wikipedia|
+------------------+-------------------+-----------------+-------------------+-------------+---------+
only showing top 5 rows



In [3]:
# Print each column's name, inferred type and nullability for the loaded frame.
wiki_uniq_df.printSchema()

root
 |-- domain: string (nullable = true)
 |-- uniq_human_visitors: integer (nullable = true)
 |-- uniq_bot_visitors: integer (nullable = true)
 |-- total_visitor_count: integer (nullable = true)
 |-- language_code: string (nullable = true)
 |-- site_type: string (nullable = true)



In [4]:
# Summary statistics (count, mean, stddev, min, max) for every column,
# displayed without truncating the values.
desc = wiki_uniq_df.describe()
desc.show(truncate=False)

+-------+----------------+-------------------+-----------------+-------------------+-------------+----------+
|summary|domain          |uniq_human_visitors|uniq_bot_visitors|total_visitor_count|language_code|site_type |
+-------+----------------+-------------------+-----------------+-------------------+-------------+----------+
|count  |760             |760                |760              |760                |760          |760       |
|mean   |null            |155413.0394736842  |51431.0552631579 |206844.09473684212 |null         |null      |
|stddev |null            |1435327.5409314982 |376318.441663093 |1809320.9789242456 |null         |null      |
|min    |aa.wikibooks.org|0                  |170              |1005               |aa           |wiki      |
|max    |zu.wikipedia.org|33261399           |8400247          |41661646           |zu           |wiktionary|
+-------+----------------+-------------------+-----------------+-------------------+-------------+----------+



In [5]:
# Drop site_type; none of the later cells shown here reference it.
# NOTE(review): the column is NOT uniformly 'wikipedia' -- the describe() output
# above reports min 'wiki' and max 'wiktionary' for site_type -- so this discards
# information; confirm it is really unneeded.
wiki_uniq_df = wiki_uniq_df.drop('site_type')

In [7]:
# Rename the 'domain' column to 'domain_name'.
wiki_uniq_df = wiki_uniq_df.withColumnRenamed(existing='domain', new='domain_name')

In [8]:
# Show only the rows whose language_code is "en" (English-language sites).
wiki_uniq_df.where(wiki_uniq_df['language_code'] == "en").show(truncate=False)

+--------------------+-------------------+-----------------+-------------------+-------------+
|domain_name         |uniq_human_visitors|uniq_bot_visitors|total_visitor_count|language_code|
+--------------------+-------------------+-----------------+-------------------+-------------+
|en.m.wikipedia.org  |33261399           |8400247          |41661646           |en           |
|en.wikipedia.org    |17009339           |4851741          |21861080           |en           |
|en.m.wiktionary.org |146792             |190145           |336937             |en           |
|en.wiktionary.org   |123043             |184577           |307620             |en           |
|en.wikibooks.org    |11221              |102602           |113823             |en           |
|en.m.wikibooks.org  |7758               |18442            |26200              |en           |
|en.wikiquote.org    |3959               |21852            |25811              |en           |
|en.wikisource.org   |5790               |16002   

In [10]:
# Register wiki_uniq_df as the temporary view 'uniq_visitors_march' so it can be
# queried by name through spark.sql().
wiki_uniq_df.createOrReplaceTempView('uniq_visitors_march')

In [26]:
# Total unique human visitors per English-language domain, largest first.
# Grouping and ordering name their columns explicitly instead of using ordinal
# positions (GROUP BY 1 / ORDER BY 2): ordinals silently change meaning when the
# SELECT list is reordered and only work while Spark's ordinal settings
# (spark.sql.groupByOrdinal / spark.sql.orderByOrdinal) are enabled.
query = """
    SELECT domain_name,
           SUM(uniq_human_visitors) AS total_human_visitor
    FROM uniq_visitors_march
    WHERE language_code = 'en'
    GROUP BY domain_name
    ORDER BY total_human_visitor DESC
"""

wiki_new_df = spark.sql(query)

# Display the ten domains with the most human visitors.
wiki_new_df.show(10, truncate=False)

+-------------------+-------------------+
|domain_name        |total_human_visitor|
+-------------------+-------------------+
|en.m.wikipedia.org |33261399           |
|en.wikipedia.org   |17009339           |
|en.m.wiktionary.org|146792             |
|en.wiktionary.org  |123043             |
|en.wikibooks.org   |11221              |
|en.m.wikisource.org|9908               |
|en.m.wikibooks.org |7758               |
|en.wikisource.org  |5790               |
|en.m.wikiquote.org |4841               |
|en.wikiquote.org   |3959               |
+-------------------+-------------------+
only showing top 10 rows



In [27]:
# Same aggregation as the SQL query, expressed with the DataFrame API:
# keep the English rows, sum uniq_human_visitors per domain_name, and show the
# ten largest totals.
(
    wiki_uniq_df
    .filter('language_code == "en"')
    .select('domain_name', 'uniq_human_visitors')
    .groupBy('domain_name')
    .sum()
    .sort('sum(uniq_human_visitors)', ascending=False)
    .show(10, truncate=False)
)

+-------------------+------------------------+
|domain_name        |sum(uniq_human_visitors)|
+-------------------+------------------------+
|en.m.wikipedia.org |33261399                |
|en.wikipedia.org   |17009339                |
|en.m.wiktionary.org|146792                  |
|en.wiktionary.org  |123043                  |
|en.wikibooks.org   |11221                   |
|en.m.wikisource.org|9908                    |
|en.m.wikibooks.org |7758                    |
|en.wikisource.org  |5790                    |
|en.m.wikiquote.org |4841                    |
|en.wikiquote.org   |3959                    |
+-------------------+------------------------+
only showing top 10 rows



In [28]:
# Persist the aggregated result as Parquet, overwriting any previous output.
# DataFrameWriter.parquet() returns None, so its result must not be assigned
# back to wiki_new_df: doing so would discard the DataFrame and make this cell
# raise AttributeError when re-run.
# NOTE(review): the target directory is named 'csv' although the files written
# are Parquet; the path is left unchanged because the read-back cell uses it.
wiki_new_df.write.parquet('./result/csv/uniq_human_visitors/', mode="overwrite")


In [29]:
# Load the Parquet files under ./result/csv/uniq_human_visitors/ into a new DataFrame.
wiki_new_restored = spark.read.format('parquet').load('./result/csv/uniq_human_visitors/')

In [30]:
# Print the schema of the DataFrame read back from Parquet.
wiki_new_restored.printSchema()

root
 |-- domain_name: string (nullable = true)
 |-- total_human_visitor: long (nullable = true)



In [31]:
# Stop the SparkSession that was started in the first cell.
spark.stop()