In [0]:
# Authenticate Spark against the ADLS Gen2 account using an access key fetched
# from the Databricks secret scope, so no credential literal lives in the notebook.
# NOTE(review): the account name here ("azuredataengstorage") is hardcoded while
# `location` uses a <storageaccount> placeholder — keep the two in sync.
spark.conf.set(
    "fs.azure.account.key.azuredataengstorage.dfs.core.windows.net",
    dbutils.secrets.get(key="storageaccountaccesskey", scope="accessscope"),
)

In [0]:
# Root folder of the Near-Earth-Object data set: raw input is read from "Raw/"
# and every output file is written under "Developed/".
location = (
    "abfss://<container>@<storageaccount>.dfs.core.windows.net"
    "/Near_Earth_Objects/"
)

In [0]:
##Getting Raw JSON into DF
# multiLine parsing reads the file as JSON documents that span several lines
# (rather than one JSON record per line). The JSON source always infers the
# schema itself; "infer_Schema" is not a JSON reader option and was silently
# ignored, so it has been removed.
df_neojson = spark.read.option("multiLine", True).json(location + "Raw/NEO.json")

In [0]:
# NOTE: sum/min/max below shadow the Python builtins for the rest of this notebook.
from pyspark.sql.functions import (
    avg, col, current_date, explode, max, min, substring, substring_index, sum, to_date, udf, when,
)

##Exploding Array Type Column
# One output row per element of the top-level `near_earth_objects` array; each
# element is kept whole as the struct column `neoexp`.
dfneoexp = (
    df_neojson
    .select(explode("near_earth_objects").alias("neoexp"))
)

In [0]:
# Flatten the per-NEO struct: (nested source field, output column name) pairs.
# The `neoexp` struct itself is kept ("*") because the next cell explodes
# neoexp.close_approach_data.
neo_field_aliases = [
    ("neoexp.id", "neo_id"),
    ("neoexp.neo_reference_id", "neo_reference_id"),
    ("neoexp.name", "name"),
    ("neoexp.name_limited", "name_limited"),
    ("neoexp.designation", "designation"),
    ("neoexp.absolute_magnitude_h", "absolute_magnitude_h"),
    ("neoexp.estimated_diameter.kilometers.estimated_diameter_min", "estimated_diameter_minkm"),
    ("neoexp.estimated_diameter.kilometers.estimated_diameter_max", "estimated_diameter_maxkm"),
    ("neoexp.is_potentially_hazardous_asteroid", "potentially_hazardous"),
    ("neoexp.orbital_data.orbit_id", "orbit_id"),
    ("neoexp.orbital_data.orbit_determination_date", "orbit_determination_date"),
    ("neoexp.orbital_data.first_observation_date", "first_observation_date"),
    ("neoexp.orbital_data.last_observation_date", "last_observation_date"),
    ("neoexp.orbital_data.data_arc_in_days", "data_arc_in_days"),
    ("neoexp.orbital_data.eccentricity", "eccentricity"),
    ("neoexp.orbital_data.inclination", "inclination"),
    ("neoexp.orbital_data.orbital_period", "orbital_period"),
    ("neoexp.orbital_data.orbit_class.orbit_class_type", "orbit_class_type"),
    ("neoexp.orbital_data.orbit_class.orbit_class_description", "orbit_class_description"),
    ("neoexp.is_sentry_object", "is_sentry_object"),
]

dfexp = dfneoexp.select(
    "*",
    *[col(source_path).alias(column_name) for source_path, column_name in neo_field_aliases],
)

In [0]:
##Exploding Nested Array Struct Column
# One row per close approach: every NEO row is repeated once per element of its
# `close_approach_data` array, and that element is stored in `approachexp`.
# NOTE(review): `explode` drops rows whose array is empty or null, so such NEOs
# also disappear from every file derived from dfexp_neo; switch to explode_outer
# if they must be kept.
dfexp_close = dfexp.withColumn(
    "approachexp",
    explode(col("neoexp.close_approach_data")),
)

In [0]:
# Flatten the per-approach struct next to the NEO columns, then discard both raw
# struct columns so only flat columns remain.
approach_field_aliases = [
    ("approachexp.close_approach_date", "close_approach_date"),
    ("approachexp.close_approach_date_full", "close_approach_date_full"),
    ("approachexp.relative_velocity.kilometers_per_hour", "relative_velocity_kilometers_per_hour"),
    ("approachexp.miss_distance.kilometers", "miss_distance_kilometers"),
    ("approachexp.orbiting_body", "orbiting_body"),
]

dfexp_neo = (
    dfexp_close
    .select("*", *(col(source_path).alias(column_name) for source_path, column_name in approach_field_aliases))
    .drop("neoexp", "approachexp")
)

In [0]:
##Saving DFs as Delimited Files

# One row per distinct NEO with its descriptive attributes.
dfneo_attr = dfexp_neo.select("neo_id", "neo_reference_id", "name", "name_limited", "absolute_magnitude_h",
                              "estimated_diameter_minkm", "estimated_diameter_maxkm", "potentially_hazardous",
                              "is_sentry_object").distinct()

# Spark writes a folder of part files, so write a single part (coalesce(1)) into
# a scratch folder and copy it to a stable file name afterwards.
temp_dir = location + "Developed/Temp"
# Overwrite so a Temp folder left behind by an earlier failed run does not make
# this write fail with "path already exists" (the default save mode).
dfneo_attr.coalesce(1).write.format("csv").mode("overwrite").save(temp_dir, header=True)

# Locate the part file; fail loudly instead of calling cp on the bare Temp
# folder path when no CSV was produced.
part_file = next((f.path for f in dbutils.fs.ls(temp_dir) if f.name.endswith(".csv")), None)
if part_file is None:
    raise FileNotFoundError(f"no .csv part file found in {temp_dir}")

dbutils.fs.cp(part_file, location + "Developed/Neo_Attributes.csv")
dbutils.fs.rm(location + "Developed/Temp/", recurse=True)

In [0]:
# One row per distinct NEO orbit solution.
dfneo_orb = dfexp_neo.select("neo_id", "orbit_id", "orbit_determination_date", "last_observation_date", "data_arc_in_days",
                             "inclination", "orbital_period", "orbit_class_type", "orbit_class_description").distinct()

# Spark writes a folder of part files, so write a single part (coalesce(1)) into
# a scratch folder and copy it to a stable file name afterwards.
temp_dir = location + "Developed/Temp"
# Overwrite so a Temp folder left behind by an earlier failed run does not make
# this write fail with "path already exists" (the default save mode).
dfneo_orb.coalesce(1).write.format("csv").mode("overwrite").save(temp_dir, header=True)

# Locate the part file; fail loudly instead of calling cp on the bare Temp
# folder path when no CSV was produced.
part_file = next((f.path for f in dbutils.fs.ls(temp_dir) if f.name.endswith(".csv")), None)
if part_file is None:
    raise FileNotFoundError(f"no .csv part file found in {temp_dir}")

dbutils.fs.cp(part_file, location + "Developed/Neo_Orbits.csv")
dbutils.fs.rm(location + "Developed/Temp/", recurse=True)

In [0]:
# Per-approach facts: one row per (NEO, close approach) taken from the exploded frame.
dfneo_dates = dfexp_neo.select(
    "neo_id",
    "orbiting_body",
    "close_approach_date",
    "close_approach_date_full",
    "relative_velocity_kilometers_per_hour",
    "miss_distance_kilometers",
)

In [0]:
# Keep only the clock-time part of close_approach_date_full (the text after its
# last space) as `approach_time`, then drop the full timestamp string.
# substring(col, 12, 16) started at 1-based position 12, which for a value shaped
# like "2024-Jan-05 13:42" is the separating space, so every time carried a
# leading blank.
# NOTE(review): assumes the value is "<date> <time>" separated by a space — confirm
# against the raw file.
dfneo_dates_mod = (
    dfneo_dates
    .withColumn("approach_time", substring_index("close_approach_date_full", " ", -1))
    .drop("close_approach_date_full")
)

In [0]:
# Replace the approach-date string with a DateType column.
# NOTE(review): this rebinds dfneo_dates_mod from itself, so re-running only this
# cell fails (close_approach_date is already dropped) — re-run the cell above first.
dfneo_dates_mod = dfneo_dates_mod.withColumn("close_approach_date_mod", to_date("close_approach_date")).drop("close_approach_date")

# Spark writes a folder of part files, so write a single part (coalesce(1)) into
# a scratch folder and copy it to a stable file name afterwards.
temp_dir = location + "Developed/Temp"
# Overwrite so a Temp folder left behind by an earlier failed run does not make
# this write fail with "path already exists" (the default save mode).
dfneo_dates_mod.coalesce(1).write.format("csv").mode("overwrite").save(temp_dir, header=True)

# Locate the part file; fail loudly instead of calling cp on the bare Temp
# folder path when no CSV was produced.
part_file = next((f.path for f in dbutils.fs.ls(temp_dir) if f.name.endswith(".csv")), None)
if part_file is None:
    raise FileNotFoundError(f"no .csv part file found in {temp_dir}")

dbutils.fs.cp(part_file, location + "Developed/Neo_Dates.csv")
dbutils.fs.rm(location + "Developed/Temp/", recurse=True)

In [0]:
# Only the (NEO id, parsed approach date) pairs are needed for the
# previous/next-approach lookup.
id_date_columns = ["neo_id", "close_approach_date_mod"]
df_id_date = dfneo_dates_mod.select(*id_date_columns)

In [0]:
#Converting "Neo_Id" column into List
# Distinct NEO ids as a Python list. De-duplicating in Spark means only one row
# per NEO is sent to the driver, and it avoids the RDD API (not available on
# shared-access / Spark Connect clusters). The list order is not guaranteed.
ls_ids = [row.neo_id for row in df_id_date.select("neo_id").distinct().collect()]

In [0]:
from pyspark.sql.types import StringType, StructType, DateType, StructField

#Creating an Empty Dataframe, which will hold the Previous and Next Approach dates of NEO
# Result schema: the NEO id plus two nullable date columns.
prev_next_fields = [
    ("neo_id", StringType()),
    ("Previous_Approach_Date", DateType()),
    ("Next_Approach_Date", DateType()),
]
schemas = StructType([StructField(field_name, field_type, True) for field_name, field_type in prev_next_fields])

dfprev_nxt_dates = spark.createDataFrame([], schemas)

In [0]:
from pyspark.sql.window import Window
from pyspark.sql.functions import lag, lead

# Previous and next close-approach dates of every NEO relative to today:
#   Previous_Approach_Date = latest approach strictly before current_date()
#   Next_Approach_Date     = earliest approach strictly after current_date()
# (null when there is none). One aggregation over all NEOs replaces the old
# per-id filter + union loop, whose query plan grew with every id. It also fixes
# approaches dated exactly today: the old lag/lead over a synthetic "today" row
# returned two rows for such an NEO, each with one side pointing at today itself.
# `when` without `otherwise` yields null for non-matching rows, and max/min
# (the pyspark.sql.functions versions imported above) ignore nulls.
approach_date = col("close_approach_date_mod")
today = current_date()

dfprev_nxt_dates = df_id_date.groupBy("neo_id").agg(
    max(when(approach_date < today, approach_date)).alias("Previous_Approach_Date"),
    min(when(approach_date > today, approach_date)).alias("Next_Approach_Date"),
)

In [0]:
#Performing Grouping and Aggregations for analyses
# Cast the two measurement columns to double before aggregating so max/min
# compare numerically; on string columns they compare lexicographically
# (e.g. "9.5" > "10.2"). withColumn keeps the column names, so the generated
# result headers such as avg(miss_distance_kilometers) are unchanged.
# NOTE(review): assumes the raw JSON stores these values as quoted strings;
# the cast is a no-op if they were already inferred as doubles.
dfneo_dates_numeric = (
    dfneo_dates_mod
    .withColumn("relative_velocity_kilometers_per_hour", col("relative_velocity_kilometers_per_hour").cast("double"))
    .withColumn("miss_distance_kilometers", col("miss_distance_kilometers").cast("double"))
)

dfneo_dates_time_grouped = dfneo_dates_numeric.groupBy("neo_id").agg(
    avg("relative_velocity_kilometers_per_hour"), max("relative_velocity_kilometers_per_hour"), min("relative_velocity_kilometers_per_hour"),
    avg("miss_distance_kilometers"), max("miss_distance_kilometers"), min("miss_distance_kilometers"),
    max("close_approach_date_mod"), min("close_approach_date_mod"),
)

In [0]:
##Joining
# Attach each NEO's previous/next approach dates to its aggregates. Joining on the
# column name keeps a single neo_id column (first), then the aggregates, then the
# two approach-date columns. Inner join: only ids present on both sides survive.
dfneo_dates_time_grouped = dfneo_dates_time_grouped.join(dfprev_nxt_dates, on="neo_id", how="inner")

In [0]:
# Spark writes a folder of part files, so write a single part (coalesce(1)) into
# a scratch folder and copy it to a stable file name afterwards.
temp_dir = location + "Developed/Temp"
# Overwrite so a Temp folder left behind by an earlier failed run does not make
# this write fail with "path already exists" (the default save mode).
dfneo_dates_time_grouped.coalesce(1).write.format("csv").mode("overwrite").save(temp_dir, header=True)

# Locate the part file; fail loudly instead of calling cp on the bare Temp
# folder path when no CSV was produced.
part_file = next((f.path for f in dbutils.fs.ls(temp_dir) if f.name.endswith(".csv")), None)
if part_file is None:
    raise FileNotFoundError(f"no .csv part file found in {temp_dir}")

dbutils.fs.cp(part_file, location + "Developed/Neo_Grouped.csv")
dbutils.fs.rm(location + "Developed/Temp/", recurse=True)