Copyright (C) 2023 Seoul National University

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
    http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.


## Colab 101

Colab is a free Jupyter notebook environment from Google Research. Unlike an AWS cluster, which is billed for every hour it is up and running, Colab lets you run experiments in your own notebook environment without that cost.

## Colab Spark Setup

In [None]:
!apt-get update
!apt-get install -y openjdk-8-jdk-headless
!pip install -q findspark
!pip install pyspark==3.3.2
!curl -OL https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop2.tgz
!tar xzvf spark-3.3.2-bin-hadoop2.tgz

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.2-bin-hadoop2"

## Wikipedia dataset sample

This time we don't load the data from HDFS; the sample data are defined directly in Python code.

Each record is a space-separated string with four fields: project, title, pageview count, and size.
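Splitting a record on spaces yields four strings, so count and size need `int` before any arithmetic. The titles also appear to be percent-encoded (e.g. `%C3%A4` for `ä`). The sketch below parses one record in plain Python; the helper name `parse_record` is hypothetical, and decoding the title with `urllib.parse.unquote` is an optional extra step that the notebook itself does not take.

```python
from urllib.parse import unquote


def parse_record(line):
    """Split one sample record into (project, title, count, size).

    count and size are converted to int; the title is percent-decoded,
    which is an optional step (the notebook keeps the raw title).
    """
    project, title, count, size = line.split(" ")
    return project, unquote(title), int(count), int(size)


print(parse_record("de Grafische_Benutzeroberfl%C3%A4che 1 22549"))
# ('de', 'Grafische_Benutzeroberfläche', 1, 22549)
```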

In [None]:
wikipedia_data_sample = ["commons.m File:Gemblong.JPG 1 9717"
,"pl Beata_Tyszkiewicz 10 207378"
,"en Special:RecentChangesLinked/Roswell_(TV_series) 1 14617"
,"de Grafische_Benutzeroberfl%C3%A4che 1 22549"
,"en Simeon_I_of_Bulgaria 5 385793"
,"en Rainbow_Six_(novel) 8 122792"
,"es Pediatr%C3%ADa 5 73598"
,"sv Ett_uts%C3%B6kt_universum 1 9499"
,"en Video_game_content_rating_system 4 112324"
,"es Yuno_Gasai 2 55260"
,"en File:Georg_Wilhelm_Friedrich_Hegel00.jpg 1 43395"
,"en Anestia_ombrophanes 1 8881"
,"et Seitse 2 84874"
,"en And_I_Am_Telling_You_I%27m_Not_Going 4 85690"
,"he %D7%A4%D7%A8%D7%93%D7%99%D7%92%D7%9E%D7%94 1 13887"
,"zh File:Pictogram_voting_keep-green.svg 1 15106"
,"sv Special:Senaste_relaterade_%C3%A4ndringar/Homestead,_Florida 1 7677"
,"pt Categoria:Ambientes_de_desenvolvimento_integrado_livres 1 8151"
,"de.voy Plattensee 1 43748"
,"en Independent_Chip_Model 1 8938"
,"en Category:Toronto_Toros_players 2 0"
,"en Special:Export/Helsinki_Accords 1 19899"
,"xh Special:Contributions/Kpeterzell 1 5883"
,"nl 4_mei 1 0"
,"no Carlos_Keller_Rueff 5 87075"
,"en Special:Contributions/2.31.218.202 1 7402"
,"es Placa_Yangtze 1 10329"
,"de Datei:BSicon_uhKBHFe.svg 1 9786"
,"en Randolph_County,_Alabama 1 21431"
,"es S%C3%A9neca 3 70494"
,"en Tu_Bishvat 3 56438"
,"cs Radiohead 1 14325"
,"es Naturaleza_sangre 1 9286"
,"en Anatolia_(disambiguation) 1 7980"
,"pt Queima_de_suti%C3%A3s 1 8982"
,"pt Titanoboa_cerrejonensis 5 64540"
,"commons.m Category:People_of_Ireland 1 19278"
,"fi Matti_Inkinen 1 10138"
,"ja %E3%83%95%E3%82%A1%E3%82%A4%E3%83%AB:Esfahan_(Iran)_Emam_Mosque.JPG 1 33168"
,"en Psicobloc 1 12739"
,"en Macael,_Spain 1 12658"
,"fa %DA%A9%D9%87%D8%AA%D9%88%DB%8C%D9%87 1 22855"
,"fr Sp%C3%A9cial:Pages_li%C3%A9es/Fichier:Wiki-ezokuroten5.jpg 1 21955"
,"nl Overleg_gebruiker:82.171.157.232 1 0"
,"en Thomas_%26_Mack_Center 2 41010"
,"en Warren_Beatty 49 2631986"
,"uz Auberville 1 11401"]

## Spark RDD Transforms and Actions

RDDs support two types of operations.

**Transformations** create a new dataset from an existing one. They are lazy: Spark only records them and computes nothing until an action needs the result.

**Actions** return a value to the driver program after running a computation on the dataset.
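The lazy/eager split can be pictured with Python's built-in `map`, which also returns a lazy iterator that does no work until something consumes it. This is only an analogy in plain Python, not Spark: `map` plays the role of a transformation and `list` plays the role of an action such as `collect()`. The helper `parse_count` is hypothetical.

```python
calls = []


def parse_count(record):
    """Return the pageview count of one record, logging each call."""
    calls.append(record)
    return int(record.split(" ")[2])


records = ["en Warren_Beatty 49 2631986", "pl Beata_Tyszkiewicz 10 207378"]

counts = map(parse_count, records)  # like a transformation: nothing has run yet
print(len(calls))                   # 0
result = list(counts)               # like an action: forces the computation
print(len(calls), result)           # 2 [49, 10]
```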

From now on, we'll try several Spark RDD transformations using the sample Wikipedia dataset.


In [None]:
import findspark
from pyspark.sql import SparkSession

findspark.init(os.environ["SPARK_HOME"])

# SparkSession is an entry point to programming Spark with the Dataset and DataFrame API
# SparkContext represents a connection to a Spark cluster and can be used to create RDDs and broadcast variables on that cluster.

ss = SparkSession.builder.master("local[*]").getOrCreate()
sc = ss.sparkContext

In [None]:
# Parallelize the data and split into columns
lines = sc.parallelize(wikipedia_data_sample, 10)

# Create a PythonRDD named 'columns'.
# It is an RDD of tuples; each tuple holds the space-separated fields of one line in 'lines'
columns = lines.map(lambda line: tuple(line.split(" ")))
# columns.collect()[:5]

In [None]:
# Element-Wise Transformation: Map Transform

# Create a list of (project, count) tuples
project_count_tuples = columns.map(lambda column: (column[0], int(column[2])))
project_count_tuples.collect()

In [None]:
# Element-Wise Transformation: Filter Transform

# Keep only the tuples whose project name contains the substring 'de'
project_de_filtered = project_count_tuples.filter(lambda t: 'de' in t[0])
project_de_filtered.collect()
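Note that `'de' in t[0]` is a substring test, so it also keeps the `de.voy` project, not only `de`. A plain-Python illustration of the difference, using pairs taken from the sample data (no Spark required); in PySpark, an exact match would be written `project_count_tuples.filter(lambda t: t[0] == 'de')`.

```python
pairs = [("de", 1), ("de.voy", 1), ("en", 5), ("es", 5)]

substring_match = [t for t in pairs if 'de' in t[0]]  # same predicate as the cell above
exact_match = [t for t in pairs if t[0] == 'de']      # only the 'de' project

print(substring_match)  # [('de', 1), ('de.voy', 1)]
print(exact_match)      # [('de', 1)]
```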

## Quiz
From the sample Wikipedia data, keep only the records whose count column value is 5 or greater.
- Result: tuples consisting of (project, count)

In [None]:
# Code here!


In [None]:
# Transformations on a single Pair RDD: sortByKey Transform

# Sort key-value tuples by key in ascending order
project_sorted = project_count_tuples.sortByKey()
project_sorted.collect()

In [None]:
# And in descending order
project_sorted_desc = project_count_tuples.sortByKey(ascending=False)
project_sorted_desc.collect()
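`sortByKey` orders a pair RDD by its key, which here is the project name string, so projects come out in alphabetical order. The same ordering can be sketched with Python's `sorted` (plain Python, no Spark); the pairs are taken from the sample data.

```python
pairs = [("pl", 10), ("en", 1), ("de", 1), ("commons.m", 1), ("en", 5)]

ascending = sorted(pairs, key=lambda kv: kv[0])                 # like sortByKey()
descending = sorted(pairs, key=lambda kv: kv[0], reverse=True)  # like sortByKey(ascending=False)

print([k for k, _ in ascending])   # ['commons.m', 'de', 'en', 'en', 'pl']
print([k for k, _ in descending])  # ['pl', 'en', 'en', 'de', 'commons.m']
```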

In [None]:
# Transformations on a single Pair RDD: ReduceByKey Transform

# Compute the sum of pageview counts per project
# project_count_tuples.collect()[:5]
project_sum_tuples = project_count_tuples.reduceByKey(lambda a, b: a + b) 
project_sum_tuples.collect()
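`reduceByKey` merges all values that share a key with the given function. Spark applies the function within each partition first and then across partitions, which is why the function should be associative and commutative. The sketch below models only the result in plain Python (no partitions, no Spark); the helper name `reduce_by_key` is hypothetical.

```python
from functools import reduce


def reduce_by_key(pairs, func):
    """Plain-Python model of reduceByKey: combine all values sharing a key with func."""
    grouped = {}
    for key, value in pairs:
        grouped.setdefault(key, []).append(value)
    return {key: reduce(func, values) for key, values in grouped.items()}


pairs = [("en", 1), ("es", 5), ("en", 5), ("es", 2), ("pl", 10)]
print(reduce_by_key(pairs, lambda a, b: a + b))  # {'en': 6, 'es': 7, 'pl': 10}
```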

## Quiz
From the sample Wikipedia data, multiply the pageview count values within each project, keep only the projects other than 'en', and then sort the results in descending order of count.

- Result: tuples consisting of (project, count)

In [None]:
# Code here!


In [None]:
# Transformations on two Pair RDDs: Join Transform

# Declare additional wikipedia data.
# (recap: project, title (artist name), pageview count, size)
wikipedia_sample_artist = ["en Lauv 49 2631986"
,"en SamSmith 1 12739"
,"en BTS 100 12658"
,"fa Eminem 1 22855"
,"en PostMalone 49 2631986"]

# And each artist's ranking information
artist_to_ranking = ["SamSmith 1"
,"BTS 2"
,"Eminem 3"
,"PostMalone 4"]

# Parallelize both of them and split by spaces
lines2 = sc.parallelize(wikipedia_sample_artist, 5)
lines3 = sc.parallelize(artist_to_ranking, 4)
wikipedia_sample_artist_tuples = lines2.map(lambda line: tuple(line.split(" ")))
artist_to_ranking_tuples = lines3.map(lambda line: tuple(line.split(" ")))

# Create a PythonRDD of (title, count) tuples
title_count_tuples = wikipedia_sample_artist_tuples.map(lambda column: (column[1], int(column[2])))

# Join by title - the value would be (count, ranking) tuples
title_count_tuples.join(artist_to_ranking_tuples).collect()
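`join` is an inner join on keys: for every key present in both RDDs it emits `(key, (left_value, right_value))`. `Lauv` has no ranking, so it disappears from the result, and the ranking stays a string because it comes straight from `split(" ")`. A plain-Python model with the same values follows; the helper `inner_join` is hypothetical, and Spark's output order may differ. PySpark's `leftOuterJoin` would instead keep `Lauv` with `None` as its ranking.

```python
def inner_join(left, right):
    """Plain-Python model of RDD.join: (k, (v, w)) for every pair of matching keys."""
    result = []
    for k, v in left:
        for k2, w in right:
            if k == k2:
                result.append((k, (v, w)))
    return result


title_count = [("Lauv", 49), ("SamSmith", 1), ("BTS", 100), ("Eminem", 1), ("PostMalone", 49)]
ranking = [("SamSmith", "1"), ("BTS", "2"), ("Eminem", "3"), ("PostMalone", "4")]

joined = inner_join(title_count, ranking)
print(joined)
# [('SamSmith', (1, '1')), ('BTS', (100, '2')), ('Eminem', (1, '3')), ('PostMalone', (49, '4'))]
```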

## Quiz
From the sample Wikipedia data, build a PythonRDD of (project, title) tuples, then
join it on project with project_to_projectid below.

- Result: tuples with key = project and value = (title, projectid)

In [None]:
# Declare another sample data
project_to_projectid = ["en 1"
,"fr 2"
,"de 3"
,"es 4"]

# Parallelize both of them and split by spaces
lines2 = sc.parallelize(wikipedia_data_sample, 10)
lines3 = sc.parallelize(project_to_projectid, 4)
wikipedia_sample_tuples = lines2.map(lambda line: tuple(line.split(" ")))
proj_to_projid_tuples = lines3.map(lambda line: tuple(line.split(" ")))

# Code here!


## SparkSQL

Let's learn how to create an SQL table from a Spark DataFrame and run SQL queries with Spark!
We'll use the Wikipedia data above to create the table and try some SQL operations on its four columns: project, title, count, and size.

In [None]:
# Create a Spark DataFrame from wikipedia_data_sample (equivalent of an 'SQL table' in Spark)
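# count and size are converted to int so SQL comparisons and aggregates treat them as numbers
df = ss.createDataFrame(columns.map(lambda c: (c[0], c[1], int(c[2]), int(c[3]))),
                        ['project', 'title', 'count', 'size'])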

# Create a table view called "WikipediaTable"
df.createOrReplaceTempView("WikipediaTable")

df.show(df.count())

# Run an SQL query that selects the project and title of rows whose project is 'en' and whose count is at least 5
selected = ss.sql("SELECT project,title FROM WikipediaTable WHERE (project = 'en' AND count >= 5)")

# Print every matching row (show() without an argument displays only the first 20)
selected.show(selected.count())

In [None]:
# Run an SQL query that orders projects by the number of titles each project has
selected = ss.sql("SELECT project, COUNT(title) AS num_of_title FROM WikipediaTable \
GROUP BY project \
ORDER BY num_of_title DESC")

# Print the results in this console (top 20 results will be shown)
selected.show()
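The query groups rows by project, counts the titles in each group, and sorts the groups by that count. The same idea in plain Python with `collections.Counter`, on a five-record subset of `wikipedia_data_sample` (no Spark required). Ties are printed in first-seen order here; SQL `ORDER BY` does not promise any particular order among ties.

```python
from collections import Counter

# A five-record subset of wikipedia_data_sample
records = [
    "en Warren_Beatty 49 2631986",
    "es Yuno_Gasai 2 55260",
    "en Psicobloc 1 12739",
    "pl Beata_Tyszkiewicz 10 207378",
    "en Tu_Bishvat 3 56438",
]

# COUNT(title) ... GROUP BY project: one count per distinct project
num_of_title = Counter(line.split(" ")[0] for line in records)

# ORDER BY num_of_title DESC
print(num_of_title.most_common())  # [('en', 3), ('es', 1), ('pl', 1)]
```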

In [None]:
# Go back to the two sample datasets
wikipedia_sample_artist = ["en Lauv 49 2631986"
,"en SamSmith 1 12739"
,"en BTS 100 12658"
,"fa Eminem 1 22855"
,"en PostMalone 49 2631986"]

artist_to_ranking = ["SamSmith 1"
,"BTS 2"
,"Eminem 3"
,"PostMalone 4"]

# Parallelize both of them and split by spaces
lines2 = sc.parallelize(wikipedia_sample_artist, 5)
lines3 = sc.parallelize(artist_to_ranking, 4)

wikipedia_sample_artist_tuples = lines2.map(lambda line: tuple(line.split(" ")))
artist_to_ranking_tuples = lines3.map(lambda line: tuple(line.split(" ")))

In [None]:
# Create a Spark DataFrame from wikipedia_sample_artist and artist_to_ranking
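# Convert count, size, and ranking to int so Spark treats them as numeric columns
# (ORDER BY on a string column sorts lexicographically, e.g. "10" before "2")
wikipedia_sample_artist_df = ss.createDataFrame(
    wikipedia_sample_artist_tuples.map(lambda c: (c[0], c[1], int(c[2]), int(c[3]))),
    ['project', 'title', 'count', 'size'])
artist_to_ranking_df = ss.createDataFrame(
    artist_to_ranking_tuples.map(lambda c: (c[0], int(c[1]))),
    ['title', 'ranking'])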

# Create a table view of them, called "ArtistTable" and "RankingTable"
wikipedia_sample_artist_df.createOrReplaceTempView("ArtistTable")
artist_to_ranking_df.createOrReplaceTempView("RankingTable")

# Run an SQL query that joins the two tables.
# The result will show 'ranking' of RankingTable and 'title', 'count' of ArtistTable.
# Join will be performed on rows with common 'title' in both tables.
selected = ss.sql("SELECT RankingTable.ranking, ArtistTable.title, ArtistTable.count FROM ArtistTable \
                   INNER JOIN RankingTable ON RankingTable.title=ArtistTable.title \
                   ORDER BY RankingTable.ranking")

selected.show()
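Fields produced by `split(" ")` are strings unless they are converted before `createDataFrame`, and strings compare character by character. With rankings 1 to 4 the order happens to be correct either way, but two-digit rankings would sort `"10"` before `"2"`. A quick plain-Python check; in Spark SQL, `ORDER BY CAST(RankingTable.ranking AS INT)` would force a numeric sort.

```python
rankings = ["2", "10", "1"]  # rankings as strings, as split(" ") produces them

print(sorted(rankings))           # ['1', '10', '2']  (lexicographic)
print(sorted(rankings, key=int))  # ['1', '2', '10']  (numeric)
```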

## Spark SQL Quiz 1. 
From 'WikipediaTable', find (project, sum_of_count) for each project *whose count values sum to 10 or more*.
- Result: a table with two columns, project and sum_of_count

In [None]:
## Code here!



## Spark SQL Quiz 2.
Join ProjectGradeTable with WikipediaTable and find the titles that belong to projects with grade 'C'.
- Result: a table with a single column, title

In [None]:
cols = ['project', 'grade']
vals = [
     ('en', 'C'),
     ('he', 'A'),
     ('zh', 'B'),    
     ('no', 'A')
]

project_grade = ss.createDataFrame(vals, cols)
project_grade.show()
project_grade.createOrReplaceTempView("ProjectGradeTable")

In [None]:
## Code here!

