In [10]:
from typing import Optional,Dict,Any
#from pydantic import BaseModel
from datetime import datetime
from pydantic_avro.base import AvroBase

PayloadDict = Dict[str, Any]

class BaseOrmModel(AvroBase):
    class Config:
        orm_mode = True

    @property
    def dict_for_model(self) -> PayloadDict:
        return self.dict(by_alias=False)

class Employees(BaseOrmModel):
    id: Optional[int]
    name: str
    datetime: datetime
    job_id: int
    department_id: int


class Departments(BaseOrmModel):
    id: Optional[int]
    department: str


class Jobs(BaseOrmModel):
    id: Optional[int]
    job: str

In [11]:
from sqlalchemy import Column, Integer, String, ForeignKey
from sqlalchemy.orm import relationship
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class DepartmentEntity(Base):
    __tablename__ = "departments"
    id = Column(Integer, primary_key=True, index=True)
    department_id = Column(String, index=True)


class JobEntity(Base):
    __tablename__ = "jobs"
    id = Column(Integer, primary_key=True, index=True)
    job = Column(String, index=True)

class EmployeeEntity(Base):
    __tablename__ = "employees"
    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, index=True)
    datetime = Column(String, index=True)
    department_id = Column(Integer)
    job_id = Column(Integer)


  Base = declarative_base()


In [4]:
from sqlalchemy.orm import Session
from typing import Type, List,TypeVar,Generic
from fastavro import reader, parse_schema
from io import BytesIO
from datetime import date
import boto3
import os

MINIO_ENDPOINT="http://minio:9000"
MINIO_ACCESS_KEY="minio"
MINIO_SECRET_KEY="minio1234"

BaseT = TypeVar("BaseT", bound=Base)
BaseOrmModelT = TypeVar("BaseOrmModelT", bound=BaseOrmModel)


BUCKET_NAME="backup"

s3 = boto3.resource("s3",
    endpoint_url=MINIO_ENDPOINT,
    aws_access_key_id=MINIO_ACCESS_KEY,
    aws_secret_access_key=MINIO_SECRET_KEY)


class Connector(Generic[BaseT, BaseOrmModelT]):
    def __init__(self, db_session: Session, entity: Type[BaseT], domain: Type[BaseOrmModelT]):
        self.db_session = db_session
        self.entity = entity
        self.domain = domain
        
    def convert_db_to_domain(self, entity: BaseT) -> BaseOrmModelT:
        return self.domain.from_orm(entity)

    def convert_domain_to_db(self, domain: BaseOrmModelT) -> BaseT:
        return self.entity(**domain.dict_for_model)

    def write(self, obj: BaseOrmModelT):
        db_obj = self.convert_domain_to_db(obj)
        try:
            self.db_session.add(db_obj)
            self.db_session.commit()
        except:
            self.db_session.rollback()
            raise

    def read_all(self) -> List[BaseOrmModelT]:
        db_objs = self.db_session.query(self.entity).all()
        return [self.convert_db_to_domain(db_obj) for db_obj in db_objs]

    def read_by_id(self, id: int) -> BaseOrmModelT:
        db_obj = self.db_session.query(self.entity).filter_by(id=id).first()
        return self.convert_db_to_domain(db_obj) if db_obj else None

    def update(self, obj: BaseOrmModelT):
        db_obj = self.db_session.query(self.entity).filter_by(id=obj.id).first()
        if db_obj:
            try:
                for key, value in obj.dict(exclude_unset=True).items():
                    setattr(db_obj, key, value)
                self.db_session.commit()
            except:
                self.db_session.rollback()
                raise

    def delete(self, id: int):
        db_obj = self.db_session.query(self.entity).filter_by(id=id).first()
        if db_obj:
            try:
                self.db_session.delete(db_obj)
                self.db_session.commit()
            except:
                self.db_session.rollback()
                raise
            
    def delete_all(self, entity: BaseT):
        try:
            self.db_session.query(entity).delete()
            self.db_session.commit()
        except:
            self.db_session.rollback()
            raise

    def restore_from_avro(self, bucket_name: str, date: date, entity: Base, domain: any):
        schema_file_name = f"{entity.__tablename__}.avsc"
        schema_file = s3.Object(bucket_name, schema_file_name)
        schema = parse_schema(schema_file.get()["Body"].read())
        backup_file_name = f"{entity.__tablename__}_{date}.avro"
        backup_file = s3.Object(bucket_name, backup_file_name)
        buffer = BytesIO(backup_file.get()["Body"].read())
        self.delete_all(entity)
        with reader(buffer, parse_schema(schema)) as records:
            for record in records:
                domain_obj = domain(**record)
                self.write(domain_obj)


In [5]:
import os
import pandas as pd
from sqlalchemy import create_engine



In [6]:


local_data_path=os.environ["LOCAL_DATA_PATH"]

def read_csv_file(file_path):
    return pd.read_csv(file_path)

def create_table(engine, entity, domain):
    connector = Connector(engine, entity, domain)
    table_name = entity.__name__
    print(f"Creating {table_name}s...")
    table = read_csv_file(f"{local_data_path}/{table_name}.csv")
    for _, row in table.iterrows():
        record = domain(**row.to_dict())
        print(record)
        connector.write(record)

def create_tables(connector):
    print("Creating Tables...")
    Base.metadata.create_all(bind=connector.db_session.bind)

if __name__ == "__main__":
    user=os.environ["POSTGRES_USER"]
    password=os.environ["POSTGRES_PASSWORD"]
    host=os.environ["POSTGRES_HOST"]
    port=os.environ["POSTGRES_PORT"]
    db=os.environ["POSTGRES_DB"]
    db_uri=f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{db}"
    engine = create_engine(db_uri)

    print(f"Data to db...")
    create_table(engine, EmployeeEntity, Employee)
    create_table(engine, JobEntity, Job)
    create_table(engine, DepartmentEntity, Department)

    print("Done!")

KeyError: 'LOCAL_DATA_PATH'

In [None]:
from datetime import datetime
from io import BytesIO
from fastavro import writer
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import os
import boto3
from src.connector import Connector
from src.models import Employees, Jobs, Departments
from src.schemas import (Departments as DepartmentsEntity,
                         Employees as EmployeesEntity,
                         Jobs as JobsEntity)
import json

MINIO_ENDPOINT=os.environ["MINIO_ENDPOINT"]
MINIO_ACCESS_KEY=os.environ["MINIO_ACCESS_KEY"]
MINIO_SECRET_KEY=os.environ["MINIO_SECRET_KEY"]
TARGET_BUCKET=os.environ["BACKUP_BUCKET"]
NUM_TRANSFER_THREADS = 50
TRANSFER_VERBOSITY = True
USER = os.environ["POSTGRES_USER"]
PASSWORD = os.environ["POSTGRES_PASSWORD"]
HOST = os.environ["POSTGRES_HOST"]
PORT = os.environ["POSTGRES_PORT"]
DB = os.environ["POSTGRES_DB"]
DB_URI = f"postgresql+psycopg2://{USER}:{PASSWORD}@{HOST}:{PORT}/{DB}"
 
    
s3 = boto3.client('s3',
                  aws_access_key_id=MINIO_ACCESS_KEY,
                  aws_secret_access_key=MINIO_SECRET_KEY,
                  endpoint_url=MINIO_ENDPOINT,
                  use_ssl=False)

def backup_to_avro(engine, entity, domain):
    connector = Connector(engine, entity, domain)
    table_name = entity.__name__
    backup_filename = f"{table_name}_{datetime.now().strftime('%Y-%m-%d_%H-%M')}.avro"
    schema_filename = f"{table_name}.avsc"

    data = connector.read_all()
    schema = domain.avro_schema()
    data_buffer = BytesIO()
    writer(data_buffer, schema, data)
    data_buffer.seek(0)

    schema_buffer = BytesIO(json.dumps(schema).encode("utf-8"))
    schema_buffer.seek(0)

    try:
        s3.upload_fileobj(data_buffer, TARGET_BUCKET, backup_filename)
        print(f"Backup of table {table_name} uploaded to MinIO: {TARGET_BUCKET}")

        s3.upload_fileobj(schema_buffer, TARGET_BUCKET, schema_filename)
        print(f"Schema for table {table_name} uploaded to MinIO: {TARGET_BUCKET}")
    except Exception as err:
        print(f"Error uploading backup of table {table_name} to MinIO: {err}")

if __name__ == "__main__":
    print(f"Running backup now: {datetime.now().strftime('%Y-%m-%d_%H-%M')}")

    engine = create_engine(DB_URI)
    Session = sessionmaker(bind=engine)
    session = Session()
    connector = Connector(session, EmployeesEntity, Employees)

    backup_to_avro(session, EmployeesEntity, Employees)
    backup_to_avro(session, JobsEntity, Jobs)
    backup_to_avro(session, DepartmentsEntity, Departments)

    print("Backup process completed.")

In [40]:
POSTGRES_USER="postgres"
POSTGRES_PASSWORD="postgres"
POSTGRES_DB="employees"
POSTGRES_HOST="localhost"
POSTGRES_PORT=5432
LOCAL_DATA_PATH="/app/data"
MINIO_ENDPOINT="http://localhost:9000"
MINIO_ACCESS_KEY="minio"
MINIO_SECRET_KEY="minio1234"
BACKUP_BUCKET="backup"
DB_URI = f"postgresql+psycopg2://{USER}:{PASSWORD}@{HOST}:{PORT}/{DB}"
 

In [8]:
from datetime import datetime
from io import BytesIO
from fastavro import writer
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker
import os
import boto3

In [42]:
engine = create_engine(DB_URI)
Session = sessionmaker(bind=engine)
session = Session()
connector = Connector(session, EmployeeEntity, Employees)

In [22]:
connector.read_all()

[Employees(id=1, name='Stacey Walker', datetime=datetime.datetime(1996, 2, 2, 13, 17, 36, tzinfo=datetime.timezone.utc), job_id=2, department_id=3),
 Employees(id=2, name='Jasmine Sanders', datetime=datetime.datetime(2013, 9, 8, 2, 44, 42, tzinfo=datetime.timezone.utc), job_id=1, department_id=2),
 Employees(id=3, name='Amanda Roberts', datetime=datetime.datetime(1989, 7, 12, 18, 19, 39, tzinfo=datetime.timezone.utc), job_id=1, department_id=1),
 Employees(id=4, name='Sarah Wade', datetime=datetime.datetime(2021, 12, 17, 21, 39, 11, tzinfo=datetime.timezone.utc), job_id=2, department_id=2),
 Employees(id=5, name='Sabrina Moss', datetime=datetime.datetime(1976, 9, 6, 5, 45, 25, tzinfo=datetime.timezone.utc), job_id=1, department_id=3),
 Employees(id=6, name='Andrew Walton', datetime=datetime.datetime(1996, 2, 11, 23, 39, 50, tzinfo=datetime.timezone.utc), job_id=3, department_id=3),
 Employees(id=7, name='Ricky Arnold', datetime=datetime.datetime(2000, 5, 12, 22, 42, 48, tzinfo=datetime

In [29]:

backup_file = s3.Object(bucket_name, backup_file_name)
buffer = BytesIO(backup_file.get()["Body"].read())
self.delete_all(entity)
with reader(buffer, parse_schema(schema)) as records:
    for record in records:
        domain_obj = domain(**record)
        self.write(domain_obj)


NameError: name 'backup_file_name' is not defined

In [43]:
entity=EmployeeEntity
bucket_name=BACKUP_BUCKET
domain=Employees

In [44]:
schema_file_name = f"{domain.__name__}.avsc"
schema_file = s3.Object(bucket_name, schema_file_name)
schema = parse_schema(schema_file.get()["Body"].read())
backup_file_name = f"{domain.__name__}_{date}.avro"

EndpointConnectionError: Could not connect to the endpoint URL: "http://minio:9000/backup/Employees.avsc"

In [17]:
import os
os.listdir("../../")

['README.md',
 '__pycache__',
 'requirements.txt',
 '.gitignore',
 'api-data',
 '.python-version',
 '__init__.py',
 'crontab',
 'Dockerfile',
 'data_generator.py',
 'tests',
 'api',
 'postgres-data',
 'docker-compose.yml',
 'Makefile',
 '.git']

In [24]:
user="postgres"
password="postgres"
host="db"
port=5432
db="employees"
db_uri=f"postgresql+psycopg2://{user}:{password}@{host}:{port}/{db}"
engine = create_engine(db_uri)

In [25]:
data = pd.read_csv("../../api-data/Employee.csv")

In [20]:
data

Unnamed: 0,id,name,datetime,job_id,department_id
0,1,Michael Kelley,1971-01-31T22:26:49+00:00,2,3
1,2,Marcus Stevenson,2022-07-14T09:13:30+00:00,1,3
2,3,Gary Hernandez,1981-02-20T06:07:51+00:00,1,3
3,4,Candice Nelson,2003-09-06T17:49:40+00:00,3,3
4,5,Anthony Gallagher,1973-07-08T21:37:25+00:00,1,2
...,...,...,...,...,...
995,996,John Jones,1987-04-08T16:51:57+00:00,1,3
996,997,Joel Webster,2014-12-25T13:01:49+00:00,2,2
997,998,Morgan Ramirez,2017-06-28T03:21:21+00:00,2,3
998,999,Linda Hull,1994-01-04T17:17:39+00:00,1,1


In [50]:
connector = Connector(engine, EmployeeModel, Employee)

In [42]:
for _, row in data.iterrows():
    record = Employee(**row.to_dict())

In [43]:
record

Employee(id=1000, name='Jonathan Lee', datetime=datetime.datetime(1982, 6, 7, 12, 37, 39, tzinfo=datetime.timezone.utc), job_id=1, department_id=2)

In [52]:
connector.convert_db_to_domain(connector.convert_domain_to_db(record))

Employee(id=1000, name='Jonathan Lee', datetime=datetime.datetime(1982, 6, 7, 12, 37, 39, tzinfo=datetime.timezone.utc), job_id=1, department_id=2)

In [46]:
EmployeeModel

__main__.EmployeeModel

In [47]:
record.dict_for_model

{'id': 1000,
 'name': 'Jonathan Lee',
 'datetime': datetime.datetime(1982, 6, 7, 12, 37, 39, tzinfo=datetime.timezone.utc),
 'job_id': 1,
 'department_id': 2}

In [45]:
EmployeeModel(**record.dict_for_model)

InvalidRequestError: One or more mappers failed to initialize - can't proceed with initialization of other mappers. Triggering mapper: 'Mapper[DepartmentModel(departments)]'. Original exception was: When initializing mapper Mapper[DepartmentModel(departments)], expression 'Employee' failed to locate a name ('Employee'). If this is a class name, consider adding this relationship() to the <class '__main__.DepartmentModel'> class after both dependent classes have been defined.

In [49]:
EmployeeModel(**record.dict())

<__main__.EmployeeModel at 0x7f06ee714e50>

In [38]:
EmployeeModel

__main__.EmployeeModel