<h1><center>Big Data Algorithms Techniques & Platforms</center></h1>
<h2>
<hr style=" border:none; height:3px;">
<center>Spark and DataFrames</center>
<hr style=" border:none; height:3px;">
</h2>

## Objectives 

<strong> Dataframes: </strong>
<ul>
    <li>  Pyspark </li> 
    <li>  Pandas library on Spark</li> 
</ul>

# A. Context

<p align="justify">
<font size="3">
For this series of exercises we use a fairly large dataset of Bitcoin data made available on <a href="https://www.kaggle.com/mczielinski/bitcoin-historical-data">Kaggle</a>.

As stated in the description of the dataset:
"Bitcoin is the longest running and most well known cryptocurrency, first released as open source in 2009 by the anonymous Satoshi Nakamoto. Bitcoin serves as a decentralized medium of digital exchange, with transactions verified and recorded in a public distributed ledger (the blockchain) without the need for a trusted record keeping authority or central intermediary." 
</font>
</p>


### The dataset

<p align="justify">
<font size="3">
The dataset is in a .csv file:

<code>bitstampUSD_1-min_data_2012-01-01_to_2021-03-31.csv</code>

CSV files for selected Bitcoin exchanges for the period from January 2012 to March 2021, with minute-to-minute updates of OHLC (Open, High, Low, Close), volume in BTC and in the indicated currency, and weighted Bitcoin price. 

Notice that:
<ul>
    <li> Timestamps are in Unix time.</li>
<li> Timestamps without any trades or activity have their data fields filled with NaNs. </li>
<li>  If a timestamp is missing, or if there are jumps, this may be because the exchange (or its API) was down, the exchange (or its API) did not exist, or some other unforeseen technical error in data reporting or gathering. </li>
</ul>
As stated by the authors "all effort has been made to deduplicate entries and verify the contents are correct and complete to the best of my ability, but obviously trust at your own risk".
</font>
</p>
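The last two notes suggest two quick sanity checks on the raw rows: find the minutes whose fields are NaN, and find the jumps where two consecutive timestamps are more than 60 seconds apart. The plain-Python sketch below assumes the rows are sorted by timestamp; `find_gaps` and `nan_rows` are hypothetical helpers, and the sample rows are illustrative (the last one is invented to create a gap).

```python
import math

def find_gaps(timestamps, step=60):
    """Return (previous, next) pairs of consecutive Unix timestamps more than `step` seconds apart."""
    return [(a, b) for a, b in zip(timestamps, timestamps[1:]) if b - a > step]

def nan_rows(rows):
    """Return the timestamps of rows whose Close price is NaN (a minute without trades)."""
    return [ts for ts, close in rows if math.isnan(close)]

# Illustrative sample: (Unix timestamp, Close price)
rows = [
    (1325317920, 4.39),
    (1325317980, float("nan")),  # no trade in this minute
    (1325318040, float("nan")),
    (1325318160, 4.39),          # 1325318100 is missing: a jump of 120 s
]

print(find_gaps([ts for ts, _ in rows]))  # [(1325318040, 1325318160)]
print(nan_rows(rows))                     # [1325317980, 1325318040]
```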

# B. Environment set-up

<p align="justify">
<font size="3">
As a first step, add the dataset to your environment.

You can either follow the procedure that imports Kaggle data directly into the Colab working folders, or simply download the file

<code>bitstampUSD_1-min_data_2012-01-01_to_2021-03-31.csv</code>
    
and upload it to the folder from which your notebook reads its input.

</font>
</p>

<p align="justify">
<font size="3">
As a second step, prepare your environment by running the following cells, which:
<ul>
    <li> import the Pandas library;</li>
<li> install Java and Spark, set up the Spark environment and create a SparkSession (which plays the role that the SparkContext played in the previous exercises). </li>
</ul>    
    

</font>
</p>

In [1]:
# import of Pandas library
import pandas as pa
import gc

In [4]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.0.3/spark-3.0.3-bin-hadoop2.7.tgz
!tar zxvf spark-3.0.3-bin-hadoop2.7.tgz
!pip install -q findspark


In [7]:
import os

# Paths of the Java and Spark installations created by the previous cell (Colab layout)
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.3-bin-hadoop2.7"

# findspark makes the Spark installation found in SPARK_HOME importable from Python
import findspark
findspark.init()

# import of the SparkSession
from pyspark.sql import SparkSession

# initialization of the Spark Session
spark = SparkSession \
    .builder \
    .appName("Assignment2") \
    .getOrCreate()

## B.1  File import
    
<p align="justify">
<font size="3">
In this exercise the goal is to create a Spark DataFrame from the input csv file. 

Recall that in a Spark DataFrame the column types matter: they determine the internal representation of the data. 
    
For this step, the target set of typed columns is the following: 
<ul>
    <li>    $Date\_Time: Timestamp$ </li>
     <li>   $Open: double$ </li>
     <li>   $High: double$ </li>
    <li>    $Low: double$ </li>
    <li>    $Close: double$ </li>
    <li>    $Volume\_BTC: double$ </li>
    <li>    $Volume\_Currency: double$ </li>
    <li>    $Weighted\_Price: double$ </li>
</ul>
    
We will build this schema in three guided steps, described in the following sections.
</font>
</p>

<p align="justify">
<font size="3">
Notice that the first line (header) of the $csv$ file contains the column names, and that a simple import, without a schema,
reads every column, including the timestamp column, as a String. 
</font>
</p>

<p align="justify">
<font size="3">
When importing the data, check that:
<ul>
    <li>  the types of the imported columns (as read from the file by the operation you choose) are equal to the types in the given schema;</li>
    <li>  the column names correspond (and apply transformations where necessary). </li> 
</ul>
    
</font>
</p>

### <strong> Exercise 1.</strong> First import (1 point)
    
<p align="justify">
<font size="3">
Import the csv file into a Spark DataFrame. If in doubt, refer to the Spark 3.1.1 documentation:

<a href="https://spark.apache.org/docs/3.1.1/">Spark Reference Documentation</a>

</font>
</p>
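With `header=True` and an explicit schema, the reader skips the first line of the file and converts each field to the declared type; a `NaN` field becomes a floating-point NaN in a double column. The plain-Python sketch below mimics that per-line conversion with the `csv` module; the two data lines reproduce the first rows shown further down, and the `TYPES` list is only an illustrative stand-in for the Spark schema (`int` for `IntegerType`, `float` for `DoubleType`).

```python
import csv
import io
import math

# Stand-in for the schema of Exercise 1: int for Timestamp, float (double) for the other 7 columns
TYPES = [int] + [float] * 7

sample = io.StringIO(
    "Timestamp,Open,High,Low,Close,Volume_(BTC),Volume_(Currency),Weighted_Price\n"
    "1325317920,4.39,4.39,4.39,4.39,0.45558087,2.0000000193,4.39\n"
    "1325317980,NaN,NaN,NaN,NaN,NaN,NaN,NaN\n"
)
reader = csv.reader(sample)
header = next(reader)  # header=True: the first line holds the names, not data
rows = [[cast(value) for cast, value in zip(TYPES, line)] for line in reader]

print(rows[0][0], rows[0][1])  # 1325317920 4.39
print(math.isnan(rows[1][1]))  # True
```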

In [10]:
# Write the command that creates (reads) a Spark DataFrame and stores the reference in the dfs variable
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType
#'''############## WRITE YOUR CODE HERE ##############'''
# Explicit schema: Unix time as int, every other column as double.
# The names declared here replace the header names, so the parentheses are already gone.
schema = StructType([
    StructField("Timestamp", IntegerType(), True),
    StructField("Open", DoubleType(), True),
    StructField("High", DoubleType(), True),
    StructField("Low", DoubleType(), True),
    StructField("Close", DoubleType(), True),
    StructField("Volume_BTC", DoubleType(), True),
    StructField("Volume_Currency", DoubleType(), True),
    StructField("Weighted_Price", DoubleType(), True)
])

# header=True: the first line of the file holds the column names and is skipped
dfs = spark.read.csv("bitstampUSD_1-min_data_2012-01-01_to_2021-03-31.csv", schema=schema, header=True)
dfs.printSchema()

#'''############## END OF THE EXERCISE ##############'''

#show the DataFrame schema
dfs
#######################
# EXPECTED OUTPUT:
# DataFrame[Timestamp: int, Open: double, High: double, Low: double, Close: double, Volume_(BTC): double, Volume_(Currency): double, Weighted_Price: double]
#
# Notice that if you have something like:
# DataFrame[_c0: string, _c1: string, _c2: string, _c3: string, _c4: string, _c5: string, _c6: string, _c7: string]
# you forgot a step: the first line was not used as the header and no schema was given
#
# Notice also that if you have:
# DataFrame[Timestamp: string, Open: string, High: string, Low: string, Close: string, Volume_(BTC): string, Volume_(Currency): string, Weighted_Price: string]
# you also forgot a step: the column types were neither declared (schema) nor inferred (inferSchema=True)
###########################

root
 |-- Timestamp: integer (nullable = true)
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume_BTC: double (nullable = true)
 |-- Volume_Currency: double (nullable = true)
 |-- Weighted_Price: double (nullable = true)



DataFrame[Timestamp: int, Open: double, High: double, Low: double, Close: double, Volume_BTC: double, Volume_Currency: double, Weighted_Price: double]

In [11]:
# the following command returns the first 5 rows of the DataFrame as a list of Row objects
dfs.take(5)

[Row(Timestamp=1325317920, Open=4.39, High=4.39, Low=4.39, Close=4.39, Volume_BTC=0.45558087, Volume_Currency=2.0000000193, Weighted_Price=4.39),
 Row(Timestamp=1325317980, Open=nan, High=nan, Low=nan, Close=nan, Volume_BTC=nan, Volume_Currency=nan, Weighted_Price=nan),
 Row(Timestamp=1325318040, Open=nan, High=nan, Low=nan, Close=nan, Volume_BTC=nan, Volume_Currency=nan, Weighted_Price=nan),
 Row(Timestamp=1325318100, Open=nan, High=nan, Low=nan, Close=nan, Volume_BTC=nan, Volume_Currency=nan, Weighted_Price=nan),
 Row(Timestamp=1325318160, Open=nan, High=nan, Low=nan, Close=nan, Volume_BTC=nan, Volume_Currency=nan, Weighted_Price=nan)]

In [12]:
gc.collect()

165

<p align="justify">
<font size="3">
Look again at the target schema:
    
<ul>
    <li>    $Date\_Time: Timestamp$ </li>
     <li>   $Open: double$ </li>
     <li>   $High: double$ </li>
    <li>    $Low: double$ </li>
    <li>    $Close: double$ </li>
    <li>    $Volume\_BTC: double$ </li>
    <li>    $Volume\_Currency: double$ </li>
    <li>    $Weighted\_Price: double$ </li>
</ul>
    
Notice that the imported data has three problems with respect to the target schema:
    
    
<ul>
    <li> the $Date\_Time$ column is not present in the original file; </li>
    <li> there is an $int$ column $Timestamp$ (Unix time) that can be converted into a timestamp; </li> 
    <li> some of the column names contain unnecessary parentheses. </li>
</ul>     
</font>
</p>




### <strong> Exercise 2. </strong> Timestamp column (1 point)
    
<p align="justify">
<font size="3">
Refine the import of the csv file and convert the timestamp column into the proper $Timestamp$ type:
    <ul>
        <li>   Create a new column <code>Date_Time</code> that is the conversion of the integer (Unix time) column $Timestamp$ into the $Timestamp$ type.  </li>
</ul>
DataFrames are immutable structures, so your procedure will use a command (discussed in the slides) that creates, from the $dfs$ DataFrame, a new Spark DataFrame with a different schema. 

</font>
</p>

<p align="justify">
<font size="3">
Compare the timestamp column of the csv file with the one of the imported DataFrame.
</font>
</p>
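Casting an integer column to `TimestampType` interprets each value as a number of seconds since the Unix epoch (1970-01-01 00:00:00 UTC); the time of day you then see depends on the time zone used for display. The values shown below correspond to UTC, presumably the time zone of the Colab machine. A minimal plain-Python sketch of the same conversion for a single value, assuming UTC; `unix_to_datetime` is a hypothetical helper:

```python
from datetime import datetime, timezone

def unix_to_datetime(ts):
    """Convert seconds since the Unix epoch into a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(ts, tz=timezone.utc)

first = unix_to_datetime(1325317920)  # first Timestamp of the dataset
print(first)                                # 2011-12-31 07:52:00+00:00
print(first.strftime("%Y-%m-%d %H:%M:%S"))  # 2011-12-31 07:52:00
```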

In [13]:
# write the command that creates a new Spark DataFrame with a Date_Time column
# and stores the reference in the dfsdt variable (it must be a Spark DataFrame with the Date_Time column)

#'''############## WRITE YOUR CODE HERE ##############'''
from pyspark.sql.types import TimestampType

dfsdt = dfs.withColumn("Date_Time", dfs["Timestamp"].cast(TimestampType()))

#'''############## END OF THE EXERCISE ##############'''

#show the DataFrame schema
dfsdt

#######################
# EXPECTED OUTPUT:
# DataFrame[Timestamp: int, Open: double, High: double, Low: double, Close: double, 
#Volume_(BTC): double, Volume_(Currency): double, Weighted_Price: double, Date_Time: timestamp]
#######################

DataFrame[Timestamp: int, Open: double, High: double, Low: double, Close: double, Volume_BTC: double, Volume_Currency: double, Weighted_Price: double, Date_Time: timestamp]

In [14]:
#show 5 rows of the DataFrame
dfsdt.take(5)

[Row(Timestamp=1325317920, Open=4.39, High=4.39, Low=4.39, Close=4.39, Volume_BTC=0.45558087, Volume_Currency=2.0000000193, Weighted_Price=4.39, Date_Time=datetime.datetime(2011, 12, 31, 7, 52)),
 Row(Timestamp=1325317980, Open=nan, High=nan, Low=nan, Close=nan, Volume_BTC=nan, Volume_Currency=nan, Weighted_Price=nan, Date_Time=datetime.datetime(2011, 12, 31, 7, 53)),
 Row(Timestamp=1325318040, Open=nan, High=nan, Low=nan, Close=nan, Volume_BTC=nan, Volume_Currency=nan, Weighted_Price=nan, Date_Time=datetime.datetime(2011, 12, 31, 7, 54)),
 Row(Timestamp=1325318100, Open=nan, High=nan, Low=nan, Close=nan, Volume_BTC=nan, Volume_Currency=nan, Weighted_Price=nan, Date_Time=datetime.datetime(2011, 12, 31, 7, 55)),
 Row(Timestamp=1325318160, Open=nan, High=nan, Low=nan, Close=nan, Volume_BTC=nan, Volume_Currency=nan, Weighted_Price=nan, Date_Time=datetime.datetime(2011, 12, 31, 7, 56))]

In [15]:
gc.collect()

95

### <strong> Exercise 3.</strong> Column names (2 points)


<p align="justify">
<font size="3">
As you can see from the expected output of the previous exercise, some column names still contain parentheses that are not required.
    <ul>
     <li> Remove the unnecessary parentheses from the column names. </li>
     <li> Hint: look at the documentation of the DataFrame API and find the operation for renaming columns. </li>
</ul>
</font>
</p>
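Removing the parentheses is a pure string transformation on the list of column names; the resulting list can then be applied to the Spark DataFrame, for example with `DataFrame.toDF(*new_names)` or with one `withColumnRenamed(old, new)` call per changed column. A plain-Python sketch of the name cleaning, assuming the original header names of the Kaggle file; `clean_name` is a hypothetical helper:

```python
import re

# Column names as they appear in the header of the Kaggle csv file
original = ["Timestamp", "Open", "High", "Low", "Close",
            "Volume_(BTC)", "Volume_(Currency)", "Weighted_Price"]

def clean_name(name):
    """Drop every '(' and ')' character from a column name."""
    return re.sub(r"[()]", "", name)

clean = [clean_name(c) for c in original]
print(clean)
# ['Timestamp', 'Open', 'High', 'Low', 'Close', 'Volume_BTC', 'Volume_Currency', 'Weighted_Price']

# (old, new) pairs that would be passed to withColumnRenamed in PySpark
renames = [(old, new) for old, new in zip(original, clean) if old != new]
print(renames)  # [('Volume_(BTC)', 'Volume_BTC'), ('Volume_(Currency)', 'Volume_Currency')]
```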


In [16]:
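# write the command that creates a new Spark DataFrame with correct names for all the columns
# and stores the reference in the dfscr variable (Data Frame Spark with Correct Names)

#'''############## WRITE YOUR CODE HERE ##############'''
from pyspark.sql.functions import from_unixtime

# Remove the parentheses from every column name, e.g. "Volume_(BTC)" -> "Volume_BTC".
# With the explicit schema of Exercise 1 the names are already clean, so here this changes nothing.
clean_names = [c.replace("(", "").replace(")", "") for c in dfsdt.columns]
dfscr = dfsdt.toDF(*clean_names)

# from_unixtime returns a *string* formatted as 'yyyy-MM-dd HH:mm:ss',
# so Date_Time becomes a string column (see the schema below)
dfscr = dfscr.withColumn("Date_Time",
                         from_unixtime("Timestamp", format="yyyy-MM-dd HH:mm:ss"))

#'''############## END OF THE EXERCISE ##############'''

#show the DataFrame schema
dfscr

#######################
# EXPECTED OUTPUT:
#DataFrame[Timestamp: int, Open: double, High: double, Low: double, Close: double, 
#          Volume_BTC: double, Volume_Currency: double, Weighted_Price: double]
#######################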

DataFrame[Timestamp: int, Open: double, High: double, Low: double, Close: double, Volume_BTC: double, Volume_Currency: double, Weighted_Price: double, Date_Time: string]

In [17]:
#show 5 rows of the DataFrame
dfscr.show(5)

#######################
# Expected output:
#+----------+----+----+----+-----+----------+---------------+--------------+-------------------+
#| Timestamp|Open|High| Low|Close|Volume_BTC|Volume_Currency|Weighted_Price|          Date_Time|
#+----------+----+----+----+-----+----------+---------------+--------------+-------------------+
#|1325317920|4.39|4.39|4.39| 4.39|0.45558087|   2.0000000193|          4.39|2011-12-31 07:52:00|
#|1325317980| NaN| NaN| NaN|  NaN|       NaN|            NaN|           NaN|2011-12-31 07:53:00|
#|1325318040| NaN| NaN| NaN|  NaN|       NaN|            NaN|           NaN|2011-12-31 07:54:00|
#|1325318100| NaN| NaN| NaN|  NaN|       NaN|            NaN|           NaN|2011-12-31 07:55:00|
#|1325318160| NaN| NaN| NaN|  NaN|       NaN|            NaN|           NaN|2011-12-31 07:56:00|
#+----------+----+----+----+-----+----------+---------------+--------------+-------------------+

+----------+----+----+----+-----+----------+---------------+--------------+-------------------+
| Timestamp|Open|High| Low|Close|Volume_BTC|Volume_Currency|Weighted_Price|          Date_Time|
+----------+----+----+----+-----+----------+---------------+--------------+-------------------+
|1325317920|4.39|4.39|4.39| 4.39|0.45558087|   2.0000000193|          4.39|2011-12-31 07:52:00|
|1325317980| NaN| NaN| NaN|  NaN|       NaN|            NaN|           NaN|2011-12-31 07:53:00|
|1325318040| NaN| NaN| NaN|  NaN|       NaN|            NaN|           NaN|2011-12-31 07:54:00|
|1325318100| NaN| NaN| NaN|  NaN|       NaN|            NaN|           NaN|2011-12-31 07:55:00|
|1325318160| NaN| NaN| NaN|  NaN|       NaN|            NaN|           NaN|2011-12-31 07:56:00|
+----------+----+----+----+-----+----------+---------------+--------------+-------------------+
only showing top 5 rows



## B.2 DataFrame columns 


<p align="justify">
<font size="3">
    
In this part of the exercise we continue to modify the Spark DataFrame.

    
Remember that in PySpark you can access a DataFrame's columns either by attribute (<code>df.attributeName</code>) or by indexing (<code>df['attributeName']</code>).
</font>
</p>


<p align="justify">
<font size="3">
    
Look at the list of functions to get familiar with the documentation; some of them can help you manipulate the schema:
    
<ul>
     <li>    <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#functions">Spark Functions</a>.  </li>
</ul>    
    
    
</font>
</p>



### <strong> Exercise 4.</strong>  Add two new columns to the DataFrame (2 points)
    
<p align="justify">
<font size="3">
We want to extend the DataFrame with two more columns: from the $Date\_Time$ column, create two new columns ($Year$ and $Month$) that contain 
    <ul>
     <li> the year </li>
     <li> the month of the year </li>
</ul>
    
</font>
</p>
    
    
    

<p align="justify">
<font size="3">    
Look at the documentation of Spark functions and find the two functions that suit this use case (hint: the names of the new columns can help: <a href="https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql.html#functions">Spark Functions</a>)
</font>
</p>
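`year()` and `month()` extract the calendar fields from a date or timestamp column; since `Date_Time` is a string at this point, Spark casts it implicitly before extracting. The plain-Python sketch below shows the equivalent per-value logic, assuming the `yyyy-MM-dd HH:mm:ss` format produced in Exercise 3; `year_month` is a hypothetical helper.

```python
from datetime import datetime

def year_month(date_time):
    """Return (year, month) from a 'yyyy-MM-dd HH:mm:ss' string."""
    dt = datetime.strptime(date_time, "%Y-%m-%d %H:%M:%S")
    return dt.year, dt.month

print(year_month("2011-12-31 07:52:00"))  # (2011, 12)
print(year_month("2021-03-31 00:00:00"))  # (2021, 3)
```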

In [18]:
#import the functions that you will use

############## WRITE YOUR CODE HERE ##############
from pyspark.sql.functions import year, month
############## END OF THE EXERCISE ##############

In [19]:
# write the command that creates a new Spark DataFrame with the two additional columns
# and stores the reference in the dfsym variable (Data Frame Spark with Year and Month)

#'''############## WRITE YOUR CODE HERE ##############'''
from pyspark.sql.functions import year, month
# year() and month() extract the calendar fields; the string Date_Time is cast implicitly
dfsym = dfscr.withColumn("Year", year("Date_Time")) \
             .withColumn("Month", month("Date_Time"))

#'''############## END OF THE EXERCISE ##############'''


dfsym.show(5)

#######################
# Expected output:
#+----------+----+----+----+-----+----------+---------------+--------------+-------------------+----+-----+
#| Timestamp|Open|High| Low|Close|Volume_BTC|Volume_Currency|Weighted_Price|          Date_Time|Year|Month|
#+----------+----+----+----+-----+----------+---------------+--------------+-------------------+----+-----+
#|1325317920|4.39|4.39|4.39| 4.39|0.45558087|   2.0000000193|          4.39|2011-12-31 07:52:00|2011|   12|
#|1325317980| NaN| NaN| NaN|  NaN|       NaN|            NaN|           NaN|2011-12-31 07:53:00|2011|   12|
#|1325318040| NaN| NaN| NaN|  NaN|       NaN|            NaN|           NaN|2011-12-31 07:54:00|2011|   12|
#|1325318100| NaN| NaN| NaN|  NaN|       NaN|            NaN|           NaN|2011-12-31 07:55:00|2011|   12|
#|1325318160| NaN| NaN| NaN|  NaN|       NaN|            NaN|           NaN|2011-12-31 07:56:00|2011|   12|
#+----------+----+----+----+-----+----------+---------------+--------------+-------------------+----+-----+


+----------+----+----+----+-----+----------+---------------+--------------+-------------------+----+-----+
| Timestamp|Open|High| Low|Close|Volume_BTC|Volume_Currency|Weighted_Price|          Date_Time|Year|Month|
+----------+----+----+----+-----+----------+---------------+--------------+-------------------+----+-----+
|1325317920|4.39|4.39|4.39| 4.39|0.45558087|   2.0000000193|          4.39|2011-12-31 07:52:00|2011|   12|
|1325317980| NaN| NaN| NaN|  NaN|       NaN|            NaN|           NaN|2011-12-31 07:53:00|2011|   12|
|1325318040| NaN| NaN| NaN|  NaN|       NaN|            NaN|           NaN|2011-12-31 07:54:00|2011|   12|
|1325318100| NaN| NaN| NaN|  NaN|       NaN|            NaN|           NaN|2011-12-31 07:55:00|2011|   12|
|1325318160| NaN| NaN| NaN|  NaN|       NaN|            NaN|           NaN|2011-12-31 07:56:00|2011|   12|
+----------+----+----+----+-----+----------+---------------+--------------+-------------------+----+-----+
only showing top 5 rows



In [20]:
gc.collect()

314

###  <strong>Exercise 5.</strong>  Drop Timestamp (2 points)
    
<p align="justify">
<font size="3">
Finally, we clean the schema by removing the $Timestamp$ column.
</font>
</p>


In [21]:
# write the command that creates a new Spark DataFrame from dfsym without the Timestamp column
# and stores the reference in the dfsc variable (Data Frame Spark Clean)
#'''############## WRITE YOUR CODE HERE ##############'''
dfsc = dfsym.drop("Timestamp")
#'''############## END OF THE EXERCISE ##############'''


dfsc.show(5)

#######################
# Expected output:
#+----+----+----+-----+----------+---------------+--------------+-------------------+----+-----+
#|Open|High| Low|Close|Volume_BTC|Volume_Currency|Weighted_Price|          Date_Time|Year|Month|
#+----+----+----+-----+----------+---------------+--------------+-------------------+----+-----+
#|4.39|4.39|4.39| 4.39|0.45558087|   2.0000000193|          4.39|2011-12-31 07:52:00|2011|   12|
#| NaN| NaN| NaN|  NaN|       NaN|            NaN|           NaN|2011-12-31 07:53:00|2011|   12|
#| NaN| NaN| NaN|  NaN|       NaN|            NaN|           NaN|2011-12-31 07:54:00|2011|   12|
#| NaN| NaN| NaN|  NaN|       NaN|            NaN|           NaN|2011-12-31 07:55:00|2011|   12|
#| NaN| NaN| NaN|  NaN|       NaN|            NaN|           NaN|2011-12-31 07:56:00|2011|   12|
#+----+----+----+-----+----------+---------------+--------------+-------------------+----+-----+


+----+----+----+-----+----------+---------------+--------------+-------------------+----+-----+
|Open|High| Low|Close|Volume_BTC|Volume_Currency|Weighted_Price|          Date_Time|Year|Month|
+----+----+----+-----+----------+---------------+--------------+-------------------+----+-----+
|4.39|4.39|4.39| 4.39|0.45558087|   2.0000000193|          4.39|2011-12-31 07:52:00|2011|   12|
| NaN| NaN| NaN|  NaN|       NaN|            NaN|           NaN|2011-12-31 07:53:00|2011|   12|
| NaN| NaN| NaN|  NaN|       NaN|            NaN|           NaN|2011-12-31 07:54:00|2011|   12|
| NaN| NaN| NaN|  NaN|       NaN|            NaN|           NaN|2011-12-31 07:55:00|2011|   12|
| NaN| NaN| NaN|  NaN|       NaN|            NaN|           NaN|2011-12-31 07:56:00|2011|   12|
+----+----+----+-----+----------+---------------+--------------+-------------------+----+-----+
only showing top 5 rows



#  C. Using Parquet

<p align="justify">
<font size="3">
To improve performance in what follows, it is a good idea, as we have seen in class, to store the data in a columnar file format, here Parquet, which allows us 
to partition the Spark DataFrame and store it in multiple Parquet files. 
</font>
</p>

<p align="justify">
<font size="3">
Parquet is a columnar storage format available to any project in the Hadoop ecosystem, regardless of the choice of data processing framework, data model or programming language.
</font>
</p>
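In a columnar format the values of each column are stored together, so a query that needs only `Volume_BTC` can read that column alone. The toy plain-Python sketch below shows only this change of layout, from records to columns, on illustrative values; real Parquet files add encodings, compression and per-chunk statistics on top of it. `to_columns` is a hypothetical helper.

```python
# Row-oriented layout: one record per minute (illustrative values)
rows = [
    {"Open": 4.39, "Close": 4.39, "Volume_BTC": 0.45558087},
    {"Open": 4.39, "Close": 4.40, "Volume_BTC": 1.5},
]

def to_columns(rows):
    """Re-organize a list of records into one list of values per column."""
    return {name: [row[name] for row in rows] for name in rows[0]}

columns = to_columns(rows)
# Reading a single column no longer requires touching the other fields
print(columns["Volume_BTC"])  # [0.45558087, 1.5]
```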

## C.1 Saving data in Parquet
    
<p align="justify">
<font size="3">
For this first example, partition the data according to:
    
 <ul>
     <li> the year </li>
     <li> the month of the year </li>
</ul>
The <code>partitionBy()</code> operation can help with this step (reference documentation: <a href="https://spark.apache.org/docs/latest/sql-data-sources-parquet.html">Spark Parquet Files</a>).
</font>
</p>
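`partitionBy("Year", "Month")` writes one sub-directory per distinct (Year, Month) pair, named in the `column=value` style, and stores the rows of that pair in Parquet files inside it; the partition values are encoded in the path rather than in the files, which is why the Pandas read of `BTC/Year=2011` later returns a `Month` column but no `Year` column. The plain-Python sketch below reproduces only the directory naming and the grouping of rows; `partition_paths` is a hypothetical helper and the rows are illustrative.

```python
from collections import defaultdict

def partition_paths(rows, base="BTC", keys=("Year", "Month")):
    """Group rows (dicts) by the partition keys and return {directory: rows without the keys}."""
    parts = defaultdict(list)
    for row in rows:
        # directory name such as BTC/Year=2011/Month=12
        path = "/".join([base] + [f"{k}={row[k]}" for k in keys])
        # the partition columns live in the path, so they are dropped from the stored rows
        parts[path].append({k: v for k, v in row.items() if k not in keys})
    return dict(parts)

rows = [
    {"Close": 4.39, "Year": 2011, "Month": 12},
    {"Close": 4.39, "Year": 2012, "Month": 1},
    {"Close": 4.40, "Year": 2012, "Month": 1},
]
for path, part in partition_paths(rows).items():
    print(path, len(part))
# BTC/Year=2011/Month=12 1
# BTC/Year=2012/Month=1 2
```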



In [22]:
dfs

DataFrame[Timestamp: int, Open: double, High: double, Low: double, Close: double, Volume_BTC: double, Volume_Currency: double, Weighted_Price: double]

In [23]:
# here you can see and check the command that saves the dfsc DataFrame in Parquet

dfsc.write.partitionBy(["Year", "Month"]).parquet("BTC/",mode='overwrite')


print("write to Parquet done")


write to Parquet done


##  C.2 Check the folder Structure

 
<p align="justify">
<font size="3">
Look at the folder structure that has been created to store the data. You can see how the partitioning strategy of Parquet and the data distribution of Spark can be used, explicitly or implicitly, to improve performance.

While you navigate the folder structure, remember that during data access:
    
 <ul>
     <li> the directories are navigated using the partition values (Year and Month); </li>
     <li> the leaves contain the encoded Parquet files. </li>
</ul>
</font>
</p>
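When reading, Spark (and pyarrow, used by Pandas) recovers the partition columns from the directory names, and a filter on a partition column lets the reader skip whole directories (partition pruning). The plain-Python sketch below illustrates both ideas on a list of paths; `parse_partition` and the file names `part-0.parquet` are hypothetical, and it assumes all partition values are integers, whereas real readers infer the types.

```python
def parse_partition(path):
    """Turn 'BTC/Year=2011/Month=12/part-0.parquet' into {'Year': 2011, 'Month': 12}."""
    values = {}
    for segment in path.split("/"):
        if "=" in segment:
            name, value = segment.split("=", 1)
            values[name] = int(value)  # assumption: integer partition values
    return values

paths = [
    "BTC/Year=2011/Month=12/part-0.parquet",
    "BTC/Year=2012/Month=1/part-0.parquet",
    "BTC/Year=2012/Month=2/part-0.parquet",
]

# Partition pruning: only the files under Year=2012 need to be opened
selected = [p for p in paths if parse_partition(p)["Year"] == 2012]
print(parse_partition(paths[0]))  # {'Year': 2011, 'Month': 12}
print(len(selected))              # 2
```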




In [None]:
#BTC
#        ├── Year=2011
#        │   ├── ...
#        │   │
#        │   ├── month=12
#        ├── Year=2012
#        │   ├── month=1
#        │   ├── ...
#        │   │
#       ...
        


This folder structure corresponds to a physical and logical partition of the data.

# D. Pandas

 
<p align="justify">
<font size="3">
This data organization also makes it possible to read the data with Pandas, without going through Spark.
    
Look at the documentation and check how you can read a Parquet structure and store it in a Pandas DataFrame:
<a href="https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.read_parquet.html">Pandas and Parquet</a>

Notice that the data exchange relies on Apache Arrow (through the $pyarrow$ library).
</font>
</p>


<p align="justify">
<font size="3">
Write the command that uses Pandas to read the data for the year 2011.
    
</font>
</p>

In [24]:
#import of pandas
import pandas as pa

In [25]:
# Here we show how to create a DataFrame with Pandas, reading from Parquet only the data for the year 2011


df = pa.read_parquet("BTC/Year=2011")

df
#######################
# Check the expected output:
#Open	High	Low	Close	Volume_BTC	Volume_Currency	Weighted_Price	Date_Time	Month
#0	4.39	4.39	4.39	4.39	0.455581	2.0	4.39	2011-12-31 07:52:00	12
#1	NaN	NaN	NaN	NaN	NaN	NaN	NaN	2011-12-31 07:53:00	12
#2	NaN	NaN	NaN	NaN	NaN	NaN	NaN	2011-12-31 07:54:00	12
#3	NaN	NaN	NaN	NaN	NaN	NaN	NaN	2011-12-31 07:55:00	12
#...

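|     | Open | High | Low | Close | Volume_BTC | Volume_Currency | Weighted_Price | Date_Time | Month |
|-----|------|------|-----|-------|------------|-----------------|----------------|-----------|-------|
| 0 | 4.39 | 4.39 | 4.39 | 4.39 | 0.455581 | 2.0 | 4.39 | 2011-12-31 07:52:00 | 12 |
| 1 | NaN | NaN | NaN | NaN | NaN | NaN | NaN | 2011-12-31 07:53:00 | 12 |
| 2 | NaN | NaN | NaN | NaN | NaN | NaN | NaN | 2011-12-31 07:54:00 | 12 |
| 3 | NaN | NaN | NaN | NaN | NaN | NaN | NaN | 2011-12-31 07:55:00 | 12 |
| 4 | NaN | NaN | NaN | NaN | NaN | NaN | NaN | 2011-12-31 07:56:00 | 12 |
| ... | ... | ... | ... | ... | ... | ... | ... | ... | ... |
| 963 | NaN | NaN | NaN | NaN | NaN | NaN | NaN | 2011-12-31 23:55:00 | 12 |
| 964 | NaN | NaN | NaN | NaN | NaN | NaN | NaN | 2011-12-31 23:56:00 | 12 |
| 965 | NaN | NaN | NaN | NaN | NaN | NaN | NaN | 2011-12-31 23:57:00 | 12 |
| 966 | NaN | NaN | NaN | NaN | NaN | NaN | NaN | 2011-12-31 23:58:00 | 12 |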


###  D.1 Read Parquet file
    
<p align="justify">
<font size="3">
Here you can see how the Spark DataFrame is created from the Parquet data.
</font>
</p>

In [26]:
# And here is how to create a DataFrame with Spark, reading the whole dataset

dfs = spark.read.parquet("BTC/")

print("read done")

read done


In [27]:
gc.collect()

65

## <strong>Exercise 6</strong>. Verify the number of columns and count the number of rows (2 points)
    
<p align="justify">
<font size="3">
You may not have noticed that the volume of data we are handling is not as small as it seems. 
Count how many rows we are manipulating in the DataFrame <code>dfs</code>.
</font>
</p>

In [28]:
# Write the command that returns the number of rows of the DataFrame

#'''############## WRITE YOUR ANSWER HERE ##############'''
count = dfs.count()
#'''############## END OF THE EXERCISE ##############'''

print(count)

#######################
# Expected output:
# 4857377

187124


In [29]:
#We can also check and verify the schema of the DataFrame
dfs.printSchema()

root
 |-- Open: double (nullable = true)
 |-- High: double (nullable = true)
 |-- Low: double (nullable = true)
 |-- Close: double (nullable = true)
 |-- Volume_BTC: double (nullable = true)
 |-- Volume_Currency: double (nullable = true)
 |-- Weighted_Price: double (nullable = true)
 |-- Date_Time: string (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)



# E. Statistics

<p align="justify">
<font size="3">
We want to compute statistics of the Bitcoin volume (<code>Volume_BTC</code>) for each month of every year.

The computed statistics will be stored in a DataFrame with this schema:
<ul>
     <li>   Mean_Vol  : double </li>
     <li>   Std_Vol   : double </li>
     <li>   Min_Vol   : double </li>
     <li>   Max_Vol   : double </li>
     <li>   Year      : int </li>
     <li>   Month     : int </li>
  
</ul>

In this exercise you will develop two different methodologies to compute the statistics:
<ul>
    <li>   using the <code>applyInPandas()</code> PySpark function together with Pandas functions; </li>
     <li>  using only PySpark functionalities. </li>
</ul>
With both approaches, the computed statistics should be stored in a Pandas DataFrame.
</font>
</p>
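Both methodologies compute the same four statistics of `Volume_BTC` for each (Year, Month) group. The plain-Python sketch below shows the per-group computation, assuming, as Pandas `describe()` does, that NaN values are ignored and that the standard deviation is the sample one (divisor n - 1); `volume_stats` is a hypothetical helper and the volumes are illustrative. Keep in mind that Spark's built-in aggregates skip nulls but not NaN values, so the pure PySpark approach may need to filter NaN rows first (for instance with `pyspark.sql.functions.isnan`).

```python
import math
import statistics

def volume_stats(volumes):
    """Mean, sample std, min and max of a list of volumes, ignoring NaN (like pandas describe)."""
    values = [v for v in volumes if not math.isnan(v)]
    return {
        "Mean_Vol": statistics.mean(values),
        "Std_Vol": statistics.stdev(values),  # sample std, needs at least two values
        "Min_Vol": min(values),
        "Max_Vol": max(values),
    }

# Illustrative volumes for one (Year, Month) group
print(volume_stats([1.0, float("nan"), 2.0, 3.0]))
# {'Mean_Vol': 2.0, 'Std_Vol': 1.0, 'Min_Vol': 1.0, 'Max_Vol': 3.0}
```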

## E.1. Spark applyInPandas
<p align="justify">
<font size="3">
The solution with <code>applyInPandas</code>:
</font>
</p>

In [30]:
# The Python function that must be used with applyInPandas.
# key: tuple with the values of the grouping columns, here (Year, Month)
# df:  pandas DataFrame with all the rows (and columns) of that group
def compute_stats(key, df):
    # describe() ignores NaN and returns count, mean, std, min, quartiles and max
    res = df["Volume_BTC"].describe()

    # keep only the statistics required by the output schema
    final = pa.DataFrame([{
        "Mean_Vol": res["mean"],
        "Std_Vol": res["std"],
        "Min_Vol": res["min"],
        "Max_Vol": res["max"],
    }])
    final["Year"] = key[0]
    final["Month"] = key[1]

    return final

### <strong>Exercise 7</strong>. The two parameters of the Python function (2 points)
<p align="justify">
<font size="3">
    Look at the documentation of <code>applyInPandas(func, schema)</code> (<a href="https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.GroupedData.applyInPandas.html">click here to go to the documentation of <code>applyInPandas</code></a>) and describe in detail how it works from the DataFrame point of view in our example (what $key$ and $df$ will contain).

</font>
</p>

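#### applyInPandas takes a Python function that maps a pandas.DataFrame to another pandas.DataFrame. The Spark DataFrame is grouped by Year and Month; for each group, compute_stats is called with key, the tuple of grouping values (e.g. (2012, 4)), and df, a pandas.DataFrame holding all the columns of the rows of that group. The pandas.DataFrames returned for all the groups are combined into a new Spark DataFrame with the given schema.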
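The control flow described above is a split-apply-combine pattern. The plain-Python sketch below mimics only that flow, with lists of dicts standing in for pandas DataFrames; `apply_per_group` and `count_rows` are hypothetical names, the rows are illustrative, and the real `applyInPandas` runs the function distributed on the executors.

```python
from collections import defaultdict

def apply_per_group(rows, keys, func):
    """Split rows by `keys`, call func(key, group_rows) per group, concatenate the results."""
    groups = defaultdict(list)
    for row in rows:
        groups[tuple(row[k] for k in keys)].append(row)
    result = []
    for key, group_rows in groups.items():
        # key is a tuple such as (2012, 1); group_rows holds every column of that group
        result.extend(func(key, group_rows))
    return result

def count_rows(key, group_rows):
    # One output row per group, like compute_stats
    return [{"Year": key[0], "Month": key[1], "Rows": len(group_rows)}]

rows = [
    {"Year": 2011, "Month": 12, "Volume_BTC": 0.45558087},
    {"Year": 2012, "Month": 1, "Volume_BTC": 1.0},
    {"Year": 2012, "Month": 1, "Volume_BTC": 2.0},
]
print(apply_per_group(rows, ("Year", "Month"), count_rows))
# [{'Year': 2011, 'Month': 12, 'Rows': 1}, {'Year': 2012, 'Month': 1, 'Rows': 2}]
```

Unlike this sketch, Spark gives no guarantee on the order of the output groups, which is why the rows of `statsdf` below do not follow the calendar.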

### <strong>Exercise 8</strong>. The two parameters in action (1 point)
<p align="justify">
<font size="3">
Compute the statistics using <code>applyInPandas</code> and the provided function. 

</font>
</p>

In [38]:
schema = "Mean_Vol double, Std_Vol double, Min_Vol double, Max_Vol double, Year int, Month int"

# Write the command that will store the DataFrame in the variable statsdf
#'''############## WRITE YOUR ANSWER HERE ##############'''
# one call of compute_stats per (Year, Month) group; schema describes the returned columns
statsdf = dfs.groupby("Year", "Month").applyInPandas(compute_stats, schema=schema)

#'''############## END OF THE EXERCISE ##############'''

statsdf.show(5)


####### EXPECTED OUTPUT
#+------------------+------------------+----------+------------+----+-----+
#|          Mean_Vol|           Std_Vol|   Min_Vol|     Max_Vol|Year|Month|
#+------------------+------------------+----------+------------+----+-----+
#| 20.39613620802532| 54.24699556644988|    9.4E-5|2258.8231405|2012|   10|
#|12.095179597807542|44.149334198665166| 2.0452E-4|2037.2239038|2015|    2|
#| 6.147061206279663|17.745599117954125|0.00127783|564.21436237|2019|   10|
#| 8.468866447160776|  28.9837002907642|    1.0E-8|1616.0600006|2017|    3|
#| 8.684880075589284| 17.69646210434965|       0.0|533.10078293|2017|    8|
#+------------------+------------------+----------+------------+----+-----+#

+-----------------+------------------+----------+------------+----+-----+
|         Mean_Vol|           Std_Vol|   Min_Vol|     Max_Vol|Year|Month|
+-----------------+------------------+----------+------------+----+-----+
|21.68391320646051| 36.39353845336437|0.00127551|300.51609759|2012|    4|
|     23.829469525|22.711132615895384|0.45558087|        48.0|2011|   12|
| 4.03177731714571| 6.740555326924376|      0.02| 43.31219578|2012|    1|
|8.313992791715775|11.924511042377938|0.00313838| 92.65487394|2012|    2|
|15.19779132870153|26.505989960881884|0.00209644|247.56012381|2012|    3|
+-----------------+------------------+----------+------------+----+-----+
only showing top 5 rows



In [32]:
gc.collect()

86

### <strong>Exercise 9</strong>. The statsdf DataFrame (1 point)
<p align="justify">
<font size="3">
Which kind of DataFrame is statsdf?
</font>
</p>

In [39]:
type(statsdf)

pyspark.sql.dataframe.DataFrame

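#### pyspark.sql.dataframe.DataFrame

`statsdf` is a PySpark DataFrame, not a pandas DataFrame: $applyInPandas$ runs pandas code on each group, but its result is still a distributed, lazily evaluated Spark DataFrame.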


### <strong>Exercise 10</strong>. DataFrame in Pandas (2 points)


<p align="justify">
<font size="3">
Since the statistics are computed per month, the result is small (one row per month), so we can collect all of it into memory and handle it with Pandas.  
    
Notice that Spark is lazy, so the $toPandas$ action is what triggers the computation.
    
Write the command that performs this operation.
    
</font>
</p>



In [40]:
%%time

# Write the command that will store in the variable stats_dfp the output DataFrame 

#'''############## WRITE YOUR ANSWER HERE ##############'''
stats_dfp = statsdf.toPandas()
# stats_dfp = dfs.groupby('Month').applyInPandas(compute_stats_1, 
#             schema = schema).toPandas()
#'''############## END OF THE EXERCISE ##############'''

CPU times: user 40.6 ms, sys: 7.65 ms, total: 48.3 ms
Wall time: 1.75 s


In [41]:
#results
stats_dfp.head(10)

#######################
# Expected output:
#	Mean_Vol	Std_Vol	Min_Vol	Max_Vol	Year	Month
#0	20.396136	54.246996	9.400000e-05	2258.823141	2012	10
#1	12.095180	44.149334	2.045200e-04	2037.223904	2015	2
#2	6.147061	17.745599	1.277830e-03	564.214362	2019	10
#3	8.468866	28.983700	1.000000e-08	1616.060001	2017	3
#4	8.684880	17.696462	0.000000e+00	533.100783	2017	8
#5	16.040933	57.641501	2.044000e-05	4111.876106	2014	4
#6	4.984386	18.903445	1.054000e-05	822.866974	2020	6
#7	8.331579	18.350084	5.758000e-04	806.636224	2019	5
#8	8.621910	18.820399	4.047000e-04	602.282607	2017	10
#9	3.106413	10.738051	3.300000e-06	582.564185	2018	10

    Mean_Vol    Std_Vol   Min_Vol     Max_Vol  Year  Month
0  21.683913  36.393538  0.001276  300.516098  2012      4
1  23.829470  22.711133  0.455581   48.000000  2011     12
2   4.031777   6.740555  0.020000   43.312196  2012      1
3   8.313993  11.924511  0.003138   92.654874  2012      2
4  15.197791  26.505990  0.002096  247.560124  2012      3
5  21.867528  38.525832  0.007482  384.988000  2012      5


In [36]:
gc.collect()

112

###  <strong>Exercise 11</strong>. Show the stats of the stats (1 point)


<p align="justify">
<font size="3">
We now want statistics of the monthly statistics, computed over all the months of all the years.

For each numerical column of statsdf, the resulting DataFrame will contain values such as:
<ul>
     <li>   the minimum of the monthly minimum values </li>
     <li>   the mean of the monthly mean values </li>
     <li>   ... </li> 
</ul>


    
</font>
</p>
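The same "statistics of the statistics" can be sketched in pandas on a collected result. `describe()` reports count, mean, std, min, quartiles and max for every column, much like Spark's `describe()` (which omits the quartiles), while `agg` lets you pick one statistic per column, such as the minimum of the monthly minimums. The frame below uses made-up values with the column names of `stats_dfp`.

```python
import pandas as pd

# Made-up monthly statistics with the same columns as stats_dfp.
monthly_stats = pd.DataFrame({
    "Mean_Vol": [4.0, 8.0, 15.0],
    "Std_Vol": [6.0, 12.0, 26.0],
    "Min_Vol": [0.02, 0.003, 0.002],
    "Max_Vol": [43.0, 92.0, 247.0],
})

# One chosen statistic per column: min of the minimums, mean of the means, ...
summary = monthly_stats.agg({
    "Min_Vol": "min",
    "Mean_Vol": "mean",
    "Std_Vol": "mean",
    "Max_Vol": "max",
})
print(summary)

# Full summary for every column (pandas counterpart of Spark's describe()).
print(monthly_stats.describe())
```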


In [57]:
# Write the command that will show and compute the stats on the numerical columns of the statsdf DataFrame

#'''############## WRITE YOUR ANSWER HERE ##############'''

statsdf.describe().show()
#'''############## END OF THE EXERCISE ##############'''



#######################
# Expected output:
#+-------+------------------+------------------+--------------------+------------------+------------------+------------------+
#|summary|          Mean_Vol|           Std_Vol|             Min_Vol|           Max_Vol|              Year|             Month|
#+-------+------------------+------------------+--------------------+------------------+------------------+------------------+
#|  count|               112|               112|                 112|               112|               112|               112|
#|   mean|10.782191354847754|28.871463944232485|0.004551177678571...|1067.2847720235718|2016.0892857142858| 6.428571428571429|
#| stddev| 6.488551661205522| 18.11344145463867|0.043048607639448476| 895.8083462469303|2.7164947320662614|3.5252353718985097|
#|    min| 2.929999689326444| 6.490701567379118|                 0.0|       43.31219578|              2011|                 1|
#|    max|31.504423573146152|106.97606692383131|          0.45558087|      5853.8521659|              2021|                12|
#+-------+------------------+------------------+--------------------+------------------+------------------+------------------+


# F. Plotting and equivalence 
<p align="justify">
<font size="3">
We want to plot the monthly statistics against time, with the year and month on the horizontal ($x$) axis of the plot. 

$Plotly$ will be used for plotting.
    

The code provided below works as-is in Python.
    

A helper function combines the two columns $Year$ and $Month$ into a $DateTime$ column 'Date' (the first day of each month), so that the data can be plotted against the date.
    
</font>
</p>



In [43]:
#install plotly and import the libraries

!pip install plotly

from plotly.offline import iplot,init_notebook_mode
import plotly.graph_objects as go

init_notebook_mode(connected=True)



In [44]:
#Helper function that converts the Year Month of our data into Date type

def get_date_from_year_month(df):
    df["Date"] = pa.to_datetime(df['Year'].astype(str) + '-' + df['Month'].astype(str), format='%Y-%m')
    return df
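As an alternative sketch, pandas can assemble dates directly from numeric columns named `year`, `month` and `day`, which avoids building and parsing strings. The function name `year_month_to_date` is hypothetical; it is meant to produce the same `Date` column (first day of each month) as `get_date_from_year_month`.

```python
import pandas as pd

# Hypothetical alternative to get_date_from_year_month: pd.to_datetime can build
# timestamps from a DataFrame whose columns are named 'year', 'month' and 'day'.
def year_month_to_date(df: pd.DataFrame) -> pd.DataFrame:
    parts = pd.DataFrame({"year": df["Year"], "month": df["Month"], "day": 1})
    df["Date"] = pd.to_datetime(parts)
    return df

example = pd.DataFrame({"Year": [2011, 2012], "Month": [12, 4]})
print(year_month_to_date(example))
```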
     

In [45]:
# Spark returns the groups in no particular order, so sort by date to plot the points chronologically

stats_dfp = get_date_from_year_month(stats_dfp)    
stats_dfp.sort_values(by = 'Date',inplace = True)
stats_dfp

    Mean_Vol    Std_Vol   Min_Vol     Max_Vol  Year  Month        Date
1  23.829470  22.711133  0.455581   48.000000  2011     12  2011-12-01
2   4.031777   6.740555  0.020000   43.312196  2012      1  2012-01-01
3   8.313993  11.924511  0.003138   92.654874  2012      2  2012-02-01
4  15.197791  26.505990  0.002096  247.560124  2012      3  2012-03-01
0  21.683913  36.393538  0.001276  300.516098  2012      4  2012-04-01
5  21.867528  38.525832  0.007482  384.988000  2012      5  2012-05-01


In [46]:
# PLOTTING OF THE MEAN VOLUME BY MONTH
mean_vol_trace = {
    "type": "scatter",
    "mode": "lines",
    "name": "Mean volume (BTC)",
    "x": stats_dfp.Date,
    "y": stats_dfp["Mean_Vol"],
}

layout = {
  "height":1000,
  "showlegend": True, 
  "title": "Average Volume by Month of BTC",
}

fig = go.Figure(data=[mean_vol_trace], layout=layout)
fig.show(renderer="colab")

###  <strong> Exercise 12 </strong> - Compute the statistics using Pyspark (1 point)


<p align="justify">
<font size="3">
We want to calculate the same statistics as before, but with the built-in PySpark aggregation functions instead of $applyInPandas$.

The steps will be:
<ul>
     <li>   import data from Parquet in a Spark DataFrame </li>
     <li>   remove null values </li>
     <li>   perform the aggregation of the results </li> 
     <li>   convert the results to Pandas </li> 
 
</ul>


    
</font>
</p>
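The same steps (drop missing values, aggregate by year and month, keep a small result) can be sketched with pandas named aggregation on a toy frame with made-up values. The output column names match the applyInPandas schema so that the two results can be compared. One difference to keep in mind: pandas skips NaN in `mean`/`std`/`min`/`max` by default, whereas in Spark a NaN stored in a double column is treated as an ordinary value (only nulls are skipped), so it can propagate into aggregates such as the mean. This is one reason the `na.drop()` step matters in the PySpark version.

```python
import numpy as np
import pandas as pd

# Toy minute-level data (made-up values); one row has a missing volume.
raw = pd.DataFrame({
    "Year": [2012, 2012, 2012, 2012],
    "Month": [1, 1, 1, 2],
    "Volume_BTC": [2.0, np.nan, 4.0, 7.0],
})

stats = (
    raw.dropna()                                      # remove rows with missing values
       .groupby(["Year", "Month"], as_index=False)    # one group per month
       .agg(Mean_Vol=("Volume_BTC", "mean"),
            Std_Vol=("Volume_BTC", "std"),            # sample std, like Spark's stddev
            Min_Vol=("Volume_BTC", "min"),
            Max_Vol=("Volume_BTC", "max"))
)
print(stats)
```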

In [50]:
%%time
# solution to compute the statistics using built-in PySpark functions

# use the F. prefix: importing min/max directly would shadow Python's built-ins
import pyspark.sql.functions as F


# full spark dataframe (recall exercise 8a)
df_spark = spark.read.parquet("BTC/") 

# na.drop() removes the rows containing null or NaN values, which is needed to compute the stats properly
# (see the documentation of the na.drop function)
group_ym = df_spark.na.drop().select(["Volume_BTC","Year","Month"]).groupBy(["Year","Month"])

#'''############## WRITE YOUR ANSWER HERE ##############'''

# aggregation: the aliases match the column names of the applyInPandas schema
# (Mean_Vol, Std_Vol, Min_Vol, Max_Vol), so the two results can be compared directly
res_df = group_ym.agg(F.count("Volume_BTC").alias('count'), 
             F.mean("Volume_BTC").alias('Mean_Vol'), 
             F.stddev("Volume_BTC").alias('Std_Vol'), 
             F.min("Volume_BTC").alias('Min_Vol'), 
             F.max("Volume_BTC").alias('Max_Vol'))

# convert the (small) aggregated result to pandas
stats_dfs = res_df.toPandas()

#'''############## END OF THE EXERCISE ##############'''

stats_dfs = get_date_from_year_month(stats_dfs)    
stats_dfs.sort_values(by = 'Date',inplace = True)
stats_dfs

CPU times: user 42.3 ms, sys: 5.99 ms, total: 48.3 ms
Wall time: 2.25 s


### Extra. Equivalence of results



<p align="justify">
<font size="3">
Now that you have seen the two procedures for computing the results, compare their outputs:
<ul>
     <li>   verify whether the pandas DataFrames obtained from applyInPandas and from the PySpark functions are equivalent (look in the documentation for the function that asserts whether two DataFrames are equal) </li> 
         <li> compare the processing times of the applyInPandas and PySpark routines (measured with the %%time magic) and comment on them.</li> 
 
</ul>


    
</font>
</p>



In [None]:
# verify whether the pandas DataFrames from applyInPandas and from the PySpark functions are equivalent
# compare the processing times of applyInPandas and of the PySpark functions
cols = ['Mean_Vol', 'Std_Vol', 'Min_Vol', 'Max_Vol', 'Year', 'Month', 'Date']
 

#'''############## WRITE YOUR ANSWER HERE ##############'''

# equivalence verification: look in the pandas API for a testing/assertion function that can help


# comment about time execution and draw your considerations
#'''############## END OF THE EXERCISE ##############'''
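A minimal sketch of the equivalence check, assuming both results have been converted to pandas with the column names in `cols` (which requires the PySpark aggregates to be aliased `Mean_Vol`, `Std_Vol`, `Min_Vol` and `Max_Vol`). It relies on `pandas.testing.assert_frame_equal`. Rows are sorted first because Spark returns groups in no particular order, dtypes are not compared because the two routes may produce different integer widths, and floats are compared with a relative tolerance because the two routes may accumulate sums in a different order. The helper name `frames_equivalent` and the toy frames are illustrative.

```python
import pandas as pd

# Hypothetical helper: True if both frames hold the same rows for `cols`
# (which must include Year and Month), ignoring row order, index and dtypes,
# with a relative tolerance for floating-point values.
def frames_equivalent(a, b, cols, rtol=1e-9):
    a = a[cols].sort_values(["Year", "Month"]).reset_index(drop=True)
    b = b[cols].sort_values(["Year", "Month"]).reset_index(drop=True)
    try:
        pd.testing.assert_frame_equal(a, b, check_dtype=False, rtol=rtol)
        return True
    except AssertionError:
        return False

# Toy results in different row orders, with a tiny floating-point difference.
toy_cols = ["Mean_Vol", "Year", "Month"]
left = pd.DataFrame({"Mean_Vol": [3.0, 7.0], "Year": [2012, 2012], "Month": [1, 2]})
right = pd.DataFrame({"Mean_Vol": [7.0, 3.0 + 1e-13], "Year": [2012, 2012], "Month": [2, 1]})
print(frames_equivalent(left, right, toy_cols))
```

With the notebook's data, the call would be `frames_equivalent(stats_dfp, stats_dfs, cols)`.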


### Extra - Plotting the financial data (1 point)



<p align="justify">
<font size="3">
Now that you have seen some examples, you can draw your own graphs:
<ul>
     <li>   filter the global DataFrame read from Parquet and keep only the first day of the year 2021 </li> 
         <li> convert it to a pandas DataFrame </li> 
         <li>    display the data using the $plot\_candlestick$ routine </li> 
 
</ul>


    
</font>
</p>
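A pandas sketch of the filtering logic, assuming `Date_Time` holds timestamps (the candlestick helper below reads `df.Date_Time`) derived from a Unix `Timestamp` column in seconds; the `Timestamp` column name and the prices are assumptions. The first day of 2021 is the half-open interval [2021-01-01 00:00, 2021-01-02 00:00), so the minute at midnight on 2 January is excluded. In PySpark the same condition can be written as `(F.col("Date_Time") >= beg) & (F.col("Date_Time") < end)`.

```python
import datetime as dt
import pandas as pd

# Toy minute rows with made-up prices; 'Timestamp' (Unix seconds) is an assumed column name.
minutes = pd.DataFrame({
    "Timestamp": [1609459140, 1609459200, 1609545540, 1609545600],
    "Close": [28990.0, 29000.0, 29350.0, 29360.0],
})
minutes["Date_Time"] = pd.to_datetime(minutes["Timestamp"], unit="s")

# Half-open interval covering the whole first day of 2021 (UTC).
beg = dt.datetime(2021, 1, 1)
end = beg + dt.timedelta(days=1)

first_day = minutes[(minutes["Date_Time"] >= beg) & (minutes["Date_Time"] < end)]
print(first_day)
```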



In [None]:
# Display a candlestick chart (a standard representation of OHLC financial data) of a pandas DataFrame

def plot_candlestick(df):
    trace = {
      "x": df.Date_Time,
      "open": df["Open"],
      "high": df["High"],
      "low": df["Low"],
      "close": df["Close"],
      "increasing": {"line": {"color": "#008000"}}, 
      "decreasing": {"line": {"color": "#db4052"}}, 
      "name": "BTC", 
      "type": "candlestick"
    }

    layout = {
      "height":1000,
      "showlegend": True, 
      "title": "Technical Analysis",
    }
    
    fig = go.Figure(data=[trace], layout=layout)
    fig.show(renderer="colab")

In [None]:
# Exercise filter the spark dataframe by date 
import datetime as dt


#'''############## WRITE YOUR ANSWER HERE ##############'''
#read the global dataframe (as usual)
df_spark = 

#create the beginning and end dates
beg = 
end = 


#create the filter
df_spark_filtered = 

#apply and convert it to pandas
dfp = 
#'''############## END OF THE EXERCISE ##############'''



In [None]:
plot_candlestick(dfp)

####  Extra - Propose your analysis



<p align="justify">
<font size="3">
Propose a new analysis of this dataset, run it, and show the result in a graph.
</font>
</p>
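As one possible starting point (a hypothetical example, not a prescribed answer), minute bars can be aggregated into daily OHLC candles with pandas `resample`: the first open, the highest high, the lowest low and the last close of each calendar day. The column names follow the candlestick helper above; the prices are made up.

```python
import pandas as pd

# Made-up minute bars indexed by time.
bars = pd.DataFrame(
    {
        "Open":  [10.0, 11.0, 12.0, 20.0, 19.0],
        "High":  [11.5, 12.5, 13.0, 21.0, 20.0],
        "Low":   [9.5, 10.5, 11.0, 18.0, 17.5],
        "Close": [11.0, 12.0, 12.5, 19.0, 18.0],
    },
    index=pd.to_datetime([
        "2021-01-01 00:00", "2021-01-01 00:01", "2021-01-01 23:59",
        "2021-01-02 00:00", "2021-01-02 00:01",
    ]),
)

# Daily candles: first open, max high, min low, last close per calendar day.
daily = bars.resample("D").agg({"Open": "first", "High": "max", "Low": "min", "Close": "last"})
print(daily)
```

To reuse `plot_candlestick`, which reads a `Date_Time` column, the index can be turned back into a column with `daily.rename_axis("Date_Time").reset_index()`.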



In [None]:
#'''############## WRITE YOUR ANSWER HERE ##############'''

#'''############## END OF THE EXERCISE ##############'''

### Conclusion 

<p align="justify">
<font size="3">
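$applyInPandas$ can be very powerful when you need to apply advanced Python code or Python libraries (e.g. <a href="https://scikit-learn.org/stable/">scikit-learn</a>) to each group. Otherwise, prefer the built-in PySpark routines: they run inside Spark's JVM engine, where they are optimised and avoid transferring the data to Python workers, and they work well with efficient columnar storage formats such as Parquet.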

    
</font>
</p>