In [1]:
import json
from getpass import getpass
from os import environ

from noaa_sdk import NOAA
from opensky_api import OpenSkyApi
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, explode_outer, from_json, lit, concat
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, ArrayType, TimestampNTZType, MapType

from schemas.noaa import forecast_schema
from schemas.opensky import states_schema
from utils.helper_functions import get_parameters
from utils.flatten_json import flatten

noaa = NOAA()

# OpenSky credentials come from the environment (OPENSKY_USERNAME / OPENSKY_PASSWORD),
# falling back to an interactive prompt, so no credentials are committed with the notebook.
opensky_username = environ.get('OPENSKY_USERNAME') or input('OpenSky username: ')
opensky_password = environ.get('OPENSKY_PASSWORD') or getpass('OpenSky password: ')
open_sky = OpenSkyApi(username=opensky_username, password=opensky_password)

spark = SparkSession.builder.appName('read_json').getOrCreate()
spark.sparkContext.setLogLevel("ERROR")

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/01/25 13:55:16 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [23]:
# Point whose forecast grid is fetched and saved to raw_data/noaa_data.json.
FORECAST_LATITUDE, FORECAST_LONGITUDE = 40.7314, -73.8656
# Postal code / country used for the observation and forecast lookups below.
POSTAL_CODE, COUNTRY_CODE = 30281, 'US'

forecast_output = noaa.points_forecast(FORECAST_LATITUDE, FORECAST_LONGITUDE, type='forecastGridData')

with open('raw_data/noaa_data.json', 'w') as outfile:
    json.dump(forecast_output, outfile)

# NOTE(review): noaa_sdk documents get_observations as returning a generator; the
# previous bare call discarded it unconsumed, so materialise it to actually fetch data.
observations = list(noaa.get_observations(POSTAL_CODE, COUNTRY_CODE))
forecasts = noaa.get_forecasts(POSTAL_CODE, COUNTRY_CODE)
forecasts

In [9]:
# NOTE(review): `os` shadows the stdlib module name; it is kept because later cells read `os.states`.
os = open_sky.get_states()
if os is None:
    # opensky_api returns None instead of raising, presumably when the HTTP request fails
    # (bad credentials, rate limiting) — fail loudly rather than with an AttributeError.
    raise RuntimeError('OpenSky get_states() returned no data')
states = os.states or []

# Serialise each state object through its attribute dict. The output is pretty-printed
# (indent=4), so Spark must read it with multiLine=True.
os_json = json.dumps(states, default=lambda o: o.__dict__, sort_keys=True, indent=4)

with open('raw_data/opensky_data.json', 'w') as outfile:
    outfile.write(os_json)

# Skip states with a missing latitude/longitude: writing "None, None" would break the
# float() parsing of this file further down.
with open('raw_data/opensky_lat_long.txt', 'w') as outfile:
    for state in states:
        if state.latitude is not None and state.longitude is not None:
            outfile.write(f'{state.latitude}, {state.longitude} \n')

In [None]:
# opensky_data.json is a pretty-printed JSON array, so it must be read with multiLine=True;
# the default one-record-per-line parsing yields null/corrupt rows. (`header` is a CSV-only
# option and was ignored here, so it is dropped.)
states_df = spark.read.load('raw_data/opensky_data.json', format='json', multiLine=True, schema=states_schema)
states_df.show(5, False)  # Show the first 5 without truncating

# Collect (latitude, longitude, forecast) per state instead of discarding each response.
# NOTE(review): this issues one NOAA request per state; NOAA presumably only covers US
# points, so other positions may raise — TODO confirm and filter/limit before looping.
state_forecasts = []
for state in os.states:
    if state.latitude is None or state.longitude is None:
        continue  # no coordinate to query
    forecast = noaa.points_forecast(state.latitude, state.longitude, type='forecastGridData')
    state_forecasts.append((state.latitude, state.longitude, forecast))


In [None]:
# Read Operations: three equivalent ways to load the OpenSky JSON.
# The file is pretty-printed (not one record per line), so every variant needs multiLine.
# `header` is a CSV option that the JSON source ignores, so it is omitted; note that
# `.option()` exists only on the reader (spark.read), not on a loaded DataFrame.
df = spark.read.json('raw_data/opensky_data.json', multiLine=True)  # Option 1
df = spark.read.format('json').option('multiLine', True).load('raw_data/opensky_data.json')  # Option 2
df = spark.read.load('raw_data/opensky_data.json', format='json', multiLine=True)  # Option 3


In [None]:
# Write Operations. mode='overwrite' keeps the cell re-runnable: Spark's default save mode
# raises when the target path/table already exists.
df.select('latitude', 'longitude').write.save('lat_long.parquet', format='parquet', mode='overwrite')  # Option 1
# Option 2 writes the full frame to its own path (the one the SQL cell's file query reads)
# instead of colliding with the lat/long-only output above.
df.write.format("parquet").mode('overwrite').save('opensky_data.parquet')  # Option 2

# Bucket by country (bucketBy is only supported together with saveAsTable)
df.write.bucketBy(10, 'origin_country').sortBy('category').mode('overwrite').saveAsTable("country_bucketed")

In [None]:
# Temporary Views
# NOTE(review): in top-to-bottom order `df` holds the OpenSky state data, despite the
# view name "forecast".

# Session-scoped SQL view: dropped when this SparkSession ends.
df.createOrReplaceTempView("forecast")

# Global view: preserved until the Spark application terminates, registered under the
# `global_temp` database. The OrReplace variant lets this cell be re-run.
df.createOrReplaceGlobalTempView('forecast')

# Global views are visible from other sessions
spark.newSession().sql("SELECT * FROM global_temp.forecast").show()

In [None]:
# SQL

# On DataFrames
df.show()
df.select("latitude").show()
df.filter(df["vertical_rate"] > 0).show()
df.groupBy("category").count().show()

# Temp Views — spark.sql only builds a DataFrame; without .show() nothing is displayed
# (only a cell's last expression is rendered).
spark.sql("SELECT * FROM forecast").show()
spark.sql("SELECT * FROM global_temp.forecast").show()

# Directly from files
# NOTE(review): requires opensky_data.parquet to have been written beforehand.
spark.sql('SELECT * FROM parquet.`opensky_data.parquet`').show()


In [None]:
sc = spark.sparkContext

# Explicit schema: keeps the columns typed and lets createDataFrame succeed even when no
# line survives the filter (schema inference fails on an empty RDD).
lat_long_schema = StructType([
    StructField('latitude', DoubleType()),
    StructField('longitude', DoubleType()),
])

lines = sc.textFile('raw_data/opensky_lat_long.txt')
parts = lines.map(lambda line: [field.strip() for field in line.split(',')])
# Drop malformed lines and positions written as "None" (missing coordinates), which
# float() cannot parse.
valid_parts = parts.filter(lambda p: len(p) == 2 and 'None' not in p)
lat_long = valid_parts.map(lambda p: (float(p[0]), float(p[1])))

df_lat_long = spark.createDataFrame(lat_long, schema=lat_long_schema)
df_lat_long.createOrReplaceTempView("latitude_longitude")

spark.sql("SELECT * FROM latitude_longitude").show()

In [3]:
# Load the saved NOAA forecast-grid JSON, then run it through the project's `flatten`
# helper (presumably un-nests struct/array columns — TODO confirm in utils.flatten_json).
df = spark.read.options(multiline="true", header="true").json('raw_data/noaa_data.json')
af = flatten(df)