In [None]:
"""
Team Nova:
Arpit Palo
Arun Kumar
Nisarg Gupta
Omkar Kanade
"""

# PySpark: Data Download + Aggregate
# Downloads raw ASOS observation data from Iowa State University's Mesonet (IEM) service and aggregates it per country and month for analysis.

In [None]:
!pip install reverse_geocoder pyspark

Collecting reverse_geocoder
  Downloading reverse_geocoder-1.5.1.tar.gz (2.2 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.2/2.2 MB[0m [31m38.9 MB/s[0m eta [36m0:00:00[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
[?25hCollecting pyspark
  Downloading pyspark-3.4.0.tar.gz (310.8 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m310.8/310.8 MB[0m [31m3.5 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25h  Preparing metadata (setup.py) ... [?25ldone
Building wheels for collected packages: reverse_geocoder, pyspark
  Building wheel for reverse_geocoder (setup.py) ... [?25ldone
[?25h  Created wheel for reverse_geocoder: filename=reverse_geocoder-1.5.1-py3-none-any.whl size=2268088 sha256=7ff99e8a504c89fa25c0084575efbd5a2439137306579beca094c93782b9d430
  Stored in directory: /root/.cache/pip/wheels/bd/e5/88/eb139b6d6a26b8022d370ab991f7a836802fed9871975ec6d9
  Building wheel for pyspark (setup.py) ... [?25ldone
[?25

In [None]:
import datetime
import json
import os
import sys
import time
from urllib.request import urlopen
import reverse_geocoder as rg
import pandas as pd

from pyspark.sql import SparkSession



In [None]:
# Build (or reuse) the notebook-wide Spark session.
# - Arrow is enabled for Spark <-> pandas conversion (used later by toPandas()).
# - Four executor instances are requested.
spark = (
    SparkSession.builder
    .appName("climate_analysis")
    .config("spark.sql.execution.arrow.pyspark.enabled", "true")
    .config("spark.executor.instances", 4)
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/05/13 16:15:12 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [None]:
# Maximum number of attempts download_data() makes for a single URL before giving up
MAX_ATTEMPTS = 6
# Base URL of the IEM ASOS request endpoint; query parameters are appended by the caller.
# HTTPS here can be problematic for installs that don't have the Let's Encrypt CA
SERVICE = "http://mesonet.agron.iastate.edu/cgi-bin/request/asos.py?"

def download_data(uri):
    """Fetch the data from the IEM

    The IEM download service has some protections in place to keep the number
    of inbound requests in check.  This function retries up to MAX_ATTEMPTS
    times with an exponential backoff (5s, 10s, 20s, ...) between attempts to
    keep individual downloads from erroring.

    A response is considered failed when the request raises, or when the
    returned body starts with "ERROR". Both cases wait before retrying.

    Args:
      uri (string): URL to fetch

    Returns:
      string data, or an empty string if every attempt failed
    """
    base_delay = 5  # seconds to wait before the first retry; doubles per retry

    for attempt in range(MAX_ATTEMPTS):
        try:
            # The context manager closes the HTTP response even if read() or
            # decode() raises.
            with urlopen(uri, timeout=300) as response:
                data = response.read().decode("utf-8")
            if not data.startswith("ERROR"):
                return data
            print(f"download_data({uri}) returned a service error: {data[:100]!r}")
        except Exception as exp:
            print(f"download_data({uri}) failed with {exp}")

        # Back off before the next attempt; no point sleeping after the last one.
        if attempt < MAX_ATTEMPTS - 1:
            time.sleep(base_delay * 2 ** attempt)

    print("Exhausted attempts to download, returning empty data")
    return ""

In [None]:
# Clear output folder
def clear_working_dir(folder_path = '/kaggle/working'):
    """Delete the .txt and .csv entries directly inside ``folder_path``.

    Only the top level of the folder is inspected. An entry that cannot be
    removed is reported with print() and skipped, so one failure does not stop
    the clean-up.

    Args:
      folder_path (string): directory to clean
    """
    removable_suffixes = ('.txt', '.csv')
    for entry in os.listdir(folder_path):
        if not entry.endswith(removable_suffixes):
            continue
        target = os.path.join(folder_path, entry)
        try:
            os.remove(target)
        except Exception as err:
            print(err)
    
# Download Data
def fetch_data_for_year(year_start, year_end, out_dir="."):
    """Download one file of observations per day for a range of years.

    For every day from Jan 1 of ``year_start`` through Dec 31 of ``year_end``
    (inclusive) a request covering that single day is sent through
    download_data(), and the response is written to ``<out_dir>/YYYYMMDD.txt``.
    When a download fails, download_data() returns an empty string and the
    file is written empty.

    Args:
      year_start (int): first year to download
      year_end (int): last year to download (inclusive)
      out_dir (string): directory the daily files are written to; created if
        missing. Defaults to the current working directory.
    """
    startts = datetime.datetime(year_start, 1, 1)
    endts = datetime.datetime(year_end + 1, 1, 1)
    interval = datetime.timedelta(hours=24)

    service = SERVICE + "data=all&tz=Etc/UTC&format=onlycomma&latlon=yes&elev=no&missing=empty&trace=T&direct=no&report_type=3&report_type=4&"

    os.makedirs(out_dir, exist_ok=True)

    now = startts
    while now < endts:
        # Each request spans [now, now + 1 day).
        thisurl = service
        thisurl += now.strftime("year1=%Y&month1=%m&day1=%d&")
        thisurl += (now + interval).strftime("year2=%Y&month2=%m&day2=%d&")
        print(f"Downloading: {now}")
        data = download_data(thisurl)
        outfn = os.path.join(out_dir, f"{now:%Y%m%d}.txt")
        # The payload was decoded as UTF-8, so write it back as UTF-8; an
        # ASCII writer would raise UnicodeEncodeError on any non-ASCII character.
        with open(outfn, "w", encoding="utf-8") as fh:
            fh.write(data)
        now += interval

In [None]:
# Remove .txt/.csv files left over in the default folder (/kaggle/working) by a previous run.
clear_working_dir()
# Download one file per day for the single year 2010 (start and end year are both inclusive).
fetch_data_for_year(2010, 2010)

Downloading: 2010-01-01 00:00:00
Downloading: 2010-01-02 00:00:00
Downloading: 2010-01-03 00:00:00
Downloading: 2010-01-04 00:00:00
Downloading: 2010-01-05 00:00:00
Downloading: 2010-01-06 00:00:00
Downloading: 2010-01-07 00:00:00
Downloading: 2010-01-08 00:00:00
Downloading: 2010-01-09 00:00:00
Downloading: 2010-01-10 00:00:00
Downloading: 2010-01-11 00:00:00
Downloading: 2010-01-12 00:00:00
Downloading: 2010-01-13 00:00:00
Downloading: 2010-01-14 00:00:00
Downloading: 2010-01-15 00:00:00
Downloading: 2010-01-16 00:00:00
Downloading: 2010-01-17 00:00:00
Downloading: 2010-01-18 00:00:00
Downloading: 2010-01-19 00:00:00
Downloading: 2010-01-20 00:00:00
Downloading: 2010-01-21 00:00:00
Downloading: 2010-01-22 00:00:00
Downloading: 2010-01-23 00:00:00
Downloading: 2010-01-24 00:00:00
Downloading: 2010-01-25 00:00:00
Downloading: 2010-01-26 00:00:00
Downloading: 2010-01-27 00:00:00
Downloading: 2010-01-28 00:00:00
Downloading: 2010-01-29 00:00:00
Downloading: 2010-01-30 00:00:00
Downloadin

In [None]:
# Load the downloaded files into a single DataFrame: the first line of each
# file is treated as the header and column types are inferred from the data.
# NOTE(review): the path is a directory, so presumably every file in it is read — confirm
# that nothing other than the daily downloads is present when this cell runs.
df = spark.read.options(header=True, inferSchema=True).csv("/kaggle/working/")

                                                                                

In [None]:
# Total number of rows loaded from the downloaded files.
df.count()

                                                                                

64085337

In [None]:
# Columns retained for the analysis; every other column of the raw frame is dropped.
cols_to_keep = {"valid", "lat", "lon", "tmpf", "dwpf", "relh", "sknt", "p01i", "feel", "mslp", "ice_accretion_6hr"}

# Dropping (instead of selecting) keeps the surviving columns in their original order.
cols_to_drop = [name for name in df.columns if name not in cols_to_keep]

df_cleaned = df.drop(*cols_to_drop)

In [None]:
# Sanity check: only the columns listed in cols_to_keep should remain.
df_cleaned.columns

['valid',
 'lon',
 'lat',
 'tmpf',
 'dwpf',
 'relh',
 'sknt',
 'p01i',
 'mslp',
 'ice_accretion_6hr',
 'feel']

In [None]:
def reverseGeocode(coordinates):
    """Return the 'cc' field of the first reverse-geocoding match.

    Args:
      coordinates: (latitude, longitude) pairs, passed unchanged to
        ``rg.search``. Only the match for the first entry is used.

    Returns:
      The value stored under 'cc' in the first search result.
    """
    matches = rg.search(coordinates)
    first_match = matches[0]
    return first_match['cc']


from pyspark.sql.functions import udf
from pyspark.sql.types import StringType


@udf(returnType=StringType())
def country_from_lat_lon_udf(lat, lon):
    """Spark UDF: look up the country code for one (lat, lon) pair via reverseGeocode."""
    return reverseGeocode([(lat, lon)])


# Tag every observation with the country code derived from its coordinates.
# NOTE(review): the UDF runs one reverse-geocoding lookup per row; with many repeated
# coordinates it may be cheaper to geocode the distinct (lat, lon) pairs once and join — confirm.
df_country = df_cleaned.withColumn("CountryCode", country_from_lat_lon_udf("lat", "lon"))
df_country.show(5)

Loading formatted geocoded file...
[Stage 16:>                                                         (0 + 1) / 1]

+-------------------+--------+-------+----+----+-----+----+----+------+-----------------+------+-----------+
|              valid|     lon|    lat|tmpf|dwpf| relh|sknt|p01i|  mslp|ice_accretion_6hr|  feel|CountryCode|
+-------------------+--------+-------+----+----+-----+----+----+------+-----------------+------+-----------+
|2010-12-12 00:00:00|-76.7875|17.9357|77.4|66.4|68.89| 6.0|0.00|1011.8|             null|  77.4|         JM|
|2010-12-12 00:00:00| 24.7866|61.8561| 2.7| 0.1|88.59| 4.0|0.00|1007.6|             null|  -6.8|         FI|
|2010-12-12 00:00:00| 24.9633|60.3172|17.1|12.6|82.16|10.0|0.00|1004.8|             null|  4.28|         FI|
|2010-12-12 00:00:00| 27.4053|68.6073|24.4|20.8|85.94|14.0|0.00| 991.7|             null| 11.37|         FI|
|2010-12-12 00:00:00| 29.6333|62.6667| 1.4|-1.8|86.05| 8.0|0.00|1003.7|             null|-13.58|         FI|
+-------------------+--------+-------+----+----+-----+----+----+------+-----------------+------+-----------+
only showing top 5 

                                                                                

In [None]:
from pyspark.sql.functions import month, year

# Derive the calendar month and year of each observation from its 'valid' timestamp;
# these become grouping keys for the monthly aggregation.
observation_time = df_country.valid
df_with_month = df_country.withColumns({
    'Month': month(observation_time),
    'Year': year(observation_time),
})

In [None]:
# Preview the frame to verify the derived Month and Year columns.
df_with_month.show()

Loading formatted geocoded file...
[Stage 17:>                                                         (0 + 1) / 1]

+-------------------+--------+-------+-----+----+-----+----+----+------+-----------------+------+-----------+-----+----+
|              valid|     lon|    lat| tmpf|dwpf| relh|sknt|p01i|  mslp|ice_accretion_6hr|  feel|CountryCode|Month|Year|
+-------------------+--------+-------+-----+----+-----+----+----+------+-----------------+------+-----------+-----+----+
|2010-12-12 00:00:00|-76.7875|17.9357| 77.4|66.4|68.89| 6.0|0.00|1011.8|             null|  77.4|         JM|   12|2010|
|2010-12-12 00:00:00| 24.7866|61.8561|  2.7| 0.1|88.59| 4.0|0.00|1007.6|             null|  -6.8|         FI|   12|2010|
|2010-12-12 00:00:00| 24.9633|60.3172| 17.1|12.6|82.16|10.0|0.00|1004.8|             null|  4.28|         FI|   12|2010|
|2010-12-12 00:00:00| 27.4053|68.6073| 24.4|20.8|85.94|14.0|0.00| 991.7|             null| 11.37|         FI|   12|2010|
|2010-12-12 00:00:00| 29.6333|62.6667|  1.4|-1.8|86.05| 8.0|0.00|1003.7|             null|-13.58|         FI|   12|2010|
|2010-12-12 00:00:00| 25.6752|62

                                                                                

In [None]:
from pyspark.sql.functions import avg

# Measurements averaged per (CountryCode, Month, Year); each output column keeps
# the name of the column it was computed from.
# NOTE(review): p01i is displayed as "0.00" in the previews and the request uses trace=T,
# so it may have been inferred as a string column; avg() would then depend on implicit
# casting and would not count non-numeric trace markers — confirm.
metric_columns = ["tmpf", "dwpf", "relh", "sknt", "p01i", "mslp", "ice_accretion_6hr", "feel"]
monthly_means = [avg(name).alias(name) for name in metric_columns]

df_country_agg = df_with_month.groupBy('CountryCode', 'Month', 'Year').agg(*monthly_means)

In [None]:
# Collect the aggregated result to the driver as a pandas DataFrame, then write it
# (without the index column) to df_2010.csv in the current working directory.
country_monthly_pdf = df_country_agg.toPandas()
country_monthly_pdf.to_csv("df_2010.csv", index=False)