ETL is a process in which data is acquired, changed/processed and is loaded into the databases/warehouse. The process is as follows:
1. Extracting data from CSV file
2. Transform/Manipulate data
3. Load data in database

In [5]:
import pandas as pd
import numpy as np

1. Extracting data from a CSV file

In [6]:
# Read data from csv file

crypto_df = pd.read_csv('crypto-markets.csv')

In [7]:
crypto_df.head()

Unnamed: 0,slug,asset,name,date,ranknow,open,high,low,close,volume,market,close_ratio,spread
0,target-coin,TGT,Target Coin,29-09-2017,607,0.028961,0.054766,0.028961,0.041777,69996,0.0,0.4966,0.03
1,target-coin,TGT,Target Coin,30-09-2017,607,0.041783,0.046196,0.031435,0.031744,5725,0.0,0.0209,0.01
2,target-coin,TGT,Target Coin,01-10-2017,607,0.031761,0.035957,0.02104,0.028385,5012,0.0,0.4924,0.01
3,target-coin,TGT,Target Coin,02-10-2017,607,0.028375,0.054595,0.020417,0.022525,8010,0.0,0.0617,0.03
4,target-coin,TGT,Target Coin,03-10-2017,607,0.022527,0.032225,0.020211,0.020359,1787,0.0,0.0123,0.01


2. Transforming data into desired output

In [8]:

assetsCode = ['BTC','ETH','XRP','LTC']

# coverting open, close, high and low price of crypto currencies into GBP values since current price is in Dollars
# if currency belong to this list ['BTC','ETH','XRP','LTC']
crypto_df['open'] = crypto_df[['open', 'asset']].apply(lambda x: (float(x[0]) * 0.75) if x[1] in assetsCode else np.nan, axis=1)
crypto_df['close'] = crypto_df[['close', 'asset']].apply(lambda x: (float(x[0]) * 0.75) if x[1] in assetsCode else np.nan, axis=1)
crypto_df['high'] = crypto_df[['high', 'asset']].apply(lambda x: (float(x[0]) * 0.75) if x[1] in assetsCode else np.nan, axis=1)
crypto_df['low'] = crypto_df[['low', 'asset']].apply(lambda x: (float(x[0]) * 0.75) if x[1] in assetsCode else np.nan, axis=1)


In [9]:
#dropping rowa with null values by asset column

crypto_df.dropna(inplace=True)

In [10]:
#reset dataframe index

crypto_df.reset_index(drop=True,inplace=True)
crypto_df.head()

Unnamed: 0,slug,asset,name,date,ranknow,open,high,low,close,volume,market,close_ratio,spread
0,bitcoin,BTC,Bitcoin,28-04-2013,1,101.475,101.985,99.075,100.6575,0,1500520000.0,0.5438,3.88
1,bitcoin,BTC,Bitcoin,29-04-2013,1,100.83,110.6175,100.5,108.405,0,1491160000.0,0.7813,13.49
2,bitcoin,BTC,Bitcoin,30-04-2013,1,108.0,110.1975,100.5375,104.25,0,1597780000.0,0.3843,12.88
3,bitcoin,BTC,Bitcoin,01-05-2013,1,104.25,104.9175,80.79,87.7425,0,1542820000.0,0.2882,32.17
4,bitcoin,BTC,Bitcoin,02-05-2013,1,87.285,94.2,69.21,78.9075,0,1292190000.0,0.3881,33.32


In [11]:
#drop irrelevant columns

crypto_df.drop(['slug','ranknow','volume','market','spread','close_ratio'],axis=1,inplace=True)
crypto_df.head()

Unnamed: 0,asset,name,date,open,high,low,close
0,BTC,Bitcoin,28-04-2013,101.475,101.985,99.075,100.6575
1,BTC,Bitcoin,29-04-2013,100.83,110.6175,100.5,108.405
2,BTC,Bitcoin,30-04-2013,108.0,110.1975,100.5375,104.25
3,BTC,Bitcoin,01-05-2013,104.25,104.9175,80.79,87.7425
4,BTC,Bitcoin,02-05-2013,87.285,94.2,69.21,78.9075


In [12]:
import sqlite3

3. Loading data into the database

In [13]:
import sqlite3

# import cx_Oracle 'username/password@hostname:port/service_name'
# connect function opens a connection to the SQLite database file, 

conn = sqlite3.connect('session.db')

#Similarly we will make connection with other databases like Oracle, DB2 etc.

print(conn)

<sqlite3.Connection object at 0x7f8659679a80>


In [14]:
# Drop a table name Crypto if it exists already

try:
    conn.execute('DROP TABLE IF EXISTS `Crypto` ')
except Exception as e:
    raise(e)
finally:
    print('Table dropped')

Table dropped


In [15]:
#Create new table

try:
    conn.execute('''
         CREATE TABLE Crypto
         (ID         INTEGER PRIMARY KEY,
         ASSET       TEXT    NOT NULL,
         NAME        TEXT    NOT NULL,
         Date        datetime,
         Open        Float DEFAULT 0,
         High        Float DEFAULT 0,
         Low         Float DEFAULT 0,
         Close       Float DEFAULT 0);''')
    print('Table created successfully')
except Exception as e:
    print(str(e))
    print('Data Insertion Failed')
finally:
        # finally block will help with always closing the connection to DB even in case of error.
    conn.close()

Table created successfully


In [16]:
crypto_list = crypto_df.values.tolist()

# lets make new connection to Insert crypto data in SQL DB
conn = sqlite3.connect('session.db')

# make a cursor - it will help with querying SQL DB
cur = conn.cursor()

try:
    cur.executemany("INSERT INTO Crypto(ASSET, NAME, Date, Open, High, Low, Close) VALUES (?,?,?,?,?,?,?)", crypto_list)
    conn.commit()
    print('Data Inserted Successfully')
except Exception as e:
    print(str(e))
    print('Data Insertion Failed')
finally:
    # finally block will help with always closing the connection to DB even in case of error.
    conn.close()

Data Inserted Successfully


In [17]:
# Let's Read data from Database to verify it

conn = sqlite3.connect('session.db')
rows = conn.cursor().execute('Select * from Crypto')
# print(rows[:2])
for row in rows:
    print(row)
conn.close()

(1, 'BTC', 'Bitcoin', '28-04-2013', 101.47500000000001, 101.98499999999999, 99.07499999999999, 100.6575)
(2, 'BTC', 'Bitcoin', '29-04-2013', 100.83, 110.6175, 100.5, 108.405)
(3, 'BTC', 'Bitcoin', '30-04-2013', 108.0, 110.1975, 100.53750000000001, 104.25)
(4, 'BTC', 'Bitcoin', '01-05-2013', 104.25, 104.91749999999999, 80.78999999999999, 87.74249999999999)
(5, 'BTC', 'Bitcoin', '02-05-2013', 87.285, 94.19999999999999, 69.21000000000001, 78.9075)
(6, 'BTC', 'Bitcoin', '03-05-2013', 79.6875, 81.0975, 59.324999999999996, 73.3125)
(7, 'BTC', 'Bitcoin', '04-05-2013', 73.57499999999999, 86.25, 69.375, 84.375)
(8, 'BTC', 'Bitcoin', '05-05-2013', 84.67500000000001, 89.1, 80.355, 86.9325)
(9, 'BTC', 'Bitcoin', '06-05-2013', 86.985, 93.495, 79.98, 84.225)
(10, 'BTC', 'Bitcoin', '07-05-2013', 84.1875, 85.08, 73.275, 83.625)
(11, 'BTC', 'Bitcoin', '08-05-2013', 82.19999999999999, 86.83500000000001, 82.19999999999999, 85.1775)
(12, 'BTC', 'Bitcoin', '09-05-2013', 84.9, 85.095, 81.94500000000001, 84.