# API ETL with Python + SQLAlchemy into MS SQL Server and MongoDB using PyMongo

In [None]:
# widen the notebook container to 70% of the browser window
from IPython.display import HTML, display

cell_width_css = "<style>.container { width:70% !important; }</style>"
display(HTML(cell_width_css))

In [1]:
# import packages needed for ETL, PyMongo, and SQLAlchemy
import itertools
import json
import pprint

import pandas as pd
import plotly
import pymongo
import pymssql
import pyodbc
import requests
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.sql import text

# local settings module (expected to expose api_key); a module is imported by
# its name, without the ".py" file extension
import config

### Retrieve List of Response Objects and convert to JSON

In [455]:
# Download pages 1-100 of the RAWG games listing, keeping both the raw
# response objects and their decoded JSON bodies.
games_url = "https://api.rawg.io/api/games"
api_data_test = []

for page in range(1, 101):
    # Send the key held in config as a query parameter; the previous URL
    # contained the literal text "config.api_key" rather than its value.
    data = requests.get(
        games_url,
        params={"key": config.api_key, "page": page},
        timeout=30,  # do not hang forever on a stalled connection
    )
    # stop on an HTTP error instead of storing an error payload as a page
    data.raise_for_status()
    api_data_test.append(data)

# convert api data into a list of decoded JSON pages
test_list = [response.json() for response in api_data_test]

### Iterate through the list of JSON pages and build the games DataFrame

In [470]:
# Flatten every game of every downloaded page into one row and build the
# DataFrame in a single step. DataFrame.append was removed in pandas 2.0 and
# copied the whole frame on each call, so rows are collected in a list first.
game_columns = [
    "id", "name", "released", "rating", "platforms", "background_image",
    "genres", "tags", "rating_top", "ratings_count", "metacritic",
    "owned", "yet", "dropped", "multiplayer", "singleplayer",
]
game_rows = []

for game_list in test_list:
    for game in game_list["results"]:
        # NOTE(review): nested fields are treated as possibly null or partial
        # (assumption -- confirm against the API docs); fall back to empty
        # containers and zero counts instead of raising.
        tags = game["tags"] or []
        status = game["added_by_status"] or {}
        game_rows.append({
            "id": game["id"],
            "name": game["name"],
            "released": game["released"],
            "rating": game["rating"],
            # list-valued fields are reduced to their element counts
            "platforms": len(game["platforms"] or []),
            "background_image": game["background_image"],
            "genres": len(game["genres"] or []),
            "tags": len(tags),
            "rating_top": game["rating_top"],
            "ratings_count": game["ratings_count"],
            "metacritic": game["metacritic"],
            "owned": status.get("owned", 0),
            "yet": status.get("yet", 0),
            "dropped": status.get("dropped", 0),
            # names of the tags with id 7 / id 31, comma-joined into a string
            "multiplayer": ",".join(str(tag["name"]) for tag in tags if tag["id"] == 7),
            "singleplayer": ",".join(str(tag["name"]) for tag in tags if tag["id"] == 31),
        })

# passing columns= keeps the expected columns even when no rows were collected
test_df = pd.DataFrame(game_rows, columns=game_columns)

# convert the id and count columns to integers
int_columns = ["id", "genres", "owned", "tags", "platforms", "ratings_count", "dropped", "yet"]
test_df[int_columns] = test_df[int_columns].astype("int")

In [475]:
# put the columns of the games frame into their final presentation order
column_order = [
    "id", "name", "released", "rating", "ratings_count", "rating_top",
    "metacritic", "platforms", "genres", "tags", "owned", "yet", "dropped",
    "singleplayer", "multiplayer", "background_image",
]
test_df = test_df.reindex(columns=column_order)

In [444]:
# write the games frame to game_df.csv without the index column
test_df.to_csv(path_or_buf="game_df.csv", index=False)

In [478]:
# keep an independent (deep) copy of the games frame under its final name
game_df = test_df.copy(deep=True)

### Link to next API call, create ID List Object

In [479]:
# collect the game ids in ascending order as a plain list
id_list = sorted(test_df["id"])

### Iterate through all IDs and append as JSON

In [486]:
# Request the achievements endpoint once per game id, in id_list order, so
# the responses stay aligned with id_list.
achievements = []

for game_id in id_list:  # renamed from "id" to avoid shadowing the builtin
    url = f"https://api.rawg.io/api/games/{game_id}/achievements"
    # Read the key from config instead of hard-coding it in the notebook.
    # The key that was previously committed in this URL should be treated as
    # exposed and rotated.
    data = requests.get(url, params={"key": config.api_key}, timeout=30)
    # Error responses are kept on purpose so the list stays the same length
    # as id_list.
    achievements.append(data)

# create JSON objects from response objects
achievement_placeholder = [response.json() for response in achievements]

### Create Achievements DF

In [493]:
# pull the "count" value out of every achievements payload
a_list = [payload["count"] for payload in achievement_placeholder]

# pair each game id with its achievement count
ach_dict = {"id": id_list, "achievements": a_list}

# build the achievements frame with a fixed column order
achievements_df = pd.DataFrame(data=ach_dict, columns=["id", "achievements"])

### Creating Cursor Connection to MSSQL Server using pyodbc

In [231]:
# connection settings for the local SQL Server instance
user = "JTOLEDO"
# raw string: "\S" is not a valid escape sequence, so a plain literal only
# works by accident and emits a warning on newer Python versions
host = r"W10JTOLET560B\SQLEXPRESS"
port = 1433
db = "CS779_TermProject"

# open a pyodbc connection using Windows authentication; the server and
# database names come from the variables above rather than being repeated
conn = pyodbc.connect(
    "Driver={SQL Server};"
    f"Server={host};"
    f"Database={db};"
    "Trusted_Connection=yes;"
)

In [233]:
# create a cursor object from the open pyodbc connection
cursor = conn.cursor()

In [235]:
# print every row the cursor yields
# NOTE(review): no cursor.execute(...) call is visible in this notebook; a
# query presumably has to be run on the cursor first -- confirm
for row in cursor:
    print(row)

('5', )


## Create Engine to MSSQL with SQLAlchemy

#### Creating the correct create_engine object with SQLAlchemy was a difficult task. It took a couple of hours of scouring documentation
#### and experimenting to get the local SQL Server connection to work.

In [4]:
# connection settings for the local SQL Server instance
user = "jtoledo"
# raw string: "\S" is not a valid escape sequence in a normal string literal
host = r"W10JTOLET560B\SQLEXPRESS"
db = "CS779_TermProject"

# create the engine: pyodbc driver, Windows (trusted) authentication
engine = create_engine(
    f"mssql+pyodbc://{host}/{db}?trusted_connection=yes&driver=SQL+Server"
)

# Run the USE statement on an explicit connection as a connectivity check.
# Engine.execute() and raw SQL strings were removed in SQLAlchemy 2.0;
# Engine.connect() with text() works on both 1.x and 2.x.
with engine.connect() as connection:
    connection.execute(text("USE CS779_TermProject;"))

<sqlalchemy.engine.result.ResultProxy at 0x1de11563be0>

## Embedded SQL Approach to querying and appending results to dataframe

In [672]:
# create empty dataframe object that accumulates query results
my_df = pd.DataFrame()

# create query text object
query_text = text("""
                SELECT * FROM TestTable
                """)

# Execute on an explicit connection (Engine.execute() was removed in
# SQLAlchemy 2.0) and fetch the rows before the connection is released.
with engine.connect() as connection:
    qry = connection.execute(query_text)
    # Column names are passed to the constructor, so an empty result still
    # produces a correctly labelled frame instead of a length-mismatch error.
    results = pd.DataFrame(
        [tuple(row) for row in qry.fetchall()],
        columns=[str(x) for x in qry.keys()],
    )

# append the query results to the accumulator (DataFrame.append was removed
# in pandas 2.0; pd.concat is the replacement)
my_df = pd.concat([my_df, results])

# Embedded SQL Approach to Creating Table

In [565]:
# create query text object holding the CREATE TABLE statement
query_text = text("""
                CREATE TABLE JT_Test_Table(
                    id NUMERIC(12)
                    , first_name VARCHAR(32)
                    , last_name VARCHAR(32)
                    , email_address VARCHAR(64)
                )
                """)

# Run the DDL inside an explicit transaction. engine.begin() commits on
# success and rolls back on error, replacing the removed Engine.execute()
# and its implicit autocommit.
with engine.begin() as connection:
    qry = connection.execute(query_text)

In [566]:
# check results by listing the tables in the database
query_text = text("""
            SELECT *
            FROM CS779_TermProject.INFORMATION_SCHEMA.TABLES;
                """)

# Execute on an explicit connection (Engine.execute() was removed in
# SQLAlchemy 2.0) and print the rows while the connection is still open.
with engine.connect() as connection:
    qry = connection.execute(query_text)

    # iterate through qry object
    for row in qry:
        print(row)

('CS779_TermProject', 'dbo', 'TestTable', 'BASE TABLE')
('CS779_TermProject', 'dbo', 'Testing', 'BASE TABLE')
('CS779_TermProject', 'dbo', 'JT_Test_Table', 'BASE TABLE')


# Native SQLAlchemy Class Object Create Table

In [567]:
# import the column/data types and the declarative base factory used to
# define tables as Python classes
from sqlalchemy import Column, Integer, String
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
# Base is the parent class for the mapped table classes defined below
Base = declarative_base()

# mapped class describing the Testing2 table and its columns
class Testing(Base):
    """Declarative model for the ``Testing2`` table.

    ``id`` is an integer primary key; ``name``, ``address`` and ``email``
    are string columns declared without an explicit length.
    """

    __tablename__ = 'Testing2'

    id = Column(Integer, primary_key=True)
    name = Column(String)
    address = Column(String)
    email = Column(String)

# emit CREATE TABLE for every table registered on Base, using the SQL engine
Base.metadata.create_all(bind=engine)

In [674]:
# check results of the create table by looking at information_schema
query_text = text("""
            SELECT *
            FROM CS779_TermProject.INFORMATION_SCHEMA.TABLES;
                """)

# Execute on an explicit connection (Engine.execute() was removed in
# SQLAlchemy 2.0) and print the rows while the connection is still open.
with engine.connect() as connection:
    qry = connection.execute(query_text)

    # iterate through qry object
    for row in qry:
        print(row)

('CS779_TermProject', 'dbo', 'TestTable', 'BASE TABLE')
('CS779_TermProject', 'dbo', 'Testing', 'BASE TABLE')
('CS779_TermProject', 'dbo', 'JT_Test_Table', 'BASE TABLE')
('CS779_TermProject', 'dbo', 'Testing2', 'BASE TABLE')


# Send Each DF to SQL Server Through SQLAlchemy

In [497]:
# write game_df to the "games" table, replacing it if it already exists;
# explicit SQL types are given for id, name and released only
games_column_types = {
    "id": sqlalchemy.types.INTEGER(),
    "name": sqlalchemy.types.VARCHAR(100),
    "released": sqlalchemy.types.DATE,
}
game_df.to_sql(
    name="games",
    con=engine,
    if_exists="replace",
    index=False,
    dtype=games_column_types,
)

In [499]:
# write achievements_df to the "achievements" table, replacing it if it
# already exists; both columns are stored as integers
achievements_column_types = {
    "id": sqlalchemy.types.INTEGER(),
    "achievements": sqlalchemy.types.INTEGER(),
}
achievements_df.to_sql(
    name="achievements",
    con=engine,
    if_exists="replace",
    index=False,
    dtype=achievements_column_types,
)

# PyMongo Mongo DB Connection

In [503]:
# connect to the MongoDB cluster (credentials are redacted in the URI)
mongo_uri = "mongodb+srv://<credentials>@cluster0.pf9l3gz.mongodb.net/test"
myclient = pymongo.MongoClient(mongo_uri)

# names of the databases visible to this client
db_list = myclient.list_database_names()

In [507]:
# check db names
# The loop variable is named db_name so it does not overwrite the notebook
# global `db`, which holds the SQL Server database name.
for db_name in db_list:
    print(db_name)

cs779_db
mongodb-assignment
admin
local


In [545]:
# handle to the term-project database
cs779_db = myclient.cs779_db

# ask the server for the collection names (the result is not stored)
cs779_db.list_collection_names()

# handle to the term_project collection
term_project_col = cs779_db["term_project"]

## Create new collection and insert game result raw JSON

In [559]:
# handle to the collection that stores the raw game documents
game_collection = cs779_db.game_collection

# second name for the list of downloaded JSON pages (same object, not a copy)
new_list = test_list

## Insert full game list to MongoDB collection

In [586]:
# Insert the game documents one page at a time: a single insert_many call
# per page replaces one server round trip per document.
for json_obj in new_list:
    page_games = json_obj["results"]
    # insert_many rejects an empty list, so skip pages without results
    if page_games:
        game_collection.insert_many(page_games)

### Insert achievement collection

In [627]:
# handle to the corrected achievements collection
achieve_collection3 = cs779_db.achievements3

In [636]:
# Walk the game ids and their achievement payloads in lockstep and store one
# {"game_id", "achievements"} document per game that has a usable count.
# zip() stops at the shorter list, so no document is written with a padded
# None id or payload (zip_longest produced those when the lengths differed).
for game_id, json_obj in zip(id_list, achievement_placeholder):
    try:
        # extract count
        achievement_count = int(json_obj["count"])
    except (KeyError, TypeError, ValueError):
        # ignore achievements with invalid records: the payload has no
        # usable "count" value
        continue

    # The insert sits outside the try block so database errors are raised
    # instead of being silently swallowed, as the bare except did.
    achieve_collection3.insert_one({"game_id": game_id, "achievements": achievement_count})


# Native PyMongo Queries

In [None]:
# look up a single game document by its exact name
gta_query = {"name": "Grand Theft Auto V"}
game_collection.find_one(gta_query)

In [None]:
# cursor over games whose "released" value is >= "2021-01-01"
released_since_2021 = {"released": {"$gte": "2021-01-01"}}
filter_search1 = game_collection.find(released_since_2021)

In [None]:
# cursor over games matching both conditions: released on or after
# "2022-01-01" and rating at most 4.0
released_filter = {"released": {"$gte": "2022-01-01"}}
rating_filter = {"rating": {"$lte": 4.0}}
project_search1 = game_collection.find({"$and": [released_filter, rating_filter]})

In [None]:
# empty frame with the two target columns
filter_df = pd.DataFrame(columns=["name", "released"])

# read the cursor once, then split the (name, released) pairs into two lists
matches = [(doc["name"], doc["released"]) for doc in project_search1]
name_list = [name for name, _ in matches]
released_list = [released for _, released in matches]

In [None]:
# pair each name with its release date and build the two-column frame
name_release_pairs = list(zip(name_list, released_list))
filter_df = pd.DataFrame(name_release_pairs, columns=["name", "released"])

In [None]:
# show the first five rows of the filtered frame
filter_df.head(n=5)

In [None]:
# get shape of df as a (number of rows, number of columns) tuple
filter_df.shape