In [1]:
import pandas as pd
from datetime import date, datetime
from dateutil.relativedelta import relativedelta
import teradatasql
import numpy as np
import os

- Открываем файл.
- Импортируем данные.
- Обрабатываем данные.
- Делаем бэкап UAT_PRODUCT.PRODUCT_PARAMETERS в UAT_PRODUCT.PRODUCT_PARAMETERS_BACKUP.
- Грузим обработанные данные в UAT_PRODUCT.PRODUCT_PARAMETERS_TEST_FOR_SAP.
- Запросом трансформируем данные в подневные из UAT_PRODUCT.PRODUCT_PARAMETERS_TEST_FOR_SAP в UAT_PRODUCT.PRODUCT_PARAMETERS.

In [2]:
class SAP():
    '''
    Loads figures from an Excel workbook into UAT_PRODUCT.PRODUCT_PARAMETERS through
    the staging table UAT_PRODUCT.PRODUCT_PARAMETERS_TEST_FOR_SAP.

    Each step feeds the next one:
    import_file() -> process_df() -> apply_condition() -> insert_into_uat_pp().
    '''

    def __init__(self, path = r'P:\\CP_PLM\\Reporting\\Report_Data\\Revenue & Subs\\'):
        '''
        path: directory whose files are offered for selection.

        The constructor is interactive: it prints the directory listing and asks
        for the number of the file to work with.
        '''
        self.path = path
        self.shortname = None
        self.condition = str()
        self.choose_file()
        self.raw = pd.DataFrame()


    def choose_file(self):
        '''
        Print the numbered directory listing and ask until a listed number is entered.

        Sets self.shortname (file name) and self.filename (full path).
        Raises FileNotFoundError when the directory holds no entries at all.
        '''
        # Read the directory once so the numbers shown are the numbers matched below.
        files = os.listdir(self.path)
        if not files:
            # Without this guard the prompt below could never be satisfied.
            raise FileNotFoundError(f'No files found in {self.path}')
        for numbered_file in enumerate(files):
            print(numbered_file)
        while not self.shortname:
            try:
                file_index = int(input('Какой номер нужен? '))
            except ValueError:
                # Not a number: ask again instead of aborting with a traceback.
                continue
            if 0 <= file_index < len(files):
                file = files[file_index]
                print(f'Выбран {file}')
                self.shortname = file
                # os.path.join also copes with a path given without a trailing separator.
                self.filename = os.path.join(self.path, self.shortname)


    def import_file(self, filename = None):
        '''
        Read the listed workbook sheets and append their complete rows to self.raw.

        filename: workbook to read; defaults to the file picked in choose_file().
        Rows containing any empty cell are dropped. A sheet that cannot be read is
        skipped, and the reason is printed.
        '''
        if not filename: filename = self.filename
        dtype_dict = {
            'version': str, 'month': str, 'year': str, 'region': str, 'base_type': str,
            'tariff': str, 'account': str, 'param_value': float
        }

        header_dict = {
            'data' : ['version','month','year','region','account','param_value'],
            'phd' : ['version','month','year','region','base_type','tariff','account','param_value'],
            'rev' : ['version','month','year','region','base_type','tariff','account','param_value']
        }

        # NOTE(review): 'Region' has no entry in header_dict, so the lookup below raises
        # KeyError and the sheet is skipped - confirm which column layout this sheet uses.
        for sh_name in ['Region']:
            try:
                sh_df = pd.read_excel(filename, sheet_name = sh_name, engine='pyxlsb',
                    names = header_dict[sh_name],
                    usecols = header_dict[sh_name],
                    dtype = dtype_dict, skiprows = 0)
            except Exception as exc:
                # Still best effort, but report why the sheet was skipped instead of
                # hiding it (a bare except would also swallow KeyboardInterrupt).
                print(f'Sheet {sh_name!r} skipped: {exc!r}')
                continue
            else:
                sh_df = sh_df.dropna()
                self.raw = pd.concat([self.raw,sh_df])
        print(self.raw.head())


    @staticmethod
    def _sql_literal(value):
        '''Return value as a single-quoted SQL literal with embedded quotes doubled.'''
        return "'{0}'".format(str(value).replace("'", "''"))


    def apply_condition(self, param_dict = None):
        '''
        Filter self.full_df and store the rows prepared for insertion in self.df.

        param_dict: optional dict with any of the keys
            'min_date', 'max_date' - 'YYYY-MM-DD' strings (default: min / max report_month);
            'param_1' - versions to keep (default: every version found in the data);
            'param_2' - accounts to keep (default: every account found in the data).
        The upper date bound is always moved to the last day of its month.

        Also builds self.condition (the same filter as a SQL WHERE clause) and loads
        the matching rows of UAT_PRODUCT.PRODUCT_PARAMETERS into self.uat_pp_df.
        '''
        if not param_dict: param_dict = dict()

        if param_dict.get('min_date', None):
            min_date_dt = datetime.strptime(param_dict['min_date'], '%Y-%m-%d').date()
        else:
            min_date_dt = self.full_df.report_month.min()
        min_date_str = min_date_dt.strftime('%Y-%m-%d')

        if param_dict.get('max_date', None):
            max_date_dt = datetime.strptime(param_dict['max_date'], '%Y-%m-%d').date()
        else:
            max_date_dt = self.full_df.report_month.max()
        # First day of the following month minus one day = last day of this month.
        max_date_dt = (max_date_dt + relativedelta(months=1)).replace(day=1) - relativedelta(days=1)
        max_date_str = max_date_dt.strftime('%Y-%m-%d')

        if param_dict.get('param_1', None):
            param_1 = param_dict['param_1']
        else:
            param_1 = [param.strip() for param in self.full_df.version.unique()]

        if param_dict.get('param_2', None):
            param_2 = param_dict['param_2']
        else:
            param_2 = [param.strip() for param in self.full_df.account.unique()]

        selected = self.full_df[
                (self.full_df.report_month >= min_date_dt)
                & (self.full_df.report_month <= max_date_dt)
                & (self.full_df.version.isin(param_1))
                & (self.full_df.account.isin(param_2))
        ]
        # The 'NULL*' names are empty placeholder columns: the staging insert binds values
        # by position, and the list has the same length as the column list selected in
        # insert_into_uat_pp.
        cols = ['report_month', 'version', 'account','NULL', 'NULL1', 'BRANCH_ID', 'base_type',
                'NULL2', 'NULL3', 'tariff', 'NULL4', 'param_value']
        # np.nan instead of np.NaN: the capitalised alias was removed in NumPy 2.0.
        reindexed = selected.reindex(columns = cols, fill_value = np.nan)
        # Missing values are replaced with None.
        # NOTE(review): recent pandas may keep NaN in purely numeric columns here -
        # verify that such cells really arrive in the table as NULL.
        self.df = reindexed.where(reindexed.notna(), None)

        # Express the same filter in SQL so the matching rows can be read from, or wiped
        # in, Teradata. Values are escaped because they may come straight from the workbook.
        param_1_sql = ','.join(self._sql_literal(p) for p in param_1)
        param_2_sql = ','.join(self._sql_literal(p) for p in param_2)
        self.condition = f'''
        WHERE REPORT_DATE BETWEEN DATE'{min_date_str}' AND DATE'{max_date_str}'
        AND PARAM_1 in ({param_1_sql}) 
        AND PARAM_2 in ({param_2_sql})
        '''

        self.uat_pp_df = self.get_uat_pp_df()
        print(self.condition)


    def get_sap_codes(self):
        '''Return branch_id, sap_code and sap_name_ru for every branch that has a SAP code.'''
        with teradatasql.connect() as session:
            query = '''
                    sel branch_id, sap_code, sap_name_ru
                    FROM PRD2_DIC_V.BRANCH
                    where SAP_CODE is not null
                    '''
            return pd.read_sql(query, session)


    def process_df(self):
        '''
        Build self.full_df from self.raw.

        Adds report_month (first day of the row's year/month), rounds param_value to
        4 digits, maps tariff / base_type codes to labels, normalises version and
        attaches the branch dictionary by SAP code (rows without a match are dropped
        by the inner join).
        '''
        self.full_df = self.raw.copy()
        self.full_df['report_month'] = self.full_df.apply(lambda row: date(int(row.year), int(row.month), 1), axis = 1)
        self.full_df.param_value = self.full_df.param_value.apply(lambda x: round(x,4))

        if 'tariff' in self.full_df:
            tariff_dict = {'1100' : 'Bundle', '2000' : 'PAYG'}
            self.full_df.tariff = self.full_df.tariff.replace(tariff_dict)
            self.full_df.tariff = self.full_df.tariff.replace('#','NO SUBS')

        if 'base_type' in self.full_df:
            self.full_df.base_type = self.full_df.base_type.replace('#','NOT A')

        # Any version containing 'BU' collapses to plain 'BU'; others are only trimmed.
        self.full_df.version = self.full_df.version.apply(lambda x: 'BU' if 'BU' in x else x.strip())
        self.sap_codes_df = self.get_sap_codes()
        self.full_df = pd.merge(self.full_df, self.sap_codes_df, how='inner', left_on='region', right_on='SAP_CODE')
        print(self.full_df.head())


    def make_uat_pp_backup(self):
        '''Overwrite UAT_PRODUCT.PRODUCT_PARAMETERS_BACKUP with a copy of PRODUCT_PARAMETERS.'''
        with teradatasql.connect() as session:
            with session.cursor() as cur:
                cur.execute('''
                    DELETE FROM UAT_PRODUCT.PRODUCT_PARAMETERS_BACKUP; 
                    INSERT INTO UAT_PRODUCT.PRODUCT_PARAMETERS_BACKUP
                    SELECT * FROM UAT_PRODUCT.PRODUCT_PARAMETERS 
                ''')
                # Read rowcount while the cursor is still open.
                # NOTE(review): for a two-statement request, confirm which statement
                # this count belongs to.
                print(f'{cur.rowcount} rows inserted into UAT_PRODUCT.PRODUCT_PARAMETERS_BACKUP.')


    def get_uat_pp_df(self):
        '''Return the rows of UAT_PRODUCT.PRODUCT_PARAMETERS that match self.condition.'''
        with teradatasql.connect() as session:
            query = 'sel * from UAT_PRODUCT.PRODUCT_PARAMETERS ' + self.condition
            return pd.read_sql(query, session)


    def get_df_diff(self, df1, df2, which='right_only'):
        '''
        Compare two frames with an outer merge on their common columns.

        which: 'right_only' (default) - rows present only in df2;
               'left_only' - rows present only in df1;
               None - rows present in exactly one of the frames.
        The result keeps the '_merge' indicator column.
        '''
        comparison_df = df1.merge(df2,
                                  indicator=True,
                                  how='outer')
        if which is None:
            diff_df = comparison_df[comparison_df['_merge'] != 'both']
        else:
            diff_df = comparison_df[comparison_df['_merge'] == which]
        return diff_df


    def update_uat_pp_test(self, batchsize = 100000):
        '''
        Empty the staging table and refill it with self.df.

        batchsize: maximum number of rows sent per executemany call; must be positive.
        Raises ValueError for a non-positive batchsize before anything is deleted.
        '''
        # Validate first: the delete below is destructive, and a zero / negative step
        # would otherwise fail (or insert nothing) only after the table is emptied.
        if batchsize <= 0:
            raise ValueError('batchsize must be a positive integer')

        with teradatasql.connect() as session:
            with session.cursor() as cur:
                print('deleting from UAT_PRODUCT.PRODUCT_PARAMETERS_TEST_FOR_SAP...')
                cur.execute('''delete from UAT_PRODUCT.PRODUCT_PARAMETERS_TEST_FOR_SAP;''')
                print(f'{cur.rowcount} rows deleted from UAT_PRODUCT.PRODUCT_PARAMETERS_TEST_FOR_SAP\n')

                print('inserting into UAT_PRODUCT.PRODUCT_PARAMETERS_TEST_FOR_SAP...')
                for num in range(0, len(self.df), batchsize):
                    cur.executemany(f'''
                     INSERT into UAT_PRODUCT.PRODUCT_PARAMETERS_TEST_FOR_SAP ({','.join('?'*len(self.df.columns))})
                    ''',
                        [tuple(row) for row in self.df.iloc[num:num+batchsize,:].itertuples(index=False)]
                        )
                print(f'{len(self.df)} rows inserted into UAT_PRODUCT.PRODUCT_PARAMETERS_TEST_FOR_SAP.')


    def delete_from_uat_pp(self):
        '''Delete the rows of UAT_PRODUCT.PRODUCT_PARAMETERS that match self.condition.'''
        query = 'DELETE from UAT_PRODUCT.PRODUCT_PARAMETERS ' + self.condition
        with teradatasql.connect() as session:
            with session.cursor() as cur:
                cur.execute(query)
                print(f'{cur.rowcount} rows deleted from UAT_PRODUCT.PRODUCT_PARAMETERS')


    def insert_into_uat_pp(self, replace=True):
        '''
        Publish self.df to UAT_PRODUCT.PRODUCT_PARAMETERS.

        Steps: refill the staging table, back up the target table, optionally delete
        the target rows matching self.condition (replace=True), then insert from staging.
        Staging rows whose PARAM_1 is not 'AC' are joined to the calendar on the month
        start, and PARAM_VALUE is divided by the number of days in the month; 'AC' rows
        are copied unchanged.
        '''
        self.update_uat_pp_test()
        self.make_uat_pp_backup()
        if replace: self.delete_from_uat_pp()

        query = '''
            insert into UAT_PRODUCT.PRODUCT_PARAMETERS
            sel
                cal.calendar_date as REPORT_DATE, test.PARAM_1, test.PARAM_2, test.PARAM_3,
                test.PARAM_4, test.BRANCH_ID, test.BASE_TYPE, test.TP_ID_1, test.TP_ID_2,
                test.TARIFF_1, test.TARIFF_2, PARAM_VALUE/EXTRACT(DAY FROM LAST_DAY(REPORT_DATE)) as PARAM_VALUE
             from uat_product.product_parameters_test_for_sap test
             left join Sys_Calendar.BusinessCalendar cal
                on test.REPORT_DATE = trunc(cal.calendar_date,'mon')
             where test.PARAM_1 <> 'AC'
             union all
             sel *
             from uat_product.product_parameters_test_for_sap test
             where test.PARAM_1 = 'AC'
        '''

        with teradatasql.connect() as session:
            with session.cursor() as cur:
                cur.execute(query)
                print(f'{cur.rowcount} rows inserted into UAT_PRODUCT.PRODUCT_PARAMETERS.')

In [16]:
# B2C plan sheet: turn the month columns into rows (one row per region / base / month).
id_vars = ['region code', 'region', 'base']
df = pd.read_excel(
    r'P:\\CP_PLM\\Reporting\\Report_Data\\Revenue & Subs\\Core Live Subs PLAN.xlsx',
    sheet_name='B2C',
).melt(id_vars, var_name='REPORT_DATE', value_name='PARAM_VALUE')  # every non-id column is melted
df['BASE_TYPE'] = [base.upper() for base in df['base']]
df['PARAM_3'] = 'B2C'
df = df.loc[:, ['region code', 'region', 'BASE_TYPE', 'REPORT_DATE', 'PARAM_VALUE', 'PARAM_3']]
df.head()

Unnamed: 0,region code,region,BASE_TYPE,REPORT_DATE,PARAM_VALUE,PARAM_3
0,203000,Buryatia,NEW,2021-01-01,50608,B2C
1,204000,Gorno-Altaisk,NEW,2021-01-01,3613,B2C
2,211000,Komi,NEW,2021-01-01,22915,B2C
3,212000,Mari El,NEW,2021-01-01,23841,B2C
4,213000,Mordovia,NEW,2021-01-01,28547,B2C


In [17]:
# B2B plan sheet: the same figure is repeated for every month found in the B2C frame.
df2 = pd.read_excel(r'P:\\CP_PLM\\Reporting\\Report_Data\\Revenue & Subs\\Core Live Subs PLAN.xlsx', sheet_name='B2B')
# Build all monthly copies first and concatenate once instead of growing a frame inside
# the loop. Months are taken in reverse so the row order matches the previous
# "prepend each month" behaviour (latest month first).
monthly_frames = [df2.assign(REPORT_DATE=month) for month in df.REPORT_DATE.unique()[::-1]]
df2_new = pd.concat(monthly_frames)
df2_new['BASE_TYPE'] = None
df2_new['PARAM_3'] = 'B2B'
# Positional rename: relies on the sheet having exactly three columns
# (region code, region, value) followed by the three columns added above.
df2_new.columns = ['region code', 'region', 'PARAM_VALUE', 'REPORT_DATE', 'BASE_TYPE', 'PARAM_3']
df2_new = df2_new[['region code', 'region', 'BASE_TYPE', 'REPORT_DATE', 'PARAM_VALUE', 'PARAM_3']]
df2_new.head()

Unnamed: 0,region code,region,BASE_TYPE,REPORT_DATE,PARAM_VALUE,PARAM_3
0,203000,Buryatia,,2021-12-01,14628,B2B
1,204000,Gorno-Altaisk,,2021-12-01,0,B2B
2,211000,Komi,,2021-12-01,14140,B2B
3,212000,Mari El,,2021-12-01,5554,B2B
4,213000,Mordovia,,2021-12-01,2012,B2B


In [20]:
# Stack B2C and B2B rows and add the constant / empty columns of the target table.
union_df = pd.concat([df, df2_new]).assign(PARAM_1='Plan', PARAM_2='Core Live Subs')
for col in ('PARAM_4', 'TP_ID_1', 'TP_ID_2', 'TARIFF_1', 'TARIFF_2'):
    union_df[col] = None
union_df.head()

Unnamed: 0,region code,region,BASE_TYPE,REPORT_DATE,PARAM_VALUE,PARAM_3,PARAM_1,PARAM_2,PARAM_4,TP_ID_1,TP_ID_2,TARIFF_1,TARIFF_2
0,203000,Buryatia,NEW,2021-01-01,50608,B2C,Plan,Core Live Subs,,,,,
1,204000,Gorno-Altaisk,NEW,2021-01-01,3613,B2C,Plan,Core Live Subs,,,,,
2,211000,Komi,NEW,2021-01-01,22915,B2C,Plan,Core Live Subs,,,,,
3,212000,Mari El,NEW,2021-01-01,23841,B2C,Plan,Core Live Subs,,,,,
4,213000,Mordovia,NEW,2021-01-01,28547,B2C,Plan,Core Live Subs,,,,,


In [21]:
# Branch dictionary: SAP code -> BRANCH_ID, only for branches that have a SAP code.
query = '''
            sel branch_id, sap_code, sap_name_ru
            FROM PRD2_DIC_V.BRANCH
            where SAP_CODE is not null
            '''
with teradatasql.connect() as session:
    sap_codes = pd.read_sql(query, session)
sap_codes.head()

Unnamed: 0,BRANCH_ID,SAP_CODE,SAP_NAME_RU
0,15.0,248000,Tele2-Липецк
1,55.0,256000,Tele2-Оренбург
2,106.0,212000,Tele2-Марий Эл
3,36.0,335000,Tele2-Вологда
4,100.0,286000,Tele2 - ХМАО


In [22]:
# Attach BRANCH_ID by SAP code; regions without a dictionary match are dropped (inner join).
union_df_new = union_df.merge(
    sap_codes,
    how='inner',
    left_on='region code',
    right_on='SAP_CODE',
)
union_df_new.head()

Unnamed: 0,region code,region,BASE_TYPE,REPORT_DATE,PARAM_VALUE,PARAM_3,PARAM_1,PARAM_2,PARAM_4,TP_ID_1,TP_ID_2,TARIFF_1,TARIFF_2,BRANCH_ID,SAP_CODE,SAP_NAME_RU
0,203000,Buryatia,NEW,2021-01-01,50608,B2C,Plan,Core Live Subs,,,,,,66.0,203000,Tele2-Бурятия
1,203000,Buryatia,OLD,2021-01-01,231141,B2C,Plan,Core Live Subs,,,,,,66.0,203000,Tele2-Бурятия
2,203000,Buryatia,NEW,2021-02-01,48976,B2C,Plan,Core Live Subs,,,,,,66.0,203000,Tele2-Бурятия
3,203000,Buryatia,OLD,2021-02-01,231242,B2C,Plan,Core Live Subs,,,,,,66.0,203000,Tele2-Бурятия
4,203000,Buryatia,NEW,2021-03-01,49616,B2C,Plan,Core Live Subs,,,,,,66.0,203000,Tele2-Бурятия


In [23]:
# Keep only the columns that are inserted, in the order they are bound to the INSERT.
union_df_new = union_df_new.loc[:, [
    'REPORT_DATE', 'PARAM_1', 'PARAM_2', 'PARAM_3', 'PARAM_4', 'BRANCH_ID',
    'BASE_TYPE', 'TP_ID_1', 'TP_ID_2', 'TARIFF_1', 'TARIFF_2', 'PARAM_VALUE',
]]
union_df_new.head()

Unnamed: 0,REPORT_DATE,PARAM_1,PARAM_2,PARAM_3,PARAM_4,BRANCH_ID,BASE_TYPE,TP_ID_1,TP_ID_2,TARIFF_1,TARIFF_2,PARAM_VALUE
0,2021-01-01,Plan,Core Live Subs,B2C,,66.0,NEW,,,,,50608
1,2021-01-01,Plan,Core Live Subs,B2C,,66.0,OLD,,,,,231141
2,2021-02-01,Plan,Core Live Subs,B2C,,66.0,NEW,,,,,48976
3,2021-02-01,Plan,Core Live Subs,B2C,,66.0,OLD,,,,,231242
4,2021-03-01,Plan,Core Live Subs,B2C,,66.0,NEW,,,,,49616


In [24]:
# Sanity check of the distinct BASE_TYPE values before loading.
union_df_new['BASE_TYPE'].unique()

array(['NEW', 'OLD', None], dtype=object)

In [25]:
def update_uat_pp_test(df, year, batchsize = 100000):
    '''
    Replace the 'Plan' / 'Core Live Subs' rows of one year in uat_product.PRODUCT_PARAMETERS.

    df: rows to insert; values are bound positionally (no column list in the INSERT),
        so the column order of df must match the table.
    year: calendar year whose existing 'Plan' / 'Core Live Subs' rows are deleted first.
    batchsize: maximum number of rows sent per executemany call; must be positive.

    Raises ValueError for a non-positive batchsize before anything is deleted.
    '''
    # Validate first: the DELETE below is destructive, and a zero / negative step would
    # otherwise fail (or silently insert nothing) only after the old rows are gone.
    if batchsize <= 0:
        raise ValueError('batchsize must be a positive integer')

    with teradatasql.connect() as session:
        with session.cursor() as cur:

            print('deleting from uat_product.PRODUCT_PARAMETERS...')
            cur.execute('''
                delete from uat_product.PRODUCT_PARAMETERS
                WHERE PARAM_1='Plan'
                AND PARAM_2='Core Live Subs' 
                AND extract(YEAR from REPORT_DATE)=?
                ;''', params=(year, ))
            print(f'{cur.rowcount} rows deleted from uat_product.PRODUCT_PARAMETERS')

            print('inserting into UAT_PRODUCT.PRODUCT_PARAMETERS...')
            # One "?" placeholder per column of df.
            placeholders = ','.join('?' * len(df.columns))
            for num in range(0, len(df), batchsize):
                batch = df.iloc[num:num+batchsize,:]
                cur.executemany(f'''
                    INSERT into UAT_PRODUCT.PRODUCT_PARAMETERS ({placeholders})
                ''',
                    [tuple(row) for row in batch.itertuples(index=False)]
                    )
            print(f'{len(df)} rows inserted into UAT_PRODUCT.PRODUCT_PARAMETERS.')

In [26]:
# Replace the 2021 plan rows with the freshly prepared frame.
update_uat_pp_test(df=union_df_new, year=2021)

deleting from uat_product.PRODUCT_PARAMETERS...
2292 rows deleted from uat_product.PRODUCT_PARAMETERS
inserting into UAT_PRODUCT.PRODUCT_PARAMETERS...
2292 rows inserted into UAT_PRODUCT.PRODUCT_PARAMETERS.


In [27]:
# Same rows shifted to 2020 with a zero plan value.
union_df_new_2020 = union_df_new.assign(
    REPORT_DATE=union_df_new['REPORT_DATE'].map(lambda report_date: report_date.replace(year=2020)),
    PARAM_VALUE=0,
)
union_df_new_2020.head()

Unnamed: 0,REPORT_DATE,PARAM_1,PARAM_2,PARAM_3,PARAM_4,BRANCH_ID,BASE_TYPE,TP_ID_1,TP_ID_2,TARIFF_1,TARIFF_2,PARAM_VALUE
0,2020-01-01,Plan,Core Live Subs,B2C,,66.0,NEW,,,,,0
1,2020-01-01,Plan,Core Live Subs,B2C,,66.0,OLD,,,,,0
2,2020-02-01,Plan,Core Live Subs,B2C,,66.0,NEW,,,,,0
3,2020-02-01,Plan,Core Live Subs,B2C,,66.0,OLD,,,,,0
4,2020-03-01,Plan,Core Live Subs,B2C,,66.0,NEW,,,,,0


In [28]:
# Replace the 2020 plan rows with the zero-valued copy.
update_uat_pp_test(df=union_df_new_2020, year=2020)

deleting from uat_product.PRODUCT_PARAMETERS...
2292 rows deleted from uat_product.PRODUCT_PARAMETERS
inserting into UAT_PRODUCT.PRODUCT_PARAMETERS...
2292 rows inserted into UAT_PRODUCT.PRODUCT_PARAMETERS.
