In [180]:
import json
import logging
import requests, json
from amocrm.v2 import tokens, Pipeline, Lead
import pandas as pd
from typing import Optional
import numpy as np 
import datetime

pd.set_option('display.max_rows', 500)
pd.set_option('display.max_columns', 500)
pd.set_option('display.width', 1000)

# logger = logging.getLogger(__name__)
# logger.setLevel(logging.INFO)

In [181]:
with open('secrets.json', 'r') as f:
    secrets = json.load(f)

storage = tokens.FileTokensStorage()
tokens.default_token_manager(
    client_id=secrets['client_id'],
    client_secret=secrets['client_secret'],
    subdomain=secrets['subdomain'],
    redirect_url=secrets['redirect_url'],
    storage=storage
)
access_token = tokens.default_token_manager.get_access_token()

In [192]:
class Vocabulary:
    def __init__(self, token_manager, secrets):
            """ 
            Получение справочников amocrm
            
            Аргументы:
                token_manager - получение и обновление токена для доступа к данным 
                subdomain - поддомен аккаунта в autocrm 
            """
            
            self.token_manager = token_manager
            self.subdomain = secrets["subdomain"]
            
            self._create_contact_vocab()   
            self._create_companies_vocab() 
            self._create_leads_vocab() 
            self._create_pipline_and_status_vocab()
            self._create_users_vocab()


    def _api_call(self, endpoint, page):
        headers = {
            'Authorization': f'Bearer {self.token_manager.get_access_token()}',
            'Content-Type': 'application/json'
        }

        url = f'https://{self.subdomain}.amocrm.ru/api/v4/{endpoint}'
        params = {"page": page}

        response = requests.get(url, headers=headers, params=params)
        response.raise_for_status()
        return response
    
    def _create_contact_vocab(self):
        """
        Создания словаря контактов, где ключём является id контакта,\
            значнием - вся информация о контакте
        """
        contacts_id_to_contacts_info = {}
        page=1 
        while True:
            response = self._api_call('contacts', page)
            if response.status_code == 204:
                break
            contacts =  response.json()['_embedded']['contacts']
            for contact in contacts:
                id = contact['id']
                contacts_id_to_contacts_info[id] = contact 
            page+=1
        
        self.contacts = contacts_id_to_contacts_info
        
    def _create_companies_vocab(self):
        """
        Создания словаря компаний, где ключём является id компании,\
            значнием - вся информация о компании
        """
        companies_id_to_companies_info = {}
        page=1
        while True:
            response = self._api_call('companies', page)
            if response.status_code == 204:
                break
            companies =  response.json()['_embedded']['companies']
            for company in companies:
                id = company['id']
                companies_id_to_companies_info[id] = company
            page+=1
        
        self.companies = companies_id_to_companies_info
   
    def _create_leads_vocab(self):
        """
        Создания словаря сделок, где ключём является id сделки,\
            значнием - вся информация о сделке
        """
        leads_id_to_leads_info = {}
        page=1
        while True:
            response = self._api_call('leads', page)
            if response.status_code == 204:
                break
            leads =  response.json()['_embedded']['leads']
            for lead in leads:
                id = lead['id']
                leads_id_to_leads_info[id] = lead
            page+=1
        
        self.leads = leads_id_to_leads_info
        
    def _create_users_vocab(self):
        """
        Создания словаря менеджеров, где ключём является id менеджера,\
            значнием - вся информация о менеджере
        """
        users_id_to_users_info = {}
        page=1
        while True:
            response = self._api_call('users', page)
            if response.status_code == 204:
                break
            usesrs =  response.json()['_embedded']['users']
            for user in usesrs:
                id = user['id']
                users_id_to_users_info[id] = user
            page+=1
        
        self.users = users_id_to_users_info   
        
        
    def _create_pipline_and_status_vocab(self):
        """
        Создания словаря воронок, где ключём является id воронки,\
            значнием - вся информация о воронке (например, статус сделки)
        """
        pipe_id_to_pipe_info = {}
        response = self._api_call('leads/pipelines', None)
        piplines = response.json()['_embedded']['pipelines']
        for pipline in piplines:
            pipline_id = pipline['id']
            pipe_id_to_pipe_info[pipline_id] = {'name':pipline['name'],
                                                'statuses': dict([(status_dict['id'], status_dict['name']) \
                                                    for status_dict in pipline['_embedded']['statuses']])
                                            }
        self.piplines = pipe_id_to_pipe_info  


In [193]:
class SpecificDataProcessing:
    """
    Обработка specific_data в зависимоси от типа входящей строки
    """
    
    def __init__(self):
        """
        entity_linked_func - получить даные о клиенте/компании
        sale_field_changed_func- поулчить данные о цене 
        lead_status_func - получить данные о статусе задачи 
        pipline_func - получить даные о pipline 
        
        sale: float - цена 
        company: int - id компании 
        client: int - id клиента
        lead_status: int - id статуса задачи 
        pipline: int - id пайплайна
        """
        
        # логика обработки для разных типов specific_data
        self.entity_linked_func = lambda row: row.specific_data['after'][0]['link']['entity']['id']                
        self.sale_field_changed_func = lambda row: row.specific_data['after'][0]['sale_field_value']['sale']
        self.lead_status_func = lambda row: row.specific_data['after'][0]['lead_status']['id']
        self.pipline_func = lambda row: row.specific_data['after'][0]['lead_status']['pipeline_id'] 
        
        # начальная инициализация сквозных значений (sale, responsible_user_id, pipeline, lead_status)
        self.initial_func= lambda row: (row.specific_data['sale'], 
                                        row.specific_data['responsible_user_id'],
                                        row.specific_data['pipeline'],
                                        row.specific_data['lead_status'])
        
        # сквозные поля датасета
        self.sale: Optional[float] = np.nan
        self.company: Optional[str]= np.nan 
        self.contact: Optional[str]= np.nan
        self.lead_status: Optional[str] = np.nan
        self.pipline: Optional[int]= np.nan 
        self.responsible: Optional[int]= np.nan 
    
    def __call__(self, row: pd.Series) -> pd.Series:
        """  
        Обработка поля specific_data для получени сквозных показателей
        
        Аргументы:
            row: строка данных 
        Возвращает:
            pd.Series для следующих полей:
                ('client', 'company', 'sale', 'lead_status', 'pipline')
        """
        
        # если это строка инициализации задачи
        if row.type == 'initial':
            self.sale, self.responsible, self.pipline, self.lead_status = self.initial_func(row)
        
        # если установили/изменили sale
        elif row.type == 'sale_field_changed':
           self.sale =  self.sale_field_changed_func(row)
           
        # если добавили/изменили сущности: company, contact
        elif row.type=='entity_linked':
            if row.specific_data['after'][0]['link']['entity']['type']=='contact':
                 self.contact = self.entity_linked_func(row)
            elif row.specific_data['after'][0]['link']['entity']['type']=='company':
                self.company = self.entity_linked_func(row)
                
        # если изменился статус/пайплайн задачи 
        elif row.type=='lead_status_changed':
            self.lead_status = self.lead_status_func(row)
            self.pipline = self.pipline_func(row)

        return pd.Series([self.contact, self.company, self.sale, self.lead_status, self.pipline, self.responsible])

In [194]:
class DataRepresentation: 
    """ 
    Представление данных в "человеческом" виде
    """
    def __init__(self, token_manager, secrets):
        """
        df - итоговая таблица, содержащая всю информацию о задаче 
        vocab - все словари аккаунта
        """
        self.vocab = Vocabulary(tokens.default_token_manager, secrets) 
        self._processing()
        
    def _processing(self):
        """
        Подготовка словарей для мапинга в итоговую таблицу
        """
        self.entity_id = {k:v['name'] for k,v in self.vocab.leads.items()}
        self.client = {k:v['name'] for k,v in self.vocab.contacts.items()}
        self.company = {k:v['name'] for k,v in self.vocab.companies.items()}
        self.pipline = {k:v['name'] for k,v in self.vocab.piplines.items()}
        self.lead_status= {k:v['statuses'] for k,v in self.vocab.piplines.items()}
        self.users = {k:v['name'] for k,v in self.vocab.users.items()}
        
    def __call__(self, df):
        return df.assign(
                created_at = df.created_at.apply(lambda timestamp: datetime.datetime.fromtimestamp(timestamp)),
                created_by = df.responsible.apply(lambda cell: self.users.get(cell, None)),
                entity_id = df.entity_id.apply(lambda cell: self.entity_id.get(cell, None)),
                client = df.client.apply(lambda cell: self.client.get(cell, None)),
                company = df.company.apply(lambda cell: self.company.get(cell, None)),
                pipline = df.pipline.apply(lambda cell: self.pipline.get(cell, None)),
                lead_status	= df.apply(lambda row: self.lead_status[row.pipline][row.lead_status] , axis=1),
                responsible = df.responsible.apply(lambda cell: self.users.get(cell, None))
            )
        
        
        


In [195]:
class AmoCRM:
    def __init__(self, token_manager, secrets):
        """ 
        Получение и обработка джанных AmoCRM
        
        Аргументы:
            token_manager - получение и обновление токена для доступа к данным 
            subdomain - поддомен аккаунта в autocrm 
            processor - класс для обработки сквозных значений
            presentation - класс для представления данных в "человеческом" виде
            _general_fields - поля основных значений (одинаковые для каждого типа записи)
        """
        
        self.token_manager = token_manager
        self.subdomain = secrets["subdomain"]
        self.processor = SpecificDataProcessing
        self.presentation = DataRepresentation(token_manager, secrets)
        self._general_fields = ['type', 'entity_id', 'created_by', 'created_at', 'specific_data']

    def _api_call(self, endpoint, entity, entity_id):
        headers = {
            'Authorization': f'Bearer {self.token_manager.get_access_token()}',
            'Content-Type': 'application/json'
        }

        url = f'https://{self.subdomain}.amocrm.ru/api/v4/{endpoint}'

        params = {
            "filter[entity]": entity,
            "filter[entity_id]": entity_id
        }

        response = requests.get(url, headers=headers, params=params)
        response.raise_for_status()
        return response
    

    def _initial_processing(self, initial_df: pd.DataFrame)-> pd.DataFrame:
        ''' 
        Унификация инициализирующих данных
        '''

        initial_df['type']='initial',
        initial_df['entity_id']=initial_df['id']
        initial_df['specific_data'] = initial_df.apply(
            lambda row: {'sale': row.price, 
                        'responsible_user_id': row.responsible_user_id, 
                        'pipeline': row.pipeline_id, 
                        'lead_status': row.status_id}, axis=1)
        
        return initial_df[self._general_fields]
        
    
    def _event_processing(self, event_df: pd.DataFrame) -> pd.DataFrame:
        '''
        Унификация данных по евентам
        '''
        
        event_df['specific_data'] = event_df.apply(
            lambda row: {'after':row.value_after, 'before':row.value_before}, axis=1)
        
        return event_df[self._general_fields]
    
    def _task_processing(self, task_df: pd.DataFrame) -> pd.DataFrame:
        '''
        Унификация данных по задачам
        '''
        
        task_df['type'] = 'task'
        task_df['specific_data'] = task_df.apply(
            lambda row: {'text': row.text, 
                         'is_completed': row.is_completed,
                         'result':row.result, 
                         'responsible_user_id': row.responsible_user_id,
                         'complete_till': row.complete_till}, axis=1)
        
        return  task_df[self._general_fields ]


    def _note_processing(self, note_df: pd.DataFrame) -> pd.DataFrame:
        '''
        Унификация данных по заметкам
        '''
        
        note_df['type'] = 'note' 
        note_df['specific_data'] = note_df.apply(
            lambda row: {'text': row.params['text'],
                         'note_type': row.note_type,
                         'responsible_user_id': row.responsible_user_id,
                         'updated_at': row.updated_at}, axis=1)
        
        return note_df[self._general_fields]

    def get_initial_data_lead(self, lead_id):
        """ 
        Получение начальных данных по задаче
        """
        
        headers = {
        'Authorization': f'Bearer {self.token_manager.get_access_token()}',
        'Content-Type': 'application/json'
        }
        url = f'https://{secrets["subdomain"]}.amocrm.ru/api/v4/leads/{lead_id}'
        response = requests.get(url, headers=headers)
        
        if response.status_code == 200:
            initial_df = pd.Series(response.json()).to_frame().T # 
            return  self._initial_processing(initial_df)
        
        elif response.status_code == 204:
            return pd.DataFrame()
        
        else:
            raise Exception('Error: {}'.format(response.status_code))
        

    def get_events_by_lead_id(self, lead_id):
        response = self._api_call('events', 'lead', lead_id)
        if response.status_code == 200:
            event_df_raw = pd.DataFrame(response.json()['_embedded']['events'])
            return self._event_processing(event_df_raw)
        
        elif response.status_code == 204:
            return pd.DataFrame()
        
        else:
            raise Exception('Error: {}'.format(response.status_code))


    def get_tasks_by_lead_id(self, lead_id):
        response = self._api_call('tasks', 'lead', lead_id)
        if response.status_code == 200:
            task_df_raw = pd.DataFrame(response.json()['_embedded']['tasks'])
            return self._task_processing(task_df_raw)
        
        elif response.status_code == 204:
            return pd.DataFrame()
        
        else:
            raise Exception('Error: {}'.format(response.status_code))


    def get_notes_by_lead_id(self, lead_id):
        response = self._api_call('leads/notes', 'lead', lead_id)
        if response.status_code == 200:
            note_df_raw = pd.DataFrame(response.json()['_embedded']['notes'])
            return self._note_processing(note_df_raw)
        
        elif response.status_code == 204:
            return pd.DataFrame()
        
        else:
            raise Exception('Error: {}'.format(response.status_code))
    
    # preparation all_lead_info
    def get_all_lead_info(self, lead_id):
        """
        Получение и обоработка все данных по задачам
        """
        
        processor = self.processor() # обработчик сквозных значений
        presentation = self.presentation # пердставление данных в удобном виде
        
        inital_df = self.get_initial_data_lead(lead_id)
        events_df = self.get_events_by_lead_id(lead_id)
        tasks_df = self.get_tasks_by_lead_id(lead_id)
        notes_df = self.get_notes_by_lead_id(lead_id)
        
        result = pd.concat([inital_df, events_df, tasks_df, notes_df], axis=0).\
                sort_values('created_at').reset_index(drop=True).assign(client=None,
                                                                        company=None,
                                                                        sale=None,  
                                                                        lead_status=None,
                                                                        pipline=None,
                                                                        responsible=None)
                
        result[['client', 'company', 'sale', 'lead_status', 'pipline', 'responsible']] = result.apply(processor , axis=1)
        
        result = result.astype({'client': pd.Int32Dtype(), 
                              'company': pd.Int32Dtype(), 
                              'sale': pd.Float32Dtype(),
                              'lead_status': pd.Int32Dtype(),
                              'pipline':pd.Int32Dtype(),
                              'responsible':pd.Int32Dtype()})
        
        return presentation(result)

In [196]:
lead_id =37925381
vocab = Vocabulary(tokens.default_token_manager, secrets)  # получили все словари аккаунта 
crm = AmoCRM(tokens.default_token_manager, secrets)
df = crm.get_all_lead_info(lead_id) # получили всю информацию о задаче (с НЕобработанными id)
df