In [2]:
# Set the working directory to the same place as main.py so that imports
# resolve the same way as they do in the app.
import os


def resolve_project_root(path):
    """Return the directory that should be used as the working directory.

    When the last component of ``path`` is exactly ``etl`` the parent of
    ``path`` is returned; any other path is returned unchanged.

    Only the final path component is compared. A plain substring test for
    '/etl' would also match unrelated directories such as
    '/home/me/etl-projects/repo' and would move up one more level every time
    the cell is re-run.
    """
    normalized = os.path.normpath(path)  # drops any trailing separator
    if os.path.basename(normalized) == 'etl':
        return os.path.dirname(normalized)
    return path


current_directory = os.getcwd()
project_root = resolve_project_root(current_directory)
if project_root != current_directory:
    os.chdir(project_root)
os.getcwd()

'/Users/elisealstad/code/liqour-sales-spark-etl'

In [62]:
from pyspark.sql import SparkSession
from pyspark.sql.types import * 
from pyspark.sql.functions import round, col, abs, concat, lit
from pyspark.sql import functions as F


In [3]:


# BigQuery coordinates of the Iowa liquor sales table.
# They are referenced only by the disabled BigQuery read kept further down.
project_id = 'bigquery-public-data'
dataset = 'iowa_liquor_sales'
table = 'sales'

# Build the Spark session, or pick up one that already exists.
spark = (
    SparkSession.builder
    .appName('BigQuery Iowa Liquor Sales')
    .getOrCreate()
)

# Disabled: read the table straight from BigQuery into a DataFrame.
# df = (
#     spark.read
#     .format('bigquery')
#     .option('project', project_id)
#     .option('dataset', dataset)
#     .option('table', table)
#     .load()
# )

# To shut the session down once the work is finished:
# spark.stop()


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/09/22 17:16:21 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [4]:
# Local CSV file used as the input data.
file_path = 'test_data/bquxjob_3567bd65_1921a4602ae.csv'

# The first line of the file supplies the column names. No schema is given,
# so columns that are not cast later stay strings.
df = spark.read.csv(file_path, header=True)

# Preview the first 10 rows.
df.show(10)

+-----------------------+----------+------------+--------------------+--------------------+---------------+--------+--------------------+-------------+----------+---------+--------------------+-------------+--------------------+-----------+--------------------+----+----------------+-----------------+-------------------+------------+------------+------------------+-------------------+
|invoice_and_item_number|      date|store_number|          store_name|             address|           city|zip_code|      store_location|county_number|    county| category|       category_name|vendor_number|         vendor_name|item_number|    item_description|pack|bottle_volume_ml|state_bottle_cost|state_bottle_retail|bottles_sold|sale_dollars|volume_sold_liters|volume_sold_gallons|
+-----------------------+----------+------------+--------------------+--------------------+---------------+--------+--------------------+-------------+----------+---------+--------------------+-------------+-------------------

In [73]:
# Columns kept for the rest of the notebook, in output order.
selected_columns = (
    'store_name',
    'category_name',
    'item_description',
    'bottle_volume_ml',
    'vendor_number',
    'state_bottle_cost',
    'state_bottle_retail',
    'sale_dollars',
    'bottles_sold',
)

df_filtered = df.select(*selected_columns)

In [74]:
# Cast the numeric columns to their working types.
#
# Every column is listed exactly once. A key repeated in a dict literal is
# silently overridden by its last occurrence, which hides the type that is
# actually applied.
dtypes = {
    'bottle_volume_ml': 'int',
    'vendor_number': 'int',
    'state_bottle_retail': 'double',
    'state_bottle_cost': 'double',
    'sale_dollars': 'double',
    'bottles_sold': 'int',
}

# Replace each listed column with a cast copy; column order is unchanged.
for col_name, new_dtype in dtypes.items():
    df_filtered = df_filtered.withColumn(col_name, col(col_name).cast(new_dtype))

# Display the resulting schema to confirm the casts.
df_filtered.printSchema()

root
 |-- store_name: string (nullable = true)
 |-- category_name: string (nullable = true)
 |-- item_description: string (nullable = true)
 |-- bottle_volume_ml: integer (nullable = true)
 |-- vendor_number: integer (nullable = true)
 |-- state_bottle_cost: double (nullable = true)
 |-- state_bottle_retail: double (nullable = true)
 |-- sale_dollars: double (nullable = true)
 |-- bottles_sold: integer (nullable = true)



In [75]:
# Derived columns, applied in this order:
#   sale_dollars, bottles_sold -> replaced by their absolute values
#   cost_dollars     = state_bottle_cost * bottles_sold, rounded to 2 decimals
#   revenue_dollars  = sale_dollars - cost_dollars, rounded to 2 decimals
#   item_description -> item_description + ' ' + bottle_volume_ml + 'ml'
#
# NOTE(review): df_filtered is reassigned in place, so re-running this cell on
# its own appends the volume suffix to item_description a second time.
df_filtered = (
    df_filtered
    .withColumn('sale_dollars', F.abs(F.col('sale_dollars')))
    .withColumn('bottles_sold', F.abs(F.col('bottles_sold')))
    .withColumn(
        'cost_dollars',
        F.round(F.col('state_bottle_cost') * F.col('bottles_sold'), 2),
    )
    .withColumn(
        'revenue_dollars',
        F.round(F.col('sale_dollars') - F.col('cost_dollars'), 2),
    )
    .withColumn(
        'item_description',
        F.concat(
            F.col('item_description'),
            F.lit(' '),
            F.col('bottle_volume_ml'),
            F.lit('ml'),
        ),
    )
)

In [76]:
# Preview the transformed rows (20 rows, long values truncated).
df_filtered.show(n=20, truncate=True)

+--------------------+--------------------+--------------------+----------------+-------------+-----------------+-------------------+------------+------------+------------+---------------+
|          store_name|       category_name|    item_description|bottle_volume_ml|vendor_number|state_bottle_cost|state_bottle_retail|sale_dollars|bottles_sold|cost_dollars|revenue_dollars|
+--------------------+--------------------+--------------------+----------------+-------------+-----------------+-------------------+------------+------------+------------+---------------+
|NEIGHBORHOOD FOOD...|     AMERICAN VODKAS|HAWKEYE VODKA 1750ml|            1750|          434|              7.5|              11.25|       405.0|          36|       270.0|          135.0|
|     H & A MINI MART|     IMPORTED VODKAS|ABSOLUT SWEDISH V...|             750|          370|             9.99|              14.99|       899.4|          60|       599.4|          300.0|
|ANOTHER ROUND / D...|     IMPORTED VODKAS|   GREY GOOS

In [78]:
# Columns that identify one output row.
group_keys = [
    'store_name',
    'item_description',
    'vendor_number',
    'state_bottle_cost',
    'state_bottle_retail',
    'category_name',
]

# Per-group totals. Dollar sums are rounded to 2 decimals; the bottle count is
# summed without rounding.
totals = [
    F.round(F.sum('sale_dollars'), 2).alias('total_sale_dollars'),
    F.sum('bottles_sold').alias('total_bottles_sold'),
    F.round(F.sum('cost_dollars'), 2).alias('total_cost_dollars'),
    F.round(F.sum('revenue_dollars'), 2).alias('total_revenue_dollars'),
]

# Aggregate and sort with the highest total_revenue_dollars first.
df_grouped = (
    df_filtered
    .groupBy(*group_keys)
    .agg(*totals)
    .orderBy(F.desc('total_revenue_dollars'))
)

df_grouped.show()


+--------------------+--------------------+-------------+-----------------+-------------------+--------------------+------------------+------------------+------------------+---------------------+
|          store_name|    item_description|vendor_number|state_bottle_cost|state_bottle_retail|       category_name|total_sale_dollars|total_bottles_sold|total_cost_dollars|total_revenue_dollars|
+--------------------+--------------------+-------------+-----------------+-------------------+--------------------+------------------+------------------+------------------+---------------------+
|COSTCO WHOLESALE ...|MACALLAN DOUBLE C...|          266|            42.52|              63.78|  SINGLE MALT SCOTCH|           1530.72|                24|           1020.48|               510.24|
|     H & A MINI MART|ABSOLUT SWEDISH V...|          370|             9.99|              14.99|     IMPORTED VODKAS|             899.4|                60|             599.4|                300.0|
|ANOTHER ROUND / D..