In [2]:
import pandas as pd
import os 
import psycopg2
import numpy as np
from sqlalchemy import create_engine 

import datetime as dt

In [3]:
# отображение всех столбцов 
pd.options.display.max_columns = 200
# полное название записей таблицы
pd.options.display.max_colwidth = 200

## Function

In [4]:
def prognoz_query(query):
    try:
        conn = psycopg2.connect(dbname='prognoz', user='prognoz', password='prognoz', host='192.168.245.37',port=5432)
        df = pd.read_sql(query, conn)
        return df
    except(Exception, psycopg2.Error) as error:
        print('Error read sql: ', error)
    finally:
        if conn:
            conn.close()

In [5]:
def rename_region(df):
    dct = {'Республика Татарстан':'Республика Татарстан (Татарстан)',
            'Республика Адыгея':'Республика Адыгея (Адыгея)',
            'Ханты-Мансийский автономный округ':'Ханты-Мансийский автономный округ - Югра',
            'Город Москва':'г. Москва',
            'Республика Чувашия':'Чувашская Республика - Чувашия',
            'Республика Саха':'Республика Саха (Якутия)',
            'Республика Северная Осетия-Алания':'Республика Северная Осетия - Алания', 
            'Республика Удмуртия':'Удмуртская Республика',
            'Город Санкт-Петербург' : 'Санкт-Петербург',
            'Республика Чечня' : 'Чеченская Республика',
            'Город Севастополь' : 'Севастополь',
            'Республика Карачаево-Черкесия' : 'Карачаево-Черкесская Республика',
            'Республика Кабардино-Балкария' : 'Кабардино-Балкарская Республика'}
    
    for a, b in dct.items():
        if a in df['region'].tolist():
            df['region'] = df['region'].apply(lambda x: x.replace(a, b))
    return df


def clean_text(str_):
    del_symbol = ['"', '\n', '«', '»', '.', ',']
    for symb in del_symbol:
        str_ = str_.lower().strip().replace(symb, '')
    return str_

In [6]:
def get_df(df_raw):
    df = df_raw.copy()
    df.iloc[3, :] = df.iloc[3, :].fillna(method='ffill')
    
    df = df[~df[9].isna()].reset_index(drop=True)
    df = df.dropna(thresh=3, axis=1)
    
    # последний отчетный день
    #df = df.loc[:, [5, 7, 9] + list(range(df.columns.max()-7, df.columns.max()+1))]  
    df = df.loc[:, [5, 7, 9] + list(range(15, df.columns.max()+1))] 
    
    df_tr = transpose_df(df)
    df_tr[0] = df_tr[0].astype('float')

    if df_raw.equals(df_ost_raw):
        df_tr.rename(columns={0:'vol_ost'}, inplace=True)
        display(df_tr.head())
    else:
        df_tr.rename(columns={0:'vol_emk'}, inplace=True)   
        display(df_tr.head())
    return df_tr    

In [7]:
def transpose_df(df):
    header_area = df.iloc[:2,3:]
    index_area = df.iloc[2:,:3]
    data_area = df.iloc[2:,3:]
    
    header = pd.MultiIndex.from_frame(header_area.transpose(), names=['date', 'product'])
    index = pd.MultiIndex.from_frame(index_area, names=['company', 'region', 'oilbase'])
    
    data_area.index = index
    data_area.columns = header
    
    df_tr = data_area.stack(list(range(2))).reset_index()
    
    return df_tr 

In [8]:
def find_id(df):
    
    df_id = rename_region(df)
    oilbases_id = oilbases.copy()
    companies_id = companies.copy()
    
    df_id['oilbase'] = df_id['oilbase'].apply(clean_text)
    df_id['company'] = df_id['company'].apply(clean_text)
    oilbases_id['oilbase'] = oilbases_id['oilbase'].apply(clean_text)
    companies_id['company'] = companies_id['company'].apply(clean_text)
        
    # добавление айди нефтебазы
    df_id = df_id.merge(oilbases_id, on=('oilbase', 'region'), how='inner')
    
    # добавление айди компании
    df_id = df_id.merge(companies_id, on=('company'), how='inner')

    # добавление айди продукта
    df_id = df_id.replace({"product": prod_dict})
    df_id.rename(columns={"product":"product_id"}, inplace=True)
    df_id = df_id.groupby(['company_id', 'oilbase_id', 'date', 'product_id']).sum().reset_index()
    
    return df_id

In [9]:
def final_df(df_ost_merge, df_emk_merge):
    ost_sql = find_id(df_ost_merge)
    emk_sql = find_id(df_emk_merge)
    
    df_merge = ost_sql.merge(emk_sql, on=('company_id', 'oilbase_id', 'date', 'product_id'), how='inner')
    
    df_merge['date'] = df_merge['date'].astype(str)
    df_merge['date'] = month_report + '-' + df_merge['date']
    
    df_merge['date'] = df_merge['date'].astype('datetime64')
    df_merge['date'] = df_merge['date'].astype('str')
    df_merge = df_merge[df_merge['date'] == str(date_report)]
    
    display(df_merge.head())
        
    return df_merge

In [10]:
companies = prognoz_query("""select company_code "company_id",lower(company_name) as company from company""")
comp_dict= pd.Series(companies.company_id.values,index=companies.company).to_dict()

oilbases = prognoz_query(
                        """
                        select id "oilbase_id",lower(zm05) as "oilbase", r.region_name region
                        from oilbases o
                        left join regions r on o.region_code=r.region_code 
                        where zm05 is not null
                        """
                        )

prod_dict = {'Арктич.':24, 
             'Зим.':20,
             'Лет.':14,
             'Межсезонное':19,
             'ОЧ-92':6,
             'ОЧ-95':8,
             'ОЧ-98':10,
             'ОЧ-78/80':3}



### Start

In [12]:
airflow = False

execute_to_db = True

In [13]:
if airflow:
    path = ''
else:
    #path = r'C:\Users\mendgaziev\Desktop\Git\Загрузка zm05_new/'.replace('\\', '/')
    #path = r'F:\Everyone\Чернышева/'.replace('\\', '/')
    path = r'F:\Airflow\zm05/'.replace('\\', '/')

In [26]:
date_report = dt.date.today()-dt.timedelta(days=2)
month_report = str(date_report)[:7]
month_report

'2024-06'

In [27]:
date_report

datetime.date(2024, 6, 19)

In [16]:
df_ost_raw = pd.read_excel(path+'ПНПО_МЕС_'+month_report.replace('-', '_') + '.xlsm', sheet_name='Н№', header=None)
df_emk_raw = pd.read_excel(path+'ПНПО_МЕС_'+month_report.replace('-', '_') + '.xlsm', sheet_name='Е№', header=None)

In [17]:
# исходные транспонируемые данные без id 
df_ost = get_df(df_ost_raw)
df_emk = get_df(df_emk_raw)

  uniques = Index(uniques)


Unnamed: 0,company,region,oilbase,date,product,vol_ost
0,"ПАО ""ЛУКОЙЛ""",Архангельская область,Нефтебаза Транс-Лес,1,Арктич.,0.0
1,"ПАО ""ЛУКОЙЛ""",Архангельская область,Нефтебаза Транс-Лес,1,Зим.,0.0
2,"ПАО ""ЛУКОЙЛ""",Архангельская область,Нефтебаза Транс-Лес,1,Лет.,0.000476
3,"ПАО ""ЛУКОЙЛ""",Архангельская область,Нефтебаза Транс-Лес,1,Межсезонное,0.0
4,"ПАО ""ЛУКОЙЛ""",Архангельская область,Нефтебаза Транс-Лес,1,ОЧ-78/80,0.0


  uniques = Index(uniques)


Unnamed: 0,company,region,oilbase,date,product,vol_emk
0,"ПАО ""ЛУКОЙЛ""",Архангельская область,Нефтебаза Транс-Лес,1,Арктич.,0.0
1,"ПАО ""ЛУКОЙЛ""",Архангельская область,Нефтебаза Транс-Лес,1,Зим.,0.0
2,"ПАО ""ЛУКОЙЛ""",Архангельская область,Нефтебаза Транс-Лес,1,Лет.,0.0
3,"ПАО ""ЛУКОЙЛ""",Архангельская область,Нефтебаза Транс-Лес,1,Межсезонное,0.0
4,"ПАО ""ЛУКОЙЛ""",Архангельская область,Нефтебаза Транс-Лес,1,ОЧ-78/80,0.0


In [18]:
df_to_sql = final_df(df_ost, df_emk)

Unnamed: 0,company_id,oilbase_id,date,product_id,vol_ost,vol_emk
112,1,414,2024-06-15,3,0.0,0.0
113,1,414,2024-06-15,6,0.609791,1.325137
114,1,414,2024-06-15,8,0.380463,0.515791
115,1,414,2024-06-15,10,0.0,0.0
116,1,414,2024-06-15,14,1.099775,2.298162


In [19]:
df_to_sql['date'].values[0]

'2024-06-15'

In [20]:
max_date_DB = prognoz_query("""select max("date") from zm05_new""").astype(str)
try:
    if df_to_sql['date'].values[0] <= max_date_DB.values[0]:
        print('Ошибка. Эксель файл не обновлен или данные уже загружены')
        execute_to_db = False
except:
    print('Ошибка. Эксель файл не обновлен')
    execute_to_db = False
    

if execute_to_db:
    engine = create_engine('postgresql+psycopg2://prognoz:prognoz@192.168.245.37:5432/prognoz')
    df_to_sql.to_sql('zm05_new', engine, if_exists='append', index=False)
    print(f"Данные в таблицу zm05_new за {df_to_sql['date'].max()} - загружены")

Ошибка. Эксель файл не обновлен или данные уже загружены




In [22]:
execute_to_db = True

if execute_to_db:
    engine = create_engine('postgresql+psycopg2://prognoz:prognoz@192.168.245.37:5432/prognoz')
    df_to_sql.to_sql('zm05_new', engine, if_exists='append', index=False)
    print(f"Данные в таблицу zm05_new за {df_to_sql['date'].max()} - загружены")

Данные в таблицу zm05_new за 2024-06-15 - загружены


In [23]:
len(df_to_sql['oilbase_id'].unique())

238

In [24]:
df_to_sql[(df_to_sql['company_id']==14) & (df_to_sql['oilbase_id']==896)]

Unnamed: 0,company_id,oilbase_id,date,product_id,vol_ost,vol_emk


In [25]:
df_to_sql[(df_to_sql['oilbase_id']==93)]

Unnamed: 0,company_id,oilbase_id,date,product_id,vol_ost,vol_emk
