# <center>Big Data for Engineers – Exercises</center>
## <center>Spring 2022 – Week 8 – ETH Zurich</center>
## <center>Notebook for the Spark Moodle Quiz</center>

## Start docker

In your exercise 08 directory, start Docker:

```
docker compose up
```

Once Docker has finished downloading the images, open the Jupyter notebook by copying the following URL into your browser:

```
http://127.0.0.1:8888/lab
```

In [4]:
import time
from pyspark import SparkConf
from pyspark.context import SparkContext
# sc is the SparkContext object; getOrCreate reuses a running context,
# so re-running this cell does not fail with "Cannot run multiple SparkContexts at once"
sc = SparkContext.getOrCreate(SparkConf().setMaster('local').setAppName('test'))

### The Moodle Quiz

For this quiz we will be using the [language confusion dataset](https://quietlyamused.org/blog/2014/03/12/language-confusion/).

As mentioned in the exercise, this quiz is part of the small project you will work on over the next three weeks to compare Spark, Spark with DataFrames/SQL, and JSONiq. You will hear more about it in the coming weeks.

For the Moodle quiz we ask you to submit the following things:
- The query you wrote (it is not graded, but having it helps if you need to argue about points)
- Something related to its output (**the only part that is graded**)
- The time it took you to write it (thinking time)
- The time it took you to run it (execution time)

You don't need to submit this notebook, only the queries you wrote.

On your own laptop, download the dataset and decompress it into the ex08 folder using the commands below. Alternatively, copy the URL into your browser to download it, then decompress it with the default decompression tools on Windows/Mac. You can also run the commands in the Jupyter notebook, but decompressing the archive inside the Docker container takes several minutes.

```bash
wget https://cloud.inf.ethz.ch/s/a8FoHew6dHKGYKK/download/confusion20140302.tbz2 && tar -jxvf confusion20140302.tbz2
```

In [7]:
data = sc.textFile('./confusion-2014-03-02/confusion-2014-03-02.json')

Since the entries are JSON records, you need to parse each line into its Python object representation (a `dict`). You can reuse the mapping in the next cell for all queries. Because some of the queries take a long time to execute on the full dataset, you may want to answer them on the first `100000` entries.

**For the quiz, fill in the results by running the queries on the 100000-entry subset (`test_entries` as defined in the following cell) instead of the entire dataset.**

In [8]:
import json

testset = sc.parallelize(data.take(100000))
test_entries = testset.map(json.loads)
print(test_entries.first())

{'guess': 'Norwegian', 'target': 'Norwegian', 'country': 'AU', 'choices': ['Maori', 'Mandarin', 'Norwegian', 'Tongan'], 'sample': '48f9c924e0d98c959d8a6f1862b3ce9a', 'date': '2013-08-19'}
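To see what the `json.loads` mapping produces for a single line, here is a plain-Python sketch (no Spark needed). The input string is reconstructed from the record printed above, so its exact formatting in the file is an assumption; `json.loads` only requires valid JSON.

```python
import json

# One dataset line, reconstructed from the record printed above (formatting assumed).
line = ('{"guess": "Norwegian", "target": "Norwegian", "country": "AU", '
        '"choices": ["Maori", "Mandarin", "Norwegian", "Tongan"], '
        '"sample": "48f9c924e0d98c959d8a6f1862b3ce9a", "date": "2013-08-19"}')

# json.loads turns the JSON text into a plain Python dict,
# which is what each element of test_entries is.
entry = json.loads(line)

print(entry["guess"] == entry["target"])  # True: a correct guess
print(entry["choices"][1])                # Mandarin, the second choice (0-based index 1)
```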


Good! Let's get to work. A few last things:
- Some of the queries might have very large outputs, which Jupyter (or sometimes even Spark) cannot handle. It is normal for the queries to take some time, but if the notebook crashes or stops responding, try restarting the kernel. Avoid printing large outputs: to confirm that a query has worked, print only its first few entries (for example with `take(5)`) or its length, as in query 1.
- Remember to delete the cluster if you want to stop working! You can recreate it using the same container name and your resources will still be there.
- Refer to the [documentation](http://spark.apache.org/docs/2.3.0/api/python/pyspark.html#pyspark.RDD), as well as the sections on transformations and actions in the Spark programming guide.

And now to the actual queries: *Please make sure that your queries use only PySpark (RDD operations) and avoid any DataFrames; they will be covered in next week's exercises.*

1\. Find all games such that the guessed language is correct (=target), and such that this language is Spanish. What is the length of the resulting sequence?

In [20]:
start = time.time()
# Query:
matching = test_entries.filter(lambda e: e["target"] == "Spanish" and e["guess"] == "Spanish").collect()
end = time.time()
print('length =', len(matching))
print('Time consumption {} sec'.format(end - start))


length = 2094
Time consumption 0.5120983123779297 sec
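The filter predicate can be checked without Spark on a few hand-made records (hypothetical, not taken from the dataset). Since only the length is needed, `test_entries.filter(...).count()` on the RDD would return the same number without shipping every matching record to the driver, as `collect()` does; the sketch mirrors that by counting instead of building a list.

```python
# Hypothetical toy records containing only the fields the query needs.
entries = [
    {"guess": "Spanish", "target": "Spanish"},  # correct, Spanish
    {"guess": "Italian", "target": "Spanish"},  # wrong guess
    {"guess": "French", "target": "French"},    # correct, but not Spanish
    {"guess": "Spanish", "target": "Spanish"},  # correct, Spanish
]

def is_correct_spanish(e):
    """Same condition as the filter in query 1."""
    return e["target"] == "Spanish" and e["guess"] == "Spanish"

# Count matches instead of materializing them (the analogue of RDD.count()).
n = sum(1 for e in entries if is_correct_spanish(e))
print(n)  # 2
```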


2\. Find the number of all distinct values of the *guessed* languages (i.e. the *guess* field). What is the length of the resulting sequence?

In [91]:
start = time.time()
# Query: project onto the *guessed* language, then deduplicate
matching = test_entries.map(lambda e: e["guess"]).distinct().collect()
end = time.time()
print(matching)
print(len(matching))
print('Time consumption {} sec'.format(end - start))

['Norwegian', 'Dinka', 'Samoan', 'Somali', 'Japanese', 'Turkish', 'French', 'German', 'Spanish', 'Romanian', 'Cantonese', 'Assyrian', 'Hebrew', 'Kurdish', 'Italian', 'Maltese', 'Kannada', 'Albanian', 'Lao', 'Swahili', 'Vietnamese', 'Portuguese', 'Nepali', 'Indonesian', 'Finnish', 'Fijian', 'Burmese', 'Khmer', 'Polish', 'Dutch', 'Russian', 'Estonian', 'Serbian', 'Tagalog', 'Slovenian', 'Latvian', 'Czech', 'Urdu', 'Tigrinya', 'Tamil', 'Danish', 'Mandarin', 'Armenian', 'Sinhalese', 'Bangla', 'Arabic', 'Punjabi', 'Hindi', 'Dari', 'Tongan', 'Malay', 'Farsi', 'Ukrainian', 'Greek', 'Maori', 'Gujarati', 'Yiddish', 'Macedonian', 'Swedish', 'Korean', 'Bulgarian', 'Slovak', 'Amharic', 'Bosnian', 'Hungarian', 'Malayalam', 'Croatian', 'Thai']
68
Time consumption 1.1223254203796387 sec
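Projecting onto the right field matters here: the set of guessed languages and the set of target languages need not coincide. The plain-Python sketch below uses hypothetical records to show this; a set comprehension plays the role of `.map(...).distinct()` on an RDD.

```python
# Hypothetical toy records, not from the dataset.
entries = [
    {"guess": "Dutch", "target": "Norwegian"},
    {"guess": "Norwegian", "target": "Norwegian"},
    {"guess": "Dutch", "target": "Dutch"},
    {"guess": "Swedish", "target": "Danish"},
]

# Deduplicate the guess field, as .map(lambda e: e["guess"]).distinct() would.
distinct_guesses = {e["guess"] for e in entries}
# The same projection on the target field gives a different set.
distinct_targets = {e["target"] for e in entries}

print(sorted(distinct_guesses))  # ['Dutch', 'Norwegian', 'Swedish']
print(sorted(distinct_targets))  # ['Danish', 'Dutch', 'Norwegian']
```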


3\. Return the top three games where the guessed language is incorrect ($\ne$target) ordered by country (ascending), then target language (ascending), then date (ascending). What is the sample id of the 3rd item in the list? 

Enter it without quotes, for example 48f9c924e0d98c959d8a6f1862b3ce9a

In [46]:
start = time.time()
# Query:
wrong_guesses = test_entries.filter(lambda e: e["target"] != e["guess"])
# One tuple key orders by country, then target language, then date.
# Chaining three sortBy calls would only work if each sort were stable, which Spark does not guarantee.
top_three = wrong_guesses.takeOrdered(3, key=lambda e: (e["country"], e["target"], e["date"]))
print(top_three[2]["sample"])
end = time.time()
print('Time consumption {} sec'.format(end - start))

00b85faa8b878a14f8781be334deb137
Time consumption 1.0367622375488281 sec
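A three-level ordering is easiest to express as one tuple key, because Python (and PySpark, which compares keys with Python semantics) compares tuples element by element. The plain-Python sketch below uses hypothetical wrong-guess records; `heapq.nsmallest` stands in for `RDD.takeOrdered(3, key=...)`, which likewise returns the first three elements under the key without sorting everything.

```python
import heapq

# Hypothetical wrong-guess records, not from the dataset.
wrong = [
    {"country": "US", "target": "Dutch", "date": "2013-08-01", "sample": "d"},
    {"country": "AU", "target": "Thai", "date": "2013-08-05", "sample": "c"},
    {"country": "AU", "target": "Dutch", "date": "2013-08-09", "sample": "b"},
    {"country": "AU", "target": "Dutch", "date": "2013-08-02", "sample": "a"},
]

def order_key(e):
    # Compared left to right: country first, then target language, then date.
    return (e["country"], e["target"], e["date"])

# The three smallest records under order_key, in ascending order.
top3 = heapq.nsmallest(3, wrong, key=order_key)
print([e["sample"] for e in top3])  # ['a', 'b', 'c']
```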


4\. Aggregate all games by guessed and target language, counting the number of guessing games that were done for each pair (guess, target). How many times has Dutch been mistaken for Norwegian (i.e. Dutch was the true answer)?

In [52]:
start = time.time()
# Query: count the games for each (guess, target) pair
pair_counts = test_entries.map(lambda e: ((e["guess"], e["target"]), 1)).reduceByKey(lambda x, y: x + y)
# Dutch was the true answer (target) and Norwegian was the guess
result = pair_counts.lookup(("Norwegian", "Dutch"))
print(result[0] if result else 0)
end = time.time()
print('Time consumption {} sec'.format(end - start))

19
Time consumption 0.4961109161376953 sec
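Aggregating by pair amounts to keying each game by `(guess, target)` and adding up one per game, which is what `reduceByKey` does on an RDD. The plain-Python sketch below does the same with `collections.Counter` on hypothetical records, and shows that the order inside the key matters.

```python
from collections import Counter

# Hypothetical toy records, not from the dataset.
entries = [
    {"guess": "Norwegian", "target": "Dutch"},
    {"guess": "Norwegian", "target": "Dutch"},
    {"guess": "Dutch", "target": "Dutch"},
    {"guess": "Dutch", "target": "Norwegian"},
]

# One count per (guess, target) pair, the in-memory analogue of
# map(lambda e: ((e["guess"], e["target"]), 1)).reduceByKey(add).
pair_counts = Counter((e["guess"], e["target"]) for e in entries)

# Dutch was the true answer and Norwegian was guessed.
print(pair_counts[("Norwegian", "Dutch")])  # 2
# The reverse mistake is a different key.
print(pair_counts[("Dutch", "Norwegian")])  # 1
```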


5\. Among all the games where the guess was correct (=target), what fraction of cases had the target as the second choice (among the array of possible answers)?

Please enter the fraction rounded to 4 decimal places (e.g. 0.3323).

In [53]:
start = time.time()
# Query: filter the correct guesses once and cache them, since they are used twice
correct_guesses = test_entries.filter(lambda e: e["target"] == e["guess"]).cache()
correct_guesses_count = correct_guesses.count()
# choices[1] is the second choice (0-based indexing)
second_choice_is_target_count = correct_guesses.filter(lambda e: e["choices"][1] == e["target"]).count()
print(second_choice_is_target_count / correct_guesses_count)
end = time.time()
print('Time consumption {} sec'.format(end - start))

0.37891275334002944
Time consumption 1.757666826248169 sec
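The computation is a ratio of two counts over the correct guesses only, with the second choice at index 1 of `choices`. The plain-Python sketch below uses hypothetical records; note that the wrong-guess record would have matched the second-choice test, which is why the filter on correct guesses comes first.

```python
# Hypothetical toy records, not from the dataset.
entries = [
    {"guess": "Thai", "target": "Thai", "choices": ["Lao", "Thai", "Khmer"]},
    {"guess": "Lao", "target": "Lao", "choices": ["Lao", "Thai", "Khmer"]},
    {"guess": "Khmer", "target": "Khmer", "choices": ["Thai", "Khmer", "Lao"]},
    # Wrong guess: its second choice equals the target, but it must not be counted.
    {"guess": "Thai", "target": "Lao", "choices": ["Thai", "Lao", "Khmer"]},
]

correct = [e for e in entries if e["guess"] == e["target"]]
# Index 1 is the second choice because Python lists are 0-indexed.
second_is_target = [e for e in correct if e["choices"][1] == e["target"]]

fraction = len(second_is_target) / len(correct)
print(round(fraction, 4))  # 0.6667
```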


6\. For each target language, compute the percentage of successful guess games (i.e. *guess* == *target*) relative to all games for that target language, and display the pairs `(target_language, percentage)` in descending order of the percentage. What is the third language in this list?

In [108]:
start = time.time()
# Query:
mapping = test_entries.map(lambda e: (e["target"], (1, int(e["target"] == e["guess"]))))
reducing = mapping.reduceByKey(lambda x, y: (x[0] + y[0], x[1] + y[1]))
result_rdd = reducing.map(lambda x: (x[0], 100 * x[1][1] / x[1][0])).sortBy(lambda x: x[1], ascending=False)

results = result_rdd.collect()
print(results)
    
end = time.time()
print('Time consumption {} sec'.format(end - start))

[('French', 95.97374179431073), ('German', 94.55670103092784), ('Mandarin', 92.7967985771454), ('Spanish', 92.77802392556491), ('Cantonese', 92.01053555750659), ('Italian', 91.94690265486726), ('Japanese', 91.11008751727314), ('Korean', 89.9624765478424), ('Russian', 88.50305021116847), ('Vietnamese', 86.93492300049677), ('Arabic', 85.48942449581898), ('Thai', 85.03937007874016), ('Ukrainian', 81.30412633723893), ('Hebrew', 79.55334987593052), ('Lao', 77.89580171977744), ('Polish', 77.0517511761631), ('Czech', 76.11379495437467), ('Norwegian', 75.75593952483801), ('Slovak', 74.6218487394958), ('Swedish', 74.40217391304348), ('Romanian', 74.33070866141732), ('Greek', 73.39901477832512), ('Swahili', 72.66666666666667), ('Portuguese', 72.5958866036687), ('Yiddish', 72.52124645892351), ('Slovenian', 72.13525360050095), ('Punjabi', 71.11111111111111), ('Gujarati', 71.08208955223881), ('Bulgarian', 71.03494623655914), ('Danish', 70.71990320629159), ('Dutch', 69.37463471654003), ('Khmer', 68.
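The query carries `(games, successes)` pairs through `reduceByKey` instead of percentages because percentages cannot be combined directly (an average of partial percentages is wrong when the parts have different sizes), whereas adding pairs component-wise is associative and commutative, as `reduceByKey` requires. The plain-Python sketch below applies the same combiner to hypothetical records with `functools.reduce` and only divides at the end.

```python
from functools import reduce

# Hypothetical toy records, not from the dataset.
entries = [
    {"guess": "French", "target": "French"},
    {"guess": "French", "target": "French"},
    {"guess": "Italian", "target": "French"},
    {"guess": "Dutch", "target": "Dutch"},
    {"guess": "Danish", "target": "Dutch"},
]

def combine(x, y):
    # Same combiner as the reduceByKey step: add up (games, successes).
    return (x[0] + y[0], x[1] + y[1])

# Group the (1, success) pairs by target language.
per_target = {}
for e in entries:
    pair = (1, int(e["guess"] == e["target"]))
    per_target.setdefault(e["target"], []).append(pair)

# Fold each group, then turn the totals into percentages and rank them.
totals = {t: reduce(combine, pairs) for t, pairs in per_target.items()}
ranking = sorted(
    ((t, 100 * ok / games) for t, (games, ok) in totals.items()),
    key=lambda x: x[1],
    reverse=True,
)
print(ranking)
```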

7\. How many games in France (country=FR) were played on the last day?

In [100]:
start = time.time()
# Query:
# ISO dates (YYYY-MM-DD) compare correctly as strings, so max() yields the latest day
last_day = test_entries.map(lambda e: e["date"]).max()
french_games_last_day = test_entries.filter(lambda e: e["date"] == last_day and e["country"] == "FR").count()
print(french_games_last_day)
end = time.time()
print('Time consumption {} sec'.format(end - start))

494
Time consumption 1.1423659324645996 sec
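Finding the last day works on the raw date strings because `YYYY-MM-DD` strings sort lexicographically in chronological order; on an RDD, `.map(lambda e: e["date"]).max()` returns that value without collecting all dates. As in the notebook query, the last day below is taken over all games, not only the French ones. The plain-Python sketch uses hypothetical records.

```python
# Hypothetical toy records, not from the dataset.
entries = [
    {"country": "FR", "date": "2014-03-01"},
    {"country": "FR", "date": "2014-03-02"},
    {"country": "DE", "date": "2014-03-02"},
    {"country": "FR", "date": "2013-12-31"},
]

# Lexicographic max of ISO date strings is the latest date.
last_day = max(e["date"] for e in entries)

# Count the French games played on that day.
fr_on_last_day = sum(1 for e in entries if e["country"] == "FR" and e["date"] == last_day)
print(last_day, fr_on_last_day)  # 2014-03-02 1
```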
