In [367]:
import json
import os

import chardet
import pandas as pd
import requests
import vertica_python

Получение списка всех групп из Vertica - <b>запрос</b>:

In [368]:
def _vertica_conn_info():
    """Return vertica_python.connect() keyword arguments built from environment variables.

    VERTICA_USER and VERTICA_PASSWORD are required. VERTICA_HOST, VERTICA_PORT and
    VERTICA_DATABASE fall back to the DWH proxy defaults. Credentials must never be
    hardcoded in a notebook, because they leak into version control and shared copies.

    Raises:
        RuntimeError: if a required variable is not set.
    """
    missing = [name for name in ('VERTICA_USER', 'VERTICA_PASSWORD') if name not in os.environ]
    if missing:
        raise RuntimeError('Set environment variable(s): ' + ', '.join(missing))
    return {
        'host': os.environ.get('VERTICA_HOST', 'vertica-dwh-proxy'),
        'port': int(os.environ.get('VERTICA_PORT', '5435')),
        'user': os.environ['VERTICA_USER'],
        'password': os.environ['VERTICA_PASSWORD'],
        'database': os.environ.get('VERTICA_DATABASE', 'DWH'),
    }


def _execute_sql_script(filename, cursor):
    """Execute each ';'-separated statement of the SQL file `filename` on `cursor`.

    The split is naive: a ';' inside a string literal or comment breaks a statement.
    Blank fragments (e.g. the text after the final ';') are skipped instead of being
    sent to the server as empty statements. A failing statement is reported with its
    actual error and skipped (best effort), so later statements still run.
    """
    with open(filename, 'r') as sql_file:
        script = sql_file.read()
    for command in script.split(';'):
        if not command.strip():
            continue
        try:
            cursor.execute(command)
        except Exception as exc:
            print(f'SQL statement failed: {exc!r}\n{command.strip()}')


def _build_vertica_groups(raw, regions):
    """Collapse raw Vertica rows into one row per (parent_pe_key, User_id_ext) pair.

    Only rows whose 'Region' is in `regions` are kept. The result has the columns
    userId, description ('<parent_pe_key> <User_id_ext>') and itemIds (a list of
    Item_id_ext), with a fresh RangeIndex. An empty input gives an empty frame that
    still has those three columns.
    """
    columns = ['userId', 'description', 'itemIds']
    if raw.empty:
        return pd.DataFrame(columns=columns)
    in_regions = raw.loc[raw['Region'].isin(regions), ['User_id_ext', 'Item_id_ext', 'parent_pe_key']]
    if in_regions.empty:
        return pd.DataFrame(columns=columns)
    grouped = (
        in_regions
        .groupby(['parent_pe_key', 'User_id_ext'])['Item_id_ext']
        .apply(list)
        .reset_index()
    )
    return pd.DataFrame({
        'userId': grouped['User_id_ext'],
        'description': grouped['parent_pe_key'].map(str) + ' ' + grouped['User_id_ext'].map(str),
        'itemIds': grouped['Item_id_ext'],
    })


def get_groups_from_vertica(filename='sql_file.sql',
                            regions=('Москва', 'Санкт-Петербург', 'Московская область')):
    """Load item groups from Vertica (DWH).

    Runs the SQL script `filename` and reads cursor.fetchall() afterwards. This is
    presumably the result of the script's final SELECT; confirm against sql_file.sql.
    The rows are then filtered to `regions` and items are grouped per
    (parent_pe_key, user).

    Returns:
        DataFrame with columns userId, description and itemIds.
    Raises:
        RuntimeError: if the Vertica credentials are not set in the environment.
    """
    with vertica_python.connect(**_vertica_conn_info()) as connection:
        cursor = connection.cursor('dict')
        _execute_sql_script(filename, cursor)
        raw = pd.DataFrame(cursor.fetchall())
    return _build_vertica_groups(raw, regions)

Получение всех групп из API - <b>запрос</b>:

In [369]:
def get_groups_from_api(timeout=30):
    """Fetch all item groups from the calltracking service (itemGroup/search).

    Args:
        timeout: seconds to wait for the service before giving up.

    Returns:
        DataFrame with the columns groupId, userId, description, itemIds and status,
        one row per group, with a RangeIndex. An empty result still has these columns,
        so a join on 'description' keeps working. A field missing from an item becomes NaN.

    Raises:
        requests.HTTPError: if the service answers with an error status.
    """
    url = 'http://app00:8888/service-calltracking/itemGroup/search'
    # NOTE(review): a payload {'id': 21} used to be built here but was never sent
    # (data= was commented out), so the search runs without filters. Confirm intent.
    response = requests.post(url, headers={'content-type': 'application/json'}, timeout=timeout)
    response.raise_for_status()
    items = response.json()['result']['items']
    return pd.DataFrame(items, columns=['groupId', 'userId', 'description', 'itemIds', 'status'])

Старт группы:

In [370]:
def start_group(groupId, timeout=30):
    """Ask the calltracking service to start the group `groupId`.

    Args:
        groupId: any value int() accepts, e.g. the float produced by the outer join in
            the working DataFrame.
        timeout: seconds to wait for the service.

    Returns:
        The requests.Response.

    Raises:
        requests.HTTPError: if the service answers with an error status.
    """
    url = 'http://app00:8888/service-calltracking/itemGroup/startGroup'
    response = requests.post(url, data={'id': int(groupId)}, timeout=timeout)
    response.raise_for_status()
    return response

Остановка группы:

In [371]:
def stop_group(groupId, timeout=30):
    """Ask the calltracking service to stop the group `groupId`.

    Args:
        groupId: any value int() accepts, e.g. the float produced by the outer join in
            the working DataFrame.
        timeout: seconds to wait for the service.

    Returns:
        The requests.Response.

    Raises:
        requests.HTTPError: if the service answers with an error status.
    """
    url = 'http://app00:8888/service-calltracking/itemGroup/stopGroup'
    response = requests.post(url, data={'id': int(groupId)}, timeout=timeout)
    response.raise_for_status()
    return response

Добавление + старт новой группы в API (items в группу добавляются позже, в `actualize_items_in_groups`):
<br/>(на вход подается одна строка из датафрейма)

In [372]:
def add_and_start_new_group(row, timeout=30):
    """Create a group for one DataFrame row in the calltracking service, then start it.

    Items are NOT added here. actualize_items_in_groups adds them afterwards.

    Args:
        row: mapping/Series with 'description' and a user id, given either as
            'userId' (a row of get_groups_from_vertica()) or as 'userId_vertica'
            (a row of get_initial_df(), where the join suffixes the column).
        timeout: seconds to wait for the createGroup call.

    Returns:
        The new group's groupId. If several groups match, the first one is returned.
        Returns None if the created group cannot be found via the search endpoint;
        in that case it is not started.

    Raises:
        requests.HTTPError: if createGroup answers with an error status.
    """
    user_id = int(row['userId_vertica'] if 'userId_vertica' in row else row['userId'])
    description = row['description']

    url = 'http://app00:8888/service-calltracking/itemGroup/createGroup'
    response = requests.post(url, data={'userId': user_id, 'description': description}, timeout=timeout)
    response.raise_for_status()

    # Look the group up again to learn the groupId the service assigned to it.
    groups = get_groups_from_api()
    is_new_group = (
        (groups['description'] == description)
        & (pd.to_numeric(groups['userId'], errors='coerce') == user_id)
    )
    matches = groups.loc[is_new_group, 'groupId']
    if matches.empty:
        print(f'Created group {description!r} not found in search results; it was not started')
        return None
    group_id = matches.iloc[0]
    start_group(group_id)
    return group_id

После добавления новой группы получаем ее groupId и обновляем это значение в рабочем df:

In [373]:
def get_groupId_by_description(description):
    """Return the groupId(s) of API groups whose description equals `description`.

    Reuses get_groups_from_api() instead of repeating the search request.

    Returns:
        A pandas Series of groupIds. It is empty when nothing matches and has several
        values when the description is not unique, so callers pick the element they need.
    """
    groups = get_groups_from_api()
    return groups.loc[groups['description'] == description, 'groupId']

Добавление новых items в группу:

In [374]:
def addItemToGroup(groupId, itemIds, timeout=30):
    """Add the items `itemIds` to the group `groupId` in the calltracking service.

    Args:
        groupId: any value int() accepts. After the outer join in the working
            DataFrame it is a float, and is sent as an int (as start_group does).
        itemIds: iterable of item ids. None or an empty collection means there is
            nothing to add, and no request is sent.
        timeout: seconds to wait for the service.

    Returns:
        The requests.Response, or None when nothing was sent.

    Raises:
        requests.HTTPError: if the service answers with an error status.
    """
    if itemIds is None or len(itemIds) == 0:
        return None
    url = 'http://app00:8888/service-calltracking/itemGroup/addItemToGroup'
    data = {
        'groupId': int(groupId),
        # Form-encoded body: requests sends a list as repeated itemIds fields.
        'itemIds': list(itemIds),
    }
    response = requests.post(url, data=data, timeout=timeout)
    response.raise_for_status()
    return response

Удаление items из группы:

In [375]:
def removeItemFromGroup(groupId, itemIds, timeout=30):
    """Remove the items `itemIds` from the group `groupId` in the calltracking service.

    Args:
        groupId: any value int() accepts. After the outer join in the working
            DataFrame it is a float, and is sent as an int (as start_group does).
        itemIds: iterable of item ids. None or an empty collection means there is
            nothing to remove, and no request is sent.
        timeout: seconds to wait for the service.

    Returns:
        The requests.Response, or None when nothing was sent.

    Raises:
        requests.HTTPError: if the service answers with an error status.
    """
    if itemIds is None or len(itemIds) == 0:
        return None
    url = 'http://app00:8888/service-calltracking/itemGroup/removeItemFromGroup'
    data = {
        'groupId': int(groupId),
        # Form-encoded body: requests sends a list as repeated itemIds fields.
        'itemIds': list(itemIds),
    }
    response = requests.post(url, data=data, timeout=timeout)
    response.raise_for_status()
    return response

Получение данных через запросы + создание dataframe для дальнейших операций по актуализации групп:

In [380]:
def get_initial_df():
    """Build the working DataFrame that drives group synchronisation.

    The API groups are fetched first, then the Vertica groups. The two are
    full-outer-joined on 'description'. Columns present in both sources get the
    suffixes '_vertica' / '_api' (e.g. userId_vertica and userId_api).

    Three bookkeeping columns are appended:
      * what_to_do      - action picked later by actualize_groups (starts as 'nothing');
      * items_to_add    - list of item ids to add to the group (starts as None);
      * items_to_remove - list of item ids to remove from the group (starts as None).
    """
    from_api = get_groups_from_api()
    from_vertica = get_groups_from_vertica()

    joined = from_vertica.join(
        from_api.set_index('description'),
        on='description',
        how='outer',
        lsuffix='_vertica',
        rsuffix='_api',
    ).reset_index(drop=True)

    return joined.assign(
        what_to_do='nothing',
        items_to_add=None,
        items_to_remove=None,
    )

Добавляем, запускаем или останавливаем группы (items здесь не изменяем):

In [377]:
def _item_diff(vertica_ids, api_ids):
    """Return (to_add, to_remove): ids only in Vertica, and ids only in the API."""
    vertica_set, api_set = set(vertica_ids), set(api_ids)
    return list(vertica_set - api_set), list(api_set - vertica_set)


def actualize_groups(df):
    """Create, start or stop API groups so they match Vertica; plan the item changes.

    Items are not changed here. For each row of the working DataFrame (see
    get_initial_df) this fills 'what_to_do' and, where relevant, 'items_to_add' and
    'items_to_remove'. actualize_items_in_groups applies the planned item changes later.

      * status missing (group not in the API)   -> create + start, all Vertica items to add
      * status 0 and the group is in Vertica    -> start, plus the item diff
      * status 1 and the group is not in Vertica -> stop
      * status 1 and the group is in Vertica    -> item diff only
      * anything else                           -> left as 'nothing'

    `df` is modified in place and also returned for convenience.
    """
    for i, row in df.iterrows():
        status = row['status']
        in_vertica = not pd.isna(row['userId_vertica'])

        if pd.isna(status):
            add_and_start_new_group(row)
            df.at[i, 'what_to_do'] = 'add + start'
            df.at[i, 'items_to_add'] = row['itemIds_vertica']
            # Record the id the API assigned to the new group (first match, if any).
            matches = get_groupId_by_description(row['description'])
            df.at[i, 'groupId'] = matches.iloc[0] if len(matches) else None
        elif int(status) == 0 and in_vertica:
            start_group(row['groupId'])
            df.at[i, 'what_to_do'] = 'start + update'
            to_add, to_remove = _item_diff(row['itemIds_vertica'], row['itemIds_api'])
            df.at[i, 'items_to_add'] = to_add
            df.at[i, 'items_to_remove'] = to_remove
        elif int(status) == 1 and not in_vertica:
            stop_group(row['groupId'])
            df.at[i, 'what_to_do'] = 'stop'
        elif int(status) == 1 and in_vertica:
            df.at[i, 'what_to_do'] = 'update'
            to_add, to_remove = _item_diff(row['itemIds_vertica'], row['itemIds_api'])
            df.at[i, 'items_to_add'] = to_add
            df.at[i, 'items_to_remove'] = to_remove
    return df

Добавляем/удаляем items в группах:

In [378]:
def actualize_items_in_groups(df):
    """Apply the item changes planned by actualize_groups to the calltracking service.

    For rows whose 'what_to_do' is 'add + start', 'start + update' or 'update':
      * the ids in 'items_to_add' are added to the group;
      * the ids in 'items_to_remove' are removed from it.
    Missing (None) or empty lists are skipped. Rows without a groupId (e.g. a new
    group that could not be found after creation) are reported and skipped.
    """
    actions_with_items = ('add + start', 'start + update', 'update')
    for _, row in df.iterrows():
        if row['what_to_do'] not in actions_with_items:
            continue
        group_id = row['groupId']
        if pd.isna(group_id):
            print(f"No groupId for {row['description']!r}; item changes skipped")
            continue
        to_add, to_remove = row['items_to_add'], row['items_to_remove']
        if isinstance(to_add, (list, tuple, set)) and to_add:
            addItemToGroup(group_id, to_add)
        if isinstance(to_remove, (list, tuple, set)) and to_remove:
            removeItemFromGroup(group_id, to_remove)

Сам скрипт:
<br/><b>раскомментировать для запуска</b>

In [379]:
#df = get_initial_df()
#actualize_groups(df)
#actualize_items_in_groups(df)