In [1]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql.functions import col, unix_timestamp, to_date
from pyspark.sql import functions as fun
from pyspark.sql.window import Window

from pyspark.sql import SparkSession
from functools import lru_cache

#import sys
#sys.path.append("spark_ts-0.1.0-py2.7.egg")
import spark_ts as spts

@lru_cache(maxsize=None)
def get_spark():
    """Return the SparkSession used throughout this notebook.

    The session runs with a "local" master under the application name "gill".
    ``lru_cache`` memoizes the result, so every call after the first returns
    the same object without going through the builder again.
    """
    builder = SparkSession.builder.master("local")
    builder = builder.appName("gill")
    return builder.getOrCreate()

spark = get_spark()

# Column layout of the INMET station export, read as a ";"-separated file with
# a header row. Every field is nullable.
# NOTE(review): TempCompMedia is declared as a string while every other
# measurement is a double — confirm whether that column really holds text.
schema = (StructType()
          .add("Estacao", StringType())
          .add("Data", StringType())
          .add("Hora", StringType())
          .add("Precipitacao", DoubleType())
          .add("TempMaxima", DoubleType())
          .add("TempMinima", DoubleType())
          .add("Insolacao", DoubleType())
          .add("Piche", DoubleType())
          .add("TempCompMedia", StringType())
          .add("UmidadeRelativaMedia", DoubleType())
          .add("VelocidadedoVentoMedia", DoubleType()))

path = "data_basetxt/total.csv"
df = (spark.read
      .schema(schema)
      .options(header=True, sep=";")
      .csv(path))



As we can see above, the dataframe has many null values and no well-formed timestamp field: the date (Data) and the hour (Hora) are stored in separate columns. We will use regular expressions in PySpark (`regexp_replace` inside `withColumn`) to join Data and Hora into a single, properly formatted timestamp column.

In [2]:
def station_adjusting(df):
    """Merge the date ("Data") and hour ("Hora") columns of INMET data into one timestamp.

    The date is expected as ``dd/MM/yyyy`` and the hour as four digits ``HHmm``
    (e.g. ``"1200"``). Both are rewritten into the text ``dd-MM-yyyy HH:mm:00``.
    That text is parsed into a timestamp that replaces "Data", and "Hora" is
    dropped.

    Args:
        df: DataFrame following the INMET station ``schema``. It must contain
            the string columns "Data" and "Hora".

    Returns:
        A new DataFrame whose "Data" column is a timestamp and which no longer
        has a "Hora" column. Rows whose text does not match the pattern
        presumably end up with a null "Data" (Spark's non-ANSI behaviour —
        confirm for your Spark configuration).
    """
    # "dd/MM/yyyy" -> "dd-MM-yyyy"
    date_text = fun.regexp_replace(col("Data"), "/", "-")
    # "HHmm" -> "HH:mm:00"
    hour_text = fun.regexp_replace(col("Hora"), "(\\d{2})(\\d{2})", "$1:$2:00")
    datetime_text = fun.concat(date_text, fun.lit(" "), hour_text)

    # The pattern must include the ":ss" appended above. With the old pattern
    # "dd-MM-yyyy HH:mm", Spark >= 3.0 cannot consume the trailing ":00".
    # It raises SparkUpgradeException under the default timeParserPolicy and
    # returns null under CORRECTED. to_timestamp already produces a timestamp
    # column, so no extra cast is needed.
    return (df
            .withColumn("Data", fun.to_timestamp(datetime_text, "dd-MM-yyyy HH:mm:ss"))
            .drop("Hora"))


adjusted = station_adjusting(df)

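For the ETCCDI indices we only need the Precipitacao, TempMaxima and TempMinima columns. In this example we keep just TempMinima (plus Estacao and Data), restricted to the 12:00 readings of station 82294, in a new dataframe.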

In [3]:
# Keep station, timestamp and minimum temperature for the 12:00 readings of
# station 82294. Estacao is a string column and is implicitly cast by Spark for
# the numeric comparison.
df = (adjusted
      .select("Estacao", "Data", "TempMinima")
      .where((fun.hour("Data") == 12) & (col("Estacao") == 82294)))

In [4]:
# Summarise missing data per column. The *_miss lines in the output are
# presumably the fraction of null values (e.g. 0.05 for TempMinima) — confirm in spts.
df.transform(spts.summary)

TempMinima_miss 0.05
Estacao_miss 0.0
Data_miss 0.0


In [5]:
# Check the "Data" series for gaps. The output table pairs each timestamp with
# the preceding one (lagged_col) and shows their distance in days (diff_col),
# largest gap first. For example, there are no 12:00 readings for this station
# between 1963-06-30 and 1976-03-04.
df.transform(spts.gaps("Data"))

+-------------------+-------------------+--------+
|               Data|         lagged_col|diff_col|
+-------------------+-------------------+--------+
|1976-03-04 12:00:00|1963-06-30 12:00:00|    4631|
|1982-01-01 12:00:00|1980-12-31 12:00:00|     366|
|2001-09-01 12:00:00|2001-03-31 12:00:00|     154|
|2001-03-01 12:00:00|2000-09-30 12:00:00|     152|
|2010-12-29 12:00:00|2010-10-01 12:00:00|      89|
|1962-10-01 12:00:00|1962-07-30 12:00:00|      63|
|1990-01-01 12:00:00|1989-10-31 12:00:00|      62|
|2003-08-01 12:00:00|2003-06-30 12:00:00|      32|
|1986-08-01 12:00:00|1986-06-30 12:00:00|      32|
|1989-04-01 12:00:00|1989-02-28 12:00:00|      32|
|2005-09-01 12:00:00|2005-07-31 12:00:00|      32|
|1987-02-01 12:00:00|1986-12-31 12:00:00|      32|
|1989-06-01 12:00:00|1989-04-30 12:00:00|      32|
|1992-01-01 12:00:00|1991-11-30 12:00:00|      32|
|2006-08-01 12:00:00|2006-06-30 12:00:00|      32|
|2007-08-01 12:00:00|2007-06-30 12:00:00|      32|
|2009-08-01 12:00:00|2009-06-30

DataFrame[Estacao: string, Data: timestamp, TempMinima: double]

In [6]:
# Compute moving outlier limits for TempMinima ("outliars" is the spelling used
# by spts). In the output, maxLimit/minLimit equal movingAvg ± 3 * movingStd
# (e.g. 20.8 ± 3 * 0.1414 -> 21.224 / 20.376). The first rows show an expanding
# mean, so the window size is not visible here — TODO confirm in spts.
df.transform(spts.outliars("TempMinima")).show()

+-------+-------------------+----------+------------------+------------------+------------------+------------------+
|Estacao|               Data|TempMinima|         movingAvg|         movingStd|          maxLimit|          minLimit|
+-------+-------------------+----------+------------------+------------------+------------------+------------------+
|  82294|1961-01-08 12:00:00|      20.7|              20.7|               NaN|               NaN|               NaN|
|  82294|1961-01-09 12:00:00|      20.9|20.799999999999997| 0.141421356237309|21.224264068711925| 20.37573593128807|
|  82294|1961-01-10 12:00:00|      22.3|21.299999999999997|0.8717797887081367|23.915339366124407|18.684660633875588|
|  82294|1961-01-11 12:00:00|      21.7|              21.4|0.7393691004272963| 23.61810730128189| 19.18189269871811|
|  82294|1961-01-12 12:00:00|      23.1|             21.74|0.9939818911831362|24.721945673549406| 18.75805432645059|
|  82294|1961-01-13 12:00:00|      22.9|21.933333333333334| 1.00

In [9]:
# Presumably interpolates missing TempMinima values via spts.interp.
# NOTE(review): the meaning of the second argument (1) is not visible here —
# confirm against spts. The 20 rows shown below have no missing values, so
# they do not exercise the interpolation.
df.transform(spts.interp("TempMinima", 1)).show()

+-------+-------------------+----------+
|Estacao|               Data|TempMinima|
+-------+-------------------+----------+
|  82294|1961-01-08 12:00:00|      20.7|
|  82294|1961-01-09 12:00:00|      20.9|
|  82294|1961-01-10 12:00:00|      22.3|
|  82294|1961-01-11 12:00:00|      21.7|
|  82294|1961-01-12 12:00:00|      23.1|
|  82294|1961-01-13 12:00:00|      22.9|
|  82294|1961-01-14 12:00:00|      22.9|
|  82294|1961-01-15 12:00:00|      15.1|
|  82294|1961-01-16 12:00:00|      22.7|
|  82294|1961-01-17 12:00:00|      20.1|
|  82294|1961-01-18 12:00:00|      19.7|
|  82294|1961-01-19 12:00:00|      20.3|
|  82294|1961-01-20 12:00:00|      21.1|
|  82294|1961-01-21 12:00:00|      20.5|
|  82294|1961-01-22 12:00:00|      20.7|
|  82294|1961-01-23 12:00:00|      22.5|
|  82294|1961-01-24 12:00:00|      22.5|
|  82294|1961-01-25 12:00:00|      19.1|
|  82294|1961-01-26 12:00:00|      19.9|
|  82294|1961-01-27 12:00:00|      21.1|
+-------+-------------------+----------+
only showing top

In [10]:
# Report the earliest (min_ts) and latest (max_ts) timestamp in "Data".
df.transform(spts.dateLimits("Data"))

+-------------------+-------------------+
|             min_ts|             max_ts|
+-------------------+-------------------+
|1961-01-08 12:00:00|2011-03-17 12:00:00|
+-------------------+-------------------+

