In [None]:
from pyspark import SparkConf
from pyspark.sql import SparkSession

# Removing hard coded password - using os module & open to import them from creds.txt file
import os
import sys

# Your job: run the data cleaning code below for 00.txt, 10.txt, 20.txt, ..., 200.txt and 210.txt

def load_credential_pair(path, failure_message):
    """Read a two-field, comma-separated credentials file.

    The file content is stripped of surrounding whitespace and split on
    commas; the first two fields are returned.

    Args:
        path: Location of the credentials file.
        failure_message: Text printed when the credentials cannot be loaded.

    Returns:
        A ``(key, secret)`` tuple of strings, or ``("", "")`` when the file
        cannot be read or holds fewer than two comma-separated fields.
    """
    try:
        # The context manager closes the handle even when reading fails.
        with open(path, "r") as handle:
            fields = handle.read().strip().split(",")
        return fields[0], fields[1]
    except (OSError, IndexError, UnicodeDecodeError):
        # Best effort: keep the notebook running with empty credentials, but
        # no longer swallow KeyboardInterrupt/SystemExit like a bare except.
        print(failure_message)
        return "", ""


# MinIO (S3) credentials
accesskey, secretkey = load_credential_pair(
    f"/home/{os.getenv('USER')}/creds.txt",
    "File not found, you can't access minio",
)

# code to retrieve credentials from creds-mysql.txt
mysqlaccesskey, mysqlsecretkey = load_credential_pair(
    f"/home/{os.getenv('USER')}/creds-mysql.txt",
    "File not found, you can't access the mysql server...",
)


# Key/value settings copied onto the SparkConf below, one conf.set() per entry.
# Configuration Documentation Link
# https://spark.apache.org/docs/latest/configuration.html
spark_settings = [
    ('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.2.3,com.mysql:mysql-connector-j:8.3.0'),
    ('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider'),
    # Keys loaded from creds.txt earlier in this cell
    ('spark.hadoop.fs.s3a.access.key', accesskey),
    ('spark.hadoop.fs.s3a.secret.key', secretkey),
    # Configure these settings
    # https://medium.com/@dineshvarma.guduru/reading-and-writing-data-from-to-minio-using-spark-8371aefa96d2
    ("spark.hadoop.fs.s3a.path.style.access", "true"),
    ("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem"),
    # https://github.com/minio/training/blob/main/spark/taxi-data-writes.py
    # https://spot.io/blog/improve-apache-spark-performance-with-the-s3-magic-committer/
    ('spark.hadoop.fs.s3a.committer.magic.enabled', 'true'),
    ('spark.hadoop.fs.s3a.committer.name', 'magic'),
    # Internal IP for S3 cluster proxy
    ("spark.hadoop.fs.s3a.endpoint", "http://infra-minio-proxy-vm0.service.consul"),
    # Driver memory
    ("spark.driver.memory", "4g"),
    # Memory request per executor
    ("spark.executor.memory", "8g"),
    # One core per executor
    ("spark.executor.cores", "1"),
    # Upper bound on total cores per job
    ("spark.cores.max", "36"),
]

conf = SparkConf()
for setting_name, setting_value in spark_settings:
    conf.set(setting_name, setting_value)

# Send jobs to the Spark Cluster
conf.setMaster("spark://sm.service.consul:7077")

# Build (or reuse) the session with the driver host set explicitly and the
# configuration assembled above.
spark = (
    SparkSession.builder
    .appName("KKcode")
    .config('spark.driver.host', 'spark-edge.service.consul')
    .config(conf=conf)
    .getOrCreate()
)

spark.conf.set("spark.sql.debug.maxToStringFields", 100)

In [None]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

# Load 50.txt from MinIO; the fields below are all sliced out of column `_c0`.
df = spark.read.csv("s3a://itmd521/50.txt")

raw_record = df['_c0']
int_type = IntegerType()

# (output column, expression) pairs, applied in this order.
# substr(start, length) uses 1-based start positions.
# NOTE(review): DewPoint is not divided by 10 although AirTemperature and
# AtmosphericPressure are -- confirm against the record format whether the
# same scaling should apply.
# NOTE(review): every other *QualityCode is read from the position right after
# its measurement (61+3=64, 71+5=76, 88+5=93, ...), but VisibilityDistance
# ends at 84 while VDQualityCode is read from 86 -- confirm 85 is not intended.
column_specs = [
    ('WeatherStation', raw_record.substr(5, 6)),
    ('WBAN', raw_record.substr(11, 5)),
    ('ObservationDate', to_date(raw_record.substr(16, 8), 'yyyyMMdd')),
    ('ObservationHour', raw_record.substr(24, 4).cast(int_type)),
    ('Latitude', raw_record.substr(29, 6).cast('float') / 1000),
    ('Longitude', raw_record.substr(35, 7).cast('float') / 1000),
    ('Elevation', raw_record.substr(47, 5).cast(int_type)),
    ('WindDirection', raw_record.substr(61, 3).cast(int_type)),
    ('WDQualityCode', raw_record.substr(64, 1).cast(int_type)),
    ('SkyCeilingHeight', raw_record.substr(71, 5).cast(int_type)),
    ('SCQualityCode', raw_record.substr(76, 1).cast(int_type)),
    ('VisibilityDistance', raw_record.substr(79, 6).cast(int_type)),
    ('VDQualityCode', raw_record.substr(86, 1).cast(int_type)),
    ('AirTemperature', raw_record.substr(88, 5).cast('float') / 10),
    ('ATQualityCode', raw_record.substr(93, 1).cast(int_type)),
    ('DewPoint', raw_record.substr(94, 5).cast('float')),
    ('DPQualityCode', raw_record.substr(99, 1).cast(int_type)),
    ('AtmosphericPressure', raw_record.substr(100, 5).cast('float') / 10),
    ('APQualityCode', raw_record.substr(105, 1).cast(int_type)),
]

# Append each derived column in turn, then discard the raw line.
splitDF = df
for column_name, column_expr in column_specs:
    splitDF = splitDF.withColumn(column_name, column_expr)
splitDF = splitDF.drop('_c0')

# Inspect the resulting schema and the first ten rows.
splitDF.printSchema()
splitDF.show(10)

In [None]:
# Persist the cleaned frame to MinIO in Parquet format, replacing prior output
parquet_writer = splitDF.write.mode("overwrite")
parquet_writer.parquet("s3a://ksaravanan1/fifty.parquet")

# Persist the same frame to the MySQL table `fifty` over JDBC,
# replacing any existing table contents (overwrite mode).
jdbc_write_options = (
    ("url", "jdbc:mysql://infra-database-vm0.service.consul:3306/ksaravanan1"),
    ("driver", "com.mysql.cj.jdbc.Driver"),
    ("dbtable", "fifty"),
    ("user", mysqlaccesskey),
    ("password", mysqlsecretkey),
)
jdbc_writer = splitDF.write.mode("overwrite").format("jdbc")
for option_name, option_value in jdbc_write_options:
    jdbc_writer = jdbc_writer.option(option_name, option_value)
jdbc_writer.save()

In [None]:

# Load the Parquet copy that was written to MinIO
parquet_reader = spark.read
minioDF = parquet_reader.parquet("s3a://ksaravanan1/fifty.parquet")

# Load the `fifty` table back from MySQL over JDBC
jdbc_read_options = (
    ("url", "jdbc:mysql://infra-database-vm0.service.consul:3306/ksaravanan1"),
    ("driver", "com.mysql.cj.jdbc.Driver"),
    ("dbtable", "fifty"),
    ("user", mysqlaccesskey),
    ("password", mysqlsecretkey),
)
jdbc_reader = spark.read.format("jdbc")
for option_name, option_value in jdbc_read_options:
    jdbc_reader = jdbc_reader.option(option_name, option_value)
mysqlDF = jdbc_reader.load()

In [None]:
# Partition the cleaned rows on the sign of Latitude:
# zero and positive go north, negative goes south.
latitude = col("Latitude")
northDF = splitDF.filter(latitude >= 0)
southDF = splitDF.filter(latitude < 0)

In [None]:
# Write each hemisphere to MinIO as Parquet, replacing any previous output
northDF.write.mode("overwrite").parquet("s3a://ksaravanan1/fifty-northern.parquet")
southDF.write.mode("overwrite").parquet("s3a://ksaravanan1/fifty-southern.parquet")


def write_mysql_table(frame, table_name):
    """Overwrite a table in the ksaravanan1 MySQL database with ``frame``.

    The ``dbtable`` value is placed into the SQL that Spark generates as-is,
    so a name containing a hyphen (e.g. ``fifty-northern``) would be parsed by
    MySQL as a subtraction and rejected. The name is therefore wrapped in
    backticks, with any embedded backtick doubled, so MySQL treats it as a
    single identifier.

    Args:
        frame: Spark DataFrame to persist.
        table_name: Unquoted MySQL table name; may contain hyphens.
    """
    quoted_table = "`" + table_name.replace("`", "``") + "`"
    frame.write.mode("overwrite").format("jdbc") \
        .option("url", "jdbc:mysql://infra-database-vm0.service.consul:3306/ksaravanan1") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .option("dbtable", quoted_table) \
        .option("user", mysqlaccesskey) \
        .option("password", mysqlsecretkey) \
        .save()


# Write to MySQL
write_mysql_table(northDF, "fifty-northern")
write_mysql_table(southDF, "fifty-southern")

In [None]:
# Stop the SparkSession (`spark`) built at the top of the notebook now that
# all reads and writes above have been issued.
spark.stop()