# Wind Statistics

### Introduction:

The data have been modified to contain some missing values, identified by NaN.  
Using pandas should make this exercise
easier, in particular for the bonus question.

You should be able to perform all of these operations without using
a for loop or other looping construct.


1. The data in 'wind.data' has the following format:

In [434]:
"""
Yr Mo Dy   RPT   VAL   ROS   KIL   SHA   BIR   DUB   CLA   MUL   CLO   BEL   MAL
61  1  1 15.04 14.96 13.17  9.29   NaN  9.87 13.67 10.25 10.83 12.58 18.50 15.04
61  1  2 14.71   NaN 10.83  6.50 12.62  7.67 11.50 10.04  9.79  9.67 17.54 13.83
61  1  3 18.50 16.88 12.33 10.13 11.17  6.17 11.25   NaN  8.50  7.67 12.75 12.71
"""


   The first three columns are year, month and day.  The
   remaining 12 columns are average windspeeds in knots at 12
   locations in Ireland on that day.   

   For more information about the dataset, see [here](wind.desc).

### Step 1. Import the necessary libraries

In [18]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from pyspark.sql import Window
import datetime

spark = SparkSession.builder.appName("Wind_Stats").getOrCreate()

### Step 2. Import the dataset from this [address](https://raw.githubusercontent.com/guipsamora/pandas_exercises/master/06_Stats/Wind_Stats/wind.data)

### Step 3. Assign it to a variable called data and replace the first 3 columns by a proper datetime index.

In [57]:
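data_url = 'https://raw.githubusercontent.com/guipsamora/pandas_exercises/master/06_Stats/Wind_Stats/wind.data'

import pandas as pd

# Columns are whitespace-separated; the raw string avoids an invalid-escape warning for "\s".
# parse_dates=[[0, 1, 2]] merges Yr, Mo and Dy into a single Yr_Mo_Dy datetime column.
data = pd.read_csv(data_url, sep=r"\s+", parse_dates=[[0, 1, 2]])

# Convert the pandas DataFrame into a Spark DataFrame.
data = spark.createDataFrame(data)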


In [58]:
data.printSchema()

root
 |-- Yr_Mo_Dy: timestamp (nullable = true)
 |-- RPT: double (nullable = true)
 |-- VAL: double (nullable = true)
 |-- ROS: double (nullable = true)
 |-- KIL: double (nullable = true)
 |-- SHA: double (nullable = true)
 |-- BIR: double (nullable = true)
 |-- DUB: double (nullable = true)
 |-- CLA: double (nullable = true)
 |-- MUL: double (nullable = true)
 |-- CLO: double (nullable = true)
 |-- BEL: double (nullable = true)
 |-- MAL: double (nullable = true)



In [59]:
data.head(5)

[Row(Yr_Mo_Dy=datetime.datetime(2061, 1, 1, 0, 0), RPT=15.04, VAL=14.96, ROS=13.17, KIL=9.29, SHA=nan, BIR=9.87, DUB=13.67, CLA=10.25, MUL=10.83, CLO=12.58, BEL=18.5, MAL=15.04),
 Row(Yr_Mo_Dy=datetime.datetime(2061, 1, 2, 0, 0), RPT=14.71, VAL=nan, ROS=10.83, KIL=6.5, SHA=12.62, BIR=7.67, DUB=11.5, CLA=10.04, MUL=9.79, CLO=9.67, BEL=17.54, MAL=13.83),
 Row(Yr_Mo_Dy=datetime.datetime(2061, 1, 3, 0, 0), RPT=18.5, VAL=16.88, ROS=12.33, KIL=10.13, SHA=11.17, BIR=6.17, DUB=11.25, CLA=nan, MUL=8.5, CLO=7.67, BEL=12.75, MAL=12.71),
 Row(Yr_Mo_Dy=datetime.datetime(2061, 1, 4, 0, 0), RPT=10.58, VAL=6.63, ROS=11.75, KIL=4.58, SHA=4.54, BIR=2.88, DUB=8.63, CLA=1.79, MUL=5.83, CLO=5.88, BEL=5.46, MAL=10.88),
 Row(Yr_Mo_Dy=datetime.datetime(2061, 1, 5, 0, 0), RPT=13.33, VAL=13.25, ROS=11.42, KIL=6.17, SHA=10.71, BIR=8.21, DUB=11.92, CLA=6.54, MUL=10.92, CLO=10.34, BEL=12.92, MAL=11.83)]

### Step 4. Year 2061? Do we really have data from this year? Create a function to fix it and apply it.

In [60]:
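def right_century(x):
    # Two-digit years 61-68 were parsed as 2061-2068; shift those back by 100 years.
    year = x.year - 100 if x.year > 1989 else x.year
    # Return a date so the value matches the DateType declared for the UDF.
    return datetime.date(year, x.month, x.day)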

In [61]:
fix_century = F.udf(right_century, T.DateType())

In [62]:
data = data.withColumn('Yr_Mo_Dy', fix_century('Yr_Mo_Dy'))

In [67]:
data.describe().show()

+-------+----+----+----+----+----+------------------+----+----+----+----+------------------+----+
|summary| RPT| VAL| ROS| KIL| SHA|               BIR| DUB| CLA| MUL| CLO|               BEL| MAL|
+-------+----+----+----+----+----+------------------+----+----+----+----+------------------+----+
|  count|6574|6574|6574|6574|6574|              6574|6574|6574|6574|6574|              6574|6574|
|   mean| NaN| NaN| NaN| NaN| NaN| 7.092254335260116| NaN| NaN| NaN| NaN|13.121006997261938| NaN|
| stddev| NaN| NaN| NaN| NaN| NaN|3.9686831153772086| NaN| NaN| NaN| NaN| 5.835037000573735| NaN|
|    min|0.67|0.21| 1.5| 0.0|0.13|               0.0| 0.0| 0.0| 0.0|0.04|              0.13|0.67|
|    max| NaN| NaN| NaN| NaN| NaN|             26.16| NaN| NaN| NaN| NaN|             42.38| NaN|
+-------+----+----+----+----+----+------------------+----+----+----+----+------------------+----+



### Step 5. Set the right dates as the index. Pay attention to the data type; it should be datetime64[ns].

In [63]:
# Not applicable: a Spark DataFrame has no index.

### Step 6. Compute how many values are missing for each location over the entire record.  
#### They should be ignored in all calculations below. 

In [74]:
df_agg = data.agg(*[F.count(F.when(F.isnan(c), c)).alias(c) for c in data.columns[1:]])

In [75]:
df_agg.show()

+---+---+---+---+---+---+---+---+---+---+---+---+
|RPT|VAL|ROS|KIL|SHA|BIR|DUB|CLA|MUL|CLO|BEL|MAL|
+---+---+---+---+---+---+---+---+---+---+---+---+
|  6|  3|  2|  5|  2|  0|  3|  2|  3|  1|  0|  4|
+---+---+---+---+---+---+---+---+---+---+---+---+



### Step 7. Compute how many non-missing values there are in total.

In [76]:
df_agg = data.agg(*[F.count(F.when(~ F.isnan(c), c)).alias(c) for c in data.columns[1:]])

In [77]:
df_agg.show()

+----+----+----+----+----+----+----+----+----+----+----+----+
| RPT| VAL| ROS| KIL| SHA| BIR| DUB| CLA| MUL| CLO| BEL| MAL|
+----+----+----+----+----+----+----+----+----+----+----+----+
|6568|6571|6572|6569|6572|6574|6571|6572|6571|6573|6574|6570|
+----+----+----+----+----+----+----+----+----+----+----+----+



### Step 8. Calculate the mean windspeed over all the locations and all the times.
#### A single number for the entire dataset.

In [113]:
new_data = data.na.fill(0)

not_na = data.na.drop()

In [114]:
total = new_data.withColumn("Total", sum([new_data[c] for c in new_data.columns[1:]]))

total_not_na = not_na.withColumn("Total", sum([not_na[c] for c in not_na.columns[1:]]))

In [115]:
total_not_na.agg({'Total': 'sum'}).show()

+-----------------+
|       sum(Total)|
+-----------------+
|802995.0899999997|
+-----------------+



In [111]:
total.agg({'Total': 'sum'}).show()

+-----------------+
|       sum(Total)|
+-----------------+
|806540.2299999997|
+-----------------+



In [123]:
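# Ratio of the sum over complete rows to the sum over all rows (NaN counted as 0).
# Note that this is a ratio of two sums, not the mean windspeed the step asks for.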
total_not_na.agg({'Total': 'sum'}).first()[0] / total.agg({'Total': 'sum'}).first()[0]

0.9956045093993637

### Step 9. Create a DataFrame called loc_stats and calculate the min, max and mean windspeeds and standard deviations of the windspeeds at each location over all the days 

#### A different set of numbers for each location.

In [124]:
new_data.describe().show()

+-------+------------------+------------------+-----------------+------------------+------------------+------------------+-----------------+------------------+-----------------+-----------------+------------------+------------------+
|summary|               RPT|               VAL|              ROS|               KIL|               SHA|               BIR|              DUB|               CLA|              MUL|              CLO|               BEL|               MAL|
+-------+------------------+------------------+-----------------+------------------+------------------+------------------+-----------------+------------------+-----------------+-----------------+------------------+------------------+
|  count|              6574|              6574|             6574|              6574|              6574|              6574|             6574|              6574|             6574|             6574|              6574|              6574|
|   mean|12.351703681168235|10.639456951627627|11.65697900821418

### Step 10. Create a DataFrame called day_stats and calculate the min, max and mean windspeed and standard deviations of the windspeeds across all the locations at each day.

#### A different set of numbers for each day.

### Step 11. Find the average windspeed in January for each location.  
#### Treat January 1961 and January 1962 both as January.

### Step 12. Downsample the record to a yearly frequency for each location.

### Step 13. Downsample the record to a monthly frequency for each location.

### Step 14. Downsample the record to a weekly frequency for each location.

### Step 15. Calculate the min, max and mean windspeeds and standard deviations of the windspeeds across all locations for each week (assume that the first week starts on January 2 1961) for the first 52 weeks.