# How to Use Multithreading and Multiprocessing In Data Analysis

In [1]:
import requests
import pandas as pd
from bs4 import BeautifulSoup
import multiprocessing

#### Get the Downloadable Links for all files

In [2]:
src_url = "https://dataverse.harvard.edu/dataset.xhtml?persistentId=doi:10.7910/DVN/HG7NV7"
response = requests.get(src_url)
response.status_code

200

In [9]:
# response.content

In [4]:
soup =  BeautifulSoup(response.content)

In [11]:
raw_text = soup.find_all('script')[2].text

In [12]:
type(raw_text)

str

In [13]:
raw_text

'{"@context":"http://schema.org","@type":"Dataset","@id":"https://doi.org/10.7910/DVN/HG7NV7","identifier":"https://doi.org/10.7910/DVN/HG7NV7","name":"Data Expo 2009: Airline on time data","creator":[],"author":[],"datePublished":"2008-10-07","dateModified":"2008-10-06","version":"1","description":["Bi-Annual Data Exposition: Every other year, at the Joint Statistical Meetings, the Graphics Section and the Computing Section join in sponsoring a special Poster Session called The Data Exposition , but more commonly known as The Data Expo. All of the papers presented in this Poster Session are reports of analyses of a common data set provided for the occasion. In addition, all papers presented in the session are encouraged to report the use of graphical methods employed during the developme nt of their analysis and to use graphics to convey their findings. Airline on-time performance: Have you ever been stuck in an airport because your flight was delayed or cancelled and wondered if you 

## Extract ContentUrl and Name from the string.
- Regular Exression
- eval()

### Regular Expression

In [96]:
pattern = r"https://dataverse.harvard.edu/api/access/datafile/[0-9]+"

In [97]:
import re
r = re.compile(pattern)
url_list = r.findall(raw_text)

In [98]:
url_list

['https://dataverse.harvard.edu/api/access/datafile/1375005',
 'https://dataverse.harvard.edu/api/access/datafile/1375004',
 'https://dataverse.harvard.edu/api/access/datafile/1375003',
 'https://dataverse.harvard.edu/api/access/datafile/1375002',
 'https://dataverse.harvard.edu/api/access/datafile/1375001',
 'https://dataverse.harvard.edu/api/access/datafile/1375000',
 'https://dataverse.harvard.edu/api/access/datafile/1374999',
 'https://dataverse.harvard.edu/api/access/datafile/1374998',
 'https://dataverse.harvard.edu/api/access/datafile/1374997',
 'https://dataverse.harvard.edu/api/access/datafile/1374996',
 'https://dataverse.harvard.edu/api/access/datafile/1374995',
 'https://dataverse.harvard.edu/api/access/datafile/1374994',
 'https://dataverse.harvard.edu/api/access/datafile/1374993',
 'https://dataverse.harvard.edu/api/access/datafile/1374929',
 'https://dataverse.harvard.edu/api/access/datafile/1374928',
 'https://dataverse.harvard.edu/api/access/datafile/1374927',
 'https:

In [94]:
pattern1 = r"[0-9]+.csv.bz2"
match = re.compile(pattern1)
name_list = match.findall(raw_text)

In [95]:
name_list

['1987.csv.bz2',
 '1988.csv.bz2',
 '1989.csv.bz2',
 '1990.csv.bz2',
 '1991.csv.bz2',
 '1992.csv.bz2',
 '1993.csv.bz2',
 '1994.csv.bz2',
 '1995.csv.bz2',
 '1996.csv.bz2',
 '1997.csv.bz2',
 '1998.csv.bz2',
 '1999.csv.bz2',
 '2000.csv.bz2',
 '2001.csv.bz2',
 '2002.csv.bz2',
 '2003.csv.bz2',
 '2004.csv.bz2',
 '2005.csv.bz2',
 '2006.csv.bz2',
 '2007.csv.bz2',
 '2008.csv.bz2']

### eval()

In [107]:
converted_dict = eval(raw_text.strip())

In [109]:
type(converted_dict)

dict

In [117]:
# filename:url
dataUrls = [{value['name']:value['contentUrl']} for value in converted_dict["distribution"]] 

In [118]:
dataUrls

[{'1987.csv.bz2': 'https://dataverse.harvard.edu/api/access/datafile/1375005'},
 {'1988.csv.bz2': 'https://dataverse.harvard.edu/api/access/datafile/1375004'},
 {'1989.csv.bz2': 'https://dataverse.harvard.edu/api/access/datafile/1375003'},
 {'1990.csv.bz2': 'https://dataverse.harvard.edu/api/access/datafile/1375002'},
 {'1991.csv.bz2': 'https://dataverse.harvard.edu/api/access/datafile/1375001'},
 {'1992.csv.bz2': 'https://dataverse.harvard.edu/api/access/datafile/1375000'},
 {'1993.csv.bz2': 'https://dataverse.harvard.edu/api/access/datafile/1374999'},
 {'1994.csv.bz2': 'https://dataverse.harvard.edu/api/access/datafile/1374998'},
 {'1995.csv.bz2': 'https://dataverse.harvard.edu/api/access/datafile/1374997'},
 {'1996.csv.bz2': 'https://dataverse.harvard.edu/api/access/datafile/1374996'},
 {'1997.csv.bz2': 'https://dataverse.harvard.edu/api/access/datafile/1374995'},
 {'1998.csv.bz2': 'https://dataverse.harvard.edu/api/access/datafile/1374994'},
 {'1999.csv.bz2': 'https://dataverse.har

### Here i want to demonstrate the Multiporcessing so i am taking only from 1897 to 2008 files.

In [147]:
URLS = [value for dict_url in dataUrls[:-4] for key,value in dict_url.items()]

In [148]:
URLS

['https://dataverse.harvard.edu/api/access/datafile/1375005',
 'https://dataverse.harvard.edu/api/access/datafile/1375004',
 'https://dataverse.harvard.edu/api/access/datafile/1375003',
 'https://dataverse.harvard.edu/api/access/datafile/1375002',
 'https://dataverse.harvard.edu/api/access/datafile/1375001',
 'https://dataverse.harvard.edu/api/access/datafile/1375000',
 'https://dataverse.harvard.edu/api/access/datafile/1374999',
 'https://dataverse.harvard.edu/api/access/datafile/1374998',
 'https://dataverse.harvard.edu/api/access/datafile/1374997',
 'https://dataverse.harvard.edu/api/access/datafile/1374996',
 'https://dataverse.harvard.edu/api/access/datafile/1374995',
 'https://dataverse.harvard.edu/api/access/datafile/1374994',
 'https://dataverse.harvard.edu/api/access/datafile/1374993',
 'https://dataverse.harvard.edu/api/access/datafile/1374929',
 'https://dataverse.harvard.edu/api/access/datafile/1374928',
 'https://dataverse.harvard.edu/api/access/datafile/1374927',
 'https:

# Multiprocessing Implementations

In [124]:
import os
import concurrent.futures

In [129]:
!pip install tqdm

Collecting tqdm
  Downloading tqdm-4.64.1-py2.py3-none-any.whl (78 kB)
     -------------------------------------- 78.5/78.5 KB 483.9 kB/s eta 0:00:00
Installing collected packages: tqdm
Successfully installed tqdm-4.64.1


You should consider upgrading via the 'C:\Users\003EPO744\AppData\Local\Programs\Python\Python310\python.exe -m pip install --upgrade pip' command.


In [130]:
from tqdm import tqdm

In [125]:
# Define features to load when reading the CSVs into pandas
FEATURES_OF_INTEREST = [
    'Year',
    'UniqueCarrier',
    'DepDelay'
]

# Features to group by
GROUP_BY = ['Year', 'UniqueCarrier']

# Mapping of aggregation functions to features
AGG_MAP = {
    'DepDelay': lambda x: (x>15).sum()/len(x)
}

In [153]:
def load_url(url,process_executor):
    """Downloads URL if the file does not already exist
    Passes filename to aggregation function via a process executor for conccurent aggregation.
    Returns a reference to the future object created by the process executor
    """
    filename = url.split('/')[-1]
    if not os.path.exists(filename):
        print(f'Downloading {url}')
        r = requests.get(url)
        with open(filename, 'wb') as f:
            for chunk in tqdm(r.iter_content(chunk_size=128)):
                f.write(chunk)
        print(f'Downloaded {url}. Passing to aggregator.')
    else:
        print(f'{filename} already exists. Passing to aggregator.')
    return process_executor.submit(aggregate_data, filename)

def aggregate_data(filename):
    """Aggregate data from raw file
    The files are stored in bz2 format. Pandas is used to decompress the file, extract
    FEATURES_OF_INTEREST and calculate aggregated information.
    """
    print(f'Processing {filename}')
    # Unzip and read the csv
    df = pd.read_csv(filename, usecols=FEATURES_OF_INTEREST, compression='bz2')
    # Convert the carrier codes to their full names
#     df['UniqueCarrier'] = df['UniqueCarrier'].map(CARRIER_MAP)
    # Aggregate and reduce the dataframe
    df = df.groupby(GROUP_BY).agg(AGG_MAP)
#    df.set_index(GROUP_BY, inplace=True)
    print(f'Processed: {filename}')
    return df

In [132]:
# Test the load_url function.
load_url('https://dataverse.harvard.edu/api/access/datafile/1375005')

Downloading https://dataverse.harvard.edu/api/access/datafile/1375005


98848it [00:00, 1504288.88it/s]

Downloaded https://dataverse.harvard.edu/api/access/datafile/1375005. Passing to aggregator.





In [139]:
# Check the content of file
df = pd.read_csv('1375005',compression='bz2')
df.head()

Unnamed: 0,Year,Month,DayofMonth,DayOfWeek,DepTime,CRSDepTime,ArrTime,CRSArrTime,UniqueCarrier,FlightNum,...,TaxiIn,TaxiOut,Cancelled,CancellationCode,Diverted,CarrierDelay,WeatherDelay,NASDelay,SecurityDelay,LateAircraftDelay
0,1987,10,14,3,741.0,730,912.0,849,PS,1451,...,,,0,,0,,,,,
1,1987,10,15,4,729.0,730,903.0,849,PS,1451,...,,,0,,0,,,,,
2,1987,10,17,6,741.0,730,918.0,849,PS,1451,...,,,0,,0,,,,,
3,1987,10,18,7,729.0,730,847.0,849,PS,1451,...,,,0,,0,,,,,
4,1987,10,19,1,749.0,730,922.0,849,PS,1451,...,,,0,,0,,,,,


In [140]:
df.count

<bound method DataFrame.count of          Year  Month  DayofMonth  DayOfWeek  DepTime  CRSDepTime  ArrTime  \
0        1987     10          14          3    741.0         730    912.0   
1        1987     10          15          4    729.0         730    903.0   
2        1987     10          17          6    741.0         730    918.0   
3        1987     10          18          7    729.0         730    847.0   
4        1987     10          19          1    749.0         730    922.0   
...       ...    ...         ...        ...      ...         ...      ...   
1311821  1987     12          11          5   1530.0        1530   1825.0   
1311822  1987     12          13          7   1530.0        1530   1815.0   
1311823  1987     12          14          1   1530.0        1530   1807.0   
1311824  1987     12           1          2   1525.0        1525   1643.0   
1311825  1987     12           2          3   1540.0        1525   1706.0   

         CRSArrTime UniqueCarrier  FlightN

In [141]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1311826 entries, 0 to 1311825
Data columns (total 29 columns):
 #   Column             Non-Null Count    Dtype  
---  ------             --------------    -----  
 0   Year               1311826 non-null  int64  
 1   Month              1311826 non-null  int64  
 2   DayofMonth         1311826 non-null  int64  
 3   DayOfWeek          1311826 non-null  int64  
 4   DepTime            1292141 non-null  float64
 5   CRSDepTime         1311826 non-null  int64  
 6   ArrTime            1288326 non-null  float64
 7   CRSArrTime         1311826 non-null  int64  
 8   UniqueCarrier      1311826 non-null  object 
 9   FlightNum          1311826 non-null  int64  
 10  TailNum            0 non-null        float64
 11  ActualElapsedTime  1288326 non-null  float64
 12  CRSElapsedTime     1311826 non-null  int64  
 13  AirTime            0 non-null        float64
 14  ArrDelay           1288326 non-null  float64
 15  DepDelay           1292141 non-n

In [142]:
df["UniqueCarrier"]

0          PS
1          PS
2          PS
3          PS
4          PS
           ..
1311821    CO
1311822    CO
1311823    CO
1311824    CO
1311825    CO
Name: UniqueCarrier, Length: 1311826, dtype: object

In [157]:
if __name__ == '__main__':
    # Verify feature mapping variables are correct before beginning
    for feature in GROUP_BY:
        assert feature in FEATURES_OF_INTEREST, f'Unexpected feature {feature} in group_by.'
    for feature in AGG_MAP.keys():
        assert feature in FEATURES_OF_INTEREST, f'Unexpected feature {feature} in aggregation map.'

    # This is where all the concurrent processes and threads are created
    with concurrent.futures.ProcessPoolExecutor() as pe, concurrent.futures.ThreadPoolExecutor() as te:
        # First, the ThreadPoolExecutor is used to download each file.
        future_url_request = [te.submit(load_url, url, pe) for url in URLS[:2]]

        # As each download thread completes it returns a Future object from the process executor
        processes = []
        for future in concurrent.futures.as_completed(future_url_request):
            processes.append(future.result())

        # As each process completes it returns and aggregated dataframe
        aggregated_data = []
        print("Executing-----")
        print(processes)
        for future in concurrent.futures.as_completed(processes):
            aggregated_data.append(future.result())

    # Finally, the dataframes are concatenated 
    results = pd.concat(aggregated_data)
    # Resetting the index converts the result to tidy data format
    results.reset_index(inplace=True)
    results.to_csv('flight_data.csv')

1375005 already exists. Passing to aggregator.
1375004 already exists. Passing to aggregator.
Executing-----
[<Future at 0x19359656d40 state=running>, <Future at 0x19356f16cb0 state=running>]


BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.