<a href="https://colab.research.google.com/github/neon-belfante/covid-19-time-series-analysis/blob/main/Utils.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Imports


In [None]:
pip install colour

Collecting colour
  Downloading colour-0.1.5-py2.py3-none-any.whl (23 kB)
Installing collected packages: colour
Successfully installed colour-0.1.5


In [None]:
pip install pyspark

In [None]:
from pyspark.sql import SparkSession
# Module-level Spark session for the notebook: getOrCreate() hands back the
# session that is already active, or builds a new one named 'teste'.
spark = SparkSession.builder.appName('teste').getOrCreate()

In [None]:
import pandas as pd
import numpy as np
import math
from colour import Color
from plotly.subplots import make_subplots
import plotly.express as px

# Raise pandas' 'display.max_rows' option to 1000 for printed frames.
pd.set_option('display.max_rows', 1000)

In [None]:
import pyspark.pandas as pypd
import pyspark
import pyspark.sql.functions as sql
from pyspark.sql.window import Window
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [None]:
#Timeseries
import seaborn as sns
from matplotlib import pyplot as plt
from pandas.plotting import autocorrelation_plot
from statsmodels.tsa.seasonal import seasonal_decompose

# Functions

In [None]:
# Color Palettes utils
def colorToHex(color):
  """Return the value of the ``hex`` attribute of ``color``.

  Kept as a named function so it can be handed to ``map`` when converting a
  whole list of colour objects.
  """
  hex_code = color.hex
  return hex_code

def colorRampPalette(n: int, color_list: list):
  """Interpolate ``color_list`` into a palette of ``n`` hex colour strings.

  Consecutive anchor colours are joined with ``Color.range_to`` and the
  segments are concatenated; the result is truncated to ``n`` entries.

  Args:
    n: Number of colours wanted. Values <= 0 give an empty list.
    color_list: Anchor colours, each accepted by ``Color(...)``. A single
      anchor is allowed and is simply repeated ``n`` times.

  Returns:
    A list of ``n`` hex strings.

  Raises:
    IndexError: If ``n`` is positive and ``color_list`` is empty.
  """
  # Nothing to build; this also avoids asking range_to for a non-positive
  # number of steps (e.g. n == 0 with two anchors).
  if n <= 0:
    return []

  first_color = Color(color_list[0])
  n_colors = len(color_list)
  # A single anchor has no segment to interpolate over, and the step formula
  # below would divide by zero.
  if n_colors == 1:
    return [colorToHex(first_color)] * n

  # Steps per segment, chosen so that the concatenated segments hold at
  # least n colours before truncation.
  n_i = math.ceil((n - n_colors) / (n_colors - 1)) + 2

  color_ramp_list = [first_color]
  for start_color, end_color in zip(color_list, color_list[1:]):
    segment = list(Color(start_color).range_to(Color(end_color), n_i))
    # The first colour of each segment is the anchor already in the ramp.
    color_ramp_list.extend(segment[1:])
  return [colorToHex(color) for color in color_ramp_list[:n]]

def colorRampPaletteFromDfColumn(df: pd.DataFrame, column: str, color_list: list):
  """Build a colour ramp with one colour per distinct value of ``df[column]``.

  Args:
    df: Frame holding the column of interest.
    column: Name of the column whose distinct values are counted.
    color_list: Anchor colours forwarded to ``colorRampPalette``.

  Returns:
    Whatever ``colorRampPalette`` returns for that number of colours.
  """
  distinct_values = df[column].unique()
  palette_size = len(distinct_values)
  return colorRampPalette(palette_size, color_list)

In [None]:
# Graph utilities

#Correlation Matrix
def simpleCorrPlot(df: pd.DataFrame):
  """Return a plotly heatmap of the correlation matrix of ``df``.

  Cells are labelled as percentages with one decimal and coloured on a fixed
  -1..1 scale running red -> white -> blue.
  """
  red_white_blue = [
      [0, 'rgb(255,102,102)'],
      [0.5, 'rgb(255,255,255)'],
      [1, 'rgb(102,102,255)'],
  ]
  heatmap = px.imshow(df.corr(), text_auto=True, template='none', zmin=-1, zmax=1)
  heatmap.update_traces(texttemplate='%{z:0.1%}')
  heatmap.update_coloraxes(colorscale=red_white_blue)
  return heatmap

# Autocorrelation plot
def plotly_autocorr(series, title = ''):
  """Return a plotly version of pandas' autocorrelation plot for ``series``.

  pandas' ``autocorrelation_plot`` is rendered on a throw-away matplotlib
  figure, the curve and the two confidence-band heights are read back from
  it, and the same picture is rebuilt with plotly.

  Args:
    series: Data passed straight to ``autocorrelation_plot``.
    title: Optional figure title.

  Returns:
    The plotly figure.
  """
  # Draw on a dedicated axes: the line indices below are only valid on an
  # axes that holds nothing else, and closing our own figure leaves any
  # figure the caller has open untouched.
  mpl_fig, mpl_ax = plt.subplots()
  try:
    ax = autocorrelation_plot(series, ax=mpl_ax)
    # NOTE(review): relies on the artist order used by pandas - reference
    # lines first (lines[0] and lines[1] are the two upper bands), the
    # autocorrelation curve at lines[5]. Confirm when upgrading pandas.
    corr_x, corr_y = ax.lines[5].get_data()
    interval_1 = ax.lines[0].get_data()[1][0]
    interval_2 = ax.lines[1].get_data()[1][0]
  finally:
    plt.close(mpl_fig)

  fig = px.line(x = corr_x, y= corr_y, template='seaborn', width=880)
  fig.update_traces(line_color='#9BCD9B')
  fig.update_yaxes(range=[-1,1], title='Autocorrelation')
  fig.update_xaxes(title='Lag')
  fig.update_layout({'title':{'text': title, 'font':{'size': 18}, 'x':0.09, 'y': 0.93}})
  # Dashed pair for the first band, solid pair for the second, plus a zero line.
  fig.add_hline(y = interval_1, line_dash='dash', line_width =1.5, opacity=1, line_color='grey')
  fig.add_hline(y = -interval_1, line_dash='dash', line_width =1.5, opacity=1, line_color='grey')
  fig.add_hline(y = 0, line_width =1.0, opacity=1, line_color='grey')
  fig.add_hline(y = interval_2, line_width =1.5, opacity=1, line_color='grey')
  fig.add_hline(y = -interval_2, line_width =1.5, opacity=1, line_color='grey')
  return fig

#Seasonal Decompose
def plot_seasonal_decompose(dataset, x, y, freq, titulo=''):
  """Plot the seasonal decomposition of ``dataset[y]`` as four stacked panels.

  Args:
    dataset: Frame holding the series and its time axis.
    x: Column used as the horizontal axis of every panel.
    y: Column passed to ``seasonal_decompose``.
    freq: Value passed as ``period`` to ``seasonal_decompose``.
    titulo: Optional figure title.

  Returns:
    A plotly figure with the observed, trend, seasonal and residual
    components in rows 1 to 4 (residuals rounded to 2 decimals).
  """
  decomposition = seasonal_decompose(dataset[y], period = freq)
  components = pd.DataFrame({
      'timeline' : dataset[x],
      'observacao' : decomposition.observed,
      'tendencia' : decomposition.trend,
      'sazonalidade' : decomposition.seasonal,
      'ruido' : decomposition.resid.round(2),
  })

  # One subplot row per component, in display order.
  panel_columns = ['observacao', 'tendencia', 'sazonalidade', 'ruido']
  fig = make_subplots(rows = 4, cols = 1)
  for row_number, component in enumerate(panel_columns, start = 1):
    panel = px.line(components, x = 'timeline', y = component, template = 'seaborn')
    fig.add_trace(panel['data'][0], row = row_number, col = 1)
  fig.update_traces(line_color = '#9BCD9B')

  def axis_title(text):
    # Fresh dict per axis so no layout entry shares a mutable object.
    return {'title': {'text': text, 'font': {'size': 14}}}

  fig.update_layout(
      title = {'text': titulo, 'font': {'size': 18}, 'x': 0.09, 'y': 0.91},
      yaxis = axis_title('Observação'),
      yaxis2 = axis_title('Tendência'),
      yaxis3 = axis_title('Sazonalidade'),
      yaxis4 = axis_title('Ruído'),
      xaxis4 = axis_title(x.capitalize()),
  )
  fig.update_layout(height = 900, width = 800)
  return fig

# PySpark Functions

In [None]:
def withBands(bands_thresholds: list, column_name: str):
  """Build a DataFrame transformation that buckets ``column_name`` into bands.

  The returned function adds a ``<column_name>_bands`` string column:

  * values <= the first threshold -> ``"a_below_<t0>"``
  * values in ``(t[i-1], t[i]]``  -> ``"<prefix>_<t[i-1]+1>_<t[i]>"``
  * values > the last threshold   -> ``"<prefix>_above_<t_last>"``

  Thresholds are rendered with ``int(...)``. Prefixes are alphabetical
  ('a', 'b', ...) in band order; two-letter prefixes ('aa', 'ab', ...) are
  used when there are more bands than letters. Rows matching no condition
  (e.g. null values) are left null.

  Args:
    bands_thresholds: Band boundaries, expected in ascending order.
    column_name: Name of the numeric column to bucket.

  Returns:
    A function ``DataFrame -> DataFrame``.

  Raises:
    IndexError: When the returned function is called with empty
      ``bands_thresholds``, or with more bands than the 676 two-letter
      prefixes available.
  """
  def resultfunc(df: DataFrame):
    alpha = "abcdefghijklmnopqrstuvwxyz"
    # N thresholds delimit N + 1 bands and each band consumes one prefix, so
    # the switch to two-letter prefixes depends on the band count.
    n_bands = len(bands_thresholds) + 1
    if n_bands > len(alpha):
      prefixes = [first + second for first in alpha for second in alpha]
    else:
      prefixes = list(alpha)

    source = col(column_name)

    # Open-ended lowest band; it has no otherwise(), so unmatched rows
    # start out null.
    lowest = bands_thresholds[0]
    bands = when(source <= lowest,
                 prefixes[0] + "_below_" + str(int(lowest)))

    # Closed bands between consecutive thresholds. Each step wraps the
    # expression built so far, so a later band overrides an earlier one.
    for n, (lower, upper) in enumerate(zip(bands_thresholds, bands_thresholds[1:]), start=1):
      label = prefixes[n] + "_" + str(int(lower) + 1) + "_" + str(int(upper))
      bands = when((source > lower) & (source <= upper), label).otherwise(bands)

    # Open-ended highest band.
    highest = bands_thresholds[-1]
    bands = when(source > highest,
                 prefixes[n_bands - 1] + "_above_" + str(int(highest))).otherwise(bands)

    # Apply the expression once instead of re-projecting the frame per band.
    return df.withColumn(column_name + "_bands", bands)
  return resultfunc

def withDummies(column_name: str):
  """Build a DataFrame transformation that one-hot encodes ``column_name``.

  The returned function collects the distinct values of the column (sorted)
  and adds one ``<column_name>_<value>`` column per value, holding 1 where
  the row equals that value and 0 otherwise.

  Args:
    column_name: Name of the column to encode.

  Returns:
    A function ``DataFrame -> DataFrame``.
  """
  def resultfun(df: DataFrame):
    # The distinct values are brought to the driver to name the new columns.
    distinct_rows = (
        df.select(column_name)
          .dropDuplicates()
          .orderBy(column_name)
          .collect()
    )
    encoded = df
    for row in distinct_rows:
      level = row[0]
      indicator = when(col(column_name) == level, 1).otherwise(0)
      encoded = encoded.withColumn(column_name+"_"+str(level), indicator)
    return encoded
  return resultfun