# Create Dummy Data for the process

## Import modules for Dummy Data Creation

In [None]:
%load_ext autoreload
%autoreload 2

# Adds additional directories for importing custom modules
import sys
sys.path.append('../generate_dummy_data')

from data_generator import generate_dummy_data, write_csv, write_json

## Generate dummy data and write to a CSV file

In [None]:
# Generate 100 dummy records and persist them as the CSV used by the batch pipeline.
csv_records = generate_dummy_data(100)
write_csv(csv_records, "../data/dummy_data.csv")

## Generate dummy data and multiple JSON files

In [None]:
# Generate a fresh set of 100 dummy records and write them as JSON into the
# json_batches output folder.
json_records = generate_dummy_data(100)
write_json(json_records, output_folder="../data/json_batches")

# Uploading files to MinIO

## Prerequisites for MinIO Upload

In [None]:
# Make the local `file_uploader` directory (home of minio_util) importable.
# Append it only once so re-running the cell does not pile up duplicate
# sys.path entries.
import sys
if '../file_uploader' not in sys.path:
    sys.path.append('../file_uploader')

from minio import Minio
from minio.error import S3Error
import os
import time
from minio_util import get_minio_client, upload_batch_file, upload_json_files

# Initialize the MinIO client. "minio:9000" is presumably the Docker service
# name and port of the MinIO container; adjust it when running elsewhere.
client = get_minio_client(endpoint="minio:9000")

## Upload a single CSV file.

In [None]:
# Upload the generated CSV file into the batch bucket.
csv_source_file = "../data/dummy_data.csv"
batch_bucket = "python-batch-bucket"
upload_batch_file(client, csv_source_file, batch_bucket)

## Upload all JSON batch files from a directory.

In [None]:
# Upload every JSON batch file from the local directory into the process bucket.
json_directory = "../data/json_batches"
process_bucket = "python-process-bucket"
upload_json_files(client, json_directory, process_bucket)

# Process the Batch File with Spark into Delta Lake Format and Save It to MinIO

## Prerequisites for Reading the Batch File from MinIO and Converting It to Delta Lake with PySpark

In [None]:
import os

from pyspark.sql import SparkSession
from delta import *

# Set up a SparkSession with the Delta Lake SQL extension and catalog enabled.
spark = SparkSession.builder \
    .appName("DeltaLakeOnMinIO") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Point the S3A filesystem at MinIO. Credentials are taken from the environment
# when set, so real secrets need not be committed with the notebook. The
# original development values remain as fallbacks so existing setups still work.
hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.endpoint", "http://minio:9000")  # use the Docker service name or IP
hadoop_conf.set("fs.s3a.access.key", os.environ.get("MINIO_ACCESS_KEY", "ROOTNAME"))
hadoop_conf.set("fs.s3a.secret.key", os.environ.get("MINIO_SECRET_KEY", "CHANGEME123"))
hadoop_conf.set("fs.s3a.path.style.access", "true")  # Required for MinIO


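## Read the CSV Batch File from MinIO
> The `minio_csv_batch_file_full_path` value may need to be updated manually so that it matches the object name created by your upload run (e.g. `1741409275634_dummy_data.csv`).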

In [None]:
# Object key of the uploaded CSV. It must match an object that actually exists
# in python-batch-bucket; the value below came from an earlier upload run.
minio_csv_batch_file_full_path = "s3a://python-batch-bucket/1741409275634_dummy_data.csv"

# CSV reader settings: header row, comma delimiter, trimmed whitespace,
# and inferred column types.
csv_read_options = {
    "header": "true",
    "delimiter": ",",
    "ignoreLeadingWhiteSpace": "true",
    "ignoreTrailingWhiteSpace": "true",
    "inferSchema": "true",
}
df = spark.read.options(**csv_read_options).csv(minio_csv_batch_file_full_path)

## Data Exploration (Optional)

In [None]:
# Print the schema of `df` (column names, types, nullability) as read from the CSV.
df.printSchema()

In [None]:
# Fetch the first row of `df` as a Row (first() is PySpark's alias for head()).
df.first()

In [None]:
# Display five rows without truncating long cell values.
df.show(n=5, truncate=False)

## Clean Data

In [None]:
# Treat a missing `is_active` flag as False before dropping incomplete rows.
df_clean = df.fillna({"is_active": False})

In [None]:
# Remove every row that still has a null in any column.
df_clean = df_clean.dropna(how="any")

In [None]:
# Keep only rows whose salary is strictly greater than 10000.
df_clean = df_clean.where(df_clean.salary > 10000)

### Overwrite `df` so the data-exploration cells can be re-run on the cleaned data (Optional)

In [None]:
# Rebind `df` to the cleaned DataFrame so the exploration cells (printSchema,
# head, show) can be re-run against the cleaned data.
df = df_clean

## Write the DataFrame to MinIO in Delta Lake Format

In [None]:
# Persist the cleaned batch data as a Delta table in MinIO, replacing any
# previous contents at this location.
df_clean.write.save("s3a://python-batch-bucket/delta_output", format="delta", mode="overwrite")

# Process the JSON Stream with Spark into Delta Lake Format and Save It to MinIO

## Prerequisites for Reading the Stream from MinIO and Converting It to Delta Lake with PySpark

In [None]:
# Make the local `file_uploader` directory (home of minio_util) importable.
# Append it only once so re-running the cell does not pile up duplicate
# sys.path entries.
import os
import sys
if '../file_uploader' not in sys.path:
    sys.path.append('../file_uploader')

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DateType, DoubleType, BooleanType
from delta import *
from minio_util import get_minio_client, ensure_bucket

# Initialize the MinIO client and prepare the "checkpoints" bucket that the
# streaming query uses for its checkpoint location. ensure_bucket presumably
# creates the bucket when it is missing; see minio_util.
client = get_minio_client(endpoint="minio:9000")
ensure_bucket(client, "checkpoints")

# Set up a SparkSession with the Delta Lake SQL extension and catalog enabled.
spark = SparkSession.builder \
    .appName("DeltaLakeOnMinIO") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Point the S3A filesystem at MinIO. Credentials are taken from the environment
# when set; the original development values remain as fallbacks.
hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.endpoint", "http://minio:9000")  # use the Docker service name or IP
hadoop_conf.set("fs.s3a.access.key", os.environ.get("MINIO_ACCESS_KEY", "ROOTNAME"))
hadoop_conf.set("fs.s3a.secret.key", os.environ.get("MINIO_SECRET_KEY", "CHANGEME123"))
hadoop_conf.set("fs.s3a.path.style.access", "true")  # Required for MinIO


## Listen to the Directory and Process the Stream (the queries keep running in the background; run the next cell to stop them)

In [None]:
# Define the directory in MinIO where new JSON files will arrive
input_path = "s3a://python-process-bucket/"

# Explicit schema for the incoming JSON records
schema = StructType() \
    .add("id", "integer") \
    .add("name", "string") \
    .add("email", "string") \
    .add("address", "string") \
    .add("phone", "string") \
    .add("date_of_birth", "date") \
    .add("company", "string") \
    .add("salary", "double") \
    .add("is_active", "boolean")

# Read the stream of JSON files as they are added; "multiline" lets a JSON
# document in a file span several lines.
df_stream = spark.readStream \
    .schema(schema) \
    .option("multiline","true") \
    .json(input_path)

# Process/Clean the streaming DataFrame (same rules as the batch pipeline)
df_transformed = df_stream
# Fill null values with a specified value
df_transformed = df_transformed.na.fill({"is_active": False})
# Drop rows with any null values
df_transformed = df_transformed.na.drop()
# Filter based upon Salary
df_transformed = df_transformed.filter(df_transformed["salary"] > 10000)

# Write the processed stream to Delta Lake format on MinIO.
# Note: A checkpoint location is required to track progress.
output_path = "s3a://python-process-bucket/delta_output"
checkpoint_path = "s3a://checkpoints/delta_streaming"

# NOTE(review): output_path sits inside input_path. Spark's file source may list
# the input directory recursively, in which case the Delta data files written
# here could be picked up as new "JSON" input. Confirm, and consider writing to
# a separate bucket or prefix.

# Sink the *cleaned* stream so the fill/drop/filter steps above actually apply
# to the Delta table (writing df_stream would bypass them entirely).
query = df_transformed.writeStream \
    .format("delta") \
    .outputMode("append") \
    .option("checkpointLocation", checkpoint_path) \
    .start(output_path)

# Mirror the same cleaned rows to the console for monitoring.
console_query = df_transformed.writeStream \
    .format("console") \
    .outputMode("append") \
    .start()

In [None]:
# Stop both streaming queries: the Delta sink first, then the console sink.
for running_query in (query, console_query):
    running_query.stop()

# Data Warehouse using Delta Lake files

## Prerequisites for Reading Delta Lake Tables

In [None]:
import os

from pyspark.sql import SparkSession
from delta import *

# Set up a SparkSession with the Delta Lake SQL extension and catalog enabled.
spark = SparkSession.builder \
    .appName("DeltaLakeOnMinIO") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

# Point the S3A filesystem at MinIO. Credentials are taken from the environment
# when set; the original development values remain as fallbacks.
hadoop_conf = spark._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3a.endpoint", "http://minio:9000")  # use the Docker service name or IP
hadoop_conf.set("fs.s3a.access.key", os.environ.get("MINIO_ACCESS_KEY", "ROOTNAME"))
hadoop_conf.set("fs.s3a.secret.key", os.environ.get("MINIO_SECRET_KEY", "CHANGEME123"))
hadoop_conf.set("fs.s3a.path.style.access", "true")  # Required for MinIO


## Reading from python-process-bucket

### Create Delta Table

In [None]:
# Register the streaming Delta output as a catalog table (skipped if it already exists).
create_stream_table_sql = """
    CREATE TABLE IF NOT EXISTS json_stream_table
    USING DELTA
    LOCATION 's3a://python-process-bucket/delta_output'
"""
spark.sql(create_stream_table_sql)

In [None]:
# Show up to 10 rows from the registered streaming table.
spark.table("json_stream_table").limit(10).show()

In [None]:
# Load the streaming Delta files straight from storage, bypassing the catalog table.
df_check = spark.read.load("s3a://python-process-bucket/delta_output", format="delta")
df_check.show()

## Reading from python-batch-bucket

### Create Delta Table

In [None]:
# Register the batch Delta output as a catalog table (skipped if it already exists).
create_batch_table_sql = """
    CREATE TABLE IF NOT EXISTS csv_batch_table
    USING DELTA
    LOCATION 's3a://python-batch-bucket/delta_output'
"""
spark.sql(create_batch_table_sql)

### Read from csv_batch_table

In [None]:
# Show up to 10 rows from the registered batch table.
spark.table("csv_batch_table").limit(10).show()

### Read the Delta Lake Files Directly into a DataFrame

In [None]:
# Load the batch Delta files straight from storage, bypassing the catalog table.
df_check = spark.read.load("s3a://python-batch-bucket/delta_output", format="delta")
df_check.show()

# Delete MinIO Object(s)

## Remove object
https://github.com/minio/minio-py/blob/88f4244fe89fb9f23de4f183bdf79524c712deaa/examples/remove_object.py#L25

### Import modules for MinIO deletion

In [None]:
from minio import Minio

In [None]:
# Delete a single object. "file_name" is a placeholder: replace it with the
# key of the object to remove.
bucket_name = "python-batch-bucket"
object_name = "file_name"
client.remove_object(bucket_name, object_name)

## Remove a prefix recursively
https://github.com/minio/minio-py/blob/88f4244fe89fb9f23de4f183bdf79524c712deaa/examples/remove_objects.py#L38

### Import modules for MinIO multi-deletion

In [None]:
from minio import Minio
from minio.deleteobjects import DeleteObject

### Delete All Objects Under a Prefix (the prefix is the full directory path to delete)

In [None]:
# Lazily build a DeleteObject for every object under the "delta_output/" prefix.
objects_to_delete = (
    DeleteObject(listed.object_name)
    for listed in client.list_objects("python-batch-bucket", "delta_output/", recursive=True)
)

# Iterate the returned errors, as in the minio-py example; report any object
# that could not be deleted.
delete_errors = client.remove_objects("python-batch-bucket", objects_to_delete)
for failure in delete_errors:
    print("error occurred when deleting object", failure)