In [8]:
import pyspark
import os
import json
import argparse

from dotenv import load_dotenv
from pathlib import Path
from pyspark.sql.types import StructType
from pyspark.sql.functions import to_timestamp,col,when

## Initialization

In [9]:
dotenv_path = Path('/resources/.env')
load_dotenv(dotenv_path=dotenv_path)

True

In [10]:
postgres_host = os.getenv('POSTGRES_CONTAINER_NAME')
postgres_dw_db = os.getenv('POSTGRES_DW_DB')
postgres_user = os.getenv('POSTGRES_USER')
postgres_password = os.getenv('POSTGRES_PASSWORD')

In [11]:
sparkcontext = pyspark.SparkContext.getOrCreate(conf=(
        pyspark
        .SparkConf()
        .setAppName('Dibimbing')
        .setMaster('local')
        .set("spark.jars", "/opt/postgresql-42.2.18.jar")
    ))
sparkcontext.setLogLevel("WARN")

spark = pyspark.sql.SparkSession(sparkcontext.getOrCreate())

In [12]:
spark

## UDF

In [15]:
from pyspark.sql.functions import col, udf, pandas_udf
from pyspark.sql.types import IntegerType

# Create a sample dataframe
df = spark.createDataFrame([(1, "apple"), (2, "banana"), (3, "orange"), 
                            (4, "apple"), (5, "banana"), (6, "orange"),
                            (7, "apple"), (8, "banana"), (9, "orange"),
                            (10, "apple")], ["id", "fruit"])

In [16]:
# Define the UDF logic
def string_length(s):
    return len(s)

# Define the Python UDF
string_length_udf = udf(string_length, IntegerType())

# Apply the Python UDF and display the result
df1 = df.withColumn("length", string_length_udf(col("fruit")))
df1.show()

+---+------+------+
| id| fruit|length|
+---+------+------+
|  1| apple|     5|
|  2|banana|     6|
|  3|orange|     6|
|  4| apple|     5|
|  5|banana|     6|
|  6|orange|     6|
|  7| apple|     5|
|  8|banana|     6|
|  9|orange|     6|
| 10| apple|     5|
+---+------+------+



In [17]:
import pandas as pd

# Define the Pandas UDF
@pandas_udf(IntegerType())
def string_length_pandas_udf(s: pd.Series) -> pd.Series:
    return s.str.len()
    
# Apply the Pandas UDF and display the result
df2 = df.withColumn("length", string_length_pandas_udf(col("fruit")))
df2.show()

+---+------+------+
| id| fruit|length|
+---+------+------+
|  1| apple|     5|
|  2|banana|     6|
|  3|orange|     6|
|  4| apple|     5|
|  5|banana|     6|
|  6|orange|     6|
|  7| apple|     5|
|  8|banana|     6|
|  9|orange|     6|
| 10| apple|     5|
+---+------+------+



## Join

In [18]:
spark.conf.set("spark.sql.adaptive.enabled", "false")

In [22]:
# define schema for purchases dataset
purchases_schema = "order_id int, customer_id int, product_id int, quantity int, price float"

# create purchases dataframe
purchases_data = [
    (101, 1, 1, 2, 19.99),
    (102, 2, 2, 1, 9.99),
    (103, 3, 3, 1, 15.99),
    (104, 1, 4, 1, 5.99),
    (105, 2, 5, 3, 12.99),
    (106, 3, 6, 2, 9.99),
    (107, 4, 7, 1, 11.99),
    (108, 1, 8, 2, 14.99),
    (109, 2, 9, 1, 9.99),
    (110, 3, 10, 1, 19.99)
]
purchases_df = spark.createDataFrame(purchases_data, schema=purchases_schema)

# define schema for customers dataset
customers_schema = "customer_id int, name string, email string"

# create customers dataframe
customers_data = [
    (1, "John Doe", "johndoe@example.com"),
    (2, "Jane Smith", "janesmith@example.com"),
    (3, "Bob Johnson", "bobjohnson@example.com"),
    (4, "Sue Lee", "suelee@example.com"),
    (5, "Suikim Lee", "suekimlee@example.com")
]
customers_df = spark.createDataFrame(customers_data, schema=customers_schema)

# define schema for products dataset
products_schema = "product_id int, name string, price float"

# create products dataframe
products_data = [
    (1, "Product A", 19.99),
    (2, "Product B", 9.99),
    (3, "Product C", 15.99),
    (4, "Product D", 5.99),
    (5, "Product E", 12.99),
    (6, "Product F", 9.99),
    (7, "Product G", 11.99),
    (8, "Product H", 14.99),
    (9, "Product I", 9.99),
    (10, "Product J", 19.99)
]
products_df = spark.createDataFrame(products_data, schema=products_schema)

In [23]:
customers_df.show()

+-----------+-----------+--------------------+
|customer_id|       name|               email|
+-----------+-----------+--------------------+
|          1|   John Doe| johndoe@example.com|
|          2| Jane Smith|janesmith@example...|
|          3|Bob Johnson|bobjohnson@exampl...|
|          4|    Sue Lee|  suelee@example.com|
|          5| Suikim Lee|suekimlee@example...|
+-----------+-----------+--------------------+



In [24]:

# set join preferences
spark.conf.set("spark.sql.join.preferSortMergeJoin", "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")

# perform sort merge join
merged_df = (
    purchases_df
    .join(customers_df, "customer_id")
    .join(products_df, "product_id")
)

In [26]:
merged_df.show()

+----------+-----------+--------+--------+-----+-----------+--------------------+---------+-----+
|product_id|customer_id|order_id|quantity|price|       name|               email|     name|price|
+----------+-----------+--------+--------+-----+-----------+--------------------+---------+-----+
|         1|          1|     101|       2|19.99|   John Doe| johndoe@example.com|Product A|19.99|
|         6|          3|     106|       2| 9.99|Bob Johnson|bobjohnson@exampl...|Product F| 9.99|
|         3|          3|     103|       1|15.99|Bob Johnson|bobjohnson@exampl...|Product C|15.99|
|         5|          2|     105|       3|12.99| Jane Smith|janesmith@example...|Product E|12.99|
|         9|          2|     109|       1| 9.99| Jane Smith|janesmith@example...|Product I| 9.99|
|         4|          1|     104|       1| 5.99|   John Doe| johndoe@example.com|Product D| 5.99|
|         8|          1|     108|       2|14.99|   John Doe| johndoe@example.com|Product H|14.99|
|         7|        

In [27]:
from pyspark.sql.functions import broadcast

# set join preferences
spark.conf.set("spark.sql.join.preferSortMergeJoin", "false")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "1000000000")

# perform broadcast hash join
broadcast_df = purchases_df.join(broadcast(customers_df), "customer_id").join(broadcast(products_df), "product_id")
broadcast_df.show(5)

+----------+-----------+--------+--------+-----+-----------+--------------------+---------+-----+
|product_id|customer_id|order_id|quantity|price|       name|               email|     name|price|
+----------+-----------+--------+--------+-----+-----------+--------------------+---------+-----+
|         1|          1|     101|       2|19.99|   John Doe| johndoe@example.com|Product A|19.99|
|         2|          2|     102|       1| 9.99| Jane Smith|janesmith@example...|Product B| 9.99|
|         3|          3|     103|       1|15.99|Bob Johnson|bobjohnson@exampl...|Product C|15.99|
|         4|          1|     104|       1| 5.99|   John Doe| johndoe@example.com|Product D| 5.99|
|         5|          2|     105|       3|12.99| Jane Smith|janesmith@example...|Product E|12.99|
+----------+-----------+--------+--------+-----+-----------+--------------------+---------+-----+
only showing top 5 rows



In [28]:
broadcast_df.show(1)

+----------+-----------+--------+--------+-----+--------+-------------------+---------+-----+
|product_id|customer_id|order_id|quantity|price|    name|              email|     name|price|
+----------+-----------+--------+--------+-----+--------+-------------------+---------+-----+
|         1|          1|     101|       2|19.99|John Doe|johndoe@example.com|Product A|19.99|
+----------+-----------+--------+--------+-----+--------+-------------------+---------+-----+
only showing top 1 row



## Cache & Persist

In [16]:
broadcast_df.show()

+----------+-----------+--------+--------+-----+-----------+--------------------+---------+-----+
|product_id|customer_id|order_id|quantity|price|       name|               email|     name|price|
+----------+-----------+--------+--------+-----+-----------+--------------------+---------+-----+
|         1|          1|     101|       2|19.99|   John Doe| johndoe@example.com|Product A|19.99|
|         2|          2|     102|       1| 9.99| Jane Smith|janesmith@example...|Product B| 9.99|
|         3|          3|     103|       1|15.99|Bob Johnson|bobjohnson@exampl...|Product C|15.99|
|         4|          1|     104|       1| 5.99|   John Doe| johndoe@example.com|Product D| 5.99|
|         5|          2|     105|       3|12.99| Jane Smith|janesmith@example...|Product E|12.99|
|         6|          3|     106|       2| 9.99|Bob Johnson|bobjohnson@exampl...|Product F| 9.99|
|         7|          4|     107|       1|11.99|    Sue Lee|  suelee@example.com|Product G|11.99|
|         8|        

In [17]:
# cache the purchases DataFrame
# broadcast_df.cache()
broadcast_df.unpersist()

DataFrame[product_id: int, customer_id: int, order_id: int, quantity: int, price: float, name: string, email: string, name: string, price: float]

In [18]:
from pyspark.sql.functions import col

# calculate the total purchase amount for each store using the cached DataFrame
store_purchase_totals = (
    purchases_df
    .withColumn("total_price",col("quantity")*col("price"))
    .groupBy("customer_id")
    .agg({"total_price":"sum"}).alias("total_purchase_amount")
)

In [19]:
# persist the store_purchase_totals DataFrame to disk
store_purchase_totals.persist(pyspark.StorageLevel.DISK_ONLY)
store_purchase_totals.persist(pyspark.StorageLevel.MEMORY_ONLY)


DataFrame[customer_id: int, sum(total_price): double]

In [20]:
# print the results
store_purchase_totals.show()

+-----------+------------------+
|customer_id|  sum(total_price)|
+-----------+------------------+
|          1| 75.94999885559082|
|          3|55.959999084472656|
|          4|11.989999771118164|
|          2| 58.95000076293945|
+-----------+------------------+



In [21]:
# print the results
store_purchase_totals.show()

+-----------+------------------+
|customer_id|  sum(total_price)|
+-----------+------------------+
|          1| 75.94999885559082|
|          3|55.959999084472656|
|          4|11.989999771118164|
|          2| 58.95000076293945|
+-----------+------------------+



In [17]:
# unpersist the store_purchase_totals DataFrame to free up memory
store_purchase_totals.unpersist()

DataFrame[customer_id: int, sum(total_price): double]

Balik lagi ke collab

In [31]:
customers_df.show()

+-----------+-----------+--------------------+
|customer_id|       name|               email|
+-----------+-----------+--------------------+
|          1|   John Doe| johndoe@example.com|
|          2| Jane Smith|janesmith@example...|
|          3|Bob Johnson|bobjohnson@exampl...|
|          4|    Sue Lee|  suelee@example.com|
+-----------+-----------+--------------------+



# JDBC

In [38]:
jdbc_url = f'jdbc:postgresql://{postgres_host}/{postgres_dw_db}'
jdbc_properties = {
    'user': postgres_user,
    'password': postgres_password,
    'driver': 'org.postgresql.Driver',
    'stringtype': 'unspecified'
}

In [39]:
customer_df = spark.read.jdbc(
    jdbc_url,
    'public.retail',
    properties=jdbc_properties
)

In [40]:
retail_df = spark.read.jdbc(
    jdbc_url,
    'public.sample_retail',
    properties=jdbc_properties
)

Py4JJavaError: An error occurred while calling o264.jdbc.
: org.postgresql.util.PSQLException: ERROR: relation "public.sample_retail" does not exist
  Position: 15
	at org.postgresql.core.v3.QueryExecutorImpl.receiveErrorResponse(QueryExecutorImpl.java:2553)
	at org.postgresql.core.v3.QueryExecutorImpl.processResults(QueryExecutorImpl.java:2285)
	at org.postgresql.core.v3.QueryExecutorImpl.execute(QueryExecutorImpl.java:323)
	at org.postgresql.jdbc.PgStatement.executeInternal(PgStatement.java:473)
	at org.postgresql.jdbc.PgStatement.execute(PgStatement.java:393)
	at org.postgresql.jdbc.PgPreparedStatement.executeWithFlags(PgPreparedStatement.java:164)
	at org.postgresql.jdbc.PgPreparedStatement.executeQuery(PgPreparedStatement.java:114)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.getQueryOutputSchema(JDBCRDD.scala:68)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:58)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation$.getSchema(JDBCRelation.scala:242)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:37)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:350)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:228)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:210)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:210)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:171)
	at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:248)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)


In [34]:
(
    customer_df
    .limit(5)
    .write
    .mode("append")
    .option("truncate", "true")
    .jdbc(
        jdbc_url,
        'public.customer',
        properties=jdbc_properties
    )
)

In [36]:
(
    retail_df
    .limit(10)
    .write
    .mode("append")
    .option("truncate", "true")
    .jdbc(
        jdbc_url,
        'public.sample_retail',
        properties=jdbc_properties
    )
)

NameError: name 'retail_df' is not defined

In [35]:
retail_df.show(5)

NameError: name 'retail_df' is not defined

In [41]:
customers_df.show()

+-----------+-----------+--------------------+
|customer_id|       name|               email|
+-----------+-----------+--------------------+
|          1|   John Doe| johndoe@example.com|
|          2| Jane Smith|janesmith@example...|
|          3|Bob Johnson|bobjohnson@exampl...|
|          4|    Sue Lee|  suelee@example.com|
|          5| Suikim Lee|suekimlee@example...|
+-----------+-----------+--------------------+



In [42]:
customers_df

DataFrame[customer_id: int, name: string, email: string]