In [None]:
import pyspark
import os
import json
import argparse

from dotenv import load_dotenv
from pathlib import Path
from pyspark.sql.types import StructType
from pyspark.sql.functions import to_timestamp,col,when

## Initialization

In [None]:
# Load the variables defined in the mounted .env file (read by the next cell)
dotenv_path = Path('/resources') / '.env'
load_dotenv(dotenv_path=dotenv_path)

In [None]:
# Postgres connection settings taken from the environment (None when unset)
postgres_host, postgres_dw_db, postgres_user, postgres_password = (
    os.getenv(env_name)
    for env_name in (
        'POSTGRES_CONTAINER_NAME',
        'POSTGRES_DW_DB',
        'POSTGRES_USER',
        'POSTGRES_PASSWORD',
    )
)

In [None]:
# Build (or reuse) the SparkSession through the builder API: getOrCreate()
# hands back the already-running session when this cell is re-executed.
# The session's SparkContext is kept as `sparkcontext` for the log-level call.
spark_conf = (
    pyspark
    .SparkConf()
    .setAppName('Dibimbing')
    .setMaster('local')
    # PostgreSQL JDBC driver jar, used by the JDBC cells later in the notebook
    .set("spark.jars", "/opt/postgresql-42.2.18.jar")
)
spark = pyspark.sql.SparkSession.builder.config(conf=spark_conf).getOrCreate()

sparkcontext = spark.sparkContext
sparkcontext.setLogLevel("WARN")

In [None]:
# Show the SparkSession object as this cell's output
spark

## Basic

In [None]:
# Explicit DDL schema for the retail CSV; invoice_date is read as a plain
# string and parsed into a timestamp right after loading.
schema = ", ".join([
    "invoice_no long",
    "stock_code string",
    "description string",
    "quantity int",
    "invoice_date string",
    "unit_price float",
    "customer_id int",
    "country string",
])

df = spark.read.schema(schema).csv("/resources/data/online-retail-dataset.csv", header=True)
# Pattern: month/day/year hour:minute, without zero padding
df = df.withColumn("invoice_date", to_timestamp(col("invoice_date"), "M/d/yyyy H:mm"))

In [None]:
# == Viewing Data ==
# First five rows, rendered as a text table
df.show(n=5)

# Column names with their data types and nullability
df.printSchema()

# Column names as a Python list (displayed as the cell output)
df.columns

In [None]:
# == Selecting Columns ==
# Project just two columns
df.select(col("invoice_no"), col("stock_code")).show()

# Rename a column in the output with alias()
df.select(col("invoice_no").alias("invoice_number")).show()

In [None]:
# == Filtering Rows (WHERE) ==
# Filter rows using a single condition
df.filter(df.unit_price > 5).show()

# Combine conditions with & (AND) / | (OR). Each condition needs its own
# parentheses because & and | bind tighter than > and == in Python.
# customer_id is declared `int` in the schema, so compare it with an int
# literal instead of the string "16503" (which relied on an implicit cast).
df.filter((df.unit_price > 5) & (df.customer_id == 16503)).show()


In [None]:
# == Sorting Rows (ORDER BY) ==
# Ascending by a single column
df.orderBy(col("invoice_date")).show()

# Ascending invoice_no; ties broken by descending invoice_date
df.orderBy(col("invoice_no").asc(), col("invoice_date").desc()).show()


In [None]:
# == Aggregations (GROUP BY) ==
from pyspark.sql import functions as F

# Number of rows per customer
df.groupBy("customer_id").count().show()

# Sum of unit_price per customer
df.groupBy("customer_id").sum("unit_price").show()

# Several aggregates over the SAME column need column expressions: a dict such
# as {"unit_price": "mean", "unit_price": "max"} keeps only its last entry
# (dict keys are unique), so the mean was silently never computed.
df.groupBy("customer_id").agg(
    F.mean("unit_price").alias("avg_unit_price"),
    F.max("unit_price").alias("max_unit_price"),
).show()


In [None]:
# == CASE WHEN (Conditional Statements) ==
# price_status is "High" when unit_price > 7, otherwise "Low"
# (a null unit_price fails the comparison and therefore falls into "Low")
df = df.withColumn(
    "price_status",
    when(col("unit_price") > 7, "High").otherwise("Low"),
)
df.show()


In [None]:
# == Handling Missing Data ==
# Two alternative strategies, each applied to the same input `df`.
# (Previously fill() ran on the output of drop(), which no longer contains
# any nulls, so the fill step could never change anything; `df` was also
# overwritten, making the cell non-idempotent.)

# Strategy 1: drop rows that have a null in any column
df_dropped = df.na.drop()

# Strategy 2: keep every row, but replace nulls in specific columns
df_filled = df.na.fill({"description": "no description", "unit_price": 0})


## UDF

In [None]:
from pyspark.sql.functions import col, udf, pandas_udf
from pyspark.sql.types import IntegerType

# Sample DataFrame: ids 1-10 paired with a repeating apple/banana/orange cycle
df = spark.createDataFrame(
    [
        (1, "apple"),
        (2, "banana"),
        (3, "orange"),
        (4, "apple"),
        (5, "banana"),
        (6, "orange"),
        (7, "apple"),
        (8, "banana"),
        (9, "orange"),
        (10, "apple"),
    ],
    schema=["id", "fruit"],
)

In [None]:
# Define the UDF logic
def string_length(s):
    """Return the number of characters in ``s``.

    This is the body of a row-at-a-time Spark UDF, where a SQL NULL input
    arrives as ``None``. In that case ``None`` is returned (NULL result)
    instead of letting ``len(None)`` raise ``TypeError`` and fail the task.
    """
    if s is None:
        return None
    return len(s)

# Wrap the plain Python function as a Spark UDF that returns an integer
string_length_udf = udf(string_length, returnType=IntegerType())

# Add a "length" column computed row by row by the UDF, then display it
df1 = df.withColumn("length", string_length_udf(col("fruit")))
df1.show()

In [None]:
import pandas as pd

@pandas_udf(IntegerType())
def string_length_pandas_udf(s: pd.Series) -> pd.Series:
    """Vectorized counterpart of string_length.

    Receives a batch of "fruit" values as a pandas Series and returns the
    character count of each one.
    """
    lengths = s.str.len()
    return lengths

# Same "length" column as df1, computed batch-wise by the pandas UDF
df2 = df.withColumn("length", string_length_pandas_udf(col("fruit")))
df2.show()

## Join

In [None]:
# Turn Adaptive Query Execution off for the join demos below
# NOTE(review): presumably so the join strategy picked at planning time is not re-optimized at runtime
spark.conf.set(key="spark.sql.adaptive.enabled", value="false")

In [None]:
# Toy tables for the join demos. The "# Worker N" tags are illustrative only:
# they sketch how rows could be spread over executors in a cluster, but Spark
# decides the real partition placement (this notebook runs with the 'local' master).

# define schema for purchases dataset
purchases_schema = "order_id int, customer_id int, product_id int, quantity int, price float"

# create purchases dataframe (plays the role of the large fact table)
purchases_data = [
    (101, 1, 1, 2, 19.99), # Worker 1
    (102, 2, 2, 1, 9.99), # Worker 1
    (103, 3, 3, 1, 15.99), # Worker 1
    (104, 1, 4, 1, 5.99), # Worker 2
    (105, 2, 5, 3, 12.99), # Worker 2
    (106, 3, 6, 2, 9.99), # Worker 2
    (107, 4, 7, 1, 11.99), # Worker 2
    (108, 1, 8, 2, 14.99), # Worker 3
    (109, 2, 9, 1, 9.99), # Worker 3
    (110, 3, 10, 1, 19.99) # Worker 3
]
purchases_df = spark.createDataFrame(purchases_data, schema=purchases_schema)
# Only 10 rows here; imagine a much larger table (e.g. ~10,000 rows) in a real job

# define schema for customers dataset
customers_schema = "customer_id int, name string, email string"

# create customers dataframe (a smaller dimension table)
customers_data = [
    (1, "John Doe", "johndoe@example.com"), # Worker 1
    (2, "Jane Smith", "janesmith@example.com"), # Worker 2
    (3, "Bob Johnson", "bobjohnson@example.com"), # Worker 3
    (4, "Sue Lee", "suelee@example.com"), # Worker 3
]

customers_df = spark.createDataFrame(customers_data, schema=customers_schema)
# Only 4 rows here; imagine a smaller table (e.g. ~1,000 rows) in a real job

# define schema for products dataset
# NOTE: "name" also exists in customers_df and "price" also exists in purchases_df,
# so joining all three tables yields duplicate column names in the result.
products_schema = "product_id int, name string, price float"

# create products dataframe
products_data = [
    (1, "Product A", 19.99),
    (2, "Product B", 9.99),
    (3, "Product C", 15.99),
    (4, "Product D", 5.99),
    (5, "Product E", 12.99),
    (6, "Product F", 9.99),
    (7, "Product G", 11.99),
    (8, "Product H", 14.99),
    (9, "Product I", 9.99),
    (10, "Product J", 19.99)
]
products_df = spark.createDataFrame(products_data, schema=products_schema)

In [None]:

# Steer Spark toward a sort-merge join: prefer SMJ and turn off automatic
# broadcasting (a threshold of -1 disables it).
spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

# purchases -> customers -> products; lazy until an action such as show() runs.
# NOTE(review): the result carries duplicate "name" and "price" columns.
merged_df = purchases_df.join(customers_df, on="customer_id").join(products_df, on="product_id")

In [None]:
# First action on merged_df: executes the join and prints up to 20 rows
merged_df.show(n=20)

In [None]:
# Same action again; merged_df is not cached, so the join is executed once more
merged_df.show(n=20)

In [None]:
from pyspark.sql.functions import broadcast

# Re-enable broadcasting: drop the sort-merge preference and raise the
# auto-broadcast size threshold (value in bytes).
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "1000000000")

# Explicit broadcast() hints on the two small tables request a broadcast hash join
broadcast_df = (
    purchases_df
    .join(broadcast(customers_df), "customer_id")
    .join(broadcast(products_df), "product_id")
)
broadcast_df.show(5)

In [None]:
# Print just the first row of the broadcast-joined result
broadcast_df.show(n=1)

## Cache & Persist

In [None]:
# Print up to 20 rows of the broadcast-joined result
broadcast_df.show(n=20)

In [None]:
# Cache purchases_df, which the aggregation in the next cell reads from.
# (Previously this cell only called unpersist() on broadcast_df, which had
# never been cached, so nothing was cached at all.)
# cache() is lazy: the rows are stored the first time an action computes them.
purchases_df.cache()

In [None]:
from pyspark.sql.functions import col, sum as spark_sum

# Total purchase amount (quantity * price) per customer_id, computed from
# purchases_df. Despite the variable name, the grouping key is the customer.
store_purchase_totals = (
    purchases_df
    .withColumn("total_price", col("quantity") * col("price"))
    .groupBy("customer_id")
    # alias() must be applied to the aggregate column; applied to the
    # DataFrame (as before) it left the column named "sum(total_price)".
    .agg(spark_sum("total_price").alias("total_purchase_amount"))
)

In [None]:
# Persist store_purchase_totals in memory only (StorageLevel.MEMORY_ONLY).
# Swap in pyspark.StorageLevel.DISK_ONLY to keep the blocks on disk instead.
# persist() is lazy: nothing is stored until the first action computes the data.
store_purchase_totals.persist(pyspark.StorageLevel.MEMORY_ONLY)


In [None]:
# First action on the persisted DataFrame: computes the totals and prints them
store_purchase_totals.show(n=20)

In [None]:
# Same action again; this run can reuse the persisted blocks
store_purchase_totals.show(n=20)

In [None]:
# Release the persisted blocks of store_purchase_totals (non-blocking)
store_purchase_totals.unpersist(blocking=False)

Balik lagi ke Colab (back to the Colab notebook).

## JDBC

In [None]:
# Connection URL of the Postgres data warehouse (server default port)
jdbc_url = f'jdbc:postgresql://{postgres_host}/{postgres_dw_db}'

# Connection properties handed to every spark.read/write .jdbc() call
jdbc_properties = dict(
    user=postgres_user,
    password=postgres_password,
    driver='org.postgresql.Driver',
    # pgjdbc: send string parameters untyped so the server infers their type
    stringtype='unspecified',
)

In [None]:
# Load the public.retail table from Postgres over JDBC
retail_df = spark.read.jdbc(
    url=jdbc_url,
    table='public.retail',
    properties=jdbc_properties,
)

In [None]:
# Peek at the first five rows read from Postgres
retail_df.show(n=5)

In [None]:
# Write 10 rows of public.retail to public.sample_retail (which 10 rows is not
# guaranteed, since no ordering is applied before limit()).
# The "truncate" option only takes effect in "overwrite" mode: Spark then
# truncates the existing table instead of dropping and re-creating it. In
# "append" mode (as before) the option was ignored, and every re-run of this
# cell added 10 more rows.
(
    retail_df
    .limit(10)
    .write
    .mode("overwrite")
    .option("truncate", "true")
    .jdbc(
        jdbc_url,
        'public.sample_retail',
        properties=jdbc_properties
    )
)

In [None]:
# Read public.sample_retail back to inspect what the previous cell wrote
spark.read.jdbc(
    url=jdbc_url,
    table='public.sample_retail',
    properties=jdbc_properties,
).show()