## SQL à l'échelle avec Spark SQL et DataFrames

Spark SQL apporte à Spark un support natif de SQL et rationalise le processus d'interrogation des données stockées à la fois dans les RDD (les ensembles de données distribuées de Spark) et dans des sources externes. Spark SQL brouille de manière pratique les lignes entre les RDD et les tables relationnelles. L'unification de ces puissantes abstractions permet aux développeurs de combiner facilement les commandes SQL d'interrogation de données externes avec des analyses complexes, le tout dans une seule application. Concrètement, Spark SQL permettra aux développeurs de

- d'importer des données relationnelles à partir de fichiers de parquets et de tables de stockage
- Exécutez des requêtes SQL sur les données importées et les RDD existants
- Rédiger facilement des RDD sur des tables de ruche ou des fichiers de parquet

In [2]:
import pandas as pd
import matplotlib.pyplot as plt
plt.style.use('fivethirtyeight')

## Récupération des données

Nous utiliserons les données de la [KDD Cup 1999] (http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html), qui est l'ensemble de données utilisé pour le troisième concours international d'outils de découverte de connaissances et d'exploration de données, qui a eu lieu en conjonction avec la KDD-99, la cinquième conférence internationale sur la découverte de connaissances et l'exploration de données. La tâche du concours consistait à construire un détecteur d'intrusion réseau, un modèle prédictif capable de distinguer les "mauvaises" connexions, appelées intrusions ou attaques, des "bonnes" connexions normales. Cette base de données contient un ensemble standard de données à vérifier, qui comprend une grande variété d'intrusions simulées dans un environnement de réseau militaire. 

Nous utiliserons l'ensemble de données réduit `kddcup.data_10_percent.gz` contenant près d'un demi-million d'interactions nework puisque nous téléchargerons ce fichier Gzip du web localement et travaillerons ensuite sur celui-ci. Si vous disposez d'une bonne et stable connexion Internet, n'hésitez pas à télécharger et à travailler avec l'ensemble complet de données disponible sous le nom de `kddcup.data.gz`.

#### Travailler avec les données du web

Le traitement des ensembles de données récupérés sur le web peut être un peu délicat dans les bases de données. Heureusement, nous disposons d'excellents ensembles d'utilitaires comme les "dbutils" qui nous facilitent la tâche. Examinons rapidement quelques fonctions essentielles de ce module.

In [5]:
dbutils.help()

#### Récupération et stockage des données dans les bases de données

Nous allons maintenant exploiter la bibliothèque python `urllib` pour extraire les données de la Coupe KDD 99 de leur dépôt sur le web, les stocker dans un emplacement temporaire et les déplacer ensuite vers le système de fichiers Databricks qui peut permettre un accès facile à ces données pour analyse

Si vous sautez cette étape et téléchargez les données directement, vous risquez d'obtenir une `InvalidInputException' : Erreur "Input path does not exist".

In [7]:
import urllib.request
data = urllib.request.urlretrieve("http://kdd.ics.uci.edu/databases/kddcup99/kddcup.data_10_percent.gz", "/tmp/kddcup_data.gz")
dbutils.fs.mv("file:/tmp/kddcup_data.gz", "dbfs:/kdd/kddcup_data.gz")
display(dbutils.fs.ls("dbfs:/kdd"))

path,name,size
dbfs:/kdd/kddcup_data.gz,kddcup_data.gz,2144903


## Constitution de l'ensemble de données KDD

Maintenant que nos données sont stockées dans le système de fichiers Databricks. Chargeons nos données du disque dans la structure traditionnelle de données abstraites de Spark, le [Resilient Distributed Dataset (RDD)] (https://spark.apache.org/docs/latest/rdd-programming-guide.html#resilient-distributed-datasets-rdds)

In [9]:
data_file = "dbfs:/kdd/kddcup_data.gz"
raw_rdd = sc.textFile(data_file).cache()
raw_rdd.take(5)

In [10]:
type(raw_rdd)

## Construire un DataFrame étincelant sur nos données

Une Spark DataFrame est une structure de données intéressante qui représente une collection de données distribuées. Une DataFrame est un ensemble de données organisé en colonnes nommées. Elle est conceptuellement équivalente à une table dans une base de données relationnelle ou à une dataframe en R/Python, mais avec des optimisations plus riches sous le capot. Les DataFrames peuvent être construites à partir d'un large éventail de sources telles que : des fichiers de données structurées, des tables dans une ruche, des bases de données externes, ou des RDD existants dans notre cas.

En général, le point d'entrée de toutes les fonctionnalités SQL dans Spark est la classe "SQLContext". Pour créer une instance de base de cet appel, tout ce dont nous avons besoin est une référence `SparkContext`. Dans Databricks, cet objet contextuel global est disponible sous la forme "sc" à cette fin.

In [12]:
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
sqlContext 

#### Diviser les données CSV
Chaque entrée de notre RDD est une ligne de données séparées par des virgules que nous devons d'abord séparer avant de pouvoir analyser et construire notre cadre de données

In [14]:
csv_rdd = raw_rdd.map(lambda row: row.split(","))
print(csv_rdd.take(2))
print(type(csv_rdd))

#### Vérifiez le nombre total de caractéristiques (colonnes)
Nous pouvons utiliser le code suivant pour vérifier le nombre total de colonnes potentielles dans notre ensemble de données

In [16]:
len(csv_rdd.take(1)[0])

#### Data Understanding and Parsing

The KDD 99 Cup data consists of different attributes captured from connection data. The full list of attributes in the data can be obtained [__here__](http://kdd.ics.uci.edu/databases/kddcup99/kddcup.names) and further details pertaining to the description for each attribute\column can be found [__here__](http://kdd.ics.uci.edu/databases/kddcup99/task.html). We will just be using some specific columns from the dataset, the details of which are specified below.


| feature num | feature name       | description                                                  | type       |
|-------------|--------------------|--------------------------------------------------------------|------------|
| 1           | duration           | length (number of seconds) of the connection                 | continuous |
| 2           | protocol_type      | type of the protocol, e.g. tcp, udp, etc.                    | discrete   |
| 3           | service            | network service on the destination, e.g., http, telnet, etc. | discrete   |
| 4           | src_bytes          | number of data bytes from source to destination              | continuous |
| 5           | dst_bytes          | number of data bytes from destination to source              | continuous |
| 6           | flag               | normal or error status of the connection                     | discrete   |
| 7           | wrong_fragment     | number of ``wrong'' fragments                                | continuous |
| 8           | urgent             | number of urgent packets                                     | continuous |
| 9           | hot                | number of ``hot'' indicators                                 | continuous |
| 10          | num_failed_logins  | number of failed login attempts                              | continuous |
| 11          | num_compromised    | number of ``compromised'' conditions                         | continuous |
| 12          | su_attempted       | 1 if ``su root'' command attempted; 0 otherwise              | discrete   |
| 13          | num_root           | number of ``root'' accesses                                  | continuous |
| 14          | num_file_creations | number of file creation operations                           | continuous |

Nous allons extraire les colonnes suivantes en fonction de leur position dans chaque point de données (ligne) et construire un nouveau RDD comme suit

In [18]:
from pyspark.sql import Row

parsed_rdd = csv_rdd.map(lambda r: Row(
    duration=int(r[0]), 
    protocol_type=r[1],
    service=r[2],
    flag=r[3],
    src_bytes=int(r[4]),
    dst_bytes=int(r[5]),
    wrong_fragment=int(r[7]),
    urgent=int(r[8]),
    hot=int(r[9]),
    num_failed_logins=int(r[10]),
    num_compromised=int(r[12]),
    su_attempted=r[14],
    num_root=int(r[15]),
    num_file_creations=int(r[16]),
    label=r[-1]
    )
)
parsed_rdd.take(5)

#### Construction du DataFrame
Maintenant que nos données sont bien analysées et formatées, construisons notre DataFrame !

In [20]:
df = sqlContext.createDataFrame(parsed_rdd)
display(df.head(10))

dst_bytes,duration,flag,hot,label,num_compromised,num_failed_logins,num_file_creations,num_root,protocol_type,service,src_bytes,su_attempted,urgent,wrong_fragment
5450,0,SF,0,normal.,0,0,0,0,tcp,http,181,0,0,0
486,0,SF,0,normal.,0,0,0,0,tcp,http,239,0,0,0
1337,0,SF,0,normal.,0,0,0,0,tcp,http,235,0,0,0
1337,0,SF,0,normal.,0,0,0,0,tcp,http,219,0,0,0
2032,0,SF,0,normal.,0,0,0,0,tcp,http,217,0,0,0
2032,0,SF,0,normal.,0,0,0,0,tcp,http,217,0,0,0
1940,0,SF,0,normal.,0,0,0,0,tcp,http,212,0,0,0
4087,0,SF,0,normal.,0,0,0,0,tcp,http,159,0,0,0
151,0,SF,0,normal.,0,0,0,0,tcp,http,210,0,0,0
786,0,SF,1,normal.,0,0,0,0,tcp,http,212,0,0,0


Maintenant, nous pouvons facilement jeter un coup d'oeil au schéma de notre dataframe en utilisant la fonction `printSchema(...)`.

In [22]:
df.printSchema()

## Construire une table temporaire 

Nous pouvons utiliser la fonction `registerTempTable()` pour construire une table temporelle permettant d'exécuter des commandes SQL sur notre DataFrame à l'échelle ! Un point à retenir est que la durée de vie de cette table temporaire est liée à la session. Elle crée une table en mémoire qui est portée à l'échelle de la grappe dans laquelle elle a été créée. Les données sont stockées en utilisant le format en colonnes en mémoire hautement optimisé de Hive. 

Vous pouvez également consulter la fonction "saveAsTable()`" qui crée une table physique permanente stockée dans S3 en utilisant le format Parquet. Cette table est accessible à tous les clusters. Les métadonnées de la table, y compris l'emplacement du ou des fichiers, sont stockées dans la métastore de la ruche.

In [24]:
help(df.registerTempTable)

In [25]:
df.registerTempTable("connections")

## Exécution de SQL à l'échelle
Voyons quelques exemples de la manière dont nous pouvons exécuter des requêtes SQL sur notre table à partir de notre cadre de données. Nous commencerons par quelques requêtes simples, puis nous examinerons les agrégations, les filtres, le tri, les sous-requêtes et les pivots

### Connexions basées sur le type de protocole

Voyons comment nous pouvons obtenir le nombre total de connexions en fonction du type de protocole de connectivité. Tout d'abord, nous obtiendrons ces informations en utilisant la syntaxe DSL DataFrame normale pour effectuer des agrégations.

In [28]:
display(df.groupBy('protocol_type')
          .count()
          .orderBy('count', ascending=False))

protocol_type,count
icmp,283602
tcp,190065
udp,20354


Peut-on aussi utiliser SQL pour effectuer la même agrégation ? Oui, nous pouvons utiliser la table que nous avons construite plus tôt pour cela !

In [30]:
protocols = sqlContext.sql("""
                           SELECT protocol_type, count(*) as freq
                           FROM connections
                           GROUP BY protocol_type
                           ORDER BY 2 DESC
                           """)
display(protocols)

protocol_type,freq
icmp,283602
tcp,190065
udp,20354


Vous pouvez clairement voir que vous obtenez les mêmes résultats et que vous n'avez pas à vous préoccuper de votre infrastructure de fond ou de la façon dont le code est exécuté. Il suffit d'écrire du SQL simple !

### Connexions basées sur les bonnes ou mauvaises signatures (types d'attaques)

Nous allons maintenant effectuer une simple agrégation pour vérifier le nombre total de connexions en fonction des types de bonnes (normales) ou mauvaises (attaques par intrusion).

In [33]:
labels = sqlContext.sql("""
                           SELECT label, count(*) as freq
                           FROM connections
                           GROUP BY label
                           ORDER BY 2 DESC
                           """)
display(labels)

label,freq
smurf.,280790
neptune.,107201
normal.,97278
back.,2203
satan.,1589
ipsweep.,1247
portsweep.,1040
warezclient.,1020
teardrop.,979
pod.,264


We have a lot of different attack types. We can visualize this in the form of a bar chart. The simplest way is to use the excellent interface options in the Databricks notebook itself as depicted in the following figure!

![](https://cdn-images-1.medium.com/max/800/1*MWtgLR6H4siUB1Ugc8sqog.png)

This gives us the following nice looking bar chart! Which you can customize further by clicking on __`Plot Options`__ as needed.

In [35]:
labels = sqlContext.sql("""
                           SELECT label, count(*) as freq
                           FROM connections
                           GROUP BY label
                           ORDER BY 2 DESC
                           """)
display(labels)

label,freq
smurf.,280790
neptune.,107201
normal.,97278
back.,2203
satan.,1589
ipsweep.,1247
portsweep.,1040
warezclient.,1020
teardrop.,979
pod.,264


Un autre moyen est d'écrire le code vous-même pour le faire. Vous pouvez extraire les données agrégées sous forme de DataFrame "pandas" et les représenter ensuite sous forme d'un diagramme à barres régulier.

In [37]:
labels_df = pd.DataFrame(labels.toPandas())
labels_df.set_index("label", drop=True,inplace=True)
labels_fig = labels_df.plot(kind='barh')

plt.rcParams["figure.figsize"] = (7, 5)
plt.rcParams.update({'font.size': 10})
plt.tight_layout()
display(labels_fig.figure)

### Connexions basées sur des protocoles et des attaques

Examinons quels sont les protocoles les plus vulnérables aux attaques actuellement basées sur la requête SQL suivante.

In [39]:
attack_protocol = sqlContext.sql("""
                           SELECT 
                             protocol_type, 
                             CASE label
                               WHEN 'normal.' THEN 'no attack'
                               ELSE 'attack'
                             END AS state,
                             COUNT(*) as freq
                           FROM connections
                           GROUP BY protocol_type, state
                           ORDER BY 3 DESC
                           """)
display(attack_protocol)

protocol_type,state,freq
icmp,attack,282314
tcp,attack,113252
tcp,no attack,76813
udp,no attack,19177
icmp,no attack,1288
udp,attack,1177


Eh bien, il semble que les connexions ICMP suivies par les connexions TCP ont eu le maximum d'attaques !

### Statistiques de connexion basées sur les protocoles et les attaques

Examinons quelques mesures statistiques relatives à ces protocoles et aux attaques pour nos demandes de connexion.

In [42]:
attack_stats = sqlContext.sql("""
                           SELECT 
                             protocol_type, 
                             CASE label
                               WHEN 'normal.' THEN 'no attack'
                               ELSE 'attack'
                             END AS state,
                             COUNT(*) as total_freq,
                             ROUND(AVG(src_bytes), 2) as mean_src_bytes,
                             ROUND(AVG(dst_bytes), 2) as mean_dst_bytes,
                             ROUND(AVG(duration), 2) as mean_duration,
                             SUM(num_failed_logins) as total_failed_logins,
                             SUM(num_compromised) as total_compromised,
                             SUM(num_file_creations) as total_file_creations,
                             SUM(su_attempted) as total_root_attempts,
                             SUM(num_root) as total_root_acceses
                           FROM connections
                           GROUP BY protocol_type, state
                           ORDER BY 3 DESC
                           """)
display(attack_stats)

protocol_type,state,total_freq,mean_src_bytes,mean_dst_bytes,mean_duration,total_failed_logins,total_compromised,total_file_creations,total_root_attempts,total_root_acceses
icmp,attack,282314,932.14,0.0,0.0,0,0,0,0.0,0
tcp,attack,113252,9880.38,881.41,23.19,57,2269,76,1.0,152
tcp,no attack,76813,1439.31,4263.97,11.08,18,2776,459,17.0,5456
udp,no attack,19177,98.01,89.89,1054.63,0,0,0,0.0,0
icmp,no attack,1288,91.47,0.0,0.0,0,0,0,0.0,0
udp,attack,1177,27.5,0.23,0.0,0,0,0,0.0,0


Il semble que la quantité moyenne de données transmises dans les requêtes TCP soit beaucoup plus élevée, ce qui n'est pas surprenant. Il est intéressant de noter que les attaques ont une charge utile moyenne de données transmises de la source à la destination beaucoup plus élevée.

#### Filtrage des statistiques de connexion basé sur le protocole TCP par service et type d'attaque

Examinons de plus près les attaques TCP étant donné que nous disposons de données et de statistiques plus pertinentes pour celles-ci. Nous allons maintenant agréger les différents types d'attaques TCP en fonction du service, du type d'attaque et observer différentes mesures.

In [45]:
tcp_attack_stats = sqlContext.sql("""
                                   SELECT 
                                     service,
                                     label as attack_type,
                                     COUNT(*) as total_freq,
                                     ROUND(AVG(duration), 2) as mean_duration,
                                     SUM(num_failed_logins) as total_failed_logins,
                                     SUM(num_file_creations) as total_file_creations,
                                     SUM(su_attempted) as total_root_attempts,
                                     SUM(num_root) as total_root_acceses
                                   FROM connections
                                   WHERE protocol_type = 'tcp'
                                   AND label != 'normal.'
                                   GROUP BY service, attack_type
                                   ORDER BY total_freq DESC
                                   """)
display(tcp_attack_stats)

service,attack_type,total_freq,mean_duration,total_failed_logins,total_file_creations,total_root_attempts,total_root_acceses
private,neptune.,101317,0.0,0,0,0.0,0
http,back.,2203,0.13,0,0,0.0,0
other,satan.,1221,0.0,0,0,0.0,0
private,portsweep.,725,1915.81,0,0,0.0,0
ftp_data,warezclient.,708,403.71,0,0,0.0,0
ftp,warezclient.,307,1063.79,0,0,0.0,0
other,portsweep.,260,1058.22,0,0,0.0,0
telnet,neptune.,197,0.0,0,0,0.0,0
http,neptune.,192,0.0,0,0,0.0,0
finger,neptune.,177,0.0,0,0,0.0,0


Il existe de nombreux types d'attaques et la sortie précédente montre une section spécifique de celles-ci.

### Filtrage des statistiques de connexion basé sur le protocole TCP par service et type d'attaque

Nous allons maintenant filtrer certains de ces types d'attaques en imposant certaines contraintes basées sur la durée, les créations de fichiers, les accès à la racine dans notre requête.

In [48]:
tcp_attack_stats = sqlContext.sql("""
                                   SELECT 
                                     service,
                                     label as attack_type,
                                     COUNT(*) as total_freq,
                                     ROUND(AVG(duration), 2) as mean_duration,
                                     SUM(num_failed_logins) as total_failed_logins,
                                     SUM(num_file_creations) as total_file_creations,
                                     SUM(su_attempted) as total_root_attempts,
                                     SUM(num_root) as total_root_acceses
                                   FROM connections
                                   WHERE (protocol_type = 'tcp'
                                          AND label != 'normal.')
                                   GROUP BY service, attack_type
                                   HAVING (mean_duration >= 50
                                           AND total_file_creations >= 5
                                           AND total_root_acceses >= 1)
                                   ORDER BY total_freq DESC
                                   """)
display(tcp_attack_stats)

service,attack_type,total_freq,mean_duration,total_failed_logins,total_file_creations,total_root_attempts,total_root_acceses
telnet,buffer_overflow.,21,130.67,0,15,0.0,5
telnet,loadmodule.,5,63.8,0,9,0.0,3
telnet,multihop.,2,458.0,0,8,0.0,93


Intéressant de voir des attaques multihop capables d'obtenir des accès root aux hôtes de destination !

### Sous-requêtes pour filtrer les types d'attaques TCP en fonction du service

Essayons d'obtenir toutes les attaques TCP basées sur le service et le type d'attaque de telle sorte que la durée moyenne globale de ces attaques soit supérieure à zéro (`> 0`). Pour cela, nous pouvons faire une requête interne avec toutes les statistiques d'agrégation, puis extraire les requêtes pertinentes et appliquer un filtre de durée moyenne dans la requête externe comme indiqué ci-dessous.

In [51]:
tcp_attack_stats = sqlContext.sql("""
                                   SELECT 
                                     t.service,
                                     t.attack_type,
                                     t.total_freq
                                   FROM
                                   (SELECT 
                                     service,
                                     label as attack_type,
                                     COUNT(*) as total_freq,
                                     ROUND(AVG(duration), 2) as mean_duration,
                                     SUM(num_failed_logins) as total_failed_logins,
                                     SUM(num_file_creations) as total_file_creations,
                                     SUM(su_attempted) as total_root_attempts,
                                     SUM(num_root) as total_root_acceses
                                   FROM connections
                                   WHERE protocol_type = 'tcp'
                                   AND label != 'normal.'
                                   GROUP BY service, attack_type
                                   ORDER BY total_freq DESC) as t
                                     WHERE t.mean_duration > 0 
                                   """)
display(tcp_attack_stats)

service,attack_type,total_freq
http,back.,2203
private,portsweep.,725
ftp_data,warezclient.,708
ftp,warezclient.,307
other,portsweep.,260
private,satan.,170
telnet,guess_passwd.,53
telnet,buffer_overflow.,21
ftp_data,warezmaster.,18
imap4,imap.,12


C'est bien ! Maintenant, une façon intéressante de visualiser ces données est d'utiliser un tableau croisé dynamique où un attribut représente les lignes et un autre les colonnes. Voyons si nous pouvons utiliser Spark DataFrames pour y parvenir !

### Construction d'un tableau croisé dynamique à partir de données agrégées

Ici, nous nous baserons sur l'objet DataFrame précédent que nous avons obtenu, où nous avons regroupé les attaques en fonction du type et du service. Pour cela, nous pouvons exploiter la puissance de Spark DataFrames et du DSL DataFrame.

In [54]:
display((tcp_attack_stats.groupby('service')
                         .pivot('attack_type')
                         .agg({'total_freq':'max'})
                         .na.fill(0))
)

service,back.,buffer_overflow.,ftp_write.,guess_passwd.,imap.,ipsweep.,loadmodule.,multihop.,perl.,phf.,portsweep.,rootkit.,satan.,spy.,warezclient.,warezmaster.
telnet,0,21,0,53,0,1,5,2,3,0,0,5,1,2,0,0
ftp,0,1,2,0,0,1,1,2,0,0,0,1,1,0,307,2
pop_3,0,0,0,0,0,0,0,0,0,0,3,0,1,0,0,0
discard,0,0,0,0,0,0,0,0,0,0,0,0,1,0,0,0
login,0,0,2,0,0,0,0,0,0,0,0,0,0,0,0,0
smtp,0,0,0,0,0,0,0,0,0,0,0,0,2,0,0,0
domain,0,0,0,0,0,1,0,0,0,0,0,0,0,0,0,0
http,2203,0,0,0,0,3,0,0,0,4,3,0,0,0,0,0
courier,0,0,0,0,0,0,0,0,0,0,1,0,0,0,0,0
other,0,0,0,0,0,0,0,0,0,0,260,0,0,0,5,0


Nous obtenons un joli tableau croisé dynamique montrant tous les événements en fonction du type de service et d'attaque !