This notebook contains the data processing components.

In [None]:
%pip install boto3 pyspark delta-spark python-dotenv

In [1]:
import os
from functools import reduce

import boto3
from pyspark.sql import DataFrame, SparkSession

In [2]:
# Optional fallback: seed the object-storage settings from inside the notebook when the
# access key, secret key and endpoint were not exported before starting Jupyter.
# `setdefault` only fills in variables that are missing, so credentials already present
# in the environment are never overwritten by the empty placeholders below.
os.environ.setdefault('OBJ_STORAGE_ACCESS_KEY', '')
os.environ.setdefault('OBJ_STORAGE_SECRET_KEY', '')
os.environ.setdefault('OBJ_STORAGE_ENDPOINT', 'http://localhost:9000')

In [3]:
# Object-storage (S3-compatible) settings, resolved from the environment.
# The two credentials have no fallback and are None when their variables are unset;
# the endpoint, bucket and folder fall back to the local defaults shown here.
obj_storage_access_key = os.environ.get('OBJ_STORAGE_ACCESS_KEY')
obj_storage_secret_key = os.environ.get('OBJ_STORAGE_SECRET_KEY')
obj_storage_endpoint = os.environ.get('OBJ_STORAGE_ENDPOINT', 'http://localhost:9000')
bucket_name = os.environ.get('BUCKET_NAME', 'data')
folder_name = os.environ.get('FOLDER_NAME', 'data-raw')

In [4]:
# Fixed s3a:// locations of the three raw JSON input files.
path_1, path_2, path_3 = (
    "s3a://data/data-raw/data.json",
    "s3a://data/data-raw/data2.json",
    "s3a://data/data-raw/data3.json",
)

In [None]:
# Extra configuration is required to use MinIO as object storage.
# (Hint: the builder's .config() method sets the options Spark needs in order to
# read data from and write data to MinIO.)

print("Configuring Spark...")
spark: SparkSession = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    # Jars resolved at session start-up: the S3A connector and Delta Lake
    .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.3.1,io.delta:delta-core_2.12:2.1.0")
    # S3A connection settings, taken from the storage variables defined earlier
    .config("spark.hadoop.fs.s3a.endpoint", obj_storage_endpoint)
    .config("spark.hadoop.fs.s3a.access.key", obj_storage_access_key)
    .config("spark.hadoop.fs.s3a.secret.key", obj_storage_secret_key)
    .config("spark.hadoop.fs.s3a.attempts.maximum", "3")
    .config("spark.hadoop.fs.s3a.path.style.access", 'true')
    # Delta Lake SQL extension and catalog
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)
print("Done Spark configuration!")

Read the JSON files inside the `data-raw` folder of the bucket, union them into a single `DataFrame`, and write the combined `DataFrame` to the `data-result` folder.

In [None]:
# Connect to the MinIO (S3-compatible) endpoint with boto3 so the objects stored
# under the raw-data prefix can be listed.
s3c = boto3.resource('s3',
                    endpoint_url=obj_storage_endpoint,
                    aws_access_key_id=obj_storage_access_key,
                    aws_secret_access_key=obj_storage_secret_key,
                    config=boto3.session.Config(signature_version='s3v4'),
                    # NOTE(review): TLS certificate verification is disabled. This is only
                    # acceptable for a local endpoint such as the default http://localhost:9000.
                    verify=False)
bucket = s3c.Bucket(bucket_name)

# One DataFrame per object found under the prefix
df_list = []

# List the objects under "<folder_name>/" and read each JSON file into its own DataFrame
for object_summary in bucket.objects.filter(Prefix=f"{folder_name}/"):
    # Keys ending in "/" are directory placeholder objects (created by some S3 tools),
    # not data files. Reading one as JSON would yield a frame without the expected
    # columns and break the union below.
    if object_summary.key.endswith("/"):
        continue
    current_df = spark.read.option("multiline", "true").json(f"s3a://{bucket_name}/{object_summary.key}")
    df_list.append(current_df)

# reduce() without an initial value raises an opaque TypeError on an empty list,
# so fail early with a message that names the location that was searched.
if not df_list:
    raise ValueError(f"No input files found under s3a://{bucket_name}/{folder_name}/")

# Union them together (matching columns by name) to form a single DataFrame
df: DataFrame = reduce(lambda l, r: l.unionByName(other=r), df_list)

destination_folder = "data-result"
destination_filename = "result.json"

# Write the combined DataFrame back to the bucket, replacing any previous output
df.write.json(f"s3a://{bucket_name}/{destination_folder}/{destination_filename}", mode="overwrite")

Deduplicate the data inside the `result.json` output

In [None]:
# Read back the combined JSON output written by the previous step
df = spark.read.json(path=f"s3a://{bucket_name}/{destination_folder}/{destination_filename}")
df.show()

# Each count() is a separate Spark job that re-reads the data from object storage,
# so compute every count once and reuse it for both the message and the check.
rows_before = df.count()
print(f"Number of rows before deduplication: {rows_before}")
# NOTE(review): the expected counts (19 and 18) are presumably tied to the sample input files
assert rows_before == 19, "Number of rows before deduplication should be 19"

# Deduplicate the dataframe by id: keep a single row for each distinct id
df = df.dropDuplicates(subset=['id'])
rows_after = df.count()
print(f"Number of rows after deduplication: {rows_after}")
assert rows_after == 18, "Number of rows after deduplication should be 18"

Flatten the `DataFrame` so that it can be written to and read from the `CSV` file format

In [None]:
import pyspark.sql.functions as F

print("Write deduplicated Dataframe to MinIO storage")

# Lift the fields of the `batters` struct to the top level next to the scalar columns.
# NOTE: `df` is overwritten in this cell, so re-running the cell on its own fails
# (the `batters` column no longer exists); re-run the deduplication cell first.
df = df.selectExpr("id", "type", "name", "ppu", "batters.*", "topping", "filling")

# Array-of-struct columns to flatten. Only the keys are read below; the struct
# field names are discovered from the data rather than taken from these lists.
array_cols = {
    "batter": ["id", "type"],
    "topping": ["id", "type"],
    "filling": ["id", "name"],
}

# Columns that are kept unchanged
plain_columns = [name for name in df.columns if name not in array_cols]

# One output column per struct field, named "<array column>_<field>"
prefixed_columns = []
for array_name in array_cols:
    field_names = df.withColumn(array_name, F.explode(array_name)).selectExpr(f"{array_name}.*").columns
    for field_name in field_names:
        prefixed_columns.append(F.col(f"{array_name}.{field_name}").alias(f"{array_name}_{field_name}"))

# Explode each array so every element gets its own row, then project the flat columns
exploded = (
    df.withColumn("batter", F.explode("batter"))
      .withColumn("topping", F.explode("topping"))
      .withColumn("filling", F.explode("filling"))
)
df = exploded.select(plain_columns + prefixed_columns)
df.show()
df.printSchema()

# A single CSV part file is wanted, so collapse to one partition before writing
single_partition = df.repartition(1)
single_partition.write.csv(f"s3a://{bucket_name}/{destination_folder}/deduplicated-result.csv", mode="overwrite")