In [None]:
from pyspark import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *


# Avoid hard-coded credentials: read the MinIO access/secret keys from /home/$USER/creds.txt (via os + open)
import os
import sys

try:
    creds_file = (open(f"/home/{os.getenv('USER')}/creds.txt", "r")).read().strip().split(",")
    accesskey,secretkey = creds_file[0],creds_file[1]
except:
    print("File not found, you can't access minio")
    accesskey,secretkey = "",""


# S3A / MinIO and dependency settings for the SparkConf. Every key is distinct,
# so they are simply applied one after another.
_S3A_SETTINGS = {
    'spark.jars.packages': 'org.apache.hadoop:hadoop-aws:3.2.3,mysql:mysql-connector-java:8.0.33',
    'spark.hadoop.fs.s3a.aws.credentials.provider': 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider',
    'spark.hadoop.fs.s3a.access.key': accesskey,
    'spark.hadoop.fs.s3a.secret.key': secretkey,
    # https://medium.com/@dineshvarma.guduru/reading-and-writing-data-from-to-minio-using-spark-8371aefa96d2
    "spark.hadoop.fs.s3a.path.style.access": "true",
    "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    # https://github.com/minio/training/blob/main/spark/taxi-data-writes.py
    # https://spot.io/blog/improve-apache-spark-performance-with-the-s3-magic-committer/
    'spark.hadoop.fs.s3a.committer.magic.enabled': 'true',
    'spark.hadoop.fs.s3a.committer.name': 'magic',
    # Internal IP for S3 cluster proxy
    "spark.hadoop.fs.s3a.endpoint": "http://system54.rice.iit.edu",
}

# Driver / executor sizing for the standalone cluster.
# Dynamic allocation is not enabled here (the
# spark.dynamicAllocation.enabled / shuffleTracking.enabled settings are unset).
_RESOURCE_SETTINGS = {
    "spark.driver.memory": "16g",
    "spark.executor.memory": "8g",
    "spark.cores.max": '10',
    'spark.executor.cores': '2',
}

conf = SparkConf()
for setting_key, setting_value in {**_S3A_SETTINGS, **_RESOURCE_SETTINGS}.items():
    conf.set(setting_key, setting_value)
# Send jobs to the Spark Cluster
conf.setMaster("spark://sm.service.consul:7077")

spark = (
    SparkSession.builder
    .appName("lab11p2")
    .config('spark.driver.host', 'spark-edge.service.consul')
    .config(conf=conf)
    .getOrCreate()
)

# Runtime SQL options applied to the live session.
for sql_key, sql_value in (
    ("spark.sql.legacy.timeParserPolicy", "LEGACY"),
    ("spark.sql.debug.maxToStringFields", "10000"),
):
    spark.conf.set(sql_key, sql_value)

In [None]:
# File paths
from pyspark.sql.types import *
from pyspark.sql.functions import *

# minio_parquet_path = "s3a://svuppu/hubtest1_50.parquet"
# db_creds_path = "/home/svuppu/database-creds.txt"


# Read database credentials
try:
    creds_file = (open(f"/home/svuppu/mysql_creds.txt", "r")).read().strip().split(",")
    dbuser,dbpass = creds_file[0],creds_file[1]
except:
    print("File not found, you can't access minio")
    dbuser,dbpass = "",""


# Database connection details
# retrieve the below url from creds_file
# db_url = f"jdbc:mysql://jrh-521-database-vm0.service.consul:3306/svuppu"
db_url = ""

# Column layout used to read the weather-observation JSON; every column is nullable.
_OBSERVATION_COLUMNS = (
    ("WeatherStation", StringType()),
    ("WBAN", StringType()),
    ("ObservationDate", DateType()),
    ("ObservationHour", IntegerType()),
    ("Latitude", FloatType()),
    ("Longitude", FloatType()),
    ("Elevation", IntegerType()),
    ("WindDirection", IntegerType()),
    ("WDQualityCode", IntegerType()),
    ("SkyCeilingHeight", IntegerType()),
    ("SCQualityCode", IntegerType()),
    ("VisibilityDistance", IntegerType()),
    ("VDQualityCode", IntegerType()),
    ("AirTemperature", FloatType()),
    ("ATQualityCode", IntegerType()),
    ("DewPoint", FloatType()),
    ("DPQualityCode", IntegerType()),
    ("AtmosphericPressure", FloatType()),
    ("APQualityCode", IntegerType()),
)
define_schema = StructType(
    [StructField(column_name, column_type, True) for column_name, column_type in _OBSERVATION_COLUMNS]
)

# Location of the 1960s observations.
json_path = "s3a://svuppu/hubtest1_60.json"


def _february_readings(readings, first_year=1950, last_year=1970, target_month=2,
                       temp_low=-100, temp_high=100):
    """Return the rows of ``readings`` usable for the February analysis.

    Keeps rows whose ObservationDate and AirTemperature are non-null, whose
    year lies in [first_year, last_year], whose month equals target_month,
    and whose AirTemperature lies in [temp_low, temp_high] (bounds inclusive,
    as with Column.between). The defaults reproduce the filter that used to
    be copy-pasted for each data source.
    """
    obs_date = col("ObservationDate")
    air_temp = col("AirTemperature")
    return readings.filter(
        obs_date.isNotNull()
        & air_temp.isNotNull()
        & year(obs_date).between(first_year, last_year)
        # `&` binds tighter than `==`, so the comparison must stay parenthesised.
        & (month(obs_date) == target_month)
        & air_temp.between(temp_low, temp_high)
    )


# Reading 1950s data from the sql table created in part1.
# The February condition is also pushed into the MySQL subquery so only those rows
# cross the JDBC connection. Spark requires a JDBC partitionColumn to be numeric,
# date or timestamp, so MONTH() applies to ObservationDate. The Spark-side filter
# below still runs, so the result is unchanged.
df_50 = (
    spark.read
    .format("jdbc")
    .option("url", db_url)
    .option(
        "dbtable",
        "(SELECT WeatherStation, ObservationDate, AirTemperature FROM fifties "
        "WHERE AirTemperature IS NOT NULL AND MONTH(ObservationDate) = 2) AS queryp2",
    )
    .option("user", dbuser)
    .option("password", dbpass)
    .option("driver", "com.mysql.cj.jdbc.Driver")
    .option("fetchsize", 1000)
    # Split the read into 10 range queries on ObservationDate; the bounds only
    # set the stride, they do not filter rows.
    .option("partitionColumn", "ObservationDate")
    .option("lowerBound", "1950-01-01")
    .option("upperBound", "1970-12-31")
    .option("numPartitions", 10)
    .load()
)

# Reading 1960s data from hubtest1_60.json
df_60 = (
    spark.read.schema(define_schema)
    .option("recursiveFileLookup", "true")
    .json(json_path)
    .select("WeatherStation", "ObservationDate", "AirTemperature")
    .repartition(4)
)

# Combine the February rows of both decades (columns matched by name) and cache
# them, since every aggregation that follows reuses this DataFrame.
february_df = _february_readings(df_60).unionByName(_february_readings(df_50)).cache()

# Drop the intermediate handles; february_df keeps its own lineage.
del df_60, df_50

# results_q1: total number of February readings kept after filtering.
results_q1 = february_df.agg(count("*").alias("record_count"))

# results_q234: per-year mean, approximate median (percentile_approx at 0.5)
# and sample standard deviation of AirTemperature, ordered by year.
results_q234 = (
    february_df
    .groupBy(year("ObservationDate").alias("YEAR"))
    .agg(
        avg("AirTemperature").alias("avg_airtemperature"),
        expr("percentile_approx(AirTemperature, 0.5)").alias("median_temp"),
        stddev("AirTemperature").alias("stddev_temp"),
    )
    # Use the exact alias; the old `col("Year")` only worked because Spark
    # resolves column names case-insensitively by default.
    .orderBy(col("YEAR").asc())
)

# results_q5: average temperature per WeatherStation *and* year.
results_q5 = (
    february_df
    .groupBy("WeatherStation", year("ObservationDate").alias("YEAR"))
    .agg(avg("AirTemperature").alias("avg_temp"))
    .orderBy(col("WeatherStation").asc(), col("YEAR").asc())
)

results_q1.show(truncate=False)
results_q234.show(truncate=False)
results_q5.show(truncate=False)

# Persist each result as a CSV output directory (with a header row) under
# module-11, overwriting any output from a previous run.
for result_frame, csv_target in (
    (results_q1, "s3a://svuppu/module-11/total_record_count_q1"),
    (results_q234, "s3a://svuppu/module-11/results_q234"),
    (results_q5, "s3a://svuppu/module-11/results_q5"),
):
    result_frame.write.csv(csv_target, mode="overwrite", header=True)

ModuleNotFoundError: No module named 'pyspark'

In [None]:
# Shut down the Spark session (and its SparkContext) created in the first cell,
# once all results have been written.
session = spark
session.stop()