## Prefect

In [1]:
from prefect import task, Flow
from prefect.tasks.secrets import PrefectSecret
from prefect.tasks.postgres import PostgresExecute, PostgresFetch
import psycopg2 as pg
import requests
import pandas as pd
import time
import yaml
import requests
import datetime



In [2]:
with open("../config.yml", "r") as ymlfile:
    cfg = yaml.load(ymlfile)
api_key = cfg['riotgamesapi']['key']
username = cfg['postgresql']['user']
password = cfg['postgresql']['password']
database = 'teamfighttactics'

  cfg = yaml.load(ymlfile)


In [3]:
query_header = {
        "X-Riot-Token": api_key
}
challenger_url = 'https://euw1.api.riotgames.com/tft/league/v1/challenger'
challenger_response = requests.get(url = challenger_url, headers=query_header).json()

In [5]:
@task
def update_challenger_league(username, password, db_name, api_key):
    """A task that requires credentials to access something. Passing the
    credentials in as an argument allows you to change how/where the
    credentials are loaded (though we recommend using `PrefectSecret` tasks to
    load them."""
    query_header = {
            "X-Riot-Token": api_key
    }
    challenger_url = 'https://euw1.api.riotgames.com/tft/league/v1/challenger'
    challenger_response = requests.get(url = challenger_url, headers=query_header).json()
    challenger_response_df = pd.DataFrame(challenger_response['entries'])

    for i in range(0, challenger_response_df.shape[0]):
        summonerId = str(challenger_response_df["summonerId"][i])
        summonerName = str(challenger_response_df["summonerName"][i])
        leaguePoints = int(challenger_response_df["leaguePoints"][i])

        puuid_url = f'https://euw1.api.riotgames.com/tft/summoner/v1/summoners/{summonerId}'
        puuid_response = requests.get(url = puuid_url, headers=query_header).json()
        puuid = str(puuid_response["puuid"])

        time.sleep(1.6)

        user_query = '''
            INSERT INTO public.users(
                id, summonername, summonerid, rankedleague, puuid, leaguepoints, region)
                VALUES (DEFAULT, %s, %s, 'CHALLENGER', %s, %s , 'EUW1')
                '''
        query_data = (summonerName, summonerId, puuid,leaguePoints,)

        PostgresExecute(
            db_name=db_name,
            user=username,
            host="127.0.0.1",
            port=5432,
            query=user_query,
            data=query_data,
            commit=True,
        ).run(password=password)

In [6]:
with Flow("Hello") as flow:
    username = PrefectSecret("USERNAME")
    password = PrefectSecret("PASSWORD")
    dbname= 'teamfighttactics' 
    update_challenger_league(username = username, password = password,db_name = dbname, api_key = api_key )

flow.run()

[2021-10-27 23:35:36+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'Hello'
[2021-10-27 23:35:36+0100] INFO - prefect.TaskRunner | Task 'PASSWORD': Starting task run...
[2021-10-27 23:35:37+0100] INFO - prefect.TaskRunner | Task 'PASSWORD': Finished task run for task with final state: 'Success'
[2021-10-27 23:35:37+0100] INFO - prefect.TaskRunner | Task 'USERNAME': Starting task run...
[2021-10-27 23:35:37+0100] INFO - prefect.TaskRunner | Task 'USERNAME': Finished task run for task with final state: 'Success'
[2021-10-27 23:35:37+0100] INFO - prefect.TaskRunner | Task 'update_challenger_league': Starting task run...
[2021-10-27 23:41:45+0100] INFO - prefect.TaskRunner | Task 'update_challenger_league': Finished task run for task with final state: 'Success'
[2021-10-27 23:41:45+0100] INFO - prefect.FlowRunner | Flow run SUCCESS: all reference tasks succeeded


<Success: "All reference tasks succeeded.">

In [12]:
@task
def get_match_history(username, password, db_name, api_key):
    """A task that requires credentials to access something. Passing the
    credentials in as an argument allows you to change how/where the
    credentials are loaded (though we recommend using `PrefectSecret` tasks to
    load them."""

    query_header = {"X-Riot-Token": api_key}

    user_query = "SELECT puuid FROM users"
    password = password
    puuids = PostgresFetch(
        db_name=db_name,
        user=username,
        host="127.0.0.1",
        port=5432,
        query=user_query,
        data=None,
        fetch="all",
        commit=True,
    ).run(password=password)

    for i in puuids:
        the_puuid = i[0]
        match_id_url = f"https://europe.api.riotgames.com/tft/match/v1/matches/by-puuid/{the_puuid}/ids?count=20"
        recent_match_ids = requests.get(url=match_id_url, headers=query_header).json()
        time.sleep(2)

        match_id_query = "SELECT matchid FROM matchdata WHERE matchid = ANY(%s);"
        match_id_data = (recent_match_ids,)
        existing_ids = PostgresFetch(
            db_name=dbname,
            user=username,
            host="127.0.0.1",
            port=5432,
            query=match_id_query,
            data=match_id_data,
            fetch="all",
            commit=True,
        ).run(password=password)

        existing_ids = [i[0] for i in existing_ids]
        new_matches = set(recent_match_ids).symmetric_difference(set(existing_ids))
        new_matches = list(new_matches)

        for j in new_matches:
            match_detail_url = (
                f"https://europe.api.riotgames.com/tft/match/v1/matches/{j}"
            )
            time.sleep(2)
            match_detail = requests.get(match_detail_url, headers=query_header).json()
            print(match_detail.keys())
            match_time = datetime.datetime.fromtimestamp(
                match_detail["info"]["game_datetime"] / 1e3
            ).strftime("%Y-%m-%d %H:%M:%S")
            time_now = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")

            match_query = """
                    INSERT INTO matchdata VALUES (DEFAULT, %s,%s,%s, 'EUW')
                    """

            match_data = (
                j,
                str(match_time),
                str(time_now),
            )

            PostgresExecute(
                db_name=dbname,
                user=username,
                host="127.0.0.1",
                port=5432,
                query=match_query,
                data=match_data,
                commit=True,
            ).run(password=password)

In [13]:
with Flow("Get Latest Matches") as flow:
    dbname = "teamfighttactics"
    get_match_history(
        username=username, password=password, db_name=dbname, api_key=api_key
    )

flow.run()

[2021-10-27 23:49:10+0100] INFO - prefect.FlowRunner | Beginning Flow run for 'Get Latest Matches'
[2021-10-27 23:49:11+0100] INFO - prefect.TaskRunner | Task 'PASSWORD': Starting task run...
[2021-10-27 23:49:11+0100] INFO - prefect.TaskRunner | Task 'PASSWORD': Finished task run for task with final state: 'Success'
[2021-10-27 23:49:11+0100] INFO - prefect.TaskRunner | Task 'USERNAME': Starting task run...
[2021-10-27 23:49:11+0100] INFO - prefect.TaskRunner | Task 'USERNAME': Finished task run for task with final state: 'Success'
[2021-10-27 23:49:11+0100] INFO - prefect.TaskRunner | Task 'get_match_history': Starting task run...
dict_keys(['metadata', 'info'])
dict_keys(['metadata', 'info'])
dict_keys(['metadata', 'info'])
dict_keys(['metadata', 'info'])
dict_keys(['metadata', 'info'])
dict_keys(['metadata', 'info'])
dict_keys(['metadata', 'info'])
dict_keys(['metadata', 'info'])
dict_keys(['metadata', 'info'])
dict_keys(['metadata', 'info'])
dict_keys(['metadata', 'info'])
dict_ke

KeyboardInterrupt: 