<a href="https://colab.research.google.com/github/seznam/IT-akademie-bigdata/blob/main/big-data/notebooks/002_apache-spark-in-practice.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# What you'll get your hands on in this notebook

- reading data from the Parquet format
- linking two separate data sets together based on a common field
- writing a simple aggregation

# Preparing the data & Spark session

The following snippet downloads the example dataset you'll be working with:

In [None]:
!test -f example-dataset.tar.xz || wget https://github.com/seznam/IT-akademie-bigdata/raw/main/big-data/data/example-dataset.tar.xz
!test -d example-dataset || tar -xf example-dataset.tar.xz
!ls -l

Now let's install Spark and set up PySpark:

In [None]:
# Install Spark

import os
os.chdir("/content")
!test -f spark-3.2.1-bin-hadoop2.7.tgz || wget https://archive.apache.org/dist/spark/spark-3.2.1/spark-3.2.1-bin-hadoop2.7.tgz
!test -d spark-3.2.1-bin-hadoop2.7 || tar -xf spark-3.2.1-bin-hadoop2.7.tgz

# Setup pyspark
!pip install findspark
import findspark
os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop2.7"
findspark.init()

# Create new SparkSession
from pyspark.sql import SparkSession
spark = SparkSession.builder \
        .master("local[*]") \
        .getOrCreate()

In [None]:
# Just some handy functions to keep the code cells clean later on
# Note that we don't use asterisk (*) because then the Colab completion doesn't work
from pyspark.sql.functions import col, floor, udf, explode

# Reading the data

Now it's time for you to apply what you've learned in the previous [Introduction to Apache Spark - IT Academy 2022](https://colab.research.google.com/github/seznam/IT-akademie-bigdata/blob/main/big-data/notebooks/001_introduction_to_apache_spark.ipynb) notebook.

To get you started, let's look at the structure of the data we have prepared for you.

In [None]:
!ls -l example-dataset/

Okay, we have two directories: `clicks` and `pageviews`. They were taken from Seznam's ad division, so a click means a user clicked an ad, while a pageview (an impression) means an ad was loaded by the browser and rendered.

Let's look into the directories to see the data format:

In [None]:
!ls -l example-dataset/*

As you can see, the data is stored in the Parquet format. And here comes your first task:
- load the `clicks` Parquet directory into one DataFrame (named `clicks`)
- and the `pageviews` directory into another DataFrame (named `pageviews`)

*HINT: [PySpark Documentation](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrameReader.parquet.html?highlight=parquet) might be handy if autocompletion does not suffice*

In [None]:
clicks = 

In [None]:
pageviews = 

Let's look at the schema and the data to verify that you have loaded them successfully:

In [None]:
clicks.printSchema()
clicks.show()

In [None]:
pageviews.printSchema()
pageviews.show()

Great! At this point, you're ready to go on to the second part.

# Linking two datasets together

As you can see from the schemas of both DataFrames, there are some fields that we can use to link the data together.

One of them is the `click.ImpressionTimestamp` column, which should correspond to the `pageview.timestamp` column. We will use this to link the two datasets together.

The second one is a bit trickier, because first you need to understand what an impression is within our dataset.
- a single row of the `pageviews` DataFrame represents a single rendering result from the browser, but it can include several ads at once, which is why there is a column named `randomIds`, which is an array of integers
- each `randomId` can be considered a single ad
- since one row of the `clicks` DataFrame represents a single click on a single ad, we can link `pageviews` to `clicks` only after we *explode* the `randomIds` array
  - what do we mean by *exploding* the array?
  - it flattens the structure: each pageview row is repeated once per element of its array, so every resulting row holds one pageview with exactly one randomId

That said, we now know we can also link the `click.RandomId` field with the exploded `pageview.randomIds[]` array.
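
To make the idea of exploding more concrete, here is a minimal sketch in plain Python (not Spark code). The sample rows and the helper name `explode_rows` are made up for illustration; only the field names `timestamp`, `randomIds` and `randomId` come from the text above.

```python
# Plain-Python illustration of "exploding" an array field (not Spark code).
# The rows below are made-up sample values, not taken from the real dataset.
pageviews_rows = [
    {"timestamp": 1000, "randomIds": [11, 12, 13]},
    {"timestamp": 2000, "randomIds": [21]},
    {"timestamp": 3000, "randomIds": []},
]


def explode_rows(rows, array_field, new_field):
    """Return one output row per array element, copying the other fields."""
    exploded = []
    for row in rows:
        for element in row[array_field]:
            new_row = dict(row)            # copy all original fields
            new_row[new_field] = element   # add the single-element field
            exploded.append(new_row)
    return exploded


exploded_rows = explode_rows(pageviews_rows, "randomIds", "randomId")
for row in exploded_rows:
    print(row["timestamp"], row["randomId"])
```

Note that the row with an empty array produces no output rows in this sketch. Spark's `explode` behaves the same way for empty or null arrays.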

See the docs for the functions you should use:
- https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.withColumn.html
- https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.functions.explode.html

In [None]:
exploded_pageviews = 

In [None]:
# Before we proceed to joining two data frames, we need to unify correlated column names
exploded_pageviews = exploded_pageviews \
                        .drop('randomIds') \
                        .withColumnRenamed('timestamp', 'impressionTimestamp')

Let's see what we've got:

In [None]:
exploded_pageviews.printSchema()
exploded_pageviews.show()

Now that we've exploded pageviews using `randomIds`, it's time for you to join these DataFrames on the `randomId` and `impressionTimestamp` fields. Please consult the docs:
- https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.join.html
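
As a rough illustration of what joining on two fields means, the following plain-Python sketch (not Spark code) pairs rows only when both key values are equal. The sample rows, the lowercase key names on the click side, and the helper `inner_join` are assumptions made for this example.

```python
# Plain-Python illustration of an inner join on two key fields (not Spark code).
# All rows are made-up sample values; the helper name is hypothetical.
pageview_rows = [
    {"impressionTimestamp": 1000, "randomId": 11},
    {"impressionTimestamp": 1000, "randomId": 12},
    {"impressionTimestamp": 2000, "randomId": 11},
]
click_rows = [
    {"impressionTimestamp": 1000, "randomId": 12, "Timestamp": 1007},
    {"impressionTimestamp": 2000, "randomId": 99, "Timestamp": 2003},
]


def inner_join(left, right, keys):
    """Combine rows from left and right whose values agree on every key."""
    # Index the right side by its key tuple for fast lookup
    index = {}
    for r in right:
        index.setdefault(tuple(r[k] for k in keys), []).append(r)

    joined = []
    for l in left:
        for r in index.get(tuple(l[k] for k in keys), []):
            joined.append({**l, **r})  # merge the fields of both rows
    return joined


linked_rows = inner_join(pageview_rows, click_rows, ["randomId", "impressionTimestamp"])
print(linked_rows)
```

Only the pageview with `randomId` 12 at timestamp 1000 has a click that agrees on both keys, so it is the only row that survives the join.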

In [None]:
linked_pageviews = 

In [None]:
# To avoid ambiguity, rename Timestamp column from clicks
linked_pageviews = linked_pageviews \
                      .withColumnRenamed('Timestamp', 'ClickTimestamp') 

Let's see if we got it right:

In [None]:
linked_pageviews.printSchema()
linked_pageviews.show()

## Bonus task

- also create an unlinked DataFrame containing all the pageviews or clicks that have no match in the other DataFrame
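
One way to think about "unlinked" rows is as the rows of one side whose key combination never appears on the other side. The plain-Python sketch below (not Spark code) shows that idea on made-up sample rows; the helper name `unmatched` is hypothetical.

```python
# Plain-Python illustration of finding rows with no match (not Spark code).
# All rows are made-up sample values; the helper name is hypothetical.
pageview_rows = [
    {"impressionTimestamp": 1000, "randomId": 11},
    {"impressionTimestamp": 1000, "randomId": 12},
    {"impressionTimestamp": 2000, "randomId": 11},
]
click_rows = [
    {"impressionTimestamp": 1000, "randomId": 12, "Timestamp": 1007},
    {"impressionTimestamp": 2000, "randomId": 99, "Timestamp": 2003},
]
KEYS = ["randomId", "impressionTimestamp"]


def unmatched(left, right, keys):
    """Return the rows of left whose key tuple does not occur in right."""
    right_keys = {tuple(r[k] for k in keys) for r in right}
    return [l for l in left if tuple(l[k] for k in keys) not in right_keys]


pageviews_without_click = unmatched(pageview_rows, click_rows, KEYS)
clicks_without_pageview = unmatched(click_rows, pageview_rows, KEYS)
print(len(pageviews_without_click), len(clicks_without_pageview))
```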

In [None]:
# Hint: using just "DataFrame.join" method is enough

unlinked_pageviews =

unlinked_pageviews.printSchema()
unlinked_pageviews.count()

# Performing basic aggregation

Your task here is to compute a summary statistic: how many clicks were registered for each `AdId`.

For example:

| AdId | Clicks |
| - | - |
| 1002 | 30 |
| 586 | 2 |
| ... | ... |

*Hint: Use group by aggregation from the [Introduction to Apache Spark - IT Academy 2022](https://colab.research.google.com/github/seznam/IT-akademie-bigdata/blob/main/big-data/notebooks/001_introduction_to_apache_spark.ipynb) notebook.*
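
If it helps to see the expected result shape first, here is the same counting idea in plain Python (not Spark code), using `collections.Counter` on a few made-up click rows. Only the `AdId` field name comes from the task; the values are invented.

```python
from collections import Counter

# Plain-Python illustration of "count rows per group" (not Spark code).
# The click rows are made-up sample values.
click_rows = [
    {"AdId": 1002},
    {"AdId": 586},
    {"AdId": 1002},
    {"AdId": 1002},
]

# Group by AdId and count how many rows fall into each group
clicks_per_ad = Counter(row["AdId"] for row in click_rows)

for ad_id, clicks in clicks_per_ad.most_common():
    print(ad_id, clicks)
```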

In [None]:
clicks_by_ad_id =

clicks_by_ad_id.printSchema()
clicks_by_ad_id.show()

Great, you've got it!

Now, let's create a UDF that extracts the UNIX timestamp of the hour in which the click happened (that is, `ClickTimestamp` rounded down to the full hour) from the `ClickTimestamp` field of the `linked_pageviews` DataFrame, and print the number of linked pageviews per hour.
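
The rounding itself is plain integer arithmetic. The sketch below assumes the timestamp is a UNIX time in seconds, which the notebook does not state, so check the real data first (a millisecond timestamp would need a different constant). The helper name `floor_to_hour` is hypothetical.

```python
# Plain-Python illustration of rounding a UNIX timestamp down to the hour.
# Assumption: the timestamp is expressed in seconds (not confirmed by the notebook).
SECONDS_PER_HOUR = 3600


def floor_to_hour(unix_seconds):
    """Return the UNIX timestamp of the start of the hour containing unix_seconds."""
    # Integer division drops the minutes and seconds within the hour
    return (int(unix_seconds) // SECONDS_PER_HOUR) * SECONDS_PER_HOUR


print(floor_to_hour(7325))  # 2 h 2 min 5 s after the epoch -> 7200
print(floor_to_hour(3599))  # still within the first hour -> 0
```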

In [None]:
def extract_hour(click_timestamp):
  # TODO
  return None

In [None]:
# Note: without an explicit returnType, udf() produces a string column
extract_hour_udf = udf(extract_hour)

In [None]:
# TODO: Apply this UDF to the linked_pageviews DataFrame, count the rows per hour, and print the first few lines

linked_pageviews