# Use Pandas for ETL

Now it's time to write some simple ETL jobs for data analysis. Our goal is to create a denormalized fact table in our presentation area.

In this notebook we discuss the process steps (divided into extract, transform, and load) needed to clean the source data, aggregate the records and, finally, load them into our document store.

Overview of our ETL steps:

![picture](https://drive.google.com/uc?id=1h60hvtzWmZYHJyuOaONpYiyNLmsQTlje)

## Load and extract the source file

First of all we need to load raw data (from CSV files) into our environment.

In [1]:
from google.colab import files

uploaded = files.upload()

Saving 1_ds_project_details_full.csv to 1_ds_project_details_full.csv


In [2]:
for fn in uploaded.keys():
  print('User uploaded file "{name}" with length {length} bytes'.format(
      name=fn, length=len(uploaded[fn])))

User uploaded file "1_ds_project_details_full.csv" with length 4400323 bytes


Most of our ETL code revolves around the following functions:
- drop_duplicates
- dropna
- replace / fillna
- df[df['column'] != value]: filtering
- apply: transform a column or add a new one
- merge: SQL-like inner, left, or right join
- groupby
- read_csv / to_csv

Functions like drop_duplicates and dropna are nice abstractions that replace what would otherwise take many SQL statements.
Likewise, replace / fillna is a typical step for manipulating the data.

All these features are available from pandas.
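As a minimal sketch of how these calls combine, the toy DataFrame below is cleaned with several of the functions listed above. Its column names and values are invented for illustration and are not taken from the project data.

```python
import pandas as pd

# Toy data, invented for illustration only.
df = pd.DataFrame({
    "title": ["A", "A", "B", "C", None],
    "category": ["Film", "Film", None, "Art", "Art"],
    "amount": [10, 10, 0, 30, 40],
})

cleaned = (
    df.drop_duplicates()                 # drop the repeated ("A", "Film", 10) row
      .dropna(subset=["title"])          # drop rows with a missing title
      .fillna({"category": "Unknown"})   # fill missing categories
)
cleaned = cleaned[cleaned["amount"] != 0]   # filtering with a boolean mask

# apply: derive a new column from an existing one
cleaned = cleaned.assign(amount_k=cleaned["amount"].apply(lambda x: x / 1000))

# groupby: total amount per category
totals = cleaned.groupby("category")["amount"].sum()

print(cleaned)
print(totals)
```

Chaining the calls keeps each cleaning step visible on its own line, which suits the exploratory style of a notebook.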


In [3]:
import pandas as pd
import io

In [5]:
ds_project_details_full = pd.read_csv('/content/ds_project_details_full.csv')
# pd.read_sql("select field, count(*) from table group by field")

In [None]:
# ds_project_details_full = pd.read_csv(io.BytesIO(uploaded['ds_project_details_full.csv']))
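The commented-out line reads the CSV directly from the uploaded bytes instead of from a path. The sketch below shows that pattern on a small byte string that stands in for an uploaded file (the content is hypothetical). It also assumes that an `Unnamed: 0` column, like the one in the `info()` output, comes from a DataFrame index that was saved into the CSV; if that holds, `index_col=0` avoids the extra column.

```python
import io

import pandas as pd

# Hypothetical CSV bytes standing in for an uploaded file whose first,
# unnamed column is a saved DataFrame index.
raw_bytes = b",project_id,title\n0,10,Alpha\n1,11,Beta\n"

# Default parsing keeps the saved index as a column named "Unnamed: 0".
df_default = pd.read_csv(io.BytesIO(raw_bytes))

# index_col=0 uses that first column as the index instead.
df_indexed = pd.read_csv(io.BytesIO(raw_bytes), index_col=0)

print(list(df_default.columns))
print(list(df_indexed.columns))
```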


In [6]:
ds_project_details_full.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 9900 entries, 0 to 9899
Data columns (total 25 columns):
 #   Column                Non-Null Count  Dtype  
---  ------                --------------  -----  
 0   Unnamed: 0            9900 non-null   int64  
 1   bullet_point          1 non-null      object 
 2   category              9900 non-null   object 
 3   category_url          9900 non-null   object 
 4   clickthrough_url      9900 non-null   object 
 5   close_date            9899 non-null   object 
 6   currency              9900 non-null   object 
 7   funds_raised_amount   9900 non-null   int64  
 8   funds_raised_percent  9900 non-null   float64
 9   image_url             9900 non-null   object 
 10  is_indemand           9900 non-null   bool   
 11  is_pre_launch         9900 non-null   bool   
 12  is_proven             9900 non-null   bool   
 13  offered_by            0 non-null      float64
 14  open_date             9899 non-null   object 
 15  perk_goal_percentage 

Data processing is often exploratory.
We need to see the shape of the data, and write our next line of code based on our previous output. So the process is iterative.

Jupyter Notebook and Google Colab are handy tools for this kind of iterative work with Python and pandas.
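A few quick checks usually guide the next cleaning step. The sketch below runs them on a toy frame whose values are invented for illustration; the same calls could be applied to `ds_project_details_full`.

```python
import pandas as pd

# Toy frame, invented for illustration.
df = pd.DataFrame({
    "title": ["A", "B", "B", None],
    "funds_raised_percent": [0.5, 12.0, 12.0, 3.2],
})

print(df.shape)                                # (rows, columns)
missing_per_column = df.isna().sum()           # missing values per column
distinct_titles = df["title"].nunique()        # missing values are not counted
duplicate_rows = int(df.duplicated().sum())    # fully identical rows

print(missing_per_column)
print(distinct_titles, duplicate_rows)
```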

In [7]:
ds_project_details_full.head()

Unnamed: 0.1,Unnamed: 0,bullet_point,category,category_url,clickthrough_url,close_date,currency,funds_raised_amount,funds_raised_percent,image_url,...,perk_goal_percentage,perks_claimed,price_offered,price_retail,product_stage,project_id,project_type,tagline,tags,title
0,0,,Video Games,/explore/video-games,/projects/odin-the-ultimate-gaming-handheld,2021-10-03T23:59:59-07:00,HKD,29500582,49.71537,https://c1.iggcdn.com/indiegogo-media-prod-cld...,...,,,,,,2685187,campaign,"Flagship gaming handheld. FHD 1080p 6"" touch s...","['computers', 'pc', 'laptops']",Odin: The Ultimate Gaming Handheld
1,1,,Video Games,/explore/video-games,/projects/g-case-all-in-one-gaming-case-for-sw...,2022-03-11T23:59:59-08:00,HKD,5335377,30.820762,https://c1.iggcdn.com/indiegogo-media-prod-cld...,...,,,,,,2739227,campaign,Modular Battery | Interchangeable Grips | Deta...,"['bluetooth', 'batteries', 'design']",G-Case: All-In-One Gaming Case for Switch & OLED
2,2,,Film,/explore/film,/projects/super-troopers-2,2015-04-24T23:59:59-07:00,USD,4617223,2.081839,https://c1.iggcdn.com/indiegogo-media-prod-cld...,...,,,,,,1166581,campaign,"The #SuperTroopers2 campaign is over, but the ...",['other'],Super Troopers 2
3,3,,Web Series & TV Shows,/explore/web-series-tv-shows,/projects/con-man,2015-04-10T23:59:59-07:00,USD,3156178,7.347459,https://c1.iggcdn.com/indiegogo-media-prod-cld...,...,,,,,,1143140,campaign,A new comedy from Alan Tudyk and Nathan Fillio...,['other'],Con Man
4,4,,Art,/explore/art,/projects/artbook-that-photographed-gods-who-d...,2022-02-18T23:59:59-08:00,JPY,3114937,3.082077,https://c1.iggcdn.com/indiegogo-media-prod-cld...,...,,,,,,2735280,campaign,This concept is coming from teaching of Shinto...,"['books', 'design', 'other', 'professional']",ArtBook that photographed Gods who dwell in na...


In [8]:
number_of_records = ds_project_details_full.shape[0]
print(f"Number of records loaded {number_of_records}")

Number of records loaded 9900


## Transform

After loading the raw data, let's start with the initial cleaning tasks.

Since we want to upload the data to MongoDB, we add a unique identifier right away (the `_id` field in MongoDB).

The following operations create our staging table **st_projects**, where:
- we do not allow **duplicates**
- we select only the **necessary columns**
- we remove **anomalous records**

In [9]:
# Add the id
ds_project_details_full['_id'] = ds_project_details_full['project_id']
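MongoDB requires `_id` to be unique within a collection, so copying `project_id` into `_id` relies on `project_id` having no repeats. The sketch below shows one way to check this on toy data (invented for illustration). Keeping the first row per id is an assumed policy, not something the notebook prescribes.

```python
import pandas as pd

# Toy data with one repeated project_id, invented for illustration.
df = pd.DataFrame({
    "project_id": [101, 102, 102, 103],
    "title": ["A", "B", "B (copy)", "C"],
})

ids_are_unique = df["project_id"].is_unique

# keep=False marks every row that shares its id with another row.
repeated = df[df["project_id"].duplicated(keep=False)]

# Assumed policy: keep the first row per id, then derive _id from it.
deduplicated = df.drop_duplicates(subset=["project_id"], keep="first").copy()
deduplicated["_id"] = deduplicated["project_id"]

print(ids_are_unique)
print(repeated)
```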

In [10]:
# Remove duplicates
ds_project_no_duplicates = ds_project_details_full.drop_duplicates(subset=['title'])
ds_project_no_duplicates = ds_project_no_duplicates.drop_duplicates(subset=['tagline'])
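Two consecutive `drop_duplicates` calls are stricter than a single call on both columns. The toy example below (invented data) compares the two so the difference is visible.

```python
import pandas as pd

# Toy data, invented for illustration.
df = pd.DataFrame({
    "title":   ["A", "A", "B", "C"],
    "tagline": ["x", "y", "x", "z"],
})

# Sequential passes, as in the cell above: first drop rows whose title was
# already seen, then, among the survivors, rows whose tagline was already seen.
sequential = df.drop_duplicates(subset=["title"]).drop_duplicates(subset=["tagline"])

# Single pass on both columns: a row is dropped only when the same
# (title, tagline) pair was already seen.
combined = df.drop_duplicates(subset=["title", "tagline"])

print(len(sequential), len(combined))
```

Here the sequential version keeps two rows and the combined version keeps all four, because no `(title, tagline)` pair repeats.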

In [11]:
number_of_records_without = ds_project_no_duplicates.shape[0]
print(f"-- Number of records without duplicates {number_of_records_without}")

-- Number of records without duplicates 9809


In [12]:
ds_project_no_duplicates.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 9809 entries, 0 to 9899
Data columns (total 26 columns):
 #   Column                Non-Null Count  Dtype  
---  ------                --------------  -----  
 0   Unnamed: 0            9809 non-null   int64  
 1   bullet_point          1 non-null      object 
 2   category              9809 non-null   object 
 3   category_url          9809 non-null   object 
 4   clickthrough_url      9809 non-null   object 
 5   close_date            9808 non-null   object 
 6   currency              9809 non-null   object 
 7   funds_raised_amount   9809 non-null   int64  
 8   funds_raised_percent  9809 non-null   float64
 9   image_url             9809 non-null   object 
 10  is_indemand           9809 non-null   bool   
 11  is_pre_launch         9809 non-null   bool   
 12  is_proven             9809 non-null   bool   
 13  offered_by            0 non-null      float64
 14  open_date             9808 non-null   object 
 15  perk_goal_percentage 

In [13]:
# Select only some features
ds_project_features = ds_project_no_duplicates[['_id', 'project_id', 'title',
                                                'tags', 'tagline', 'open_date', 'funds_raised_amount',
                                                'funds_raised_percent', 'currency', 'close_date', 'category']]

In [14]:
ds_project_features.head()

Unnamed: 0,_id,project_id,title,tags,tagline,open_date,funds_raised_amount,funds_raised_percent,currency,close_date,category
0,2685187,2685187,Odin: The Ultimate Gaming Handheld,"['computers', 'pc', 'laptops']","Flagship gaming handheld. FHD 1080p 6"" touch s...",2021-08-19T00:00:00-07:00,29500582,49.71537,HKD,2021-10-03T23:59:59-07:00,Video Games
1,2739227,2739227,G-Case: All-In-One Gaming Case for Switch & OLED,"['bluetooth', 'batteries', 'design']",Modular Battery | Interchangeable Grips | Deta...,2022-03-10T23:59:59-08:00,5335377,30.820762,HKD,2022-03-11T23:59:59-08:00,Video Games
2,1166581,1166581,Super Troopers 2,['other'],"The #SuperTroopers2 campaign is over, but the ...",2015-03-24T10:00:57-07:00,4617223,2.081839,USD,2015-04-24T23:59:59-07:00,Film
3,1143140,1143140,Con Man,['other'],A new comedy from Alan Tudyk and Nathan Fillio...,2015-03-10T14:48:01-07:00,3156178,7.347459,USD,2015-04-10T23:59:59-07:00,Web Series & TV Shows
4,2735280,2735280,ArtBook that photographed Gods who dwell in na...,"['books', 'design', 'other', 'professional']",This concept is coming from teaching of Shinto...,2022-02-17T23:59:59-08:00,3114937,3.082077,JPY,2022-02-18T23:59:59-08:00,Art


In [15]:
# Remove noise: keep records with 0 < funds_raised_percent < 1000
ds_project_cleaned = ds_project_features[(ds_project_features['funds_raised_percent'] > 0) & (ds_project_features['funds_raised_percent'] < 1000)]

In [16]:
# Remove null values in tagline
ds_project_cleaned = ds_project_cleaned[ds_project_cleaned.tagline.notnull()]
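The two filters above can also be written as named boolean masks, which makes it easier to inspect each condition. The sketch below uses toy data (invented for illustration) and shows that a missing `funds_raised_percent` fails both comparisons, so such rows are removed as well.

```python
import pandas as pd

# Toy data, invented for illustration.
df = pd.DataFrame({
    "tagline": ["ok", None, "too high", "zero", "missing pct"],
    "funds_raised_percent": [12.5, 40.0, 2500.0, 0.0, float("nan")],
})

# Comparisons with NaN evaluate to False, so the last row is excluded.
in_range = (df["funds_raised_percent"] > 0) & (df["funds_raised_percent"] < 1000)
has_tagline = df["tagline"].notnull()

# .copy() makes the result an independent frame rather than a slice of df.
cleaned = df[in_range & has_tagline].copy()

print(cleaned)
```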

In [17]:
number_of_records_without_noise = ds_project_cleaned.shape[0]
print(f"-- Number of records without noise {number_of_records_without_noise}")

-- Number of records without noise 9805


# Load data in MongoDB

Now the records are ready. Following a Big Data approach:
- we load the raw data into a table that holds all the source data (**sc_projects**)
- we load the clean data into the staging table **st_projects**

For the connection to MongoDB we will use the **pymongo** library.

In [18]:
!pip install pymongo

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


In [19]:
import pymongo
import json
from pymongo import UpdateOne

In [25]:
client = pymongo.MongoClient("mongodb://xxxxx:xxxx@xxxxx:27017,xxxxx:27017,xxxx:27017/myFirstDatabase?ssl=true&replicaSet=atlas-14k1wg-shard-0&authSource=admin&retryWrites=true&w=majority")
db = client.indiegogo

MongoDB stores data in BSON (**binary JSON**) format.

We convert our pandas DataFrame to JSON and build the list of upsert (update or insert) operations for our collection.
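A minimal sketch of this conversion on a toy frame (invented data) follows. It uses the transpose idiom from this notebook and, for comparison, `to_json(orient="records")`, which gives the same list on this toy frame. The upsert operations are represented as plain dicts so that the sketch runs without a database; in the notebook they are `pymongo.UpdateOne` objects.

```python
import json

import pandas as pd

# Toy data, invented for illustration.
df = pd.DataFrame({
    "_id": [1, 2],
    "title": ["a", "b"],
    "lat": [1.5, float("nan")],
})

# Transpose so that each original row becomes one JSON object.
records_via_transpose = list(json.loads(df.T.to_json()).values())

# An alternative that yields the same list on this toy frame.
records_via_orient = json.loads(df.to_json(orient="records"))

# Plain-dict view of what each UpdateOne(filter, update, upsert=True) carries.
upsert_specs = [
    {"filter": {"_id": r["_id"]}, "update": {"$setOnInsert": r}, "upsert": True}
    for r in records_via_transpose
]

print(records_via_transpose)
```

Going through JSON turns missing values into `None`, which is stored as `null` in MongoDB.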

In [26]:
records = json.loads(ds_project_details_full.T.to_json()).values()
upserts=[UpdateOne({'_id':x['_id']}, {'$setOnInsert':x}, upsert=True) for x in records]
db.sc_project.bulk_write(upserts)

<pymongo.results.BulkWriteResult at 0x7fa4fe1c91d0>

In [27]:
records = json.loads(ds_project_cleaned.T.to_json()).values()
upserts=[UpdateOne({'_id':x['_id']}, {'$setOnInsert':x}, upsert=True) for x in records]
db.st_project_cleaned.bulk_write(upserts)

<pymongo.results.BulkWriteResult at 0x7fa4fe1be510>

# Extract and load image details

We now do the same work on the list of concepts extracted from the images with the API.



In [None]:
from google.colab import files
uploaded = files.upload()

Saving ds_img_details_full.csv to ds_img_details_full.csv


In [33]:
#ds_img_details_full = pd.read_csv(io.BytesIO(uploaded['ds_img_details_full.csv']))
ds_img_details_full = pd.read_csv('/content/ds_img_details_full.csv')

In [34]:
ds_img_details_full.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 2320 entries, 0 to 2319
Data columns (total 5 columns):
 #   Column      Non-Null Count  Dtype  
---  ------      --------------  -----  
 0   Unnamed: 0  2320 non-null   int64  
 1   project_id  2320 non-null   object 
 2   image       2320 non-null   object 
 3   name        2320 non-null   object 
 4   value       2320 non-null   float64
dtypes: float64(1), int64(1), object(3)
memory usage: 90.8+ KB


In [35]:
number_of_records = ds_img_details_full.shape[0]
print(f"Number of records loaded {number_of_records}")

Number of records loaded 2320


In [37]:
ds_img_details_full.head()

Unnamed: 0.1,Unnamed: 0,project_id,image,name,value
0,0,https://www.indiegogo.com/projects/aawireless/...,img_1.jpg,technology,0.985519
1,1,https://www.indiegogo.com/projects/aawireless/...,img_1.jpg,electronics,0.956601
2,2,https://www.indiegogo.com/projects/aawireless/...,img_1.jpg,equipment,0.951058
3,3,https://www.indiegogo.com/projects/aawireless/...,img_1.jpg,internet,0.946486
4,4,https://www.indiegogo.com/projects/aawireless/...,img_1.jpg,screen,0.935122


In [43]:
records = json.loads(ds_img_details_full.T.to_json()).values()
db.sc_images.insert_many(records)

<pymongo.results.InsertManyResult at 0x7fa4fe4d5ad0>

In [45]:
ds_img_details_full['concepts'] = ds_img_details_full. \
  apply(lambda row: {'name': row['name'], 'value': row['value']}, axis=1)

In [46]:
ds_img_details_full.head()

Unnamed: 0.1,Unnamed: 0,project_id,image,name,value,concepts
0,0,https://www.indiegogo.com/projects/aawireless/...,img_1.jpg,technology,0.985519,"{'name': 'technology', 'value': 0.985519289970..."
1,1,https://www.indiegogo.com/projects/aawireless/...,img_1.jpg,electronics,0.956601,"{'name': 'electronics', 'value': 0.95660084486..."
2,2,https://www.indiegogo.com/projects/aawireless/...,img_1.jpg,equipment,0.951058,"{'name': 'equipment', 'value': 0.9510580897331..."
3,3,https://www.indiegogo.com/projects/aawireless/...,img_1.jpg,internet,0.946486,"{'name': 'internet', 'value': 0.9464861154556274}"
4,4,https://www.indiegogo.com/projects/aawireless/...,img_1.jpg,screen,0.935122,"{'name': 'screen', 'value': 0.9351216554641724}"


In [47]:
ds_images_aggregate = ds_img_details_full.groupby('project_id')['concepts'].apply(list).reset_index(name="concepts")

In [48]:
ds_images_aggregate.head()

Unnamed: 0,project_id,concepts
0,https://www.indiegogo.com/projects/2x2-ultra-b...,"[{'name': 'wheel', 'value': 0.9952050447463988..."
1,https://www.indiegogo.com/projects/aaron-lopre...,"[{'name': 'Halloween', 'value': 0.997346282005..."
2,https://www.indiegogo.com/projects/aawireless/...,"[{'name': 'technology', 'value': 0.98551928997..."
3,https://www.indiegogo.com/projects/abney-park-...,"[{'name': 'music', 'value': 0.9968480467796326..."
4,https://www.indiegogo.com/projects/afreda-s6-a...,"[{'name': 'bike', 'value': 0.9995869994163512}..."


Document databases also support complex data types, so we can load our records as they are, each with an array of concepts.
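The sketch below repeats the aggregation on toy rows (invented for illustration) and shows the resulting documents, where each project carries its concepts as an array of small objects. It builds the per-row dicts with a list comprehension instead of `apply`; this is a stylistic alternative, not the notebook's code.

```python
import json

import pandas as pd

# Toy rows in the shape of the image-concept data, invented for illustration.
df = pd.DataFrame({
    "project_id": ["p1", "p1", "p2"],
    "name": ["technology", "screen", "bike"],
    "value": [0.98, 0.93, 0.99],
})

# One small dict per row ...
df["concepts"] = [{"name": n, "value": v} for n, v in zip(df["name"], df["value"])]

# ... then one list of dicts per project.
aggregated = (
    df.groupby("project_id")["concepts"]
      .apply(list)
      .reset_index(name="concepts")
)

# Each record now has an array field and maps directly onto a document.
documents = json.loads(aggregated.to_json(orient="records"))
print(documents)
```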

In [49]:
ds_images_aggregate.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 108 entries, 0 to 107
Data columns (total 2 columns):
 #   Column      Non-Null Count  Dtype 
---  ------      --------------  ----- 
 0   project_id  108 non-null    object
 1   concepts    108 non-null    object
dtypes: object(2)
memory usage: 1.8+ KB


In [50]:
ds_images_aggregate['_id'] = ds_images_aggregate['project_id']

In [52]:
records = json.loads(ds_images_aggregate.T.to_json()).values()
upserts=[UpdateOne({'_id':x['_id']}, {'$setOnInsert':x}, upsert=True) for x in records]
db.st_concepts.bulk_write(upserts)

<pymongo.results.BulkWriteResult at 0x7fa4facf4a50>

# Location data

In [62]:
ds_location = pd.read_csv('/content/ds_project_location_full.csv')

In [64]:
number_of_records = ds_location.shape[0]
print(f"Number of records loaded {number_of_records}")

Number of records loaded 187


In [65]:
records = json.loads(ds_location.T.to_json()).values()
db.sc_location.insert_many(records)

<pymongo.results.InsertManyResult at 0x7fa4fa99bd10>

In [66]:
ds_location.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 187 entries, 0 to 186
Data columns (total 4 columns):
 #   Column      Non-Null Count  Dtype  
---  ------      --------------  -----  
 0   Unnamed: 0  187 non-null    int64  
 1   project_id  187 non-null    int64  
 2   lat         187 non-null    float64
 3   lng         187 non-null    float64
dtypes: float64(2), int64(2)
memory usage: 6.0 KB


Clean location data

In [69]:
ds_location_cleaned = ds_location[ds_location["project_id"] > 0]

In [70]:
number_of_records = ds_location_cleaned.shape[0]
print(f"Number of records cleaned {number_of_records}")

Number of records cleaned 187


In [72]:
records = json.loads(ds_location_cleaned.T.to_json()).values()
upserts=[UpdateOne({'_id':x['project_id']}, {'$setOnInsert':x}, upsert=True) for x in records]
db.st_locations.bulk_write(upserts)

<pymongo.results.BulkWriteResult at 0x7fa4fa4b4cd0>

# Data to presentation layer

Let's now build the final fact table: the goal is to create a denormalized table ready for analysis.

In [53]:
#client = pymongo.MongoClient("mongodb://xxxx:xx@xxxx:27017,xxxx:27017,xxx:27017/myFirstDatabase?ssl=true&replicaSet=atlas-14k1wg-shard-0&authSource=admin&retryWrites=true&w=majority")
db = client.indiegogo

In [73]:
st_project_cleaned = db.st_project_cleaned
st_concepts = db.st_concepts
st_locations = db.st_locations

In [55]:
st_project_cleaned.count_documents({})

9822

In [56]:
st_concepts.count_documents({})

108

In [74]:
st_locations.count_documents({})

187
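One thing to keep in mind when reading these counts: the staging collections were written with `upsert=True` and `$setOnInsert`. In MongoDB, `$setOnInsert` assigns fields only when the upsert inserts a new document; when the filter matches an existing document, nothing changes. The pure-Python sketch below emulates that behaviour with a dict that stands in for a collection (an illustration only, no database involved).

```python
from typing import Any, Dict, List

Document = Dict[str, Any]


def upsert_set_on_insert(collection: Dict[Any, Document], docs: List[Document]) -> int:
    """Emulate UpdateOne({'_id': d['_id']}, {'$setOnInsert': d}, upsert=True).

    `collection` is a plain dict keyed by _id that stands in for a MongoDB
    collection. Returns the number of documents inserted.
    """
    inserted = 0
    for doc in docs:
        if doc["_id"] not in collection:
            # No match: the upsert inserts the document.
            collection[doc["_id"]] = dict(doc)
            inserted += 1
        # Match: $setOnInsert applies only on insert, so nothing changes.
    return inserted


store: Dict[Any, Document] = {}
first_run = upsert_set_on_insert(
    store, [{"_id": 1, "title": "old"}, {"_id": 2, "title": "b"}]
)
second_run = upsert_set_on_insert(
    store, [{"_id": 1, "title": "new"}, {"_id": 3, "title": "c"}]
)
print(first_run, second_run, store[1]["title"])
```

A later run therefore neither updates nor removes documents written by an earlier run, so a collection can hold more documents than the latest cleaned DataFrame.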

In [None]:
# db.collection.find({}).forEach(function(x) {
#    t = db.collection2.findOne({key: x.key})
# })

In [None]:
# Example MongoDB aggregation pipeline (not run)

In [None]:
result = db.sc_images.aggregate([
    {
        '$match': {
            'value': {
                '$gt': 0.95
            }
        }
    }, {
        '$group': {
            '_id': '$project_id', 
            'count': {
                '$sum': 1
            }
        }
    }, {
        '$out': 'st_after_aggregate'
    }
])
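For comparison, the same match-and-count logic can be sketched in pandas on toy rows (invented for illustration). The `$out` stage has no counterpart here: the result simply stays in a DataFrame instead of being written to a collection.

```python
import pandas as pd

# Toy rows in the shape of sc_images, invented for illustration.
df = pd.DataFrame({
    "project_id": ["p1", "p1", "p1", "p2", "p3"],
    "value": [0.99, 0.96, 0.90, 0.97, 0.50],
})

after_aggregate = (
    df[df["value"] > 0.95]                       # $match: value > 0.95
      .groupby("project_id")                     # $group: _id = $project_id
      .size()                                    #         count = $sum: 1
      .reset_index(name="count")
      .rename(columns={"project_id": "_id"})
)

print(after_aggregate)
```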

In [None]:
# Join collections

In [81]:
df_concepts = pd.DataFrame(list(st_concepts.find({}))).drop(columns=['_id'])
df_projects_cleaned = pd.DataFrame(list(st_project_cleaned.find({})))
df_locations_cleaned = pd.DataFrame(list(st_locations.find({}))).drop(columns=['_id'])


In [59]:
df_projects_cleaned.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 9822 entries, 0 to 9821
Data columns (total 11 columns):
 #   Column                Non-Null Count  Dtype  
---  ------                --------------  -----  
 0   _id                   9822 non-null   int64  
 1   category              9822 non-null   object 
 2   close_date            9820 non-null   object 
 3   currency              9822 non-null   object 
 4   funds_raised_amount   9822 non-null   int64  
 5   funds_raised_percent  9822 non-null   float64
 6   open_date             9820 non-null   object 
 7   project_id            9822 non-null   int64  
 8   tagline               9821 non-null   object 
 9   tags                  9819 non-null   object 
 10  title                 9822 non-null   object 
dtypes: float64(1), int64(3), object(7)
memory usage: 844.2+ KB


In [61]:
df_concepts.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 108 entries, 0 to 107
Data columns (total 2 columns):
 #   Column      Non-Null Count  Dtype 
---  ------      --------------  ----- 
 0   concepts    108 non-null    object
 1   project_id  108 non-null    object
dtypes: object(2)
memory usage: 1.8+ KB


In [76]:
df_ft_projects = df_projects_cleaned.merge(df_locations_cleaned, on='project_id', how='left')
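A left join keeps every project and fills the location columns with missing values where no location exists. The sketch below, on toy frames invented for illustration, shows two details worth checking: columns that exist on both sides get `_x` / `_y` suffixes, and `indicator=True` reports which rows found a match. The `validate` argument is an optional safeguard added here, not part of the notebook's call.

```python
import pandas as pd

# Toy frames, invented for illustration.
projects = pd.DataFrame({
    "_id": [1, 2, 3],
    "project_id": [1, 2, 3],
    "title": ["A", "B", "C"],
})
locations = pd.DataFrame({
    "_id": [1, 3],
    "project_id": [1, 3],
    "lat": [48.7, 34.0],
    "lng": [-122.4, -118.2],
})

# Both frames have an `_id` column, so pandas disambiguates with _x / _y.
with_suffixes = projects.merge(locations, on="project_id", how="left", indicator=True)

# Dropping the right-hand `_id` first keeps a single `_id` column.
# validate="one_to_one" raises if project_id repeats on either side.
clean = projects.merge(
    locations.drop(columns=["_id"]),
    on="project_id",
    how="left",
    validate="one_to_one",
)

print(with_suffixes)
print(clean)
```

Unmatched rows make `_id_y` a float column, because integers cannot hold missing values in the default dtype.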

In [77]:
df_ft_projects.head()

Unnamed: 0.1,_id_x,category,close_date,currency,funds_raised_amount,funds_raised_percent,open_date,project_id,tagline,tags,title,_id_y,Unnamed: 0,lat,lng
0,2084398,Tabletop Games,2017-04-24T23:59:59-07:00,USD,724662,10.323171,2017-04-23T23:59:59-07:00,2084398,Evil Dead 2: The Official Board Game is a horr...,['horror'],Evil Dead 2: The Official Board Game,2084398.0,51.0,48.754899,-122.478122
1,1759114,Writing & Publishing,2016-05-25T23:59:59-07:00,USD,1287686,16.89035,2016-05-24T23:59:59-07:00,1759114,A book that inspires girls with the stories of...,"['kids', 'books', 'female founders', 'social i...",Good Night Stories for Rebel Girls,1759114.0,24.0,34.052238,-118.243344
2,2394811,Tabletop Games,2018-10-22T23:59:59-07:00,USD,462432,42.744458,2018-10-21T23:59:59-07:00,2394811,Survive against all odds facing monsters & per...,"['fantasy', 'indie']",Unbroken: a solo game of survival and revenge,2394811.0,98.0,40.799904,-73.650719
3,2650630,Writing & Publishing,2021-01-04T23:59:59-08:00,USD,2596877,293.336625,2021-01-03T23:59:59-08:00,2650630,The Ultimate Guide To Rebuilding A Civilizatio...,"['books', 'burning man', 'design']",The Book,2650630.0,6.0,34.052238,-118.243344
4,2632142,Video Games,2020-10-06T23:59:59-07:00,USD,892729,20.053769,2020-10-05T23:59:59-07:00,2632142,Get Fit Fighting Your Way Through A Fantasy Wo...,"['wireless', 'apps', 'computers', 'design', 's...",QUELL: Real Fitness. Real Gaming.,2632142.0,29.0,37.78008,-122.420168


In [78]:
df_ft_projects.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 9822 entries, 0 to 9821
Data columns (total 15 columns):
 #   Column                Non-Null Count  Dtype  
---  ------                --------------  -----  
 0   _id_x                 9822 non-null   int64  
 1   category              9822 non-null   object 
 2   close_date            9820 non-null   object 
 3   currency              9822 non-null   object 
 4   funds_raised_amount   9822 non-null   int64  
 5   funds_raised_percent  9822 non-null   float64
 6   open_date             9820 non-null   object 
 7   project_id            9822 non-null   int64  
 8   tagline               9821 non-null   object 
 9   tags                  9819 non-null   object 
 10  title                 9822 non-null   object 
 11  _id_y                 183 non-null    float64
 12  Unnamed: 0            183 non-null    float64
 13  lat                   183 non-null    float64
 14  lng                   183 non-null    float64
dtypes: float64(5), int64(

In [80]:
records = json.loads(df_ft_projects.T.to_json()).values()
upserts=[UpdateOne({'_id':x['project_id']}, {'$setOnInsert':x}, upsert=True) for x in records]
db.ft_projects.bulk_write(upserts)

<pymongo.results.BulkWriteResult at 0x7fa4ff921490>