This notebook contains the data processing components.

In [1]:
%pip install boto3 pyspark delta-spark python-dotenv pandas


[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.3.2[0m[39;49m -> [0m[32;49m24.0[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.


In [2]:
import boto3
import glob
import os
from pyspark.sql import SparkSession, functions as F
from dotenv import load_dotenv

load_dotenv()

True

In [3]:
# Uncomment and fill in the lines below to set the environment variables from within the notebook if the MinIO access key, secret key and endpoint are not already set in your environment
# os.environ['OBJ_STORAGE_ACCESS_KEY'] = ''
# os.environ['OBJ_STORAGE_SECRET_KEY'] = ''
# os.environ['OBJ_STORAGE_ENDPOINT'] = ''

In [4]:
# Object-storage (S3-compatible) connection settings, taken from the environment.
# The two credentials have no fallback and are None when their variable is unset;
# the endpoint falls back to http://localhost:9000.
obj_storage_access_key = os.environ.get('OBJ_STORAGE_ACCESS_KEY')
obj_storage_secret_key = os.environ.get('OBJ_STORAGE_SECRET_KEY')
obj_storage_endpoint = os.environ.get('OBJ_STORAGE_ENDPOINT', 'http://localhost:9000')

In [5]:
# s3a:// locations of the three raw JSON input files.
path_1, path_2, path_3 = (
    "s3a://data/data-raw/data.json",
    "s3a://data/data-raw/data2.json",
    "s3a://data/data-raw/data3.json",
)

In [6]:
# Build the Spark session used to read from and write to the object storage
# through the S3A connector. Additional settings can be supplied with further
# .config() calls on the builder.
spark = (
    SparkSession
    .builder
    .appName("Python Spark SQL basic example")
    # Credentials and endpoint come from the environment-derived variables.
    .config("fs.s3a.access.key", obj_storage_access_key)
    .config("fs.s3a.secret.key", obj_storage_secret_key)
    .config("fs.s3a.endpoint", obj_storage_endpoint)
    # Path-style access (endpoint/bucket/key) instead of bucket-as-hostname.
    .config("fs.s3a.path.style.access", "true")
    # SSL is switched off; NOTE(review): revisit if the endpoint is https.
    .config("fs.s3a.connection.ssl.enabled", "false")
    .config("fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .getOrCreate()
)

your 131072x1 screen size is bogus. expect trouble


24/03/27 12:31:05 WARN Utils: Your hostname, minhnguyen resolves to a loopback address: 127.0.1.1; using 172.27.104.248 instead (on interface eth0)
24/03/27 12:31:05 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/27 12:31:06 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [7]:
# Function to upload file to minio
def upload_file_to_minio(file_path, minio_bucket, minio_object_name):
    """Upload a local file to a bucket on the configured object storage.

    The connection uses the module-level ``obj_storage_endpoint``,
    ``obj_storage_access_key`` and ``obj_storage_secret_key`` values.

    Args:
        file_path: Path of the local file to upload.
        minio_bucket: Name of the destination bucket.
        minio_object_name: Object key to store the file under.
    """
    connection_options = {
        'endpoint_url': obj_storage_endpoint,
        'aws_access_key_id': obj_storage_access_key,
        'aws_secret_access_key': obj_storage_secret_key,
        'config': boto3.session.Config(signature_version='s3v4'),
        # NOTE(review): certificate verification is disabled here.
        'verify': False,
    }
    s3_resource = boto3.resource('s3', **connection_options)
    destination_bucket = s3_resource.Bucket(minio_bucket)
    destination_bucket.upload_file(file_path, minio_object_name)

In [8]:
# Read input data (multiLine=True lets a single JSON record span several lines)
df_1 = spark.read.json(path_1, multiLine=True)
df_2 = spark.read.json(path_2, multiLine=True)
df_3 = spark.read.json(path_3, multiLine=True)

# Merge data. unionByName matches columns by name instead of by position, and
# allowMissingColumns=True fills a column that one input lacks with nulls, so
# inputs whose inferred schemas differ are neither rejected nor misaligned.
df = (
    df_1
    .unionByName(df_2, allowMissingColumns=True)
    .unionByName(df_3, allowMissingColumns=True)
)

# Write the merged result locally, then upload it to minio
OUTPUT_DIR = "data/result"
df.coalesce(1).write.mode("overwrite").json(OUTPUT_DIR)  # write output as a single file
file_paths = glob.glob(f"{OUTPUT_DIR}/*.json")
if not file_paths:
    # Fail loudly: otherwise later cells would read a stale or missing object.
    raise FileNotFoundError(f"No JSON part file was written to {OUTPUT_DIR!r}")
upload_file_to_minio(file_paths[0], 'data', 'data-result/result.json')

24/03/27 12:31:12 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

In [9]:
# Load the uploaded merged result back from object storage, report its row
# count and preview the rows ordered by id
result_df = spark.read.format("json").load("s3a://data/data-result/result.json")
print("Total records:", result_df.count())
result_df.sort("id").show()

Total records: 19
+--------------------+--------------------+----+-------------------+----+--------------------+-----+
|             batters|             filling|  id|               name| ppu|             topping| type|
+--------------------+--------------------+----+-------------------+----+--------------------+-----+
|{[{1001, Regular}...|                NULL|0004|              Jelly|0.65|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|                NULL|0004|              Jelly|0.65|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|                NULL|0005|     Custard-Filled|0.75|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|[{6001, None}, {6...|0006|     Cinnamon Twist|0.85|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|                NULL|0007|    Vanilla Frosted|0.75|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|                NULL|0008| Strawberry Frosted|0.85|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|                NULL|0009|     Chocolate Cake|0.75|

In [10]:
# Drop rows that are exact duplicates, then report the new row count and
# preview the remaining rows ordered by id
distinct_result_df = result_df.dropDuplicates()
print("Total records:", distinct_result_df.count())
distinct_result_df.sort("id").show()

                                                                                

Total records: 18
+--------------------+--------------------+----+-------------------+----+--------------------+-----+
|             batters|             filling|  id|               name| ppu|             topping| type|
+--------------------+--------------------+----+-------------------+----+--------------------+-----+
|{[{1001, Regular}...|                NULL|0004|              Jelly|0.65|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|                NULL|0005|     Custard-Filled|0.75|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|[{6001, None}, {6...|0006|     Cinnamon Twist|0.85|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|                NULL|0007|    Vanilla Frosted|0.75|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|                NULL|0008| Strawberry Frosted|0.85|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|                NULL|0009|     Chocolate Cake|0.75|[{5001, None}, {5...|donut|
|{[{1001, Regular}...|                NULL|0010|     Blueberry Cake|0.55|