# In [None]:
import requests
import pandas as pd
from datetime import timedelta
from datetime import datetime

from airflow import DAG
from airflow.operators.python import PythonOperator

# URL of the remote CSV with the top-1M domains list; get_data() reads it.
TOP_1M_DOMAINS = 'https://storage.yandexcloud.net/kc-startda/top-1m.csv'
# Local file that get_data() writes and the analysis tasks read back.
# The path is relative, so it resolves against the process working directory.
TOP_1M_DOMAINS_FILE = 'top-1m.csv'


def get_data():
    """Download the top-1M domains list and store it as a local CSV file.

    Reads the CSV at ``TOP_1M_DOMAINS`` and writes it to
    ``TOP_1M_DOMAINS_FILE`` as header-less ``rank,domain`` rows, which is the
    layout the other tasks in this file expect when they load it with
    ``names=['rank', 'domain']``.
    """
    # The downstream readers treat every line as data, so do the same here:
    # without header=None pandas would consume the first row as column names
    # and the file would only survive by accident of the round trip.
    top_doms = pd.read_csv(TOP_1M_DOMAINS, header=None, names=['rank', 'domain'])

    # Let pandas write the file directly: no giant intermediate string, and
    # no text-mode newline translation or locale-dependent encoding.
    top_doms.to_csv(TOP_1M_DOMAINS_FILE, index=False, header=False)


def get_top10_domainnzones():
    """Find the ten most common domain zones and save them to a CSV file.

    Reads ``TOP_1M_DOMAINS_FILE`` (header-less ``rank,domain`` rows), takes
    the label after the last dot of every domain as its zone, counts the
    domains per zone and writes the ten largest zones to
    ``top_10_domainzones.csv`` as ``zone,amount`` rows without a header.
    """
    df = pd.read_csv(TOP_1M_DOMAINS_FILE, names=['rank', 'domain'])

    # The zone is the LAST label: 'bbc.co.uk' -> 'uk'. Taking label [1] of a
    # full split would yield 'co' here and 'google' for 'www.google.com'.
    df['domainzone'] = df['domain'].str.rsplit('.', n=1).str[-1]

    zone_counts = (
        df.groupby('domainzone', as_index=False)
        .agg({'domain': 'count'})
        .rename(columns={'domain': 'amount'})
        # mergesort is stable, so zones with equal counts keep the
        # alphabetical order produced by groupby and the output is
        # reproducible between runs.
        .sort_values('amount', ascending=False, kind='mergesort')
    )
    top_10_domainzones = zone_counts.head(10)

    # index=False already drops the index, so no reset_index is needed.
    top_10_domainzones.to_csv('top_10_domainzones.csv', index=False, header=False)


def get_longest_domain():
    """Find the longest domain name and save it to ``longest_domain.csv``.

    The "name" is the first dot-separated label of each domain in
    ``TOP_1M_DOMAINS_FILE``. When several names share the maximum length,
    the alphabetically first one is written.

    Raises:
        ValueError: if the input file contains no usable domain names.
    """
    df = pd.read_csv(TOP_1M_DOMAINS_FILE, names=['rank', 'domain'])

    # Only the first label is needed, so split once instead of expanding
    # every domain into a wide frame of labels.
    domain_names = df['domain'].str.split('.', n=1).str[0]
    name_lengths = domain_names.str.len()
    longest_names = domain_names[name_lengths == name_lengths.max()]

    if longest_names.empty:
        raise ValueError(f'No domain names found in {TOP_1M_DOMAINS_FILE}')

    # Pick the winner by position: the row label of the longest name differs
    # from one dataset snapshot to the next, so it must not be hard-coded.
    longest_domain = longest_names.sort_values().iloc[0]

    with open('longest_domain.csv', 'w') as f:
        f.write(longest_domain)
        
def find_airflow_rank():
    """Look up the rank of ``airflow.com`` and save it to ``airflow_rank.csv``.

    Reads ``TOP_1M_DOMAINS_FILE`` (header-less ``rank,domain`` rows) and
    writes the rank of the first row whose domain is exactly ``airflow.com``.
    If the domain is absent, an explanatory message is written instead, so
    the report task still has something to print.
    """
    df = pd.read_csv(TOP_1M_DOMAINS_FILE, names=['rank', 'domain'])

    matches = df.loc[df['domain'] == 'airflow.com', 'rank']
    if matches.empty:
        # Indexing [0] on an empty result would raise IndexError and fail
        # the task on every snapshot that does not contain the domain.
        rank = 'airflow.com is not in the list'
    else:
        rank = str(matches.iloc[0])

    with open('airflow_rank.csv', 'w') as f:
        f.write(rank)


def print_data(ds):
    """Print the three reports produced by the analysis tasks.

    Args:
        ds: date string that is interpolated into each report heading.

    All three result files are read before anything is printed, so a missing
    file raises before any partial output appears.
    """
    report_files = (
        'top_10_domainzones.csv',
        'longest_domain.csv',
        'airflow_rank.csv',
    )
    contents = []
    for path in report_files:
        with open(path, 'r') as src:
            contents.append(src.read())
    zones_report, longest_name, rank_value = contents

    print(f'Top 10 domains zones for date {ds}')
    print(zones_report)

    print(f'The longest domain name for date {ds}')
    print(longest_name)

    print(f'Domain "airflow.com" has the following rank for date {ds}')
    print(rank_value)


# Arguments passed to the DAG as its default_args.
default_args = dict(
    owner='s.chebrikov',
    depends_on_past=False,
    retries=2,
    retry_delay=timedelta(minutes=5),
    start_date=datetime(2023, 10, 16),
)
# Cron expression: every day at 11:50.
schedule_interval = '50 11 * * *'

# Operators created inside the ``with`` block are attached to the DAG
# automatically, so they do not need an explicit ``dag=`` argument.
with DAG(
    'dag_chebrikov',
    default_args=default_args,
    schedule_interval=schedule_interval,
) as dag_chebrikov:
    # Step 1: download the source list.
    t1 = PythonOperator(task_id='get_data', python_callable=get_data)

    # Step 2: three analyses that each depend only on the downloaded file.
    t2 = PythonOperator(
        task_id='get_top10_domainnzones',
        python_callable=get_top10_domainnzones,
    )
    t2_longest = PythonOperator(
        task_id='get_longest_domain',
        python_callable=get_longest_domain,
    )
    t2_airflow_rank = PythonOperator(
        task_id='find_airflow_rank',
        python_callable=find_airflow_rank,
    )

    # Step 3: print the combined report.
    t3 = PythonOperator(task_id='print_data', python_callable=print_data)

    # Fan out after the download, then join before the report.
    t1 >> [t2, t2_longest, t2_airflow_rank] >> t3