# Spark Beer Analysis #
Analyse de dataset de bières, avec Apache Spark.



[Guide de styles BJCP](https://www.brassageamateur.com/wiki/index.php/Guide_de_styles_BJCP) (Beer Judge Certification Program ou "Programme de certification de juge de bière")

[Dictionnaire Anglais-Français des termes brassicoles](https://www.brassageamateur.com/wiki/index.php/Dictionnaire_Anglais-Fran%C3%A7ais_des_termes_brassicoles)

[Lexique français/anglais](https://www.brassageamateur.com/wiki/index.php/Lexique_francais/anglais)


Dataset:
- [Recettes de bière](https://www.kaggle.com/jtrofe/beer-recipes/kernels) scraped from [Brewer's Friend](https://www.brewersfriend.com/)
- [Critiques de bières](https://data.world/socialmediadata/beeradvocate)


## TODO

- [X] Charger le fichier des critiques et trouver les styles de bières les plus appréciés
- [ ] Chercher parmi les résultats des corrélations entre la note, le degré d'alcool et l'amertume
- [ ] Charger le fichier de recettes et filtrer sur les styles trouvés précédemment
- [ ] puis grouper par ABV / IBU


In [1]:
// Obtain (or create) the SparkSession from the configuration of the existing
// SparkContext `sc`. `sc` is not defined in this notebook; presumably it is
// injected by the notebook kernel.
val spark = SparkSession
    .builder()
    .config(sc.getConf)
    .getOrCreate()

// Checkpoint data is written under the current working directory.
val savePath = "./"
spark.sparkContext.setCheckpointDir(savePath)

spark = org.apache.spark.sql.SparkSession@12b2bb45
savePath = ./


./

## Reviews

In [2]:
// Load the beer reviews CSV: the first line is a header, fields are
// comma-separated, and column types are inferred from the data.
val reviews = spark.read
    .format("csv")
    .options(Map(
        "header" -> "true",
        "delimiter" -> ",",
        "inferSchema" -> "true"))
    .load("./data/beer_reviews.csv")

// Print the inferred column names and types.
reviews.printSchema()

root
 |-- brewery_id: integer (nullable = true)
 |-- brewery_name: string (nullable = true)
 |-- review_time: integer (nullable = true)
 |-- review_overall: double (nullable = true)
 |-- review_aroma: double (nullable = true)
 |-- review_appearance: double (nullable = true)
 |-- review_profilename: string (nullable = true)
 |-- beer_style: string (nullable = true)
 |-- review_palate: double (nullable = true)
 |-- review_taste: double (nullable = true)
 |-- beer_name: string (nullable = true)
 |-- beer_abv: double (nullable = true)
 |-- beer_beerid: integer (nullable = true)



reviews = [brewery_id: int, brewery_name: string ... 11 more fields]


[brewery_id: int, brewery_name: string ... 11 more fields]

In [3]:
// A review is considered "good" when taste, palate and aroma are all strictly
// above 4 and the overall score is strictly above 3.5.
val reviewFilter = $"review_taste" > 4 &&
    $"review_palate" > 4 &&
    $"review_aroma" > 4 &&
    $"review_overall" > 3.5

// Materialise the good reviews in the checkpoint directory so that later cells
// reuse them instead of re-reading and re-filtering the CSV.
val goodBeers = reviews.filter(reviewFilter).checkpoint()

// Number of good reviews per beer style, most frequent first.
val goodBeerCountByStyle = goodBeers
    .groupBy("beer_style")
    .count().sort($"count".desc)

// Total number of reviews (good or not) per beer style.
val countByStyle = reviews.groupBy("beer_style").count().withColumnRenamed("count","total")

// Percentage of good reviews per style, keeping only styles above 19 %.
// The join could also be written join(countByStyle, Seq("beer_style"), "inner");
// other join types include outer, left_outer, right_outer and leftsemi.
// The result is cached because it is evaluated several times: by count() and
// show() below, and again by later cells.
val bestStyles = goodBeerCountByStyle.join(countByStyle, "beer_style")
    .withColumn("percentage", goodBeerCountByStyle.col("count") / countByStyle.col("total")  * 100)
    .sort($"percentage".desc)
    .filter($"percentage" > 19)
    .cache()

// Show every row; truncate = false prints cell values in full instead of
// cutting strings longer than 20 characters.
bestStyles.show(bestStyles.count().toInt, false)


+--------------------------------+-----+-----+------------------+
|beer_style                      |count|total|percentage        |
+--------------------------------+-----+-----+------------------+
|Quadrupel (Quad)                |4279 |18086|23.659183899148513|
|American Double / Imperial Stout|11698|50705|23.070703086480624|
|American Wild Ale               |3562 |17794|20.017983589974147|
|Gueuze                          |1174 |6009 |19.537360625728077|
|Eisbock                         |513  |2663 |19.263987983477282|
|Russian Imperial Stout          |10329|54129|19.08219254004323 |
+--------------------------------+-----+-----+------------------+



reviewFilter = ((((review_taste > 4) AND (review_palate > 4)) AND (review_aroma > 4)) AND (review_overall > 3.5))
goodBeers = [brewery_id: int, brewery_name: string ... 11 more fields]
goodBeerCountByStyle = [beer_style: string, count: bigint]
countByStyle = [beer_style: string, total: bigint]
bestStyles = [beer_style: string, count: bigint ... 2 more fields]


[beer_style: string, count: bigint ... 2 more fields]

In [4]:
%%dataframe
bestStyles

beer_style,count,total,percentage
Quadrupel (Quad),4279,18086,23.659183899148516
American Double / Imperial Stout,11698,50705,23.070703086480624
American Wild Ale,3562,17794,20.017983589974147
Gueuze,1174,6009,19.53736062572808
Eisbock,513,2663,19.26398798347728
Russian Imperial Stout,10329,54129,19.08219254004323


## Recipes

### Styles

In [5]:
// Load the style reference CSV (header line, comma-separated, inferred types).
val styles = spark.read
    .format("csv")
    .options(Map(
        "header" -> "true",
        "delimiter" -> ",",
        "inferSchema" -> "true"))
    .load("./data/styleData.csv")

// Print the inferred column names and types.
styles.printSchema()

root
 |-- Style: string (nullable = true)
 |-- StyleID: integer (nullable = true)



styles = [Style: string, StyleID: int]


[Style: string, StyleID: int]

In [6]:
%%dataframe
styles

Style,StyleID
Altbier,1
Alternative Grain Beer,2
Alternative Sugar Beer,3
American Amber Ale,4
American Barleywine,5
American Brown Ale,6
American IPA,7
American Lager,8
American Light Lager,9
American Pale Ale,10


In [7]:
// Bring the names of the best-rated review styles back to the driver as a
// Seq[String]. An untyped equivalent would be .map(_.getString(0)) on the rows.
val bestStylesList = bestStyles.select("beer_style").as[String].collect().toSeq

// Keep the recipe styles whose name is exactly one of the best review styles.
// NOTE(review): this is an exact string match between two different datasets;
// in the recorded run only 3 of the 6 review styles were found in `styles`.
val stylesInBestStyles = styles.where($"Style".isin(bestStylesList: _*))
stylesInBestStyles.show()

+--------------------+-------+
|               Style|StyleID|
+--------------------+-------+
|             Eisbock|     62|
|              Gueuze|     83|
|Russian Imperial ...|    132|
+--------------------+-------+



bestStylesList = WrappedArray(Quadrupel (Quad), American Double / Imperial Stout, American Wild Ale, Gueuze, Eisbock, Russian Imperial Stout)
stylesInBestStyles = [Style: string, StyleID: int]


[Style: string, StyleID: int]

### Recipe

In [8]:
// Load the recipes CSV (header line, comma-separated, inferred types).
val recipes = spark.read
    .format("csv")
    .options(Map(
        "header" -> "true",
        "delimiter" -> ",",
        "inferSchema" -> "true"))
    .load("./data/recipeData.csv")

// Print the inferred column names and types.
recipes.printSchema()

root
 |-- BeerID: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- URL: string (nullable = true)
 |-- Style: string (nullable = true)
 |-- StyleID: integer (nullable = true)
 |-- Size(L): double (nullable = true)
 |-- OG: double (nullable = true)
 |-- FG: double (nullable = true)
 |-- ABV: double (nullable = true)
 |-- IBU: double (nullable = true)
 |-- Color: double (nullable = true)
 |-- BoilSize: double (nullable = true)
 |-- BoilTime: integer (nullable = true)
 |-- BoilGravity: string (nullable = true)
 |-- Efficiency: double (nullable = true)
 |-- MashThickness: string (nullable = true)
 |-- SugarScale: string (nullable = true)
 |-- BrewMethod: string (nullable = true)
 |-- PitchRate: string (nullable = true)
 |-- PrimaryTemp: string (nullable = true)
 |-- PrimingMethod: string (nullable = true)
 |-- PrimingAmount: string (nullable = true)
 |-- UserId: integer (nullable = true)



recipes = [BeerID: int, Name: string ... 21 more fields]


[BeerID: int, Name: string ... 21 more fields]

In [9]:
// Collect the numeric ids of the matched styles to the driver.
val bestStylesIds = stylesInBestStyles.select("StyleID").as[Int].collect().toSeq

// Keep only the recipes whose StyleID is one of those ids.
val recipesWithGoodStyle = recipes.where($"StyleID".isin(bestStylesIds: _*))

bestStylesIds = WrappedArray(62, 83, 132)
recipesWithGoodStyle = [BeerID: int, Name: string ... 21 more fields]


[BeerID: int, Name: string ... 21 more fields]

In [10]:
%%dataframe
recipesWithGoodStyle

BeerID,Name,URL,Style,StyleID,Size(L),OG,FG,ABV,IBU,Color,BoilSize,BoilTime,BoilGravity,Efficiency,MashThickness,SugarScale,BrewMethod,PitchRate,PrimaryTemp,PrimingMethod,PrimingAmount,UserId
59,Bakke Brygg Imperial Stout 20 L,/homebrew/recipe/view/83233/bakke-brygg-imperial-stout-20-l,Russian Imperial Stout,132,20.0,1.098,1.025,9.67,69.25,50.0,24.0,90,1.082,65.0,,Specific Gravity,All Grain,1.0,18.0,Sukkerlake,5 5 g sukker/L,18325.0
148,Speedway Stout Clone,/homebrew/recipe/view/215423/speedway-stout-clone,Russian Imperial Stout,132,14.2,1.124,1.024,13.24,69.09,40.0,23.66,90,1.075,80.1,1.5,Specific Gravity,All Grain,,20.0,,,
155,Hunahpu (clone),/homebrew/recipe/view/106240/hunahpu-clone-,Russian Imperial Stout,132,22.71,1.13,1.036,12.35,114.57,40.0,34.07,180,1.086,80.0,1.25,Specific Gravity,All Grain,1.0,16.67,Corn sugar,2.5oz,
186,Traditional German Gose,/homebrew/recipe/view/14429/traditional-german-gose,Gueuze,83,41.64,1.049,1.01,5.11,12.23,4.47,49.21,90,,72.0,,Specific Gravity,All Grain,,,,,380.0
190,Yeti Imperial Stout (Great Divide Clone),/homebrew/recipe/view/239676/yeti-imperial-stout-great-divide-clone-,Russian Imperial Stout,132,20.82,1.09,1.019,9.26,88.9,50.0,24.61,60,1.076,70.0,1.3,Specific Gravity,All Grain,0.35,21.11,,,
205,Chocolate Orange Stout,/homebrew/recipe/view/18841/chocolate-orange-stout,Russian Imperial Stout,132,22.71,1.096,1.027,9.18,35.95,29.92,28.39,60,,70.0,,Specific Gravity,All Grain,,,,,3750.0
275,Imperial Stout,/homebrew/recipe/view/18479/imperial-stout,Russian Imperial Stout,132,43.53,1.09,1.021,8.95,83.29,39.77,53.0,90,1.074,75.0,1.33,Specific Gravity,All Grain,1.25,17.78,,,2111.0
303,Dark Heart Root Beer,/homebrew/recipe/view/47893/dark-heart-root-beer,Russian Imperial Stout,132,20.82,1.101,1.025,9.9,60.65,40.0,28.39,60,,75.0,,Specific Gravity,All Grain,,20.0,Corn Sugar,4 oz,
335,Coffee Blackout Stout,/homebrew/recipe/view/11157/coffee-blackout-stout,Russian Imperial Stout,132,20.82,1.098,1.024,9.61,86.79,40.0,26.5,90,,65.0,,Specific Gravity,All Grain,,,Force Carb,,955.0
340,Dark Lord clone,/homebrew/recipe/view/231334/dark-lord-clone,Russian Imperial Stout,132,41.64,1.136,1.031,13.69,79.54,50.0,58.67,120,1.096,70.0,1.5,Specific Gravity,All Grain,0.5,20.0,,,


In [11]:
import org.apache.spark.sql.functions.round

// Number of good reviews per ABV value rounded to the nearest whole degree,
// most frequent first.
val goodBeerCountByDegree = goodBeers
    .withColumn("beer_abv_rounded", round($"beer_abv"))
    .groupBy("beer_abv_rounded")
    .count()
    .orderBy($"count".desc)

goodBeerCountByDegree = [beer_abv_rounded: double, count: bigint]


[beer_abv_rounded: double, count: bigint]

In [19]:
// Total number of reviews (good or not) per rounded ABV value.
val countByDegree = reviews
    .withColumn("beer_abv_rounded", round($"beer_abv"))
    .groupBy("beer_abv_rounded")
    .count()
    .withColumnRenamed("count","total")

// Percentage of good reviews per rounded ABV value, best first.
// The join could also be written
// join(countByDegree, Seq("beer_abv_rounded"), "inner"); other join types
// include outer, left_outer, right_outer and leftsemi.
// The result is cached because count() and show() below would otherwise each
// re-run the whole aggregation and join.
// NOTE(review): buckets with very few reviews (e.g. 12 good out of 16 in the
// recorded run) end up at the top of this ranking; consider a minimum `total`.
val bestDegree = goodBeerCountByDegree.join(countByDegree, "beer_abv_rounded")
    .withColumn("percentage", goodBeerCountByDegree.col("count") / countByDegree.col("total")  * 100)
    .sort($"percentage".desc)
    .cache()

// Show every row; truncate = false prints cell values in full instead of
// cutting strings longer than 20 characters.
bestDegree.show(bestDegree.count().toInt, false)

+----------------+-----+------+------------------+
|beer_abv_rounded|count|total |percentage        |
+----------------+-----+------+------------------+
|29.0            |12   |16    |75.0              |
|39.0            |6    |10    |60.0              |
|21.0            |12   |22    |54.54545454545454 |
|24.0            |8    |20    |40.0              |
|26.0            |27   |74    |36.486486486486484|
|27.0            |126  |355   |35.49295774647888 |
|15.0            |1622 |5162  |31.421929484695855|
|19.0            |57   |188   |30.319148936170215|
|25.0            |9    |31    |29.03225806451613 |
|13.0            |2671 |10358 |25.78683143463989 |
|14.0            |998  |4357  |22.905669038329126|
|11.0            |12496|63196 |19.773403379960758|
|20.0            |10   |51    |19.607843137254903|
|12.0            |5727 |29884 |19.164101191272923|
|16.0            |187  |1033  |18.102613746369798|
|41.0            |12   |76    |15.789473684210526|
|22.0            |7    |45    |

countByDegree = [beer_abv_rounded: double, total: bigint]
bestDegree = [beer_abv_rounded: double, count: bigint ... 2 more fields]


[beer_abv_rounded: double, count: bigint ... 2 more fields]

In [21]:
// Recipes of the selected styles whose ABV lies in the inclusive range
// [13, 39], strongest first.
val recipesWithGoodStyleAndAbv = recipesWithGoodStyle
    .where($"abv".between(13, 39))
    .orderBy($"abv".desc)

// Print the first rows with the default show() settings.
recipesWithGoodStyleAndAbv.show()

+------+--------------------+--------------------+--------------------+-------+-------+-------+-------+-----+------+-----+--------+--------+-----------+----------+-------------+----------------+------------+---------+-----------+-------------+-------------+------+
|BeerID|                Name|                 URL|               Style|StyleID|Size(L)|     OG|     FG|  ABV|   IBU|Color|BoilSize|BoilTime|BoilGravity|Efficiency|MashThickness|      SugarScale|  BrewMethod|PitchRate|PrimaryTemp|PrimingMethod|PrimingAmount|UserId|
+------+--------------------+--------------------+--------------------+-------+-------+-------+-------+-----+------+-----+--------+--------+-----------+----------+-------------+----------------+------------+---------+-----------+-------------+-------------+------+
| 31052|San Antonio Imper...|/homebrew/recipe/...|Russian Imperial ...|    132|   21.0|  1.351|  1.083| 35.1| 66.42|10.29|   100.0|      60|      1.074|      35.0|          N/A|Specific Gravity|Partial Mas

recipesWithGoodStyleAndAbv = [BeerID: int, Name: string ... 21 more fields]


[BeerID: int, Name: string ... 21 more fields]

In [20]:
// Print every matching recipe (row count computed first), with long cell
// values truncated.
recipesWithGoodStyleAndAbv.show(
    numRows = recipesWithGoodStyleAndAbv.count().toInt,
    truncate = true)

+------+--------------------+--------------------+--------------------+-------+-------+-------+-------+-----+------+-----+--------+--------+-----------+----------+-------------+----------------+------------+---------+-----------+-------------+-------------+------+
|BeerID|                Name|                 URL|               Style|StyleID|Size(L)|     OG|     FG|  ABV|   IBU|Color|BoilSize|BoilTime|BoilGravity|Efficiency|MashThickness|      SugarScale|  BrewMethod|PitchRate|PrimaryTemp|PrimingMethod|PrimingAmount|UserId|
+------+--------------------+--------------------+--------------------+-------+-------+-------+-------+-----+------+-----+--------+--------+-----------+----------+-------------+----------------+------------+---------+-----------+-------------+-------------+------+
|   148|Speedway Stout Clone|/homebrew/recipe/...|Russian Imperial ...|    132|   14.2|  1.124|  1.024|13.24| 69.09| 40.0|   23.66|      90|      1.075|      80.1|          1.5|Specific Gravity|   All Grai