In [1]:
import os

from pyspark.sql.types import DoubleType, IntegerType
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

s3_url: str = "http://minio:9000"
s3_access_key: str = "minioadmin"
s3_secret_key: str ="minioadmin"
spark_master_url: str = "spark://spark-master:7077"

conf = SparkConf()

conf.set('spark.hadoop.fs.s3a.endpoint', s3_url)
conf.set('spark.hadoop.fs.s3a.access.key', s3_access_key)
conf.set('spark.hadoop.fs.s3a.secret.key', s3_secret_key)
conf.set('spark.hadoop.fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem')
conf.set('spark.hadoop.fs.s3a.path.style.access', 'true')
conf.set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.4,com.amazonaws:aws-java-sdk-bundle:1.12.262,io.delta:delta-spark_2.12:3.2.0,io.delta:delta-storage:3.2.0')
conf.set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
conf.set("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
conf.set("spark.sql.warehouse.dir", "s3a://data/warehouse")
conf.set("spark.hadoop.fs.s3a.connection.ssl.enabled", "false")
conf.set("hive.metastore.uris", "thrift://hive-metastore:9083")
spark = (
    SparkSession.builder
    .appName('spark-minio2')
    .master(spark_master_url)
    .config(conf=conf)
    .enableHiveSupport()
    .getOrCreate()
)

print(f'spark version: {spark.version}')
print(spark._jsc.hadoopConfiguration().get('fs.s3a.endpoint'))


spark version: 3.5.0
http://minio:9000


In [2]:
source_bucket = "data"

input_path = f"s3a://{source_bucket}/air_pollution_china.csv"
delta_path = f"s3a://{source_bucket}/delta/data/tables/"
df = spark.read.option("header", "true").csv(input_path)
# Print the schema
print("Schema:")
df.printSchema()

Schema:
root
 |-- PM2.5 (µg/m³): string (nullable = true)
 |-- PM10 (µg/m³): string (nullable = true)
 |-- NO2 (µg/m³): string (nullable = true)
 |-- SO2 (µg/m³): string (nullable = true)
 |-- CO (mg/m³): string (nullable = true)
 |-- O3 (µg/m³): string (nullable = true)
 |-- Temperature (°C): string (nullable = true)
 |-- Humidity (%): string (nullable = true)
 |-- Wind Speed (m/s): string (nullable = true)
 |-- Wind Direction (°): string (nullable = true)
 |-- Pressure (hPa): string (nullable = true)
 |-- Precipitation (mm): string (nullable = true)
 |-- Visibility (km): string (nullable = true)
 |-- AQI: string (nullable = true)
 |-- Season: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Day of Week: string (nullable = true)
 |-- Hour: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Weather Condition: string (nullable = true)
 |--

In [3]:
spark.sql("CREATE DATABASE IF NOT EXISTS chinaAirPolution")

DataFrame[]

In [6]:
# Create a clean column name mapping
source_columns = df.columns
clean_columns = [col_name.replace(" ", "_").replace("(", "").replace(")", "").replace(".", "").replace("/", "_").replace("%", "pct") for col_name in source_columns]

# Create a new dataframe with clean column names
df_clean = df
for i, col_name in enumerate(source_columns):
    df_clean = df_clean.withColumnRenamed(col_name, clean_columns[i])

# Show the new schema
print("Clean schema:")
df_clean.printSchema()

# List of columns that should be numeric
numeric_cols = [
    'PM25_µg_m³', 'PM10_µg_m³', 'NO2_µg_m³', 'SO2_µg_m³', 
    'CO_mg_m³', 'O3_µg_m³', 'Temperature_°C', 'Humidity_pct', 
    'Wind_Speed_m_s', 'Wind_Direction_°', 'Pressure_hPa', 
    'Precipitation_mm', 'Visibility_km', 'AQI', 
    'Latitude', 'Longitude', 'Hour', 'Month', 'Year'
]

# Convert to appropriate types
for column in numeric_cols:
    df_clean = df_clean.withColumn(column, F.col(column).cast(DoubleType()))

# Now write to Delta format
df_clean.write.format("delta") \
    .mode("overwrite") \
    .option("path", f'{delta_path}/china_air_pollution') \
    .saveAsTable("chinaAirPolution.china_air_pollution")

print("Data successfully written to Delta Lake table 'china_air_pollution'")

Clean schema:
root
 |-- PM25_µg_m³: string (nullable = true)
 |-- PM10_µg_m³: string (nullable = true)
 |-- NO2_µg_m³: string (nullable = true)
 |-- SO2_µg_m³: string (nullable = true)
 |-- CO_mg_m³: string (nullable = true)
 |-- O3_µg_m³: string (nullable = true)
 |-- Temperature_°C: string (nullable = true)
 |-- Humidity_pct: string (nullable = true)
 |-- Wind_Speed_m_s: string (nullable = true)
 |-- Wind_Direction_°: string (nullable = true)
 |-- Pressure_hPa: string (nullable = true)
 |-- Precipitation_mm: string (nullable = true)
 |-- Visibility_km: string (nullable = true)
 |-- AQI: string (nullable = true)
 |-- Season: string (nullable = true)
 |-- City: string (nullable = true)
 |-- Latitude: string (nullable = true)
 |-- Longitude: string (nullable = true)
 |-- Day_of_Week: string (nullable = true)
 |-- Hour: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- Year: string (nullable = true)
 |-- Weather_Condition: string (nullable = true)
 |-- Station_ID: string

In [7]:
spark.sql("select * from chinaAirPolution.china_air_pollution").limit(2).show(truncate=False)

+-----------------+------------------+-----------------+------------------+------------------+------------------+-----------------+------------------+-----------------+------------------+-----------------+------------------+------------------+----+------+--------+------------------+------------------+-----------+----+-----+------+-----------------+----------+
|PM25_µg_m³       |PM10_µg_m³        |NO2_µg_m³        |SO2_µg_m³         |CO_mg_m³          |O3_µg_m³          |Temperature_°C   |Humidity_pct      |Wind_Speed_m_s   |Wind_Direction_°  |Pressure_hPa     |Precipitation_mm  |Visibility_km     |AQI |Season|City    |Latitude          |Longitude         |Day_of_Week|Hour|Month|Year  |Weather_Condition|Station_ID|
+-----------------+------------------+-----------------+------------------+------------------+------------------+-----------------+------------------+-----------------+------------------+-----------------+------------------+------------------+----+------+--------+------------

In [13]:
spark.stop()