# <center>Big Data &ndash; Exercises</center>
## <center>Spring 2022 &ndash; Week 9 &ndash; ETH Zurich</center>
## <center>Spark Dataframes and Spark SQL, Moodle exercise</center>

# Preparation for the Moodle exercise in Spark

In this Jupyter notebook we preprocess the dataset that is used in this week's graded exercise.
It is the same language game dataset as in exercise08.

1. Change to the `exercise09` repository

2. Start docker <br>
```docker-compose up -d```

3. Get the data. The dataset can be found here: https://cloud.inf.ethz.ch/s/a8FoHew6dHKGYKK/download/confusion20140302.tbz2

More specifically, do the following:
- Download the data:<br> ```wget https://cloud.inf.ethz.ch/s/a8FoHew6dHKGYKK/download/confusion20140302.tbz2```
- Extract the data:<br> ```tar -jxvf confusion20140302.tbz2```

4. Copy the data into the `jupyter` container:<br> ```docker cp confusion-2014-03-02/confusion-2014-03-02.json jupyter:/home/jovyan/work``` <br>
(Copying the data needs to be done only once and might take 4-5 minutes.)

## More Info about the data
You can find more information about the dataset (including the schema and examples) at this link: https://quietlyamused.org/blog/2014/03/12/language-confusion/

## Instructions:

For every query we ask for three quantities: the query itself, its result, and the productivity time, i.e., the development time of the query (the time elapsed between starting to write the query and having the correct, final query ready). The time part of every question is optional and not graded. To make time recording easier, we provide two functions that do it automatically. Run the cell below to define them in the current notebook. Before each query there is a ```start_exercise()``` cell that you run to start the time recording. Once you have finished your query and are sure about the answer, run the ```finish_exercise()``` cell to get the time measurement.

In [1]:
import time

def start_exercise():
    global last
    last = time.time()
    
def finish_exercise():
    global last
    print("This exercise took {0}s".format(int(time.time()-last)))
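
The two helpers above keep the start time in a global variable. As an alternative sketch (not part of the exercise material), the same measurement could be wrapped in a context manager so that start and finish cannot get out of sync. The name `timed_exercise` and the `timing` dictionary are hypothetical, introduced only for this illustration.

```python
import time
from contextlib import contextmanager


@contextmanager
def timed_exercise(label="This exercise"):
    """Hypothetical helper: time the body of a `with` block."""
    start = time.time()
    timing = {"seconds": None}
    try:
        yield timing
    finally:
        # Record whole seconds, mirroring what finish_exercise() prints.
        timing["seconds"] = int(time.time() - start)
        print("{0} took {1}s".format(label, timing["seconds"]))


with timed_exercise() as timing:
    total = sum(range(1000))  # stand-in for developing and running a query
```

In a notebook the two-cell approach is usually more convenient, because development time spans several cell executions; the context manager fits better when timing a single block of code.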

#### <b>For the assignments we only use the first 50k lines of the dataset, `dataset_50k = dataset.limit(50000)`. </b>

## <center>1. Spark Dataframes</center>

Write queries for the same questions as last week, but this time using Spark DataFrame operations (the data loading will take a couple of minutes).

### 1.0. Data preprocessing

In [2]:
import json
from pyspark.sql import SparkSession
from pyspark import SparkConf

spark = SparkSession.builder.master('local').getOrCreate()
sc = spark.sparkContext

path = "confusion-2014-03-02.json"
dataset = spark.read.json(path).cache()

In [3]:
#test it out
dataset.limit(3).show()

+--------------------+-------+----------+---------+--------------------+---------+
|             choices|country|      date|    guess|              sample|   target|
+--------------------+-------+----------+---------+--------------------+---------+
|[Maori, Mandarin,...|     AU|2013-08-19|Norwegian|48f9c924e0d98c959...|Norwegian|
|[Danish, Dinka, K...|     AU|2013-08-19|    Dinka|af5e8f27cef9e689a...|    Dinka|
|[German, Hungaria...|     AU|2013-08-19|  Turkish|509c36eb58dbce009...|   Samoan|
+--------------------+-------+----------+---------+--------------------+---------+



In [4]:
dataset_50k = dataset.limit(50000)

In [5]:
dataset_50k.printSchema()

root
 |-- choices: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- country: string (nullable = true)
 |-- date: string (nullable = true)
 |-- guess: string (nullable = true)
 |-- sample: string (nullable = true)
 |-- target: string (nullable = true)



## Assignment 1
Find the number of games where the guessed language is correct (meaning equal to the target one) and that language is Russian.

In [None]:
dataset_50k.filter(dataset_50k.target == 'Russian').filter(dataset_50k.target == dataset_50k.guess).limit(3).show()

In [9]:
start_exercise()

In [10]:
dataset_50k.filter(dataset_50k.target == 'Russian').filter(dataset_50k.target == dataset_50k.guess).count()

957

In [11]:
finish_exercise()

This exercise took 3s


## Assignment 2
Return the number of distinct "target" languages.

In [13]:
dataset_50k.select('target').distinct().take(10)

[Row(target='Norwegian'),
 Row(target='Dinka'),
 Row(target='Samoan'),
 Row(target='Somali'),
 Row(target='Japanese'),
 Row(target='Turkish'),
 Row(target='French'),
 Row(target='German'),
 Row(target='Spanish'),
 Row(target='Romanian')]

In [14]:
start_exercise()

In [15]:
dataset_50k.select('target').distinct().count()

68

In [16]:
finish_exercise()

This exercise took 2s


## Assignment 3
Return the sample IDs (i.e., the *sample* field) of the top three games where the guessed language is correct (equal to the target one) ordered by language (ascending), then by country (ascending), then by date (ascending).

In [25]:
correct = dataset_50k.filter(dataset_50k.target == dataset_50k.guess)
correct_ordered = correct.orderBy(["target", "country", "date"], ascending=[1, 1, 1])
top3 = correct_ordered.select('sample').take(3)
top3 = [row.sample for row in top3]
print(top3)

['fdf23d0a7063ba2fcef4b18eb7d57ad8', '13722ceed1eede7ba597ade9b4cb9807', 'efcd813daec1c836d9f030b30caa07ce']


In [26]:
start_exercise()

In [27]:
correct = dataset_50k.filter(dataset_50k.target == dataset_50k.guess)
correct_ordered = correct.orderBy(["target", "country", "date"], ascending=[1, 1, 1])
top3 = correct_ordered.select('sample').take(3)
top3 = [row.sample for row in top3]
print(top3)

['fdf23d0a7063ba2fcef4b18eb7d57ad8', '13722ceed1eede7ba597ade9b4cb9807', 'efcd813daec1c836d9f030b30caa07ce']


In [28]:
finish_exercise()

This exercise took 3s
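
To make the ordering semantics of Assignment 3 concrete, here is a plain-Python sketch of the same logic (filter correct guesses, sort by language, then country, then date, keep the first three). It runs on a handful of made-up rows, not on the dataset, and is only meant to illustrate what the multi-column `orderBy` computes.

```python
# Made-up rows for illustration only; they are not taken from the dataset.
games = [
    {"sample": "s1", "target": "French", "guess": "French", "country": "CH", "date": "2013-08-20"},
    {"sample": "s2", "target": "Dinka", "guess": "Dinka", "country": "US", "date": "2013-08-19"},
    {"sample": "s3", "target": "Dinka", "guess": "Lao", "country": "AU", "date": "2013-08-19"},
    {"sample": "s4", "target": "Dinka", "guess": "Dinka", "country": "AU", "date": "2013-08-21"},
    {"sample": "s5", "target": "Dinka", "guess": "Dinka", "country": "AU", "date": "2013-08-19"},
]

# Keep only the games where the guess matches the target language.
correct = [g for g in games if g["guess"] == g["target"]]

# Sorting by a tuple compares target first, then country, then date,
# which is the same precedence as orderBy(["target", "country", "date"]).
ordered = sorted(correct, key=lambda g: (g["target"], g["country"], g["date"]))

top3 = [g["sample"] for g in ordered[:3]]
print(top3)
```

Here `s3` is dropped because the guess is wrong, and the two `AU` Dinka games come before the `US` one, with the earlier date first.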


## Assignment 4
Aggregate all games by country and target language, count the number of guesses in each group, and return the frequencies of the three most frequent country/language combinations.

In [39]:
import pyspark.sql.functions as F
top3count = dataset_50k.groupBy('country','target').agg(F.count('*').alias('n')).select('n').orderBy('n', ascending=0).take(3)
top3count = [row.n for row in top3count]
print(top3count)

[770, 719, 715]


In [40]:
start_exercise()

In [41]:
import pyspark.sql.functions as F
top3count = dataset_50k.groupBy('country','target').agg(F.count('*').alias('n')).select('n').orderBy('n', ascending=0).take(3)
top3count = [row.n for row in top3count]
print(top3count)

[770, 719, 715]


In [42]:
finish_exercise()

This exercise took 2s
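
The group-and-count step of Assignment 4 can be pictured with `collections.Counter` from the Python standard library. This is a sketch on made-up (country, target) pairs, not the Spark implementation; it only mirrors the `groupBy` / `count` / `orderBy` / `take(3)` chain.

```python
from collections import Counter

# Made-up (country, target) pairs for illustration only.
pairs = [
    ("US", "French"),
    ("US", "French"),
    ("US", "French"),
    ("AU", "Dinka"),
    ("AU", "Dinka"),
    ("CH", "German"),
    ("US", "Dinka"),
]

# One counter entry per distinct (country, target) group.
counts = Counter(pairs)

# most_common(3) returns the three largest groups in descending order;
# only the frequencies are kept, as the assignment asks.
top3count = [n for _, n in counts.most_common(3)]
print(top3count)
```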


## Assignment 5
Find the fraction of games where the answer was correct and the correct guess was the first choice in the array of possible answers.

Please report the fraction rounded to 4 decimal places (e.g., 0.3323).

In [43]:
total_number_games = dataset_50k.count()
correct = dataset_50k.filter(dataset_50k.target == dataset_50k.guess)
correct_and_first = correct.filter(correct.guess == correct.choices[0])
perc = correct_and_first.count()/total_number_games

print(f'{perc:1.4f}')


0.2435


In [44]:
start_exercise()

In [45]:
total_number_games = dataset_50k.count()
correct = dataset_50k.filter(dataset_50k.target == dataset_50k.guess)
correct_and_first = correct.filter(correct.guess == correct.choices[0])
perc = correct_and_first.count()/total_number_games

print(f'{perc:1.4f}')


0.2435


In [46]:
finish_exercise()

This exercise took 4s
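
As a sanity check of the Assignment 5 logic, the following plain-Python sketch computes the same fraction on three made-up games. Note that the denominator is the total number of games, not the number of correct games. The rows are invented for illustration.

```python
# Made-up rows for illustration only; they are not taken from the dataset.
games = [
    # Correct, and the correct language is the first choice: counted.
    {"guess": "Dinka", "target": "Dinka", "choices": ["Dinka", "Khmer", "Lao"]},
    # Correct, but the correct language is not the first choice: not counted.
    {"guess": "Lao", "target": "Lao", "choices": ["Danish", "Lao", "Khmer"]},
    # First choice was guessed, but the guess is wrong: not counted.
    {"guess": "German", "target": "Samoan", "choices": ["German", "Samoan", "Turkish"]},
]

correct_and_first = [
    g for g in games
    if g["guess"] == g["target"] and g["guess"] == g["choices"][0]
]

# Divide by all games, then format to 4 decimal places.
fraction = len(correct_and_first) / len(games)
formatted = f"{fraction:.4f}"
print(formatted)
```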


## Assignment 6
Return the number of games played on the latest day.

In [56]:
latest_day = dataset_50k.select('date').orderBy('date', ascending=0).first().date
print(latest_day)
dataset_50k.filter(dataset_50k.date == latest_day).count()

2013-09-03


34429

In [57]:
start_exercise()

In [58]:
latest_day = dataset_50k.select('date').orderBy('date', ascending=0).first().date
dataset_50k.filter(dataset_50k.date == latest_day).count()

34429

In [59]:
finish_exercise()

This exercise took 3s
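
The `date` column is a string (see the schema above), so ordering by it compares text rather than calendar dates. Because the values are zero-padded ISO dates (`YYYY-MM-DD`), text order and chronological order coincide, which is why the query above finds the latest day. The following plain-Python check on made-up dates illustrates this; it is not run against the dataset.

```python
from datetime import date

# Made-up dates for illustration only; they are not taken from the dataset.
dates = ["2013-08-19", "2013-09-03", "2013-08-30", "2013-09-03", "2013-09-02"]

# Latest day by plain string comparison.
latest_as_text = max(dates)

# Latest day after parsing the strings into real date objects.
latest_parsed = max(date.fromisoformat(d) for d in dates)

# For zero-padded ISO dates both approaches agree.
agree = latest_as_text == latest_parsed.isoformat()

games_on_latest_day = sum(1 for d in dates if d == latest_as_text)
print(latest_as_text, games_on_latest_day, agree)
```

With a format such as `3.9.2013` this shortcut would no longer hold, and the column would need to be parsed first.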


## <center>2. Spark SQL</center>

Write Spark SQL queries for the same questions as earlier.

### 2.0. Data preprocessing

In [60]:
!pip install sparksql-magic

Collecting sparksql-magic
  Downloading sparksql_magic-0.0.3-py36-none-any.whl (4.3 kB)
Collecting py4j==0.10.9
  Downloading py4j-0.10.9-py2.py3-none-any.whl (198 kB)
Installing collected packages: py4j, sparksql-magic
Successfully installed py4j-0.10.9 sparksql-magic-0.0.3


In [61]:
%load_ext sparksql_magic

In [62]:
path = "confusion-2014-03-02.json"
dataset_50k = spark.read.json(path).cache().limit(50000)
# createOrReplaceTempView replaces the deprecated registerTempTable
dataset_50k.createOrReplaceTempView("dataset_50k")

In [63]:
%%sparksql
-- test it out
SELECT *
FROM dataset_50k
LIMIT 3

0,1,2,3,4,5
choices,country,date,guess,sample,target
"['Maori', 'Mandarin', 'Norwegian', 'Tongan']",AU,2013-08-19,Norwegian,48f9c924e0d98c959d8a6f1862b3ce9a,Norwegian
"['Danish', 'Dinka', 'Khmer', 'Lao']",AU,2013-08-19,Dinka,af5e8f27cef9e689a070b8814dcc02c3,Dinka
"['German', 'Hungarian', 'Samoan', 'Turkish']",AU,2013-08-19,Turkish,509c36eb58dbce009ccf93f375358d53,Samoan


## Assignment 1
Find the number of games where the guessed language is correct (meaning equal to the target one) and that language is Russian.

In [93]:
start_exercise()

In [94]:
%%sparksql
-- write your query here
SELECT count(*)
FROM dataset_50k
WHERE guess = target AND target = 'Russian'




0
count(1)
957


In [95]:
finish_exercise()

This exercise took 2s


## Assignment 2
Return the number of distinct "target" languages.

In [96]:
start_exercise()

In [97]:
%%sparksql
-- write your query here
SELECT COUNT(DISTINCT target) AS Count
FROM dataset_50k



0
Count
68


In [98]:
finish_exercise()

This exercise took 1s


## Assignment 3
Return the sample IDs (i.e., the *sample* field) of the top three games where the guessed language is correct (equal to the target one) ordered by language (ascending), then by country (ascending), then by date (ascending).

In [99]:
start_exercise()

In [100]:
%%sparksql
-- write your query here

SELECT sample
FROM dataset_50k
WHERE guess = target
ORDER BY target ASC, country ASC, date ASC
LIMIT 3



0
sample
fdf23d0a7063ba2fcef4b18eb7d57ad8
13722ceed1eede7ba597ade9b4cb9807
efcd813daec1c836d9f030b30caa07ce


In [101]:
finish_exercise()

This exercise took 3s


## Assignment 4
Aggregate all games by country and target language, count the number of guesses in each group, and return the frequencies of the three most frequent country/language combinations.

In [None]:
start_exercise()

In [68]:
%%sparksql
-- write your query here

SELECT count(*) as count
FROM dataset_50k
GROUP BY country, target
ORDER BY count DESC
LIMIT 3


0
count
770
719
715


In [None]:
finish_exercise()

## Assignment 5
Find the fraction of games where the answer was correct and the correct guess was the first choice in the array of possible answers.

Please report the fraction rounded to 4 decimal places (e.g., 0.3323).

In [102]:
start_exercise()

In [103]:
%%sparksql
-- write your query here

SELECT count(*)/50000 as perc
FROM dataset_50k
WHERE guess = target AND target = choices[0]





0
perc
0.24352


In [104]:
finish_exercise()

This exercise took 2s


## Assignment 6
Return the number of games played on the latest day.

In [107]:
start_exercise()

In [108]:
%%sparksql
-- write your query here

SELECT count(*), MAX(date) AS max_date
FROM dataset_50k
GROUP BY date
ORDER BY date DESC
LIMIT 1


0,1
count(1),max_date
34429,2013-09-03


In [109]:
finish_exercise()

This exercise took 2s
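
The query above groups by date and keeps the first group after sorting. Another way to phrase the same question is a scalar subquery that first finds the latest date and then counts the matching rows. The sketch below runs this SQL against an in-memory SQLite table with made-up rows, used here only as a self-contained stand-in for Spark; the table name `games` is hypothetical. Spark SQL also supports scalar subqueries, but this variant has not been run against the dataset.

```python
import sqlite3

# In-memory SQLite database as a stand-in for the Spark temp view.
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE games (sample TEXT, date TEXT)")

# Made-up rows for illustration only; they are not taken from the dataset.
conn.executemany(
    "INSERT INTO games VALUES (?, ?)",
    [
        ("s1", "2013-08-19"),
        ("s2", "2013-09-03"),
        ("s3", "2013-09-03"),
        ("s4", "2013-08-30"),
    ],
)

# The inner query returns the latest date; the outer query counts its games.
row = conn.execute(
    """
    SELECT COUNT(*) AS n, MAX(date) AS max_date
    FROM games
    WHERE date = (SELECT MAX(date) FROM games)
    """
).fetchone()
print(row)

conn.close()
```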
