<a href="https://colab.research.google.com/github/Fuenfgeld/2022TeamADataEngineeringBC/blob/main/PySparkTutorial.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## 0. Data Engineering Bootcamp
In this tutorial, you will be introduced to an aspect of Data Engineering called ETL. Together we will implement an ETL workflow with Apache Spark in Python. By the end of the tutorial, you will be able to adapt such a workflow to your specific needs and know the benefits of using Spark in doing so.

### 0.1 What is Data Engineering?

Data engineering is the practice of designing and building systems for collecting, storing, and analyzing data at scale. Data engineers work in a variety of settings to build systems that collect, manage, and convert raw data into usable information for data scientists and business analysts to interpret. Their ultimate goal is to make data accessible so that organizations can use it to evaluate and optimize their performance. This also sums up the difference between a data engineer and a data analyst: the former manages the data resources, while the latter exploits them to gain valuable insights.

### 0.2 What is ETL?

According to IBM, ETL, which stands for extract, transform and load, is a data integration process that combines data from multiple data sources into a single, consistent data store. It is closely linked with the concept of a *Data Warehouse* that describes central repositories of integrated data from one or more disparate sources. 

#### Extraction
During data extraction, raw data is copied or exported from a variety of sources, which can be structured or unstructured, such as SQL databases, JSON files or even web pages.
#### Transformation
The collected raw data then undergoes data processing. Here, the data is transformed and consolidated for its intended analytical use case. Steps taken during transformation include de-duplicating values, performing calculations, translations, or summarizations based on the raw data, and changing the shape of the data via joining and grouping operations in order to match the schema of the target data warehouse. The environment in which the transformation step is performed is also called the *staging area*.
#### Loading
In this last step, the transformed data is moved from the staging area into a target data warehouse. Typically, this involves an initial loading of all data, followed by periodic loading of incremental data changes.
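
The three steps above can be sketched in a few lines of plain Python before we bring Spark into the picture. This is only a minimal illustration under stated assumptions: the records in `RAW_JSON`, the table name `scores` and the helper functions `extract`, `transform` and `load` are all made up for this example, and an in-memory SQLite database stands in for a real data warehouse.

```python
import json
import sqlite3

# Made-up source data; the inline string stands in for a JSON file or API response.
RAW_JSON = """[
  {"id": 1, "name": "ada", "score": "90"},
  {"id": 1, "name": "ada", "score": "90"},
  {"id": 2, "name": "bob", "score": "75"}
]"""

def extract(raw):
    """Extract: copy the raw records out of the source."""
    return json.loads(raw)

def transform(records):
    """Transform: drop duplicate ids, fix types and match the target schema."""
    seen, rows = set(), []
    for rec in records:
        if rec["id"] in seen:
            continue
        seen.add(rec["id"])
        rows.append((rec["id"], rec["name"].title(), int(rec["score"])))
    return rows

def load(rows, conn):
    """Load: move the transformed rows into the target table."""
    conn.execute(
        "CREATE TABLE IF NOT EXISTS scores (id INTEGER PRIMARY KEY, name TEXT, score INTEGER)"
    )
    conn.executemany("INSERT INTO scores VALUES (?, ?, ?)", rows)
    conn.commit()

conn = sqlite3.connect(":memory:")  # in-memory SQLite plays the role of the warehouse
load(transform(extract(RAW_JSON)), conn)

loaded = conn.execute("SELECT id, name, score FROM scores ORDER BY id").fetchall()
print(loaded)
```

The duplicate record is removed and the score strings are converted to integers during the transformation, so the target table only ever sees data that matches its schema.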

### 0.3 What is Spark?

According to the official website:

>*Apache Spark is a multi-language engine for executing data engineering, data science, and machine learning on single-node machines or clusters.*

Now, what does that mean? You can think of Spark as a programming library that lets you outsource your data engineering workflow to a set of servers (a cluster). Parallelizing operations across the cluster enables faster execution and makes it possible to work with amounts of data that couldn't be handled on a single computer (Big Data).

Spark thus manages the interaction between your local node (computer) and each node (server) of the cluster. Since Spark was originally written in Scala, its functionality can't be accessed directly from Python. This is where *PySpark* comes into play. You can think of PySpark as a Python-based wrapper on top of the Scala API. There are similar wrappers for *R* and other programming languages, which is why the official website describes Spark as a *multi-language engine*.

#### SparkSQL

Although the *Resilient Distributed Dataset* (RDD) is the fundamental data structure on which all higher-level data structures in Spark are built, this tutorial focuses on the *DataFrame* from the Spark SQL module, which deals with structured data such as .JSON and .csv files.

The DataFrame has two big advantages over the RDD. First, it offers significant performance benefits thanks to a powerful optimization engine. Second, important modules such as *Structured Streaming* and *spark.ml* work with DataFrames instead of RDDs.
 
### 0.4. Conclusion
You learned what data engineering is, how an ETL workflow is structured and what role Spark plays in such a context. Now let's get started with coding stuff.

The next three code cells will make PySpark and all the data you need to finish this tutorial available in your Colab notebook.


In [2]:
!pip install pyspark pandas

Successfully installed py4j-0.10.9.3 pyspark-3.2.1


In [3]:
!wget -cq https://raw.githubusercontent.com/Fuenfgeld/2022TeamADataEngineeringBC/ca4b2ecc9e9ee242037d11c27edd4f4ad770e7ee/iris.json

In [4]:
!wget -cq https://raw.githubusercontent.com/Fuenfgeld/2022TeamADataEngineeringBC/main/iris2.json

## 1. Getting Started

Let's get started building our first Spark application. At the core of our Spark application is the *SparkSession* object, which acts as the point of entry for interacting with the underlying Spark functionality.

Usually, Spark delegates the computation jobs to the so-called *executors* (worker processes running on the nodes of the cluster) through the so-called *driver*. Since this notebook isn't connected to a cluster, the jobs will be performed locally. The concept is still the same; just be aware that, for practical reasons, we aren't exploiting Spark's full capabilities here (clusters are not trivial to deal with).


In [5]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

Now that our Spark session is initialized we can load in some data to work with. Spark can handle data from all kinds of sources such as .json files, .csv files and even access data from AWS or Azure via dedicated interfaces.

### 1.1. Schemas and Creating DataFrames
The core object of pyspark.sql is the *DataFrame*, which stores the information obtained from external sources (.JSON, .csv files etc.) or created by the user within Python. To the human eye the DataFrame behaves like a table; computationally, however, it is a more complex structure due to the distributed nature of Spark objects.

Before we load our first dataset into such a DataFrame, we should talk about so-called *schemas*. When reading structured data from an external source, the user often knows how the data is structured, for instance what the names and data types of the columns are. Spark, in contrast, has no idea what the columns and their types are going to look like.

Defining and including a schema in the data loading pipeline in line `3` makes Spark aware of the structure of the data before loading it. This has three big advantages:

1.   You relieve Spark from the onus of inferring data types.

2.   You prevent Spark from creating a separate job just to read a large portion of your file to ascertain the schema, which for a large data file can be expensive and time-consuming.

3. You can detect errors early if data doesn’t match the schema.

It is therefore encouraged to define a schema upfront, especially when dealing with large sources of data. One easy way to define a schema in PySpark is via a so-called *Data Definition Language* (DDL) string, as shown in line `1`.

Since `iris.json` is a JSON document that spans multiple lines rather than holding one record per line (the layout Spark's JSON reader expects by default), we also inform PySpark about this by adding `.option("multiline",True)` to the pipeline. The second option, `header`, has no effect on .JSON files; it matters for .csv files, where it tells PySpark to treat the first line as column names. We include it for the sake of completeness.

In [6]:
schema = "petalLength DOUBLE, petalWidth DOUBLE, sepalLength DOUBLE, sepalWidth DOUBLE, species STRING"

df1 = (spark.read.option("multiline",True)
                 .option("header",True)
                 .schema(schema)
                 .json('iris.json'))

print(f"Object Type: {type(df1)}\n")
print("Column Info:")
df1.printSchema()
print("Summary Statistics of columns:")
df1.describe().show()
print("Overview Dataframe:")
df1.show(10)

Object Type: <class 'pyspark.sql.dataframe.DataFrame'>

Column Info:
root
 |-- petalLength: double (nullable = true)
 |-- petalWidth: double (nullable = true)
 |-- sepalLength: double (nullable = true)
 |-- sepalWidth: double (nullable = true)
 |-- species: string (nullable = true)

Summary Statistics of columns:
+-------+------------------+------------------+------------------+-------------------+---------+
|summary|       petalLength|        petalWidth|       sepalLength|         sepalWidth|  species|
+-------+------------------+------------------+------------------+-------------------+---------+
|  count|               150|               150|               150|                150|      150|
|   mean|3.7580000000000027| 1.199333333333334| 5.843333333333335|  3.057333333333334|     null|
| stddev|1.7652982332594662|0.7622376689603467|0.8280661279778637|0.43586628493669793|     null|
|    min|               1.0|               0.1|               4.3|                2.0|   setosa|
|    m

You might have wondered why we called `.show()` after `.describe()` in the cell above, especially if you are familiar with *Pandas*. The reason is so-called *Lazy Execution*: code is only executed once a so-called *action* is called. `.describe()` returns a new DataFrame holding the statistics, and `.show()` is the action that prints its contents. The next chapter introduces these concepts in more depth. We will also get to know *transformations*, operations that change the structure and entries of DataFrames.

## 2. Actions and Basic Transformations

Spark operations on distributed data can be classified into two types: *transformations* and *actions*. Transformations, as the name suggests, transform a Spark DataFrame into a new DataFrame without altering the original data, giving it the property of *immutability*. 


All transformations are evaluated lazily. That is, their results are not computed immediately, but they are recorded or remembered as a *lineage*. A recorded lineage allows Spark, at a later time in its execution plan, to rearrange certain transformations or optimize them into stages for more efficient execution. Actions, such as `.show()`, `.count()` or `.collect()`, trigger the evaluation of the recorded transformations and produce a result on the driver.

Because Spark records each transformation in its lineage and the DataFrames are immutable between transformations, it can reproduce its original state by simply replaying the recorded lineage, giving it resiliency in the event of failures.
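
The interplay of lazy transformations, lineage and actions can be imitated in plain Python. The following is a toy sketch, not Spark's actual implementation: the class `LazyTable`, its methods and the sample values (petal lengths in millimetres) are all hypothetical and exist only to show that nothing is computed until an action is called.

```python
class LazyTable:
    """Toy stand-in for a DataFrame: immutable source rows plus a recorded lineage."""

    def __init__(self, rows, lineage=()):
        self._rows = tuple(rows)        # source data, never modified
        self._lineage = tuple(lineage)  # recorded transformations, not yet executed

    # Transformations: return a NEW LazyTable with a longer lineage; nothing runs yet.
    def filter(self, predicate):
        return LazyTable(self._rows, self._lineage + (("filter", predicate),))

    def map(self, func):
        return LazyTable(self._rows, self._lineage + (("map", func),))

    # Action: replay the lineage on the source rows and return a concrete result.
    def collect(self):
        rows = list(self._rows)
        for kind, fn in self._lineage:
            if kind == "filter":
                rows = [r for r in rows if fn(r)]
            else:
                rows = [fn(r) for r in rows]
        return rows


calls = []  # records when the user function actually runs

def is_long(length_mm):
    calls.append(length_mm)
    return length_mm > 14

source = LazyTable([14, 13, 15, 17])
pipeline = source.filter(is_long).map(lambda x: x * 2)

calls_before_action = len(calls)  # still 0: only the lineage was recorded
result = pipeline.collect()       # the action triggers execution
print(calls_before_action, result)
```

Because `source` is never modified and the lineage is stored alongside it, calling `pipeline.collect()` a second time simply replays the same steps and yields the same result, which is the idea behind recovering from failures by replaying the lineage.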

Now let's set up some simple Data Engineering workflows in PySpark using actions and transformations.

### 2.1. Accessing Rows and Columns

Since Spark was conceived to work with distributed data, there is no simple way to access rows at will.

If you want to do so anyway, you have the possibility to pull the data onto your local node.

The action `.collect()` brings the distributed data to the driver side as local data in Python. Note that this can throw an out-of-memory error when the dataset is too large to fit into the driver's memory, because it moves all the data from the executors to the driver.

To get only a subset of the data to the driver, you can use the action `.take(n)`, which returns the first `n` rows, or first reduce the DataFrame with the transformation `.sample()`, which returns a random subset as a new DataFrame.

In [7]:
# Returns list of Row objects
local_df1 = df1.collect()
print(f"Type of entries: {type(local_df1[0])}\n")
print(f"Entries: {local_df1[:5]}")

Type of entries: <class 'pyspark.sql.types.Row'>

Entries: [Row(petalLength=1.4, petalWidth=0.2, sepalLength=5.1, sepalWidth=3.5, species='setosa'), Row(petalLength=1.4, petalWidth=0.2, sepalLength=4.9, sepalWidth=3.0, species='setosa'), Row(petalLength=1.3, petalWidth=0.2, sepalLength=4.7, sepalWidth=3.2, species='setosa'), Row(petalLength=1.5, petalWidth=0.2, sepalLength=4.6, sepalWidth=3.1, species='setosa'), Row(petalLength=1.4, petalWidth=0.2, sepalLength=5.0, sepalWidth=3.6, species='setosa')]



Accessing columns doesn't come with the difficulties associated with handling rows. If we want to get specific columns we can simply do so through the `.select()` method. 

In [8]:
df1.select("petalLength").show(5)

+-----------+
|petalLength|
+-----------+
|        1.4|
|        1.4|
|        1.3|
|        1.5|
|        1.4|
+-----------+
only showing top 5 rows



### 2.2 Adding and Removing  Columns

In case we want to add columns, we can do so via the `.withColumn()` method. Note that we have to specify the name of the new column, which in this case is `newColumn`. Usually, the new column is a function of one or more of the existing columns.

In [9]:
df_extraCol = df1.withColumn('newColumn', df1.petalWidth + df1.petalLength)
df_extraCol.show(5)

+-----------+----------+-----------+----------+-------+------------------+
|petalLength|petalWidth|sepalLength|sepalWidth|species|         newColumn|
+-----------+----------+-----------+----------+-------+------------------+
|        1.4|       0.2|        5.1|       3.5| setosa|1.5999999999999999|
|        1.4|       0.2|        4.9|       3.0| setosa|1.5999999999999999|
|        1.3|       0.2|        4.7|       3.2| setosa|               1.5|
|        1.5|       0.2|        4.6|       3.1| setosa|               1.7|
|        1.4|       0.2|        5.0|       3.6| setosa|1.5999999999999999|
+-----------+----------+-----------+----------+-------+------------------+
only showing top 5 rows



The name `'newColumn'` isn't really informative. It's therefore hard for the user to deduce that it is the sum of `'petalWidth'` and `'petalLength'`. So why not rename it to something more descriptive? We can do this via the `.withColumnRenamed()` method.

In [10]:
df_extraCol = df_extraCol.withColumnRenamed('newColumn','petalSum')
df_extraCol.show(5)

+-----------+----------+-----------+----------+-------+------------------+
|petalLength|petalWidth|sepalLength|sepalWidth|species|          petalSum|
+-----------+----------+-----------+----------+-------+------------------+
|        1.4|       0.2|        5.1|       3.5| setosa|1.5999999999999999|
|        1.4|       0.2|        4.9|       3.0| setosa|1.5999999999999999|
|        1.3|       0.2|        4.7|       3.2| setosa|               1.5|
|        1.5|       0.2|        4.6|       3.1| setosa|               1.7|
|        1.4|       0.2|        5.0|       3.6| setosa|1.5999999999999999|
+-----------+----------+-----------+----------+-------+------------------+
only showing top 5 rows



To get rid of our new column, `.drop()` can be used. In contrast to `.select()`, which returns a DataFrame containing only the specified columns, this method returns a DataFrame containing all columns except the specified one.




In [11]:
df1 = df_extraCol.drop(df_extraCol.petalSum)
df1.show(5)

+-----------+----------+-----------+----------+-------+
|petalLength|petalWidth|sepalLength|sepalWidth|species|
+-----------+----------+-----------+----------+-------+
|        1.4|       0.2|        5.1|       3.5| setosa|
|        1.4|       0.2|        4.9|       3.0| setosa|
|        1.3|       0.2|        4.7|       3.2| setosa|
|        1.5|       0.2|        4.6|       3.1| setosa|
|        1.4|       0.2|        5.0|       3.6| setosa|
+-----------+----------+-----------+----------+-------+
only showing top 5 rows



### 2.3. Basic Data Cleaning

Just as in a hospital, hygiene is of great importance when working with data. Sometimes rows contain entries that make dealing with our data more difficult or lower its quality (information pollution). Two examples come to mind. The first is duplicate entries, which can introduce bias into our data and negatively impact the performance of many machine learning algorithms.

The second example would be null entries, which might render some rows useless due to the fact that most algorithms generally can't handle such entries. Luckily, PySpark provides us with two methods `.dropna()` and `.dropDuplicates()` to get rid of such problematic rows.


In [12]:
(df1.select("petalWidth")
    .withColumnRenamed("petalWidth", "pW")
    .dropna()
    .dropDuplicates()
    .show(n=5))

+---+
| pW|
+---+
|2.4|
|0.2|
|1.4|
|1.7|
|2.3|
+---+
only showing top 5 rows



Although our dataframe is now free of unwanted entries we might still want to put further restrictions on the data we want to keep. 

### 2.4. Conditional Selection of Rows.

In 2.1 we explained that directly accessing rows of a DataFrame comes with some caveats. It is, however, possible to access rows indirectly without pulling all the data onto your local node. This is done via conditional selection, where we select rows based on user-given conditions via the `.filter()` method. This means, however, that we don't know in advance which rows we will obtain, which is why we speak of indirect access.

Let's say we want to get only the flowers of type `"virginica"`. We then have to write the following:

In [13]:
(df1.select("species", "petalWidth")
    .filter(df1.species == "virginica")
    .dropDuplicates()
    .show(5))             

+---------+----------+
|  species|petalWidth|
+---------+----------+
|virginica|       2.2|
|virginica|       1.7|
|virginica|       2.5|
|virginica|       2.4|
|virginica|       1.6|
+---------+----------+
only showing top 5 rows



We can also filter based on multiple conditions. In the next example we will use `.where()` instead of `.filter()`. The two are aliases of each other, with `.where()` coming from SQL syntax.

In [14]:
(df1.select("species", "petalWidth", "petalLength")
    .where((df1.species == "setosa") & (df1.petalLength > 1.3))
    .withColumn("petalSum", df1.petalWidth + df1.petalLength)
    .dropDuplicates()
    .describe()
    .show(5))

+-------+-------+-------------------+-------------------+-------------------+
|summary|species|         petalWidth|        petalLength|           petalSum|
+-------+-------+-------------------+-------------------+-------------------+
|  count|     16|                 16|                 16|                 16|
|   mean|   null|0.30000000000000004| 1.5999999999999999| 1.9000000000000001|
| stddev|   null| 0.1414213562373095|0.15916448515084428|0.24221202832779934|
|    min| setosa|                0.1|                1.4|                1.5|
|    max| setosa|                0.6|                1.9|                2.3|
+-------+-------+-------------------+-------------------+-------------------+



We can now select records based on our conditions. Records that don't fulfill the conditions will be discarded. This binary behavior ("keep or not") only gets us so far. What if we want to transform the data in such a way that all records fulfilling a certain condition get mapped to 1 and all others to 0?

### 2.5. Conclusion

You learned how to perform some basic transformations of the table, but maybe you also want to apply more complex functions to the DataFrame's rows or columns, such as summary statistics. In the next chapter, we are going to take a look at advanced transformations.


## 3. Functions and Advanced Transformations
Advanced transformations are where PySpark really shines, enabling us to execute very complex queries using simple syntax to extract valuable insights from our data. In this chapter we will see the power of methods such as `.groupBy()` and `.join()`, especially in combination with the more complex functions provided by the `functions` module (`pyspark.sql.functions`).

### 3.1. Intro to Spark functions
In general, it is possible to use functions from other libraries such as `numpy` on Spark `DataFrame` objects. However, this defeats the purpose of Spark, namely its ability to optimize the performance of transformation pipelines thanks to lazy execution.

This is why the `functions` module exists, which provides the user with a large number of functions for all kinds of purposes.

Suppose we want to map all records in the iris dataset whose petal length is above, let's say, 0.5 to one and all others to zero. Although you wouldn't think of such an if-else statement as a function in Python at first, in PySpark it is implemented as one: the function `F.when()`.



In [15]:
import pyspark.sql.functions as F

(df1.select("species",
            F.when(df1["petalLength"] > 0.5 ,1)
             .otherwise(0).alias("pL above 0.5"))
    .show(5))

+-------+------------+
|species|pL above 0.5|
+-------+------------+
| setosa|           1|
| setosa|           1|
| setosa|           1|
| setosa|           1|
| setosa|           1|
+-------+------------+
only showing top 5 rows



Now, 0.5 is a pretty arbitrary value. Something like the mean or median would likely be more interesting. You might be tempted to write something like `df1["petalLength"] > F.mean("petalLength")` as the first argument of `F.when()` in the cell above. This, however, will get you into "*Teufels Küche*", i.e. one hell of a mess.

The problem is the following: functions such as `F.mean()` aggregate the values of a column into a single value, but in our case we want to compare each value of the "petalLength" column with the mean of the column.

In order to deal with such dimensional mismatches we can use *window functions*. The way they are combined with `F.mean()` might look a bit unintuitive, but what they basically do is return the mean, or the result of any other aggregation function, in the form of a column containing as many rows as desired. In our case, we want as many rows as there are in `df1.petalLength`. Maybe a look at the graphic before the code will help you to get an even better understanding of what's going on.


In [16]:
from pyspark.sql.window import Window

w = Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)

(df1.select("species",
            F.when(df1["petalLength"] > F.mean("petalLength").over(w) ,1)
             .otherwise(0).alias("pL above mean"))
            .sample(fraction=0.1).show(5))

+----------+-------------+
|   species|pL above mean|
+----------+-------------+
|    setosa|            0|
|    setosa|            0|
|versicolor|            1|
|versicolor|            1|
|versicolor|            1|
+----------+-------------+
only showing top 5 rows
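
To make the dimensional argument concrete, here is what the unbounded window does conceptually, written in plain Python rather than Spark. The four records and the variable names are made up for illustration; the point is only that the aggregate is repeated on every row so that a row-wise comparison becomes possible.

```python
from statistics import mean

# Made-up records, not the real iris data.
rows = [
    {"species": "setosa", "petalLength": 1.0},
    {"species": "setosa", "petalLength": 2.0},
    {"species": "virginica", "petalLength": 5.0},
    {"species": "virginica", "petalLength": 6.0},
]

# An aggregate collapses the whole column into a single value ...
overall_mean = mean(r["petalLength"] for r in rows)

# ... while the window repeats that value once per row, giving a column
# of the same length as the one we want to compare against.
mean_over_window = [overall_mean for _ in rows]

# Now the comparison can be made row by row, as F.when() requires.
flags = [
    1 if r["petalLength"] > m else 0
    for r, m in zip(rows, mean_over_window)
]
print(overall_mean, flags)
```

The list `mean_over_window` plays the role of `F.mean("petalLength").over(w)` in the cell above: one copy of the aggregate for each row inside the window.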



Now what happens if the function we want to apply simply doesn't exist? After all, the `functions` module contains only a finite number of functions, and chaining and combining them will only get us so far. This is where *User Defined Functions (UDFs)* come into play. Buckle up!

### 3.2. User Defined Functions

By allowing us to define our own functions, Spark gives us a lot of flexibility. After defining a Python function, we can turn it into a UDF via `F.udf()`, specifying its return type. It can then be used in Spark pipelines just as functions from the `functions` module would be used. To call it from SQL queries as well, it can additionally be registered in our Spark session via `spark.udf.register()`.




In [17]:
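def firstUpper(s: str) -> str:
    # Guard against null or empty strings, for which s[0] would fail.
    if not s:
        return s
    return s[0].upper() + s[1:]

# Wrap the Python function as a UDF that returns a Spark STRING.
firstUpper_UDF = F.udf(firstUpper, "STRING")
# Apply the function to our DataFrame containing the Iris data.
df1.select(firstUpper_UDF("species").alias("Species")).show(2)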

+-------+
|Species|
+-------+
| Setosa|
| Setosa|
+-------+
only showing top 2 rows



The function `firstUpper` we just wrote is quite trivial. From a code style perspective, it would be appropriate to replace it with an anonymous function. Anonymous functions in Python can be created via the `lambda` keyword.


In [18]:
firstUpper_UDF = F.udf(lambda s: s[0].upper() + s[1:] if s else s, "STRING")
# Apply the function to our DataFrame containing the Iris data.
df1.select(firstUpper_UDF("species").alias("Species")).show(2)

+-------+
|Species|
+-------+
| Setosa|
| Setosa|
+-------+
only showing top 2 rows



As you know, there is no free lunch. The flexibility that comes with UDFs comes with a performance trade-off: since UDFs are a black box to PySpark, it can't apply the optimization engine it uses for functions from the `functions` module. For this reason, you should use the functions from the `functions` module whenever possible.

Sometimes we don't want to apply functions to all the entries in a column, but rather to a subset. An important way to generate such subsets is the `groupBy()` transformation, which we will see in action up next.


### 3.3. Grouping Values by Attribute.
What if we want to know how many plants belong to each species in our data set? `.describe()` from section *1.1* didn't give us that information. In section *3.1* we've seen how to get the mean of a column of a data set, specifically the mean petalLength of the iris flowers. What if we want to get the mean for one specific species of iris flower?

This is where the `.groupBy()` transformation comes into play. It assigns entries of a data set to groups based on the values of those entries in a specific column. Afterwards, so-called *aggregation* functions can be applied to each group which map the group to a single object, usually a numerical value such as the mean.

Figuring out how many plants belong to each species is an example of aggregation based on the same column by which the values are grouped.




In [19]:
(df1.select("species")    # Consider the column "species" [string].
    .groupBy("species")   # Identical entries are assigned to the same group.
    .count()              # For each group: count the number of entries belonging to it.
    .orderBy("count")     
    .show(n=3))

+----------+-----+
|   species|count|
+----------+-----+
| virginica|   50|
|versicolor|   50|
|    setosa|   50|
+----------+-----+



Getting the mean petal width for each species could be done similarly to the code above. In the case of the mean, however, we might also want to round our results and give them a concise name. Maybe we are also interested in knowing the maximum petal width or other summary statistics of each species' petal width.

In general, the `.agg()` transformation takes one or more aggregation expressions, or a dictionary mapping column names to aggregation function names, as input and applies them to the groups created by `.groupBy()`. A typical workflow could look like this:




In [20]:
(df1.select("petalWidth", "species")            # Consider the two columns "petalWidth" [double] and "species" [string].
    .where(F.col("petalWidth").isNotNull())     # Only consider entries where the petalWidth attribute is not Null.
    .groupBy("species")                         # Group the entries in the petalWidth column by their species.
    .agg(F.round(F.mean("petalWidth"),2)        # Calculate the mean petalWidth of each species rounded to two digits
          .alias("mean_pW"),                    # and name the resulting column "mean_pW".
         F.max("petalWidth"))                   # Also compute the maximum petalWidth of each species.
    .orderBy("mean_pW", ascending=False)        # Sort the species by mean petalWidth in descending order.
    .show(n=3))

+----------+-------+---------------+
|   species|mean_pW|max(petalWidth)|
+----------+-------+---------------+
| virginica|   2.03|            2.5|
|versicolor|   1.33|            1.8|
|    setosa|   0.25|            0.6|
+----------+-------+---------------+
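
What `.groupBy()` followed by an aggregation does can also be spelled out in two explicit steps in plain Python. This is a conceptual sketch with made-up values, not a description of how Spark executes the query internally.

```python
from collections import defaultdict

# Made-up (species, petalWidth) records, not the real iris data.
rows = [
    ("setosa", 0.2),
    ("setosa", 0.4),
    ("virginica", 2.0),
    ("virginica", 2.5),
    ("virginica", 1.5),
]

# Step 1 (groupBy): assign every entry to a group keyed by its species.
groups = defaultdict(list)
for species, petal_width in rows:
    groups[species].append(petal_width)

# Step 2 (aggregation): map each group to a single value.
counts = {species: len(values) for species, values in groups.items()}
means = {species: round(sum(values) / len(values), 2) for species, values in groups.items()}
maxima = {species: max(values) for species, values in groups.items()}

print(counts, means, maxima)
```

Each dictionary corresponds to one aggregated column in the Spark output: one value per group, regardless of how many entries the group contains.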



Now that we've been introduced to `.count()` and `.groupBy()`, we can refine our analysis of the petal length from *3.1*. We can now find out, for each species, the fraction of plants whose petal length is above the mean petal length of that species. Note that the result is a fraction between 0 and 1; multiply it by 100 to obtain a percentage. This section of code is probably the most complicated one in the tutorial, but pondering over it will give your understanding of the material a huge boost, so take a good look!

In [28]:
w = Window.partitionBy("species")                                               # Apply window species-wise.

df = (df1.select("species",                                                         
                 F.when(df1.petalLength > F.mean("petalLength").over(w), 1)          
                  .otherwise(0).alias("pL above species mean"))                      
         .groupBy("species")
         .agg((F.sum("pL above species mean") / F.count("species"))                       
                .alias("pL larger than species mean (%)")))

df.show()

+----------+-------------------------------+
|   species|pL larger than species mean (%)|
+----------+-------------------------------+
|    setosa|                           0.52|
|versicolor|                           0.54|
| virginica|                            0.5|
+----------+-------------------------------+



All that was a bit tougher than the concepts in the second chapter. After all, we are dealing with *advanced* transformations. The next and final concept we will learn about is the `.join()`, which some of you might know from working with SQL.

### 3.4. Joining DataFrames.
Suppose we have a set of *entities* (flowers in our concrete example) whose *attributes* are spread over multiple datasets. This means that one dataset might contain the petal width and petal length and another the type of species. A logical thing to do would be to combine (*join*) the datasets (which are our DataFrames) into one big dataset containing all attributes associated with an entity.

Before joining two DataFrames in PySpark we have to decide on two things: Based on which attributes to join the two DataFrames and how to join them. In simple cases  both DataFrames contain an ID column which can easily be used to join  records from  both DataFrames.

The question "how to join" arises when the DataFrames don't contain the same records. We could decide to only consider records corresponding to IDs that are contained in both DataFrames. This would lead to a loss of information, however, since we would have to discard records that contain non-Null values. Alternatively, we could keep all records and fill the columns coming from the DataFrame in which the ID isn't contained with Null values.

The approaches we have described here can be selected in `.join()` by setting `how="inner"` or `how="full_outer"` respectively. There are further types of joins, such as *left joins* and *right joins*, which make the `.join()` transformation a potent tool for data combination.

The idea behind joins can be neatly visualized via Venn diagrams. Take a look at the graphic before running the code cell below.






In [22]:
# https://sparkbyexamples.com/pyspark/pyspark-join-explained-with-examples/

# Since the iris dataset isn't very useful for illustrating joins
# you can take the first four sections of this tutorial as an example.
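
The difference between the two join types described above can be illustrated without Spark. The sketch below uses plain Python dictionaries keyed by a made-up ID; the helper `join` and both toy tables are hypothetical and only mimic the semantics of `how="inner"` and `how="full_outer"`.

```python
# Two made-up tables that share an id but do not contain the same records.
widths = {1: 0.2, 2: 1.3, 3: 2.1}                          # id -> petalWidth
species = {2: "versicolor", 3: "virginica", 4: "setosa"}   # id -> species

def join(left, right, how):
    """Join two id-keyed tables and return rows of (id, left value, right value)."""
    if how == "inner":
        ids = left.keys() & right.keys()   # only ids present in both tables
    elif how == "full_outer":
        ids = left.keys() | right.keys()   # ids present in at least one table
    else:
        raise ValueError(f"unsupported join type: {how}")
    # dict.get returns None for a missing id, mirroring the Null fill of an outer join.
    return [(i, left.get(i), right.get(i)) for i in sorted(ids)]

inner = join(widths, species, "inner")
full = join(widths, species, "full_outer")

print(inner)
print(full)
```

The inner join drops the records with IDs 1 and 4 even though they carry non-Null values, while the full outer join keeps them and pads the missing attribute with `None`.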

### 3.5. Conclusion.

Nice! You went through some pretty serious stuff! All in all, you learned how to extract very specific information from data via functions, windows and `groupBy()` transformations, as well as how to flexibly merge DataFrames via `.join()`. In the final chapter you will see how to perform the final step of an ETL workflow: loading the extracted and transformed data from the staging area into a data warehouse.

## 4. The Loading Step.

Now that we have taken an in-depth look at how to do transformations, we can come to the final step of ETL, which is loading our data from the staging area (the cells where we performed transformations) into our data warehouse. We will store the data in the form of a SQL table. We will use the Parquet format for this since it offers a few advantages when it comes to dealing with large data sets.

### 4.1. What's a parquet file?
According to Databricks, a company founded by the developers of Spark:
> Apache Parquet is an open source, column-oriented data file format designed for efficient data storage and retrieval. It provides efficient data compression and encoding schemes with enhanced performance to handle complex data in bulk.

It can be used to store data of all kinds, such as structured data tables, images, videos and documents. It uses a range of sophisticated techniques to optimize aspects such as storage space and query speed. For instance, a query on a Parquet file only needs to read the columns it actually uses, which greatly reduces I/O.

The advantages of using Parquet files are especially clear when we look at the monetary cost of working with them instead of .csv files. According to Databricks, storing data in an AWS S3 bucket and running queries on it can be 99.7% less costly with .parquet than with .csv files.
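
Why reading only the needed columns saves I/O can be seen in a small plain-Python sketch. It is a strong simplification that assumes nothing about Parquet's actual file layout, compression or encoding; the records and variable names are made up, and "values read" is just a count of the fields that have to be touched.

```python
# Made-up records with three attributes each.
rows = [
    (1.4, 0.2, "setosa"),
    (4.7, 1.4, "versicolor"),
    (6.0, 2.5, "virginica"),
]
column_names = ("petalLength", "petalWidth", "species")

# Row-oriented layout (e.g. CSV): to get one attribute, every field of every row is read.
values_read_row_layout = sum(len(row) for row in rows)
widths_from_rows = [row[1] for row in rows]

# Column-oriented layout: each column is stored contiguously, so a query
# on petalWidth only has to read that one column.
column_store = {name: [row[i] for row in rows] for i, name in enumerate(column_names)}
widths_from_columns = column_store["petalWidth"]
values_read_column_layout = len(widths_from_columns)

print(values_read_row_layout, values_read_column_layout)
```

Both layouts return the same petal widths, but the column-oriented one touches a third of the values in this toy case, and the gap grows with the number of columns in the table.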

### 4.2. Load Data to an SQL-Table.
Loading data from PySpark into an SQL table can be done easily via the `.saveAsTable()` method. We also need to choose a format for storing the data, which will be `"parquet"` in our case.

In [None]:
# Rename the column first: spaces and parentheses in column names can be rejected when writing Parquet.
df = df.withColumnRenamed("pL larger than species mean (%)", "pL_larger_than_species_mean")

df.write.format("parquet").saveAsTable("df_parquet")

### 4.3. Conclusion.
You've now seen an example of how we can perform the loading step in an ETL pipeline. With this, we've come to the end of our tutorial. You should now be able to set up an ETL pipeline in PySpark yourself, gathering interesting insights and managing the flow of data efficiently by using schemas for reading data and .parquet files to load it into a data warehouse. Feel free to take a look at our sources to dig deeper yourself. There are still a lot of valuable things to learn about Spark, and we hope we sparked your curiosity for doing so.

## 5. Sources.



1. Karau, H., Konwinski, A., Wendell, P., Zaharia, M. (2015). Learning Spark. O'Reilly. ISBN: 9781449358624

2. https://databricks.com/de/glossary/what-is-parquet

3. https://spark.apache.org/docs/latest/

4. https://sparkbyexamples.com/

5. https://www.ibm.com/cloud/learn/etl
