In [1]:
import getpass
import os

from pyspark.sql import SparkSession

In [2]:
# Start (or reuse) a Spark session and put /data/postgresql.jar on its classpath;
# the JDBC reads/writes in this notebook load `org.postgresql.Driver` from it.
spark = (
    SparkSession.builder
    .appName("PySpark App")
    .config("spark.jars", "/data/postgresql.jar")
    .getOrCreate()
)

## Connect to Postgres & Read

In [3]:
# Connection settings for the `winedb` Postgres database.
# Credentials come from the environment (or an interactive prompt) instead of being
# hard-coded, so they never end up in version control or in shared copies of the notebook.
postgres_uri = os.environ.get(
    "POSTGRES_URI",
    "jdbc:postgresql://database.cargnt6y2u5f.eu-west-2.rds.amazonaws.com/winedb",
)
user = os.environ.get("POSTGRES_USER", "postgres")
# Prompt only when the variable is unset; getpass does not echo the typed password.
password = os.environ.get("POSTGRES_PASSWORD") or getpass.getpass("Postgres password: ")

# Fully-qualified (schema.table) target tables.
dbtableWine = "wine_database.wine"
dbtableGrape = "wine_database.grapes"
dbtableWinery = "wine_database.winery"
dbtablePrice = "wine_database.price"

In [4]:
# Preview the current contents of the wine table before populating it.
# NOTE(review): the writes further down use mode("append"); if rows already show up
# here, running the write cells again will insert them a second time.
db_wine = (
    spark.read.format("jdbc")
    .options(
        url=postgres_uri,
        dbtable=dbtableWine,
        user=user,
        password=password,
        driver="org.postgresql.Driver",
    )
    .load()
)

db_wine.printSchema()
db_wine.show()

root
 |-- wine_id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- winery_id: string (nullable = true)
 |-- vintage: integer (nullable = true)
 |-- bottle_size: integer (nullable = true)
 |-- grapes: string (nullable = true)
 |-- rating: double (nullable = true)

+---------+--------------------+---------+-------+-----------+--------+------+
|  wine_id|                name|winery_id|vintage|bottle_size|  grapes|rating|
+---------+--------------------+---------+-------+-----------+--------+------+
|  1749808|Cristal Rosé Brut...|     ZMVL|   2012|        750|P737B5Y6|   4.6|
|  2442059|Cristal Brut Cham...|     ZMVL|   2013|        750|P737B5Y6|   4.6|
|  4051453|Cristal Brut Cham...|     ZMVL|   2014|        750|P737B5Y6|   4.6|
|163946037|  Puligny-Montrachet|     A7AV|   2020|        750|P737B5Y6|   4.2|
|156134190|Brut Champagne (V...|     ZMVL|   2013|        750|P737B5Y6|   4.3|
| 21161696|Rosé Brut Champag...|     ZMVL|   2015|        750|P737B5Y6|   4.3|
|14498

## Populating: load the source Parquet files

In [15]:
# Source data: Vivino wine listings (winery, wine, vintage, price, grapes, rating, ...).
# No `encoding` option: Parquet stores strings as UTF-8 by specification, and `encoding`
# is a text-format (CSV/JSON) option, so passing it here had no effect.
vivino = spark.read.parquet("/data/src/parquet/vivino_new.parquet")
vivino.show(5)

+---------+--------------------+---------+--------------------+-------+-----------+-----------------+--------+--------------+---------------+--------------+------------------+--------------------+------+--------+
|       id|              winery|winery_id|                wine|vintage|bottle_size|            price|discount|general_grapes|specific_grapes|general_region|       wine_region|        type_of_wine|rating|grape_id|
+---------+--------------------+---------+--------------------+-------+-----------+-----------------+--------+--------------+---------------+--------------+------------------+--------------------+------+--------+
|  1749808|Champagne Roedere...|     ZMVL|Cristal Rosé Brut...|   2012|        750|585.6632691195389|     0.0|          null|     Chardonnay|        French|         Champagne|           Champagne|   4.6|P737B5Y6|
|  2442059|Champagne Roedere...|     ZMVL|Cristal Brut Cham...|   2013|        750| 645.520637306105|     0.0|          null|     Chardonnay|       

In [16]:
# Source data: grape reference table (flavours, taste profile, pairing, serving info).
# No `encoding` option: it only applies to text formats; Parquet strings are always UTF-8.
grape = spark.read.parquet("/data/src/parquet/grapes_new.parquet")
grape.show()

+--------+----------+--------------------+--------------------+--------------------+-----------+---------------+------+------+
|grape_id|     grape|     primary_flavors|       taste_profile|        food_pairing|temperature|     glass_type|decant|cellar|
+--------+----------+--------------------+--------------------+--------------------+-----------+---------------+------+------+
|ZMVLA7AV|  Grenache|[Stewed Strawberr...|[Dry, Medium-full...|[The high intensi...|    15-20°C|      Universal|    30|  10+ |
|3IOHIL20|     Syrah|[Blueberry, Black...|[Dry, Full Body, ...|[Darker meats and...|    15-20°C|      Universal|    60|  10+ |
|U1ITKH3V|  Viognier|[Tangerine, Peach...|[Dry, Medium Body...|[A wine best pair...|     7-12°C|          White|     0|  3–5 |
|DJP2MOLA|  Cabernet|[Strawberry, Rasp...|[Dry, Medium Body...|[Higher acidity m...|    15-20°C|      Universal|    30| 5–10 |
|P737B5Y6|Chardonnay|[Yellow Apple, St...|[Dry, Medium Body...|[Chardonnay wine ...|     7-12°C|Aroma Collector

In [17]:
# Source data: winery reference table (names, address, foundation year, ...).
# No `encoding` option: it only applies to text formats; Parquet strings are always UTF-8.
winery = spark.read.parquet("/data/src/parquet/winery_new.parquet")
winery.show(3)

+----+-------------------+-------------------+--------------------+--------+-------+---------------+--------------------+--------------------+
|  id|        winery_name|             winery|             address|    city|country|year_foundation|    corporation_type|            activity|
+----+-------------------+-------------------+--------------------+--------+-------+---------------+--------------------+--------------------+
|3IOH|           Le Delas|           LE DELAS|1 Avenue De Norma...|   Cedex| France|           1969|Siège social – Ma...|           Grossiste|
|OB14|      Château Tayac|      CHÂTEAU TAYAC|              Tayac,|Soussans| France|           1930|Siège social – Ma...|Fabricant/ Produc...|
|A7AV|Maison Louis Latour|MAISON LOUIS LATOUR|18 Rue Des Tonnel...|  Beaune| France|           1945|                 NaN|Fabricant/ Produc...|
+----+-------------------+-------------------+--------------------+--------+-------+---------------+--------------------+--------------------+

## Write to Postgres

### table: winery

In [18]:
# Project the source wineries onto the columns written to wine_database.winery:
# id -> winery_id, winery -> company_name, year_foundation -> year_of_foundation.
# corporation_type and activity are not carried over.
df_winery = winery.selectExpr(
    "id AS winery_id",
    "winery_name",
    "winery AS company_name",
    "address",
    "city",
    "country",
    "year_foundation AS year_of_foundation",
)
df_winery.show(3)

+---------+-------------------+-------------------+--------------------+--------+-------+------------------+
|winery_id|        winery_name|       company_name|             address|    city|country|year_of_foundation|
+---------+-------------------+-------------------+--------------------+--------+-------+------------------+
|     3IOH|           Le Delas|           LE DELAS|1 Avenue De Norma...|   Cedex| France|              1969|
|     OB14|      Château Tayac|      CHÂTEAU TAYAC|              Tayac,|Soussans| France|              1930|
|     A7AV|Maison Louis Latour|MAISON LOUIS LATOUR|18 Rue Des Tonnel...|  Beaune| France|              1945|
+---------+-------------------+-------------------+--------------------+--------+-------+------------------+
only showing top 3 rows



In [19]:
# Append the prepared wineries to Postgres (table name from the config cell).
# The explicit select pins the column order sent to the JDBC writer.
# NOTE: mode("append") is not idempotent - re-running this cell inserts the rows again
# (or fails, if the table enforces a key on winery_id - TODO confirm schema).
(
    df_winery.select(
        "winery_id", "winery_name", "company_name", "address",
        "city", "country", "year_of_foundation",
    )
    .write.format("jdbc")
    .option("url", postgres_uri)
    .option("driver", "org.postgresql.Driver")
    .option("dbtable", dbtableWinery)
    .option("user", user)
    .option("password", password)
    .mode("append")
    .save()
)

In [20]:
# Read the winery table back to verify the write.
# Stored as `winery_db` rather than `winery`: overwriting the source Parquet DataFrame
# made the df_winery cell fail on re-run, because the table has `winery_id`, not `id`.
winery_db = (
    spark.read.format("jdbc")
    .option("url", postgres_uri)
    .option("dbtable", dbtableWinery)
    .option("user", user)
    .option("password", password)
    .option("driver", "org.postgresql.Driver")
    .load()
)

winery_db.show(3)

+---------+-------------------+-------------------+--------------------+--------+-------+------------------+
|winery_id|        winery_name|       company_name|             address|    city|country|year_of_foundation|
+---------+-------------------+-------------------+--------------------+--------+-------+------------------+
|     3IOH|           Le Delas|           LE DELAS|1 Avenue De Norma...|   Cedex| France|              1969|
|     OB14|      Château Tayac|      CHÂTEAU TAYAC|              Tayac,|Soussans| France|              1930|
|     A7AV|Maison Louis Latour|MAISON LOUIS LATOUR|18 Rue Des Tonnel...|  Beaune| France|              1945|
+---------+-------------------+-------------------+--------------------+--------+-------+------------------+
only showing top 3 rows



### table: grapes

In [21]:
# Append the grape reference rows to Postgres (table name from the config cell).
# primary_flavors / taste_profile / food_pairing are list columns in the source; the
# read-back shows them as `{...}` array literals - NOTE(review): confirm target column types.
# NOTE: mode("append") is not idempotent - re-running this cell inserts the rows again.
(
    grape.select(
        "grape_id", "grape", "primary_flavors", "taste_profile", "food_pairing",
        "temperature", "glass_type", "decant", "cellar",
    )
    .write.format("jdbc")
    .option("url", postgres_uri)
    .option("driver", "org.postgresql.Driver")
    .option("dbtable", dbtableGrape)
    .option("user", user)
    .option("password", password)
    .mode("append")
    .save()
)

In [22]:
# Read the grapes table back to verify the write (table name from the config cell).
grapes = (
    spark.read.format("jdbc")
    .option("url", postgres_uri)
    .option("dbtable", dbtableGrape)
    .option("user", user)
    .option("password", password)
    .option("driver", "org.postgresql.Driver")
    .load()
)

grapes.show(3)

+--------+--------+--------------------+--------------------+--------------------+-----------+----------+------+------+
|grape_id|   grape|     primary_flavors|       taste_profile|        food_pairing|temperature|glass_type|decant|cellar|
+--------+--------+--------------------+--------------------+--------------------+-----------+----------+------+------+
|ZMVLA7AV|Grenache|{"Stewed Strawber...|{Dry,"Medium-full...|{"The high intens...|    15-20°C| Universal|    30|  10+ |
|3IOHIL20|   Syrah|{Blueberry,"Black...|{Dry,"Full Body",...|{"Darker meats an...|    15-20°C| Universal|    60|  10+ |
|U1ITKH3V|Viognier|{Tangerine,Peach,...|{Dry,"Medium Body...|{"A wine best pai...|     7-12°C|     White|     0|  3–5 |
+--------+--------+--------------------+--------------------+--------------------+-----------+----------+------+------+
only showing top 3 rows



### table: wine

In [23]:
# Project the Vivino listings onto the columns written to wine_database.wine:
# id -> wine_id, wine -> name, grape_id -> grapes; the rest keep their source names.
df_wine = vivino.selectExpr(
    "id AS wine_id",
    "wine AS name",
    "winery_id",
    "vintage",
    "bottle_size",
    "grape_id AS grapes",
    "rating",
)
df_wine.show(3)

+-------+--------------------+---------+-------+-----------+--------+------+
|wine_id|                name|winery_id|vintage|bottle_size|  grapes|rating|
+-------+--------------------+---------+-------+-----------+--------+------+
|1749808|Cristal Rosé Brut...|     ZMVL|   2012|        750|P737B5Y6|   4.6|
|2442059|Cristal Brut Cham...|     ZMVL|   2013|        750|P737B5Y6|   4.6|
|4051453|Cristal Brut Cham...|     ZMVL|   2014|        750|P737B5Y6|   4.6|
+-------+--------------------+---------+-------+-----------+--------+------+
only showing top 3 rows



In [24]:
# Append the wines to Postgres (table name from the config cell). Runs after the
# winery and grapes writes - presumably because winery_id / grapes reference those
# tables (TODO confirm FK constraints).
# NOTE: mode("append") is not idempotent - re-running this cell inserts the rows again.
(
    df_wine.select(
        "wine_id", "name", "winery_id", "vintage", "bottle_size", "grapes", "rating",
    )
    .write.format("jdbc")
    .option("url", postgres_uri)
    .option("driver", "org.postgresql.Driver")
    .option("dbtable", dbtableWine)
    .option("user", user)
    .option("password", password)
    .mode("append")
    .save()
)

In [25]:
# Read the wine table back to verify the write (table name from the config cell).
wine = (
    spark.read.format("jdbc")
    .option("url", postgres_uri)
    .option("dbtable", dbtableWine)
    .option("user", user)
    .option("password", password)
    .option("driver", "org.postgresql.Driver")
    .load()
)

wine.show(3)

+-------+--------------------+---------+-------+-----------+--------+------+
|wine_id|                name|winery_id|vintage|bottle_size|  grapes|rating|
+-------+--------------------+---------+-------+-----------+--------+------+
|1749808|Cristal Rosé Brut...|     ZMVL|   2012|        750|P737B5Y6|   4.6|
|2442059|Cristal Brut Cham...|     ZMVL|   2013|        750|P737B5Y6|   4.6|
|4051453|Cristal Brut Cham...|     ZMVL|   2014|        750|P737B5Y6|   4.6|
+-------+--------------------+---------+-------+-----------+--------+------+
only showing top 3 rows



### table: price

In [26]:
# Price rows: wine id and name plus the listing's price and discount.
df_price = vivino.selectExpr("id AS wine_id", "wine AS name", "price", "discount")
df_price.show(3)

+-------+--------------------+-----------------+--------+
|wine_id|                name|            price|discount|
+-------+--------------------+-----------------+--------+
|1749808|Cristal Rosé Brut...|585.6632691195389|     0.0|
|2442059|Cristal Brut Cham...| 645.520637306105|     0.0|
|4051453|Cristal Brut Cham...|268.7713198965419|     0.0|
+-------+--------------------+-----------------+--------+
only showing top 3 rows



In [27]:
# Append the price rows to Postgres (table name from the config cell).
# NOTE: mode("append") is not idempotent - re-running this cell inserts the rows again.
(
    df_price.select("wine_id", "name", "price", "discount")
    .write.format("jdbc")
    .option("url", postgres_uri)
    .option("driver", "org.postgresql.Driver")
    .option("dbtable", dbtablePrice)
    .option("user", user)
    .option("password", password)
    .mode("append")
    .save()
)

In [28]:
# Read the price table back to verify the write (table name from the config cell).
price = (
    spark.read.format("jdbc")
    .option("url", postgres_uri)
    .option("dbtable", dbtablePrice)
    .option("user", user)
    .option("password", password)
    .option("driver", "org.postgresql.Driver")
    .load()
)

price.show(3)

+-------+--------------------+-----------------+--------+
|wine_id|                name|            price|discount|
+-------+--------------------+-----------------+--------+
|1749808|Cristal Rosé Brut...|585.6632691195389|     0.0|
|2442059|Cristal Brut Cham...| 645.520637306105|     0.0|
|4051453|Cristal Brut Cham...|268.7713198965419|     0.0|
+-------+--------------------+-----------------+--------+
only showing top 3 rows

