In [None]:
import pandas as pd
import numpy as np
from datetime import timedelta
from datetime import datetime
import os

import telegram

from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from airflow.models import Variable

login = 'm.mihaelis-15'
year = 1994 + hash(f'{login}') % 23
path_file = '/var/lib/airflow/airflow.git/dags/m.mihaelis-15/sales.csv'

default_args = {
    'owner': 'm.mihaelis',
    'depends_on_past': False,
    'retries': 4,
    'retry_delay': timedelta(minutes=10),
    'start_date': datetime(2021, 11, 2),
    'schedule_interval': '00 10 * * *'
}

@dag(default_args=default_args, catchup=False)
def m_mihaelis_lesson3():
    @task(retries=3)
    def get_data():
        data = pd.read_csv(path_file)
        data = data.query('Year == @year')
        return data

# 1 Самая продаваемая игра
    @task()
    def get_top_game(data):
        top_game = data.sort_values('Global_Sales', ascending=False) \
                       .iloc[0]['Name']
        return top_game

# 2 Самые продаваемые игры в Европе
    @task()
    def get_top_genre_eu(data):
        genre_eu = data.groupby('Genre', as_index=False) \
                       .agg({'EU_Sales': 'sum'}) \
                       .sort_values('EU_Sales', ascending=False)
        genre_eu_sales_max = genre_eu['EU_Sales'].max()
        top_genre_eu = genre_eu.query('EU_Sales == @genre_eu_sales_max')['Genre'].unique()
        return top_genre_eu

# 3 Платформа с большим количеством игр
    @task()
    def get_top_platform_na(data):
        platform_na = data.query('NA_Sales > 1') \
                         .groupby('Platform', as_index=False) \
                         .agg({'NA_Sales': 'count'}) \
                         .sort_values('NA_Sales', ascending=False)
        platform_games_na_max = platform_na['NA_Sales'].max()
        top_platform_na = platform_na.query('NA_Sales == @platform_games_na_max')['Platform'].unique()
        return top_platform_na

# 4 Издатель с самыми высокими продажами
    @task()
    def get_top_publisher_jp(data):
        publisher_jp = data.groupby('Publisher', as_index=False) \
                           .agg({'JP_Sales': 'mean'}) \
                           .sort_values('JP_Sales', ascending=False)
        publisher_mean_sales_jp_max = publisher_jp['JP_Sales'].max()
        top_publisher_jp = publisher_jp.query('JP_Sales == @publisher_mean_sales_jp_max')['Publisher'].unique()
        return top_publisher_jp

# 5 Европа vs Япония
    @task()
    def get_games_sales_eu_bet_jp(data):
        count_better_in_eu = data.query('Year == @year and EU_Sales > JP_Sales') \
                                 ['Name'].nunique()
        return count_better_in_eu


    @task()
    def print_data(top_game, top_genre_eu, top_platform_na, top_publisher_jp, count_better_in_eu):

        context = get_current_context()
        date = context['ds']

        print(f'''Game sales data in {year} for {date}:
        Best selling game in the world:
            {top_game}
        Best-selling genre in Europe:
            {', '.join(top_genre_eu)}
        Most popular platform for popular games in North America (over 1 million copies sold):
            {', '.join(top_platform_na)}
        Best Publisher in Japan (Highest Average Game Sales):
            {', '.join(top_publisher_jp)}
        The number of games sold in Europe is better than in Japan:
            {count_better_in_eu}''')
        

    data = get_data()
    top_game = get_top_game(data)
    top_genre_eu = get_top_genre_eu(data)
    top_platform_na = get_top_platform_na(data)
    top_publisher_jp = get_top_publisher_jp(data)
    count_better_in_eu = get_games_sales_eu_bet_jp(data)
    print_data(top_game, top_genre_eu, top_platform_na, top_publisher_jp, count_better_in_eu)
        
m_mihaelis_lesson3 = m_mihaelis_lesson3()