In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, hour, date_format, avg, max, min, sum
import h3

def query_examples(spark, parquet_path):
    """Run ten example queries against a Parquet dataset and print each result.

    The queries reference the columns ``timestamp``, ``latitude``,
    ``longitude``, ``precipitation`` and ``h3_index``, so the data at
    ``parquet_path`` must provide them.

    Note: ``max``, ``min`` and ``sum`` used below are the Spark aggregate
    functions imported from ``pyspark.sql.functions`` at the top of the file,
    not the Python builtins.

    Args:
        spark: SparkSession used to read the Parquet data.
        parquet_path: Path handed to ``spark.read.parquet``.

    Returns:
        None. Every result is written to stdout via ``DataFrame.show``.
    """
    # Read the Parquet data
    df = spark.read.parquet(parquet_path)

    print("Schema of the Parquet data:")
    df.printSchema()

    print("\n1. Query data for a specific date:")
    # Comparing a timestamp column to "2022-01-01" with == only matches the
    # midnight instant. A half-open range covers the whole calendar day and
    # keeps the predicate directly on the timestamp column.
    df.filter((col("timestamp") >= "2022-01-01") &
              (col("timestamp") < "2022-01-02")).show(5)

    print("\n2. Query data for a specific time range:")
    df.filter((col("timestamp") >= "2022-01-01 06:00:00") &
              (col("timestamp") < "2022-01-01 12:00:00")).show(5)

    print("\n3. Query data for a specific H3 index:")
    # Get a sample H3 index. first() returns None on an empty result, so guard
    # against it instead of indexing blindly; null indexes are skipped because
    # they cannot be used for the equality / neighbour queries.
    sample_row = (
        df.select("h3_index")
        .filter(col("h3_index").isNotNull())
        .first()
    )
    specific_h3 = sample_row[0] if sample_row is not None else None
    if specific_h3 is None:
        print("No non-null h3_index values found; skipping.")
    else:
        df.filter(col("h3_index") == specific_h3).show(5)

    print("\n4. Calculate average precipitation for each hour of the day:")
    df.groupBy(hour("timestamp").alias("hour")) \
      .agg(avg("precipitation").alias("avg_precipitation")) \
      .orderBy("hour") \
      .show()

    print("\n5. Find maximum precipitation for each H3 index:")
    df.groupBy("h3_index") \
      .agg(max("precipitation").alias("max_precipitation")) \
      .orderBy(col("max_precipitation").desc()) \
      .show(5)

    print("\n6. Query data for a specific lat/lon range:")
    df.filter((col("latitude") > 40) & (col("latitude") < 41) &
              (col("longitude") > -75) & (col("longitude") < -74)).show(5)

    print("\n7. Calculate total precipitation for each date:")
    # Group by the calendar date, not the full timestamp; grouping by the raw
    # timestamp yields one total per distinct timestamp instead of per day.
    df.groupBy(col("timestamp").cast("date").alias("date")) \
      .agg(sum("precipitation").alias("total_precipitation")) \
      .orderBy("date") \
      .show()

    print("\n8. Find areas with precipitation above a threshold:")
    # Example threshold in mm.
    # NOTE(review): sample precipitation values in this dataset are ~1e-7, so
    # the column may not be in mm -- confirm the unit before relying on this.
    threshold = 10.0
    df.filter(col("precipitation") > threshold) \
      .select("timestamp", "latitude", "longitude", "precipitation", "h3_index") \
      .orderBy(col("precipitation").desc()) \
      .show(5)

    print("\n9. Analyze precipitation trends by hour:")
    df.groupBy(hour("timestamp").alias("hour")) \
      .agg(avg("precipitation").alias("avg_precip"),
           max("precipitation").alias("max_precip"),
           min("precipitation").alias("min_precip")) \
      .orderBy("hour") \
      .show()

    print("\n10. Query data for multiple H3 indexes within a larger area:")
    if specific_h3 is None:
        print("No non-null h3_index values found; skipping.")
    else:
        # Get neighboring H3 indexes (the cell itself plus its ring-1
        # neighbours). h3-py 4.x renamed k_ring to grid_disk, so use whichever
        # the installed version provides.
        neighbor_fn = getattr(h3, "grid_disk", None) or h3.k_ring
        # Materialise as a sorted list so isin() gets a plain, deterministic
        # sequence regardless of the container type the h3 version returns.
        neighbors = sorted(neighbor_fn(specific_h3, 1))
        df.filter(col("h3_index").isin(neighbors)) \
          .select("timestamp", "h3_index", "precipitation") \
          .show(10)

def main(parquet_path):
    """Create (or reuse) a Spark session, run the example queries, then stop it.

    Args:
        parquet_path: Location of the Parquet data, forwarded unchanged to
            ``query_examples``.

    Returns:
        None.
    """
    spark = (
        SparkSession.builder
        .appName("QueryERA5Parquet")
        .config("spark.driver.memory", "4g")
        .config("spark.executor.memory", "4g")
        .getOrCreate()
    )

    try:
        query_examples(spark, parquet_path)
    finally:
        # Stop the session even when a query raises, so a failed run does not
        # leave the Spark session running behind it.
        spark.stop()

In [6]:
if __name__ == "__main__":
    # Run the example queries against the Parquet data for 2022-01-01.
    main(parquet_path="../data/parquet/2022-01-01")

Schema of the Parquet data:
root
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- precipitation: double (nullable = true)
 |-- h3_index: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)


1. Query data for a specific date:
+--------+---------+--------------------+---------------+-------------------+
|latitude|longitude|       precipitation|       h3_index|          timestamp|
+--------+---------+--------------------+---------------+-------------------+
|    90.0|      0.0|4.802206333767711E-7|890326233abffff|2022-01-01 00:00:00|
|    90.0|     0.25|4.802206333767711E-7|890326233abffff|2022-01-01 00:00:00|
|    90.0|      0.5|4.802206333767711E-7|890326233abffff|2022-01-01 00:00:00|
|    90.0|     0.75|4.802206333767711E-7|890326233abffff|2022-01-01 00:00:00|
|    90.0|      1.0|4.802206333767711E-7|890326233abffff|2022-01-01 00:00:00|
+--------+---------+--------------------+---------------+-------------------+
only showing top 5