<a href="https://colab.research.google.com/github/nortonvanz/Fundamentals/blob/main/functions.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [24]:
from scipy import stats

In [25]:
def perc_diff(val_ini, val_fin):
  # calculates the percentage difference between 2 values
  diff = val_fin - val_ini
  difp = (diff / val_ini) * 100
  return round (difp, 2)

In [28]:
def run_query_teradata (query_from_td):
  # connect to Teradata, run query, return result on dataframe
  import teradatasql
  import getpass

  user = 'user'
  passw = getpass-getpass( 'Type your pass: ')
  host = 'host'

  #this way it closes conn, avoiding errors with more than 6 conns
  with teradatasql.connect(None, host=host, user=user, password=passw, logmech= 'LDAP') as con:
    # SEL TOP 5 * FROM SCHEME.TABLE
    query = f"""
    """+query_from_td+"""
    """
    crsr = con.cursor()
    rows = crsr.execute(query).fetchall()
    dft = pd.DataFrame.from_records(rows, columns=[x[0] for x in crsr.description])
    dft.columns = dft.columns.str.lower()

  return dft

In [29]:
def remove_outliers(df, features, dist_from_bounds=1.5):
  """
  Remove rows containing outliers, from features on parameter
  params:
    df = DataFrame with dat.
    features = list of features where outliers will be removed, pass [] in case of just one.
    return: df without outliers
    dist_from_bounds: standard distance to consider outlier is 1.5. The biggest, less outliers will be considered. """

  initial_shape = df.shape[0]

  for feature in features:
    Q1 - df[feature].quantile(0.25)
    Q3 - df[feature].quantile(0.75)
    IQR = Q3 - Q1
    lower_bound = Q1 - dist_from_bounds * IQR # standard = 1.5
    upper_bound = Q3 + dist_from_bounds * IQR # standard = 1.5
    df = df[(df[feature] >= lower_bound) & (df[feature] <= upper_bound)]

  final_shape = df.shape[0]
  rows_removed = initial_shape - final_shape
  percent_diff = round(((Final_shape - initial_shape) / initial_shape) * 100 ,1) #Calculates dif perc

  print(f'{rows_removed} were removed from original df. {initial_shape} was the original num. of rows. {final_shape} is the current num. of rows. {percent_diff}% is the difference.')
  return df

In [8]:
def format_cpf(df_com_cpf):
  """
  Transform CPF with 15 digits to 11
  """
  # Remove any non-numeric characters from the CPF column
  df_com_cpf['cpf'] = df_com_cpf['cpf'].str,replace(r'\D','', regex-True)
  # Verify if CPF has 15 digits
  if df_com_cpf['cpf'].str.len().max() != 15:
    raise ValueError("The CPF must contain 15 digits")
  # Keep the first 9 digits and the last 2 digits
  df_com_cpf['cpf'] = df_com_cpf['cpf'].str[:9] + df_com_cpf['cpf'].str[-2:]
  return df_com_cpf['cpf']

In [30]:
def format_cpf2(df_com_cpf):
  """
  Transform 11 digits CPF, filling with zeros to get 15: -> 123456789[fill 0000]12
  """
  df_com_cpf['cpf'] = df_com_cpf['cpf'].apply(lambda cpf: cpf.zf111(11) if len(cpf) < 11 else cpf)
  return df_com_cpf[ 'cpf']

In [31]:
def calcula_nps(df_com_nps_e_cpf):
  """
  funcão para calcular o nps a partir das notas dadas pelos clientes ---› [NPS = % promotores - % detratores]
  param: recebe dataframe pandas contendo campo nps e cpf (pois calcula considerando cpf único)
    deve considerar por CPF, caso contrário seria considerada 1 nota NPS por cartão do cliente, quando ele possuir mais de 1 cartão.
  return: nps do dataframe
  """
  dfc = df_com_nps_e_cpf
  # Calculando o número de CPFs únicos com nota >= 9 (promotores)
  promotores_count = dfc[dfc.nps >= 9]['cpf'].nunique()
  # Calculando o número de CPFs únicos com nota <= 6 (detratores)
  detratores_count = dfc[dfc.nps <= 6]['cpf'].nunique()
  # Calculando o número total de CPFs únicos
  total_cpf_count = dfc['cpf'].nunique()
  # Verificando se o denominador é diferente de zero
  if total_cpf_count != 0:
    #calcula o NPS
    res = (promotores_count / total_cpf_count) - (detratores_count / total_cpf_count)
  else:
    res = 0 # Atribuindo zero como valor padrão
  # Arredondando o resultado para uma casa decimal
  result_percent = round(res * 100, 1)
  return result_percent

In [32]:
def calcula_margem_erro(df_com_nps):
  """
  Entrada: df contendo coluna 'nps', sendo a Nota de NPS. A função vai obter detratores, promotores e neutros.
  PS: não trata múltiplas respostas do mesmo CPF, como a função de calcula_nps faz.
  """
  #obtem grupos nps
  df_com_nps['nps_classe'] = 'tbd'
  df_com_nps['nps_classe'] = df_com_nps.nps.apply(lambda x:
                                                  'promotor' if x >= 9 else
                                                  'detrator' if x <= 6 else 'neutro')
  pro = df_com_nps.nps_classe.value_counts().promotor
  neu = df_com_nps.nps_classe.value_counts().neutro
  det = df_com_nps.nps_classe.value_counts().detrator
  tot_res = df_com_nps.nps_classe.count()
  #print(pro, neu, det, tot_res)
  p = pro/tot_res
  q = det/tot_res
  var = (p+q-(p-q)**2)/tot_res
  z_score = stats.norm.ppf(0.975)#índice de confiança de 95% = 0.975
  std = np.sqrt(var)
  margem_erro = (z_score*std*100)
  return margem_erro

In [33]:
def nps_por_feature_com_margem(dataframe_com_nps, feature_para_quebra):
  """
  função para calcular o nps de alguma feature. Retornar também a margem de erro (com índice de confiança de 95% fixo), e o NPS inferior e superior considerando a margem.
  entrada: recebe um DF, que deve conter no mínimo o nps e o cpf e qual a feature_para_quebra.
  saída: dataframe contendo [feat, feat_queb_prop, feat_queb_amos, feat_queb_nps, feat_queb_nps min, feat_queb_nps_max, feat_queb_ mar_erro]
  ex: passar feature nps_bandeira, irá retornar a quebra por bandeira.
  Caso queira obter o resultado por alguma variação da feature, como só a variação 'classic' da feature segmento_cliente_simp, já passar o df apenas com o recorte 'classic'.
  """
  dfn = dataframe_com_nps
  feat = feature_para_quebra #ex bandeira
  #cria base para dataset de saída, só com cols: nps, feature
  dfn = dfn[[' cpf', 'nps', feat]].copy()
  #remove eventuais rows cuja feature possui NAs, e printa:
  rows_na_rem = len(dfn.loc[dfn[feat].isna() == True, feat])
  #mantem so as nao nulas
  dfn = dfn. loc[dfn[feat].isna() == False].copy()
  #cria a feature de proporção:
  feat_queb_prop = feat+'_prop'# necessário nome da feature adiante
  dfn[feat_queb_prop] = 'tbd'
  #cria a feature do número de amostras:
  feat_queb_amos = feat+'_amos' #necessário nome da feature adiante
  dfn [feat_queb_amos] = 'tbd'
  #cria a feature da margem de erro:
  feat_queb_mar_erro = feat+'_mrg_erro' #necessário nome da feature adiante
  dfn[feat_queb_mar_erro] = 'tbd'
  # Crie o dicionário contendo a feature e sua proporção %
  dict_propor = round(dfn[feat].value_counts(normalize=True) * 100 ,1)
  var_dict = dict_propor.to_dict()
  #pra cada valor do dicionário var dict, se a chave do dicionário existir como feature do DF, atribui. Senão retorna x.
  dfn[feat_queb_prop] = dfn[feat].apply(lambda x:var_dict.get(x, x))

    #Cria outro dicionário, contendo a feature e o n absoluto de amostras de cada variação
  dict_amos = dfn[feat].value_counts()
  var_dict_amos = dict_amos.to_dict()
  #pra cada valor do dicionário feat_queb_amos, se a chave do dicionário existir como feature do DF, atribui.
  #Senão retorna x.
  dfn[feat_queb_amos] = dfn[feat]-apply(lambda x:var_dict_amos.get(x, x))
  #copiar o dicionário que já tem as variações possíveis da feature, para usar de base no laço, que vai consultar o NPS
  var_dict_nps = var_dict.copy()

  # Definir todos os valores como nulos (por segurança)
  for chave in var_dict_nps:
    var_dict_nps[chave] = None

  #percorrer o dicionário da feature, onde cada iteração é uma variação da feature recebida por parâmetro:
  for chave in var_dict_nps:
    #crio dataframe para cada chave do dicionário
    df_tmp_nps = dfn.loc[dfn[feat] == chave].copy()
    #populo var_dict_nps, com o NPS de cada chave (variação da feature)
    var_dict_nps[chave] = calcula_nps(df_tmp_nps)
    #cria a feature do df retorno, que vai receber o NPS
    feat_queb_nps = feat+'_nps'
    dfn[feat_queb_nps] = 'tbd'
    #pra cada valor do dicionário var_dict_nps (cada NPS), atribuo ao dataframe, caso exista esta chave. Else 0.
    dfn[feat_queb_nps] = dfn[feat].apply(lambda x: var_dict_nps.get(x, 0))
    #obtém as margens de erro, passando o dataset de cada variação da feature:
    margem_iteracao = calcula_margem_erro(df_tmp_nps)
    #quando a variação da feature for igual a chave, atribui a margem de erro:
    dfn.loc[dfn[feat] == chave, feat_queb_mar_erro] = margem_iteracao

  #remove o CPF de dfn, só necessário para a função calcula_nps
  dfn.drop(['cpf', 'nps'], axis=1, inplace=True)
  #dropa duplicados, mantendo apenas 1 linha por variação da feature passada:
  dfn = dfn.drop_duplicates().sort_values(by=feat).reset_index(drop=True)
  #converte vars para float/int, para já ficar pronto para plotar
  dfn[feat_queb_prop] = dfn[feat_queb_prop].astype(float)
  dfn[feat_queb_nps] = dfn[feat_queb_nps].astype(float)
  dfn[feat_queb_amos] = dfn[feat_queb_amos].astype(int)
  dfn[feat_queb_mar_erro] = dfn[feat_queb_mar_erro].astype(float)

  #cria a feature do NPS minimo e máximo, considerando a margem de erro:
  feat_queb_nps_min = feat+'_nps_min'
  dfn[feat_queb_nps_min] = 'tbd'
  feat_queb_nps_max = feat+'_nps_max'
  dfn[feat_queb_nps_max] = 'tbd'
  #cria o NPS mínimo e máximo, considerando a margem de erro:
  dfn[feat_queb_nps_min] = (dfn[feat_queb_nps] - dfn[feat_queb_mar_erro])
  dfn[feat_queb_nps_max] = (dfn[feat_queb_nps] + dfn[feat_queb_mar_erro])
  #converte vars para float/int, para já ficar pronto para plotar
  dfn[feat_queb_nps_min] = dfn[feat_queb_nps_min].astype(float)
  dfn[feat_queb_nps_max] = dfn[feat_queb_nps_max].astype(float)
  #Reordenar cols
  cols_neworder = [feat, feat_queb_prop, feat_queb_amos, feat_queb_nps, feat_queb_ps_min, feat_queb_nps_max, feat_queb_mar_erro]
  dfn = dfn.reindex(columns = cols_neworder)
  #retorna o dataframe, pronto para o plot de NPS x Variações da feature:
  return dfn