In [None]:
pip install "dask[complete]" sklearn

In [None]:
import os
import numpy as np
import glob
import dask.dataframe as dd
from dask.distributed import Client

client = Client()

### To pay attention to
- missing fields -> all 9's, do something about this

## Setup data

In [2]:
relative_folder_path_glob =  'paytm-weather-challenge\paytmteam*'

### Weather

In [3]:
weather_filepaths = os.path.join(relative_folder_path_glob, 'data', '2019', 'part*gz')
# print(weather_filepaths)
weather = dd.read_csv(
    weather_filepaths, compression='gzip',
    dtype={'STN---': str,}
)

Please ensure that each individual file can fit in memory and
use the keyword ``blocksize=None to remove this message``
Setting ``blocksize=None``
  warn(


In [4]:
weather.head()

Unnamed: 0,STN---,WBAN,YEARMODA,TEMP,DEWP,SLP,STP,VISIB,WDSP,MXSPD,GUST,MAX,MIN,PRCP,SNDP,FRSHTT
0,10260,99999,20190101,26.1,21.2,1001.9,987.5,20.6,9.0,15.9,29.7,29.8,21.7*,0.02G,18.5,1000
1,10260,99999,20190102,24.9,22.1,1020.1,1005.5,5.4,5.6,13.6,22.1,27.1*,20.7,0.48G,22.8,1000
2,10260,99999,20190103,31.7,29.1,1008.9,994.7,13.6,11.6,21.4,49.5,37.4*,26.8*,0.25G,999.9,11000
3,10260,99999,20190104,32.9,30.3,1011.4,997.1,15.8,4.9,7.8,10.9,36.1,31.8,0.52G,999.9,1000
4,10260,99999,20190105,35.5,33.0,1015.7,1001.4,12.0,10.4,13.6,21.0,38.5*,32.7,0.02G,23.6,10000


### Stations, countries

In [5]:
stations = dd.read_csv(
    os.path.join(relative_folder_path_glob, "stationlist.csv"),
    dtype={'STN_NO': str,
    'COUNTRY_ABBR': str,}
)
stations.head()

Unnamed: 0,STN_NO,COUNTRY_ABBR
0,12240,NO
1,20690,SW
2,20870,SW
3,21190,SW
4,32690,UK


In [6]:
# Countries might contain comma's, need to handle this
countries = dd.read_csv(
    os.path.join(relative_folder_path_glob, "countrylist.csv"), 
    sep="^([^,]+),",
    dtype={'COUNTRY_ABBR': str,
    'COUNTRY_FULL': str,}
)
countries.head()

  head = reader(BytesIO(b_sample), **kwargs)


Unnamed: 0.1,Unnamed: 0,COUNTRY_ABBR,COUNTRY_FULL
0,,AA,ARUBA
1,,AC,ANTIGUA AND BARBUDA
2,,AF,AFGHANISTAN
3,,AG,ALGERIA
4,,AI,ASCENSION ISLAND


In [7]:
stations_merged = stations.merge(
    countries[['COUNTRY_ABBR', 'COUNTRY_FULL']], 
    left_on="COUNTRY_ABBR", 
    right_on="COUNTRY_ABBR"
)
stations_merged.head()

Unnamed: 0,STN_NO,COUNTRY_ABBR,COUNTRY_FULL
0,12240,NO,NORWAY
1,10580,NO,NORWAY
2,11520,NO,NORWAY
3,11751,NO,NORWAY
4,11060,NO,NORWAY


### Global weather joining

In [8]:
weather.dtypes

STN---       object
WBAN          int64
YEARMODA      int64
TEMP        float64
DEWP        float64
SLP         float64
STP         float64
VISIB       float64
WDSP        float64
MXSPD       float64
GUST        float64
MAX          object
MIN          object
PRCP         object
SNDP        float64
FRSHTT        int64
dtype: object

In [9]:
stations_merged.dtypes

STN_NO          object
COUNTRY_ABBR    object
COUNTRY_FULL    object
dtype: object

In [10]:
# Making sure dtypes are proper for joining
weather['STN---'] = weather['STN---'].astype(str)
stations_merged['STN_NO'] = stations_merged['STN_NO'].astype(str)

In [11]:
weather_data = weather.merge(stations_merged, left_on="STN---", right_on="STN_NO")
weather_data.head()

Unnamed: 0,STN---,WBAN,YEARMODA,TEMP,DEWP,SLP,STP,VISIB,WDSP,MXSPD,GUST,MAX,MIN,PRCP,SNDP,FRSHTT,STN_NO,COUNTRY_ABBR,COUNTRY_FULL
0,10260,99999,20190101,26.1,21.2,1001.9,987.5,20.6,9.0,15.9,29.7,29.8,21.7*,0.02G,18.5,1000,10260,NO,NORWAY
1,10260,99999,20190102,24.9,22.1,1020.1,1005.5,5.4,5.6,13.6,22.1,27.1*,20.7,0.48G,22.8,1000,10260,NO,NORWAY
2,10260,99999,20190103,31.7,29.1,1008.9,994.7,13.6,11.6,21.4,49.5,37.4*,26.8*,0.25G,999.9,11000,10260,NO,NORWAY
3,10260,99999,20190104,32.9,30.3,1011.4,997.1,15.8,4.9,7.8,10.9,36.1,31.8,0.52G,999.9,1000,10260,NO,NORWAY
4,10260,99999,20190105,35.5,33.0,1015.7,1001.4,12.0,10.4,13.6,21.0,38.5*,32.7,0.02G,23.6,10000,10260,NO,NORWAY


## Questions

### 1. Which country had hottest avg mean temp?
- How: Group all data by country, get the mean of each countries TEMP column

In [28]:
columns_to_use = [
    "TEMP", # avg temp on a given day
    "COUNTRY_FULL", # country name
]

#### Clean up 9999.9(s) in the mean temp column

In [12]:
weather_data['TEMP'].replace(9999.9, np.NaN)
weather_data.head()

Unnamed: 0,STN---,WBAN,YEARMODA,TEMP,DEWP,SLP,STP,VISIB,WDSP,MXSPD,GUST,MAX,MIN,PRCP,SNDP,FRSHTT,STN_NO,COUNTRY_ABBR,COUNTRY_FULL
0,10260,99999,20190101,26.1,21.2,1001.9,987.5,20.6,9.0,15.9,29.7,29.8,21.7*,0.02G,18.5,1000,10260,NO,NORWAY
1,10260,99999,20190102,24.9,22.1,1020.1,1005.5,5.4,5.6,13.6,22.1,27.1*,20.7,0.48G,22.8,1000,10260,NO,NORWAY
2,10260,99999,20190103,31.7,29.1,1008.9,994.7,13.6,11.6,21.4,49.5,37.4*,26.8*,0.25G,999.9,11000,10260,NO,NORWAY
3,10260,99999,20190104,32.9,30.3,1011.4,997.1,15.8,4.9,7.8,10.9,36.1,31.8,0.52G,999.9,1000,10260,NO,NORWAY
4,10260,99999,20190105,35.5,33.0,1015.7,1001.4,12.0,10.4,13.6,21.0,38.5*,32.7,0.02G,23.6,10000,10260,NO,NORWAY


In [16]:
# checking that it worked
weather_data['TEMP'].max().compute()

110.0

In [26]:
# Get the answer
station_mean_temps = weather_data[['COUNTRY_FULL', 'TEMP']].groupby("COUNTRY_FULL")['TEMP'].mean().reset_index().compute()
station_mean_temps.sort_values("TEMP", ascending=False).head()

Unnamed: 0,COUNTRY_FULL,TEMP
208,DJIBOUTI,90.061145
10,CHAD,87.360997
35,NIGER,85.060223
224,SUDAN,84.454942
171,EL SALVADOR,84.440459


#### Hottest average mean temperature:
* DJIBOUTI, with 90.1 degrees Fahrenheit

### 2. Which country had most consecutive days of tornadoes/funnel cloud formations?
- How: 
    - function to determine if bit string (FRSHTT) indicates tornado, True/False, put in other column
    - Going to assume readings are taken every day, not going to check dates are 1 day apart
    - filter on each location, then use series to mask cumsum of negation: https://stackoverflow.com/questions/48897265/how-to-count-longest-uninterrupted-sequence-in-pandas

In [43]:
columns_to_use = [
    "FRSHTT", # sixth digit only = tornado/funnel cloud
    "COUNTRY_FULL", # country name
    "YEARMODA", # date, for sorting
]

In [49]:
# Making sure dtypes are proper
weather_data['FRSHTT'] = weather_data['FRSHTT'].astype(str)
weather_data['YEARMODA'] = dd.to_datetime(weather_data['YEARMODA'], format="%Y%m%d")

In [50]:
def had_tornado(x: str):
    if len(x) >= 6 and x[5] == "1":
        return True
    else:
        return False
    
# testing
print("Returned:", had_tornado("100101"), "expected:", True)
print("Returned:", had_tornado("000001"), "expected:", True)
print("Returned:", had_tornado("10010"), "expected:", False)
print("Returned:", had_tornado("1"), "expected:", False)

Returned: True expected: True
Returned: True expected: True
Returned: False expected: False
Returned: False expected: False


In [51]:
weather_data['had_tornado'] = weather_data['FRSHTT'].apply(lambda x: had_tornado(x))
weather_data.head()

You did not provide metadata, so Dask is running your function on a small dataset to guess output types. It is possible that Dask will guess incorrectly.
To provide an explicit output types or to silence this message, please provide the `meta=` keyword, as described in the map or apply function that you are using.
  Before: .apply(func)
  After:  .apply(func, meta=('FRSHTT', 'bool'))



Unnamed: 0,STN---,WBAN,YEARMODA,TEMP,DEWP,SLP,STP,VISIB,WDSP,MXSPD,GUST,MAX,MIN,PRCP,SNDP,FRSHTT,STN_NO,COUNTRY_ABBR,COUNTRY_FULL,had_tornado
0,10260,99999,2019-01-01,26.1,21.2,1001.9,987.5,20.6,9.0,15.9,29.7,29.8,21.7*,0.02G,18.5,1000,10260,NO,NORWAY,False
1,10260,99999,2019-01-02,24.9,22.1,1020.1,1005.5,5.4,5.6,13.6,22.1,27.1*,20.7,0.48G,22.8,1000,10260,NO,NORWAY,False
2,10260,99999,2019-01-03,31.7,29.1,1008.9,994.7,13.6,11.6,21.4,49.5,37.4*,26.8*,0.25G,999.9,11000,10260,NO,NORWAY,False
3,10260,99999,2019-01-04,32.9,30.3,1011.4,997.1,15.8,4.9,7.8,10.9,36.1,31.8,0.52G,999.9,1000,10260,NO,NORWAY,False
4,10260,99999,2019-01-05,35.5,33.0,1015.7,1001.4,12.0,10.4,13.6,21.0,38.5*,32.7,0.02G,23.6,10000,10260,NO,NORWAY,False


In [None]:
location_tornado_seq = dict()

for location in weather_data["COUNTRY_FULL"].unique():
    group = weather_data[weather_data['COUNTRY_FULL'] == location]
    tornado_seq_count = (~group['had_tornado'])[group['had_tornado']].value_counts().max().compute()
    location_tornado_seq[location] = tornado_seq_count

In [None]:
dd.from_dict(location_tornado_seq)

In [None]:
#### Longest sequence of tornados:
* X, with Y tornadoes in a row

In [None]:
#### This was a tricky question, haven't done anything with streaks without using dumb loops (similar to the above)
- I also didn't quite have enough time to finish running the above loop, so X and Y aren't filled into the answer section

### 3. Which country had the second highest average mean wind speed over the year?
* How:
    * Similar to first question, group by country, get mean WDSP

In [70]:
columns_to_use = [
    "WDSP", # windspeed mean for day
    "COUNTRY_FULL", # country name
]

In [72]:
# clean up bad values
weather_data['WDSP'].replace(999.9, np.NaN)
weather_data.head()

Unnamed: 0,STN---,WBAN,YEARMODA,TEMP,DEWP,SLP,STP,VISIB,WDSP,MXSPD,GUST,MAX,MIN,PRCP,SNDP,FRSHTT,STN_NO,COUNTRY_ABBR,COUNTRY_FULL,had_tornado
0,10260,99999,2019-01-01,26.1,21.2,1001.9,987.5,20.6,9.0,15.9,29.7,29.8,21.7*,0.02G,18.5,1000,10260,NO,NORWAY,False
1,10260,99999,2019-01-02,24.9,22.1,1020.1,1005.5,5.4,5.6,13.6,22.1,27.1*,20.7,0.48G,22.8,1000,10260,NO,NORWAY,False
2,10260,99999,2019-01-03,31.7,29.1,1008.9,994.7,13.6,11.6,21.4,49.5,37.4*,26.8*,0.25G,999.9,11000,10260,NO,NORWAY,False
3,10260,99999,2019-01-04,32.9,30.3,1011.4,997.1,15.8,4.9,7.8,10.9,36.1,31.8,0.52G,999.9,1000,10260,NO,NORWAY,False
4,10260,99999,2019-01-05,35.5,33.0,1015.7,1001.4,12.0,10.4,13.6,21.0,38.5*,32.7,0.02G,23.6,10000,10260,NO,NORWAY,False


In [74]:
# Get the answer
station_mean_wind = weather_data[['COUNTRY_FULL', 'WDSP']].groupby("COUNTRY_FULL")['WDSP'].mean().reset_index().compute()
station_mean_wind.sort_values("WDSP", ascending=False).head()

Unnamed: 0,COUNTRY_FULL,WDSP
74,GABON,485.179478
60,ARMENIA,457.365932
70,ETHIOPIA,441.251528
230,VENEZUELA,351.984397
130,LATVIA,338.47314


#### Second highest mean windspeed throughout the year:
* ARMENIA, with 457.4 mean windspeed over the year