In [None]:
import requests
from datetime import datetime
from airflow import DAG
from airflow.operators.postgres_operator import PostgresOperator
from airflow.hooks.postgres_hook import PostgresHook
from airflow.operators.python_operator import PythonOperator
from airflow.operators.dummy_operator import DummyOperator
from requests_ntlm import HttpNtlmAuth
import smtplib
import time
import warnings
warnings.simplefilter(action = 'ignore')
import keyring
from dateutil.relativedelta import relativedelta
from itertools import product
import base64
import os
import pandas as pd
from IPython.display import display
from email.mime.text import MIMEText
from email.mime.application import MIMEApplication
from email.mime.multipart import MIMEMultipart
from smtplib import SMTP
import sys




def _request_data(**context):
    ###Авторизация
    auth = HttpNtlmAuth("Login", "password")


    ###Тянем информацию по отчетам
    def folder_report_info(folder_id):
        url = f'http://server//reports/api/v2.0/Folders({folder_id})/CatalogItems'
        response = requests.get(url, auth=auth, verify=False)
        resp_json = response.json()
        df_folder = pd.DataFrame(resp_json['value'])
        df_folder['Folder'] = (df_folder["Path"]
                                         .str.split('/', 3).str[2])
        return df_folder


    ###Собираем фреймы
    df_1 = folder_report_info('id') #Id папки PBI RS
    df_2 = folder_report_info('id') #Id папки PBI RS
    df_3 = folder_report_info('id') #Id папки PBI RS
    df_info_ = [df_1,df_2,df_3] #Объединение в единый датафрейм
    df_info_ = pd.concat(df_info_) #Объединение в единый датафрейм


    ###Собираем ссылки на отчеты для статусов обновления
    http_1 = 'http://server//reports/api/v2.0/PowerBIReports('
    http_2 = ')/CacheRefreshPlans'
    url = [http_1 + x + http_2 for x in df_info_['Id']] #из предыдущей таблицы с основной информацией об отчетах.


    ###Собираем статусы обновления по отчетам
    df_info_refresh = [] #объявляем пустой список, в который будем складывать записи по api

    #Собираем данные для фрейма
    for i in range(len(url)): #цикл по ранее полученным ссылкам на отчеты
        response = requests.get(str(url[i]), auth=auth, verify=False) #подключение по api
        resp_json = response.json() #складываем в json
        df_refresh = pd.DataFrame(resp_json['value']) #складываем в фрейм
        for j in range(len(df_refresh)): ##для чтения всех записей фреймов
            df_info_refresh.append(df_refresh) #добавляем очередную запись в итогоый список
    df_info_refresh = pd.concat(df_info_refresh) #склеииваем записи в единый фрейм


def _parse_data(**context):
    #Обработка фрейма
    df_info_refresh = ( 
    df_info_refresh[['Id', 'Owner', 'Description', 
                             'CatalogItemPath', 'EventType',
                             'ScheduleDescription', 'LastRunTime', 
                             'LastStatus', 'ModifiedBy','ModifiedDate']]) #объявляем все строки, которые нужны для выгрузки

    ###Все статусы обновлений, в т.ч. успешные
    df_info_refresh = df_info_refresh.drop_duplicates(keep='last') #удаляем дубликаты (не последние записи)
    df_info_refresh["ReportName"] = (df_info_refresh["CatalogItemPath"]
                                         .str.split('/', -1).str[-1]) #создаем столбец с названием отчета



    ###Что будем отправлять
    df_failed_refresh = (df_info_refresh[['Owner','ReportName','CatalogItemPath',
                                              'Description','LastRunTime','LastStatus']]
                     [df_info_refresh['LastStatus']!='Completed Data Refresh'])


def _send_data(**context):
    ###Объявление кредов
    recipients = ['mail1.ru','mail2.ru'] #Список получателй
    emaillist = [elem.strip().split(',') for elem in recipients]
    msg = MIMEMultipart()
    msg['Subject'] = "Daily Failed Refresh PBI Reporting" #Тема письма
    FROMADDR = "you@gmail.com" #С какого почтового адреса будет отправка
    LOGIN    = FROMADDR #Логин/Отправитель
    PASSWORD = "yourcode" #полученный код


    ###Заготовка письма
    html = """\
    <html>
      <head></head>
      <body>
        {0}
      </body>
    </html>
    """.format(df_failed_refresh.to_html()) #Формирование фрейма для отображения в письме

    part1 = MIMEText(html, 'html')
    msg.attach(part1)


    ###Отправка письма
    server = smtplib.SMTP('smtp.gmail.com', 587) #Отправка письма
    server.set_debuglevel(1)
    server.set_debuglevel(1)
    server.ehlo()
    server.starttls()
    server.login(LOGIN, PASSWORD)
    server.sendmail(FROMADDR, emaillist , msg.as_string())
    server.quit() #Закрытие подключения



args = {'Your Name': 'Topic'}

dag = DAG(
    dag_id="temp",
    default_args=args,
    description='DAG_PBI_Alerting',
    start_date=datetime(2023, 1, 1),
    schedule_interval='0 9,13,15,18 * * *',
    catchup=False,
)


start = DummyOperator(task_id="start", dag=dag)

request_data = PythonOperator(
    task_id='request_data', python_callable=_request_data, provide_context=True, dag=dag)
parse_data = PythonOperator(
    task_id='parse_data', python_callable=_parse_data, provide_context=True, dag=dag)
send_data = PythonOperator(
    task_id='parse_data', python_callable=_send_data, provide_context=True, dag=dag)



start >> request_data >> parse_data >> send_data 