### Importation des Packages

In [2]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
!tar xf spark-2.3.1-bin-hadoop2.7.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.1-bin-hadoop2.7"

!ls

import findspark
findspark.init()

import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate() 
spark

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
0% [Waiting for headers] [1 InRelease 2,586 B/88.7 kB 3%] [Connecting to cloud.                                                                               Get:2 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease [15.9 kB]
0% [Waiting for headers] [1 InRelease 14.2 kB/88.7 kB 16%] [Connecting to cloud                                                                               Hit:3 http://archive.ubuntu.com/ubuntu bionic InRelease
0% [1 InRelease 88.7 kB/88.7 kB 100%] [Connecting to cloud.r-project.org (65.9.                                                                               Get:4 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
                                                                               Hit:5 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
0% [4 InRelease 56.2 kB/88.7 kB 63%] [Connected 

In [3]:
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

### Création de la base de données

In [17]:
t1 = [('07890', 'Jean Paul Sartre'), ('05678', 'Pierre de Ronsard')]
rdd1 = spark.sparkContext.parallelize(t1)
Auteur = rdd1.toDF(['aid', 'name'])
Auteur.show()

+-----+-----------------+
|  aid|             name|
+-----+-----------------+
|07890| Jean Paul Sartre|
|05678|Pierre de Ronsard|
+-----+-----------------+



In [18]:
t2 = [('0001', "L'existentialisme est un humanisme", 'Philosophie'), \
      ('0002', 'Huis clos. Suivi de Les Mouches', 'Philosophie'), \
     ('0003', 'Mignonne allons voir si la rose', 'Poeme'), \
     ('0004', 'Les Amours', 'Poème')]
rdd2 = spark.sparkContext.parallelize(t2)
livre = rdd2.toDF(['bid', 'title', 'category'])
livre.show()

+----+--------------------+-----------+
| bid|               title|   category|
+----+--------------------+-----------+
|0001|L'existentialisme...|Philosophie|
|0002|Huis clos. Suivi ...|Philosophie|
|0003|Mignonne allons v...|      Poeme|
|0004|          Les Amours|      Poème|
+----+--------------------+-----------+



In [19]:
t3 = [('S15', 'toto', 'Math'), \
      ('S16', 'popo', 'Eco'), \
     ('S17', 'fofo', 'Mécanique')]
rdd3 = spark.sparkContext.parallelize(t3)
Etudiant = rdd3.toDF(['sid', 'sname', 'dept'])
Etudiant.show()

+---+-----+---------+
|sid|sname|     dept|
+---+-----+---------+
|S15| toto|     Math|
|S16| popo|      Eco|
|S17| fofo|Mécanique|
+---+-----+---------+



In [20]:
t4 = [('07890', '0001'), \
      ('07890', '0002'), \
     ('05678', '0003'), \
     ('05678', '0003')]
rdd4 = spark.sparkContext.parallelize(t4)
write = rdd4.toDF(['aid', 'bid'])
write.show()

+-----+----+
|  aid| bid|
+-----+----+
|07890|0001|
|07890|0002|
|05678|0003|
|05678|0003|
+-----+----+



In [21]:
t5 = [('S15', '0003', '02-01-2020', '01-02-2020'), \
      ('S15', '0002', '13-06-2020', 'null'), \
     ('S15', '0001', '13-06-2020', '13-10-2020'), \
     ('S16', '0002', '24-01-2020', '24-01-2020'), \
     ('S17', '0001', '12-04-2020', '01-07-2020')]
rdd5 = spark.sparkContext.parallelize(t5)
Emprunt = rdd5.toDF(['sid', 'bid', 'checkout_time', 'return_time'])
Emprunt.show()

+---+----+-------------+-----------+
|sid| bid|checkout_time|return_time|
+---+----+-------------+-----------+
|S15|0003|   02-01-2020| 01-02-2020|
|S15|0002|   13-06-2020|       null|
|S15|0001|   13-06-2020| 13-10-2020|
|S16|0002|   24-01-2020| 24-01-2020|
|S17|0001|   12-04-2020| 01-07-2020|
+---+----+-------------+-----------+



###Création des Tables SQL

In [81]:
Auteur.createOrReplaceTempView('Auteur_SQL')
livre.createOrReplaceTempView('Livre_SQL')
Etudiant.createOrReplaceTempView('Etudiant_SQL')
write.createOrReplaceTempView('Write_SQL')
Emprunt.createOrReplaceTempView('Emprunt_SQL')

### Q1 : Trouver les titres de tous les livres que l'étudiant sid='S15' a emprunté

In [40]:
# SQL
spark.sql("""select title
            from Livre_SQL 
            join Emprunt_SQL on Livre_SQL.bid = Emprunt_SQL.bid
            where Emprunt_SQL.sid == 'S15' """).show()

+--------------------+
|               title|
+--------------------+
|Huis clos. Suivi ...|
|Mignonne allons v...|
|L'existentialisme...|
+--------------------+



In [34]:
# DSL
livre.join(Emprunt, 'bid')\
     .select('title')\
     .filter(F.col('sid')=='S15')\
     .show()

+--------------------+
|               title|
+--------------------+
|Huis clos. Suivi ...|
|Mignonne allons v...|
|L'existentialisme...|
+--------------------+



### Q2 : Trouver les titres de tous les livres qui n'ont jamais été empruntés par un étudiant

In [47]:
# SQL
spark.sql("""select title 
            from Livre_SQL
            left join Emprunt_SQL on Livre_SQL.bid = Emprunt_SQL.bid
            where Emprunt_SQL.bid is NULL""").show()

+----------+
|     title|
+----------+
|Les Amours|
+----------+



In [36]:
# DSL
livre.join(Emprunt, livre.bid==Emprunt.bid, how='left')\
    .select('title')\
    .filter(F.col('sid').isNull())\
    .show()

+----------+
|     title|
+----------+
|Les Amours|
+----------+



### Q3 : Trouver tous les étudiants qui ont emprunté le livre bid=’0002’

In [39]:
# SQL
spark.sql("""select sname
            from Etudiant_SQL
            join Emprunt_SQL on Etudiant_SQL.sid = Emprunt_SQL.sid
            where Emprunt_SQL.bid == '0002' """).show()

+-----+
|sname|
+-----+
| popo|
| toto|
+-----+



In [41]:
# DSL
Etudiant.join(Emprunt, 'sid')\
    .select('sname')\
    .filter(F.col('bid')=='0002')\
    .show()

+-----+
|sname|
+-----+
| popo|
| toto|
+-----+



### Q4 : Trouver les titres de tous les livres empruntés par des étudiants en Mécanique

In [43]:
# SQL
spark.sql("""select title
            from Livre_SQL
            join Emprunt_SQL on Livre_SQL.bid = Emprunt_SQL.bid
            join Etudiant_SQL on Emprunt_SQL.sid = Etudiant_SQL.sid
            where Etudiant_SQL.dept == 'Mécanique' """).show()

+--------------------+
|               title|
+--------------------+
|L'existentialisme...|
+--------------------+



In [44]:
# DSL
livre.join(Emprunt, 'bid')\
    .join(Etudiant, 'sid')\
    .select('title')\
    .filter(F.col('dept')=='Mécanique')\
    .show()

+--------------------+
|               title|
+--------------------+
|L'existentialisme...|
+--------------------+



### Q5 : Trouver les étudiants qui n’ont jamais emprunté de livre

In [60]:
# SQL
spark.sql("""select sname
            from Etudiant_SQL
            left join Emprunt_SQL on Etudiant_SQL.sid = Emprunt_SQL.sid
            where Emprunt_SQL.sid is null""").show()

+-----+
|sname|
+-----+
+-----+



In [61]:
# DSL
Etudiant.join(Emprunt, Etudiant.sid==Emprunt.sid, how='left')\
    .select('sname')\
    .filter(F.col('sname').isNull())\
    .show()

+-----+
|sname|
+-----+
+-----+



### Q6 : Déterminer l’auteur qui a écrit le plus de livres

In [54]:
# SQL 
spark.sql("""select first(name) as auteur, count(distinct bid) as nbr
            from Auteur_SQL
            join Write_SQL on Auteur_SQL.aid = Write_SQL.aid
            group by name""").show()

+-----------------+---+
|           auteur|nbr|
+-----------------+---+
| Jean Paul Sartre|  2|
|Pierre de Ronsard|  1|
+-----------------+---+



In [53]:
# DSL
Auteur.join(write, "aid") \
    .distinct() \
    .groupBy("name") \
    .agg(F.count("bid").alias("nbr")) \
    .sort(F.col("nbr").desc()) \
    .select(F.first("name").alias("auteur"),F.first("nbr").alias("nbr")) \
    .show()

+----------------+---+
|          auteur|nbr|
+----------------+---+
|Jean Paul Sartre|  2|
+----------------+---+



### Q7: Déterminer les personnes qui n’ont pas encore rendu les livres

In [59]:
# SQL
spark.sql("""select sname
            from Etudiant_SQL
            join Emprunt_SQL on Etudiant_SQL.sid = Emprunt_SQL.sid
            where Emprunt_SQL.return_time == 'null' """).show()

+-----+
|sname|
+-----+
| toto|
+-----+



In [57]:
# DSL
Etudiant.join(Emprunt, 'sid')\
    .select('sname')\
    .filter(F.col('return_time')=='null')\
    .show()

+-----+
|sname|
+-----+
| toto|
+-----+



### Q8 : Créer une nouvelle colonne dans la table Emprunt qui prend la valeur 1, si la durée d'emprunt est supérieur à 3 mois,  sinon 0.

### Q9 : Déterminer les livres qui n’ont jamais été empruntés

In [70]:
NewEmprunt=Emprunt.withColumn("check_to_date", F.to_date(F.col("checkout_time"), "dd-MM-yyyy"))\
    .withColumn("ret_to_date", F.to_date(F.col("return_time"), "dd-MM-yyyy"))\
    .withColumn("Duree", F.datediff(F.col("ret_to_date"), F.col("check_to_date")))\
    .withColumn("plusde3mois", (F.when(F.col("Duree")>=90, 1).otherwise(0)))\
    .select("sid", "bid", "checkout_time", "return_time", "plusde3mois")

In [76]:
NewEmprunt.toPandas().to_csv("../NewEmprunt.csv", header=True) #On exporte en CSV 

In [79]:
# SQL
spark.sql("""select t1.bid, t1.title
            from Livre_SQL as t1
            left join Emprunt_SQL as t2 on t1.bid = t2.bid
            where t2.sid is null""").show()

+----+----------+
| bid|     title|
+----+----------+
|0004|Les Amours|
+----+----------+



In [80]:
# DSL
livre.join(Emprunt, livre.bid==Emprunt.bid, how='left')\
    .select('title')\
    .filter(F.col('sid').isNull())\
    .show()

+----------+
|     title|
+----------+
|Les Amours|
+----------+

