In [20]:
from pyspark.sql import SparkSession

from pyspark.sql.functions import col, explode
import os

In [21]:
# One SparkSession for the whole notebook; getOrCreate() returns the already
# active session if there is one instead of starting a second.
# NOTE(review): no S3A/MinIO endpoint or credential settings are configured here —
# presumably they come from the cluster's spark-defaults; confirm.
spark = (
    SparkSession.builder
    .appName("PySpark Minio")
    .getOrCreate()
)

# Keep cell output readable by suppressing INFO/WARN log lines.
spark.sparkContext.setLogLevel("ERROR")

In [22]:
# Bucket in the object store that holds the raw JSON dumps.
bucket_name = "datalake"

# Glob pattern: every file exactly one directory level below raw_data/
# (i.e. raw_data/<subdir>/<file>).
json_file_path = "raw_data/*/*"

# s3a:// is the Hadoop S3A connector scheme Spark uses for S3-compatible storage.
s3_uri = "s3a://" + "/".join((bucket_name, json_file_path))


In [23]:
# Load every file matched by the glob; Spark infers the schema from the JSON,
# turning nested objects into struct columns and JSON arrays into array columns.
df = spark.read.json(path=s3_uri)

# Quick preview of the raw rows (show() prints 20 rows and truncates wide cells).
df.show()

                                                                                

+--------------------+--------------------+--------------------+-------+--------+--------------------+----------------+---------------+
|             current|               daily|              hourly|    lat|     lon|            minutely|        timezone|timezone_offset|
+--------------------+--------------------+--------------------+-------+--------+--------------------+----------------+---------------+
|{100, 301.34, 172...|[{98, 297.36, 172...|[{100, 300.68, 17...|21.0278|105.8342|[{1722937620, 0.5...|    Asia/Bangkok|          25200|
|{40, 297.9, 17229...|[{34, 296.77, 172...|[{52, 297.84, 172...|10.8231|106.6297|[{1722937620, 0.0...|Asia/Ho_Chi_Minh|          25200|
|{75, 299.1, 17229...|[{25, 296.56, 172...|[{74, 298.79, 172...|10.0333|105.7667|[{1722937620, 0.0...|Asia/Ho_Chi_Minh|          25200|
|{20, 298.94, 1722...|[{51, 295.15, 172...|[{28, 298.55, 172...|16.0675|108.2203|[{1722937620, 0.0...|Asia/Ho_Chi_Minh|          25200|
+--------------------+--------------------+-----

In [None]:
# Inspect the inferred nested schema (current / daily / hourly / minutely ...) before flattening.
df.printSchema()

In [24]:
# Flatten the nested payload to one row per location: coordinates and timezone,
# the scalar fields of the `current` struct, and the fields of the FIRST entry of
# the `current.weather` array (any further weather entries are discarded).
#
# Every name listed here must exist in the inferred `current` schema; selecting a
# field that no input file contained raises an AnalysisException.
CURRENT_FIELDS = [
    "clouds",
    "dew_point",
    "dt",
    "feels_like",
    "humidity",
    "pressure",
    "sunrise",
    "sunset",
    "temp",
    "uvi",
    "visibility",
    "wind_deg",
    "wind_gust",
    "wind_speed",
]

flattened_df = (
    df.select(
        col("lat"),
        col("lon"),
        col("timezone"),
        # Pull only the needed struct fields rather than expanding `current.*`,
        # which would also carry the `weather` array just to drop it again.
        *(col(f"current.{field}").alias(field) for field in CURRENT_FIELDS),
        col("current.weather").getItem(0).alias("current_weather"),
    )
    # `current_weather.*` can only be expanded once the aliased struct exists as a
    # column, hence this second projection.
    .select("lat", "lon", "timezone", *CURRENT_FIELDS, "current_weather.*")
)

# Show the flattened DataFrame
flattened_df.show()

+-------+--------+----------------+------+---------+----------+----------+--------+--------+----------+----------+------+----+----------+--------+---------+----------+----------------+----+---+------+
|    lat|     lon|        timezone|clouds|dew_point|        dt|feels_like|humidity|pressure|   sunrise|    sunset|  temp| uvi|visibility|wind_deg|wind_gust|wind_speed|     description|icon| id|  main|
+-------+--------+----------------+------+---------+----------+----------+--------+--------+----------+----------+------+----+----------+--------+---------+----------+----------------+----+---+------+
|21.0278|105.8342|    Asia/Bangkok|   100|   301.34|1722937597|    315.14|      68|    1002|1722897143|1722943963|308.14|0.57|     10000|     123|     3.91|      2.22|      light rain| 10d|500|  Rain|
|10.8231|106.6297|Asia/Ho_Chi_Minh|    40|    297.9|1722937597|    313.16|      62|    1006|1722897763|1722942962|306.16|0.37|     10000|     241|     8.05|      5.36|scattered clouds| 03d|802|Clo

                                                                                

In [None]:
# Confirm the flat column names and types that will be written to the Oracle table.
flattened_df.printSchema()

In [25]:
# Oracle connection settings are read from the environment so no credentials are
# stored in the notebook. Fail fast if any are missing: os.getenv() would otherwise
# return None and silently build a URL like "jdbc:oracle:thin:@None:None/None"
# (and a None user/password) that only fails later, inside the JDBC write.
_required_db_vars = ("DB_HOST", "DB_PORT", "DB_SERVICE", "DB_NAME", "DB_PASSWORD")
_missing_db_vars = [name for name in _required_db_vars if not os.getenv(name)]
if _missing_db_vars:
    raise RuntimeError(
        "Missing required environment variable(s): " + ", ".join(_missing_db_vars)
    )

oracle_url = f'jdbc:oracle:thin:@{os.getenv("DB_HOST")}:{os.getenv("DB_PORT")}/{os.getenv("DB_SERVICE")}'
oracle_properties = {
    # NOTE(review): the JDBC user name is taken from DB_NAME — confirm this variable
    # holds the Oracle user and not a database/schema name.
    "user": os.getenv("DB_NAME"),
    "password": os.getenv("DB_PASSWORD"),
    "driver": "oracle.jdbc.driver.OracleDriver"
}

# Display the target URL (it contains no password) to confirm which database is used.
oracle_url

'jdbc:oracle:thin:@host.docker.internal:1521/xepdb1'

In [26]:

# Append the flattened rows to the Oracle table `weather_data` over JDBC.
# NOTE(review): mode("append") combined with re-reading every file under raw_data/
# means each re-run of this notebook inserts the previously loaded rows again.
# Consider deduplicating or tracking which files have already been loaded.
(
    flattened_df.write
    .format("jdbc")
    # url, dbtable, then user / password / driver from oracle_properties.
    .options(url=oracle_url, dbtable="weather_data", **oracle_properties)
    .mode("append")
    .save()
)

In [None]:
# Shut down the Spark session (and its underlying SparkContext) once the load is done.
spark.stop()