This notebook contains the data processing components.

In [1]:
%pip install boto3 pyspark==3.5.1 delta-spark python-dotenv

Note: you may need to restart the kernel to use updated packages.


In [2]:
import os
from pyspark.sql import SparkSession

In [3]:
# Uncomment the following lines to set the environment variables in the notebook if the MinIO access key, secret key and endpoint are not already set in your environment
# os.environ['OBJ_STORAGE_ACCESS_KEY'] = ''
# os.environ['OBJ_STORAGE_SECRET_KEY'] = ''
# os.environ['OBJ_STORAGE_ENDPOINT'] = ''

In [4]:
# Define S3 storage
obj_storage_access_key = os.getenv('OBJ_STORAGE_ACCESS_KEY')
obj_storage_secret_key = os.getenv('OBJ_STORAGE_SECRET_KEY')
obj_storage_endpoint = os.getenv('OBJ_STORAGE_ENDPOINT', 'http://localhost:9000')
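`os.getenv` returns `None` when a variable is unset, so a missing key would only surface later, when Spark is configured or first touches the bucket. A minimal sketch of an earlier check follows. The helper name `load_storage_config` and the example values are hypothetical and not part of the original notebook; the variable names and the default endpoint are the ones used above.

```python
import os

# Variables that have no sensible default (names taken from the cell above).
REQUIRED_VARS = ("OBJ_STORAGE_ACCESS_KEY", "OBJ_STORAGE_SECRET_KEY")


def load_storage_config(env=None):
    """Return the object-storage settings, failing early if a key is missing.

    `env` is any mapping; it defaults to os.environ. (Hypothetical helper.)
    """
    env = os.environ if env is None else env
    missing = [name for name in REQUIRED_VARS if not env.get(name)]
    if missing:
        raise RuntimeError("Missing environment variables: " + ", ".join(missing))
    return {
        "access_key": env["OBJ_STORAGE_ACCESS_KEY"],
        "secret_key": env["OBJ_STORAGE_SECRET_KEY"],
        # Same default endpoint as in the cell above.
        "endpoint": env.get("OBJ_STORAGE_ENDPOINT", "http://localhost:9000"),
    }


# Example with placeholder values, so the sketch runs without a real environment.
config = load_storage_config(
    {"OBJ_STORAGE_ACCESS_KEY": "example-key", "OBJ_STORAGE_SECRET_KEY": "example-secret"}
)
print(config["endpoint"])
```

If the credentials live in a `.env` file, `load_dotenv()` from python-dotenv (installed in the first cell) can populate `os.environ` before this helper is called.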

In [5]:
path_1 = "s3a://data/data-raw/data.json"
path_2 = "s3a://data/data-raw/data2.json"
path_3 = "s3a://data/data-raw/data3.json"

In [6]:
# Extra configuration is needed to use MinIO as object storage.
# (Hint: pass the settings through the .config() method so that Spark can read data from and write data to MinIO.)
# Note: finding the correct version of Hadoop and its dependencies is painful, especially for someone with no prior experience with it.
# Spark v3.5.1 (the latest version at the time of writing, and the one we are using) is built against Hadoop 3.3.4:
# https://github.com/apache/spark/blob/v3.5.1/pom.xml#L125
# Once the Hadoop version is known, its dependencies are listed on this page:
# https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws
# Below are hadoop-aws version 3.3.4 and its dependencies.
dependencies = ["org.apache.hadoop:hadoop-aws:3.3.4",
                "com.amazonaws:aws-java-sdk-bundle:1.12.262",
                "org.apache.hadoop:hadoop-common:3.3.4"]
spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config('spark.hadoop.fs.s3a.path.style.access', "true") \
    .config("spark.jars.packages", ",".join(dependencies)) \
    .config("spark.hadoop.fs.s3a.access.key", obj_storage_access_key) \
    .config("spark.hadoop.fs.s3a.secret.key", obj_storage_secret_key) \
    .config("spark.hadoop.fs.s3a.endpoint", obj_storage_endpoint) \
    .getOrCreate()

24/03/29 01:28:09 WARN Utils: Your hostname, nguyenviet-PC resolves to a loopback address: 127.0.1.1; using 192.168.1.16 instead (on interface wlo1)
24/03/29 01:28:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/nguyenviet/.pyenv/versions/3.8.10/envs/icctm_env/lib/python3.8/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/nguyenviet/.ivy2/cache
The jars for the packages stored in: /home/nguyenviet/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
com.amazonaws#aws-java-sdk-bundle added as a dependency
org.apache.hadoop#hadoop-common added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-4278b785-cc59-4048-be80-8df37cb21901;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.3.4 in central
	found com.amazonaws#aws-java-sdk-bundle;1.12.262 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
	found org.apache.hadoop#hadoop-common;3.3.4 in central
	found org.apache.hadoop.thirdparty#hadoop-shaded-protobuf_3_7;1.1.1 in central
	found org.apache.hadoop#hadoop-annotations;3.3.4 in central
	found org.apache.hadoop.thirdparty#hadoop-shaded-guava;1.1.1 in central
	found com.google.guava#guava;27.0-jre in central
	found com.google.guava#failureaccess;1.0 in central
	found com.google.guava#listenablefuture;
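
The version-matching rule from the comments above (hadoop-aws and hadoop-common must match the Hadoop version Spark was built with) can be written down as a small helper. This is only a sketch: the function name `s3a_packages` is hypothetical, and the AWS SDK bundle version still has to be looked up by hand for each hadoop-aws release.

```python
def s3a_packages(hadoop_version, aws_sdk_bundle_version):
    """Build the value for `spark.jars.packages` (hypothetical helper).

    hadoop-aws and hadoop-common share the Hadoop version; the AWS SDK
    bundle has its own version, listed as a dependency of hadoop-aws.
    """
    coordinates = [
        f"org.apache.hadoop:hadoop-aws:{hadoop_version}",
        f"com.amazonaws:aws-java-sdk-bundle:{aws_sdk_bundle_version}",
        f"org.apache.hadoop:hadoop-common:{hadoop_version}",
    ]
    return ",".join(coordinates)


# The versions used in this notebook.
packages = s3a_packages("3.3.4", "1.12.262")
print(packages)
```

Once a session exists, the Hadoop version can also be read from it with `spark.sparkContext._jvm.org.apache.hadoop.util.VersionInfo.getVersion()`. Note that `_jvm` is a private attribute of the Spark context, so treat this as a debugging aid and not a stable API.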

Read the data files from MinIO:

In [7]:
# We set read option `multiline` to True since our records span multiple lines per file.
df1 = spark.read.option("multiline","true").json(path_1)
df2 = spark.read.option("multiline","true").json(path_2)
df3 = spark.read.option("multiline","true").json(path_3)

24/03/29 01:28:34 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
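
By default, Spark's JSON reader expects JSON Lines input, where every line is a complete JSON value; the `multiline` option tells it to parse each file as one document instead. The difference can be illustrated without Spark using the standard `json` module. The sample document below is made up for illustration and is not the content of the real data files.

```python
import json

# A made-up document whose records span several lines.
MULTI_LINE_DOC = """[
  {"id": "0004", "name": "Jelly"},
  {"id": "0005", "name": "Custard-Filled"}
]"""


def parse_as_json_lines(text):
    """Parse each non-empty line as its own JSON value (JSON Lines style)."""
    return [json.loads(line) for line in text.splitlines() if line.strip()]


def parse_whole_document(text):
    """Parse the whole text as a single JSON document (multiline style)."""
    return json.loads(text)


records = parse_whole_document(MULTI_LINE_DOC)
print(len(records))

try:
    parse_as_json_lines(MULTI_LINE_DOC)
except json.JSONDecodeError as error:
    # The first line is just "[", which is not valid JSON on its own.
    print("line-by-line parsing failed:", error.msg)
```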


Check whether all 3 dataframes have the same schema. If they do not, we will need additional processing steps.

In [8]:
if (df1.schema == df3.schema) and (df1.schema == df2.schema):
    print("Schemas matched!")
else: 
    print("Schemas mismatch!")

Schemas matched!


Luckily, our dataframes have the same schema!
Let's merge our 3 dataframes together:

In [9]:
result = df1.union(df2).union(df3)
result.show()
print(f"There are {result.count()} records in our dataframe.")

+--------------------+--------------------+----+-------------------+----+--------------------+-----+
|             batters|             filling|  id|               name| ppu|             topping| type|
+--------------------+--------------------+----+-------------------+----+--------------------+-----+
|{[{1001, Regular}...|                NULL|0004|              Jelly|0.65|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|                NULL|0005|     Custard-Filled|0.75|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|[{6001, None}, {6...|0006|     Cinnamon Twist|0.85|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|                NULL|0007|    Vanilla Frosted|0.75|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|                NULL|0008| Strawberry Frosted|0.85|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|                NULL|0009|     Chocolate Cake|0.75|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|                NULL|0010|     Blueberry Cake|0.55|[{5001, None}, {5.
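
`union` matches columns by position, which is safe here only because the three schemas are identical. If the schema check had failed, one option would be `unionByName`, which matches columns by name and, with `allowMissingColumns=True` (Spark 3.1+), fills columns absent from one side with nulls. A minimal sketch follows; the helper name `union_by_name` is hypothetical, and it covers missing top-level columns only, not conflicting column types.

```python
from functools import reduce


def union_by_name(frames, allow_missing_columns=True):
    """Fold a sequence of DataFrames into one using DataFrame.unionByName.

    Hypothetical helper: it only relies on each frame exposing
    `unionByName(other, allowMissingColumns=...)`, as PySpark DataFrames do.
    """
    frames = list(frames)
    if not frames:
        raise ValueError("at least one DataFrame is required")
    return reduce(
        lambda left, right: left.unionByName(
            right, allowMissingColumns=allow_missing_columns
        ),
        frames,
    )
```

In this notebook it would be called as `result = union_by_name([df1, df2, df3])`.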

Now we can upload our merged data back to the bucket:

In [10]:
# Use mode "overwrite" to replace any existing output at this path.
# Note that Spark writes a directory of part files named result.json, not a single file.
result_path = "s3a://data/data-result/result.json"
result.write.mode("overwrite").json(result_path) 

24/03/29 01:28:40 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.


Read the uploaded result.json back and display it:

In [11]:
df = spark.read.json(result_path)
df.show()
print(f"There are {df.count()} records in our dataframe.")

24/03/29 01:28:41 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


+--------------------+--------------------+----+-------------------+----+--------------------+-----+
|             batters|             filling|  id|               name| ppu|             topping| type|
+--------------------+--------------------+----+-------------------+----+--------------------+-----+
|{[{1001, Regular}...|                NULL|0004|              Jelly|0.65|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|                NULL|0005|     Custard-Filled|0.75|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|[{6001, None}, {6...|0006|     Cinnamon Twist|0.85|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|                NULL|0007|    Vanilla Frosted|0.75|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|                NULL|0008| Strawberry Frosted|0.85|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|                NULL|0009|     Chocolate Cake|0.75|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|                NULL|0010|     Blueberry Cake|0.55|[{5001, None}, {5.
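
Reading `result_path` back works because Spark treats it as a directory: the write produces a prefix named `result.json`, typically holding one or more `part-*` files plus a `_SUCCESS` marker. To see what actually landed in the bucket, the objects under that prefix can be listed with boto3 (installed in the first cell). The sketch below is an assumption-laden illustration: the helper name `list_keys` is hypothetical, and it reads a single `list_objects_v2` response, which returns at most 1000 keys.

```python
def list_keys(client, bucket, prefix):
    """Return the object keys under `prefix` (hypothetical helper).

    `client` is expected to be a boto3 S3 client. Only the first page of
    results is read; "Contents" is absent when nothing matches.
    """
    response = client.list_objects_v2(Bucket=bucket, Prefix=prefix)
    return [obj["Key"] for obj in response.get("Contents", [])]


# Possible usage against MinIO, reusing the variables defined earlier
# (commented out because it needs a running endpoint):
#
# import boto3
# client = boto3.client(
#     "s3",
#     endpoint_url=obj_storage_endpoint,
#     aws_access_key_id=obj_storage_access_key,
#     aws_secret_access_key=obj_storage_secret_key,
# )
# print(list_keys(client, "data", "data-result/result.json/"))
```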

Check whether our merged dataframe has any duplicate records:

In [12]:
df.exceptAll(df.dropDuplicates()).show()

+--------------------+-------+----+-----+----+--------------------+-----+
|             batters|filling|  id| name| ppu|             topping| type|
+--------------------+-------+----+-----+----+--------------------+-----+
|{[{1001, Regular}...|   NULL|0004|Jelly|0.65|[{5001, None}, {5...|donut|
+--------------------+-------+----+-----+----+--------------------+-----+
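
The expression above works because `exceptAll` is a multiset difference: it removes one copy of a row for every copy found in the other dataframe, so subtracting the deduplicated rows leaves exactly the surplus copies. The same idea can be sketched in plain Python with `collections.Counter`. The ids below are illustrative values, not the full dataset, and `surplus_copies` is a hypothetical name.

```python
from collections import Counter


def surplus_copies(rows):
    """Return the extra copies of each row: all rows minus one copy of each distinct row."""
    rows = list(rows)
    # Counter subtraction keeps only positive counts, like a multiset difference.
    return Counter(rows) - Counter(set(rows))


ids = ["0004", "0005", "0006", "0004"]
print(surplus_copies(ids))  # Counter({'0004': 1})
```

A row that appears three times would show up twice in the result, which matches how `exceptAll` reports every extra copy.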



There is 1 duplicate record in our dataframe. We will drop it and check how many rows are left:

In [13]:
df = df.dropDuplicates()
df.show()
print(f"There are {df.count()} rows left!")
# Check that there are exactly 18 rows in our dataframe
assert df.count() == 18

+--------------------+--------------------+----+-------------------+----+--------------------+-----+
|             batters|             filling|  id|               name| ppu|             topping| type|
+--------------------+--------------------+----+-------------------+----+--------------------+-----+
|{[{1001, Regular}...|                NULL|0004|              Jelly|0.65|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|                NULL|0011|  Devil's Food Cake|0.75|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|                NULL|0010|     Blueberry Cake|0.55|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|                NULL|0007|    Vanilla Frosted|0.75|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|[{6001, None}, {6...|0006|     Cinnamon Twist|0.85|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|                NULL|0009|     Chocolate Cake|0.75|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|                NULL|0005|     Custard-Filled|0.75|[{5001, None}, {5.

Finally we can upload the final result back to our bucket:

In [14]:
df.write.mode("overwrite").json(result_path)
# Read the uploaded data back and count the rows; a count of 18 means the upload succeeded.
spark.read.json(result_path).count()

24/03/29 01:28:44 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.


18