In [1]:
import os
import shutil
import sys
import warnings

# Set Spark and Java environment variables before any SparkSession is created.
SPARK_HOME = '/opt/spark'
os.environ['SPARK_HOME'] = SPARK_HOME

# Derive JAVA_HOME from the `java` found on PATH. This is the same computation as
# `dirname $(dirname $(readlink -f $(which java)))`, but without spawning a shell.
# If no `java` is found, leave JAVA_HOME untouched (instead of silently setting it
# to an empty string) and emit a warning.
java_path = shutil.which('java')
if java_path is not None:
    os.environ['JAVA_HOME'] = os.path.dirname(os.path.dirname(os.path.realpath(java_path)))
else:
    warnings.warn(
        "No `java` executable found on PATH; JAVA_HOME was not updated "
        f"(current value: {os.environ.get('JAVA_HOME')!r}).",
        RuntimeWarning,
    )

# Append Spark's bin/ and sbin/ to PATH only when they are missing, so re-running
# this cell does not keep growing PATH with duplicate entries.
current_path = os.environ.get('PATH', '')
path_entries = current_path.split(os.pathsep) if current_path else []
for spark_dir in (os.path.join(SPARK_HOME, 'bin'), os.path.join(SPARK_HOME, 'sbin')):
    if spark_dir not in path_entries:
        path_entries.append(spark_dir)
os.environ['PATH'] = os.pathsep.join(path_entries)

# NOTE(review): these two look like settings for the `pyspark` launcher script
# (run Jupyter Lab as the driver); presumably they do not affect a kernel that is
# already running — confirm before relying on them.
os.environ['PYSPARK_DRIVER_PYTHON'] = 'jupyter'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'lab'
# Use this kernel's own interpreter for Python workers: a bare 'python' may resolve
# to a different interpreter than the driver (or to nothing at all).
os.environ['PYSPARK_PYTHON'] = sys.executable

In [7]:
# Import PySpark and start (or reuse) a SparkSession for the quick demo below.
# getOrCreate() hands back any session already running in this kernel; in that
# case only runtime SQL settings from this builder are applied (see Spark's
# "Using an existing Spark session" warning).
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("PySpark-Get-Started")
    .getOrCreate()
)

25/04/23 15:45:11 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [8]:
# Minimal in-memory DataFrame to check that the session above works end to end.
data = [
    ("Alice", 25),
    ("Bob", 30),
    ("Charlie", 35),
]
df = spark.createDataFrame(data, schema=["Name", "Age"])
df.show()

+-------+---+
|   Name|Age|
+-------+---+
|  Alice| 25|
|    Bob| 30|
|Charlie| 35|
+-------+---+



In [2]:
import os
import warnings

from pyspark.sql import SparkSession

# Config variables. Endpoint and credentials can be overridden via environment
# variables so they need not be edited into the notebook; the fallbacks are the
# values this cell previously hard-coded.
# KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"
# KAFKA_TOPIC = "spark-consume"
MINIO_ENDPOINT = os.environ.get("MINIO_ENDPOINT", "http://172.21.6.68:9000")
MINIO_ACCESS_KEY = os.environ.get("MINIO_ACCESS_KEY", "minioadmin")
MINIO_SECRET_KEY = os.environ.get("MINIO_SECRET_KEY", "minioadmin")
MINIO_BUCKET = "vtrackingspark2"
MINIO_PATH = "parquet"
# S3A connector package. NOTE(review): its version should match the Hadoop
# version bundled with the Spark install — confirm for /opt/spark.
HADOOP_AWS_PACKAGE = "org.apache.hadoop:hadoop-aws:3.3.4"

print("[INFO] Starting Spark job to read Parquet files from MinIO and print contents")
print(f"[INFO] MinIO endpoint: {MINIO_ENDPOINT}")
print(f"[INFO] MinIO bucket: {MINIO_BUCKET}")
print(f"[INFO] MinIO path: {MINIO_PATH}")

# getOrCreate() reuses a session already running in this kernel (e.g. one created
# by an earlier cell). Spark then applies only runtime SQL configurations, so the
# spark.jars.packages setting below would be ignored and S3A may be unavailable.
active_session = SparkSession.getActiveSession()
if active_session is not None:
    loaded_packages = active_session.sparkContext.getConf().get("spark.jars.packages") or ""
    if HADOOP_AWS_PACKAGE not in loaded_packages:
        warnings.warn(
            f"A SparkSession without {HADOOP_AWS_PACKAGE} is already active in this "
            "kernel; the MinIO/S3A settings in this cell may not take effect. Restart "
            "the kernel and run this cell before any other cell that creates a "
            "SparkSession.",
            RuntimeWarning,
        )

# TLS off for plain-http endpoints (the previous fixed setting), on for https.
ssl_enabled = "true" if MINIO_ENDPOINT.lower().startswith("https://") else "false"

# Initialize Spark session
spark = (
    SparkSession.builder
    .appName("ReadParquetFromMinIO")
    .config("spark.jars.packages", HADOOP_AWS_PACKAGE)
    .config("spark.hadoop.fs.s3a.endpoint", MINIO_ENDPOINT)
    .config("spark.hadoop.fs.s3a.access.key", MINIO_ACCESS_KEY)
    .config("spark.hadoop.fs.s3a.secret.key", MINIO_SECRET_KEY)
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .config("spark.hadoop.fs.s3a.path.style.access", "true")
    .config("spark.hadoop.fs.s3a.connection.ssl.enabled", ssl_enabled)
    .getOrCreate()
)
spark.sparkContext.setLogLevel("WARN")
print("[INFO] Spark session created.")

# Read Parquet files from MinIO
print(f"[INFO] Reading Parquet files from s3a://{MINIO_BUCKET}/{MINIO_PATH}...")
parquet_df = spark.read \
    .format("parquet") \
    .load(f"s3a://{MINIO_BUCKET}/{MINIO_PATH}")
print("[INFO] Parquet DataFrame loaded.")

[INFO] Starting Spark job to read Parquet files from MinIO and print contents
[INFO] MinIO endpoint: http://172.21.6.68:9000
[INFO] MinIO bucket: vtrackingspark2
[INFO] MinIO path: parquet
:: loading settings :: url = jar:file:/opt/spark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/vht/.ivy2/cache
The jars for the packages stored in: /home/vht/.ivy2/jars
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-c6f87720-d6d2-42a1-b681-fe5f1931e0ef;1.0
	confs: [default]
	found org.apache.hadoop#hadoop-aws;3.3.4 in central
	found com.amazonaws#aws-java-sdk-bundle;1.12.262 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
:: resolution report :: resolve 190ms :: artifacts dl 6ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.12.262 from central in [default]
	org.apache.hadoop#hadoop-aws;3.3.4 from central in [default]
	org.wildfly.openssl#wildfly-openssl;1.0.7.Final from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts   |
	|       conf       | number| search|dwnlded|evicted|| number|dwnlded|
	---------------------------------------

[INFO] Spark session姿勢 created.
[INFO] Reading Parquet files from s3a://vtrackingspark2/parquet...


25/04/23 15:51:32 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

[INFO] Parquet DataFrame loaded.


In [3]:
# Inspect what was loaded: the inferred schema, the first rows, and the row count.
print("[INFO] Schema of the Parquet DataFrame:")
parquet_df.printSchema()

print("[INFO] Contents of the Parquet DataFrame:")
parquet_df.show(n=20, truncate=False)  # n=20 is the same as DataFrame.show()'s default

record_count = parquet_df.count()  # an action: runs its own Spark job
print("[INFO] Total number of records: {}".format(record_count))

[INFO] Schema of the Parquet DataFrame:
root
 |-- entity_type: string (nullable = true)
 |-- attribute_type: string (nullable = true)
 |-- attribute_key: string (nullable = true)
 |-- logged: boolean (nullable = true)
 |-- bool_v: boolean (nullable = true)
 |-- str_v: string (nullable = true)
 |-- long_v: long (nullable = true)
 |-- dbl_v: double (nullable = true)
 |-- json_v: struct (nullable = true)
 |    |-- direction: long (nullable = true)
 |    |-- geocoding: string (nullable = true)
 |    |-- history: boolean (nullable = true)
 |    |-- latitude: double (nullable = true)
 |    |-- longitude: double (nullable = true)
 |    |-- odometer: double (nullable = true)
 |    |-- speed: long (nullable = true)
 |    |-- status: string (nullable = true)
 |    |-- timestamp: long (nullable = true)
 |-- last_update_ts: long (nullable = true)
 |-- ts: long (nullable = true)
 |-- value_type: string (nullable = true)
 |-- value_nil: boolean (nullable = true)
 |-- new_attribute_key: string (nulla

                                                                                

+-----------+--------------+-------------+------+------+-----+------+-----+--------------------------------------------------------------------------------------------+--------------+-------------+----------+---------+-----------------+------------------------------------+-----------+------------+------------------------------------+----------+
|entity_type|attribute_type|attribute_key|logged|bool_v|str_v|long_v|dbl_v|json_v                                                                                      |last_update_ts|ts           |value_type|value_nil|new_attribute_key|project_id                          |not_send_ws|AttributeSub|entity_id                           |day       |
+-----------+--------------+-------------+------+------+-----+------+-----+--------------------------------------------------------------------------------------------+--------------+-------------+----------+---------+-----------------+------------------------------------+-----------+------------+--------



[INFO] Total number of records: 270


                                                                                

In [4]:
# Load a single vehicle's records by reading only its partition directory.
# Setting basePath makes Spark's partition discovery start at the dataset root, so
# `entity_id` is kept as a column; without it, the partition directory itself is
# treated as the table root and `entity_id` is dropped from the schema.
VEHICLE_ID = "3fa541ad-dda7-4bd8-bdbd-8f78d2f76000"
parquet_root = f"s3a://{MINIO_BUCKET}/{MINIO_PATH}"

one_vehicle_df = spark.read \
    .format("parquet") \
    .option("basePath", parquet_root) \
    .load(f"{parquet_root}/entity_id={VEHICLE_ID}")

print("[INFO] One vehicle DataFrame loaded.")

record_count = one_vehicle_df.count()
print(f"[INFO] Total number of records: {record_count}")

[INFO] One vehicle DataFrame loaded.
[INFO] Total number of records: 27
