# Przygotowanie danych
## Pobranie danych z Kafki i przygotowanie DataFrame'u

In [1]:
# Kafka connection settings for the weather stream.
KAFKA_BOOTSTRAP_SERVERS = 'master:9092,slave01:9092,slave02:9092,slave03:9092,slave04:9092,slave05:9092'
KAFKA_TOPIC = 'final_open_weather_mw_v2'

# Streaming source over the topic, starting from the earliest available offsets.
# In the Spark Kafka source the `key`/`value` columns are binary (hence the name);
# they are turned into strings in the next cell.
binary_data = (
    spark.readStream
    .format('kafka')
    .option('kafka.bootstrap.servers', KAFKA_BOOTSTRAP_SERVERS)
    .option('subscribe', KAFKA_TOPIC)
    .option('startingOffsets', 'earliest')
    .load()
)

In [2]:
# Decode the Kafka record. The payload presumably has the form "<current_temp>|<feels_like>"
# (judging from the two split indices below); '[|]' is a regex character class, so the
# pipe is matched literally. `value` is cast to STRING explicitly instead of relying on
# split() implicitly coercing the binary column. Both temperature fields are still strings here.
dataframe = binary_data.selectExpr(
    'CAST(key AS STRING)',
    "split(CAST(value AS STRING), '[|]')[0] as current_temp",
    "split(CAST(value AS STRING), '[|]')[1] as feels_like",
    'partition',
    'offset',
    'timestamp',
)

## Zamiana typu kolumn 'current_temp' i 'feels_like' ze string na double

In [3]:
from pyspark.sql.types import DoubleType

# Both temperature columns come out of split() as strings; convert each one to double.
for temp_col in ('current_temp', 'feels_like'):
    dataframe = dataframe.withColumn(temp_col, dataframe[temp_col].cast(DoubleType()))

## Potrzebuję teraz jeszcze dodatkowej kolumny z dniem odczytu (yyyy-mm-dd) do późniejszych operacji

In [4]:
# Day of the Kafka record as a 'yyyy-mm-dd' string: render the timestamp as text and
# keep its first 10 characters. Used later for the datediff filter and per-day grouping.
day_of_record = dataframe.timestamp.cast('string').substr(1, 10)
dataframe = dataframe.withColumn('yyyy_mm_dd', day_of_record)

### Filtracja danych: biorę tylko te dane, które zostały odebrane w ciągu ostatnich 5 dni. Zostało to rozwiązane na dwa sposoby. Pierwszy (zakomentowany) polega na obliczeniu różnicy między timestampem rekordu a aktualnym czasem i porównaniu jej z długością 5 dni wyrażoną w sekundach:
### 5 dni = 5 · 24 · 3600 s = 432000 s

### Drugi sposób to wykorzystanie funkcji `datediff` (liczba dni między dniem bieżącym a dniem rekordu), a następnie, na podstawie tej wartości, wybranie ostatnich 5 dni (od 1 do 5 dni wstecz, bez dnia bieżącego)

In [5]:
from pyspark.sql import functions as F

# Method 1 (disabled): compare the record's epoch time with the current time and keep
# records at most 432000 s (= 5 * 24 * 3600 s) old. unix_timestamp() yields seconds,
# despite the 'timestamp_in_ms' column name.
#dataframe = dataframe.withColumn('timestamp_in_ms', F.unix_timestamp(dataframe.timestamp)).withColumn('timestamp_current', F.unix_timestamp(F.current_timestamp()))
#dataframe = dataframe.filter((dataframe.timestamp_current - dataframe.timestamp_in_ms) <= 432000)

# Method 2 (active): day difference between now and the record's yyyy_mm_dd,
# keeping only records 1 to 5 days back (between() is inclusive on both ends).
# NOTE(review): date_diff == 0 (records from the current day) is excluded here, whereas
# method 1 would keep them, and the saved outputs of this notebook show rows with
# date_diff 0 — confirm which window is intended.
dataframe = (
    dataframe
    .withColumn('date_diff', F.datediff(F.current_timestamp(), dataframe.yyyy_mm_dd))
    .filter(F.col('date_diff').between(1, 5))
)


## Wyznaczenie dla każdego rekordu różnicy między temperaturą aktualną a temperaturą odczuwalną
## Dodatkowo wyznaczenie jej wartości bezwzględnej (po zaokrągleniu do 2 miejsc po przecinku)

In [6]:
# Per-record |current_temp - feels_like|, with the difference rounded to 2 decimals.
# Uses F.abs rather than `from pyspark.sql.functions import abs`: that import replaced
# Python's built-in abs() for every later cell of the notebook.
dataframe = dataframe.withColumn(
    'temp_diff',
    F.abs(F.round(dataframe.current_temp - dataframe.feels_like, 2)),
)

In [7]:
# Start the streaming query: rows are appended to an in-memory table named
# 'weather_data_v2', which the SQL cells below query. `ds` is the handle of the
# running streaming query.
ds = (
    dataframe.writeStream
    .format('memory')
    .queryName('weather_data_v2')
    .outputMode('append')
    .start()
)

In [11]:
# Top 3 keys (cities) per day by average temp_diff.
#   q: average temp_diff per (key, day), rounded to 2 decimals
#   w: rank keys within each day, largest average first. RANK() gives tied rows the
#      same rank, so a day can return more than 3 rows.
# A triple-quoted string replaces the backslash line-continuations inside the literal,
# which turn into an unterminated-string SyntaxError as soon as any whitespace
# follows a trailing '\'.
spark.sql('''
    SELECT w.key, w.yyyy_mm_dd, w.temp_diff_avg, w.rank
    FROM (
        SELECT q.key, q.yyyy_mm_dd, q.temp_diff_avg,
               RANK() OVER (PARTITION BY q.yyyy_mm_dd ORDER BY q.temp_diff_avg DESC) AS rank
        FROM (
            SELECT key, ROUND(AVG(temp_diff), 2) AS temp_diff_avg, yyyy_mm_dd
            FROM weather_data_v2
            GROUP BY key, yyyy_mm_dd
        ) AS q
    ) AS w
    WHERE w.rank <= 3
    ORDER BY yyyy_mm_dd, rank
''').show()
          

+------+----------+-------------+----+
|   key|yyyy_mm_dd|temp_diff_avg|rank|
+------+----------+-------------+----+
|Warsaw|2020-05-25|          4.8|   1|
|Krakow|2020-05-25|         3.81|   2|
+------+----------+-------------+----+



In [9]:
# Inspect the raw contents of the in-memory table (up to 800 rows). A single-line
# literal replaces the fragile backslash continuation inside the string.
spark.sql('SELECT * FROM weather_data_v2').show(800)


+------+------------+----------+---------+------+-------------------+----------+---------+---------+
|   key|current_temp|feels_like|partition|offset|          timestamp|yyyy_mm_dd|date_diff|temp_diff|
+------+------------+----------+---------+------+-------------------+----------+---------+---------+
|Warsaw|      288.41|    283.56|        0|     0|2020-05-25 13:24:16|2020-05-25|        0|     4.85|
|Warsaw|      288.91|    284.16|        0|     1|2020-05-25 13:34:27|2020-05-25|        0|     4.75|
|Krakow|      285.74|    281.93|        0|     2|2020-05-25 13:41:04|2020-05-25|        0|     3.81|
+------+------------+----------+---------+------+-------------------+----------+---------+---------+



In [12]:
# Re-query the in-memory table (same query as the earlier inspection cell), presumably
# to see whether new rows arrived in the meantime. TODO confirm; otherwise drop this
# duplicate cell. A single-line literal replaces the fragile backslash continuation.
spark.sql('SELECT * FROM weather_data_v2').show(800)


+------+------------+----------+---------+------+-------------------+----------+---------+---------+
|   key|current_temp|feels_like|partition|offset|          timestamp|yyyy_mm_dd|date_diff|temp_diff|
+------+------------+----------+---------+------+-------------------+----------+---------+---------+
|Warsaw|      288.41|    283.56|        0|     0|2020-05-25 13:24:16|2020-05-25|        0|     4.85|
|Warsaw|      288.91|    284.16|        0|     1|2020-05-25 13:34:27|2020-05-25|        0|     4.75|
|Krakow|      285.74|    281.93|        0|     2|2020-05-25 13:41:04|2020-05-25|        0|     3.81|
+------+------------+----------+---------+------+-------------------+----------+---------+---------+

