In [12]:
import json
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import udf, from_json, split, col, regexp_replace, avg, trim
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

In [2]:
# Build (or fetch the already-running) SparkSession for this notebook,
# named "Wines extract" and created with Hive support enabled.
spark = (
    SparkSession.builder
    .appName("Wines extract")
    .enableHiveSupport()
    .getOrCreate()
)

### Reading CSV-formatted records from a Kafka topic

In [3]:
# Batch read (not a stream) of the "wines" topic, bounded by the earliest
# and latest offsets available at the time of the read.
df = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:29092")
    .option("subscribe", "wines")
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .load()
)

Using the *cache()* method, Spark provides an optimization mechanism to store the intermediate computation of an RDD, DataFrame, or Dataset so that its result can be reused in subsequent actions.

This therefore curbs the following warning from showing up when performing each computation: WARN CachedKafkaConsumer: CachedKafkaConsumer is not running in UninterruptibleThread. It may hang when CachedKafkaConsumer's methods are interrupted because of KAFKA-1894.

In [4]:
df.cache()

DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]

In [5]:
df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [6]:
# The Kafka "value" column is binary (see the schema above); cast it to a
# string so the record text can be read. The column keeps the name "value".
df_value_as_strings = df.select(col("value").cast("string"))

In [7]:
df_value_as_strings.show(10)

+--------------------+
|               value|
+--------------------+
|7.4,0.7,0,1.9,0.0...|
|7.8,0.88,0,2.6,0....|
|7.8,0.76,0.04,2.3...|
|11.2,0.28,0.56,1....|
|7.4,0.7,0,1.9,0.0...|
|7.4,0.66,0,1.8,0....|
|7.9,0.6,0.06,1.6,...|
|7.3,0.65,0,1.2,0....|
|7.8,0.58,0.02,2,0...|
|7.5,0.5,0.36,6.1,...|
+--------------------+
only showing top 10 rows



To avoid a truncated view of the column values, pass `False` as the second argument to the *show()* method; it sets `truncate=False`.

In [8]:
df_value_as_strings.show(10, False)

+-----------------------------------------------------+
|value                                                |
+-----------------------------------------------------+
    |0.7,0,1.9,0.076,11,34,0.9978,3.51,0.56,9.4,5
    |0.88,0,2.6,0.098,25,67,0.9968,3.2,0.68,9.8,5
 |.8,0.76,0.04,2.3,0.092,15,54,0.997,3.26,0.65,9.8,5
|11.2,0.28,0.56,1.9,0.075,17,60,0.998,3.16,0.58,9.8,6
    |0.7,0,1.9,0.076,11,34,0.9978,3.51,0.56,9.4,5
   |,0.66,0,1.8,0.075,13,40,0.9978,3.51,0.56,9.4,5
  |9,0.6,0.06,1.6,0.069,15,59,0.9964,3.3,0.46,9.4,5
    |0.65,0,1.2,0.065,15,21,0.9946,3.39,0.47,10,7
   |,0.58,0.02,2,0.073,9,18,0.9968,3.36,0.57,9.5,7
|7.5,0.5,0.36,6.1,0.071,17,102,0.9978,3.35,0.8,10.5,5
+-----------------------------------------------------+
only showing top 10 rows



We can see whitespace issues around the last column that throw off the alignment of the output. We can try the *trim* function on the last column while reading it in.

In [13]:
# Split each comma-separated record once, then project one string column per
# field (in record order) next to the original "value" column.
# NOTE(review): "residual+sugar" looks like a typo for "residual_sugar"; the
# name is kept unchanged because the outputs of later cells show it.
value_parts = split(col("value"), ",")
leading_fields = [
    "fixed_acidity",
    "volatile_acidity",
    "citric_acid",
    "residual+sugar",
    "chlorides",
    "free_sulfur_dioxide",
    "total_sulfur_dioxide",
    "density",
    "pH",
    "sulphates",
    "alcohol",
]
wines_df = df_value_as_strings.select(
    "value",
    *[
        value_parts.getItem(position).alias(field_name)
        for position, field_name in enumerate(leading_fields)
    ],
    # The last field (index 11) is passed through trim as it is read in.
    trim(value_parts.getItem(len(leading_fields))).alias("quality"),
)

In [14]:
wines_df.show(10)

+--------------------+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|               value|fixed_acidity|volatile_acidity|citric_acid|residual+sugar|chlorides|free_sulfur_dioxide|total_sulfur_dioxide|density|  pH|sulphates|alcohol|quality|
+--------------------+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-------+
|7.4,0.7,0,1.9,0.0...|          7.4|             0.7|          0|           1.9|    0.076|                 11|                  34| 0.9978|3.51|     0.56|    9.4|     5
|7.8,0.88,0,2.6,0....|          7.8|            0.88|          0|           2.6|    0.098|                 25|                  67| 0.9968| 3.2|     0.68|    9.8|     5
|7.8,0.76,0.04,2.3...|          7.8|            0.76|       0.04|           2.3|    0.092|                 15|                  54|  0.997|3.26|     

### Transformations and Formatting

The trailing character after the last value is still there. To see what the character is, we can use the *head* function, which shows only the top row, in a different format than *show*.

In [15]:
wines_df.head()

Row(value='7.4,0.7,0,1.9,0.076,11,34,0.9978,3.51,0.56,9.4,5\r', fixed_acidity='7.4', volatile_acidity='0.7', citric_acid='0', residual+sugar='1.9', chlorides='0.076', free_sulfur_dioxide='11', total_sulfur_dioxide='34', density='0.9978', pH='3.51', sulphates='0.56', alcohol='9.4', quality='5\r')

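We can see a trailing '\r', which is a carriage-return character; *trim* did not remove it. It is there because I did not split the lines of the CSV file cleanly when I put them through the Kafka topic. I will go and repeat the step of ingesting the data into Kafka; in the meantime, we can strip the character here with *regexp_replace*.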

In [16]:
# Remove the carriage return seen at the end of "quality" and keep the
# cleaned value in a new "quality_formatted" column.
quality_without_cr = regexp_replace(col("quality"), "\r", "")
wines_df = wines_df.withColumn("quality_formatted", quality_without_cr)

In [17]:
wines_df.head()

Row(value='7.4,0.7,0,1.9,0.076,11,34,0.9978,3.51,0.56,9.4,5\r', fixed_acidity='7.4', volatile_acidity='0.7', citric_acid='0', residual+sugar='1.9', chlorides='0.076', free_sulfur_dioxide='11', total_sulfur_dioxide='34', density='0.9978', pH='3.51', sulphates='0.56', alcohol='9.4', quality='5\r', quality_formatted='5')

We don't need the 'value' or 'quality' columns anymore so we will *drop* them.

In [18]:
wines_df.drop("value","quality")

DataFrame[fixed_acidity: string, volatile_acidity: string, citric_acid: string, residual+sugar: string, chlorides: string, free_sulfur_dioxide: string, total_sulfur_dioxide: string, density: string, pH: string, sulphates: string, alcohol: string, quality_formatted: string]

In [19]:
wines_df.head()

Row(value='7.4,0.7,0,1.9,0.076,11,34,0.9978,3.51,0.56,9.4,5\r', fixed_acidity='7.4', volatile_acidity='0.7', citric_acid='0', residual+sugar='1.9', chlorides='0.076', free_sulfur_dioxide='11', total_sulfur_dioxide='34', density='0.9978', pH='3.51', sulphates='0.56', alcohol='9.4', quality='5\r', quality_formatted='5')

Notice that the 'value' and 'quality' columns are still there. This is because Spark DataFrames, as well as Datasets and RDDs (resilient distributed datasets), are immutable, i.e. unchangeable: *drop* returns a new DataFrame and leaves the original untouched.

What you can do instead is to assign that to a new Dataframe like below:

In [20]:
wines_df_revised = wines_df.drop("value","quality")

In [21]:
wines_df_revised.first()

Row(fixed_acidity='7.4', volatile_acidity='0.7', citric_acid='0', residual+sugar='1.9', chlorides='0.076', free_sulfur_dioxide='11', total_sulfur_dioxide='34', density='0.9978', pH='3.51', sulphates='0.56', alcohol='9.4', quality_formatted='5')

We used *head* and *first* which show the top row of the Dataframe. We could also do a *show(1)* to get the same result (in a slightly different output format).

In [22]:
wines_df_revised.show(1)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-----------------+
|fixed_acidity|volatile_acidity|citric_acid|residual+sugar|chlorides|free_sulfur_dioxide|total_sulfur_dioxide|density|  pH|sulphates|alcohol|quality_formatted|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-----------------+
|          7.4|             0.7|          0|           1.9|    0.076|                 11|                  34| 0.9978|3.51|     0.56|    9.4|                5|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-----------------+
only showing top 1 row



The *select* function lets you select certain columns, much the same way as in SQL!

In [23]:
wines_df_revised.select("density", "quality_formatted")

DataFrame[density: string, quality_formatted: string]

Spark does Lazy Evaluation so the transformation above, with select, is just maintained as a record of which operation is being called (through DAG). If we did a *show()* (or a *collect()*) function after this transformation, spark would then evaluate the operation to give a result. 

In [24]:
wines_df_revised.select("density", "quality_formatted").show()

+-------+-----------------+
|density|quality_formatted|
+-------+-----------------+
| 0.9978|                5|
| 0.9968|                5|
|  0.997|                5|
|  0.998|                6|
| 0.9978|                5|
| 0.9978|                5|
| 0.9964|                5|
| 0.9946|                7|
| 0.9968|                7|
| 0.9978|                5|
| 0.9959|                5|
| 0.9978|                5|
| 0.9943|                5|
| 0.9974|                5|
| 0.9986|                5|
| 0.9986|                5|
| 0.9969|                7|
| 0.9968|                5|
| 0.9974|                4|
| 0.9969|                6|
+-------+-----------------+
only showing top 20 rows



### Spark SQL

To be able to use spark sql, we need to first register our DataFrame as a Temp Table. Then, in all the spark sql commands, this temp table will be referred to for the 'from'.

In [25]:
# Expose the DataFrame to Spark SQL under the name "wines".
# createOrReplaceTempView is the replacement for registerTempTable, which has
# been deprecated since Spark 2.0.
wines_df_revised.createOrReplaceTempView('wines')

Going forward, we will see the spark sql counterpart for all the functions we perform.

In [26]:
#select
spark.sql("select density, quality_formatted from wines").show()

+-------+-----------------+
|density|quality_formatted|
+-------+-----------------+
| 0.9978|                5|
| 0.9968|                5|
|  0.997|                5|
|  0.998|                6|
| 0.9978|                5|
| 0.9978|                5|
| 0.9964|                5|
| 0.9946|                7|
| 0.9968|                7|
| 0.9978|                5|
| 0.9959|                5|
| 0.9978|                5|
| 0.9943|                5|
| 0.9974|                5|
| 0.9986|                5|
| 0.9986|                5|
| 0.9969|                7|
| 0.9968|                5|
| 0.9974|                4|
| 0.9969|                6|
+-------+-----------------+
only showing top 20 rows



To see the unique values of a column, we can use the *distinct()* function after the *select()*.

In [27]:
wines_df_revised.select("quality_formatted").distinct().show()

+-----------------+
|quality_formatted|
+-----------------+
|                7|
|                3|
|                8|
|                5|
|                6|
|                4|
+-----------------+



In [28]:
#select distinct
spark.sql("select distinct quality_formatted from wines").show()

+-----------------+
|quality_formatted|
+-----------------+
|                7|
|                3|
|                8|
|                5|
|                6|
|                4|
+-----------------+



We saw how to use *select* to show only some columns. To see only some rows, we can use *filter()*.

In [29]:
wines_df_revised.filter(wines_df_revised.quality_formatted == "7").show(5)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-----------------+
|fixed_acidity|volatile_acidity|citric_acid|residual+sugar|chlorides|free_sulfur_dioxide|total_sulfur_dioxide|density|  pH|sulphates|alcohol|quality_formatted|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-----------------+
|          7.3|            0.65|          0|           1.2|    0.065|                 15|                  21| 0.9946|3.39|     0.47|     10|                7|
|          7.8|            0.58|       0.02|             2|    0.073|                  9|                  18| 0.9968|3.36|     0.57|    9.5|                7|
|          8.5|            0.28|       0.56|           1.8|    0.092|                 35|                 103| 0.9969| 3.3|     0.75|   10.5|                7|
|          8.1|            0.38|       0

In Spark SQL, this is achieved with a 'where' clause.

In [30]:
spark.sql("select * from wines where quality_formatted='7'").show(5)

+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-----------------+
|fixed_acidity|volatile_acidity|citric_acid|residual+sugar|chlorides|free_sulfur_dioxide|total_sulfur_dioxide|density|  pH|sulphates|alcohol|quality_formatted|
+-------------+----------------+-----------+--------------+---------+-------------------+--------------------+-------+----+---------+-------+-----------------+
|          7.3|            0.65|          0|           1.2|    0.065|                 15|                  21| 0.9946|3.39|     0.47|     10|                7|
|          7.8|            0.58|       0.02|             2|    0.073|                  9|                  18| 0.9968|3.36|     0.57|    9.5|                7|
|          8.5|            0.28|       0.56|           1.8|    0.092|                 35|                 103| 0.9969| 3.3|     0.75|   10.5|                7|
|          8.1|            0.38|       0

### Aggregate Functions

In [31]:
wines_df_revised.agg(avg(col("density"))).show()

+------------------+
|      avg(density)|
+------------------+
|0.9967466791744831|
+------------------+



In [32]:
spark.sql("select avg(density) from wines").show()

+----------------------------+
|avg(CAST(density AS DOUBLE))|
+----------------------------+
|          0.9967466791744831|
+----------------------------+



Another way of writing an aggregate function:

In [33]:
wines_df_revised.agg({"density": "avg"}).show()

+------------------+
|      avg(density)|
+------------------+
|0.9967466791744831|
+------------------+



To be able to see aggregates per levels, we can use a group by, much like in sql. 

In [34]:
wines_df_revised.groupBy("quality_formatted").agg(avg(col("density"))).show()

+-----------------+------------------+
|quality_formatted|      avg(density)|
+-----------------+------------------+
|                7|0.9961042713567828|
|                3|0.9974640000000001|
|                8|0.9952122222222223|
|                5|0.9971036270190888|
|                6|0.9966150626959255|
|                4|0.9965424528301886|
+-----------------+------------------+



In [35]:
spark.sql("select quality_formatted, avg(density) from wines group by quality_formatted").show()

+-----------------+----------------------------+
|quality_formatted|avg(CAST(density AS DOUBLE))|
+-----------------+----------------------------+
|                7|          0.9961042713567828|
|                3|          0.9974640000000001|
|                8|          0.9952122222222223|
|                5|          0.9971036270190888|
|                6|          0.9966150626959255|
|                4|          0.9965424528301886|
+-----------------+----------------------------+



We can also *count* and see what the distribution of our dataset is by quality.

In [36]:
wines_df_revised.groupBy("quality_formatted").count().show()

+-----------------+-----+
|quality_formatted|count|
+-----------------+-----+
|                7|  199|
|                3|   10|
|                8|   18|
|                5|  681|
|                6|  638|
|                4|   53|
+-----------------+-----+



In [37]:
spark.sql("select quality_formatted, count(1) as count_per_quality from wines group by quality_formatted").show()

+-----------------+-----------------+
|quality_formatted|count_per_quality|
+-----------------+-----------------+
|                7|              199|
|                3|               10|
|                8|               18|
|                5|              681|
|                6|              638|
|                4|               53|
+-----------------+-----------------+



You can also sort the result with *orderBy*, very similar to 'order by' in SQL.

In [38]:
wines_df_revised.groupBy("quality_formatted").count().orderBy("count").show()

+-----------------+-----+
|quality_formatted|count|
+-----------------+-----+
|                3|   10|
|                8|   18|
|                4|   53|
|                7|  199|
|                6|  638|
|                5|  681|
+-----------------+-----+



In [39]:
spark.sql("select quality_formatted, count(1) as count_per_quality from wines group by quality_formatted order by count_per_quality").show()

+-----------------+-----------------+
|quality_formatted|count_per_quality|
+-----------------+-----------------+
|                3|               10|
|                8|               18|
|                4|               53|
|                7|              199|
|                6|              638|
|                5|              681|
+-----------------+-----------------+



All of the results of these functions can be saved in a separate DataFrame if you need to move forward with that transformation/aggregation. 

### Reading in a JSON format file

In [40]:
# Batch read of the "brazillian-wines" topic, from the earliest to the
# latest offset, using the same Kafka options as the "wines" read above.
brazillian_wines = (
    spark.read.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:29092")
    .option("subscribe", "brazillian-wines")
    .option("startingOffsets", "earliest")
    .option("endingOffsets", "latest")
    .load()
)

In [41]:
brazillian_wines.cache()

DataFrame[key: binary, value: binary, topic: string, partition: int, offset: bigint, timestamp: timestamp, timestampType: int]

We will again cast the value as string.

In [42]:
brazillian_wines_df=brazillian_wines.selectExpr("CAST(value AS STRING)")

In [43]:
brazillian_wines_df.head()

Row(value='{"id":4801829,"name":"Luiz Valduga Corte 1","seo_name":"luiz-valduga-corte-1-vale-dos-vinhedos","type_id":1,"vintage_type":0,"is_natural":false,"region":{"id":931,"name":"Vale dos Vinhedos","name_en":"","seo_name":"vale-dos-vinhedos","country":{"code":"br","name":"Brasil","native_name":"Brasil","seo_name":"brazil","currency":{"code":"BRL","name":"Brazil Reais","prefix":"R$","suffix":null},"regions_count":24,"users_count":3642678,"wines_count":13586,"wineries_count":1377,"most_used_grapes":[{"id":2,"name":"Cabernet Sauvignon","seo_name":"cabernet-sauvignon","has_detailed_info":true,"wines_count":727728},{"id":10,"name":"Merlot","seo_name":"merlot","has_detailed_info":true,"wines_count":512171},{"id":5,"name":"Chardonnay","seo_name":"chardonnay","has_detailed_info":true,"wines_count":543051}]},"background_image":{"location":"//images.vivino.com/regions/backgrounds/XrRrK-skTsOz6SDm_2tKeA.jpg","variations":{"large":"//thumbs.vivino.com/region_backgrounds/XrRrK-skTsOz6SDm_2tKeA_1

To unwrap this JSON, we can use a *map* function and run each row of the DataFrame with the function we pass to the *map()* function. 

*flatMap()* is a function with similar capabilities. The difference between *map()* and *flatMap()* is that *flatMap()* allows the function to return 0, 1 or more elements for each input row, so *flatMap()* can be used to flatten array elements into separate rows.

In [44]:
def extract_wine_details_from_json(row):
    """Pull a handful of wine attributes out of one JSON record.

    Args:
        row: Object whose ``value`` attribute holds a JSON document as text.

    Returns:
        A ``Row`` with the fields ``id``, ``name``, ``seo_name``,
        ``is_natural``, ``winery_name``, ``ratings_average`` and
        ``ratinge_count``.

    Raises:
        KeyError: If any of the keys read below is absent from the document.
    """
    record = json.loads(row.value)
    # NOTE(review): "ratinge_count" looks like a typo for "ratings_count"; it
    # is kept as-is because it is the column name of the resulting DataFrame.
    return Row(
        id=record["id"],
        name=record["name"],
        seo_name=record["seo_name"],
        is_natural=record["is_natural"],
        winery_name=record["winery"]["name"],
        ratings_average=record["statistics"]["ratings_average"],
        ratinge_count=record["statistics"]["ratings_count"],
    )

We can use a for loop to go through all the flavor groups of each wine, as the flavor-extraction function further below does.

In [45]:
brazillian_wines_details_df = brazillian_wines_df.rdd.map(extract_wine_details_from_json).toDF()

In [46]:
brazillian_wines_details_df.show(5)

+-------+----------+--------------------+-------------+---------------+--------------------+---------------+
|     id|is_natural|                name|ratinge_count|ratings_average|            seo_name|    winery_name|
+-------+----------+--------------------+-------------+---------------+--------------------+---------------+
|4801829|     false|Luiz Valduga Corte 1|         1180|            4.6|luiz-valduga-cort...|   Casa Valduga|
|1964017|     false|        Gran Reserva|          308|            4.6|        gran-reserva|      Milantino|
|1638525|     false|Seival Estate Ses...|         1291|            4.4|seival-estate-ses...|          Miolo|
|2571920|     false|Merlot Uvas Desid...|          693|            4.3|uvas-desidratadas...|   Luiz Argenta|
|4158362|     false|                Brut|          207|            3.9|                brut|Vinedos Capoani|
+-------+----------+--------------------+-------------+---------------+--------------------+---------------+
only showing top 5 

In [89]:
def extract_flavor_details_for_wines(row):
    """Expand one JSON wine record into one ``Row`` per flavor group.

    Intended for ``flatMap``: the returned list may hold zero, one or many
    rows for a single input record.

    Args:
        row: Object whose ``value`` attribute holds a JSON document as text.

    Returns:
        A list of ``Row`` objects with the fields ``id``, ``wine_name`` and
        ``flavor_group``. The list is empty when the document has no
        ``taste`` entry, no ``taste.flavor`` entry, or either one is null.

    Raises:
        KeyError: If ``id`` or ``name`` is missing from the document, or a
            flavor entry has no ``group`` key.
    """
    wines = json.loads(row.value)
    # Default to "no flavors" so a record without taste/flavor data yields an
    # empty list instead of failing on an unassigned variable. The `or`
    # fallbacks also cover keys that are present but set to null.
    flavors = (wines.get("taste") or {}).get("flavor") or []
    return [
        Row(id=wines["id"], wine_name=wines["name"], flavor_group=flavor["group"])
        for flavor in flavors
    ]

In [90]:
wine_flavors_df = brazillian_wines_df.rdd.flatMap(extract_flavor_details_for_wines).toDF()

In [91]:
wine_flavors_df.show(10, False)

+------------+-------+-----------------------+
|flavor_group|id     |wine_name              |
+------------+-------+-----------------------+
|oak         |4801829|Luiz Valduga Corte 1   |
|black_fruit |4801829|Luiz Valduga Corte 1   |
|red_fruit   |4801829|Luiz Valduga Corte 1   |
|earth       |4801829|Luiz Valduga Corte 1   |
|non_oak     |4801829|Luiz Valduga Corte 1   |
|spices      |4801829|Luiz Valduga Corte 1   |
|black_fruit |1964017|Gran Reserva           |
|oak         |1964017|Gran Reserva           |
|black_fruit |1638525|Seival Estate Sesmarias|
|oak         |1638525|Seival Estate Sesmarias|
+------------+-------+-----------------------+
only showing top 10 rows



You can *join()* two dataframes, also similar to sql. You can also register multiple temp tables and use spark sql for your joins: https://spark.apache.org/docs/3.1.1/api/python/reference/api/pyspark.sql.DataFrame.join.html