# spark-basics

Playing around with Spark via the PySpark interface.

## Contents
- [Imports](#imports)
- [Prepare Data](#prepare-data)
- [RDD API](#rdd-api)
- [Dataframe API via Spark SQL](#dataframe-api-via-spark-sql)

## Imports

In [1]:
import findspark
findspark.init()

import pyspark
from pyspark import SparkContext, SparkConf
from pyspark.sql import Row, SparkSession

## Prepare Data

Unzip the data to `data/pagecounts-20160101-000000_parsed.out`.

Each line of the dataset contains the statistics for one Wikimedia page, with the fields delimited by a single space. The schema is as follows:
- Project code: The project identifier for each page.
- Page title: A string containing the title of the page.
- Page hits: Number of requests for the page during the hour covered by the file.
- Page size: Size of the page.

In [2]:
!unzip -o data/pagecounts.zip -d data/

Archive:  data/pagecounts.zip
  inflating: data/pagecounts-20160101-000000_parsed.out  


Start Spark and create an RDD from the text file.

In [3]:
# Start Spark context
sc = SparkContext.getOrCreate(SparkConf().setMaster("local[*]"))
spark = SparkSession(sc)

# Create an RDD from an external data source
rdd_dirty = sc.textFile("data/pagecounts-20160101-000000_parsed.out")
type(rdd_dirty)

21/09/23 15:07:54 WARN Utils: Your hostname, mark-machine resolves to a loopback address: 127.0.1.1; using 192.168.0.102 instead (on interface wlp8s0)
21/09/23 15:07:54 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
21/09/23 15:07:54 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


pyspark.rdd.RDD

## RDD API

Answering questions about the data with the RDD API.

In [4]:
def get_clean_rdd(rdd):
    """ Create a clean rdd. """
    # split each line into its columns on the single-space delimiter
    rdd_tmp = rdd.map(lambda x: x.split(" "))
    
    # return an RDD of Rows with typed fields (not a dataframe)
    return rdd_tmp.map(lambda x: Row(code=str(x[0]), title=str(x[1]), hits=int(x[2]), size=int(x[3])))
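
The parsing above assumes every line has exactly four fields and that the last two are integers. As a minimal sketch of a more defensive variant, the plain-Python helper below returns `None` for malformed lines. The names `PageCount` and `parse_line` are hypothetical and not part of the notebook.

```python
from typing import NamedTuple, Optional


class PageCount(NamedTuple):
    """One record of the dataset, following the schema described earlier."""
    code: str
    title: str
    hits: int
    size: int


def parse_line(line: str) -> Optional[PageCount]:
    """Parse one space-delimited line; return None if it is malformed."""
    parts = line.split(" ")
    if len(parts) != 4:
        return None
    code, title, hits, size = parts
    try:
        return PageCount(code, title, int(hits), int(size))
    except ValueError:
        # hits or size was not an integer
        return None


print(parse_line("aa Main_Page 5 266946"))
print(parse_line("aa Main_Page five"))
```

With Spark, a helper like this could presumably be applied as `rdd.map(parse_line).filter(lambda r: r is not None)`, so that bad lines are dropped instead of failing the job.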

In [5]:
# get the cleaned RDD of Rows from the raw RDD
rdd = get_clean_rdd(rdd=rdd_dirty)
print(type(rdd))

<class 'pyspark.rdd.PipelinedRDD'>


In [6]:
def take_n(rdd, n):
    """ Take n from rdd. """
    return rdd.take(n)

In [7]:
# Retrieve the first 15 records and print out the result.
take_n(rdd, 15)

                                                                                

[Row(code='aa', title='271_a.C', hits=1, size=4675),
 Row(code='aa', title='Category:User_th', hits=1, size=4770),
 Row(code='aa', title='Chiron_Elias_Krase', hits=1, size=4694),
 Row(code='aa', title='Dassault_rafaele', hits=2, size=9372),
 Row(code='aa', title='E.Desv', hits=1, size=4662),
 Row(code='aa', title='File:Wiktionary-logo-en.png', hits=1, size=10752),
 Row(code='aa', title='Indonesian_Wikipedia', hits=1, size=4679),
 Row(code='aa', title='Main_Page', hits=5, size=266946),
 Row(code='aa', title='Requests_for_new_languages/Wikipedia_Banyumasan', hits=1, size=4733),
 Row(code='aa', title='Special:Contributions/203.144.160.245', hits=1, size=5812),
 Row(code='aa', title='Special:Contributions/5.232.61.79', hits=1, size=5805),
 Row(code='aa', title='Special:Contributions/Ayarportugal', hits=1, size=5808),
 Row(code='aa', title='Special:Contributions/Born2bgratis', hits=1, size=5812),
 Row(code='aa', title='Special:ListFiles/Betacommand', hits=1, size=5035),
 Row(code='aa', titl

In [8]:
def count_records(rdd):
    """ Count the records in the rdd. """
    return rdd.count()

In [9]:
# Determine the number of records the dataset has in total.
print(f"There are {count_records(rdd)} records in the dataframe.")



There are 3324129 records in the dataframe.


                                                                                

In [10]:
def summary_stats_col(rdd, col_name):
    """ Computes the min, max, and average (mean) of a column in an rdd."""
    assert col_name in ["hits", "size"]
    col = rdd.map(lambda x: x[col_name])
    return {
        "min": col.min(),
        "max": col.max(),
        "mean": col.mean()
    }
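
Each of `min()`, `max()`, and `mean()` is a separate action, so the function above passes over the data three times. The sketch below shows, in plain Python, how the three statistics could be folded into one accumulator in a single pass. The two functions are shaped so that they would fit `RDD.aggregate(zeroValue, seqOp, combOp)`; the names `ZERO`, `seq_op`, and `comb_op` are illustrative assumptions, and the two "partitions" are simulated with lists.

```python
from functools import reduce

# Accumulator layout: (min, max, total, count)
ZERO = (float("inf"), float("-inf"), 0, 0)


def seq_op(acc, value):
    """Fold one value into a partition-local accumulator."""
    lo, hi, total, count = acc
    return (min(lo, value), max(hi, value), total + value, count + 1)


def comb_op(a, b):
    """Merge the accumulators of two partitions."""
    return (min(a[0], b[0]), max(a[1], b[1]), a[2] + b[2], a[3] + b[3])


# Simulate two partitions holding page sizes from the first rows shown earlier.
partitions = [[4675, 4770], [4694, 9372]]
partials = [reduce(seq_op, part, ZERO) for part in partitions]
lo, hi, total, count = reduce(comb_op, partials, ZERO)
print({"min": lo, "max": hi, "mean": total / count})
```

Under these assumptions, the Spark call would look like `col.aggregate(ZERO, seq_op, comb_op)`.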

In [11]:
# Compute the min, max, and average page size.
col_name="size"
summary_stats = summary_stats_col(rdd=rdd, col_name=col_name)
print(f"min {col_name} = {summary_stats['min']}, "
      f"max {col_name} = {summary_stats['max']}, "
      f"mean {col_name} = {summary_stats['mean']}")



min size = 0, max size = 141180155987, mean size = 132239.5695744666


                                                                                

In [12]:
# Determine the record(s) with the largest page size. If multiple records have the same size, list all of them.
col_name = "size"
assert col_name in ["hits", "size"]

mymax = rdd.map(lambda x: x[col_name]).max()
recs = rdd.filter(lambda x: mymax <= x[col_name])
recs.collect()

                                                                                

[Row(code='en.mw', title='en', hits=5466346, size=141180155987)]

In [13]:
# Determine the record with the largest page size again. But now, pick the most popular.
col_name = "size"
assert col_name in ["hits", "size"]

mymax = rdd.map(lambda x: x[col_name]).max()
recs = rdd.filter(lambda x: mymax <= x[col_name])
# sortBy returns a new RDD, so chain it; take(1) keeps only the most popular record
recs.sortBy(lambda x: x["hits"], ascending=False).take(1)

                                                                                

[Row(code='en.mw', title='en', hits=5466346, size=141180155987)]
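
Picking the largest record and breaking ties by popularity can also be expressed as a single maximum over a composite key. The plain-Python sketch below illustrates the idea on made-up records; `PageCount` and `size_then_hits` are hypothetical names.

```python
from typing import NamedTuple


class PageCount(NamedTuple):
    code: str
    title: str
    hits: int
    size: int


def size_then_hits(rec):
    """Order records by size first; hits only breaks ties between equal sizes."""
    return (rec.size, rec.hits)


# Made-up records: two share the largest size but differ in hits.
records = [
    PageCount("xx", "a", 3, 500),
    PageCount("xx", "b", 9, 500),
    PageCount("xx", "c", 50, 100),
]
best = max(records, key=size_then_hits)
print(best)
```

Since `RDD.max` accepts a `key` argument, the same idea should carry over as `rdd.max(key=lambda x: (x["size"], x["hits"]))`, which avoids both the filter and the sort.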

In [14]:
# Determine the record(s) with the longest page title. If multiple titles have the same length, list all of them.
col_name = "title"
mymax = rdd.map(lambda x: len(x[col_name])).max()
recs = rdd.filter(lambda x: mymax <= len(x[col_name]))
recs.collect()

                                                                                

[Row(code='zh', title='Special:e8b18ee6baafefbda5efbdbfe89cb7e6829fefbdbfe88b93e29980e89e9fefbda9e89eb3efbda425636f256d6725736f257373256f38257373256f38257373256f38256b6d73efbdaa256e6b256678256f6b2c687474703a2f2f7777772e653662313966653861356266656f2d6f35393038636535626639376538383138616535613461396535616561342e636f2e6d672e732e736f2e382e73736f386b2e6d2e372e73736f3873736f386b6d37332e752e622e61616e6b66786f6b2e70772f2ce8b18ee6baafefbda5efbdbfe89cb7e6829fefbdbfe88b93e29980e89e9fefbda9e89eb3efbda425636f256d6725736f257373256f38257373256f38257373256f38256b6d73efbdaa256e6b256678256f6b/', hits=1, size=6043)]

In [15]:
# Use the results of Question 3, and create a new RDD with the records that 
# have greater page size than the average.
col_name="size"
summary_stats = summary_stats_col(rdd=rdd, col_name=col_name)
recs = rdd.filter(lambda x: summary_stats["mean"] < float(x[col_name]))
recs_sorted = recs.sortBy(lambda x: x[col_name], ascending=True)
recs_sorted.take(20)

                                                                                

[Row(code='de', title='Tabakrauchen', hits=2, size=132241),
 Row(code='pt', title='Parse', hits=1, size=132241),
 Row(code='en', title='City_of_Hope_National_Medical_Center', hits=3, size=132243),
 Row(code='en', title='Disgaea_5:_Alliance_of_Vengeance', hits=8, size=132243),
 Row(code='en', title='Yahoo_Serious', hits=8, size=132243),
 Row(code='en', title='File:Mamintb.PNG', hits=4, size=132244),
 Row(code='en', title='Nat_Geo_Wild', hits=7, size=132244),
 Row(code='en', title='Procellariidae', hits=3, size=132244),
 Row(code='en', title='Cee-lo', hits=11, size=132245),
 Row(code='en', title='Virtual_particle', hits=6, size=132245),
 Row(code='en', title='Jean_Marsh', hits=7, size=132246),
 Row(code='en', title='Varsity_Blues_(film)', hits=9, size=132247),
 Row(code='en', title='Farmington_Hills,_Michigan', hits=4, size=132248),
 Row(code='en', title='Rare-earth_magnet', hits=8, size=132248),
 Row(code='en', title='Congenital_insensitivity_to_pain', hits=6, size=132249),
 Row(code='e

In [16]:
# Compute the total number of pageviews for each project (as the schema shows, 
# the first field of each record contains the project code).
rdd.map(lambda x: (x["code"], x["hits"])).groupByKey().mapValues(sum).take(10)

                                                                                

[('aa', 41),
 ('ab.mw', 6),
 ('af', 2494),
 ('af.d', 123),
 ('af.mw', 532),
 ('af.q', 30),
 ('ak.b', 3),
 ('an', 799),
 ('an.v', 1),
 ('ar', 1835)]
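
`groupByKey` collects every value of a key before `sum` is applied. A per-key reduction keeps only one running total per key instead, which is the idea behind `reduceByKey`. The sketch below shows that idea in plain Python; `reduce_by_key` is a hypothetical stand-in, not a Spark function.

```python
from operator import add


def reduce_by_key(pairs, func):
    """Plain-Python stand-in for a per-key reduction: one running value per key."""
    totals = {}
    for key, value in pairs:
        totals[key] = func(totals[key], value) if key in totals else value
    return totals


# (code, hits) pairs: the first four 'aa' rows shown earlier, plus a made-up 'zz' pair.
pairs = [("aa", 1), ("aa", 1), ("aa", 1), ("aa", 2), ("zz", 7)]
print(reduce_by_key(pairs, add))
```

In the notebook, the equivalent would presumably be `rdd.map(lambda x: (x["code"], x["hits"])).reduceByKey(add)`.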

In [17]:
# Report the 10 most popular pageviews of all projects, sorted by the total number of hits.
rdd.sortBy(lambda x: x["hits"], ascending=False).take(10)

                                                                                

[Row(code='en.mw', title='en', hits=5466346, size=141180155987),
 Row(code='es.mw', title='es', hits=695531, size=12261337515),
 Row(code='ja.mw', title='ja', hits=611443, size=15021588551),
 Row(code='de.mw', title='de', hits=572119, size=9523069696),
 Row(code='fr.mw', title='fr', hits=536978, size=11752030020),
 Row(code='ru.mw', title='ru', hits=466742, size=11847816616),
 Row(code='it.mw', title='it', hits=400297, size=8176042087),
 Row(code='en', title='Main_Page', hits=257915, size=4289970372),
 Row(code='pt.mw', title='pt', hits=196160, size=4029404403),
 Row(code='pl.mw', title='pl', hits=176059, size=2782453516)]

In [20]:
# Determine the number of page titles that start with the article “The”. How many of those page titles
# are not part of the English project (Pages that are part of the English project have “en” as the first
# field)?
rdd_filtered = rdd.filter(lambda x: x["title"].startswith("The"))
rdd_filtered_2 = rdd_filtered.filter(lambda x: not x["code"].startswith("en"))

print(f"{rdd_filtered.count()} page titles start with 'The', "
      f"of which, {rdd_filtered_2.count()} are not English projects")



45020 page titles start with 'The', of which, 9128 are not English projects
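
Note that `startswith("The")` also matches titles such as "Theory" or "Then", where "The" is not a separate word. If only the article should count, one option is to compare the first "_"-delimited term exactly. The sketch below uses made-up titles and a hypothetical helper `starts_with_term`.

```python
def starts_with_term(title, term="The"):
    """True if the first "_"-delimited term of the title equals `term` exactly."""
    return title.split("_")[0] == term


# Made-up titles for illustration.
titles = ["The_Beatles", "Theory_of_relativity", "The", "Then_and_Now"]
print([t for t in titles if t.startswith("The")])
print([t for t in titles if starts_with_term(t)])
```

Using this stricter check in the filter would give a different count from the one reported above.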


                                                                                

In [22]:
# Determine the percentage of pages that have only received a single page view in this one hour of log data.
percentage = 100 * rdd.filter(lambda x: x["hits"] == 1).count() / rdd.count()
print(f"{percentage:.2f}% pages have only received 1 pagehits")



76.96% pages have only received 1 pagehits




In [31]:
# Determine the number of unique terms appearing in the page titles. Note that in page titles, terms
# are delimited by “_” instead of a white space. You can use any number of normalization steps (e.g.,
# lowercasing, removal of non-alphanumeric characters).
rdd_terms = rdd.flatMap(lambda x: x["title"].lower().split("_"))
rdd_terms_distinct = rdd_terms.distinct()
print(f"There are {rdd_terms_distinct.count()} unique terms appearing in the page titles")



There are 1793113 unique terms appearing in the page titles
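
The cell above only lowercases the terms. As a sketch of one further normalization step mentioned in the question, the helper below also strips non-alphanumeric characters and drops terms that end up empty. The name `normalize_terms` and the regular expression are assumptions, not part of the notebook.

```python
import re

# Matches runs of characters that are neither letters nor digits.
NON_ALNUM = re.compile(r"[\W_]+")


def normalize_terms(title):
    """Split a title on "_", lowercase each term, strip non-alphanumerics, drop empties."""
    terms = (NON_ALNUM.sub("", term.lower()) for term in title.split("_"))
    return [term for term in terms if term]


# Titles taken from the output shown earlier.
print(normalize_terms("Varsity_Blues_(film)"))
print(normalize_terms("Farmington_Hills,_Michigan"))
```

It could be applied with `rdd.flatMap(lambda x: normalize_terms(x["title"]))`; the number of unique terms would then differ from the count reported above.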



In [35]:
# Determine the most frequently occurring page title term in this dataset.
rdd_terms_kv = rdd_terms.map(lambda x: (x,1))
rdd_terms_counts = rdd_terms_kv.reduceByKey(lambda x,y: (x+y))
rdd_terms_counts.sortBy(lambda x: x[1], ascending=False).take(10)

                                                                                

[('of', 194307),
 ('the', 119894),
 ('in', 82018),
 ('de', 63026),
 ('-', 52643),
 ('and', 41560),
 ('list', 35043),
 ('user', 20756),
 ('a', 17265),
 ('new', 15151)]
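
Only the top few entries are needed here, so a full sort is not strictly required. The plain-Python sketch below shows the top-k idea with `heapq.nlargest` on a handful of the counts above; in Spark, `RDD.top(num, key=...)` or `RDD.takeOrdered` serve a similar purpose.

```python
import heapq

# A few (term, count) pairs copied from the output above, in arbitrary order.
counts = [
    ("and", 41560),
    ("of", 194307),
    ("-", 52643),
    ("in", 82018),
    ("the", 119894),
    ("de", 63026),
]

# Keep only the three largest counts without sorting the whole list.
top3 = heapq.nlargest(3, counts, key=lambda kv: kv[1])
print(top3)
```

In the notebook this would presumably read `rdd_terms_counts.top(10, key=lambda x: x[1])`.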

## Dataframe API via Spark SQL

In [47]:
import pyspark.sql.functions as f
df = rdd.toDF()
df.show()

+----+--------------------+----+------+
|code|               title|hits|  size|
+----+--------------------+----+------+
|  aa|             271_a.C|   1|  4675|
|  aa|    Category:User_th|   1|  4770|
|  aa|  Chiron_Elias_Krase|   1|  4694|
|  aa|    Dassault_rafaele|   2|  9372|
|  aa|              E.Desv|   1|  4662|
|  aa|File:Wiktionary-l...|   1| 10752|
|  aa|Indonesian_Wikipedia|   1|  4679|
|  aa|           Main_Page|   5|266946|
|  aa|Requests_for_new_...|   1|  4733|
|  aa|Special:Contribut...|   1|  5812|
|  aa|Special:Contribut...|   1|  5805|
|  aa|Special:Contribut...|   1|  5808|
|  aa|Special:Contribut...|   1|  5812|
|  aa|Special:ListFiles...|   1|  5035|
|  aa|Special:ListFiles...|   1|  5036|
|  aa|Special:ListFiles...|   1|  5032|
|  aa|Special:Log/Md._F...|   1|  5529|
|  aa|Special:Log/MikeL...|   1|  5368|
|  aa|Special:MyLanguag...|   1|  4701|
|  aa|Special:RecentCha...|   1|  6152|
+----+--------------------+----+------+
only showing top 20 rows



In [48]:
# Compute the min, max, and average page size.
df.select(f.min("size"), f.max("size"), f.mean("size")).show()



+---------+------------+------------------+
|min(size)|   max(size)|         avg(size)|
+---------+------------+------------------+
|        0|141180155987|132239.56957446598|
+---------+------------+------------------+



                                                                                