In [0]:
from pyspark.sql.functions import isnan, when, count, col, countDistinct, desc
from pyspark.sql.types import NumericType, IntegerType, DoubleType

# 1. Storage and Ingest

## 1.1 Ingest stored data

In [0]:
# Load the raw wine CSV, treating the first row as the header and letting
# Spark infer each column's type, then preview the first rows.
wine_df = spark.read.csv(
    "/FileStore/tables/wines_SPA.csv",
    header=True,
    inferSchema=True,
)
wine_df.show()

+-------------------+--------------------+----+------+-----------+-------+----------------+----------------+--------------------+----+-------+
|             winery|                wine|year|rating|num_reviews|country|          region|           price|                type|body|acidity|
+-------------------+--------------------+----+------+-----------+-------+----------------+----------------+--------------------+----+-------+
|      Teso La Monja|               Tinto|2013|   4.9|         58| Espana|            Toro|           995.0|            Toro Red|   5|      3|
|             Artadi|       Vina El Pison|2018|   4.9|         31| Espana|  Vino de Espana|           313.5|         Tempranillo|   4|      2|
|       Vega Sicilia|               Unico|2009|   4.8|       1793| Espana|Ribera del Duero|          324.95|Ribera Del Duero Red|   5|      3|
|       Vega Sicilia|               Unico|1999|   4.8|       1705| Espana|Ribera del Duero|          692.96|Ribera Del Duero Red|   5|      3|

In [0]:
# Print the column names and inferred types of the raw frame.
wine_df.printSchema()

root
 |-- winery: string (nullable = true)
 |-- wine: string (nullable = true)
 |-- year: string (nullable = true)
 |-- rating: double (nullable = true)
 |-- num_reviews: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- region: string (nullable = true)
 |-- price: double (nullable = true)
 |-- type: string (nullable = true)
 |-- body: string (nullable = true)
 |-- acidity: string (nullable = true)



In [0]:
# Render the raw frame through the notebook's display helper.
display(wine_df)

winery,wine,year,rating,num_reviews,country,region,price,type,body,acidity
Teso La Monja,Tinto,2013,4.9,58,Espana,Toro,995.0,Toro Red,5.0,3.0
Artadi,Vina El Pison,2018,4.9,31,Espana,Vino de Espana,313.5,Tempranillo,4.0,2.0
Vega Sicilia,Unico,2009,4.8,1793,Espana,Ribera del Duero,324.95,Ribera Del Duero Red,5.0,3.0
Vega Sicilia,Unico,1999,4.8,1705,Espana,Ribera del Duero,692.96,Ribera Del Duero Red,5.0,3.0
Vega Sicilia,Unico,1996,4.8,1309,Espana,Ribera del Duero,778.06,Ribera Del Duero Red,5.0,3.0
Vega Sicilia,Unico,1998,4.8,1209,Espana,Ribera del Duero,490.0,Ribera Del Duero Red,5.0,3.0
Vega Sicilia,Unico,2010,4.8,1201,Espana,Ribera del Duero,349.0,Ribera Del Duero Red,5.0,3.0
Vega Sicilia,Unico,1995,4.8,926,Espana,Ribera del Duero,810.89,Ribera Del Duero Red,5.0,3.0
Vega Sicilia,Unico Reserva Especial Edicion,2015,4.8,643,Espana,Ribera del Duero,345.0,Ribera Del Duero Red,5.0,3.0
Vega Sicilia,Unico,2011,4.8,630,Espana,Ribera del Duero,315.0,Ribera Del Duero Red,5.0,3.0


In [0]:
# Report (row count, column count) of the ingested data.
# Uses the raw frame: wine_df_cleaned is only created in section 2.1 below,
# so referencing it here fails on a fresh kernel.
print((wine_df.count(), len(wine_df.columns)))

(7500, 11)


# 2. ETL transformations

## 2.1 Rename columns

In [0]:
# Start the cleaned frame from the raw one, giving two columns clearer names:
# rating -> avg_rating, year -> harvested_year.
wine_df_cleaned = (
    wine_df
    .withColumnRenamed("rating", "avg_rating")
    .withColumnRenamed("year", "harvested_year")
)

## 2.2 Checking for NaN and null values

In [0]:
# For every column, count the rows whose value is the literal string "NA".
wine_df_cleaned.select(
    *(
        count(when(col(name) == "NA", name)).alias(name)
        for name in wine_df_cleaned.columns
    )
).show()

+------+----+--------------+----------+-----------+-------+------+-----+----+----+-------+
|winery|wine|harvested_year|avg_rating|num_reviews|country|region|price|type|body|acidity|
+------+----+--------------+----------+-----------+-------+------+-----+----+----+-------+
|     0|   0|             0|         0|          0|      0|     0|    0| 545|1169|   1169|
+------+----+--------------+----------+-----------+-------+------+-----+----+----+-------+



In [0]:
# For every column, count the rows that are NaN or null.
wine_df_cleaned.select(
    *(
        count(when(isnan(name) | col(name).isNull(), name)).alias(name)
        for name in wine_df_cleaned.columns
    )
).show()

+------+----+--------------+----------+-----------+-------+------+-----+----+----+-------+
|winery|wine|harvested_year|avg_rating|num_reviews|country|region|price|type|body|acidity|
+------+----+--------------+----------+-----------+-------+------+-----+----+----+-------+
|     0|   0|             2|         0|          0|      0|     0|    0|   0|   0|      0|
+------+----+--------------+----------+-----------+-------+------+-----+----+----+-------+



In [0]:
# Replace the "NA" placeholder in body and acidity with 0; every other value
# is kept as it is.
wine_df_cleaned = (
    wine_df_cleaned
    .withColumn(
        "body",
        when(col("body") == "NA", 0).otherwise(col("body")),
    )
    .withColumn(
        "acidity",
        when(col("acidity") == "NA", 0).otherwise(col("acidity")),
    )
)

In [0]:
# Preview the first five rows after the "NA" replacement.
wine_df_cleaned.show(5)

+-------------+-------------+--------------+----------+-----------+-------+----------------+------+--------------------+----+-------+
|       winery|         wine|harvested_year|avg_rating|num_reviews|country|          region| price|                type|body|acidity|
+-------------+-------------+--------------+----------+-----------+-------+----------------+------+--------------------+----+-------+
|Teso La Monja|        Tinto|          2013|       4.9|         58| Espana|            Toro| 995.0|            Toro Red|   5|      3|
|       Artadi|Vina El Pison|          2018|       4.9|         31| Espana|  Vino de Espana| 313.5|         Tempranillo|   4|      2|
| Vega Sicilia|        Unico|          2009|       4.8|       1793| Espana|Ribera del Duero|324.95|Ribera Del Duero Red|   5|      3|
| Vega Sicilia|        Unico|          1999|       4.8|       1705| Espana|Ribera del Duero|692.96|Ribera Del Duero Red|   5|      3|
| Vega Sicilia|        Unico|          1996|       4.8|       

In [0]:
# Print the column names and types of the cleaned frame.
# NOTE(review): the captured output lists integer types for harvested_year,
# body and acidity, although the casts only happen in section 2.3 below —
# presumably this cell was last run out of order; re-run top to bottom to confirm.
wine_df_cleaned.printSchema()

root
 |-- winery: string (nullable = true)
 |-- wine: string (nullable = true)
 |-- harvested_year: integer (nullable = true)
 |-- avg_rating: integer (nullable = true)
 |-- num_reviews: integer (nullable = true)
 |-- country: string (nullable = true)
 |-- region: string (nullable = true)
 |-- price: double (nullable = true)
 |-- type: string (nullable = true)
 |-- body: integer (nullable = true)
 |-- acidity: integer (nullable = true)



## 2.3 Changing column data types

In [0]:
# Give the numeric columns explicit types: avg_rating becomes a double;
# body, harvested_year and acidity become integers.
wine_df_cleaned = (
    wine_df_cleaned
    .withColumn("avg_rating", col("avg_rating").cast(DoubleType()))
    .withColumn("body", col("body").cast(IntegerType()))
    .withColumn("harvested_year", col("harvested_year").cast(IntegerType()))
    .withColumn("acidity", col("acidity").cast(IntegerType()))
)

# 3. Storing the DataFrame as CSV

## 3.1 Storing CSV

In [0]:
# Persist the cleaned frame as CSV with a header row.
# The save mode is set to "overwrite" so the notebook can be re-run: with the
# default mode the write raises an error once the target path already exists.
(
    wine_df_cleaned.write
    .format("csv")
    .option("header", "true")
    .mode("overwrite")
    .save("/FileStore/tables/df_analysis.csv")
)

## 3.2 Retrieving stored CSV

In [0]:
# Load the stored CSV back for analysis.
# - Uses the `spark` session, as the ingest cell does, rather than the legacy
#   `sqlContext` entry point.
# - inferSchema is enabled so the numeric columns come back as numbers; without
#   it every column is read as a string and the casts from section 2.3 are lost
#   before the queries run.
df_analysis = (
    spark.read
    .format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load("/FileStore/tables/df_analysis.csv")
)
df_analysis.show(5)

+-------------+-------------+--------------+----------+-----------+-------+----------------+------+--------------------+----+-------+
|       winery|         wine|harvested_year|avg_rating|num_reviews|country|          region| price|                type|body|acidity|
+-------------+-------------+--------------+----------+-----------+-------+----------------+------+--------------------+----+-------+
|Teso La Monja|        Tinto|          2013|       4.9|         58| Espana|            Toro| 995.0|            Toro Red|   5|      3|
|       Artadi|Vina El Pison|          2018|       4.9|         31| Espana|  Vino de Espana| 313.5|         Tempranillo|   4|      2|
| Vega Sicilia|        Unico|          2009|       4.8|       1793| Espana|Ribera del Duero|324.95|Ribera Del Duero Red|   5|      3|
| Vega Sicilia|        Unico|          1999|       4.8|       1705| Espana|Ribera del Duero|692.96|Ribera Del Duero Red|   5|      3|
| Vega Sicilia|        Unico|          1996|       4.8|       

# 4. Query and visualization

In [0]:
# Report (row count, column count) of the reloaded frame.
print((df_analysis.count(), len(df_analysis.columns)))

(7500, 11)


## 4.1 Query_1

In [0]:
# Query 1: price, harvest year and rating of wines rated 4.5 or higher,
# ordered by ascending rating.
display(
    df_analysis
    .where("avg_rating >= 4.5")
    .sort("avg_rating")
    .select("price", "harvested_year", "avg_rating")
)

price,harvested_year,avg_rating
45.0,2018.0,4.5
75.0,2015.0,4.5
69.8,2015.0,4.5
37.7,2019.0,4.5
400.0,2005.0,4.5
146.95,2015.0,4.5
68.9,,4.5
70.5,2016.0,4.5
64.0,2016.0,4.5
125.0,2005.0,4.5


## 4.2 Query_2

In [0]:
# Query 2: mean rating for each wine type.
display(
    df_analysis
    .groupBy("type")
    .agg({"avg_rating": "mean"})
)

type,avg(avg_rating)
Priorat Red,4.270623145400549
Chardonnay,4.33076923076923
Rioja Red,4.232159524819702
,4.228807339449552
Tempranillo,4.263230240549854
Syrah,4.346666666666666
Toro Red,4.264189189189215
Albarino,4.214682539682559
Verdejo,4.374074074074072
Mencia,4.210638297872358


## 4.3 Query_3

In [0]:
# Query 3: mean price per winery, most expensive first.
display(
    df_analysis
    .groupBy("winery")
    .agg({"price": "mean"})
    .orderBy(desc("avg(price)"))
)

winery,avg(price)
Dominio de Pingus,796.4312903225807
Descendientes de J. Palacios,789.9075
Alvaro Palacios,601.7845833333334
Vega Sicilia,601.1777319587628
Teso La Monja,528.0600000000001
Clos Erasmus,458.5381818181818
Barbadillo,380.0
Bodega Numanthia,324.0999999999999
Emilio Hidalgo,320.0
Martinez Lacuesta,280.0


## 4.4 Query_4

In [0]:
# Query 4: mean price per region, most expensive first.
display(
    df_analysis
    .groupBy("region")
    .agg({"price": "mean"})
    .orderBy(desc("avg(price)"))
)

region,avg(price)
Montilla-Moriles,178.68961538461537
Conca de Barbera,164.54333333333332
Jerez Palo Cortado,144.47545454545457
Mentrida,121.98666666666666
Ribera del Duero,109.23036351534836
Aragon,108.505
Priorato,104.08990877490074
Dominio de Valdepusa,99.87
Jumilla,80.57602513777478
Arinzano,76.98666666666666
