In [11]:
weatherDS = spark.read.csv("../input/weather.csv", header=True)

In [12]:
weatherDS.createOrReplaceTempView("weatherDS")
weatherDS = sqlContext.sql("SELECT * FROM weatherDS WHERE Station = 1")
weatherDS.createOrReplaceTempView("weatherDS")

In [13]:
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import IntegerType

def hhmmTo24Sec(hhmmStr):
    h = int(hhmmStr[0:2])
    m = int(hhmmStr[2:4])
    return h*3600 + m*60






#weatherDS = weatherDS.select([hhmmTo24SecUDF(c).alias(c) if c in targetColumns else c for c in weatherDS.columns])
#weatherDS = weatherDS.selectExpr(["*", "YEAR(CAST(weatherDS.Date AS DATE)) year", 
#                                        "MONTH(CAST(weatherDS.Date AS DATE)) month"]
#                                                   ).alias("weatherDS")




In [14]:
def extractDateFeatures(weatherDF):
    
    yearColDef = "YEAR(CAST(Date AS DATE)) year"
    monthColDef = "MONTH(CAST(Date AS DATE)) month"
    weatherDF = weatherDF.selectExpr(["*", yearColDef , monthColDef ])
    return weatherDF


def hhmmTo24SecDf(weatherDF, inputColumns=["Sunrise", "Sunset"]):

    targetColumns = set(inputColumns)
    hhmmTo24SecUDF = UserDefinedFunction(lambda x: hhmmTo24Sec(x), IntegerType())
    weatherDF = weatherDF.select([hhmmTo24SecUDF(c).alias(c) if c in targetColumns else c for c in weatherDS.columns])
    return weatherDF


weatherDS = extractDateFeatures(weatherDS)
weatherDS = hhmmTo24SecDf(weatherDS)
weatherDS = weatherDS.alias("weatherDS")

In [15]:
sqlContext.sql("SELECT * FROM weatherDS").show()

+-------+----------+----+----+----+------+--------+-------+----+----+-------+------+-------+-----+------+--------+-----------+-----------+--------+-----------+---------+--------+
|Station|      Date|Tmax|Tmin|Tavg|Depart|DewPoint|WetBulb|Heat|Cool|Sunrise|Sunset|CodeSum|Depth|Water1|SnowFall|PrecipTotal|StnPressure|SeaLevel|ResultSpeed|ResultDir|AvgSpeed|
+-------+----------+----+----+----+------+--------+-------+----+----+-------+------+-------+-----+------+--------+-----------+-----------+--------+-----------+---------+--------+
|      1|2007-05-01|  83|  50|  67|    14|      51|     56|   0|   2|   0448|  1849|       |    0|     M|     0.0|       0.00|      29.10|   29.82|        1.7|       27|     9.2|
|      1|2007-05-02|  59|  42|  51|    -3|      42|     47|  14|   0|   0447|  1850|     BR|    0|     M|     0.0|       0.00|      29.38|   30.09|         13|        4|    13.4|
|      1|2007-05-03|  66|  46|  56|     2|      40|     48|   9|   0|   0446|  1851|       |    0|     M|

I know what columns should be numeric but I don't know what non numeric values are there. I am assuming that 
there should be only 
1. M for missing
2. T for traces 

However we better check

In [16]:
continuosColums = set(["Tmax", "Tmin", "Tavg", "Depart", "DewPoint", "WetBulb", "Heat", "Cool", 
                       "Sunrise", "Sunset", "Depth", "SnowFall", "PrecipTotal", 
                      "StnPressure", "SeaLevel", "ResultSpeed", "ResultDir", "AvgSpeed"])

def nonNumericValues(df, colName):
         
        resultSet = df.select(colName).filter("TRIM({}) not rlike '^[0-9\-]' ".format(colName)).distinct().collect()
        return [x[colName] for x in resultSet]


columnProperties = {}   
for x in continuosColums:
     l = nonNumericValues(weatherDS, x)
     columnProperties[x] = {"nonNumericValues": l}   



In [17]:
import pandas as pd

pd.DataFrame(columnProperties)

Unnamed: 0,AvgSpeed,Cool,Depart,Depth,DewPoint,Heat,PrecipTotal,ResultDir,ResultSpeed,SeaLevel,SnowFall,StnPressure,Sunrise,Sunset,Tavg,Tmax,Tmin,WetBulb
nonNumericValues,[],[],[],[],[],[],[ T],[],[],[M],[ T],[M],[],[],[],[],[],[M]


The replacement stragegy is pretty simple:

* We replace T with the minumn value ever recored 
* We replace M with the average for that month ( we could replace with the average for that week of the month or any other time interval )

In [18]:
from pyspark.sql.functions import col 

defaultCols = weatherDS.columns 


for columnName, properties in columnProperties.items() : 
    nonNumericValues = properties['nonNumericValues']
    if(len(nonNumericValues)) != 0:
         # Filter  needed
         minVal = weatherDS.selectExpr("MIN(CAST( {} AS DOUBLE)) min_val"\
                                       .format(columnName)).collect()[0]['min_val']
            
         weatherDS.createOrReplaceTempView("weatherDS")   
         avgValDF = sqlContext.sql("""
         SELECT 
         year,
         month, 
         AVG({}) avg_val
         FROM weatherDS 
         GROUP BY 1,2
         """.format(columnName))
         avgValDF = avgValDF.alias("avgValDF")
         weatherDS = weatherDS.join(avgValDF, (weatherDS.year == avgValDF.year) &
                                   (weatherDS.month == avgValDF.month),
                        "left").\
         selectExpr(["weatherDS.*" , """CAST(CASE WHEN TRIM({0}) == 'T' THEN {1} 
                                WHEN TRIM({0}) == 'M' THEN avg_val
                                ELSE TRIM({0})
                                END AS DOUBLE
                           ) new_{0}     
                 """.format(columnName, minVal)
                   ])
         
    else:
        weatherDS = weatherDS.selectExpr(["weatherDS.*", "CAST({0} AS DOUBLE) new_{0}".format(columnName)])
     
    
    weatherDS = weatherDS.drop(columnName).\
                 select([col("new_{}".format(columnName)).alias(columnName) if x == columnName else x for x in defaultCols]) 
    weatherDS = weatherDS.alias("weatherDS")
    
            



In [19]:
weatherDS = weatherDS.selectExpr(["*", "CAST(Date as DATE) AS measureDate"])
weatherDS = weatherDS.alias("weatherDS")


In [20]:
weatherDS.show()

+-------+----------+----+----+----+------+--------+-------+----+----+-------+-------+-------------+-----+------+--------+-----------+-----------+--------+-----------+---------+--------+----+-----+-----------+
|Station|      Date|Tmax|Tmin|Tavg|Depart|DewPoint|WetBulb|Heat|Cool|Sunrise| Sunset|      CodeSum|Depth|Water1|SnowFall|PrecipTotal|StnPressure|SeaLevel|ResultSpeed|ResultDir|AvgSpeed|year|month|measureDate|
+-------+----------+----+----+----+------+--------+-------+----+----+-------+-------+-------------+-----+------+--------+-----------+-----------+--------+-----------+---------+--------+----+-----+-----------+
|      1|2007-06-01|85.0|62.0|74.0|  10.0|    63.0|   66.0| 0.0| 9.0|15540.0|69600.0|   TSRA BR HZ|  0.0|     M|     0.0|       0.19|      29.19|   29.91|        4.2|     17.0|     6.4|2007|    6| 2007-06-01|
|      1|2007-06-02|81.0|65.0|73.0|   9.0|    63.0|   67.0| 0.0| 8.0|15540.0|69600.0|        RA BR|  0.0|     M|     0.0|       0.04|      29.03|   29.77|        6.

In [None]:
from pyspark.sql import Window
from pyspark.sql.functions import lag, avg

lagWindow = Window.partitionBy("Station").orderBy("measureDate")

for columnName in continuosColums:
    for lagLevel in [1, 3, 8, 12]:
        
        weatherDS.createOrReplaceTempView("weatherDS")
        avgValDF = sqlContext.sql("""
                 SELECT 
                 year,
                 month, 
                 AVG({}) avg_val
                 FROM weatherDS 
                 GROUP BY 1,2
                 """.format(columnName))
        avgValDF = avgValDF.alias("avgValDF")

        weatherDS = weatherDS.select(["*", lag(columnName, 1).over(lagWindow).alias("{0}_lag_{1}".format(columnName, lagLevel))]).\
        join(avgValDF, (weatherDS.year == avgValDF.year) &
                       (weatherDS.month == avgValDF.month),
             "left").\
        selectExpr(["weatherDS.*", 
                    "CASE WHEN {0}_lag_1 IS NULL THEN avgValDF.avg_val ELSE {0}_lag_{1} END as {0}_lag_{1}".format(columnName, lagLevel)
                   ])
        weatherDS = weatherDS.alias("weatherDS")


In [None]:
weatherDS.show()
