# Use Pandas for ETL

Now it’s time to write some simples ETL jobs for data analysis. Our scope is to create a fact table (denormalized) in our presentation area.

In this notebook we will discuss which process steps (divided into extract, transform and load) we have to do to clean the source data, aggregate the records and, finally, load our records in our Document Store.

Overview of our ETL steps:

![picture](https://drive.google.com/uc?id=1h60hvtzWmZYHJyuOaONpYiyNLmsQTlje)

 ## Load and extract the source file

First of all we need to load raw data (from CSV files) into our environment.

In [1]:
from google.colab import files

uploaded = files.upload()

Saving ds_project_details_full.csv to ds_project_details_full.csv


In [4]:
for fn in uploaded.keys():
  print('User uploaded file "{name}" with length {length} bytes'.format(
      name=fn, length=len(uploaded[fn])))

User uploaded file "ds_project_details_full.csv" with length 2019128 bytes


Most of our ETL code revolve around using the following functions:
- drop_duplicates
- dropna
- replace / fillna
- df[df['column'] != value]: filtering
- apply: transform, or adding new column
- merge: SQL like inner, left, or right join
- groupby
- read_csv / to_csv

Functions like drop_duplicates and drop_na are nice abstractions and save tens of SQL statements.
And replace / fillna is a typical step that to manipulate the data array.

All these features are available from pandas.


In [8]:
import pandas as pd
import io

In [9]:
ds_project_details_full = pd.read_csv(io.BytesIO(uploaded['ds_project_details_full.csv']))


In [10]:
ds_project_details_full.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4600 entries, 0 to 4599
Data columns (total 24 columns):
 #   Column                Non-Null Count  Dtype  
---  ------                --------------  -----  
 0   Unnamed: 0            4600 non-null   int64  
 1   bullet_point          1 non-null      object 
 2   category              4600 non-null   object 
 3   category_url          4600 non-null   object 
 4   clickthrough_url      4600 non-null   object 
 5   close_date            4599 non-null   object 
 6   currency              4600 non-null   object 
 7   funds_raised_amount   4600 non-null   int64  
 8   funds_raised_percent  4600 non-null   float64
 9   image_url             4600 non-null   object 
 10  is_indemand           4600 non-null   bool   
 11  is_pre_launch         4600 non-null   bool   
 12  offered_by            0 non-null      float64
 13  open_date             4599 non-null   object 
 14  perk_goal_percentage  1 non-null      float64
 15  perks_claimed        

Data processing is often exploratory.
We need to see the shape of the data, and write our next line of code based on our previous output. So the process is iterative.

One tool that Python + Pandas comes in handy is Jupyter Notebook or Google Colab. 

In [169]:
ds_project_details_full.head()

Unnamed: 0.1,Unnamed: 0,bullet_point,category,category_url,clickthrough_url,close_date,currency,funds_raised_amount,funds_raised_percent,image_url,is_indemand,is_pre_launch,offered_by,open_date,perk_goal_percentage,perks_claimed,price_offered,price_retail,product_stage,project_id,project_type,tagline,tags,title,_id
0,0,,Film,/explore/film,/projects/super-troopers-2,2015-04-24T23:59:59-07:00,USD,4617223,2.081839,https://c1.iggcdn.com/indiegogo-media-prod-cld...,False,False,,2015-03-24T10:00:57-07:00,,,,,,1166581,campaign,"The #SuperTroopers2 campaign is over, but the ...",['other'],Super Troopers 2,1166581
1,1,,Web Series & TV Shows,/explore/web-series-tv-shows,/projects/con-man,2015-04-10T23:59:59-07:00,USD,3156178,7.347459,https://c1.iggcdn.com/indiegogo-media-prod-cld...,False,False,,2015-03-10T14:48:01-07:00,,,,,,1143140,campaign,A new comedy from Alan Tudyk and Nathan Fillio...,['other'],Con Man,1143140
2,2,,Photography,/explore/photography,/projects/the-camera-pack-peter-mckinnon-x-nom...,2019-11-14T23:59:59-08:00,USD,2677592,22.86193,https://c1.iggcdn.com/indiegogo-media-prod-cld...,False,False,,2019-11-13T23:59:59-08:00,,,,,,2558245,campaign,A Functional Camera Pack for all types of trav...,"['backpacks', 'design', 'luggage', 'profession...",The Camera Pack: Peter McKinnon X NOMATIC,2558245
3,3,,Writing & Publishing,/explore/writing-publishing,/projects/the-book--28,2021-01-04T23:59:59-08:00,USD,2596877,293.336625,https://c1.iggcdn.com/indiegogo-media-prod-cld...,True,False,,2021-01-03T23:59:59-08:00,,,,,,2650630,campaign,The Ultimate Guide To Rebuilding A Civilizatio...,"['books', 'burning man', 'design']",The Book,2650630
4,4,,Film,/explore/film,/projects/code-8-a-film-from-robbie-stephen-amell,2016-04-23T23:59:59-07:00,USD,2501972,8.600755,https://c1.iggcdn.com/indiegogo-media-prod-cld...,False,False,,2016-03-22T08:06:31-07:00,,,,,,1676513,campaign,Help Robbie & Stephen Amell make their first f...,['robots'],Code 8 - a film from Robbie & Stephen Amell,1676513


In [17]:
number_of_records = ds_project_details_full.shape[0]
print(f"Number of records loaded {number_of_records}")

Number of records loaded 4600


## Transform

After loading the raw data, let's go do the initial cleaning tasks.

Since we want to upload the data to MongoDB, we should immediately add a unique identifier (_id on MongoDB).

The operations we will do are to create our staging table **st_projects** where:
- we do not allow **duplicates**
- we select only the **necessary columns**
- remove **anomalous records**

In [68]:
# Add the id
ds_project_details_full['_id'] = ds_project_details_full['project_id']

In [69]:
# Remove duplicates
ds_project_no_duplicates = ds_project_details_full.drop_duplicates(subset=['title'])
ds_project_no_duplicates = ds_project_no_duplicates.drop_duplicates(subset=['tagline'])

In [70]:
number_of_records_without = ds_project_no_duplicates.shape[0]
print(f"-- Number of records without duplicates {number_of_records_without}")

-- Number of records without duplicates 4562


In [71]:
ds_project_no_duplicates.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 4562 entries, 0 to 4599
Data columns (total 25 columns):
 #   Column                Non-Null Count  Dtype  
---  ------                --------------  -----  
 0   Unnamed: 0            4562 non-null   int64  
 1   bullet_point          1 non-null      object 
 2   category              4562 non-null   object 
 3   category_url          4562 non-null   object 
 4   clickthrough_url      4562 non-null   object 
 5   close_date            4561 non-null   object 
 6   currency              4562 non-null   object 
 7   funds_raised_amount   4562 non-null   int64  
 8   funds_raised_percent  4562 non-null   float64
 9   image_url             4562 non-null   object 
 10  is_indemand           4562 non-null   bool   
 11  is_pre_launch         4562 non-null   bool   
 12  offered_by            0 non-null      float64
 13  open_date             4561 non-null   object 
 14  perk_goal_percentage  1 non-null      float64
 15  perks_claimed        

In [84]:
# Select only some features
ds_project_features = ds_project_no_duplicates[['_id', 'project_id', 'title',
                                                'tags', 'tagline', 'open_date', 'funds_raised_amount',
                                                'funds_raised_percent', 'currency', 'close_date', 'category']]

In [85]:
ds_project_features.head()

Unnamed: 0,_id,project_id,title,tags,tagline,open_date,funds_raised_amount,funds_raised_percent,currency,close_date,category
0,1166581,1166581,Super Troopers 2,['other'],"The #SuperTroopers2 campaign is over, but the ...",2015-03-24T10:00:57-07:00,4617223,2.081839,USD,2015-04-24T23:59:59-07:00,Film
1,1143140,1143140,Con Man,['other'],A new comedy from Alan Tudyk and Nathan Fillio...,2015-03-10T14:48:01-07:00,3156178,7.347459,USD,2015-04-10T23:59:59-07:00,Web Series & TV Shows
2,2558245,2558245,The Camera Pack: Peter McKinnon X NOMATIC,"['backpacks', 'design', 'luggage', 'profession...",A Functional Camera Pack for all types of trav...,2019-11-13T23:59:59-08:00,2677592,22.86193,USD,2019-11-14T23:59:59-08:00,Photography
3,2650630,2650630,The Book,"['books', 'burning man', 'design']",The Ultimate Guide To Rebuilding A Civilizatio...,2021-01-03T23:59:59-08:00,2596877,293.336625,USD,2021-01-04T23:59:59-08:00,Writing & Publishing
4,1676513,1676513,Code 8 - a film from Robbie & Stephen Amell,['robots'],Help Robbie & Stephen Amell make their first f...,2016-03-22T08:06:31-07:00,2501972,8.600755,USD,2016-04-23T23:59:59-07:00,Film


In [86]:
# Remove noise
ds_project_cleaned = ds_project_features[(ds_project_features['funds_raised_percent'] > 0) & (ds_project_features['funds_raised_percent'] < 1000)]

In [87]:
number_of_records_without_noise = ds_project_cleaned.shape[0]
print(f"-- Number of records without noise {number_of_records_without_noise}")

-- Number of records without noise 4560


# Load data in MongoDB

Now the records are ready, following a Big Data approach:
- we load the raw, raw data on a table with all the source data (**sc_projects**)
- load the clean data in the staging table **st_projects**

For the connection to MongoDB we will use the **pymongo** library.

In [88]:
!pip install pymongo



In [93]:
import pymongo
import json
from pymongo import UpdateOne

In [90]:
client = pymongo.MongoClient("mongodb://admin123:admin123@cluster0-shard-00-00.250eh.mongodb.net:27017,cluster0-shard-00-01.250eh.mongodb.net:27017,cluster0-shard-00-02.250eh.mongodb.net:27017/myFirstDatabase?ssl=true&replicaSet=atlas-14k1wg-shard-0&authSource=admin&retryWrites=true&w=majority")
db = client.indiegogo

The data on MongoDB is in bson (**binary json**) format.

We convert our dataframe pandas in json and create the list of update or insert on our collection.

In [127]:
records = json.loads(ds_project_details_full.T.to_json()).values()
upserts=[UpdateOne({'_id':x['_id']}, {'$setOnInsert':x}, upsert=True) for x in records]
db.sc_project.bulk_write(upserts)

<pymongo.results.BulkWriteResult at 0x7f7f7fa76550>

In [128]:
records = json.loads(ds_project_cleaned.T.to_json()).values()
upserts=[UpdateOne({'_id':x['_id']}, {'$setOnInsert':x}, upsert=True) for x in records]
db.st_project_cleaned.bulk_write(upserts)

<pymongo.results.BulkWriteResult at 0x7f7f7e1d1e60>

# Extract and load image details

We perform the same work now on the list of concepts extracted with the API from the images,



In [97]:
from google.colab import files
uploaded = files.upload()

Saving ds_img_details_full.csv to ds_img_details_full.csv


In [110]:
ds_img_details_full = pd.read_csv(io.BytesIO(uploaded['ds_img_details_full.csv']))

In [101]:
ds_img_details_full.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 16840 entries, 0 to 16839
Data columns (total 5 columns):
 #   Column      Non-Null Count  Dtype  
---  ------      --------------  -----  
 0   Unnamed: 0  16840 non-null  int64  
 1   project_id  16840 non-null  int64  
 2   image       16840 non-null  object 
 3   name        16840 non-null  object 
 4   value       16840 non-null  float64
dtypes: float64(1), int64(2), object(2)
memory usage: 657.9+ KB


In [102]:
number_of_records = ds_img_details_full.shape[0]
print(f"Number of records loaded {number_of_records}")

Number of records loaded 16840


In [103]:
ds_img_details_full.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 16840 entries, 0 to 16839
Data columns (total 5 columns):
 #   Column      Non-Null Count  Dtype  
---  ------      --------------  -----  
 0   Unnamed: 0  16840 non-null  int64  
 1   project_id  16840 non-null  int64  
 2   image       16840 non-null  object 
 3   name        16840 non-null  object 
 4   value       16840 non-null  float64
dtypes: float64(1), int64(2), object(2)
memory usage: 657.9+ KB


In [105]:
ds_img_details_full.head()

Unnamed: 0.1,Unnamed: 0,project_id,image,name,value,_id
0,0,0,img_1.jpg,lid,0.990656,0
1,1,0,img_1.jpg,banner,0.980789,0
2,2,0,img_1.jpg,people,0.977013,0
3,3,0,img_1.jpg,illustration,0.971107,0
4,4,0,img_1.jpg,bill,0.957037,0


In [129]:
records = json.loads(ds_img_details_full.T.to_json()).values()
db.sc_images.insert_many(records)

<pymongo.results.InsertManyResult at 0x7f7f7d5d1230>

In [112]:
# Aggregate data by project_id

In [130]:
ds_images_aggregate = ds_img_details_full.groupby('project_id')['name'].apply(list).reset_index(name='concepts')

In [131]:
ds_images_aggregate.head()

Unnamed: 0,project_id,concepts
0,0,"[lid, banner, people, illustration, bill, man,..."
1,1,"[man, portrait, people, adult, face, actor, de..."
2,2,"[people, one, water, recreation, man, lake, tr..."
3,3,"[illustration, art, people, print, cavalry, te..."
4,4,"[retro, dirty, no person, old, abandoned, rust..."


Document databases also admit complex data types, so we go to load our records, where we have an array of concepts.

In [132]:
ds_images_aggregate.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 842 entries, 0 to 841
Data columns (total 2 columns):
 #   Column      Non-Null Count  Dtype 
---  ------      --------------  ----- 
 0   project_id  842 non-null    int64 
 1   concepts    842 non-null    object
dtypes: int64(1), object(1)
memory usage: 13.3+ KB


In [133]:
ds_images_aggregate['_id'] = ds_images_aggregate['project_id']

In [134]:
records = json.loads(ds_images_aggregate.T.to_json()).values()
upserts=[UpdateOne({'_id':x['_id']}, {'$setOnInsert':x}, upsert=True) for x in records]
db.st_concepts.bulk_write(upserts)

<pymongo.results.BulkWriteResult at 0x7f7f7d87a5f0>

# Data to presentation layer

Let's now build the final fact table: the goal is to create a denormalized table ready for analysis.

In [135]:
client = pymongo.MongoClient("mongodb://admin123:admin123@cluster0-shard-00-00.250eh.mongodb.net:27017,cluster0-shard-00-01.250eh.mongodb.net:27017,cluster0-shard-00-02.250eh.mongodb.net:27017/myFirstDatabase?ssl=true&replicaSet=atlas-14k1wg-shard-0&authSource=admin&retryWrites=true&w=majority")
db = client.indiegogo

In [151]:
st_project_cleaned = db.st_project_cleaned
st_concepts = db.st_concepts

In [152]:
st_project_cleaned.count_documents({})

4560

In [153]:
st_concepts.count_documents({})

842

In [154]:
# Join collections

In [163]:
df_concepts =  pd.DataFrame(list(st_concepts.find({}))).drop(columns=['_id'])
df_projects_cleaned =  pd.DataFrame(list(st_project_cleaned.find({})))


In [164]:
df_projects_cleaned.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 4560 entries, 0 to 4559
Data columns (total 11 columns):
 #   Column                Non-Null Count  Dtype  
---  ------                --------------  -----  
 0   _id                   4560 non-null   int64  
 1   category              4560 non-null   object 
 2   close_date            4559 non-null   object 
 3   currency              4560 non-null   object 
 4   funds_raised_amount   4560 non-null   int64  
 5   funds_raised_percent  4560 non-null   float64
 6   open_date             4559 non-null   object 
 7   project_id            4560 non-null   int64  
 8   tagline               4559 non-null   object 
 9   tags                  4557 non-null   object 
 10  title                 4560 non-null   object 
dtypes: float64(1), int64(3), object(7)
memory usage: 392.0+ KB


In [165]:
df_ft_projects = df_projects_cleaned.merge(df_concepts, on='project_id', how='left')

In [166]:
df_ft_projects.head()

Unnamed: 0,_id,category,close_date,currency,funds_raised_amount,funds_raised_percent,open_date,project_id,tagline,tags,title,concepts
0,1166581,Film,2015-04-24T23:59:59-07:00,USD,4617223,2.081839,2015-03-24T10:00:57-07:00,1166581,"The #SuperTroopers2 campaign is over, but the ...",['other'],Super Troopers 2,
1,1143140,Web Series & TV Shows,2015-04-10T23:59:59-07:00,USD,3156178,7.347459,2015-03-10T14:48:01-07:00,1143140,A new comedy from Alan Tudyk and Nathan Fillio...,['other'],Con Man,
2,2558245,Photography,2019-11-14T23:59:59-08:00,USD,2677592,22.86193,2019-11-13T23:59:59-08:00,2558245,A Functional Camera Pack for all types of trav...,"['backpacks', 'design', 'luggage', 'profession...",The Camera Pack: Peter McKinnon X NOMATIC,
3,2650630,Writing & Publishing,2021-01-04T23:59:59-08:00,USD,2596877,293.336625,2021-01-03T23:59:59-08:00,2650630,The Ultimate Guide To Rebuilding A Civilizatio...,"['books', 'burning man', 'design']",The Book,
4,1676513,Film,2016-04-23T23:59:59-07:00,USD,2501972,8.600755,2016-03-22T08:06:31-07:00,1676513,Help Robbie & Stephen Amell make their first f...,['robots'],Code 8 - a film from Robbie & Stephen Amell,


In [167]:
df_ft_projects.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 4560 entries, 0 to 4559
Data columns (total 12 columns):
 #   Column                Non-Null Count  Dtype  
---  ------                --------------  -----  
 0   _id                   4560 non-null   int64  
 1   category              4560 non-null   object 
 2   close_date            4559 non-null   object 
 3   currency              4560 non-null   object 
 4   funds_raised_amount   4560 non-null   int64  
 5   funds_raised_percent  4560 non-null   float64
 6   open_date             4559 non-null   object 
 7   project_id            4560 non-null   int64  
 8   tagline               4559 non-null   object 
 9   tags                  4557 non-null   object 
 10  title                 4560 non-null   object 
 11  concepts              1 non-null      object 
dtypes: float64(1), int64(3), object(8)
memory usage: 463.1+ KB


In [168]:
records = json.loads(df_ft_projects.T.to_json()).values()
upserts=[UpdateOne({'_id':x['_id']}, {'$setOnInsert':x}, upsert=True) for x in records]
db.ft_projects.bulk_write(upserts)

<pymongo.results.BulkWriteResult at 0x7f7f79d12f00>