# Класс для преобразования записей из парсера в таблицы из БД

## пайплайн:
* 1. выгрузка логов с сервера
* 2. трансформация логов в таблицы из БД
* 3. добавление записей (трансформированные логи) в БД

## схема БД:


In [123]:
import pandas as pd
import numpy as np
import pickle, json, os
from tqdm import tqdm_notebook
import warnings
warnings.filterwarnings('ignore')
from sqlalchemy import create_engine
import pymysql
from collections import defaultdict
from datetime import datetime

In [135]:
class LogTransformerCSGO():
    
    def __init__(self, path_to_log, engine):
        self.path_to_log = path_to_log 
        self.engine = engine 
    def transform_logs(self):
        """Load the JSON log and flatten it into the DataFrame ``self.df``.

        The file at ``self.path_to_log`` is parsed as JSON and every record is
        flattened with ``pd.json_normalize`` (nested keys become dotted column
        names).  Rows with a missing value in any id column are dropped, and
        so are columns that are empty in every remaining row.

        Returns:
            LogTransformerCSGO: ``self``, so calls can be chained.

        Raises:
            ValueError: if no record of the log could be flattened.
        """
        # Read the path given to the constructor rather than a notebook global.
        with open(self.path_to_log, 'r', encoding='utf-8') as f:
            json_log = json.load(f)

        rows = []
        skipped = 0
        for log in tqdm_notebook(json_log):
            # Best effort per record: one malformed entry must not discard
            # every record that follows it.
            try:
                rows.append(pd.json_normalize(log))
            except (TypeError, ValueError, AttributeError, KeyError,
                    NotImplementedError):
                skipped += 1
        if skipped:
            print('Skipped {} malformed log record(s)'.format(skipped))
        if not rows:
            raise ValueError('No usable records found in {!r}'.format(self.path_to_log))

        df = pd.concat(rows).reset_index(drop=True)

        # An id column is one whose dotted path has a part that is exactly
        # 'id' or contains '_id' (e.g. 'match.league_id', 'map.id').
        idx_columns = [
            col for col in df.columns
            if any(part == 'id' or '_id' in part for part in col.split('.'))
        ]
        df = df[~df[idx_columns].isna().any(axis=1)]

        # Drop the columns that hold no data at all after the row filter.
        self.df = df[df.columns[~df.isna().all()]]
        return self
    
    def get_tables(self):
        """Split ``self.df`` into one DataFrame per database table.

        ``self.df`` is grouped by ``match_id``.  For every group each
        extractor below collects the records of one table; an extractor that
        fails for a group is skipped for that group only (best effort) and the
        failure is counted in ``self.extraction_failures`` (table -> count).
        The collected records are de-duplicated on ``id`` where the table has
        one, and every column is converted to datetime / int / float when the
        conversion is lossless.

        Returns:
            dict: table name -> DataFrame for 'Teams', 'Players', 'Leagues',
            'Series', 'Tournaments', 'Matches', 'Games', 'Rounds', 'Stats'
            and 'Maps'.  The same dict is stored as ``self.d_tables``.

        Raises:
            ValueError: if no record could be extracted for one of the tables
                'Teams', 'Players', 'Leagues', 'Series', 'Tournaments',
                'Matches', 'Games', 'Rounds' or 'Stats'.
        """
        # (source column in self.df, column name in the table) per table.
        league_fields = [('match.league_id', 'id'),
                         ('match.league.name', 'name'),
                         ('match.league.modified_at', 'modified_at'),
                         ('match.league.slug', 'slug')]
        serie_fields = [('match.serie_id', 'id'),
                        ('match.league_id', 'league_id'),
                        ('match.serie.name', 'name'),
                        ('match.serie.full_name', 'full_name'),
                        ('match.serie.season', 'season'),
                        ('match.serie.begin_at', 'begin_at'),
                        ('match.serie.end_at', 'end_at'),
                        ('match.serie.modified_at', 'modified_at'),
                        ('match.serie.tier', 'tier')]
        tournament_fields = [('match.tournament.id', 'id'),
                             ('match.serie.id', 'serie_id'),
                             ('match.tournament.name', 'name'),
                             ('match.tournament.begin_at', 'begin_at'),
                             ('match.tournament.end_at', 'end_at'),
                             ('match.tournament.winner_id', 'winner_id'),
                             ('match.tournament.prizepool', 'prizepool'),
                             ('match.tournament.modified_at', 'modified_at')]
        match_fields = [('match_id', 'id'),
                        ('match.tournament.id', 'tournament_id'),
                        ('map.id', 'map_id'),
                        ('match.winner_id', 'winner_id'),
                        ('match.begin_at', 'begin_at'),
                        ('match.end_at', 'end_at'),
                        ('length', 'length'),
                        ('position', 'position'),
                        ('status', 'status')]
        team_fields = ('id', 'modified_at', 'name', 'location')
        player_fields = ('id', 'name', 'first_name', 'last_name',
                         'birthday', 'hometown', 'nationality')
        stat_fields = ('adr', 'assists', 'kills', 'deaths', 'headshots',
                       'flash_assists', 'k_d_diff', 'kast', 'rating')
        # Errors expected from malformed / incomplete nested log data.
        data_errors = (KeyError, TypeError, IndexError, AttributeError, ValueError)

        def _single_valued_record(frame, fields):
            """Return {name: value} when every source column holds exactly one
            distinct value in `frame`; return None otherwise."""
            values = pd.concat([frame[src].drop_duplicates() for src, _ in fields])
            if len(values) != len(fields):
                return None
            values.index = [name for _, name in fields]
            return values.to_dict()

        def _get_league_dict(subdf):
            return _single_valued_record(subdf, league_fields)

        def _get_serie_dict(subdf):
            return _single_valued_record(subdf, serie_fields)

        def _get_tournament_dict(subdf):
            return _single_valued_record(subdf, tournament_fields)

        def _get_match_dict(subdf):
            """One record per distinct 'id' in the group; ambiguous ones are skipped."""
            records = []
            for _, rows_of_id in subdf.groupby('id'):
                record = _single_valued_record(rows_of_id, match_fields)
                if record is not None:
                    records.append(record)
            return records

        def _iter_games(subdf):
            """Yield every game dict of each distinct 'match.games' list in the group."""
            for idx in subdf['match.games'].astype(str).drop_duplicates().index:
                games = subdf.loc[idx]['match.games']
                if not isinstance(games, (list, tuple)):
                    continue
                for game in games:
                    yield game

        def _get_game_dict(subdf):
            """One record per game that has rows with a matching 'begin_at'."""
            records = []
            for game in _iter_games(subdf):
                # Per-game best effort: a game without matching rows (or with
                # broken data) must not discard the games listed after it.
                try:
                    played = subdf[subdf['begin_at'] == game['begin_at']]
                    match_ids = played['match_id'].drop_duplicates()
                    if len(match_ids) != 1:
                        continue
                    record = {'id': game['id'], 'match_id': match_ids.iloc[0]}
                    for i, rounds_score in enumerate(played['rounds_score'].iloc[0]):
                        record[f'team{i+1}_id'] = rounds_score['team_id']
                        record[f'score{i+1}'] = rounds_score['score']
                except data_errors:
                    continue
                records.append(record)
            return records

        def _get_round_dict(subdf):
            """One DataFrame of rounds per game, tagged with the game id."""
            frames = []
            for game in _iter_games(subdf):
                try:
                    played = subdf[subdf['begin_at'] == game['begin_at']]
                    rounds = pd.DataFrame(played['rounds'].iloc[0])
                    rounds['game_id'] = game['id']
                except data_errors:
                    continue
                frames.append(rounds.rename(columns={'ct': 'ct_id', 'terrorists': 't_id'}))
            return frames

        def _get_team_dict(subdf):
            """Team records taken from the 'teams' list of the group's first row."""
            return [{field: team[field] for field in team_fields}
                    for team in subdf['teams'].iloc[0]]

        def _get_player_dict(subdf):
            """Player records from every distinct 'players' list in the group."""
            records = []
            for idx in subdf['players'].astype(str).drop_duplicates().index:
                for player in subdf.loc[idx]['players']:
                    info = player['player']
                    # NOTE: kept from the original code; this writes into the
                    # nested dict stored in self.df and is not part of the
                    # returned record.
                    info['team_id'] = player['team']['id']
                    records.append({field: info[field] for field in player_fields})
            return records

        def _get_stat_dict(subdf):
            """Per-player statistics of the game whose 'begin_at' equals the row's."""
            records = []
            for _, row in subdf.drop_duplicates('id').iterrows():
                for game in row['match.games']:
                    if row['begin_at'] == game['begin_at']:
                        for number, player in enumerate(row['players']):
                            record = {'player_number': number,
                                      'game_id': game['id'],
                                      'player_id': player['player']['id']}
                            record.update((field, player[field]) for field in stat_fields)
                            records.append(record)
            return records

        def _get_map_dict(subdf):
            return dict(zip(subdf['map.id'], subdf['map.name']))

        def _convert_column(ser):
            """Return `ser` as datetime, int or float when that is lossless,
            otherwise unchanged.  A column is never dropped."""
            if pd.api.types.is_datetime64_any_dtype(ser):
                return ser
            if not pd.api.types.is_numeric_dtype(ser):
                non_null = ser.dropna()
                # Only text is parsed as dates; numbers would otherwise be
                # read as epoch offsets.
                if len(non_null) and all(isinstance(v, str) for v in non_null):
                    try:
                        # Normalise to naive UTC timestamps.
                        return pd.to_datetime(ser, utc=True).dt.tz_localize(None)
                    except (ValueError, TypeError, OverflowError):
                        pass
            try:
                ser_int = ser.astype(int)
                if (ser_int == ser).all():
                    return ser_int
                # The int cast changed the values (e.g. 1.15 -> 1): fall
                # through to float instead of losing the column.
            except (ValueError, TypeError, OverflowError):
                pass
            try:
                return ser.astype(float)
            except (ValueError, TypeError):
                return ser

        def _convert_types(df):
            return pd.concat([_convert_column(df[col]) for col in df.columns], axis=1)

        # (table, extractor, True when the extractor returns a list of records)
        extractors = [('Maps', _get_map_dict, False),
                      ('Teams', _get_team_dict, True),
                      ('Players', _get_player_dict, True),
                      ('Leagues', _get_league_dict, False),
                      ('Series', _get_serie_dict, False),
                      ('Tournaments', _get_tournament_dict, False),
                      ('Matches', _get_match_dict, True),
                      ('Games', _get_game_dict, True),
                      ('Rounds', _get_round_dict, True),
                      ('Stats', _get_stat_dict, True)]

        df = self.df
        DD = defaultdict(list)
        failures = defaultdict(int)
        for match_id, subdf in tqdm_notebook(df.groupby('match_id')):
            for table, extract, returns_list in extractors:
                # `Exception` (not a bare except) so that interrupting the
                # kernel still stops the loop.
                try:
                    result = extract(subdf)
                except Exception:
                    failures[table] += 1
                    continue
                if result is None:
                    continue
                if returns_list:
                    DD[table].extend(result)
                else:
                    DD[table].append(result)
        self.extraction_failures = dict(failures)

        DD2 = {}
        for k in ['Teams', 'Players', 'Leagues', 'Series', 'Tournaments', 'Matches', 'Games']:
            if not DD[k]:
                raise ValueError(f'No records could be extracted for table {k!r}')
            frame = pd.DataFrame.from_records(DD[k])\
                      .drop_duplicates(subset=['id'])\
                      .reset_index(drop=True)
            DD2[k] = _convert_types(frame)

        if not DD['Rounds']:
            raise ValueError("No records could be extracted for table 'Rounds'")
        DD2['Rounds'] = _convert_types(pd.concat(DD['Rounds'], ignore_index=True))\
                          .reset_index(drop=True)

        if not DD['Stats']:
            raise ValueError("No records could be extracted for table 'Stats'")
        DD2['Stats'] = _convert_types(pd.DataFrame.from_records(DD['Stats']))\
                         .reset_index(drop=True)

        # The first name seen for a map id wins.
        d_map = {}
        for maps_of_match in DD['Maps']:
            for map_id, map_name in maps_of_match.items():
                d_map.setdefault(map_id, map_name)
        maps_frame = pd.DataFrame({'id': list(d_map.keys()),
                                   'name': list(d_map.values())})
        DD2['Maps'] = _convert_types(maps_frame)\
                        .drop_duplicates(subset=['id']).reset_index(drop=True)

        self.d_tables = DD2
        return DD2
    
    def load_tables_to_database(self):
        
        df_Players =self.d_tables['Players']
        df_Players.to_sql('Players', con=connection, index=False, if_exists='append')

        df_Teams =self.d_tables['Teams']
        df_Teams.to_sql('Teams', con=connection, index=False, if_exists='append')
        Teams_touse= df_Teams['id'].unique()

        df_Leagues =self.d_tables['Leagues']
        df_Leagues.to_sql('Leagues', con=connection, index=False, if_exists='append')

        df_Series =self.d_tables['Series']
        df_Series.to_sql('Series', con=connection, index=False, if_exists='append')

        df_tournaments = self.d_tables['Tournaments']
        Tournaments_touse = df_tournaments['id'].unique()
        df_tournaments= df_tournaments[df_tournaments['winner_id'].isin(Teams_touse)].reset_index(drop = True)
        df_tournaments.to_sql('Tournaments', con=connection, index=False, if_exists='append')

        df_maps=self.d_tables['Maps']
        Maps_touse = df_maps['id'].unique()
        df_maps.to_sql('Maps', con=connection, index=False, if_exists='append')  

        df_matches = self.d_tables['Matches']
        df_matches = df_matches[(df_matches['tournament_id'].isin(df_tournaments['id'].values)) &\
                                (df_matches['map_id'].isin(df_maps['id'].unique())) &\
                                (df_matches['winner_id'].isin(df_Teams['id'].unique()))].reset_index(drop = True) 
        df_matches.to_sql('Matches', con=connection, index=False, if_exists='append')

        df_games = self.d_tables['Games']
        df_games= df_games[(df_games['match_id'].isin(df_matches['id'])) &\
                           (df_games['team1_id'].isin(Teams_touse)) &\
                           (df_games['team2_id'].isin(Teams_touse))].reset_index(drop = True)
        df_games.to_sql('Games', con=connection, index=False, if_exists='append')

        df_rounds = self.d_tables['Rounds']
        df_rounds = df_rounds[(df_rounds['ct_id'].isin(df_Teams['id'])) &\
                              (df_rounds['t_id'].isin(df_Teams['id'])) &\
                              (df_rounds['winner_team'].isin(df_Teams['id'])) &\
                              (df_rounds['game_id'].isin(df_games['id']))].reset_index(drop = True)
        df_rounds.to_sql('Rounds', con=connection, index=False, if_exists='append')

        df_stats= self.d_tables['Stats']
        df_stats = df_stats[(df_stats['game_id'].isin(df_games['id']))&\
                             (df_stats['player_id'].isin(df_Players['id']))].reset_index(drop= True)
        df_stats.to_sql('Stats', con=connection, index=False, if_exists='append')

        df_Teams4Players = pd.concat([df_Teams['id'].to_frame('team_id'),\
                                      df_Players['id'].to_frame('player_id')],1)
        df_Teams4Players.to_sql('Teams4Players', con=connection, index=False, if_exists='append')
        
        today = str(np.datetime64(datetime.today().date()))
        print('Database was updated {} ...'.format(today))

In [130]:
from urllib.parse import quote_plus

PATH_TO_LOG = r'C:\Users\Sergey\Desktop\logs\log.txt'

# Credentials to database connection.
# Each value can be overridden through an environment variable; the defaults
# are the values previously hard-coded here.
# SECURITY: the default password is stored in plain text in this notebook;
# prefer setting CSGO_DB_PASSWORD and removing the default.
hostname = os.environ.get("CSGO_DB_HOST", "localhost")
dbname = os.environ.get("CSGO_DB_NAME", "csgo_db")
uname = os.environ.get("CSGO_DB_USER", "root")
pwd = os.environ.get("CSGO_DB_PASSWORD", "serrg123456")

# Create SQLAlchemy engine to connect to MySQL Database.
# User and password are URL-escaped so special characters ('@', ':', '/')
# cannot break the connection URL.
CONNECTION = create_engine("mysql+pymysql://{user}:{pw}@{host}/{db}"\
                            .format(host=hostname, db=dbname,
                                    user=quote_plus(uname), pw=quote_plus(pwd)))

In [131]:
# Build the transformer from the log path and the database engine defined above.
logtransformer = LogTransformerCSGO(PATH_TO_LOG, CONNECTION)

In [132]:
# Step 1: read the JSON log and flatten it into logtransformer.df.
logtransformer.transform_logs()

HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=602.0), HTML(value='')))




<__main__.LogTransformerCSGO at 0x18022ea8580>

In [133]:
# Step 2: split the flattened log into one DataFrame per database table.
d_tables = logtransformer.get_tables()

HBox(children=(HTML(value=''), FloatProgress(value=0.0, max=67.0), HTML(value='')))




In [134]:
# Step 3: append the tables to the database.
logtransformer.load_tables_to_database()

Database was updated 2021-07-30 ...


In [None]:
# Display the dict of tables returned by get_tables().
d_tables