In [None]:
# Java must be installed on the machine before Spark can be used.
sudo apt update
# Install the distribution's default JRE (OpenJDK 11 at the time of writing; verify for your release):
sudo apt install default-jre
# The JRE is enough to run almost all Java software.
# Verify the installation with:
java -version
# Some Java-based software also needs the JDK to compile and run. Installing the JDK with the command below also installs the JRE:
sudo apt install default-jdk
# Verify that the JDK is installed by checking the version of javac, the Java compiler:
javac -version

In [ ]:
import boto3
import os
from pyspark.sql import SparkSession
from delta import *

In [ ]:
class FILE_TRANSFORM():
    """Merge raw JSON objects from an S3-compatible store and upload the result.

    The inputs listed in ``self.paths`` are read with Spark, stacked,
    de-duplicated, written as a single line-delimited JSON file and then
    handed to ``self.upload_file_to_minio``.

    Connection settings come from the ``OBJ_STORAGE_ACCESS_KEY``,
    ``OBJ_STORAGE_SECRET_KEY`` and ``OBJ_STORAGE_ENDPOINT`` environment
    variables, falling back to the defaults below when unset.
    """

    def __init__(self):
        # Bucket that holds both the raw inputs and the merged result.
        self.minio_bucket = "data"
        # Key prefix under which the merged result is uploaded.
        self.minio_object = "data-result"
        self.obj_storage_access_key = os.getenv('OBJ_STORAGE_ACCESS_KEY', 'access-key')
        self.obj_storage_secret_key = os.getenv('OBJ_STORAGE_SECRET_KEY', 'secret-key')
        self.obj_storage_endpoint = os.getenv('OBJ_STORAGE_ENDPOINT', 'http://localhost:9000')
        # Build the input URIs from the bucket name so the two cannot drift apart.
        self.paths = [
            f"s3a://{self.minio_bucket}/data-raw/{file_name}"
            for file_name in ("data.json", "data2.json", "data3.json")
        ]

    def _build_spark_session(self):
        """Return a Delta-enabled Spark session configured for the object store."""
        # REFERENCE FROM THIS THREAD https://stackoverflow.com/a/77592717
        # Parenthesised chain instead of backslash continuations: a trailing
        # backslash before a blank line is easy to break when editing.
        builder = (
            SparkSession.builder.appName("Python Spark SQL Sample Transformer")
            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
            .config("spark.sql.execution.arrow.pyspark.enabled", "true")
            .config("spark.hadoop.fs.s3a.access.key", self.obj_storage_access_key)
            .config("spark.hadoop.fs.s3a.secret.key", self.obj_storage_secret_key)
            .config("spark.hadoop.fs.s3a.endpoint", self.obj_storage_endpoint)
            .config("spark.hadoop.fs.s3a.path.style.access", "true")
            .config("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
            .config("spark.databricks.delta.retentionDurationCheck.enabled", "false")
            .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
        )

        # spark.hadoop.fs.s3a.path.style.access
        # see https://stackoverflow.com/questions/61552054/spark-path-style-access-with-fs-s3a-path-style-access-property-is-not-working
        # see https://github.com/minio/minio/issues/7020

        # https://mvnrepository.com/artifact/org.apache.hadoop/hadoop-aws
        # and also https://github.com/delta-io/delta/issues/895
        # and also https://stackoverflow.com/questions/44411493/java-lang-noclassdeffounderror-org-apache-hadoop-fs-storagestatistics
        # and also https://stackoverflow.com/questions/76858933/noclassdeffounderror-raised-when-reading-minio-data-using-pyspark
        my_packages = [
            "org.apache.hadoop:hadoop-aws:3.3.4",
            "org.apache.hadoop:hadoop-client-runtime:3.3.4",
            "org.apache.hadoop:hadoop-client-api:3.3.4",
            "io.delta:delta-contribs_2.12:3.0.0",
            "io.delta:delta-hive_2.12:3.0.0",
            "com.amazonaws:aws-java-sdk-bundle:1.12.603",
        ]

        # Create a Spark instance with the builder.
        # As a result, Delta tables can be read and written.
        return configure_spark_with_delta_pip(builder, extra_packages=my_packages).getOrCreate()

    def _read_combined(self, spark):
        """Read every path in ``self.paths`` and return their de-duplicated union."""
        combined_df = None
        for path in self.paths:
            df = spark.read.option("multiline", "true").json(path)
            combined_df = df if combined_df is None else combined_df.union(df)
        # De-duplicate once after all unions; repeating it per iteration
        # yields the same rows but adds a shuffle stage for every input.
        return combined_df.dropDuplicates()

    def transform_file(self):
        """Merge the configured JSON inputs and upload them as one file.

        Returns:
            None. The merged data is uploaded to
            ``<minio_bucket>/<minio_object>/result.json``.

        Raises:
            ValueError: if ``self.paths`` is empty.
        """
        # Local import: tempfile is not among the imports at the top of the file.
        import tempfile

        if not self.paths:
            raise ValueError("FILE_TRANSFORM.paths is empty; nothing to transform")

        spark = self._build_spark_session()
        try:
            combined_df = self._read_combined(spark)

            # Print a preview of the merged DataFrame.
            combined_df.show()

            # Writing with Spark (combined_df.write...json("s3a://data/data-result/result.json"))
            # would produce a folder of part files rather than one object, so the
            # data is collected to pandas, written to a single local file and
            # pushed with the upload method instead.
            with tempfile.TemporaryDirectory() as scratch_dir:
                # A scratch directory keeps the working directory clean and
                # is removed even when the upload fails.
                local_path = os.path.join(scratch_dir, 'result.json')
                combined_df.toPandas().to_json(local_path, orient='records', force_ascii=False, lines=True)
                self.upload_file_to_minio(local_path, self.minio_bucket, f"{self.minio_object}/result.json")
        finally:
            # Release the JVM and executors even if reading or uploading fails.
            spark.stop()

In [ ]:
    def upload_file_to_minio(self, file_path, minio_bucket, minio_object_name):
        """Upload a local file to the configured S3-compatible object store.

        Args:
            file_path: Path of the local file to send.
            minio_bucket: Name of the destination bucket.
            minio_object_name: Object key to store the file under.
        """
        connection_options = {
            "endpoint_url": self.obj_storage_endpoint,
            "aws_access_key_id": self.obj_storage_access_key,
            "aws_secret_access_key": self.obj_storage_secret_key,
            # Sign requests with Signature Version 4.
            "config": boto3.session.Config(signature_version='s3v4'),
            # NOTE(review): certificate verification is switched off here;
            # confirm this is intended before pointing at an https endpoint.
            "verify": False,
        }
        storage = boto3.resource('s3', **connection_options)
        destination = storage.Bucket(minio_bucket)
        destination.upload_file(file_path, minio_object_name)

In [ ]:
if __name__ == '__main__':
    # Run the read -> merge -> upload pipeline once when executed as a script.
    file_transform = FILE_TRANSFORM()
    # transform_file() has no return value, so its result is not bound to a name.
    file_transform.transform_file()
    print('Finished')