In [1]:
from pyspark.sql import SparkSession, functions, types
from pyspark.sql.functions import col,isnan,when,count, radians, asin, sin, sqrt, cos,min,year,max
from pyspark.sql import Window as W
import uuid

# --- Spark session ----------------------------------------------------------
spark = SparkSession.builder.appName('weather_ebird_etl').getOrCreate()
sc = spark.sparkContext
sc.setLogLevel('WARN')

# --- Input schemas ----------------------------------------------------------
# Each schema is listed as (column name, Spark type) pairs, in file column order.
weather_schema = types.StructType([
    types.StructField(field_name, field_type)
    for field_name, field_type in [
        ('station_id', types.StringType()),
        ('date', types.StringType()),
        ('PRCP', types.FloatType()),
        ('SNOW', types.FloatType()),
        ('SNWD', types.FloatType()),
        ('TMAX', types.FloatType()),
        ('TMIN', types.FloatType()),
        ("Latitude", types.FloatType()),
        ("Longitude", types.FloatType()),
        ("Elevation", types.FloatType()),
        ("State", types.StringType()),
    ]
])

ebird_schema = types.StructType([
    types.StructField(field_name, field_type)
    for field_name, field_type in [
        ("speciesCode", types.StringType()),
        ("comName", types.StringType()),
        ("sciName", types.StringType()),
        ("locId", types.StringType()),
        ("locName", types.StringType()),
        ("obsDt", types.DateType()),
        ("howMany", types.StringType()),
        ("lat", types.FloatType()),
        ("lng", types.FloatType()),
        ("obsValid", types.StringType()),
        ("obsReviewed", types.StringType()),
        ("locationPrivate", types.StringType()),
        ("subId", types.StringType()),
    ]
])

# --- Weather: load, keep usable BC rows, derive a real date column ----------
# `date` is sliced as 4 + 2 + 2 characters (year, month, day) and re-joined
# with dashes so it can be cast to a date.
# assumes `date` is an 8-character yyyymmdd string -- TODO confirm against the input file
# (Spark's substring is 1-based; a start of 0 also selects from the first character.)
weather = (
    spark.read.schema(weather_schema)
    .csv("gs://big-data-1-project-storage/cleaned-data/weather_stations.csv")
    .filter(
        (col('station_id') != 'null')      # drop rows whose id is the literal text 'null'
        & (col('state') == 'BC')
        & col('TMAX').isNotNull()
        & col('TMIN').isNotNull()
    )
    .withColumn(
        "date_final",
        functions.concat_ws(
            "-",
            functions.substring(col('date'), 0, 4),
            functions.substring(col('date'), 5, 2),
            functions.substring(col('date'), 7, 2),
        ).cast("date"),
    )
)

# --- eBird: load, require coordinates, tag each observation with an id ------
ebird = (
    spark.read.schema(ebird_schema)
    .csv("gs://big-data-1-project-storage/cleaned-data/ebird_nonull.csv")
    .filter(col('lat').isNotNull() & col('lng').isNotNull())
    .withColumn("ebird_id", functions.expr("uuid()"))
)

# Handles used by the later cells.
savingWeather = weather
savingEbird = ebird

# savingEbird and savingWeather are saved at this point.



In [2]:
# First and last calendar year handled by the per-year join loop below.
# Both bounds are inclusive: the loop iterates over range(start, end+1).
start = 1959
end = 2021 

In [3]:
# For each calendar year in [start, end], match every eBird observation with
# the weather-station reading taken on the same day at the smallest
# great-circle distance, then write that year's matches as CSV under
# joined-data-final/<year>.
#
# Output columns (in order): the eBird columns without obsValid, obsReviewed,
# locationPrivate and subId; ebird_id; all weather columns including
# date_final; and min_dist (distance in km to the matched station).
EARTH_RADIUS_KM = 6371  # sphere radius used to scale the haversine angle to km

for yr in range(start, end + 1):
    ebird_year = savingEbird.filter(year("obsDt") == yr)
    weather_year = savingWeather.filter(year("date_final") == yr)

    # Pair an observation only with station readings from the same day.
    # Stating this as the join condition avoids building a cross join of the
    # whole year and filtering on the dates afterwards.
    same_day = ebird_year.join(
        weather_year,
        ebird_year["obsDt"] == weather_year["date_final"],
        "inner",
    )

    # Haversine distance between the observation (lat, lng) and the station
    # (Latitude, Longitude), both given in degrees.
    dist_latit = radians(col("Latitude")) - radians(col("lat"))
    dist_longit = radians(col("Longitude")) - radians(col("lng"))
    haversine_km = asin(sqrt(
        sin(dist_latit / 2) ** 2
        + cos(radians(col("lat"))) * cos(radians(col("Latitude")))
        * sin(dist_longit / 2) ** 2
    )) * 2 * EARTH_RADIUS_KM

    # Keep, per observation, the row(s) whose distance equals the minimum.
    # A window minimum replaces the previous groupBy + self-join, so the
    # uuid()-generated ebird_id is evaluated once per row instead of having to
    # come out identical on both sides of a self-join.
    # Stations tied at the minimum distance are all kept, as before.
    per_observation = W.partitionBy("ebird_id")
    nearest = (
        same_day
        .drop("obsValid", "obsReviewed", "locationPrivate", "subId")
        .withColumn("haversine_distance_kms", haversine_km)
        .withColumn(
            "min_dist",
            functions.min("haversine_distance_kms").over(per_observation),
        )
        .filter(col("haversine_distance_kms") == col("min_dist"))
        .drop("haversine_distance_kms")
    )

    print(yr)

    # NOTE(review): the default save mode fails if the year's directory already
    # exists, so re-running the loop requires clearing earlier output first.
    nearest.write.save("gs://big-data-1-project-storage/cleaned-data/joined-data-final/"+str(yr), format='csv',header=True)

1963
1964
1965


In [None]:
# Combine the per-year join outputs into one CSV dataset.
inputs = "gs://big-data-1-project-storage/cleaned-data/joined-data-final"

# Column layout of the per-year files: eBird columns, then weather columns,
# then the distance to the matched station.
schema = types.StructType([
            types.StructField("speciesCode",types.StringType()),
            types.StructField("comName",types.StringType()),
            types.StructField("sciName",types.StringType()),
            types.StructField("locId",types.StringType()),
            types.StructField("locName",types.StringType()),
            types.StructField("obsDt",types.DateType()),
            types.StructField("howMany",types.StringType()),
            types.StructField("lat",types.FloatType()),
            types.StructField("lng",types.FloatType()),
            types.StructField("ebird_id",types.StringType()),

            types.StructField('station_id', types.StringType()),
            types.StructField('date', types.StringType()),
            types.StructField('PRCP', types.FloatType()),
            types.StructField('SNOW', types.FloatType()),
            types.StructField('SNWD', types.FloatType()),
            types.StructField('TMAX', types.FloatType()),
            types.StructField('TMIN', types.FloatType()),
            types.StructField("Latitude", types.FloatType()),
            types.StructField("Longitude", types.FloatType()),
            types.StructField("Elevation", types.FloatType()),
            types.StructField("State", types.StringType()),
            types.StructField("date_final",types.DateType()),
            types.StructField("min_dist", types.FloatType()),
        ])

# The per-year files are written with header=True, so the header line of each
# file must be skipped on read; otherwise it is parsed as a data row.
finalsaver = (
    spark.read.format('csv')
    .option("recursiveFileLookup", "true")
    .option("header", "true")
    .schema(schema)
    .load(inputs)
)

# Guard against rows without a latitude. Before the header option was set this
# filter was presumably what removed the header lines (they do not parse as
# numbers); it is kept to drop any other unparseable rows.
finalsaver = finalsaver.filter(finalsaver['lat'].isNotNull())

# Was `filename.write.save(...)`: `filename` is not defined anywhere in the
# notebook, so the cell raised NameError before writing anything.
finalsaver.write.save("gs://big-data-1-project-storage/cleaned-data/joined-data-final-final", format='csv',header=True)

# show() prints the rows itself and returns None, so it is not wrapped in print().
finalsaver.show(3)
print(finalsaver.count())
