## Final processing of the solar radiation and photovoltaic data

### import pyspark
from uszipcode import SearchEngine
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import pygeohash as pgh

In [18]:
# Obtain the Spark session for this notebook: reuse the active one if it
# exists, otherwise create a new session under this application name.
spark = SparkSession.builder.appName('captsone').getOrCreate()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [19]:
# S3 path of the solar input dataset (parquet). This cell only defines the
# location; the data itself is loaded with spark.read.parquet afterwards.
input_solar_s3 = "s3://data-est2-cap/solar/input_nrel/input-nrel.parquet/"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [20]:
# Load the solar input parquet dataset from the S3 path in input_solar_s3.
solar_input_ingest = spark.read.parquet(input_solar_s3)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [21]:
# Print the column names and types of the loaded solar data as a sanity check.
solar_input_ingest.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- ghi: double (nullable = true)

# Round the latitude and longitude to 2 decimal places (error ~1.11 km)

In [22]:
# Round both coordinates to 2 decimal places, then total the GHI of every
# row that shares the same rounded (latitude, longitude) pair.
solar_input_ingest_1 = (
    solar_input_ingest
    .select(
        F.round("latitude", 2).alias("latitude"),
        F.round("longitude", 2).alias("longitude"),
        "ghi",
    )
    .groupBy("latitude", "longitude")
    .agg(F.sum("ghi").alias("ghi"))
)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

## A geohash precision of 6 is used, which gives a grid cell of about 1.2 km x 609.4 m

In [23]:
# convert latitude and longitude to hash map
def LatLongToGeohash(latitude,longitude, precision=6):
    """Encode a latitude/longitude pair as a geohash string.

    Args:
        latitude: Latitude of the point, or None when it is missing.
        longitude: Longitude of the point, or None when it is missing.
        precision: Number of characters in the resulting geohash
            (defaults to 6).

    Returns:
        The geohash produced by ``pgh.encode``, or None when either
        coordinate is None.
    """
    # When this runs as a Spark UDF, SQL NULLs arrive as None. Return None
    # (a NULL geohash) instead of letting the encoder fail on a missing
    # coordinate and abort the whole job.
    if latitude is None or longitude is None:
        return None
    return pgh.encode(latitude, longitude, precision=precision)
    

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [24]:
def _geohash_precision_6(lat, lon):
    """Geohash a coordinate pair with the precision fixed at 6."""
    return LatLongToGeohash(lat, lon, precision=6)


# Wrap the encoder as a Spark UDF so it can be applied to DataFrame columns.
udf_geohash_lat_lo = F.udf(_geohash_precision_6)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [25]:
# Attach a geohash column computed from the rounded coordinates.
solar_geohash_final = solar_input_ingest_1.withColumn(
    "geohash",
    udf_geohash_lat_lo(F.col("latitude"), F.col("longitude")),
)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [26]:
# S3 path of the rooftop dataset (parquet) that is joined with the solar data.
rooftop_albany_ny_s3 = 's3://data-est2-cap/rooftop/data_output_rooftop/data_output_rooftop.parquet'

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [28]:
# Load the rooftop parquet dataset from the S3 path in rooftop_albany_ny_s3.
rooftop_albany_ingest = spark.read.parquet(rooftop_albany_ny_s3)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [29]:
# Preview the first 5 rows of the rooftop data.
rooftop_albany_ingest.show(5)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------+--------+-------------+
|geohash|    city|total_area_m2|
+-------+--------+-------------+
| dr5rur|New York|     73972.64|
| dr72tc|New York|     31677.52|
| dr72hk|New York|    100240.93|
| dr72jt|New York|    127189.83|
| dr5rst|New York|    119926.89|
+-------+--------+-------------+
only showing top 5 rows

## Join the two datasets on geohash. Solar power (MWh) is calculated as GHI * area. Only New York and Albany are included.

In [30]:
# Match each rooftop cell with the solar cell that has the same geohash, then
# derive power_MWH = ghi * total_area_m2 / 10**6, rounded to 2 decimals.
final_roof_solar = (
    rooftop_albany_ingest
    .join(solar_geohash_final, on="geohash", how="inner")
    .select(
        "latitude",
        "longitude",
        "ghi",
        "city",
        F.round(F.col("ghi") * F.col("total_area_m2") / 10**6, 2).alias("power_MWH"),
        "total_area_m2",
    )
)


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [31]:
# Preview the first 4 rows of the joined result.
final_roof_solar.show(4)

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------+---------+---------+--------+---------+-------------+
|latitude|longitude|      ghi|    city|power_MWH|total_area_m2|
+--------+---------+---------+--------+---------+-------------+
|   42.77|   -73.74|2894029.0|  Albany| 40148.43|     13872.85|
|   40.85|    -73.9|2998934.0|New York|566719.44|    188973.63|
|   42.73|   -73.78|2895605.0|  Albany| 58688.93|     20268.28|
|   42.69|   -73.82|2888035.0|  Albany| 71129.96|     24629.19|
+--------+---------+---------+--------+---------+-------------+
only showing top 4 rows

In [32]:
# S3 output location without the file extension; ".parquet" is appended
# when the result is written.
output_file = "s3://data-est2-cap/final_data/final_data"

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

# Save the result to the S3 bucket

In [33]:
# Write the joined result as parquet to "<output_file>.parquet"; "overwrite"
# mode replaces any data already present at that path.
final_roof_solar.write.mode("overwrite").parquet(f"{output_file}.parquet")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…