# ETL Pipeline with Spark

In [1]:
from pyspark.sql import SparkSession

# create Spark session with mysql connector jar
spark = SparkSession.builder \
    .master('local') \
    .config("spark.jars", "/home/phinguyen/lib/mysql-connector-j-8.0.33.jar") \
    .appName('ETL_Pipeline_Testing') \
    .getOrCreate()
spark

24/05/26 02:35:47 WARN Utils: Your hostname, desktop resolves to a loopback address: 127.0.1.1; using 172.18.52.176 instead (on interface eth0)
24/05/26 02:35:47 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
24/05/26 02:35:49 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [2]:
from pyspark import SparkConf

# Initialize SparkConf
conf = SparkConf()

# Get the executor memory
executor_memory = conf.get("spark.executor.memory")
print(f"Executor Memory: {executor_memory}")

# Get the driver memory
driver_memory = conf.get("spark.driver.memory")
print(f"Driver Memory: {driver_memory}")

Executor Memory: None
Driver Memory: None


In [3]:
# Assuming spark is your SparkSession
spark_context = spark.sparkContext

# Get the executor memory
executor_memory = spark_context.getConf().get("spark.executor.memory")
print(f"Executor Memory: {executor_memory}")

# Get the driver memory
driver_memory = spark_context.getConf().get("spark.driver.memory")
print(f"Driver Memory: {driver_memory}")

Executor Memory: None
Driver Memory: None


In [2]:
# get table Customer from mysql
customer_df = spark.read.format("jdbc") \
    .option("url", "jdbc:mysql://localhost:3306/retail_db") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("dbtable", "Customer") \
    .option("user", "admin") \
    .option("password", "adminpassword") \
    .load()

In [3]:
customer_df

DataFrame[CustomerID: int, Name: string, Email: string, Age: int, ModifiedDate: timestamp]

In [4]:
# create SQL view
customer_df.createOrReplaceTempView("mySQL_customer_tbl")

In [5]:
# run SQL query on the view
spark.sql("""
    SELECT 
        *
    FROM mySQL_customer_tbl
    LIMIT 3
""").show()

                                                                                

+----------+----------------+--------------------+---+-------------------+
|CustomerID|            Name|               Email|Age|       ModifiedDate|
+----------+----------------+--------------------+---+-------------------+
|         1|Christina Savage| vgreene@example.com| 59|2022-10-20 00:00:00|
|         2|   Zachary Green|costajohn@example...| 85|2022-10-20 00:00:00|
|         3|   Andrea Wilson|  john53@example.org| 73|2022-10-20 00:00:00|
+----------+----------------+--------------------+---+-------------------+



In [12]:
max_cus = spark.sql("""
    SELECT
        MAX(CustomerID)
    FROM mySQL_customer_tbl
""")

In [15]:
max_cus.collect()[0][0]

103358

In [11]:
customer_df.selectExpr("MAX(CustomerID)").collect()[0][0]

103358

In [6]:
# transform the spark dataframe
customer_output_df = spark.sql("""
    SELECT
        *,
        YEAR(ModifiedDate) AS year,
        MONTH(ModifiedDate) AS month,
        DAY(ModifiedDate) AS day
    FROM mySQL_customer_tbl
""")
print(type(customer_output_df))
customer_output_df.show(3)

<class 'pyspark.sql.dataframe.DataFrame'>
+----------+----------------+--------------------+---+-------------------+----+-----+---+
|CustomerID|            Name|               Email|Age|       ModifiedDate|year|month|day|
+----------+----------------+--------------------+---+-------------------+----+-----+---+
|         1|Christina Savage| vgreene@example.com| 59|2022-10-20 00:00:00|2022|   10| 20|
|         2|   Zachary Green|costajohn@example...| 85|2022-10-20 00:00:00|2022|   10| 20|
|         3|   Andrea Wilson|  john53@example.org| 73|2022-10-20 00:00:00|2022|   10| 20|
+----------+----------------+--------------------+---+-------------------+----+-----+---+
only showing top 3 rows



In [4]:
from py4j.java_gateway import java_import

# Import necessary classes from Java
java_import(spark._jvm, 'org.apache.hadoop.fs.FileSystem')
java_import(spark._jvm, 'org.apache.hadoop.fs.Path')
java_import(spark._jvm, 'org.apache.hadoop.fs.FileStatus')

# Get the Hadoop configuration
hadoop_conf = spark._jsc.hadoopConfiguration() # current fs.defaultFS is 'file:///' which pointing to local file system.
hadoop_conf.set("fs.defaultFS", "hdfs://localhost:9000") # This ensures that Spark uses HDFS instead of the local file system.

# Create a FileSystem object
fs = spark._jvm.FileSystem.get(hadoop_conf)

# Define the HDFS directory path
hdfs_directory_path = spark._jvm.Path("/")

# List the contents of the directory
file_statuses = fs.listStatus(hdfs_directory_path)

# Iterate through the statuses and print them
for file_status in file_statuses:
    print(file_status.getPath())

hdfs://localhost:9000/tmp
hdfs://localhost:9000/user


In [5]:
datalake_directory_path = spark._jvm.Path("/datalake")

# List the contents of the directory
file_statuses = fs.listStatus(datalake_directory_path)

# Iterate through the statuses and print them
for file_status in file_statuses:
    print(file_status.getPath())

Py4JJavaError: An error occurred while calling o114.listStatus.
: java.io.FileNotFoundException: File /datalake does not exist.
	at org.apache.hadoop.hdfs.DistributedFileSystem.listStatusInternal(DistributedFileSystem.java:1104)
	at org.apache.hadoop.hdfs.DistributedFileSystem.access$600(DistributedFileSystem.java:147)
	at org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1175)
	at org.apache.hadoop.hdfs.DistributedFileSystem$24.doCall(DistributedFileSystem.java:1172)
	at org.apache.hadoop.fs.FileSystemLinkResolver.resolve(FileSystemLinkResolver.java:81)
	at org.apache.hadoop.hdfs.DistributedFileSystem.listStatus(DistributedFileSystem.java:1182)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)


In [None]:
customer_directory_path = spark._jvm.Path("/datalake/Customer")
# Check if the directory exists
exists = fs.exists(customer_directory_path)
exists

In [None]:
if exists:
    # Convert Path to string
    customer_directory_str = str(customer_directory_path.toString())
    print(customer_directory_str)
    datalake_customer_df = spark.read.parquet(customer_directory_str)
    datalake_customer_df.show(3)

In [None]:
datalake_customer_df.selectExpr("max(CustomerID)").collect()[0]

In [None]:
print(type(spark))

# Spark OOM handling

In [1]:
import os
from dotenv import load_dotenv
from pyspark.sql import SparkSession
import argparse
import logging

In [2]:
# Setup logging
logs_dir = "logs"
if not os.path.exists(logs_dir):
    os.makedirs(logs_dir)

logging.basicConfig(
    filename=os.path.join(logs_dir, "ingestion.log"),
    level=logging.INFO,
    format='%(asctime)s %(levelname)s %(message)s')
logger = logging.getLogger(__name__)

# Add ANSI escape sequences for bold yellow text
BOLD_YELLOW = "\033[1m\033[93m"
RESET = "\033[0m"

In [3]:
def get_hdfs_FileSystem_obj(spark: SparkSession, hdfs_uri: str):
    from py4j.java_gateway import java_import
    
    # Import necessary classes from Java
    java_import(spark._jvm, 'org.apache.hadoop.fs.FileSystem')
    java_import(spark._jvm, 'org.apache.hadoop.fs.Path')
    java_import(spark._jvm, 'org.apache.hadoop.fs.FileStatus')
    
    # Get the Hadoop configuration
    hadoop_conf = spark._jsc.hadoopConfiguration()
    hadoop_conf.set("fs.defaultFS", hdfs_uri)

    # Create a FileSystem object
    return spark._jvm.FileSystem.get(hadoop_conf)

In [4]:
table_name = "SalesOrderDetail"

# Define table-specific details
table_details = {
    "Customer": {"primary_col": "CustomerID", "date_col": "ModifiedDate"},
    "SalesOrderHeader": {"primary_col": "SalesOrderID", "date_col": "OrderDate"},
    "SalesOrderDetail": {"primary_col": "SalesOrderID", "date_col": "ModifiedDate"}
}

primary_col = table_details[table_name]["primary_col"]
date_col = table_details[table_name]["date_col"]

# Load environment variables
load_dotenv()
mysql_user = os.getenv('MYSQL_USER')
mysql_password = os.getenv('MYSQL_PASSWORD')
mysql_host = os.getenv('MYSQL_HOST')
mysql_database_name = os.getenv('MYSQL_DATABASE')

# Create Spark session
spark = SparkSession.builder \
    .config("spark.driver.memory", "2g") \
    .config("spark.driver.cores", "1") \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.cores", "4") \
    .config("spark.jars", "/home/phinguyen/lib/mysql-connector-j-8.0.33.jar") \
    .master("yarn") \
    .appName("Ingestion - from OLTP Database (MySQL) to DataLake (Hadoop HDFS)") \
    .getOrCreate()

your 131072x1 screen size is bogus. expect trouble
24/05/23 04:16:33 WARN Utils: Your hostname, Phii resolves to a loopback address: 127.0.1.1; using 172.19.26.206 instead (on interface eth0)
24/05/23 04:16:33 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Exception in thread "main" org.apache.spark.SparkException: When running with master 'yarn' either HADOOP_CONF_DIR or YARN_CONF_DIR must be set in the environment.
	at org.apache.spark.deploy.SparkSubmitArguments.error(SparkSubmitArguments.scala:650)
	at org.apache.spark.deploy.SparkSubmitArguments.validateSubmitArguments(SparkSubmitArguments.scala:281)
	at org.apache.spark.deploy.SparkSubmitArguments.validateArguments(SparkSubmitArguments.scala:237)
	at org.apache.spark.deploy.SparkSubmitArguments.<init>(SparkSubmitArguments.scala:122)
	at org.apache.spark.deploy.SparkSubmit$$anon$2$$anon$3.<init>(SparkSubmit.scala:1103)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.parseArguments(SparkSubmit.scala:1103)
	at

PySparkRuntimeError: [JAVA_GATEWAY_EXITED] Java gateway process exited before sending its port number.

In [None]:
fs = get_hdfs_FileSystem_obj(spark, hdfs_uri="hdfs://localhost:9900")
table_dir_str = "/datalake/" + table_name
hdfs_table_dir_path = spark._jvm.Path(table_dir_str)

# Check for existing data in HDFS
if fs.exists(hdfs_table_dir_path):
    hdfs_df = spark.read.parquet(table_dir_str)
    hdfs_df.createOrReplaceTempView("hdfs_table")
    latest_record = spark.sql(f"SELECT MAX({primary_col}) FROM hdfs_table").collect()[0][0]
else:
    latest_record = 0

# Load new data from MySQL
mysql_df = spark.read.format("jdbc") \
    .option("url", f"jdbc:mysql://{mysql_host}:3306/{mysql_database_name}") \
    .option("driver", "com.mysql.cj.jdbc.Driver") \
    .option("dbtable", table_name) \
    .option("user", mysql_user) \
    .option("password", mysql_password) \
    .load()

mysql_df.createOrReplaceTempView("mysql_table")
output_df = spark.sql(f"""
                        SELECT
                            *,
                            YEAR({date_col}) AS year,
                            MONTH({date_col}) AS month,
                            DAY({date_col}) AS day
                        FROM mysql_table
                        WHERE {primary_col} > {latest_record}
                        """)

new_records_cnt = output_df.selectExpr(f"COUNT({primary_col})").collect()[0][0]

                                                                                

In [None]:
new_records_cnt

7783525

In [None]:
total_rows = 7783525
estimated_row_size_bytes = 24
total_memory_consumption_bytes = total_rows * estimated_row_size_bytes
total_memory_consumption_gb = total_memory_consumption_bytes / (1024**3)  # Convert bytes to GB
total_memory_consumption_gb

0.17397534102201462

In [None]:
output_df.write.partitionBy("year", "month", "day").mode("append").parquet(table_dir_str)

24/05/23 04:09:50 WARN TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
24/05/23 04:09:52 WARN TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
24/05/23 04:09:55 WARN TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
24/05/23 04:09:57 WARN TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
24/05/23 04:10:00 WARN TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
24/05/23 04:10:02 WARN TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
24/05/23 04:10:06 WARN TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
24/05/23 04:10:09 WARN TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
24/05/23 04:10:12 WARN TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
24/05/23 04:10:14 WARN TaskMemoryManager: Failed to allocate a page (67108864 bytes), try again.
24/05/23 04:10:17 WARN TaskMem

In [6]:
customer_df = spark.read.parquet("/datalake/Customer")

                                                                                

In [7]:
customer_df.createOrReplaceTempView("customer_datalake_view")
count_last_date = spark.sql("""
    SELECT COUNT(1)
    FROM customer_datalake_view
    WHERE ModifiedDate = '2024-02-13'
""")

In [8]:
count_last_date

DataFrame[count(1): bigint]

In [9]:
count_last_date.show()

                                                                                

+--------+
|count(1)|
+--------+
|  200265|
+--------+



In [10]:
df1 = spark.range(1, 100, 2)
df1.show()

+---+
| id|
+---+
|  1|
|  3|
|  5|
|  7|
|  9|
| 11|
| 13|
| 15|
| 17|
| 19|
| 21|
| 23|
| 25|
| 27|
| 29|
| 31|
| 33|
| 35|
| 37|
| 39|
+---+
only showing top 20 rows



In [11]:
df2 = df1.selectExpr("id * 5 as 5id")
df2.show()

+---+
|5id|
+---+
|  5|
| 15|
| 25|
| 35|
| 45|
| 55|
| 65|
| 75|
| 85|
| 95|
|105|
|115|
|125|
|135|
|145|
|155|
|165|
|175|
|185|
|195|
+---+
only showing top 20 rows



In [12]:
df1 = spark.range(2, 10000000, 2)
df2 = spark.range(2, 10000000, 4)
step1 = df1.repartition(5)
step12 = df2.repartition(6)
step2 = step1.selectExpr("id * 5 as id")
step3 = step2.join(step12, ["id"])
step4 = step3.selectExpr("sum(id)")
step4.collect() # 2500000000000

                                                                                

[Row(sum(id)=2500000000000)]

In [13]:
step4.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(7) HashAggregate(keys=[], functions=[sum(id#48L)])
   +- ShuffleQueryStage 4
      +- Exchange SinglePartition, ENSURE_REQUIREMENTS, [plan_id=342]
         +- *(6) HashAggregate(keys=[], functions=[partial_sum(id#48L)])
            +- *(6) Project [id#48L]
               +- *(6) SortMergeJoin [id#48L], [id#42L], Inner
                  :- *(4) Sort [id#48L ASC NULLS FIRST], false, 0
                  :  +- AQEShuffleRead coalesced
                  :     +- ShuffleQueryStage 2
                  :        +- Exchange hashpartitioning(id#48L, 200), ENSURE_REQUIREMENTS, [plan_id=213]
                  :           +- *(3) Project [(id#40L * 5) AS id#48L]
                  :              +- ShuffleQueryStage 0
                  :                 +- Exchange RoundRobinPartitioning(5), REPARTITION_BY_NUM, [plan_id=146]
                  :                    +- *(1) Range (2, 10000000, step=2, splits=1)
            