# Spark SQL and DataFrames

### 1. Basic manipulations
First, let's create some users:

In [0]:
from datetime import date

# Let's first create some users:
col_names = ["first_name", "last_name", "birth_date", "gender", "country"]
users = [
  ("Alice", "Jones", date(1981, 4, 15), "female", "Canada"),
  ("John", "Doe", date(1951, 1, 21), "male", "USA"),
  ("Barbara", "May", date(1951, 9, 1), "female", "Australia"),
  ("James", "Smith", date(1975, 7, 12), "male", "United Kingdom"),
  ("Gerrard", "Dupont", date(1968, 5, 9), "male", "France"),
  ("Amanda", "B.", date(1988, 12, 16), "female", "New Zeland")
]

users_df = spark.createDataFrame(users, col_names)
display(users_df)  # Only works in Databricks. Elsewhere, use "users_df.show()" or "users_df.toPandas()"

first_name,last_name,birth_date,gender,country
Alice,Jones,1981-04-15,female,Canada
John,Doe,1951-01-21,male,USA
Barbara,May,1951-09-01,female,Australia
James,Smith,1975-07-12,male,United Kingdom
Gerrard,Dupont,1968-05-09,male,France
Amanda,B.,1988-12-16,female,New Zeland


Now it's your turn: create more users (at least 3, with different names), add them to the initial users, and save the result in a new variable.

In [0]:
userTmp = [
  ("Aminata", "Camara", date(1986, 3, 14), "female", "Guinee"),
  ("Fatim", "Diarra", date(1945, 5, 12), "male", "Mali"),
  ("Moussa", "Keita", date(1958, 9, 6), "female", "Niger"),
  ("Ousmane", "Barry", date(1965, 4, 13), "male", "Senegal"),
]

new_users_df = spark.createDataFrame(userTmp, col_names)
all_users_df = users_df.union(new_users_df)
display(all_users_df) # or all_users_df.show()

first_name,last_name,birth_date,gender,country
Alice,Jones,1981-04-15,female,Canada
John,Doe,1951-01-21,male,USA
Barbara,May,1951-09-01,female,Australia
James,Smith,1975-07-12,male,United Kingdom
Gerrard,Dupont,1968-05-09,male,France
Amanda,B.,1988-12-16,female,New Zeland
Aminata,Camara,1986-03-14,female,Guinee
Fatim,Diarra,1945-05-12,male,Mali
Moussa,Keita,1958-09-06,female,Niger
Ousmane,Barry,1965-04-13,male,Senegal


Now, select only two columns and show the resulting DataFrame, without saving it into a variable.

In [0]:
all_users_df.select("first_name", "birth_date").show()

Now, register your DataFrame as a table and select the same two columns with a SQL query string.

In [0]:
query_string = "SELECT first_name, birth_date FROM prem_table"
all_users_df.createOrReplaceTempView("prem_table")  # registers a session-scoped temporary view that SQL queries can reference
spark.sql(query_string).show()

Now we want to add a unique identifier for each user in the table. There are many strategies for that, and for our example we will use the string `{last_name}_{first_name}`.

You can use `all_users_df` since your latest operation did not override its values. Add a new column called `user_id` to your DataFrame and save to a new variable.

**Hint:** The first place to look is in the [functions](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#module-pyspark.sql.functions) package

In [0]:
from pyspark.sql import functions as fn

# We could also wrap the result in fn.lower to convert the strings to lowercase
users_with_id = all_users_df.select(
    fn.concat_ws('_', all_users_df.last_name, all_users_df.first_name).alias('user_id'),
    "*"
)
display(users_with_id)

user_id,first_name,last_name,birth_date,gender,country
Jones_Alice,Alice,Jones,1981-04-15,female,Canada
Doe_John,John,Doe,1951-01-21,male,USA
May_Barbara,Barbara,May,1951-09-01,female,Australia
Smith_James,James,Smith,1975-07-12,male,United Kingdom
Dupont_Gerrard,Gerrard,Dupont,1968-05-09,male,France
B._Amanda,Amanda,B.,1988-12-16,female,New Zeland
Camara_Aminata,Aminata,Camara,1986-03-14,female,Guinee
Diarra_Fatim,Fatim,Diarra,1945-05-12,male,Mali
Keita_Moussa,Moussa,Keita,1958-09-06,female,Niger
Barry_Ousmane,Ousmane,Barry,1965-04-13,male,Senegal


You can also do the same thing with a User Defined Function by passing a lambda, although it is not recommended when there is already a function in the `functions` package.

Add a new column called `user_id_udf` to the DataFrame, using a UDF that receives two parameters and concatenates them.

In [0]:
concat_udf = fn.udf(lambda str1, str2: str1 + "_" + str2)
users_with_id_udf = all_users_df.withColumn("user_id_udf", concat_udf(all_users_df.last_name, all_users_df.first_name))
display(users_with_id_udf)

first_name,last_name,birth_date,gender,country,user_id_udf
Alice,Jones,1981-04-15,female,Canada,Jones_Alice
John,Doe,1951-01-21,male,USA,Doe_John
Barbara,May,1951-09-01,female,Australia,May_Barbara
James,Smith,1975-07-12,male,United Kingdom,Smith_James
Gerrard,Dupont,1968-05-09,male,France,Dupont_Gerrard
Amanda,B.,1988-12-16,female,New Zeland,B._Amanda
Aminata,Camara,1986-03-14,female,Guinee,Camara_Aminata
Fatim,Diarra,1945-05-12,male,Mali,Diarra_Fatim
Moussa,Keita,1958-09-06,female,Niger,Keita_Moussa
Ousmane,Barry,1965-04-13,male,Senegal,Barry_Ousmane
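
One caveat with the lambda used in this cell: a Python UDF receives `None` for SQL nulls, so `str1 + "_" + str2` would raise a `TypeError` on a row with a missing name, whereas `fn.concat_ws` skips null inputs. Below is a minimal plain-Python sketch of a null-tolerant version. `concat_names` is a hypothetical helper name, not part of the lab, and it could be wrapped with `fn.udf(concat_names)` in the same way as the lambda.

```python
from typing import Optional


def concat_names(last_name: Optional[str], first_name: Optional[str]) -> str:
    """Join the non-null parts with "_", mimicking how concat_ws skips nulls."""
    parts = [part for part in (last_name, first_name) if part is not None]
    return "_".join(parts)


print(concat_names("Jones", "Alice"))  # Jones_Alice
print(concat_names(None, "Alice"))     # Alice
```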


Now, let's add another column called `age` with the computed age (in years) of each user, based on a given reference date, and save the resulting DataFrame into a new variable.

**Hint:** You can first compute the age in months, and then divide by 12. A final operation will probably be needed to get an integer number.
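
As a sanity check for the Spark expression, the age in completed years can also be computed in plain Python. This is only a sketch: `age_in_years` is a hypothetical helper, not part of the lab. It also shows why the choice of the final operation matters: rounding to the nearest integer can report one year more than the number of completed years, while flooring does not.

```python
from datetime import date


def age_in_years(birth_date: date, reference_date: date) -> int:
    """Completed years between birth_date and reference_date."""
    years = reference_date.year - birth_date.year
    # Subtract one year if the birthday has not yet occurred in the reference year.
    if (reference_date.month, reference_date.day) < (birth_date.month, birth_date.day):
        years -= 1
    return years


reference_date = date(2017, 12, 31)
print(age_in_years(date(1981, 4, 15), reference_date))   # 36
print(age_in_years(date(1988, 12, 16), reference_date))  # 29
```

In Spark, applying `fn.floor` to the number of years (instead of `fn.round`) should give the same completed-years semantics.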

In [0]:
reference_date = date(2017, 12, 31)

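# Note: fn.round rounds to the nearest year, so a user who is 36 years and 9 months old
# is reported as 37. Use fn.floor instead to get the number of completed years.
users_with_age = all_users_df.withColumn(
    "age",
    fn.round(fn.months_between(fn.lit(reference_date), all_users_df.birth_date, True) / 12)
)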
display(users_with_age)

first_name,last_name,birth_date,gender,country,age
Alice,Jones,1981-04-15,female,Canada,37.0
John,Doe,1951-01-21,male,USA,67.0
Barbara,May,1951-09-01,female,Australia,66.0
James,Smith,1975-07-12,male,United Kingdom,42.0
Gerrard,Dupont,1968-05-09,male,France,50.0
Amanda,B.,1988-12-16,female,New Zeland,29.0
Aminata,Camara,1986-03-14,female,Guinee,32.0
Fatim,Diarra,1945-05-12,male,Mali,73.0
Moussa,Keita,1958-09-06,female,Niger,59.0
Ousmane,Barry,1965-04-13,male,Senegal,53.0


Now, an analytical question: how many users of each gender are more than 40 years old? The solution must be a DataFrame with two columns, `gender` and `count`, and two rows, one for each gender.

Bonus: Try to do your solution in a single chain (without intermediate variables)

**Hint:** You will need to filter and aggregate the data.

In [0]:
result = users_with_age.where("age > 40").groupBy('gender').count()
display(result)

gender,count
male,5
female,2


### 2. Reading files, performing joins, and aggregating

For this section you will use some fake data of two datasets: `Users` and `Donations`. The data is provided in two CSV files *with header* and using *comma* as separator.

The `Users` dataset contains information about the users.

The `Donations` dataset contains information about Donations performed by those users.

The first task is to read these files into the appropriate DataFrames.

**Note:** You need to set the option "inferSchema" to true in order to have the columns in the correct types. Besides, you can use `DataFrame.withColumn` in combination with `fn.to_date` or `fn.to_timestamp` to get proper dates.

_The data for this section has been created using [Mockaroo](https://www.mockaroo.com/)_.

In [0]:
users_from_file = spark.read.csv("/FileStore/tables/users.csv", sep=",", header=True, inferSchema=True)
users_from_file = users_from_file.withColumn("birth_date", fn.to_date(users_from_file.birth_date)) 
display(users_from_file)

donations = spark.read.csv("/FileStore/tables/donations.csv", sep=",", header=True, inferSchema=True)
donations = donations.withColumn("date", fn.to_date(donations.date)) \
                     .withColumn("timestamp", fn.to_timestamp(donations.timestamp))
display(donations)

user_id,first_name,last_name,gender,birth_date,country
1,Janifer,Lagadu,Female,1940-08-09,United States
2,Bruis,Danilov,Male,1966-05-28,Brazil
3,Omar,Spinozzi,Male,1977-08-28,China
4,Mag,Kennedy,Female,1963-11-11,China
5,Gerome,Lushey,Male,1990-06-08,Brazil
6,Lazarus,Andrus,Male,1983-04-22,France
7,Adora,Bartunek,Female,1971-06-04,France
8,Gladys,Gulland,Female,1959-07-07,Brazil
9,Pearline,Kitcher,Female,1944-06-01,China
10,Jolie,Diggin,Female,1944-07-31,Nigeria


donation_id,user_id,donation_value,date,time,timestamp
1,63,25.41,2017-03-06,18:28:20,2017-03-06T18:28:20.000+0000
2,53,93.32,2017-03-15,15:15:09,2017-03-15T15:15:09.000+0000
3,74,136.04,2017-10-29,20:36:23,2017-10-29T20:36:23.000+0000
4,94,37.25,2017-05-16,12:33:58,2017-05-16T12:33:58.000+0000
5,83,89.45,2017-05-09,22:45:44,2017-05-09T22:45:44.000+0000
6,80,163.67,2017-07-25,21:50:36,2017-07-25T21:50:36.000+0000
7,19,10.69,2017-11-15,8:43:54,2017-11-15T08:43:54.000+0000
8,53,135.09,2017-04-14,7:43:37,2017-04-14T07:43:37.000+0000
9,46,94.38,2017-03-07,10:30:34,2017-03-07T10:30:34.000+0000
10,90,68.63,2017-02-02,14:23:43,2017-02-02T14:23:43.000+0000


Now investigate the columns, contents and size of both datasets.

In [0]:
# print the column names and types
users_from_file.printSchema()
donations.printSchema()

# print 5 elements of the datasets in a tabular format
users_from_file.show(5)
donations.show(5)

# print the number of rows of each dataset
print(users_from_file.count())
print(donations.count())

**Note:** If all the column types shown in the previous results are "string", you need to make sure you passed "inferSchema" as true when reading the CSV files and that you have converted date and timestamp columns to the matching types before continuing.

Before using the data, we may want to add some information about the users. 

Add a column containing the age of each user.

##### I keep the same reference date as in the similar question above

In [0]:
# Reuse the reference date from the similar question above
users_from_file = users_from_file.withColumn(
    "age",
    fn.round(fn.months_between(fn.lit(reference_date), users_from_file.birth_date, True) / 12)
)
users_from_file.show()
users_from_file.printSchema()

Another useful piece of information is the age **range** of each user. Using the `when` function, create the following 5 age ranges:
- "(0, 25]"
- "(25, 35]"
- "(35, 45]"
- "(45, 55]"
- "(55, ~]"

And add a new column to the users DataFrame, containing this information.

**Note:** When building logical operations with Spark DataFrames, it's better to add parentheses around each condition. Example:
```python
df.select("name", "age").where((df["age"] > 20) & (df["age"] <= 30))
```

**Note 2:** If you are having problems with the `when` function, you can write a User Defined Function and implement your logic in standard Python.
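
Following Note 2, the bucketing logic can be written as an ordinary Python function and then wrapped with `fn.udf`. This is a minimal sketch: `age_range` is a hypothetical helper name, and it assumes that a null age should map to a null range.

```python
from typing import Optional


def age_range(age: Optional[float]) -> Optional[str]:
    """Map an age to one of the five ranges listed above."""
    if age is None:
        return None  # assumption: keep nulls as nulls
    if age <= 25:
        return "(0, 25]"
    if age <= 35:
        return "(25, 35]"
    if age <= 45:
        return "(35, 45]"
    if age <= 55:
        return "(45, 55]"
    return "(55, ~]"


print(age_range(25))  # (0, 25]
print(age_range(48))  # (45, 55]
print(age_range(77))  # (55, ~]
```

Because each branch is only reached when the previous upper bound has been exceeded, the lower bounds do not need to be tested explicitly.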

In [0]:
# The labels match the five ranges listed above exactly
ageRank = fn.when((users_from_file.age > 0) & (users_from_file.age <= 25), '(0, 25]') \
            .when((users_from_file.age > 25) & (users_from_file.age <= 35), '(25, 35]') \
            .when((users_from_file.age > 35) & (users_from_file.age <= 45), '(35, 45]') \
            .when((users_from_file.age > 45) & (users_from_file.age <= 55), '(45, 55]') \
            .otherwise('(55, ~]')
users_from_file = users_from_file.withColumn("age_range", ageRank)
users_from_file.show()

Now that we have improved our users' DataFrame, the first analysis we want to make is the average donation performed by each gender. However, the gender information and the donation value are in different tables, so first we need to join them, using the `user_id` as joining key.

**Note:** Make sure you are not using the `users` DataFrame from the first part of the lab.

**Note 2:** For better performance, you can [broadcast](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.broadcast) the smaller dataset. But you should only do this when you are sure the DataFrame fits in the memory of the nodes.

**I use broadcast on the smaller dataset, which is `users_from_file`.**

In [0]:
# Without broadcast
# joined_df = users_from_file.join(donations, 'user_id')

# With broadcast
joined_df = donations.join(fn.broadcast(users_from_file), 'user_id')

display(joined_df)

user_id,donation_id,donation_value,date,time,timestamp,first_name,last_name,gender,birth_date,country,age,age_range
63,1,25.41,2017-03-06,18:28:20,2017-03-06T18:28:20.000+0000,Tabor,Bramer,Male,1983-05-29,Brazil,35.0,"(25, 35]"
53,2,93.32,2017-03-15,15:15:09,2017-03-15T15:15:09.000+0000,Chris,Climie,Male,1970-06-02,France,48.0,"(45, 55]"
74,3,136.04,2017-10-29,20:36:23,2017-10-29T20:36:23.000+0000,Livia,Lodemann,Female,1967-03-11,United States,51.0,"(45, 55]"
94,4,37.25,2017-05-16,12:33:58,2017-05-16T12:33:58.000+0000,Kendre,Pankhurst.,Female,1941-05-28,China,77.0,"(55, ~]"
83,5,89.45,2017-05-09,22:45:44,2017-05-09T22:45:44.000+0000,Quincy,Marquess,Male,1997-08-13,Brazil,20.0,"(0, 25]"
80,6,163.67,2017-07-25,21:50:36,2017-07-25T21:50:36.000+0000,Boothe,Endley,Male,1960-03-03,China,58.0,"(55, ~]"
19,7,10.69,2017-11-15,8:43:54,2017-11-15T08:43:54.000+0000,Sharl,Airy,Female,1941-09-08,Brazil,76.0,"(55, ~]"
53,8,135.09,2017-04-14,7:43:37,2017-04-14T07:43:37.000+0000,Chris,Climie,Male,1970-06-02,France,48.0,"(45, 55]"
46,9,94.38,2017-03-07,10:30:34,2017-03-07T10:30:34.000+0000,Ethelyn,Raubenheim,Female,1953-07-19,Brazil,64.0,"(55, ~]"
90,10,68.63,2017-02-02,14:23:43,2017-02-02T14:23:43.000+0000,Farlee,O'Connell,Male,1985-05-15,China,33.0,"(25, 35]"


Now, use aggregation to find the min, max and avg donation by gender.

In [0]:
donations_by_gender = joined_df.groupBy('gender').agg(fn.min('donation_value').alias('min'),
                                                      fn.max('donation_value').alias('max'),
                                                      fn.avg('donation_value').alias("avg")
                                                      )
donations_by_gender.show()

Now, make the necessary transformations and aggregations to answer the following questions about the data. Note that some questions are only about the users, so make sure to use the smallest possible dataset when looking for your answers!

**Question 1:** 

a) What's the average, min and max age of the users? 

b) What's the average, min and max age of the users, by gender?

In [0]:
result_1a = users_from_file.agg(fn.avg('age').alias('average'),
                                fn.min('age').alias('min'),
                                fn.max('age').alias('max')
                               )
result_1a.show()

result_1b = users_from_file.groupBy('gender').agg(fn.avg('age').alias('average'),
                                                  fn.min('age').alias('min'),
                                                  fn.max('age').alias('max')
                                                 )
result_1b.show()

**Question 2:**

a) How many distinct country origins exist in the data? Print a DataFrame listing them.

b) What are the top 5 countries with the most users? Print a DataFrame containing the name of the countries and the counts.

In [0]:
result_2a = users_from_file.select('country').distinct()
print(result_2a.count())  # number of distinct countries
result_2a.show()

result_2b = users_from_file.groupBy('country').agg(fn.count('user_id').alias('count_country')).orderBy('count_country', ascending=False).limit(5)
result_2b.show()

**Question 3:**

What are the number of donations and the average, min and max donation values by age range?

In [0]:
result_3 = joined_df.groupBy('age_range').agg( fn.count('donation_id').alias('number_of_donations'),
                                               fn.sum('donation_value').alias('sum_of_donations_value'),
                                               fn.min('donation_value').alias('min'),
                                               fn.max('donation_value').alias('max'),
                                               fn.avg('donation_value').alias("avg")
                                              )
result_3.show()

**Question 4:**

a) What's the number of donations, average, min and max donation values by user location (country)?

b) What is the number of donations, average, min and max donation values by gender for each user location (country)? (the resulting DataFrame must contain 5 columns: the gender of the user, their country and the 3 metrics)

In [0]:
result_4a = joined_df.groupBy('country').agg(  fn.count('donation_id').alias('number_of_donations'),
                                               fn.sum('donation_value').alias('sum_of_donations_value'),
                                               fn.avg('donation_value').alias("avg"),
                                               fn.min('donation_value').alias('min'),
                                               fn.max('donation_value').alias('max'),
                                              )
result_4a.show()

result_4b = joined_df.groupBy(['gender', 'country']).agg(
                                               # The donation count is left out so that the result has exactly the 5 requested columns
                                               fn.avg('donation_value').alias("avg"),
                                               fn.min('donation_value').alias('min'),
                                               fn.max('donation_value').alias('max'),
                                              )
result_4b.show()

**Question 5**

Which month of the year has the largest aggregated donation value?

**Hint**: you can use a function to extract the month from a date, then you can aggregate to find the total donation value.
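
To make the hint concrete, here is the same idea in plain Python, applied only to the ten preview rows of `donations` displayed earlier (not the full dataset, so the month found here says nothing about the real answer): extract the month from each date, sum the values per month, then take the maximum.

```python
from collections import defaultdict
from datetime import date

# (date, donation_value) pairs copied from the ten preview rows shown earlier.
preview = [
    (date(2017, 3, 6), 25.41), (date(2017, 3, 15), 93.32),
    (date(2017, 10, 29), 136.04), (date(2017, 5, 16), 37.25),
    (date(2017, 5, 9), 89.45), (date(2017, 7, 25), 163.67),
    (date(2017, 11, 15), 10.69), (date(2017, 4, 14), 135.09),
    (date(2017, 3, 7), 94.38), (date(2017, 2, 2), 68.63),
]

totals = defaultdict(float)
for day, value in preview:
    totals[day.month] += value  # same role as groupBy("month") followed by sum

# Month with the largest total, like orderBy(..., ascending=False).limit(1).
best_month = max(totals, key=totals.get)
print(best_month, round(totals[best_month], 2))  # 3 213.11
```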

In [0]:
result_5 = donations.withColumn("month", fn.month(donations.date)) \
                    .groupBy("month") \
                    .agg(fn.sum("donation_value").alias("sum_of_donations")) \
                    .orderBy("sum_of_donations", ascending = False) \
                    .limit(1)
result_5.show()

### 3. Window Functions

_This section uses the same data as the last one._

Window functions are very useful for gathering aggregated data without actually aggregating the DataFrame. They can also be used to find "previous" and "next" information for entities, such as a user. [This article](https://databricks.com/blog/2015/07/15/introducing-window-functions-in-spark-sql.html) has a very nice explanation about the concept.
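
The difference between a grouped aggregation and a window aggregation can be illustrated without Spark. In this plain-Python sketch (the toy rows and variable names are invented for illustration), the grouped version collapses to one entry per user, while the window-style version keeps every row and attaches the per-user total to it. Conceptually, that is what a sum applied over a window partitioned by user does.

```python
from collections import defaultdict

# Toy donations: (user_id, donation_value). Invented values, not the lab data.
rows = [(1, 300.0), (2, 50.0), (1, 250.0), (3, 700.0), (2, 100.0)]

# Grouped aggregation: one entry per user, the individual rows are gone.
totals = defaultdict(float)
for user_id, value in rows:
    totals[user_id] += value
print(dict(totals))  # {1: 550.0, 2: 150.0, 3: 700.0}

# Window-style aggregation: every row is kept and carries its partition total.
with_total = [(user_id, value, totals[user_id]) for user_id, value in rows]

# Keeping the donations of users who gave at least 500 is now a plain row filter.
good = [row for row in with_total if row[2] >= 500]
print(good)  # [(1, 300.0, 550.0), (1, 250.0, 550.0), (3, 700.0, 700.0)]
```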

We want to find the users who donated less than a threshold and remove them from the donations dataset. Now, there are two ways of doing that:

1) Performing a traditional aggregation to find the users who donated less than the threshold, then filtering these users out of the donations dataset, either with `where(~df["user_id"].isin(a_local_list))` or with a join of type "anti-join".

2) Using window functions to add a new column with the aggregated donations per user, and then using a normal filter such as `where(aggregated_donation >= threshold)`, which keeps only the users at or above the threshold.

Let's implement both and compare the complexity:

First, perform the traditional aggregation and find the users who donated less than 500 in total:

In [0]:
bad_users = joined_df.groupBy("user_id") \
                     .agg(fn.sum("donation_value").alias("somme_donation")) \
                     .where("somme_donation < 500")
bad_users.show()

You should have found around 10 users. Now, perform an "anti-join" to remove those users from `joined_df`.

**Hint:** The `join` operation accepts a third argument which is the join type. The accepted values are: 'inner', 'outer', 'full', 'fullouter', 'leftouter', 'left', 'rightouter', 'right', 'leftsemi', 'leftanti', 'cross'.
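
Among these, 'leftsemi' and 'leftanti' are the two types most relevant here: the first keeps the left rows whose key has a match on the right, the second keeps the left rows whose key has none, and neither adds columns from the right side. A plain-Python sketch with invented toy data:

```python
# Toy data, invented for illustration: (user_id, donation_value) rows and a set of keys.
donation_rows = [(1, 300.0), (2, 50.0), (1, 250.0), (3, 700.0), (2, 100.0)]
bad_user_ids = {2}

# 'leftsemi': left rows with a matching key; 'leftanti': left rows without one.
semi = [row for row in donation_rows if row[0] in bad_user_ids]
anti = [row for row in donation_rows if row[0] not in bad_user_ids]

print(semi)  # [(2, 50.0), (2, 100.0)]
print(anti)  # [(1, 300.0), (1, 250.0), (3, 700.0)]

# Every left row lands in exactly one of the two results.
assert len(semi) + len(anti) == len(donation_rows)
```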

In [0]:
good_donations = joined_df.join(bad_users, "user_id", "leftanti")
good_donations.count()

Verify if the count of `good_donations` makes sense by performing a normal join to find the `bad_donations`.

In [0]:
bad_donations = joined_df.join(bad_users, "user_id")
bad_donations.count()

If you have done everything right, at this point `good_donations.count()` + `bad_donations.count()` = `joined_df.count()`.

However, the join approach can be expensive and requires multiple operations. For this kind of problem, window functions are often a better fit.

In [0]:
good_donations.count() + bad_donations.count() == joined_df.count()

The first step is to create your window specification over `user_id` by using `partitionBy`.

In [0]:
from pyspark.sql import Window

window_spec = Window.partitionBy('user_id')

Then, you can use one of the window functions of the `pyspark.sql.functions` package, applying it to the created `window_spec` by using the `over` method.

**Hint:** If you are blocked, try looking at the [documentation](http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.Column.over) for the `over` method, or searching on StackOverflow.

In [0]:
new_column = fn.sum("donation_value").over(window_spec)

donations_with_total = joined_df.withColumn('total_donated_by_user', new_column)
donations_with_total.show()

And now you can just filter on the `total_donated_by_user` column:

In [0]:
good_donations_wf = donations_with_total.where("total_donated_by_user >= 500")
good_donations_wf.count()

If you have done everything right, you should obtain `good_donations_wf.count()` = `good_donations.count()`.

In [0]:
good_donations_wf.count() == good_donations.count()

Window functions can also be useful to find the "next" and "previous" operations by using `functions.lead` and `functions.lag`. In our example, we can use them to find the date interval between two donations by a specific user.

For this kind of Window function, the window specification must be ordered by the date. So, create a new window specification partitioned by the `user_id` and ordered by the donation timestamp.

In [0]:
ordered_window = Window.partitionBy('user_id').orderBy("timestamp")

Now use `functions.lag().over()` to add a column with the timestamp of the previous donation of the same user. Then, inspect the result to see what it looks like.

In [0]:
new_column = fn.lag("timestamp", 1).over(ordered_window)

donations_with_lag = good_donations.withColumn('timestamp_order_lag', new_column)
donations_with_lag.orderBy("user_id", "timestamp").show()

Finally, compute the average time it took for each user between two of their consecutive donations (in days), and print the 5 users with the smallest averages. The result must include at least the users' id, last name and birth date, as well as the computed average.
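
The computation can be checked on a small example in plain Python. The sketch below uses invented timestamps and mimics the Spark steps: sort each user's donations, pair each one with the previous one (the `lag`), take the difference in calendar days (`fn.datediff` compares dates, so the time of day is ignored), and average the gaps.

```python
from collections import defaultdict
from datetime import datetime

# Toy (user_id, timestamp) rows, invented for illustration.
rows = [
    (1, datetime(2017, 3, 1, 18, 0)), (1, datetime(2017, 3, 11, 9, 0)),
    (1, datetime(2017, 3, 31, 12, 0)), (2, datetime(2017, 5, 5, 8, 0)),
]

by_user = defaultdict(list)
for user_id, ts in rows:
    by_user[user_id].append(ts)

average_gap_days = {}
for user_id, stamps in by_user.items():
    stamps.sort()  # same role as orderBy("timestamp") in the window
    # zip pairs each donation with the previous one, like lag(..., 1).
    gaps = [(cur.date() - prev.date()).days for prev, cur in zip(stamps, stamps[1:])]
    # A user with a single donation has no gap, which Spark reports as null.
    average_gap_days[user_id] = sum(gaps) / len(gaps) if gaps else None

print(average_gap_days)  # {1: 15.0, 2: None}
```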

**I apply the join after the groupBy to retrieve first_name, last_name, ... for each user.**

In [0]:
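average_between_donation = fn.avg(fn.datediff("timestamp", "timestamp_order_lag"))

# Users with a single donation have no previous timestamp, so their average is null.
# Filter those rows out; otherwise the nulls would come first in the ascending sort.
users_average_between_donations = donations_with_lag.groupBy("user_id") \
    .agg(average_between_donation.alias("average_between_donation")) \
    .where(fn.col("average_between_donation").isNotNull()) \
    .join(users_from_file, 'user_id') \
    .orderBy("average_between_donation")

users_average_between_donations.show(5)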

Congratulations, you have finished this notebook!