In [1]:
import pandas as pd
import numpy as np
import requests
# Connect to postgres
from sqlalchemy.dialects.postgresql import *
from sqlalchemy import create_engine

In [2]:
from schema import Tables
from sqlalchemy import create_engine, insert, update
from dataclasses import dataclass

@dataclass
class DBConfig:
    user : str
    pwd : str
    host : str
    port : int
    name: str
    schema: str
    
    @property
    def connection_string(self):
        return f"postgresql://{self.user}:{self.pwd}@{self.host}:{self.port}/{self.name}"

class DBManager:
    def __init__(self, dbConfig:DBConfig) -> None:
        self.config = dbConfig
        self.engine = self._init_engine()
        self.metadata = self._init_tables()
        

    def _init_engine(self):
        return create_engine(self.config.connection_string)

    def _init_tables(self):
        tables = Tables(schema=self.config.schema, engine=self.engine)
        try:
            metadata = tables._create()
            return metadata
        except Exception as e:
            print(f"Error creating tables\n{str(e)}")

    def _table(self, table_name:str):
        return self.metadata.tables[f"{self.config.schema}.{table_name}"]

    def insert(self, table, data):
        sql = (
            insert(self._table(table_name=table)).
            values(data)
        )
        return sql

    def select(self, table, filters:dict):
        pass

    def update(self, table, filters:dict, data):
        sql = (
            update(self._table(table_name=table)).
            where(filters)
        )

    def delete(self, table, filters:dict):
        pass

    def execute(self, sql):
        with self.engine.connect() as con:
            con.execute(sql)

In [3]:
from dataclasses import dataclass
from database import DBManager, DBConfig
import requests

@dataclass
class DataSourceConfig:
    url:str
    uri:str
    headers:dict

class DataCollector:
    def __init__(self,type) -> None:
        self.source = self._get_source(type)
        
    def collect(self):
        return requests.get(f"{self.source.url}/{self.source.uri}", headers=self.source.headers)

    def _get_source(self, type):
        if type == "area":
            uri = "areas"
        elif type == "teams":
            uri = "teams"
        elif type == "competitions":
            uri = "competitions"

        return DataSourceConfig(url="https://api.football-data.org/v4",
            uri=f"{uri}",
            headers= { 'X-Auth-Token': '7b12df0d8cf74ca29cc2926ebb4b00e1' })

class DataProcessor:
    def __init__(self, type) -> None:
        self.type = type

    def process(self, data):
        if self.type == "area":
            self.proc_data = self._process_area_data(data)
        elif self.type == "teams":
            self.proc_data = self._process_teams_data(data)
        elif self.type == "competitions":
            self.proc_data = self._process_competitions_data(data)
        return self.proc_data

    def _process_area_data(self, data):
        self.proc_data = []
        for area in data["areas"]:
            self.proc_data.append({
                "id" : area["id"],
                "name" : area["name"],
                "country_code" : area["countryCode"],
                "region" : area["parentArea"]
            })
        return self.proc_data

    def _process_teams_data(self, data):
        pass

    def _process_competitions_data(self, data):
        self.proc_data = []
        for comp in data["competitions"]:
            self.proc_data.append({
                "id": comp["id"],
                "name" : comp["name"],
                "area_id" : comp["area"]["id"],
                "code" : comp["code"],
                "type" : comp["type"]
            })
        return self.proc_data


class DataManager:
    def __init__(self, data_type) -> None:
        self.type = data_type
        self.db = self._get_db()
        self.collector = DataCollector(type=data_type)
        self.processor = DataProcessor(type=data_type)

    def _get_db(self):
        return DBManager(
            dbConfig=DBConfig(
                user="postgres",
                pwd="postgres",
                host="localhost",
                port="2345",
                name="football",
                schema="football_updates"
            )
        ) 

    def _get_data(self) -> dict:
        return self.collector.collect().json()

    def _process_data(self, data):
        return self.processor.process(data)

    def _ingest_data(self, proc_data, table):
        sql = self.db.insert(table=table, data=proc_data)
        self.db.execute(sql)


In [4]:

source = "https://api.football-data.org"
url = f"{source}/v4/competitions/"


headers = { 'X-Auth-Token': '7b12df0d8cf74ca29cc2926ebb4b00e1' }
response = requests.get(url, headers=headers)

In [5]:
series = pd.DataFrame(response.json()["standings"])
#df = series.to_frame()

KeyError: 'standings'

In [6]:
#from eralchemy import render_er
manager = DataManager(data_type="competitions")
data = manager._get_data()
proc_data = manager._process_data(data)
#manager._ingest_data(proc_data, "t_competitions")
print(manager.db.metadata)


MetaData(bind=Engine(postgresql://postgres:***@localhost:2345/football))


In [7]:
###### Get all persons #######


In [8]:
###### Get all teams #######