# Fonctions SparkSQL

SparkSQL fournit un ensemble de fonctions intégrées utilisables avec les dataframes. Ces fonctions ont deux avantages :
 * Les fonctions intégrées sont généralement optimisées par le moteur Catalyst pour une meilleure performance.
 * Les fonctions intégrées sont disponibles dans toutes les installations Spark, ce qui facilite le partage et la portabilité du code.

Références :
 * Scala API : https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/functions$.html
 * SQL : https://spark.apache.org/docs/latest/api/sql/index.html

## Préambule

In [1]:
import $ivy.`org.apache.spark::spark-core:3.3.2`
import $ivy.`org.apache.spark::spark-sql:3.3.2`
import $ivy.`org.slf4j:slf4j-reload4j:2.0.6`

import org.apache.logging.log4j.Level
import org.apache.logging.log4j.core.config.Configurator

// Avoid disturbing logs
Configurator.setRootLevel(Level.OFF)

[32mimport [39m[36m$ivy.$                                   
[39m
[32mimport [39m[36m$ivy.$                                  
[39m
[32mimport [39m[36m$ivy.$                               

[39m
[32mimport [39m[36morg.apache.logging.log4j.Level
[39m
[32mimport [39m[36morg.apache.logging.log4j.core.config.Configurator

// Avoid disturbing logs
[39m

In [2]:
import org.apache.spark.sql._
import org.apache.spark.sql.functions._
import org.apache.spark.rdd._

val spark = {
  NotebookSparkSession.builder()
    .master("local[*]")
    // L'appel ci-dessous sert à donner un nom à votre application
    // Ce apparaîtra notamment dans la Spark UI
    .appName("Sales Analysis - SparkSQL")
    .getOrCreate()
}

import spark.implicits._

// Ce script fournit que élément supplémentaires pour rendre l'affichage plus confortable
import $file.^.internal.spark_helper, spark_helper._

Loading spark-stubs
Getting spark JARs


SLF4J: No SLF4J providers were found.
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See https://www.slf4j.org/codes.html#noProviders for further details.


Creating SparkSession


[32mimport [39m[36morg.apache.spark.sql._
[39m
[32mimport [39m[36morg.apache.spark.sql.functions._
[39m
[32mimport [39m[36morg.apache.spark.rdd._

[39m
[36mspark[39m: [32mSparkSession[39m = org.apache.spark.sql.SparkSession@7ecde642
[32mimport [39m[36mspark.implicits._

// Ce script fournit que élément supplémentaires pour rendre l'affichage plus confortable
[39m
[32mimport [39m[36m$file.$                      , spark_helper._[39m

## Tweets

Pour cet atelier, nous allons utiliser un dataser contenant des tweets au format JSON.

In [3]:
val tweets =
  spark.read
    .json("data/tweets.json.gz")
    .cache()
    .where($"_corrupt_record".isNull)
    .drop("_corrupt_record")

tweets.showHTML(limit=10)

contributors,coordinates,created_at,current_user_retweet,display_text_range,entities,extended_entities,extended_tweet,favorite_count,favorited,filter_level,geo,id,id_str,in_reply_to_screen_name,in_reply_to_status_id,in_reply_to_status_id_str,in_reply_to_user_id,in_reply_to_user_id_str,is_quote_status,lang,metadata,place,possibly_sensitive,quoted_status,quoted_status_id,quoted_status_id_str,retweet_count,retweeted,retweeted_status,scopes,source,text,truncated,user,withheld_copyright,withheld_in_countries,withheld_scope
[],,1632390898000,,,"[null,ArraySeq([A...",,,0,False,,,1440977986258354178,1440977986258354178,,,,,,False,en,"[en,recent]",,False,,,,10,False,"[ArraySeq(),null,...",[],"<a href=""https://...",RT @bigdataconf: ...,False,"[false,false,fals...",False,[],
[],,1632390146000,,,"[null,ArraySeq(),...",,,2,False,,,1440974830694715394,1440974830694715394,0x627A,1.4409744305246577e+18,1.4409744305246577e+18,1.0260159117973912e+18,1.0260159117973912e+18,False,pl,"[pl,recent]",,False,,,,0,False,,[],"<a href=""https://...",@0x627A Ja akurat...,True,"[false,false,fals...",False,[],
[],,1632389583000,,,"[null,ArraySeq(),...",,,0,False,,,1440972470945816576,1440972470945816576,,,,,,False,fr,"[fr,recent]",,False,,,,0,False,,[],"<a href=""https://...",Comment choisir u...,False,"[false,false,fals...",False,[],
[],,1632389234000,,,"[null,ArraySeq(),...",,,0,False,,,1440971004378030081,1440971004378030081,,,,,,False,en,"[en,recent]",,False,,,,7,False,"[ArraySeq(),null,...",[],"<a href=""https://...",RT @aahiknsv: Ful...,False,"[false,false,fals...",False,[],
[],,1632388896000,,,"[null,ArraySeq(),...",,,0,False,,,1440969590578704389,1440969590578704389,,,,,,False,en,"[en,recent]",,False,,,,7,False,"[ArraySeq(),null,...",[],"<a href=""https://...",RT @aahiknsv: Ful...,False,"[false,false,fals...",False,[],
[],,1632388186000,,,"[null,ArraySeq([A...",,,0,False,,,1440966612379574280,1440966612379574280,,,,,,False,en,"[en,recent]",,False,,,,2,False,"[ArraySeq(),null,...",[],"<a href=""https://...",RT @urbancompany_...,False,"[false,false,fals...",False,[],
[],,1632387911000,,,"[null,ArraySeq(),...",,,0,False,,,1440965458086154246,1440965458086154246,,,,,,False,en,"[en,recent]",,False,,,,7,False,"[ArraySeq(),null,...",[],"<a href=""http://t...",RT @aahiknsv: Ful...,False,"[false,false,fals...",False,[],
[],,1632387775000,,,"[null,ArraySeq([A...",,,0,False,,,1440964886884798469,1440964886884798469,,,,,,False,en,"[en,recent]",,False,,,,1,False,"[ArraySeq(),null,...",[],"<a href=""http://t...",RT @ra9z: 🎙️ E4:...,False,"[false,false,fals...",False,[],
[],,1632387762000,,,"[null,ArraySeq([A...",,,0,False,,,1440964834250559488,1440964834250559488,,,,,,False,en,"[en,recent]",,False,,,,2,False,"[ArraySeq(),null,...",[],"<a href=""http://t...",RT @urbancompany_...,False,"[false,false,fals...",False,[],
[],,1632387602000,,,"[null,ArraySeq([A...",,,0,False,,,1440964159433105416,1440964159433105416,,,,,,False,en,"[en,recent]",,False,,,,0,False,,[],"<a href=""https://...",Would you recomme...,False,"[false,false,fals...",False,[],


[36mtweets[39m: [32mDataFrame[39m = [contributors: array<string>, coordinates: string ... 36 more fields]

Le [schéma des tweets](https://developer.twitter.com/en/docs/twitter-api/data-dictionary/object-model/tweet) est imposant et contient des sous-structures.

Note : sur l'affichage ci-dessous, n'hésitez pas à cliquer-droit sur l'affichage du schéma et à sélectionner "Enable Scrolling for Outputs".

In [69]:
tweets.printSchema()

root
 |-- contributors: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- coordinates: string (nullable = true)
 |-- created_at: long (nullable = true)
 |-- current_user_retweet: string (nullable = true)
 |-- display_text_range: string (nullable = true)
 |-- entities: struct (nullable = true)
 |    |-- description: string (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- media: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- display_url: string (nullable = true)
 |    |    |    |-- expanded_url: string (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- id_str: string (nullable = true)
 |    |    |    |-- indices: array (nullable = true)
 |   

## Les langues représentées

Dans chaque tweet, il y a un champ `lang`, qui indique la langue dans laquelle a été écrit le tweet. La valeur de champ est décrit par la norme BCP47 ([liste des langues](http://www.iana.org/assignments/language-subtag-registry/language-subtag-registry)).

Donner la liste des langues utilisées dans les tweets, en les classant de la plus utilisée à la moins utilisée.

In [72]:
val result =
  tweets
    .groupBy($"lang")
    // .count()
    .agg(count($"lang").as("count"))
    .orderBy($"count".desc)

result.showHTML()
result.explain()

lang,count
en,2801
ja,117
pt,50
es,37
it,29
ca,24
fr,23
fa,12
und,11
ru,10


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#5926L DESC NULLS LAST], true, 0
   +- Exchange rangepartitioning(count#5926L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [plan_id=2833]
      +- HashAggregate(keys=[lang#3676], functions=[count(lang#3676)])
         +- Exchange hashpartitioning(lang#3676, 200), ENSURE_REQUIREMENTS, [plan_id=2830]
            +- HashAggregate(keys=[lang#3676], functions=[partial_count(lang#3676)])
               +- Project [lang#3676]
                  +- Filter isnull(_corrupt_record#3655)
                     +- InMemoryTableScan [_corrupt_record#3655, lang#3676], [isnull(_corrupt_record#3655)]
                           +- InMemoryRelation [_corrupt_record#3655, contributors#3656, coordinates#3657, created_at#3658L, current_user_retweet#3659, display_text_range#3660, entities#3661, extended_entities#3662, extended_tweet#3663, favorite_count#3664L, favorited#3665, filter_level#3666, geo#3667, id#3668L, id_str#3669, in_reply_to_scre

[36mresult[39m: [32mDataset[39m[[32mRow[39m] = [lang: string, count: bigint]

**Ce qu'il faut voir**

La fonction `count` sous deux formes dans le plan d'exécution, dans l'ordre :
 * `HashAggregate(keys=[lang#29], functions=[partial_count(1)])`
 * `HashAggregate(keys=[lang#29], functions=[count(1)])`

Ce plan montre que Spark va organiser le décompte des lignes en deux phases. Dans une première phase, chaque exécuteur va compter de son côté le nombre de lignes par langue (`partial_count`). Puis, les décomptes partiels sont redistribués selon un _hash_ calculé sur la langue (`Exchange hashpartitioning(lang#3676, 200), ENSURE_REQUIREMENTS, [plan_id=2830]`), afin d'avoir les décomptes partiels d'une même langue au sein du même exécuteur. Le traitement se termine alors par la somme des décomptes partiels par langues (`count`).

## Utilisateurs

Dans un tweet, un utilisateur est représenté par une sous-structure contenant plusieurs informations, dont l'ID, le nom, le site Web...

Donner la liste du _screen_name_ des utilisateurs apparaissant dans les tweets, en les classant de l'utilisateur ayant le plus tweeté à l'utilisateur ayant le moins tweeté.

Note : Pour accéder à un champ `b` d'une sous-structure `a`, vous devez utiliser la syntaxe `a.b`.

In [73]:
val result =
  tweets
    .groupBy($"user.screen_name")
    .count()
    .orderBy($"count".desc)

result.showHTML()

screen_name,count
austin_castel,53
opensource_orgs,44
ServerlessFan,42
iPythonistaBot,38
DeveloperBot_v1,36
bigdataconf,32
DjangoBot_,31
CodeWithTwitchi,26
hubofml,24
PythonExpertBot,23


[36mresult[39m: [32mDataset[39m[[32mRow[39m] = [screen_name: string, count: bigint]

## Hashtags

Les tweets peuvent contenir des hashtags, qui sont des mots apparaissant dans le texte du tweet précédé d'un _hash_ (`#`).

Dans la structure du tweet, les hashtags apparaissent dans le champ `entities`, qui est une structure. Cette structure contient le champ `hashtags`. Ce champ `hashtags` est un _array_ (ou liste). Celui-ci un champ `text`.

Voici un extrait du schéma qui nous intéresse :

```
 |-- entities: struct (nullable = true)
 |    |-- description: string (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- text: string (nullable = true)
```

Nous voulons la liste des hashtags les plus utilisés.

Il est possible de vérifier si un tweet contient des hashtags. Pour cela, vous pouvez utiliser l'opération `.where()` et utiliser la fonction `size()` sur le champ `entities.hashtags`.

Pour décomposer une liste, vous pouvez utiliser la fonction `explode` sur le champ `entities.hashtags` dans un `.select()`. Cette fonction crée une nouvelle pour chaque élément contenu dans la liste.

In [78]:
val result = 
  tweets
    .where(size($"entities.hashtags") > 0)
    .select(explode($"entities.hashtags").as("hashtag"))
    .select($"hashtag.text")
    .groupBy($"text")
    .count()
    .orderBy($"count".desc)

result.showHTML()
result.explain()

text,count
NoSQL,303
DataScience,290
MachineLearning,189
GridDB,173
IoT,150
BigData,138
AI,116
geospatial,109
TimeSeries,109
NYC,109


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [count#11694L DESC NULLS LAST], true, 0
   +- Exchange rangepartitioning(count#11694L DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [plan_id=3140]
      +- HashAggregate(keys=[text#11690], functions=[count(1)])
         +- Exchange hashpartitioning(text#11690, 200), ENSURE_REQUIREMENTS, [plan_id=3137]
            +- HashAggregate(keys=[text#11690], functions=[partial_count(1)])
               +- Project [hashtag#11688 AS text#11690]
                  +- Generate explode(_extract_text#13074), false, [hashtag#11688]
                     +- Project [entities#3661.hashtags.text AS _extract_text#13074]
                        +- Filter (((isnull(_corrupt_record#3655) AND (size(entities#3661.hashtags, true) > 0)) AND (size(entities#3661.hashtags.text, true) > 0)) AND isnotnull(entities#3661.hashtags.text))
                           +- InMemoryTableScan [_corrupt_record#3655, entities#3661], [isnull(_corrupt_record#3655), (size(en

[36mresult[39m: [32mDataset[39m[[32mRow[39m] = [text: string, count: bigint]

Moyenne du nombre de tweets envoyés par tranche horaire.

In [41]:
val result = df
  .where($"created_at".isNotNull)
  .select($"created_at", date_trunc("hour", to_timestamp($"created_at" / 1000)).as("date_hour"))
  .groupBy($"date_hour")
  .count()
  .groupBy(hour($"date_hour").as("hour"))
  .agg(avg($"count").as("avg"))
  .orderBy($"hour".desc)

result.showHTML(limit=24,truncate=40)
result.explain()

hour,avg
23,9.5
22,7.444444444444445
21,11.666666666666666
20,20.55555555555556
19,18.88888888888889
18,22.77777777777778
17,23.11111111111111
16,25.88888888888889
15,15.77777777777778
14,15.444444444444445


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- Sort [hour#726 DESC NULLS LAST], true, 0
   +- Exchange rangepartitioning(hour#726 DESC NULLS LAST, 200), ENSURE_REQUIREMENTS, [plan_id=1742]
      +- HashAggregate(keys=[_groupingexpression#752], functions=[avg(count#723L)])
         +- Exchange hashpartitioning(_groupingexpression#752, 200), ENSURE_REQUIREMENTS, [plan_id=1739]
            +- HashAggregate(keys=[_groupingexpression#752], functions=[partial_avg(count#723L)])
               +- HashAggregate(keys=[date_hour#717], functions=[count(1)])
                  +- Exchange hashpartitioning(date_hour#717, 200), ENSURE_REQUIREMENTS, [plan_id=1735]
                     +- HashAggregate(keys=[date_hour#717], functions=[partial_count(1)])
                        +- Project [date_trunc(hour, cast((cast(created_at#11L as double) / 1000.0) as timestamp), Some(GMT)) AS date_hour#717]
                           +- Filter isnotnull(created_at#11L)
                              +- Fi

[36mresult[39m: [32mDataset[39m[[32mRow[39m] = [hour: int, avg: double]

In [50]:
val result = df
  .where($"created_at".isNotNull)
  .select($"created_at", hour(to_timestamp($"created_at" / 1000)).as("hour"))

result.showHTML(limit=10, truncate=40)
result.explain()

created_at,hour
1632390898000,9
1632390146000,9
1632389583000,9
1632389234000,9
1632388896000,9
1632388186000,9
1632387911000,9
1632387775000,9
1632387762000,9
1632387602000,9


== Physical Plan ==
*(1) Project [created_at#11L, hour(cast((cast(created_at#11L as double) / 1000.0) as timestamp), Some(GMT)) AS hour#840]
+- *(1) Filter isnotnull(created_at#11L)
   +- FileScan json [created_at#11L] Batched: false, DataFilters: [isnotnull(created_at#11L)], Format: JSON, Location: InMemoryFileIndex(1 paths)[file:/home/jovyan/work/02_spark_sql/data/tweets.json.gz], PartitionFilters: [], PushedFilters: [IsNotNull(created_at)], ReadSchema: struct<created_at:bigint>




[36mresult[39m: [32mDataFrame[39m = [created_at: bigint, hour: int]

## User-Defined Function (UDF)


In [51]:
import java.time._

def hourOfEpochMilli(epochMilli: Long): Int = {
  val instant = Instant.ofEpochMilli(epochMilli)
  val dateTime = LocalDateTime.ofInstant(instant, ZoneOffset.UTC)
  dateTime.getHour()
}

val hourOfEpochMilli_udf = udf[Int, Long](hourOfEpochMilli).withName("hourOfEpochMilli")

val result = df
  .where($"created_at".isNotNull)
  .select($"created_at", hourOfEpochMilli_udf($"created_at").as("hour"))

result.showHTML(limit=10, truncate=40)
result.explain()

created_at,hour
1632390898000,9
1632390146000,9
1632389583000,9
1632389234000,9
1632388896000,9
1632388186000,9
1632387911000,9
1632387775000,9
1632387762000,9
1632387602000,9


== Physical Plan ==
*(1) Project [created_at#11L, if (isnull(created_at#11L)) null else hourOfEpochMilli(knownnotnull(created_at#11L)) AS hour#851]
+- *(1) Filter isnotnull(created_at#11L)
   +- FileScan json [created_at#11L] Batched: false, DataFilters: [isnotnull(created_at#11L)], Format: JSON, Location: InMemoryFileIndex(1 paths)[file:/home/jovyan/work/02_spark_sql/data/tweets.json.gz], PartitionFilters: [], PushedFilters: [IsNotNull(created_at)], ReadSchema: struct<created_at:bigint>




[32mimport [39m[36mjava.time._

[39m
defined [32mfunction[39m [36mhourOfEpochMilli[39m
[36mhourOfEpochMilli_udf[39m: [32mexpressions[39m.[32mUserDefinedFunction[39m = [33mSparkUserDefinedFunction[39m(
  f = ammonite.$sess.cmd50$Helper$$Lambda$6559/1533161759@5717a94b,
  dataType = IntegerType,
  inputEncoders = [33mList[39m(
    [33mSome[39m(
      value = [33mExpressionEncoder[39m(
        objSerializer = [33mBoundReference[39m(
          ordinal = [32m0[39m,
          dataType = LongType,
          nullable = [32mfalse[39m
        ),
        objDeserializer = [33mAssertNotNull[39m(
          child = [33mUpCast[39m(
            child = [33mGetColumnByOrdinal[39m(ordinal = [32m0[39m, dataType = LongType),
            target = LongType,
            walkedTypePath = [33mList[39m([32m"- root class: \"scala.Long\""[39m)
          ),
          walkedTypePath = [33mList[39m([32m"- root class: \"scala.Long\""[39m)
        ),
        clsTag = Long
  

In [60]:
val hashtag = df
  .where(size($"entities.hashtags") > 0)
  .select(explode($"entities.hashtags").as("hashtag"))
  .select($"hashtag.text")
  .groupBy($"text")
  .count()
  .orderBy($"count".desc)
  .select($"text")
  .first()(0)

println(hashtag)
//result.printSchema
//result.showHTML()

NoSQL


[36mresult[39m: [32mAny[39m = [32m"NoSQL"[39m

In [53]:
df.

id,clientId,timestamp,product,price
87365481,XztHU0aeUckvR7AC,2022-11-14 13:25:...,café allongé,1.4
42761208,t_CUBr6tyTQxGj2X,2022-11-14 13:29:...,café crème,2.5
90524048,hdVMQjoIgOov09zb,2022-11-14 13:34:...,chocolat chaud,2.6
9935741,hdVMQjoIgOov09zb,2022-11-14 13:37:...,chocolat chaud,2.6
3486136,TX7wC0pTqCRlCOhi,2022-11-14 13:40:...,expresso,1.1
46727424,H-Mp22FLe99MNhRa,2022-11-14 13:45:...,décaféiné,1.4
97190478,oplTx8h-38G3be4c,2022-11-14 13:50:...,décaféiné,1.4
49642764,TX7wC0pTqCRlCOhi,2022-11-14 13:53:...,expresso,1.1
33866371,JBoCs7rWb_jEs87W,2022-11-14 13:56:...,double café,2.6
55962364,t_CUBr6tyTQxGj2X,2022-11-14 16:00:...,expresso,1.1


In [82]:
tweets.rollup("lang", "user.screen_name").count().orderBy($"count".desc).showHTML()

lang,screen_name,count
,,3149
en,,2801
ja,,117
en,austin_castel,53
pt,,50
en,opensource_orgs,43
en,ServerlessFan,42
en,iPythonistaBot,38
es,,37
en,DeveloperBot_v1,36


In [5]:
import java.time._

def hourOfEpochMilli(epochMilli: Long): Int = {
  val instant = Instant.ofEpochMilli(epochMilli)
  val dateTime = LocalDateTime.ofInstant(instant, ZoneOffset.UTC)
  dateTime.getHour()
}

val hourOfEpochMilli_udf = udf[Int, Long](hourOfEpochMilli).withName("hourOfEpochMilli")

tweets.createOrReplaceTempView("tweets")

spark.udf.register("hourOfEpochMilli", hourOfEpochMilli_udf)

spark.sql("""
SELECT created_at, hourOfEpochMilli(created_at) AS hour
FROM tweets
""").showHTML()

created_at,hour
1632390898000,9
1632390146000,9
1632389583000,9
1632389234000,9
1632388896000,9
1632388186000,9
1632387911000,9
1632387775000,9
1632387762000,9
1632387602000,9


[32mimport [39m[36mjava.time._

[39m
defined [32mfunction[39m [36mhourOfEpochMilli[39m
[36mhourOfEpochMilli_udf[39m: [32mexpressions[39m.[32mUserDefinedFunction[39m = [33mSparkUserDefinedFunction[39m(
  f = ammonite.$sess.cmd4$Helper$$Lambda$5857/309698468@40e4dd6,
  dataType = IntegerType,
  inputEncoders = [33mList[39m(
    [33mSome[39m(
      value = [33mExpressionEncoder[39m(
        objSerializer = [33mBoundReference[39m(
          ordinal = [32m0[39m,
          dataType = LongType,
          nullable = [32mfalse[39m
        ),
        objDeserializer = [33mAssertNotNull[39m(
          child = [33mUpCast[39m(
            child = [33mGetColumnByOrdinal[39m(ordinal = [32m0[39m, dataType = LongType),
            target = LongType,
            walkedTypePath = [33mList[39m([32m"- root class: \"scala.Long\""[39m)
          ),
          walkedTypePath = [33mList[39m([32m"- root class: \"scala.Long\""[39m)
        ),
        clsTag = Long
     