In [2]:
from airflow.decorators import dag, task
from airflow.operators.python import get_current_context
from datetime import timedelta
from datetime import datetime

import pandas as pd

In [1]:
default_args = {
    'owner': 'me',
    'depends_on_past': False,
    'retries': 2,
    'retry_delay': timedelta(minutes=5),
    'start_date': datetime(2023, 4, 19)
}

@dag(default_args=default_args, schedule_interval='20 4 * * *', catchup=False)
def gvs_l2_dag():
    
    @task()
    def get_df():
        # Получение данных
        df = pd.read_csv('../a.batalov/vgsales.csv')
        return df

    @task() 
    def filter_df(df):
        # Фильтрация данных
        login = 'v-grabchuk-33'
        filter_year = 1994 + hash(f'{login}') % 23
        df = df.query('Year == @filter_year')
        return df
    
    @task() 
    def get_max_sales_game(df):
        # Какая игра была самой продаваемой в этом году во всем мире?
        return [df.set_index('Name').Global_Sales.idxmax()]
    
    @task()
    def get_max_sales_genre_eu(df):
        # Игры какого жанра были самыми продаваемыми в Европе? Перечислить все
        genre_ser = df.groupby('Genre').EU_Sales.sum()
        max_val = genre_ser.max()
        return list(genre_ser[genre_ser == max_val].index)
    
    @task() #3
    def get_max_sales_platform_na(df):
        # На какой платформе было больше всего игр, которые продались более чем 
        # миллионным тиражом в Северной Америке? Перечислить все
        platform_ser = df[df.NA_Sales > 1].groupby('Platform').NA_Sales.count()
        max_val = platform_ser.max()
        return list(platform_ser[platform_ser == max_val].index)
    
    @task()
    def get_max_sales_publisher_jp(df):
        # У какого издателя самые высокие средние продажи в Японии?
        # Перечислить все
        publisher_ser = df.groupby('Publisher').JP_Sales.mean()
        max_val = publisher_ser.max()
        return list(publisher_ser[publisher_ser == max_val].index)
    
    @task()
    def get_games_eu_better_jp(df):
        # Сколько игр продались лучше в Европе, чем в Японии?
        return df[df.EU_Sales > df.JP_Sales].Name.count()
    
    @task()
    def print_info(
        max_sales_game, 
        max_sales_genre_eu, 
        max_sales_platform_na,
        max_sales_publisher_jp, 
        games_eu_better_jp
    ):
        # Вывод информации
        context = get_current_context()
        date = context['ds']
        print(f'Log on {date}')

        print(f'1. Game with max sales:')
        print(max_sales_game)

        print(f'2. Games with max sales in EU:')
        print(max_sales_genre_eu)

        print(f'3. Platforms with max amount of popular games (sales > 1m) in NA:')
        print(max_sales_platform_na)

        print(f'Publisher with max mean sales in JP:')
        print(max_sales_publisher_jp)

        print(f'Amount of games that was better sold in EU than in JP')
        print(games_eu_better_jp)
        
    
    df = get_df()
    df_f = filter_df(df)
    max_sales_game = get_max_sales_game(df_f)
    max_sales_genre_eu = get_max_sales_genre_eu(df_f)
    max_sales_platform_na = get_max_sales_platform_na(df_f)
    max_sales_publisher_jp = get_max_sales_publisher_jp(df_f)
    games_eu_better_jp = get_games_eu_better_jp(df_f)
    print_info(max_sales_game, max_sales_genre_eu, max_sales_platform_na, max_sales_publisher_jp, games_eu_better_jp)


gvs_l2_dag = gvs_l2_dag()

Проверка функций на работоспособность

In [4]:
def get_df():
    # Получение данных
    df = pd.read_csv('vgsales.csv')
    return df

In [5]:
df = get_df()
df

Unnamed: 0,Rank,Name,Platform,Year,Genre,Publisher,NA_Sales,EU_Sales,JP_Sales,Other_Sales,Global_Sales
0,1,Wii Sports,Wii,2006.0,Sports,Nintendo,41.49,29.02,3.77,8.46,82.74
1,2,Super Mario Bros.,NES,1985.0,Platform,Nintendo,29.08,3.58,6.81,0.77,40.24
2,3,Mario Kart Wii,Wii,2008.0,Racing,Nintendo,15.85,12.88,3.79,3.31,35.82
3,4,Wii Sports Resort,Wii,2009.0,Sports,Nintendo,15.75,11.01,3.28,2.96,33.00
4,5,Pokemon Red/Pokemon Blue,GB,1996.0,Role-Playing,Nintendo,11.27,8.89,10.22,1.00,31.37
...,...,...,...,...,...,...,...,...,...,...,...
16593,16596,Woody Woodpecker in Crazy Castle 5,GBA,2002.0,Platform,Kemco,0.01,0.00,0.00,0.00,0.01
16594,16597,Men in Black II: Alien Escape,GC,2003.0,Shooter,Infogrames,0.01,0.00,0.00,0.00,0.01
16595,16598,SCORE International Baja 1000: The Official Game,PS2,2008.0,Racing,Activision,0.00,0.00,0.00,0.00,0.01
16596,16599,Know How 2,DS,2010.0,Puzzle,7G//AMES,0.00,0.01,0.00,0.00,0.01


In [6]:
login = 'v-grabchuk-33'
1994 + hash('v-grabchuk-33') % 23

1994

In [7]:
def filter_df(df):
    # Фильтрация данных
    login = 'v-grabchuk-33'
    filter_year = 1994 + hash(f'{login}') % 23
    df = df.query('Year == @filter_year')
    return df

In [8]:
df_f = filter_df(df)
df_f

Unnamed: 0,Rank,Name,Platform,Year,Genre,Publisher,NA_Sales,EU_Sales,JP_Sales,Other_Sales,Global_Sales
71,72,Donkey Kong Country,SNES,1994.0,Platform,Nintendo,4.36,1.71,3.00,0.23,9.30
184,185,Super Mario Land 3: Wario Land,GB,1994.0,Platform,Nintendo,2.49,0.98,1.57,0.15,5.19
305,306,Donkey Kong Land,GB,1994.0,Platform,Nintendo,1.97,0.76,1.07,0.11,3.91
353,354,Doom II: Hell on Earth,PC,1994.0,Shooter,Virgin Interactive,2.05,1.40,0.00,0.16,3.61
389,390,Final Fantasy III,SNES,1994.0,Role-Playing,SquareSoft,0.86,0.00,2.55,0.02,3.42
...,...,...,...,...,...,...,...,...,...,...,...
14996,14999,Bust-A-Move,3DO,1994.0,Puzzle,Micro Cabin,0.00,0.00,0.02,0.00,0.02
15255,15258,Sugoi Hebereke,SNES,1994.0,Fighting,Sunsoft,0.00,0.00,0.02,0.00,0.02
15256,15259,Pebble Beach Golf Links,SAT,1994.0,Sports,Sega,0.00,0.00,0.02,0.00,0.02
15687,15690,World Class Rugby 2: Kokunai Gekitou Hen '93,SNES,1994.0,Sports,Misawa,0.00,0.00,0.02,0.00,0.02


In [9]:
def get_max_sales_game(df):
    # Какая игра была самой продаваемой в этом году во всем мире?
    return df.set_index('Name').Global_Sales.idxmax()

In [10]:
get_max_sales_game(df_f)

'Donkey Kong Country'

In [11]:
def get_max_sales_genre_eu(df):
    # Игры какого жанра были самыми продаваемыми в Европе? 
    genre_ser = df.groupby('Genre').EU_Sales.sum()
    max_val = genre_ser.max()
    return list(genre_ser[genre_ser == max_val].index)

In [12]:
get_max_sales_genre_eu(df_f)

['Platform']

In [13]:
df_f.groupby('Genre').EU_Sales.sum().sort_values(ascending=False)

Genre
Platform        5.73
Adventure       2.81
Shooter         2.57
Strategy        1.08
Fighting        0.98
Misc            0.81
Sports          0.55
Simulation      0.23
Action          0.12
Role-Playing    0.00
Racing          0.00
Puzzle          0.00
Name: EU_Sales, dtype: float64

In [14]:
def get_max_sales_platform_na(df):
    # На какой платформе было больше всего игр, которые продались более чем 
    # миллионным тиражом в Северной Америке?
    platform_ser = df[df.NA_Sales > 1].groupby('Platform').NA_Sales.count()
    max_val = platform_ser.max()
    return list(platform_ser[platform_ser == max_val].index)

In [15]:
get_max_sales_platform_na(df_f)

['SNES']

In [17]:
df_f[df_f.NA_Sales > 1].groupby('Platform').NA_Sales.count().sort_values(ascending=False)

Platform
SNES    4
PC      3
GEN     3
GB      3
Name: NA_Sales, dtype: int64

In [18]:
def get_max_sales_publisher_jp(df):
    # У какого издателя самые высокие средние продажи в Японии?
    publisher_ser = df.groupby('Publisher').JP_Sales.mean()
    max_val = publisher_ser.max()
    return list(publisher_ser[publisher_ser == max_val].index)

In [19]:
get_max_sales_publisher_jp(df_f)

['SquareSoft']

In [20]:
df_f.groupby('Publisher').JP_Sales.mean().sort_values(ascending=False)

Publisher
SquareSoft                      1.435000
Nintendo                        1.125000
ASCII Entertainment             0.870000
ChunSoft                        0.810000
Hudson Soft                     0.610000
Tecmo Koei                      0.530000
Takara                          0.480000
Taito                           0.460000
Human Entertainment             0.365000
Media Rings                     0.360000
Enix Corporation                0.360000
Laguna                          0.360000
Namco Bandai Games              0.286667
Hect                            0.270000
Sony Computer Entertainment     0.257143
Konami Digital Entertainment    0.241429
ArtDink                         0.240000
Atlus                           0.225000
BPS                             0.200000
SNK                             0.200000
Capcom                          0.160000
Sega                            0.148846
Game Arts                       0.140000
Banpresto                       0.124000
Angel 

In [21]:
def get_games_eu_better_jp(df):
    # Сколько игр продались лучше в Европе, чем в Японии?
    return df[df.EU_Sales > df.JP_Sales].Name.count()

In [22]:
get_games_eu_better_jp(df_f)

21

Inspired by: KCM11L3MP