In [1]:
import pandas as pd
import numpy as np
import sqlalchemy

from sqlalchemy_utils.functions import database_exists, create_database
from sqlalchemy.orm import declarative_base
from sqlalchemy import Identity

In [2]:
PASSWORD = "root"
USERNAME = "root"
HOSTNAME = "localhost"
PORT = "3306"
DB_NAME = "data_lake_v2"
DATA_LAKE_FILEPATH = "./data/data_lake.csv"
SQLITE_ENGINE_DRIVER = f"sqlite+pysqlite:///{DB_NAME}"
MYSQL_ENGINE_DRIVER = f"mysql+mysqlconnector://{USERNAME}:{PASSWORD}@{HOSTNAME}:{PORT}/{DB_NAME}"


Creating SQLAlchemy Model

In [3]:
Base = declarative_base()


In [4]:
class PokemonCards(Base):
    __tablename__ = 'pokemon_cards'
    pokemon_card_id = sqlalchemy.Column("pokemon_card_id", sqlalchemy.Integer, primary_key=True)
    name = sqlalchemy.Column("name", sqlalchemy.String(length=70), nullable=False)
    card_number = sqlalchemy.Column("card_number", sqlalchemy.String(length=50), nullable=False)
    price = sqlalchemy.Column("price", sqlalchemy.Float, nullable=False)
    card_type = sqlalchemy.Column("card_type", sqlalchemy.String(200))
    generation = sqlalchemy.Column("generation", sqlalchemy.String(200))

    def __repr__(self):
        return f"(pokemon_card_id: {self.pokemon_card_id}, name:{self.name}, card_number:{self.card_number}, price: {self.price}, card_type: {self.card_type}, generation: {self.generation})"

Handle:
1. Engine Creation
2. Creating DB if not exist
3. Creating Tables within database

In [5]:
engine = sqlalchemy.create_engine(MYSQL_ENGINE_DRIVER, echo=False)

# Create DB if not exist
if not database_exists(engine.url):
    create_database(engine.url)


Base.metadata.drop_all(bind=engine)
Base.metadata.create_all(bind=engine)


Importing data lake and start cleaning:
1. Retrieve all columns in relation to pokemons.
    1. Pokemon | Card Type | Generation | Card Number | Price 
2. Remove all nan values or replace them with valid data if necessary

In [6]:
columns_with_relation_to_pokemons = ['Pokemon', 'Card Number', 'Price', 'Card Type', 'Generation']
table_columns_name = [ 'name', 'card_number', 'price', 'card_type', 'generation']

data_lake = pd.read_csv(DATA_LAKE_FILEPATH)

pokemon_df = data_lake[columns_with_relation_to_pokemons]

  data_lake = pd.read_csv(DATA_LAKE_FILEPATH)


In [7]:
# Cleaning NaN values | lower case strings | drop duplicates
pokemon_df = pokemon_df.dropna().apply(lambda x: x.str.lower() if x.dtype == "object" else x).drop_duplicates()

# Checking for any NaN value
pokemon_df.isna().sum()


Pokemon        0
Card Number    0
Price          0
Card Type      0
Generation     0
dtype: int64

Updating dataframe_column names to match values for our database table:

In [8]:
columns_mapping = dict(zip(columns_with_relation_to_pokemons, table_columns_name))
pokemon_df.rename(columns=columns_mapping, inplace=True)
pokemon_df.index = np.arange(1, len(pokemon_df) + 1)
pokemon_df.index.name = "pokemon_card_id"
pokemon_df

Unnamed: 0_level_0,name,card_number,price,card_type,generation
pokemon_card_id,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1
1,ampharos,001 of 147,2.95,reverse holo,aquapolis
2,ampharos,001 of 147,2.50,standard,aquapolis
3,arcanine,002 of 147,3.95,reverse holo,aquapolis
4,arcanine,002 of 147,2.95,standard,aquapolis
5,ariados,003 of 147,2.50,reverse holo,aquapolis
...,...,...,...,...,...
25426,mega gardevoir ex,112 of 114,7.85,standard,xy - steam siege
25427,pokemon ranger,113 of 114,4.65,standard,xy - steam siege
25428,professor sycamore,114 of 114,6.85,standard,xy - steam siege
25429,volcanion ex,115 of 114,9.85,standard,xy - steam siege


Pushing data to DB

In [9]:
pokemon_df.to_sql(name="pokemon_cards", con=engine, if_exists="replace", index=True)

25430

Queries:
1. Select all from database
2. Select all from database where price within range

In [26]:
from sqlalchemy.orm import Session
from sqlalchemy import select


In [28]:
# Query 1
with Session(engine) as session:
    stmt_one = select(PokemonCards).limit(10)
    results = [x[0] for x in session.execute(stmt_one)]

results

[(pokemon_card_id: 1, name:ampharos, card_number:001 of 147, price: 2.95, card_type: reverse holo, generation: aquapolis),
 (pokemon_card_id: 2, name:ampharos, card_number:001 of 147, price: 2.5, card_type: standard, generation: aquapolis),
 (pokemon_card_id: 3, name:arcanine, card_number:002 of 147, price: 3.95, card_type: reverse holo, generation: aquapolis),
 (pokemon_card_id: 4, name:arcanine, card_number:002 of 147, price: 2.95, card_type: standard, generation: aquapolis),
 (pokemon_card_id: 5, name:ariados, card_number:003 of 147, price: 2.5, card_type: reverse holo, generation: aquapolis),
 (pokemon_card_id: 6, name:ariados, card_number:003 of 147, price: 1.5, card_type: standard, generation: aquapolis),
 (pokemon_card_id: 7, name:azumarill, card_number:004 of 147, price: 2.95, card_type: reverse holo, generation: aquapolis),
 (pokemon_card_id: 8, name:azumarill, card_number:004 of 147, price: 1.95, card_type: standard, generation: aquapolis),
 (pokemon_card_id: 9, name:bellosso

In [38]:
with Session(engine) as session:
    stmt_two = select(PokemonCards).where(PokemonCards.name == "ampharos").limit(10)
    results = [x for x in session.execute(stmt_two)]

results


[((pokemon_card_id: 1, name:ampharos, card_number:001 of 147, price: 2.95, card_type: reverse holo, generation: aquapolis),),
 ((pokemon_card_id: 2, name:ampharos, card_number:001 of 147, price: 2.5, card_type: standard, generation: aquapolis),),
 ((pokemon_card_id: 306, name:ampharos, card_number:h01 of h32, price: 7.95, card_type: standard, generation: aquapolis),),
 ((pokemon_card_id: 1109, name:ampharos, card_number:040 of 124, price: 2.29, card_type: reverse holo, generation: b&w - dragons exalted),),
 ((pokemon_card_id: 1110, name:ampharos, card_number:040 of 124, price: 1.65, card_type: standard, generation: b&w - dragons exalted),),
 ((pokemon_card_id: 3684, name:ampharos, card_number:23 of 95, price: 2.79, card_type: reverse holo, generation: call of legends),),
 ((pokemon_card_id: 3685, name:ampharos, card_number:23 of 95, price: 1.65, card_type: standard, generation: call of legends),),
 ((pokemon_card_id: 4962, name:ampharos, card_number:001 of 101, price: 5.75, card_type: 