In [1]:
import requests
import json
import pandas as pd

# Key components of the project:


1. Working with realtime data
2. Fetching the data via an API
3. Working with JSON & nested JSON data
4. Pre-processing the data
5. Working with time series data
6. Using Databases in the ☁ cloud (GCP/AWS)
7. Building Visualizations


## Create Data Pipeline
The idea here is to fetch data from an API, store it in a DataFrame, clean & preprocess it & store the data back to SQL (or NoSQL) database in the cloud (AWS/GCP).

In [None]:
currency = 'usd'
url = 'https://api.coingecko.com/api/v3/coins/markets?vs_currency=' + currency
response = requests.get(url).json()
response

In [3]:
df = pd.DataFrame(response) # load json data into dataframe
df.head()

Unnamed: 0,id,symbol,name,image,current_price,market_cap,market_cap_rank,fully_diluted_valuation,total_volume,high_24h,...,total_supply,max_supply,ath,ath_change_percentage,ath_date,atl,atl_change_percentage,atl_date,roi,last_updated
0,bitcoin,btc,Bitcoin,https://assets.coingecko.com/coins/images/1/la...,38459.0,732572045439,1,808507000000.0,23899790000.0,38653.0,...,21000000.0,21000000.0,69045.0,-44.2386,2021-11-10T14:24:11.849Z,67.81,56677.61435,2013-07-06T00:00:00.000Z,,2022-05-01T16:05:46.808Z
1,ethereum,eth,Ethereum,https://assets.coingecko.com/coins/images/279/...,2802.13,338709637244,2,,14263850000.0,2823.68,...,,,4878.26,-42.42957,2021-11-10T14:24:19.604Z,0.432979,648531.10618,2015-10-20T00:00:00.000Z,"{'times': 96.54322539764654, 'currency': 'btc'...",2022-05-01T16:06:22.647Z
2,tether,usdt,Tether,https://assets.coingecko.com/coins/images/325/...,0.998286,83235076723,3,,55658240000.0,1.008,...,83152880000.0,,1.32,-24.34494,2018-07-24T00:00:00.000Z,0.572521,74.83875,2015-03-02T00:00:00.000Z,,2022-05-01T16:06:58.423Z
3,binancecoin,bnb,BNB,https://assets.coingecko.com/coins/images/825/...,389.03,65472043212,4,65472040000.0,1198038000.0,394.81,...,168137000.0,168137035.9,686.31,-43.26196,2021-05-10T07:24:17.097Z,0.039818,977849.36248,2017-10-19T00:00:00.000Z,,2022-05-01T16:05:03.319Z
4,usd-coin,usdc,USD Coin,https://assets.coingecko.com/coins/images/6319...,0.994195,49300111726,5,,4494754000.0,1.008,...,49251710000.0,,1.17,-14.64331,2019-05-08T00:40:28.300Z,0.891848,12.23698,2021-05-19T13:14:05.611Z,,2022-05-01T16:06:20.852Z


### We create 3 Databases:
1. One, for the original data
2. Second will store link to images
3. Third will have roi as it is in nested JSON form<br>
(Here *id* is the Primary Key)

# Original Database

In [4]:
response[98] # a response with no empty parameter

{'ath': 3.75,
 'ath_change_percentage': -80.06313,
 'ath_date': '2021-11-10T14:59:53.164Z',
 'atl': 0.01963893,
 'atl_change_percentage': 3706.72969,
 'atl_date': '2019-12-18T13:14:41.553Z',
 'circulating_supply': 1245991468.94246,
 'current_price': 0.747521,
 'fully_diluted_valuation': 1027588041,
 'high_24h': 0.758767,
 'id': 'loopring',
 'image': 'https://assets.coingecko.com/coins/images/913/large/LRC.png?1572852344',
 'last_updated': '2022-05-01T16:06:48.812Z',
 'low_24h': 0.67454,
 'market_cap': 931504539,
 'market_cap_change_24h': -12716634.051074147,
 'market_cap_change_percentage_24h': -1.34679,
 'market_cap_rank': 99,
 'max_supply': 1374513896.0,
 'name': 'Loopring',
 'price_change_24h': -0.009472820632,
 'price_change_percentage_24h': -1.25137,
 'roi': {'currency': 'usd',
  'percentage': 1145.8690587791673,
  'times': 11.458690587791674},
 'symbol': 'lrc',
 'total_supply': 1373873439.44246,
 'total_volume': 178979088}

In [5]:
df.head()

Unnamed: 0,id,symbol,name,image,current_price,market_cap,market_cap_rank,fully_diluted_valuation,total_volume,high_24h,...,total_supply,max_supply,ath,ath_change_percentage,ath_date,atl,atl_change_percentage,atl_date,roi,last_updated
0,bitcoin,btc,Bitcoin,https://assets.coingecko.com/coins/images/1/la...,38459.0,732572045439,1,808507000000.0,23899790000.0,38653.0,...,21000000.0,21000000.0,69045.0,-44.2386,2021-11-10T14:24:11.849Z,67.81,56677.61435,2013-07-06T00:00:00.000Z,,2022-05-01T16:05:46.808Z
1,ethereum,eth,Ethereum,https://assets.coingecko.com/coins/images/279/...,2802.13,338709637244,2,,14263850000.0,2823.68,...,,,4878.26,-42.42957,2021-11-10T14:24:19.604Z,0.432979,648531.10618,2015-10-20T00:00:00.000Z,"{'times': 96.54322539764654, 'currency': 'btc'...",2022-05-01T16:06:22.647Z
2,tether,usdt,Tether,https://assets.coingecko.com/coins/images/325/...,0.998286,83235076723,3,,55658240000.0,1.008,...,83152880000.0,,1.32,-24.34494,2018-07-24T00:00:00.000Z,0.572521,74.83875,2015-03-02T00:00:00.000Z,,2022-05-01T16:06:58.423Z
3,binancecoin,bnb,BNB,https://assets.coingecko.com/coins/images/825/...,389.03,65472043212,4,65472040000.0,1198038000.0,394.81,...,168137000.0,168137035.9,686.31,-43.26196,2021-05-10T07:24:17.097Z,0.039818,977849.36248,2017-10-19T00:00:00.000Z,,2022-05-01T16:05:03.319Z
4,usd-coin,usdc,USD Coin,https://assets.coingecko.com/coins/images/6319...,0.994195,49300111726,5,,4494754000.0,1.008,...,49251710000.0,,1.17,-14.64331,2019-05-08T00:40:28.300Z,0.891848,12.23698,2021-05-19T13:14:05.611Z,,2022-05-01T16:06:20.852Z


## Cloud Databases
AWS
* S3 - Data Lake
* Redshift - Data Warehouse
* DynamoDB - NoSQL DB
* RDS - Relational DB

GCP
* BigQuery - Data Warehouse
* Firestore - NoSQL DB
* Cloud SQL - Relational DB


# ROI Database

Here we find that *roi* column is in the form of nested JSON. We have 2 methods to store it in a DB:


1.   We use a RDBMS and create another table(say ***tbl_roi***) for ROI & create a reference to that table
2.   Or, we could keep it in JSON form & store into a NoSQL DB



In [6]:
# convert to datetime
df['ath_date'] = pd.to_datetime(df['ath_date'])
df['atl_date'] = pd.to_datetime(df['atl_date'])
df['last_updated'] = pd.to_datetime(df['last_updated'])

In [7]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 100 entries, 0 to 99
Data columns (total 26 columns):
 #   Column                            Non-Null Count  Dtype              
---  ------                            --------------  -----              
 0   id                                100 non-null    object             
 1   symbol                            100 non-null    object             
 2   name                              100 non-null    object             
 3   image                             100 non-null    object             
 4   current_price                     100 non-null    float64            
 5   market_cap                        100 non-null    int64              
 6   market_cap_rank                   100 non-null    int64              
 7   fully_diluted_valuation           58 non-null     float64            
 8   total_volume                      100 non-null    float64            
 9   high_24h                          100 non-null    float64         

In [8]:
response[98]['roi']

{'currency': 'usd',
 'percentage': 1145.8690587791673,
 'times': 11.458690587791674}

In [9]:
# since this is a nested json, we create another table for it in which id is the foreign key
roi_df = pd.DataFrame()

In [10]:
roi_df['id'] = df['id']
roi_df

Unnamed: 0,id
0,bitcoin
1,ethereum
2,tether
3,binancecoin
4,usd-coin
...,...
95,gatechain-token
96,amp-token
97,neutrino
98,loopring


In [11]:
for resp in response:
  if type(resp['roi']) == dict:
    # assign only those that have data
    roi_df.loc[resp['id'] == roi_df['id'], 'percentage'] = resp['roi']['percentage']
    roi_df.loc[resp['id'] == roi_df['id'], 'currency'] = resp['roi']['currency']
    roi_df.loc[resp['id'] == roi_df['id'], 'times'] = resp['roi']['times']
roi_df

Unnamed: 0,id,percentage,currency,times
0,bitcoin,,,
1,ethereum,9654.322540,btc,96.543225
2,tether,,,
3,binancecoin,,,
4,usd-coin,,,
...,...,...,...,...
95,gatechain-token,,,
96,amp-token,,,
97,neutrino,,,
98,loopring,1145.869059,usd,11.458691


In [12]:
roi_df.dropna(how='any', inplace=True)
roi_df.reset_index(inplace=True)

In [13]:
roi_df.drop(columns='index', inplace=True)
roi_df

Unnamed: 0,id,percentage,currency,times
0,ethereum,9654.32254,btc,96.543225
1,matic-network,42116.52938,usd,421.165294
2,tron,3493.336551,usd,34.933366
3,cosmos,18100.946854,usd,181.009469
4,algorand,-74.895231,usd,-0.748952
5,ethereum-classic,5972.740772,usd,59.727408
6,vechain,500.715502,eth,5.007155
7,theta-token,1461.479857,usd,14.614799
8,decentraland,7505.771495,usd,75.057715
9,tezos,445.235148,usd,4.452351


# Image Database

We'll use *S3 Bucket* to store images<br>
Initial idea was to make an API call to image URL and store it in DynamoDB, but it would be a hindrance while scaling

In [14]:
df['image'] = df['image'].apply(lambda img: img[:-11])

In [15]:
img_df = pd.DataFrame()

In [16]:
# create a df having id & image cols only
img_df['id'] = df['id']
img_df['image_link'] = df['image']
img_df

Unnamed: 0,id,image_link
0,bitcoin,https://assets.coingecko.com/coins/images/1/la...
1,ethereum,https://assets.coingecko.com/coins/images/279/...
2,tether,https://assets.coingecko.com/coins/images/325/...
3,binancecoin,https://assets.coingecko.com/coins/images/825/...
4,usd-coin,https://assets.coingecko.com/coins/images/6319...
...,...,...
95,gatechain-token,https://assets.coingecko.com/coins/images/8183...
96,amp-token,https://assets.coingecko.com/coins/images/1240...
97,neutrino,https://assets.coingecko.com/coins/images/1011...
98,loopring,https://assets.coingecko.com/coins/images/913/...


In [17]:
df.drop(columns='image', inplace=True)

In [18]:
df.columns

Index(['id', 'symbol', 'name', 'current_price', 'market_cap',
       'market_cap_rank', 'fully_diluted_valuation', 'total_volume',
       'high_24h', 'low_24h', 'price_change_24h',
       'price_change_percentage_24h', 'market_cap_change_24h',
       'market_cap_change_percentage_24h', 'circulating_supply',
       'total_supply', 'max_supply', 'ath', 'ath_change_percentage',
       'ath_date', 'atl', 'atl_change_percentage', 'atl_date', 'roi',
       'last_updated'],
      dtype='object')

In [19]:
img_df.head()

Unnamed: 0,id,image_link
0,bitcoin,https://assets.coingecko.com/coins/images/1/la...
1,ethereum,https://assets.coingecko.com/coins/images/279/...
2,tether,https://assets.coingecko.com/coins/images/325/...
3,binancecoin,https://assets.coingecko.com/coins/images/825/...
4,usd-coin,https://assets.coingecko.com/coins/images/6319...


# Upload to AWS

In [21]:
# img_df to rds
img_df

Unnamed: 0,id,image_link
0,bitcoin,https://assets.coingecko.com/coins/images/1/la...
1,ethereum,https://assets.coingecko.com/coins/images/279/...
2,tether,https://assets.coingecko.com/coins/images/325/...
3,binancecoin,https://assets.coingecko.com/coins/images/825/...
4,usd-coin,https://assets.coingecko.com/coins/images/6319...
...,...,...
95,gatechain-token,https://assets.coingecko.com/coins/images/8183...
96,amp-token,https://assets.coingecko.com/coins/images/1240...
97,neutrino,https://assets.coingecko.com/coins/images/1011...
98,loopring,https://assets.coingecko.com/coins/images/913/...


In [32]:
host_name = 'xxxxxxxx.xxxxxxxxxx.ap-south-1.rds.amazonaws.com'
dbname = 'xxxxxxx'
port = 3306
username = 'admin'
password = '********'

In [38]:
import mysql.connector 
from mysql.connector import errorcode
from sqlalchemy import create_engine

cnx = mysql.connector.connect(
        host = host_name,
        user = username,
        password = password)
print(cnx)
cursor = cnx.cursor()
#insert Database Name
db_name = 'mysql-db'

<mysql.connector.connection.MySQLConnection object at 0x7fe88cd1a3d0>


In [None]:
def create_database(cursor, database):
    try:
        cursor.execute(
            "CREATE DATABASE {} DEFAULT CHARACTER SET 'utf8'".format(database))
    except mysql.connector.Error as err:
        print("Failed creating database: {}".format(err))
        exit(1)
try:
    cursor.execute("USE {}".format(db_name))
except mysql.connector.Error as err:
    print("Database {} does not exists.".format(db_name))
    if err.errno == errorcode.ER_BAD_DB_ERROR:
        create_database(cursor, db_name)
        print("Database {} created successfully.".format(db_name))
        cnx.database = db_name
    else:
        print(err)
        exit(1)

In [40]:
engine = create_engine("mysql+mysqlconnector://{user}:{pw}@{host}/{db}".format(user=username, 
                                                                               pw=password,
                                                                               host=host_name,
                                                                               db=db_name))

In [42]:
img_df.to_sql('cvoid19', con=engine, if_exists='append')