In [1]:
from dotenv import load_dotenv
import os
from pathlib import Path
import psycopg2 as pg


# Load environment variables
ENV_PATH = "./.env"
load_dotenv(ENV_PATH)

False

## Setup

In [2]:
from pyhive import hive
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, date_format, when
import pandas as pd
from sqlalchemy import create_engine, text
from sqlalchemy.exc import SQLAlchemyError
import shutil

db_host = os.getenv("POSTGRES_HOST","localhost")
db_port = os.getenv("POSTGRES_PORT","1000")
db_user = os.getenv("POSTGRES_USER","postgres")
db_password = os.getenv("POSTGRES_PASSWORD","postgres")
db_database = os.getenv("POSTGRES_DATABASE","test")

# Create a connection to the PostgreSQL database
pg_engine = create_engine(f'postgresql://{db_user}:{db_password}@{db_host}:{db_port}/{db_database}')

spark = SparkSession.builder \
    .appName("test") \
    .config("spark.hadoop.hive.metastore.uris", "thrift://localhost:9083") \
    .config("spark.sql.warehouse.dir", "/opt/hive/warehouse") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .enableHiveSupport() \
    .getOrCreate()


23/12/11 09:08:18 WARN Utils: Your hostname, super01 resolves to a loopback address: 127.0.1.1; using 192.168.102.63 instead (on interface bond0)
23/12/11 09:08:18 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/12/11 09:08:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/12/11 09:08:19 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


## etl restaurant detail to hive

In [8]:
        
etl_restaurant_statement = """
    with 
    lastest_dt_restaurant as (
        SELECT restaurant_id, max(TO_CHAR(order_created_timestamp,'YYYYMMDD')) as dt 
        FROM public.order_detail
        GROUP BY restaurant_id
    )

    select rd.*, ldr.dt
    from public.restaurant_detail rd
    left join lastest_dt_restaurant ldr on rd.id = ldr.restaurant_id
"""


with pg_engine.connect() as pg_conn:

    df_restaurant = pd.read_sql_query(etl_restaurant_statement, pg_conn)
    spark_df_restaurant = spark.createDataFrame(df_restaurant)
    spark_df_restaurant.write.mode("overwrite").format("parquet").partitionBy("dt").saveAsTable("restaurant_detail")



23/12/11 08:23:06 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.
                                                                                

## etl order_detail to hive

In [9]:
with pg_engine.connect() as pg_conn:
    df_order_detail = pd.read_sql_query("select * from order_detail", pg_conn)
    spark_df_order = spark.createDataFrame(df_order_detail) 
    spark_df_order = spark_df_order.withColumn("dt", date_format(col("order_created_timestamp"),"yyyymmdd"))
    spark_df_order.write.mode("overwrite").format("parquet").partitionBy("dt").saveAsTable("order_detail")

23/12/11 08:24:59 WARN TaskSetManager: Stage 9 contains a task of very large size (1142 KiB). The maximum recommended task size is 1000 KiB.
23/12/11 08:25:01 WARN MemoryManager: Total allocation exceeds 95.00% (4,080,218,880 bytes) of heap memory
Scaling row group sizes to 98.06% for 31 writers
23/12/11 08:25:01 WARN MemoryManager: Total allocation exceeds 95.00% (4,080,218,880 bytes) of heap memory
Scaling row group sizes to 95.00% for 32 writers
23/12/11 08:25:01 WARN MemoryManager: Total allocation exceeds 95.00% (4,080,218,880 bytes) of heap memory
Scaling row group sizes to 92.12% for 33 writers
23/12/11 08:25:01 WARN MemoryManager: Total allocation exceeds 95.00% (4,080,218,880 bytes) of heap memory
Scaling row group sizes to 89.41% for 34 writers
23/12/11 08:25:01 WARN MemoryManager: Total allocation exceeds 95.00% (4,080,218,880 bytes) of heap memory
Scaling row group sizes to 86.86% for 35 writers
23/12/11 08:25:01 WARN MemoryManager: Total allocation exceeds 95.00% (4,080,21

## create new order detail

In [10]:
spark_df_order_new = spark.sql("SELECT * FROM order_detail")
spark_df_order_new = spark_df_order_new.withColumn("discount_no_null", col("discount")).fillna(0)
spark_df_order_new.write.mode("overwrite").saveAsTable("__order_detail_new__")

23/12/11 08:35:00 WARN MemoryManager: Total allocation exceeds 95.00% (4,080,218,880 bytes) of heap memory
Scaling row group sizes to 98.06% for 31 writers
23/12/11 08:35:00 WARN MemoryManager: Total allocation exceeds 95.00% (4,080,218,880 bytes) of heap memory
Scaling row group sizes to 95.00% for 32 writers
23/12/11 08:35:00 WARN MemoryManager: Total allocation exceeds 95.00% (4,080,218,880 bytes) of heap memory
Scaling row group sizes to 92.12% for 33 writers
23/12/11 08:35:00 WARN MemoryManager: Total allocation exceeds 95.00% (4,080,218,880 bytes) of heap memory
Scaling row group sizes to 89.41% for 34 writers
23/12/11 08:35:00 WARN MemoryManager: Total allocation exceeds 95.00% (4,080,218,880 bytes) of heap memory
Scaling row group sizes to 86.86% for 35 writers
23/12/11 08:35:00 WARN MemoryManager: Total allocation exceeds 95.00% (4,080,218,880 bytes) of heap memory
Scaling row group sizes to 84.44% for 36 writers
23/12/11 08:35:00 WARN MemoryManager: Total allocation exceeds 9

## create new restaurant detail

In [15]:
spark_df_restaurant_new = spark.sql("SELECT * FROM restaurant_detail")
spark_df_restaurant_new = spark_df_restaurant_new.withColumn(
    "cooking_bin", 
    when(col("esimated_cooking_time").between(1,40),1)
    .when(col("esimated_cooking_time").between(41,80),2)
    .when(col("esimated_cooking_time").between(81,120),3)
    .when(col("esimated_cooking_time")>120,4)
    .otherwise(0)
)
spark_df_restaurant_new.write.mode("overwrite").saveAsTable("__restaurant_detail_new__")

23/12/11 09:01:01 WARN MemoryManager: Total allocation exceeds 95.00% (4,080,218,880 bytes) of heap memory
Scaling row group sizes to 98.06% for 31 writers
23/12/11 09:01:01 WARN MemoryManager: Total allocation exceeds 95.00% (4,080,218,880 bytes) of heap memory
Scaling row group sizes to 95.00% for 32 writers
23/12/11 09:01:01 WARN MemoryManager: Total allocation exceeds 95.00% (4,080,218,880 bytes) of heap memory
Scaling row group sizes to 92.12% for 33 writers
23/12/11 09:01:01 WARN MemoryManager: Total allocation exceeds 95.00% (4,080,218,880 bytes) of heap memory
Scaling row group sizes to 89.41% for 34 writers
23/12/11 09:01:01 WARN MemoryManager: Total allocation exceeds 95.00% (4,080,218,880 bytes) of heap memory
Scaling row group sizes to 86.86% for 35 writers
23/12/11 09:01:01 WARN MemoryManager: Total allocation exceeds 95.00% (4,080,218,880 bytes) of heap memory
Scaling row group sizes to 84.44% for 36 writers
23/12/11 09:01:01 WARN MemoryManager: Total allocation exceeds 9

## sql requiement

In [3]:
statement = """
    SELECT res.category, AVG(order.discount_no_null) AS avg_discount
    FROM __order_detail_new__ AS order
    LEFT JOIN restaurant_detail AS res ON order.restaurant_id = res.id
    GROUP BY res.category
"""

df = spark.sql(statement)
# df.write.csv("../sql_requirements/avg_discount_by_category", header=True)

NameError: name 'df' is not defined

In [None]:
statement = """
    SELECT cooking_bin, COUNT(*) as row_count
    FROM __restaurant_detail_new__
    GROUP BY cooking_bin
"""

df = spark.sql(statement)
df.write.csv("../sql_requirements/row_count_by_cooking_bin", header=True)

In [None]:
df.show()