# In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.window import Window
import psycopg2

# Obtain the SparkSession for this notebook (getOrCreate reuses an active one).
spark = (
    SparkSession.builder
    .appName("StoreSalesAnalytics")
    .getOrCreate()
)


# Read a 10,000-row sample of tpcds_sf1000.store_sales from Yellowbrick.
# NOTE: the pushed-down subquery has no ORDER BY, so the rows returned are NOT
# sorted by ss_sold_date_sk and which 10k rows come back is not guaranteed to be
# the same between runs. Add "ORDER BY ss_sold_date_sk" before LIMIT if a
# deterministic, sorted sample is required (at the cost of scanning the table).
# Be sure to replace the user and password with the credentials you get from
# your sandbox sign-up.
store_sales_pyspark = (
    spark.read.format("postgresql")
    .option("dbtable", "(SELECT * FROM tpcds_sf1000.store_sales LIMIT 10000) AS subquery")
    .option("host", "trialsandbox.sandbox.aws.yellowbrickcloud.com")
    .option("port", "5432")
    .option("database", "sample_data")
    .option("user", "your_user")
    .option("password", "your_password")
    .load()
)


# Perform the analytical transformations.
# The window has one partition per (sale date, customer), with rows ordered by
# net profit, highest first. The ranking and lead/lag columns are computed over it.
window = Window.partitionBy("ss_sold_date_sk", "ss_customer_sk").orderBy(desc("ss_net_profit"))


def _ratio_or_null(numerator, denominator):
    """Return round(numerator / denominator, 2) as a Column; NULL if the denominator is 0 or NULL.

    With spark.sql.ansi.enabled=false Spark already yields NULL for x / 0, but
    with ANSI mode on the same division raises DIVIDE_BY_ZERO and fails the job.
    Guarding with ``when`` produces the NULL result under both settings.

    :param numerator: name of the numerator column.
    :param denominator: name of the denominator column.
    """
    return when(col(denominator) != 0, round(col(numerator) / col(denominator), 2))


store_sales_analytics = store_sales_pyspark.select(
    "ss_sold_date_sk",
    "ss_sold_time_sk",
    "ss_item_sk",
    "ss_customer_sk",
    "ss_cdemo_sk",
    "ss_hdemo_sk",
    "ss_addr_sk",
    "ss_store_sk",
    "ss_promo_sk",
    "ss_ticket_number",
    "ss_quantity",
    "ss_wholesale_cost",
    "ss_list_price",
    "ss_sales_price",
    "ss_ext_discount_amt",
    "ss_ext_sales_price",
    "ss_ext_wholesale_cost",
    "ss_ext_list_price",
    "ss_ext_tax",
    "ss_coupon_amt",
    "ss_net_paid",
    "ss_net_paid_inc_tax",
    "ss_net_profit",
    # Position of each sale within its (date, customer) group by net profit.
    row_number().over(window).alias("rn"),
    rank().over(window).alias("rnk"),
    dense_rank().over(window).alias("dense_rnk"),
    # Neighbouring net-profit values in the same ordered group (NULL at the edges).
    lead("ss_net_profit", 1).over(window).alias("next_profit"),
    lag("ss_net_profit", 1).over(window).alias("prev_profit"),
    # Ratios rounded to 2 decimals; NULL instead of a divide-by-zero failure.
    _ratio_or_null("ss_sales_price", "ss_list_price").alias("price_discount_pct"),
    _ratio_or_null("ss_ext_sales_price", "ss_ext_list_price").alias("total_discount_pct"),
    _ratio_or_null("ss_net_profit", "ss_ext_sales_price").alias("profit_margin"),
    # 1 for a positive net profit, otherwise 0 (a NULL profit also maps to 0).
    when(col("ss_net_profit") > 0, lit(1)).otherwise(lit(0)).cast("NUMERIC").alias("profitability_status")
)

# Define Yellowbrick connection details
# Be sure to replace the user and password to the credentials you get from your sandbox sign up
driver = "org.postgresql.Driver"
database_host = "trialsandbox.sandbox.aws.yellowbrickcloud.com"
database_port = "5432"
database_name = "sample_data"
table = "tpcds_sf1000.store_sales_analytics"
user = "your_user"
password = "your_password"

# Construct the Postgres URL for the custom connector
url = f"jdbc:postgresql://{database_host}:{database_port}/{database_name}?user={user}&password={password}"


# Create the store_sales_analytics table in Yellowbrick using psycopg2.
# ("Distribute random" is a Yellowbrick DDL clause, not standard PostgreSQL.)
create_table_query = """
CREATE TABLE IF NOT EXISTS tpcds_sf1000.store_sales_analytics (
    ss_sold_date_sk INTEGER,
    ss_sold_time_sk INTEGER,
    ss_item_sk INTEGER,
    ss_customer_sk INTEGER,
    ss_cdemo_sk INTEGER,
    ss_hdemo_sk INTEGER,
    ss_addr_sk INTEGER,
    ss_store_sk INTEGER,
    ss_promo_sk INTEGER,
    ss_ticket_number INTEGER,
    ss_quantity INTEGER,
    ss_wholesale_cost NUMERIC,
    ss_list_price NUMERIC,
    ss_sales_price NUMERIC,
    ss_ext_discount_amt NUMERIC,
    ss_ext_sales_price NUMERIC,
    ss_ext_wholesale_cost NUMERIC,
    ss_ext_list_price NUMERIC,
    ss_ext_tax NUMERIC,
    ss_coupon_amt NUMERIC,
    ss_net_paid NUMERIC,
    ss_net_paid_inc_tax NUMERIC,
    ss_net_profit NUMERIC,
    rn INTEGER,
    rnk INTEGER,
    dense_rnk INTEGER,
    next_profit NUMERIC,
    prev_profit NUMERIC,
    price_discount_pct NUMERIC,
    total_discount_pct NUMERIC,
    profit_margin NUMERIC,
    profitability_status NUMERIC
) Distribute random;
"""

# A single psycopg2 connection handles the DDL, the verification count and the
# cleanup. The try/finally blocks guarantee that the scratch table is dropped
# and the connection is closed even if the Spark write or the count query fails.
conn = psycopg2.connect(host=database_host, port=database_port, dbname=database_name, user=user, password=password)
try:
    cursor = conn.cursor()
    # Commit the DDL before writing: Spark writes through its own connections
    # and must be able to see the table.
    cursor.execute(create_table_query)
    conn.commit()
    try:
        # Write the DataFrame to the table using the Postgres Spark connector.
        (store_sales_analytics.write
            .format("postgresql")
            .option("host", database_host)
            .option("port", database_port)
            .option("database", database_name)
            .option("user", user)
            .option("password", password)
            .option("dbtable", table)
            .mode("append")  # Append the data to the existing table
            .save()
        )

        # Verify the load by counting the rows now in the table.
        cursor.execute("SELECT COUNT(*) AS row_count FROM tpcds_sf1000.store_sales_analytics;")
        result = cursor.fetchone()
        row_count = result[0]
        print(f"Row count after insertion: {row_count}")
    finally:
        # End any transaction left open (or aborted) by the count query so the
        # DROP can run, then remove the scratch table.
        conn.rollback()
        cursor.execute("DROP TABLE IF EXISTS tpcds_sf1000.store_sales_analytics")
        conn.commit()
        cursor.close()
finally:
    # Always release the server connection (this also closes any open cursor).
    conn.close()

print("Cleanup: 'store_sales_analytics' table has been dropped.")
