In [1]:
# innstall java
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

# install spark (change the version number if needed)
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz

# unzip the spark file to the current folder
!tar xf spark-3.0.0-bin-hadoop3.2.tgz

# set your spark folder to your system path environment. 
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"


# install findspark using pip
!pip install -q findspark

In [2]:
 !pip install -q findspark

In [3]:
import findspark
findspark.init()

In [4]:
from pyspark.sql import SparkSession

spark = SparkSession.builder\
        .master("local")\
        .appName("Colab")\
        .config('spark.ui.port', '4050')\
        .getOrCreate()

In [5]:
spark

In [6]:
!wget https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
!unzip ngrok-stable-linux-amd64.zip
get_ipython().system_raw('./ngrok http 4050 &')
!curl -s http://localhost:4040/api/tunnels

--2021-04-15 00:56:14--  https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
Resolving bin.equinox.io (bin.equinox.io)... 3.223.71.232, 3.217.26.81, 3.229.173.44, ...
Connecting to bin.equinox.io (bin.equinox.io)|3.223.71.232|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 14746350 (14M) [application/octet-stream]
Saving to: ‘ngrok-stable-linux-amd64.zip’


2021-04-15 00:56:15 (13.6 MB/s) - ‘ngrok-stable-linux-amd64.zip’ saved [14746350/14746350]

Archive:  ngrok-stable-linux-amd64.zip
  inflating: ngrok                   
{"tunnels":[],"uri":"/api/tunnels"}


# Solution


In [7]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import Window
# Required for StructField, StringType, IntegerType, etc.
from pyspark.sql.types import *

In [8]:
csvCountrySchema = StructType([
      StructField("COUNTRY_ABBR", StringType(), False),
      StructField("COUNTRY_FULL", StringType(), False),
                                  ])

csvStationSchema = StructType([
        StructField("STN_NO", IntegerType(), False),
        StructField("COUNTRY_ABBR", StringType(), False)
                              ])
csvWeatherSchema = StructType([           
      StructField("STN_NO", IntegerType(), False),        
      StructField("WBAN", IntegerType(), False), 
      StructField("YEARMODA", IntegerType(), False), 
      StructField("TEMP", FloatType(), False),
      StructField("DEWP", FloatType(), False),
      StructField("SLP", FloatType(), False), 
      StructField("STP", FloatType(), False), 
      StructField("VISB", FloatType(), False), 
      StructField("WDSP", FloatType(), False),  
      StructField("MXSPD", FloatType(), False),
      StructField("GUST", FloatType(), False), 
      StructField("MAX", FloatType(), False),  
      StructField("MIN", FloatType(), False),  
      StructField("PRCP", FloatType(), False), 
      StructField("SNDP", FloatType(), False), 
      StructField("FRSHTT", IntegerType(), False)                                             
                                ])

                                 
        # A reference to our csv-seperated-file
csvCountryFile = "/content/data/countrylist.csv"
csvStationFile = "/content/data/stationlist.csv"
csvWeatherFile = "/content/data/2019/*.gz"

#Inputs
sdf_country = spark.read \
                .option('header', 'true') \
                .option('sep', ",") \
                .option("ignoreLeadingWhiteSpace", True) \
                .option("ignoreTrailingWhiteSpace", True) \
                .schema(csvCountrySchema)\
                .csv(csvCountryFile)    


sdf_station = spark.read \
                .option('header', 'true') \
                .option('sep', ",") \
                .option("ignoreLeadingWhiteSpace", True) \
                .option("ignoreTrailingWhiteSpace", True) \
                .schema(csvStationSchema)\
                .csv(csvStationFile) 


sdf_wthr19 = spark.read \
                .option("header", "true") \
                .schema(csvWeatherSchema)\
                .option("ignoreLeadingWhiteSpace", True) \
                .option("ignoreTrailingWhiteSpace", True) \
                .option('sep', ",") \
                .csv(csvWeatherFile) 

sdf_wthrstn19 =     sdf_station.join(sdf_country, on = 'COUNTRY_ABBR')  \
                             .join(sdf_wthr19, on = 'STN_NO')          

In [9]:
sdf_wthrstn19.show()

+------+------------+------------+-----+--------+----+----+------+------+-----+----+-----+-----+----+----+----+-----+------+
|STN_NO|COUNTRY_ABBR|COUNTRY_FULL| WBAN|YEARMODA|TEMP|DEWP|   SLP|   STP| VISB|WDSP|MXSPD| GUST| MAX| MIN|PRCP| SNDP|FRSHTT|
+------+------------+------------+-----+--------+----+----+------+------+-----+----+-----+-----+----+----+----+-----+------+
| 13840|          NO|      NORWAY|99999|20190101|38.6|24.8|9999.9|9999.9|999.9|11.7| 33.6| 44.1|null|null|null|999.9|     0|
| 13840|          NO|      NORWAY|99999|20190102|30.5|16.6|9999.9|9999.9|999.9| 8.0| 23.7| 35.0|null|null|null|999.9|     0|
| 13840|          NO|      NORWAY|99999|20190103|32.2|21.7|9999.9|9999.9|999.9| 3.5| 13.0|999.9|null|null|null|999.9|     0|
| 13840|          NO|      NORWAY|99999|20190104|34.2|27.2|9999.9|9999.9|999.9| 2.6|  6.0|999.9|null|null|null|999.9|     0|
| 13840|          NO|      NORWAY|99999|20190105|30.6|27.4|9999.9|9999.9| 15.5| 2.0|  6.4|999.9|null|null|null|999.9|100000|


In [10]:
sdf_wthrstn19.createOrReplaceTempView("wthr19");

Which Country had the hottest average temperature of the year 

In [11]:
spark.sql('''SELECT  COUNTRY_FULL, AVG(TEMP) AS AVG_TEMP FROM wthr19 WHERE TEMP != 9999.9 GROUP BY COUNTRY_FULL ORDER BY AVG(TEMP) DESC LIMIT 1''').show(1)

+------------+-----------------+
|COUNTRY_FULL|         AVG_TEMP|
+------------+-----------------+
|    DJIBOUTI|90.06114474836602|
+------------+-----------------+



Found this interesting : https://www.amazon.com/Welcome-Djibouti-survive-hottest-country/dp/1794559418


Q) Which country has the second highest average mean wind speed over the year ? 

In [14]:
sdf_windspeed = spark.sql('''SELECT  COUNTRY_FULL, AVG(WDSP) AS AVG_TEMP FROM wthr19 WHERE WDSP != 9999.9 GROUP BY COUNTRY_FULL ORDER BY AVG(WDSP) DESC''') 
print(sdf_windspeed.collect()[1])

Row(COUNTRY_FULL='ARMENIA', AVG_TEMP=457.3659429499847)


Q)Which country had the most consecutive days of tornades  funnel cloud formations ? 

---



In [64]:
w1 = Window.partitionBy("COUNTRY_FULL").orderBy(["COUNTRY_FULL", "YEARMODA"])
w2 = Window.partitionBy("DIFF").orderBy("COUNTRY_FULL")

sdf_tornadoflag = spark.sql('''SELECT  COUNTRY_FULL,   YEARMODA , FRSHTT   , int(substring(string(FRSHTT),6,1)) AS `flag`
             FROM wthr19  WHERE  char_length(string(FRSHTT)) = 6   ORDER BY COUNTRY_FULL, YEARMODA DESC
                    ''')
sdf_tornadoflag = sdf_tornadoflag.withColumn("PREVIOUS_DAY", F.lag(sdf_tornadoflag.YEARMODA).over(w1)) 
sdf_tornadoflag = sdf_tornadoflag.withColumn("DIFF_DAY", F.when(F.isnull(sdf_tornadoflag.YEARMODA - sdf_tornadoflag.PREVIOUS_DAY), 0) \
                                  .otherwise(sdf_tornadoflag.YEARMODA - sdf_tornadoflag.PREVIOUS_DAY))\
                                  

sdf_tornadoflag.createOrReplaceTempView("tornadoflag"); 

In [68]:
spark.sql('''SELECT  * FROM tornadoflag WHERE flag =1  ORDER BY COUNTRY_FULL, YEARMODA''').show(50)

+-------------+--------+------+----+------------+--------+
| COUNTRY_FULL|YEARMODA|FRSHTT|flag|PREVIOUS_DAY|DIFF_DAY|
+-------------+--------+------+----+------------+--------+
|      AUSTRIA|20191014|100001|   1|    20191014|       0|
|      AUSTRIA|20191019|110001|   1|    20191019|       0|
|  BAHAMAS THE|20190327|110001|   1|    20190319|       8|
|       CANADA|20190616|110001|   1|    20190616|       0|
|       CANADA|20190811|110001|   1|    20190810|       1|
|       CANADA|20191020|100001|   1|    20191020|       0|
|     COLOMBIA|20191205|110011|   1|    20191205|       0|
|         CUBA|20190214|110001|   1|    20190213|       1|
|         CUBA|20190807|110011|   1|    20190807|       0|
|      GEORGIA|20191130|110001|   1|    20191129|       1|
|        GHANA|20190322|100001|   1|    20190308|      14|
|      ICELAND|20190128|101001|   1|    20190127|       1|
|         IRAN|20191124|100001|   1|    20191124|       0|
|        ITALY|20191201|110001|   1|    20191201|       