### Project Prerequisites
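#### The prerequisites are listed below
- `pyspark`
- `getpass` (Python standard library, imported as `gp`) for entering the database password without echoing it
- A PostgreSQL server reachable at `localhost:5432`, with the PostgreSQL JDBC driver jar in the folder configured as `spark.driver.extraClassPath`
- The dataset: [Iowa Liquor Sales, Jan 2021 – Jan 2022 (Kaggle)](https://www.kaggle.com/gabrielramos87/iowa-sales-liquor-jan-2021jan-2022?select=Iowa+Liquor+Sales+%28Jan+2021-Jan+2022%29.csv)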

In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf, SQLContext
from pyspark.sql import functions as f
from sqlalchemy import create_engine
from getpass import getpass as gp
import os

### Configuration to create a Spark Session

In [5]:
appName = "PySpark Session"
master = "local[*]"
# Extra classpath for the driver (the Spark jars folder, presumably holding the
# PostgreSQL JDBC driver used by the JDBC writes further down).
# - Raw string: in a plain literal "\j" is an invalid escape sequence
#   (SyntaxWarning on Python 3.12+).
# - Trailing "\*": a bare directory on a Java classpath only exposes loose .class
#   files; the wildcard is what adds the .jar files inside the folder.
conf = (
    SparkConf()
    .setAppName(appName)
    .setMaster(master)
    .set("spark.driver.extraClassPath", r"C:\spark-3.2.1\jars\*")
)

In [6]:
# getOrCreate() makes this cell safe to re-run: calling SparkContext(conf=conf) a
# second time fails because only one SparkContext may be active at once.
# SparkSession replaces the SQLContext entry point, which is deprecated since Spark 3.0.
spark = SparkSession.builder.config(conf=conf).getOrCreate()
sc = spark.sparkContext



### Loading the CSV file into a Spark DataFrame

In [7]:
# Every column is read as a string (no schema inference); types are fixed later via CAST.
# Raw string: in a plain literal "\P", "\D" and "\I" are invalid escape sequences
# (SyntaxWarning on Python 3.12+).
df = (
    spark.read
    .option("header", "true")
    .csv(r"C:\PySpark_Demo\Data-Engineering-101\Iowa_liquor_sales.csv")
)

In [8]:
# Preview the raw rows, spelling out show()'s defaults: 20 rows, long values truncated.
df.show(n=20, truncate=True)

+-----------------------+----------+------------+--------------------+--------------------+---------------+--------+--------------------+-------------+----------+---------+--------------------+-------------+--------------------+-----------+--------------------+----+----------------+-----------------+-------------------+------------+------------+------------------+-------------------+
|invoice_and_item_number|      date|store_number|          store_name|             address|           city|zip_code|      store_location|county_number|    county| category|       category_name|vendor_number|         vendor_name|item_number|    item_description|pack|bottle_volume_ml|state_bottle_cost|state_bottle_retail|bottles_sold|sale_dollars|volume_sold_liters|volume_sold_gallons|
+-----------------------+----------+------------+--------------------+--------------------+---------------+--------+--------------------+-------------+----------+---------+--------------------+-------------+-------------------

### Creating a temporary view so the data can be queried with Spark SQL

In [9]:
# Expose the raw DataFrame under the name the spark.sql() queries select FROM.
df.createOrReplaceTempView(name="Iowa_sales")

In [10]:
# Every column comes back as a string because the CSV was read without schema
# inference; the transformation step below casts them to proper types.
df.printSchema()

root
 |-- invoice_and_item_number: string (nullable = true)
 |-- date: string (nullable = true)
 |-- store_number: string (nullable = true)
 |-- store_name: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- zip_code: string (nullable = true)
 |-- store_location: string (nullable = true)
 |-- county_number: string (nullable = true)
 |-- county: string (nullable = true)
 |-- category: string (nullable = true)
 |-- category_name: string (nullable = true)
 |-- vendor_number: string (nullable = true)
 |-- vendor_name: string (nullable = true)
 |-- item_number: string (nullable = true)
 |-- item_description: string (nullable = true)
 |-- pack: string (nullable = true)
 |-- bottle_volume_ml: string (nullable = true)
 |-- state_bottle_cost: string (nullable = true)
 |-- state_bottle_retail: string (nullable = true)
 |-- bottles_sold: string (nullable = true)
 |-- sale_dollars: string (nullable = true)
 |-- volume_sold_liters: string (nulla

### Transforming the data, i.e. converting data types

In [11]:
# Cast every raw string column of the Iowa_sales view to a proper type before loading.
# - Monetary columns use DECIMAL: FLOAT is single precision (~7 significant digits)
#   and cannot represent every cent amount exactly.
# - Volumes use DOUBLE instead of single-precision FLOAT.
# - zip_code is an identifier, not a quantity, so it stays a STRING.
# - Explicit aliases pin each output column to its source column name.
# NOTE(review): CAST(date AS DATE) assumes the CSV stores dates as yyyy-MM-dd, and
# values that fail to parse become NULL under Spark's default (non-ANSI) casts — confirm.
Iowa_table_1 = spark.sql("""
SELECT  CAST(invoice_and_item_number AS STRING)    AS invoice_and_item_number,
        CAST(date AS DATE)                         AS date,
        CAST(store_number AS INT)                  AS store_number,
        CAST(store_name AS STRING)                 AS store_name,
        CAST(address AS STRING)                    AS address,
        CAST(city AS STRING)                       AS city,
        CAST(zip_code AS STRING)                   AS zip_code,
        CAST(store_location AS STRING)             AS store_location,
        CAST(county_number AS INT)                 AS county_number,
        CAST(county AS STRING)                     AS county,
        CAST(category AS INT)                      AS category,
        CAST(category_name AS STRING)              AS category_name,
        CAST(vendor_number AS NUMERIC)             AS vendor_number,
        CAST(vendor_name AS STRING)                AS vendor_name,
        CAST(item_number AS INT)                   AS item_number,
        CAST(item_description AS STRING)           AS item_description,
        CAST(pack AS INT)                          AS pack,
        CAST(bottle_volume_ml AS INT)              AS bottle_volume_ml,
        CAST(state_bottle_cost AS DECIMAL(10,2))   AS state_bottle_cost,
        CAST(state_bottle_retail AS DECIMAL(10,2)) AS state_bottle_retail,
        CAST(bottles_sold AS INT)                  AS bottles_sold,
        CAST(sale_dollars AS DECIMAL(12,2))        AS sale_dollars,
        CAST(volume_sold_liters AS DOUBLE)         AS volume_sold_liters,
        CAST(volume_sold_gallons AS DOUBLE)        AS volume_sold_gallons

FROM Iowa_sales
""")

### Writing the transformed table to a postgres database

In [12]:
# Database credentials: read from the PGUSER / PGPASSWORD environment variables when
# set, otherwise prompted for, so they are never stored in the notebook. They must be
# defined before the first JDBC write; every later write cell reuses them.
user = os.environ.get("PGUSER") or input("PostgreSQL user: ")
password = os.environ.get("PGPASSWORD") or gp("PostgreSQL password: ")

# Load the typed table into Postgres, overwriting public.iowa_table_1.
(
    Iowa_table_1.write
    .format("jdbc")
    .mode("overwrite")
    .option("url", "jdbc:postgresql://localhost:5432/postgres")
    .option("dbtable", "public.iowa_table_1")
    .option("user", user)
    .option("password", password)
    .save()
)

### Writing aggregated tables to the database

In [17]:
# Total gallons sold per calendar month. The original grouped by the raw daily `date`
# value, which produced daily (not monthly) totals. trunc(..., 'MM') maps each date
# to the first day of its month. That value keeps the column name `date` so the
# monthly_sales_agg table schema is unchanged.
# Raw Iowa_sales columns are strings, so both columns are cast explicitly.
# NOTE(review): assumes dates are stored as yyyy-MM-dd; unparseable dates become NULL.
monthly_sales_gallons = spark.sql("""
SELECT
    trunc(CAST(date AS DATE), 'MM')          AS date,
    SUM(CAST(volume_sold_gallons AS DOUBLE)) AS monthly_sales_gallons
FROM Iowa_sales
GROUP BY trunc(CAST(date AS DATE), 'MM')
ORDER BY date
""")

In [18]:
# Persist the monthly aggregate to Postgres, overwriting public.monthly_sales_agg.
(
    monthly_sales_gallons.write
    .format("jdbc")
    .mode("overwrite")
    .options(
        url="jdbc:postgresql://localhost:5432/postgres",
        dbtable="public.monthly_sales_agg",
        user=user,
        password=password,
    )
    .save()
)

In [19]:
# Total gallons sold per store, largest first.
# volume_sold_gallons is a string in the raw Iowa_sales view. The explicit cast
# avoids relying on implicit string-to-number coercion, which differs under ANSI mode.
sales_by_store = spark.sql("""
SELECT
    store_name,
    SUM(CAST(volume_sold_gallons AS DOUBLE)) AS sales_by_store
FROM Iowa_sales
GROUP BY store_name
ORDER BY sales_by_store DESC
""")

In [20]:
# Persist the per-store totals to Postgres, overwriting public.store_sales_agg.
(
    sales_by_store.write
    .format("jdbc")
    .mode("overwrite")
    .options(
        url="jdbc:postgresql://localhost:5432/postgres",
        dbtable="public.store_sales_agg",
        user=user,
        password=password,
    )
    .save()
)

In [21]:
# Rank every line item within its (store, day) by sale amount, largest first.
# - sale_dollars is a string in the raw Iowa_sales view. Ordering it directly is
#   lexicographic ('99.5' sorts above '1000.0'), so it is cast to a number for the ORDER BY.
# - invoice_and_item_number breaks ties, so ROW_NUMBER is deterministic across runs.
# All rows are kept; filter on rank_ = 1 for each store's top sale per day.
top_sales_per_day_store = spark.sql("""
SELECT invoice_and_item_number, store_name, date, sale_dollars,
       ROW_NUMBER() OVER (
           PARTITION BY store_name, date
           ORDER BY CAST(sale_dollars AS DECIMAL(12,2)) DESC, invoice_and_item_number
       ) AS rank_
FROM Iowa_sales
""")

In [23]:
# Persist the per-store, per-day sale ranking to Postgres, overwriting public.top_sales_per_day.
(
    top_sales_per_day_store.write
    .format("jdbc")
    .mode("overwrite")
    .options(
        url="jdbc:postgresql://localhost:5432/postgres",
        dbtable="public.top_sales_per_day",
        user=user,
        password=password,
    )
    .save()
)