### Import Spark Session

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F 

### Start a simple Spark Session

In [2]:
# Build (or reuse) the notebook's SparkSession: local master, Hive support
# enabled, application name 'LibraryManagement'.
spark = (
    SparkSession.builder
    .master('local')
    .enableHiveSupport()
    .appName('LibraryManagement')
    .getOrCreate()
)

### Créer les données
* Author

In [3]:
# Author rows as (aid, name) tuples.
lst1 = [
    ('07890', 'Jean Paul Sartre'),
    ('05678', 'Pierre de Ronsard'),
]

# parallelize() turns the local list into an RDD; toDF() names its columns.
rdd1 = spark.sparkContext.parallelize(lst1)
author = rdd1.toDF(['aid', 'name'])

author.show()

+-----+-----------------+
|  aid|             name|
+-----+-----------------+
|07890| Jean Paul Sartre|
|05678|Pierre de Ronsard|
+-----+-----------------+



* Book

In [4]:
# Book rows as (bid, title, category) tuples.
# NOTE(review): the category is spelled 'Poeme' for 0003 and 'Poème' for 0004,
# so grouping or filtering on category would treat them as two values — confirm intent.
lst2 = [
    ('0001', "L'existentialisme est un humanisme", 'Philosophie'),
    ('0002', 'Huis clos. Suivi de Les Mouches', 'Philosophie'),
    ('0003', 'Mignonne allons voir si la rose', 'Poeme'),
    ('0004', 'Les Amours', 'Poème'),
]

rdd2 = spark.sparkContext.parallelize(lst2)
book = rdd2.toDF(['bid', 'title', 'category'])

book.show()

+----+--------------------+-----------+
| bid|               title|   category|
+----+--------------------+-----------+
|0001|L'existentialisme...|Philosophie|
|0002|Huis clos. Suivi ...|Philosophie|
|0003|Mignonne allons v...|      Poeme|
|0004|          Les Amours|      Poème|
+----+--------------------+-----------+



* Student

In [5]:
# Student rows as (sid, sname, dept) tuples.
lst3 = [
    ('S15', 'toto', 'Math'),
    ('S16', 'popo', 'Eco'),
    ('S17', 'fofo', 'Mécanique'),
]

rdd3 = spark.sparkContext.parallelize(lst3)
student = rdd3.toDF(['sid', 'sname', 'dept'])

student.show()

+---+-----+---------+
|sid|sname|     dept|
+---+-----+---------+
|S15| toto|     Math|
|S16| popo|      Eco|
|S17| fofo|Mécanique|
+---+-----+---------+



* Write

In [6]:
# Authorship rows as (aid, bid) tuples linking an author to a book.
# NOTE(review): the pair ('05678', '0003') is listed twice, and no row references
# book '0004' — possibly a typo for ('05678', '0004'); confirm against the intended data.
lst4 = [
    ('07890', '0001'),
    ('07890', '0002'),
    ('05678', '0003'),
    ('05678', '0003'),
]

rdd4 = spark.sparkContext.parallelize(lst4)
write = rdd4.toDF(['aid', 'bid'])

write.show()

+-----+----+
|  aid| bid|
+-----+----+
|07890|0001|
|07890|0002|
|05678|0003|
|05678|0003|
+-----+----+



* Borrow

In [7]:
# Loan rows as (sid, bid, checkout-time, return-time) tuples.
# Dates are dd-MM-yyyy strings; a return-time of None marks a loan with no return date.
lst5 = [
    ('S15', '0003', '02-01-2020', '01-02-2020'),
    ('S15', '0002', '13-06-2020', None),
    ('S15', '0001', '13-06-2020', '13-10-2020'),
    ('S16', '0002', '24-01-2020', '24-01-2020'),
    ('S17', '0001', '12-04-2020', '01-07-2020'),
]

rdd5 = spark.sparkContext.parallelize(lst5)
borrow = rdd5.toDF(['sid', 'bid', 'checkout-time', 'return-time'])

borrow.show()

+---+----+-------------+-----------+
|sid| bid|checkout-time|return-time|
+---+----+-------------+-----------+
|S15|0003|   02-01-2020| 01-02-2020|
|S15|0002|   13-06-2020|       null|
|S15|0001|   13-06-2020| 13-10-2020|
|S16|0002|   24-01-2020| 24-01-2020|
|S17|0001|   12-04-2020| 01-07-2020|
+---+----+-------------+-----------+



### Créer des vues SQL temporaires à partir des DataFrames

In [8]:
# Register every DataFrame as a temporary view so the spark.sql() cells
# can refer to it by name.
for view_name, frame in [
    ("authorSQL", author),
    ("bookSQL", book),
    ("studentSQL", student),
    ("writeSQL", write),
    ("borrowSQL", borrow),
]:
    frame.createOrReplaceTempView(view_name)

### Les titres de tous les livres que l'étudiant sid='S15' a empruntés
* DSL

In [9]:
# Titles borrowed by student S15: book -> borrow (on bid) -> student (on sid),
# then keep only the rows belonging to S15.
s15_titles = (
    book
    .join(borrow, ['bid'])
    .join(student, ['sid'])
    .where(F.col('sid') == 'S15')
    .select('sid', 'title')
)
s15_titles.show()

+---+--------------------+
|sid|               title|
+---+--------------------+
|S15|Huis clos. Suivi ...|
|S15|Mignonne allons v...|
|S15|L'existentialisme...|
+---+--------------------+



* SQL

In [10]:
# SQL version: inner-join books, loans and students, restricted to student S15.
s15_titles_query = """
    select C.sid, title
    from bookSQL as A
    join borrowSQL as B
    on A.bid = B.bid
    join studentSQL as C
    on B.sid = C.sid
    where B.sid == "S15"
"""
spark.sql(s15_titles_query).show()

+---+--------------------+
|sid|               title|
+---+--------------------+
|S15|Huis clos. Suivi ...|
|S15|Mignonne allons v...|
|S15|L'existentialisme...|
+---+--------------------+



###  Les titres de tous les livres qui n'ont jamais été empruntés par un étudiant

* DSL

In [11]:
# Books with no loan record.
# A left join keeps every book; a book without any matching borrow row gets a
# null `sid`, which is what the filter keeps. The student table is not needed
# to answer the question, and full outer joins are unnecessary here.
(
    book
    .join(borrow, ['bid'], how='left')
    .filter(F.col('sid').isNull())
    .select('sid', 'title')
    .show()
)

+----+----------+
| sid|     title|
+----+----------+
|null|Les Amours|
+----+----------+



* SQL

In [12]:
# Books with no loan record (SQL version).
# A left join from bookSQL keeps every book and nothing else. With the previous
# full joins through studentSQL, a student who never borrowed anything would
# also satisfy `B.sid is NULL` and show up as a row with a NULL title.
spark.sql("""
    select B.sid, A.title
    from bookSQL as A
    left join borrowSQL as B
    on A.bid = B.bid
    where B.sid is NULL
""").show()

+----+----------+
| sid|     title|
+----+----------+
|null|Les Amours|
+----+----------+



### Tous les étudiants qui ont emprunté le livre bid=’0002’
* DSL

In [13]:
# Students who borrowed book '0002': join the three tables and keep the
# rows for that book id.
borrowers_of_0002 = (
    book
    .join(borrow, ['bid'])
    .join(student, ['sid'])
    .where(F.col('bid') == '0002')
    .select('bid', 'sname')
)
borrowers_of_0002.show()

+----+-----+
| bid|sname|
+----+-----+
|0002| popo|
|0002| toto|
+----+-----+



* SQL

In [14]:
# SQL version: names of the students with a loan of book '0002'.
borrowers_of_0002_query = """
    select B.bid, sname
    from bookSQL as A
    join borrowSQL as B
    on A.bid = B.bid
    join studentSQL as C
    on B.sid = C.sid
    where B.bid == "0002"
"""
spark.sql(borrowers_of_0002_query).show()

+----+-----+
| bid|sname|
+----+-----+
|0002| popo|
|0002| toto|
+----+-----+



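### Les titres de tous les livres empruntés par des étudiants en informatique (département informatique) — le jeu de données ne contenant aucun département informatique, l'exemple filtre sur le département « Mécanique »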
* DSL

In [15]:
# Titles borrowed by students of one department.
# NOTE(review): the section heading asks for the computer-science department,
# but the sample student data only has Math, Eco and Mécanique, and this
# filter uses 'Mécanique' — confirm this substitution is intended.
titles_by_dept = (
    book
    .join(borrow, ['bid'])
    .join(student, ['sid'])
    .where(F.col('dept') == 'Mécanique')
    .select('dept', 'title')
)
titles_by_dept.show()

+---------+--------------------+
|     dept|               title|
+---------+--------------------+
|Mécanique|L'existentialisme...|
+---------+--------------------+



* SQL

In [16]:
# SQL version: titles borrowed by students whose department is 'Mécanique'.
# NOTE(review): the section heading mentions the computer-science department,
# which does not exist in the sample data — confirm the substitution.
titles_by_dept_query = """
    select C.dept, title
    from bookSQL as A
    join borrowSQL as B
    on A.bid = B.bid
    join studentSQL as C
    on B.sid = C.sid
    where C.dept == "Mécanique"
"""
spark.sql(titles_by_dept_query).show()

+---------+--------------------+
|     dept|               title|
+---------+--------------------+
|Mécanique|L'existentialisme...|
+---------+--------------------+



### Les étudiants qui n’ont jamais emprunté de livre
* DSL

In [17]:
# Students who never borrowed a book.
# A left-anti join keeps exactly the student rows that have no matching `sid`
# in borrow. The book table plays no role in this question, so the previous
# full outer joins through it are dropped.
(
    student
    .join(borrow, ['sid'], how='left_anti')
    .select('sid', 'sname')
    .show()
)

+---+-----+
|sid|sname|
+---+-----+
+---+-----+



* SQL

In [18]:
# Students who never borrowed a book (SQL version).
# Start from studentSQL and left-join the loans: a student without any loan has
# no matching borrow row, so B.sid is NULL for that row.
# The id is read from the student side (C.sid); the borrow-side id (B.sid) is
# NULL for precisely the students this query is looking for.
spark.sql("""
    select C.sid, C.sname
    from studentSQL as C
    left join borrowSQL as B
    on C.sid = B.sid
    where B.sid is NULL
""").show()

+---+-----+
|sid|sname|
+---+-----+
+---+-----+



### L’auteur qui a écrit le plus de livres
* DSL

In [19]:
# Number of write rows per author name.
# NOTE: count('aid') counts one per row of `write`, so an (aid, bid) pair that
# appears twice in `write` is counted twice.
nb_livre = (
    write
    .join(author, ['aid'])
    .groupby('name')
    .agg(F.count('aid').alias('nb_livre'))
)

# Compute the true maximum with an aggregation. Reading the first collected
# row (collect()[0]) only returns the maximum by coincidence, since the grouped
# rows come back in no guaranteed order, and it raises IndexError on empty input.
maximum = nb_livre.agg(F.max('nb_livre')).first()[0]

# Keep every author tied for the highest count.
nb_livre.filter(F.col('nb_livre') == maximum).show()

+-----------------+--------+
|             name|nb_livre|
+-----------------+--------+
| Jean Paul Sartre|       2|
|Pierre de Ronsard|       2|
+-----------------+--------+



* SQL

In [20]:
# Per-author row count, exposed under the name livresSQL.
# A temporary view is used instead of `create table`: a table persists in the
# metastore, so running this cell a second time fails because the table
# already exists. `create or replace temporary view` can be re-run safely and
# matches the temp views registered for the other DataFrames.
spark.sql("""
    create or replace temporary view livresSQL as
    select name, count(A.aid) as nb_livre
    from authorSQL as A
    join writeSQL as B
    on A.aid = B.aid
    group by name
""")

# Keep every author whose count equals the maximum count.
spark.sql("""
    select *
    from livresSQL
    where nb_livre = (select max(nb_livre)
                      from livresSQL)
""").show()

+-----------------+--------+
|             name|nb_livre|
+-----------------+--------+
|Pierre de Ronsard|       2|
| Jean Paul Sartre|       2|
+-----------------+--------+



###  Les personnes qui n’ont pas encore rendu les livres
* DSL

In [21]:
# Students holding at least one loan that has no return date.
# An inner join is required: with a full outer join, a student who has no loan
# at all gets a null `return-time` from the join itself and would be reported
# as not having returned a book.
(
    borrow
    .join(student, ['sid'])
    .filter(F.col('return-time').isNull())
    .select('sid', 'sname')
    .distinct()
    .show()
)

+---+-----+
|sid|sname|
+---+-----+
|S15| toto|
+---+-----+



* SQL

In [22]:
# SQL version: distinct students having a loan whose return-time is NULL.
# The column name contains a hyphen, hence the backticks.
unreturned_query = """
    select distinct B.sid, sname
    from borrowSQL as A
    join studentSQL as B
    on A.sid = B.sid
    where A.`return-time` is NULL
"""
spark.sql(unreturned_query).show()

+---+-----+
|sid|sname|
+---+-----+
|S15| toto|
+---+-----+



### Création d'une nouvelle colonne dans la table borrow qui prend la valeur 1 si la durée d'emprunt est supérieure ou égale à 3 mois, sinon 0

In [23]:
# Add the flag column "plus que 3 mois" to borrow: 1 when the loan lasted at
# least 3 months, 0 otherwise.
# Both date columns are dd-MM-yyyy strings and are parsed on the fly, so no
# helper column is added to the DataFrame.
date_fmt = "dd-MM-yyyy"
loan_start = F.to_date(F.col('checkout-time'), date_fmt)
loan_end = F.to_date(F.col('return-time'), date_fmt)

# A loan without a return date produces a null comparison, which falls through
# to otherwise(0).
borrow = borrow.withColumn(
    "plus que 3 mois",
    F.when(F.months_between(loan_end, loan_start) >= 3, 1).otherwise(0),
)

In [24]:
# Write borrow as CSV with a header line into the 'contention' path,
# overwriting any previous output there.
csv_writer = (
    borrow
    .write
    .mode('overwrite')
    .option("header", "true")
)
csv_writer.csv('contention')

* SQL

In [25]:
# SQL version of the flag column: 1 when months_between(return, checkout) >= 3,
# otherwise 0 (a NULL return-time makes the condition NULL, hence 0).
loan_flag_query = """
    select *, 
    case
        when (months_between(to_date(`return-time`, 'dd-MM-yyyy'),
                   to_date(`checkout-time`, 'dd-MM-yyyy')) >= 3) then 1
        else 0
    end as `plus que 3 mois`
    from borrowSQL
"""
spark.sql(loan_flag_query).show()

+---+----+-------------+-----------+---------------+
|sid| bid|checkout-time|return-time|plus que 3 mois|
+---+----+-------------+-----------+---------------+
|S15|0003|   02-01-2020| 01-02-2020|              0|
|S15|0002|   13-06-2020|       null|              0|
|S15|0001|   13-06-2020| 13-10-2020|              1|
|S16|0002|   24-01-2020| 24-01-2020|              0|
|S17|0001|   12-04-2020| 01-07-2020|              0|
+---+----+-------------+-----------+---------------+



### Les livres qui n’ont jamais été empruntés
* DSL

In [26]:
# Books that were never borrowed.
# A left-anti join keeps exactly the book rows whose `bid` has no match in
# borrow. This avoids the full outer join and the dependence on a null `sid`.
(
    book
    .join(borrow, ['bid'], how='left_anti')
    .select('title')
    .show()
)

+----------+
|     title|
+----------+
|Les Amours|
+----------+



* SQL

In [27]:
# Books that were never borrowed (SQL version).
# A left join from bookSQL keeps every book; a book without any loan has no
# matching borrow row, so B.sid is NULL. A full join would additionally keep
# borrow rows that match no book, which is not wanted here.
spark.sql("""
    select A.title
    from bookSQL as A
    left join borrowSQL as B
    on A.bid = B.bid
    where B.sid is NULL
""").show()

+----------+
|     title|
+----------+
|Les Amours|
+----------+

