In [1]:
import pyspark
import os
import json
import argparse

from dotenv import load_dotenv
from pathlib import Path
from pyspark.sql.types import StructType
from pyspark.sql.functions import col, sum as _sum, to_date, sum, format_number


In [2]:
dotenv_path = Path('/resources/.env')
load_dotenv(dotenv_path=dotenv_path)

True

In [3]:
postgres_host = os.getenv('POSTGRES_CONTAINER_NAME')
postgres_dw_db = os.getenv('POSTGRES_DW_DB')
postgres_user = os.getenv('POSTGRES_USER')
postgres_password = os.getenv('POSTGRES_PASSWORD')

In [4]:
sparkcontext = pyspark.SparkContext.getOrCreate(conf=(
        pyspark
        .SparkConf()
        .setAppName('Assignment_Day15')
        .setMaster('local')
        .set("spark.jars", "/opt/postgresql-42.2.18.jar")
    ))
sparkcontext.setLogLevel("WARN")
spark = pyspark.sql.SparkSession(sparkcontext.getOrCreate())

In [5]:
spark

In [6]:
jdbc_url = f'jdbc:postgresql://{postgres_host}/{postgres_dw_db}'
table_name = 'public.retail'
prop = {
    'user': postgres_user,
    'password': postgres_password,
    'driver': 'org.postgresql.Driver',
    'stringtype': 'unspecified'
}
retail_df = spark.read.jdbc(
    jdbc_url,
    table_name,
    properties=prop
)

In [7]:
retail_df.printSchema()

root
 |-- invoiceno: string (nullable = true)
 |-- stockcode: string (nullable = true)
 |-- description: string (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- invoicedate: date (nullable = true)
 |-- unitprice: double (nullable = true)
 |-- customerid: string (nullable = true)
 |-- country: string (nullable = true)



In [8]:
retail_df.show(10)

+---------+---------+--------------------+--------+-----------+---------+----------+--------------+
|invoiceno|stockcode|         description|quantity|invoicedate|unitprice|customerid|       country|
+---------+---------+--------------------+--------+-----------+---------+----------+--------------+
|   536365|   85123A|WHITE HANGING HEA...|       6| 2010-12-01|     2.55|     17850|United Kingdom|
|   536365|    71053| WHITE METAL LANTERN|       6| 2010-12-01|     3.39|     17850|United Kingdom|
|   536365|   84406B|CREAM CUPID HEART...|       8| 2010-12-01|     2.75|     17850|United Kingdom|
|   536365|   84029G|KNITTED UNION FLA...|       6| 2010-12-01|     3.39|     17850|United Kingdom|
|   536365|   84029E|RED WOOLLY HOTTIE...|       6| 2010-12-01|     3.39|     17850|United Kingdom|
|   536365|    22752|SET 7 BABUSHKA NE...|       2| 2010-12-01|     7.65|     17850|United Kingdom|
|   536365|    21730|GLASS STAR FROSTE...|       6| 2010-12-01|     4.25|     17850|United Kingdom|


In [9]:
# # Calculate the total spending per invoice line
# spending_df = retail_df.withColumn('total_spending', col('quantity') * col('unitprice'))

# # Group by country and calculate the total spending per country
# country_spending = spending_df.groupBy('country').agg(_sum('total_spending').alias('total_spending'))

# # Format the total_spending column to 2 decimal places
# country_spending = country_spending.withColumn('total_spending', format_number('total_spending', 2))

# # Show the result
# country_spending.show()

In [10]:
# Convert invoicedate to date type
DateInvoice_df = retail_df.withColumn('invoicedate', to_date(col('invoicedate')))

# Aggregate data by customerid and invoicedate
Date_analysis = DateInvoice_df.groupBy('invoicedate').agg(
    _sum('quantity').alias('Total_Item_Purchased'),
    _sum('unitprice').alias('Total_Customer_Spent')
)
# Format the total_quantity and total_unitprice columns to 2 decimal places
Date_analysis = Date_analysis.withColumn('Total_Item_Purchased', format_number('Total_Item_Purchased', 2)) \
                             .withColumn('Total_Customer_Spent', format_number('Total_Customer_Spent', 2))

# Show the result
Date_analysis.show()

+-----------+--------------------+--------------------+
|invoicedate|Total_Item_Purchased|Total_Customer_Spent|
+-----------+--------------------+--------------------+
| 2011-01-30|            3,367.00|            2,321.72|
| 2011-05-06|           19,632.00|            7,276.01|
| 2011-10-07|           30,431.00|           13,428.76|
| 2011-01-23|            5,194.00|            2,611.96|
| 2011-07-07|           18,956.00|            6,634.44|
| 2011-07-18|           14,366.00|           27,456.49|
| 2011-08-21|            8,164.00|            3,326.30|
| 2011-11-18|           23,535.00|           12,070.65|
| 2010-12-15|           18,229.00|            4,819.31|
| 2010-12-01|           26,814.00|           12,904.25|
| 2011-11-14|           45,959.00|           15,262.54|
| 2011-02-21|           21,456.00|           17,596.67|
| 2011-04-06|            8,576.00|            3,770.26|
| 2011-06-21|            7,846.00|            6,712.19|
| 2011-09-04|           10,941.00|            3,

In [11]:
Date_analysis.write.jdbc(
    url=jdbc_url,
    table="StockSpent_analysis_jupyter",
    mode="append",
    properties=prop
)