In [1]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

In [2]:
import os
# %pip install python-dotenv
from dotenv import load_dotenv

In [3]:
load_dotenv('./env/.env')

True

In [4]:
# Create a Spark session with your AWS Credentials
conf = (
    SparkConf()
    .setAppName("sparkQLwithS3") # replace with your desired name
    .set("spark.jars.packages", "io.delta:delta-core_2.12:2.3.0,org.apache.hadoop:hadoop-aws:3.3.2")
    .set("spark.sql.catalog.spark_catalog","org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .set("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    # .set("spark.sql.shuffle.partitions", "4") # default is 200 partitions which is too many for local
    .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
    .set("com.amazonaws.services.s3.enableV4", "true")
    .set("spark.hadoop.fs.s3a.endpoint", "s3.amazonaws.com")
    # .set("spark.hadoop.fs.s3a.aws.credentials.provider", "com.amazonaws.auth.profile.ProfileCredentialsProvider")
    .set("spark.hadoop.fs.s3a.access.key", os.environ['aws_access_key_id'])
    .set("spark.hadoop.fs.s3a.secret.key", os.environ['aws_secret_access_key'])
    .setMaster("spark://34.125.136.103:30077")
)


In [5]:
spark = SparkSession.builder.config(conf=conf).getOrCreate()
df = spark.read.format('json').load('s3a://jolajoayo-spark-0001/spark2-sql/airports/airport-codes.csv.json')




:: loading settings :: url = jar:file:/root/miniconda3/envs/spark341/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
io.delta#delta-core_2.12 added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-cb60f1bc-3d9b-406f-8891-da4651d097b7;1.0
	confs: [default]
	found io.delta#delta-core_2.12;2.3.0 in central
	found io.delta#delta-storage;2.3.0 in central
	found org.antlr#antlr4-runtime;4.8 in central
	found org.apache.hadoop#hadoop-aws;3.3.2 in central
	found com.amazonaws#aws-java-sdk-bundle;1.11.1026 in central
	found org.wildfly.openssl#wildfly-openssl;1.0.7.Final in central
:: resolution report :: resolve 676ms :: artifacts dl 62ms
	:: modules in use:
	com.amazonaws#aws-java-sdk-bundle;1.11.1026 from central in [default]
	io.delta#delta-core_2.12;2.3.0 from central in [default]
	io.delta#delta-storage;2.3.0 from central in [default]
	org.antlr#antlr4-runtime;4.8 from central in [default]
	org.apache.hadoop#hadoop-aws;3.3.2

In [6]:
df.show(5, truncate=False)


                                                                                

+---------+------------+--------+---------+-----+-----------+----------+------------+----------+-------------+------------+----------------------------------+-------------+
|continent|elevation_ft|gps_code|iata_code|ident|iso_country|iso_region|latitude_deg|local_code|longitude_deg|municipality|name                              |type         |
+---------+------------+--------+---------+-----+-----------+----------+------------+----------+-------------+------------+----------------------------------+-------------+
|NA       |11          |00A     |         |00A  |US         |US-PA     |40.07080078 |00A       |-74.93360138 |Bensalem    |Total Rf Heliport                 |heliport     |
|NA       |450         |00AK    |         |00AK |US         |US-AK     |59.94919968 |00AK      |-151.6959991 |Anchor Point|Lowell Field                      |small_airport|
|NA       |820         |00AL    |         |00AL |US         |US-AL     |34.8647995  |00AL      |-86.77030182 |Harvest     |Epps Airpark

In [7]:
df.printSchema()

root
 |-- continent: string (nullable = true)
 |-- elevation_ft: string (nullable = true)
 |-- gps_code: string (nullable = true)
 |-- iata_code: string (nullable = true)
 |-- ident: string (nullable = true)
 |-- iso_country: string (nullable = true)
 |-- iso_region: string (nullable = true)
 |-- latitude_deg: string (nullable = true)
 |-- local_code: string (nullable = true)
 |-- longitude_deg: string (nullable = true)
 |-- municipality: string (nullable = true)
 |-- name: string (nullable = true)
 |-- type: string (nullable = true)



In [8]:
df.createOrReplaceTempView("airportcodes")
viewdf = spark.sql(\
                   "SELECT iso_region, name, type "\
                   "FROM airportcodes "\
                   "WHERE iso_country = 'US' "\
                   )

ANTLR Tool version 4.8 used for code generation does not match the current runtime version 4.9.3
ANTLR Runtime version 4.8 used for parser compilation does not match the current runtime version 4.9.3
ANTLR Tool version 4.8 used for code generation does not match the current runtime version 4.9.3
ANTLR Runtime version 4.8 used for parser compilation does not match the current runtime version 4.9.3


In [9]:
viewdf.show()

[Stage 2:>                                                          (0 + 1) / 1]

+----------+--------------------+-------------+
|iso_region|                name|         type|
+----------+--------------------+-------------+
|     US-PA|   Total Rf Heliport|     heliport|
|     US-AK|        Lowell Field|small_airport|
|     US-AL|        Epps Airpark|small_airport|
|     US-AR|Newport Hospital ...|     heliport|
|     US-AZ|      Cordes Airport|small_airport|
|     US-CA|Goldstone /Gts/ A...|small_airport|
|     US-CO|          Cass Field|small_airport|
|     US-FL| Grass Patch Airport|small_airport|
|     US-FL|  Ringhaver Heliport|     heliport|
|     US-FL|   River Oak Airport|small_airport|
|     US-GA|    Lt World Airport|small_airport|
|     US-GA|    Caffrey Heliport|     heliport|
|     US-HI|  Kaupulehu Heliport|     heliport|
|     US-ID|Delta Shores Airport|small_airport|
|     US-IN|Bailey Generation...|     heliport|
|     US-IL|      Hammer Airport|small_airport|
|     US-IN|St Mary Medical C...|     heliport|
|     US-IL|Hayenga's Cant Fi...|small_a

                                                                                

In [10]:
viewdf.write.format('csv').option('header','true').\
save('s3a://jolajoayo-spark-0001/spark2-sql/airports/airportcodes.csv',mode='overwrite')

24/04/25 17:13:28 WARN AbstractS3ACommitterFactory: Using standard FileOutputCommitter to commit work. This is slow and potentially unsafe.
                                                                                

In [11]:
spark.stop()
