In [2]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
conf = SparkConf()
conf.set("spark.driver.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true")
conf.set("spark.executor.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true")
spark = SparkSession.builder.config(conf=conf).getOrCreate()

In [3]:
folderPath = r'C:\Users\102222\Downloads\paytm-weather-challenge\paytmteam-de-weather-challenge-beb4fc53605c'

In [4]:
data = spark.read.option("header","true").csv(folderPath+'/data/2019')

In [5]:
country = spark.read.option("header","true").csv(folderPath+'/countrylist.csv')
station = spark.read.option("header","true").csv(folderPath+'/stationlist.csv')

In [6]:
names = station.join(country,["COUNTRY_ABBR"],"left_outer")

In [7]:
from pyspark.sql.functions import *
names.filter(col('COUNTRY_FULL').isNull()).show(5)

+------------+------+------------+
|COUNTRY_ABBR|STN_NO|COUNTRY_FULL|
+------------+------+------------+
|          MJ|134630|        null|
|          TT|973900|        null|
|          AE|412180|        null|
|          RI|132600|        null|
|          MJ|134610|        null|
+------------+------+------------+
only showing top 5 rows



In [33]:
df = data.withColumnRenamed('STN---','STN_NO').join(names,['STN_NO'],'left_outer')

In [92]:
data_to_ignore = {
    'TEMP' : 9999.9,
    'DEWP' : 9999.9,
    'SLP' : 9999.9,
    'STP' : 9999.9,
    'VISIB' : 999.9,
    'WDSP' : 999.9,
    'MXSPD' : 999.9,
    'GUST' : 999.9,
    'MAX' : 9999.9,
    'MIN' : 9999.9,
    'PRCP' : 99.99,
    'SNDP' : 999.9,
    
}

In [29]:
def changeToFloat(df):
    colstoChange = [*data_to_ignore.keys()]
    cols = list(
        filter(
            None.__ne__,
            list(
                map(
                    lambda name: col(name).cast(FloatType())
                    if name in colstoChange
                    else col(name),
                    df.columns,
                )
            ),
        )
    )
    return df.select(*cols)

In [94]:
df = changeToFloat(df)

In [45]:
import pandas as pd
from pyspark.sql.functions import pandas_udf
from pyspark.sql.types import *

@pandas_udf("double")
def temp_mean_withRemoving(v: pd.Series) -> float:
    return v[v!=data_to_ignore['MAX']].mean()

@pandas_udf("double")
def temp_mean(v: pd.Series) -> float:
    return v.mean()

In [53]:
df.groupby(col('COUNTRY_FULL')).agg(temp_mean_withRemoving(df['MAX']))\
.orderBy(col('temp_mean_withRemoving(MAX)').desc()).show()

+-------------+---------------------------+
| COUNTRY_FULL|temp_mean_withRemoving(MAX)|
+-------------+---------------------------+
|        SUDAN|          104.5230712890625|
|        NIGER|           97.8834228515625|
|      SENEGAL|          97.71339416503906|
|         CHAD|          97.24052429199219|
|         MALI|           95.4217529296875|
|GUINEA-BISSAU|                      93.75|
| BURKINA FASO|          93.40684509277344|
|         TOGO|          92.59318542480469|
|     THAILAND|          92.33601379394531|
|  GAMBIA  THE|          92.22368621826172|
|  EL SALVADOR|          92.19072723388672|
|       PANAMA|          92.05693817138672|
|        BENIN|           92.0526123046875|
|       KUWAIT|           91.5665054321289|
|        HAITI|          91.39999389648438|
| SAUDI ARABIA|           91.2256851196289|
|      NIGERIA|          91.15251159667969|
|        GHANA|          90.84201049804688|
|         OMAN|          90.56419372558594|
|      JAMAICA|          90.5107

In [47]:
df.groupby(col('COUNTRY_FULL')).agg(temp_mean(df['MAX'])).show()

+--------------------+------------------+
|        COUNTRY_FULL|    temp_mean(MAX)|
+--------------------+------------------+
|             ARMENIA|    63.87255859375|
|        SOUTH AFRICA| 86.85189819335938|
|               BURMA|  91.1471939086914|
|            CAMBODIA|              null|
|          BANGLADESH| 89.19732666015625|
|               JAPAN| 72.23062896728516|
|              UGANDA|102.84613800048828|
|SOUTH GEORGIA AND...| 39.22018051147461|
|          CAPE VERDE| 85.16928100585938|
|NORTHERN MARIANA ...| 87.91114807128906|
|FALKLAND ISLANDS ...| 289.8817443847656|
|          MAURITANIA|              null|
|              JERSEY|  59.3869514465332|
|            MALDIVES| 88.69937133789062|
|SAO TOME AND PRIN...|    9999.900390625|
|            TANZANIA| 87.91410064697266|
|              JORDAN| 80.31826782226562|
|             MAYOTTE|  87.7521743774414|
|        MAN  ISLE OF| 56.83333206176758|
|             LESOTHO|              null|
+--------------------+------------

In [49]:
df.filter(col('COUNTRY_FULL')=='MAURITANIA').select(col('MAX')).show()

+----+
| MAX|
+----+
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
|null|
+----+
only showing top 20 rows



In [54]:
df.select(col('COUNTRY_FULL'),col('FRSHTT')).show()

+------------+------+
|COUNTRY_FULL|FRSHTT|
+------------+------+
|      NORWAY|011010|
|      NORWAY|010000|
|      NORWAY|000000|
|      NORWAY|010000|
|      NORWAY|010000|
|      NORWAY|010000|
|      NORWAY|010000|
|      NORWAY|010000|
|      NORWAY|010000|
|      NORWAY|110000|
|      NORWAY|010000|
|      NORWAY|010000|
|      NORWAY|111000|
|      NORWAY|011000|
|      NORWAY|010000|
|      NORWAY|010000|
|      NORWAY|011000|
|      NORWAY|010000|
|      NORWAY|000000|
|      NORWAY|010000|
+------------+------+
only showing top 20 rows



In [69]:
tornodo = df.select(col('COUNTRY_FULL'),col('YEARMODA'),col('FRSHTT')).withColumn('Tornodo',substring(col('FRSHTT'),-1,1))

In [70]:
tornodo.show()

+------------+--------+------+-------+
|COUNTRY_FULL|YEARMODA|FRSHTT|Tornodo|
+------------+--------+------+-------+
|      NORWAY|20190101|011010|      0|
|      NORWAY|20190102|010000|      0|
|      NORWAY|20190103|000000|      0|
|      NORWAY|20190104|010000|      0|
|      NORWAY|20190105|010000|      0|
|      NORWAY|20190106|010000|      0|
|      NORWAY|20190107|010000|      0|
|      NORWAY|20190108|010000|      0|
|      NORWAY|20190109|010000|      0|
|      NORWAY|20190110|110000|      0|
|      NORWAY|20190111|010000|      0|
|      NORWAY|20190112|010000|      0|
|      NORWAY|20190113|111000|      0|
|      NORWAY|20190114|011000|      0|
|      NORWAY|20190115|010000|      0|
|      NORWAY|20190116|010000|      0|
|      NORWAY|20190117|011000|      0|
|      NORWAY|20190118|010000|      0|
|      NORWAY|20190119|000000|      0|
|      NORWAY|20190120|010000|      0|
+------------+--------+------+-------+
only showing top 20 rows



In [89]:
from pyspark.sql import Window

windowing = Window.partitionBy(col('COUNTRY_FULL')).orderBy(col('YEARMODA'))

tornodo.withColumn('nextDay',lead(col('Tornodo')).over(windowing))\
.withColumn('valid',when(
(col('Tornodo') == '1') & (col('nextDay') == '1'),1
).otherwise(0))\
.select(col('COUNTRY_FULL'),col('valid'))\
.groupby(col('COUNTRY_FULL')).agg(sum(col('valid'))).orderBy(col('sum(valid)').desc()).show()

+--------------------+----------+
|        COUNTRY_FULL|sum(valid)|
+--------------------+----------+
|      CAYMAN ISLANDS|         2|
|         BAHAMAS THE|         1|
|         WAKE ISLAND|         0|
|              JERSEY|         0|
|SOUTH GEORGIA AND...|         0|
|        MAN  ISLE OF|         0|
|             LESOTHO|         0|
|FALKLAND ISLANDS ...|         0|
|          MAURITANIA|         0|
|   EQUATORIAL GUINEA|         0|
|          CAPE VERDE|         0|
|NORTHERN MARIANA ...|         0|
|              JORDAN|         0|
|             MAYOTTE|         0|
| ANTIGUA AND BARBUDA|         0|
|           MAURITIUS|         0|
|              KUWAIT|         0|
|           LITHUANIA|         0|
|            MALDIVES|         0|
|SAO TOME AND PRIN...|         0|
+--------------------+----------+
only showing top 20 rows



In [96]:
@pandas_udf("double")
def mean_wind(v: pd.Series) -> float:
    return v[v!=data_to_ignore['WDSP']].mean()

df.groupby(col('COUNTRY_FULL')).agg(mean_wind(df['WDSP']))\
.orderBy(col('mean_wind(WDSP)').desc()).show()

+--------------------+------------------+
|        COUNTRY_FULL|   mean_wind(WDSP)|
+--------------------+------------------+
|FALKLAND ISLANDS ...| 17.87783432006836|
|               ARUBA|15.975682258605957|
|       FAROE ISLANDS|15.280671119689941|
|FRENCH SOUTHERN A...| 14.20372200012207|
|            BARBADOS|14.097541809082031|
|ST. PIERRE AND MI...|13.907670974731445|
|          CAPE VERDE|13.615221977233887|
|     TROMELIN ISLAND|13.005277633666992|
|          ST. HELENA|12.730517387390137|
|          MAURITANIA|12.723465919494629|
|          ANTARCTICA|12.275172233581543|
|             SOMALIA|12.274880409240723|
|COCOS (KEELING) I...|12.054546356201172|
|            GUERNSEY| 12.01801872253418|
|        MAN  ISLE OF|11.893424034118652|
|          MONTSERRAT| 11.80881404876709|
|             ICELAND| 11.73477554321289|
|      WESTERN SAHARA|11.647546768188477|
|           ST. LUCIA|11.575239181518555|
|              JERSEY| 11.47726058959961|
+--------------------+------------

In [95]:
df

DataFrame[STN_NO: string, WBAN: string, YEARMODA: string, TEMP: float, DEWP: float, SLP: float, STP: float, VISIB: float, WDSP: float, MXSPD: float, GUST: float, MAX: float, MIN: float, PRCP: float, SNDP: float, FRSHTT: string, COUNTRY_ABBR: string, COUNTRY_FULL: string]