# Del 3: Optimizacija kode za velike datasete

Pripravimo datasete:

In [3]:
!tar -xJf data/data_del_06.tar.xz -C ./data/

In [1]:
import pandas as pd
import numpy as np

## CPU Bound Programs

### Bounds vs Limitations

<img alt="I/O bounds" src="images/CPU+and+I_O+bounds.png">

### Primer optimizacije

In [4]:
df = pd.read_csv('data/new_york_hotels.csv')

In [13]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1631 entries, 0 to 1630
Data columns (total 12 columns):
 #   Column          Non-Null Count  Dtype  
---  ------          --------------  -----  
 0   ean_hotel_id    1631 non-null   int64  
 1   name            1631 non-null   object 
 2   address1        1631 non-null   object 
 3   city            1631 non-null   object 
 4   state_province  1631 non-null   object 
 5   postal_code     1631 non-null   object 
 6   latitude        1631 non-null   float64
 7   longitude       1631 non-null   float64
 8   star_rating     1630 non-null   float64
 9   high_rate       1631 non-null   float64
 10  low_rate        1631 non-null   float64
 11  distance        1631 non-null   float64
dtypes: float64(6), int64(1), object(5)
memory usage: 153.0+ KB


In [5]:
df.head()

Unnamed: 0,ean_hotel_id,name,address1,city,state_province,postal_code,latitude,longitude,star_rating,high_rate,low_rate
0,269955,Hilton Garden Inn Albany/SUNY Area,1389 Washington Ave,Albany,NY,12206,42.68751,-73.81643,3.0,154.0272,124.0216
1,113431,Courtyard by Marriott Albany Thruway,1455 Washington Avenue,Albany,NY,12206,42.68971,-73.82021,3.0,179.01,134.0
2,108151,Radisson Hotel Albany,205 Wolf Rd,Albany,NY,12205,42.7241,-73.79822,3.0,134.17,84.16
3,254756,Hilton Garden Inn Albany Medical Center,62 New Scotland Ave,Albany,NY,12208,42.65157,-73.77638,3.0,308.2807,228.4597
4,198232,CrestHill Suites SUNY University Albany,1415 Washington Avenue,Albany,NY,12206,42.68873,-73.81854,3.0,169.39,89.39


In [10]:
import numpy as np

# Define a basic Haversine distance formula
def haversine(lat1, lon1, lat2, lon2):
    lat1, lon1, lat2, lon2 = map(np.deg2rad, [lat1, lon1, lat2, lon2])
    dlat = lat2 - lat1 
    dlon = lon2 - lon1 
    a = np.sin(dlat/2)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2)**2
    total = 2 * np.arcsin(np.sqrt(a)) 
    return total

#### Crude looping over DataFrame rows using indices

In [7]:
# Define a function to manually loop over all rows and return a series of distances
def haversine_looping(df):
    distance_list = []
    for i in range(0, len(df)):
        d = haversine(40.671, -73.985, df.iloc[i]['latitude'], df.iloc[i]['longitude'])
        distance_list.append(d)
    return distance_list

In [12]:
%%timeit
# Run the haversine looping function
# 160000
df['distance'] = haversine_looping(df)

646 ms ± 9 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


#### Looping with iterrows()

In [16]:
%%timeit
# Haversine applied on rows via iteration
haversine_series = []
for index, row in df.iterrows():
    haversine_series.append(haversine(40.671, -73.985, row['latitude'], row['longitude']))
df['distance'] = haversine_series

304 ms ± 23.5 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


#### Looping with apply()

In [15]:
%%timeit

# Timing apply on the Haversine function
df['distance'] = df.apply(lambda row: haversine(40.671, -73.985, row['latitude'], row['longitude']), axis=1)

84.6 ms ± 3.62 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


#### Vectorization with Pandas series

In [17]:
%%timeit 
# Vectorized implementation of Haversine applied on Pandas series
df['distance'] = haversine(40.671, -73.985, df['latitude'], df['longitude'])

2.57 ms ± 88.4 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


####  Vectorization with NumPy arrays

In [18]:
%%timeit
# Vectorized implementation of Haversine applied on NumPy arrays
df['distance'] = haversine(40.671, -73.985, df['latitude'].values, df['longitude'].values)

274 µs ± 5.4 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)


## I/O Bound Programs

### I/O Bounds

<img src="./images/report_assembly.png">

<img src="./images/report_assembly_bidir.png">

### Profiling an I/O bound task

In [9]:
query = '''
SELECT DISTINCT teamID 
FROM Teams 
INNER JOIN TeamsFranchises ON Teams.franchID == TeamsFranchises.franchID 
WHERE TeamsFranchises.active = 'Y';
'''

In [11]:
import cProfile
import sqlite3

conn = sqlite3.connect("data/lahman2015.sqlite")



In [44]:
import cProfile
import sqlite3

query = "SELECT SUM(HR) FROM Batting WHERE teamId=?"
conn = sqlite3.connect("data/lahman2015.sqlite")
cur = conn.cursor()

def calculate_runs(teams):


### Blocking Tasks

In [18]:
import sqlite3

# Create an in memory database.
memory = sqlite3.connect(':memory:')

# Connect to our disk database.
disk = sqlite3.connect('data/lahman2015.sqlite')




In [20]:
import cProfile
import sqlite3



## Optimizing Python Code with pandas

### Basic Looping

### Select columns and rows efficiently


In [19]:
data = pd.read_csv('data/school.csv')
data.head(3)

Unnamed: 0,School ID,School Name,Building Code,Street Address,City,State,Zip Code
0,02M260,Clinton School Writers and Artists,M933,425 West 33rd Street,Manhattan,NY,10001
1,06M211,Inwood Early College for Health and Informatio...,M052,650 Academy Street,Manhattan,NY,10002
2,01M539,"New Explorations into Science, Technology and ...",M022,111 Columbia Street,Manhattan,NY,10002


In [26]:
top_cities = data['City'].value_counts().head(5).index.to_list()

In [28]:
top_cities

['Brooklyn', 'Bronx', 'Manhattan', 'Jamaica', 'Long Island City']

In [34]:
# slabo
%%time
data['City'][(data['City']).isin(top_cities) == False] = 'Others'

CPU times: user 7.11 ms, sys: 0 ns, total: 7.11 ms
Wall time: 5.5 ms


A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy


In [35]:
%%timeit
data.loc[(data['City']).isin(top_cities) == False, 'City'] = 'Others'

888 µs ± 86.4 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)


### Uporaba biult-in funkciji

### Joining on indexes is faster than joining on columns

Construct some sample data:

In [36]:
n = 100000

i1 = np.arange(n)
np.random.shuffle(i1)
df1 = pd.DataFrame({'i': i1,
                    'j': np.random.randint(1,1000,n),
                    'k': np.random.randint(1,1000,n)})

i2 = np.arange(n)
np.random.shuffle(i1)
df2 = pd.DataFrame({'i': i2,
                    'm': np.random.randint(1,1000,n),
                    'n': np.random.randint(1,1000,n)})

## PRIMER: Pohitritev pandas kode

### Naloga

### Priprava podatkov

In [37]:
import pandas as pd

In [38]:
df = pd.read_csv('data/demand_profile.csv')

In [39]:
df.head()

Unnamed: 0,date_time,energy_kwh
0,1/1/13 0:00,0.586
1,1/1/13 1:00,0.58
2,1/1/13 2:00,0.572
3,1/1/13 3:00,0.596
4,1/1/13 4:00,0.592


In [40]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8760 entries, 0 to 8759
Data columns (total 2 columns):
 #   Column      Non-Null Count  Dtype  
---  ------      --------------  -----  
 0   date_time   8760 non-null   object 
 1   energy_kwh  8760 non-null   float64
dtypes: float64(1), object(1)
memory usage: 137.0+ KB


In [41]:
df.dtypes

date_time      object
energy_kwh    float64
dtype: object

In [42]:
df['date_time'] = pd.to_datetime(df['date_time'])
df['date_time'].dtype

dtype('<M8[ns]')

In [43]:
df.head(9)

Unnamed: 0,date_time,energy_kwh
0,2013-01-01 00:00:00,0.586
1,2013-01-01 01:00:00,0.58
2,2013-01-01 02:00:00,0.572
3,2013-01-01 03:00:00,0.596
4,2013-01-01 04:00:00,0.592
5,2013-01-01 05:00:00,0.592
6,2013-01-01 06:00:00,0.596
7,2013-01-01 07:00:00,0.239
8,2013-01-01 08:00:00,0.566


In [46]:
def convert(df, column_name):
    return pd.to_datetime(df[column_name])

In [47]:
def convert_with_format(df, column_name):
    return pd.to_datetime(df[column_name], format='%d/%m/%y %H:%M')

In [48]:
df = pd.read_csv('data/demand_profile.csv')
df_converted = df.copy()

In [49]:
%%timeit -r 3 -n 10
df_converted['date_time'] = convert(df, 'date_time')

1.01 s ± 12.7 ms per loop (mean ± std. dev. of 3 runs, 10 loops each)


In [50]:
df_converted = df.copy()

In [51]:
%%timeit -r 3 -n 10
df_converted['date_time'] = convert_with_format(df, 'date_time')

44.1 ms ± 1.64 ms per loop (mean ± std. dev. of 3 runs, 10 loops each)


In [55]:
df_converted.head()

Unnamed: 0,date_time,energy_kwh
0,2013-01-01 00:00:00,0.586
1,2013-01-01 01:00:00,0.58
2,2013-01-01 02:00:00,0.572
3,2013-01-01 03:00:00,0.596
4,2013-01-01 04:00:00,0.592


In [56]:
df_test = df_converted.copy()

In [59]:
df_test['cost_cents'] = df['energy_kwh'] * 28
df_test.head()

Unnamed: 0,date_time,energy_kwh,cost_cents
0,2013-01-01 00:00:00,0.586,16.408
1,2013-01-01 01:00:00,0.58,16.24
2,2013-01-01 02:00:00,0.572,16.016
3,2013-01-01 03:00:00,0.596,16.688
4,2013-01-01 04:00:00,0.592,16.576


### 1) Simple Looping Over Pandas Data

In [62]:
def apply_tariff(kwh, hour):
    """Calculates cost of electricity for given hour."""    
    if 0 <= hour < 7:
        rate = 12
    elif 7 <= hour < 17:
        rate = 20
    elif 17 <= hour < 24:
        rate = 28
    else:
        raise ValueError(f'Invalid hour: {hour}')
    return rate * kwh

In [63]:
# Tega ne delamo !!!
def apply_tariff_loop(df):
    """Calculate costs in loop.  Modifies `df` inplace."""
    energy_cost_list = []
    for i in range(len(df)):
        energy_used = df.iloc[i]['energy_kwh']
        hour = df.iloc[i]['date_time'].hour
        energy_cost = apply_tariff(energy_used, hour)
        energy_cost_list.append(energy_cost)
    df['cost_cents'] = energy_cost_list

In [64]:
%%timeit
apply_tariff_loop(df_converted)

4.04 s ± 203 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


### 2) Looping with .itertuples() and .iterrows()

In [65]:
def apply_tariff_iterrows(df):
    energy_cost_list = []
    for index, row in df.iterrows():
        # Get electricity used and hour of day
        energy_used = row['energy_kwh']
        hour = row['date_time'].hour
        # Append cost list
        energy_cost = apply_tariff(energy_used, hour)
        energy_cost_list.append(energy_cost)
    df['cost_cents'] = energy_cost_list

In [66]:
%%timeit
apply_tariff_iterrows(df_converted)

1.06 s ± 68.8 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


### 3) Pandas’ .apply()

In [67]:
%%timeit
df['cost_cents'] = df_converted.apply(lambda row: apply_tariff(kwh=row['energy_kwh'],hour=row['date_time'].hour), axis=1)

209 ms ± 6.09 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


### 4) Selecting Data With .isin()

In [68]:
df_converted.set_index('date_time', inplace=True)
df_converted.head()

Unnamed: 0_level_0,energy_kwh,cost_cents
date_time,Unnamed: 1_level_1,Unnamed: 2_level_1
2013-01-01 00:00:00,0.586,7.032
2013-01-01 01:00:00,0.58,6.96
2013-01-01 02:00:00,0.572,6.864
2013-01-01 03:00:00,0.596,7.152
2013-01-01 04:00:00,0.592,7.104


In [71]:
df_converted.index.hour.isin(range(0, 7))

array([ True,  True,  True, ..., False, False, False])

In [72]:
%%timeit
peak_hours = df_converted.index.hour.isin(range(17, 24))
shoulder_hours = df_converted.index.hour.isin(range(7, 17))
off_peak_hours = df_converted.index.hour.isin(range(0, 7))

df.loc[peak_hours, 'cost_cents'] = df.loc[peak_hours, 'energy_kwh'] * 28
df.loc[shoulder_hours,'cost_cents'] = df.loc[shoulder_hours, 'energy_kwh'] * 20
df.loc[off_peak_hours,'cost_cents'] = df.loc[off_peak_hours, 'energy_kwh'] * 12

6.52 ms ± 267 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


### 5) Pandas’ pd.cut() function

In [74]:
pd.cut(x=df_converted.index.hour,
      bins=[0,7,17,24],
      include_lowest=True,
      labels=[12, 20, 28])

[12, 12, 12, 12, 12, ..., 28, 28, 28, 28, 28]
Length: 8760
Categories (3, int64): [12 < 20 < 28]

In [78]:
%%timeit
cents_per_kwh = pd.cut(x=df_converted.index.hour,
                      bins=[0,7,17,24],
                      include_lowest=True,
                      labels=[12, 20, 28]).astype(int)

df_converted['cost_cents'] = cents_per_kwh * df_converted['energy_kwh']

2.75 ms ± 197 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)


# 6) Using NumPy

In [80]:
%%timeit
prices = np.array([12, 20, 28])
bins = np.digitize(df_converted.index.hour.values, bins=[7, 17, 24])
df['cost_cents'] = prices[bins] * df['energy_kwh'].values

1.14 ms ± 68.7 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)


# 7) Using Dask

In [None]:
#Dask omogoča multiprocesorsko računanje
#https://realpython.com/python-concurency/
#https://dask.org
#https://tutorial.dask.org/00_overview.html

In [2]:
#Paralelizacija kode z dask.delayed

from dask.distributed import Client

In [3]:
client = Client(n_workers = 4)

In [4]:
from time import sleep

def inc(x):
    sleep(1)
    return x+1

def add (x,y):
    sleep(1)
    return x+y

In [5]:
%%time
x = inc(1)
y = inc(2)
z = add(x,y)

CPU times: user 147 ms, sys: 23.8 ms, total: 171 ms
Wall time: 3 s


In [6]:
from dask import delayed

In [18]:
%%time
x = delayed(inc)(1)
y = delayed(inc)(2)
z = delayed(add)(x,y)

CPU times: user 1.91 ms, sys: 716 µs, total: 2.63 ms
Wall time: 1.29 ms


In [19]:
z

Delayed('add-0f49c443-816a-49bc-95a6-6b14783e44a7')

In [20]:
%%time
z.compute()

CPU times: user 123 ms, sys: 5.87 ms, total: 129 ms
Wall time: 2.02 s


5

### Pandas dask

In [29]:
import os
import pandas as pd
from glob import glob

In [31]:
filenames = sorted(glob(os.path.join('data','flightdelays', '*.csv')))

In [34]:
%%time
sums = []
counts = []

for fn in filenames:
    df = pd.read_csv(fn)
    
    by_origin = df.groupby('ORIGIN')
    total = by_origin['ARR_DELAY'].sum()
    count = by_origin['ARR_DELAY'].count()
    
    sums.append(total)
    counts.append(count)
    
total_delays = sum(sums)
n_flights = sum(counts)

mean = total_delays / n_flights

CPU times: user 1.27 s, sys: 0 ns, total: 1.27 s
Wall time: 1.49 s


In [35]:
mean

ORIGIN
ABE    -3.398964
ABQ     2.963672
ABR     5.645161
ABY     0.093023
ACK          NaN
         ...    
WRG    -2.500000
WYS          NaN
XNA     2.380165
YAK   -14.721311
YUM    -2.701149
Name: ARR_DELAY, Length: 301, dtype: float64

In [38]:
%%time

from dask import compute
sums = []
counts = []

for fn in filenames:
    df = delayed(pd.read_csv)(fn)
    
    by_origin = df.groupby('ORIGIN')
    
    total = by_origin['ARR_DELAY'].sum()
    count = by_origin['ARR_DELAY'].count()
    
    sums.append(total)
    counts.append(count)
    
sums, counts = compute(sums, counts)
    
total_delays = sum(sums)
n_flights = sum(counts)

mean = total_delays / n_flights

CPU times: user 86.3 ms, sys: 0 ns, total: 86.3 ms
Wall time: 566 ms


### Primer: Taxi Vožnje

In [46]:
filenames = sorted(glob(os.path.join('data','nyctaxi', '*.csv')))

In [40]:
filenames

['data/nyctaxi/yellow_tripdata_2015-01.csv',
 'data/nyctaxi/yellow_tripdata_2015-02.csv',
 'data/nyctaxi/yellow_tripdata_2015-03.csv']

In [48]:
def read_file(fname):
    return pd.read_csv(fname, parse_dates = [1,2])

def count_long_trips(df):
    df['duration'] = (df['tpep_dropoff_datetime']- df['tpep_pickup_datetime']).dt.seconds
    is_long_trip = df['duration'] > 1200
    result_dict = {'n_long': [sum(is_long_trip)], 'n_total': [len(df)]}
    return pd.DataFrame(result_dict)

In [49]:
%%time

totals = []

for fname in filenames:
    temp = count_long_trips(read_file(fname))
    totals.append(temp)
    
annual_total = sum(totals)

CPU times: user 733 ms, sys: 27.2 ms, total: 760 ms
Wall time: 878 ms


In [50]:
annual_total

Unnamed: 0,n_long,n_total
0,35321,201524


In [51]:
#isto kot zgoraj, samo z uporabo list comprehantions
annual_totals = sum([count_long_trips(read_file(fname)) for fname in filenames])

In [52]:
annual_totals

Unnamed: 0,n_long,n_total
0,35321,201524


### DASK

In [53]:
@delayed
def read_file(fname):
    return pd.read_csv(fname, parse_dates = [1,2])

@delayed
def count_long_trips(df):
    df['duration'] = (df['tpep_dropoff_datetime']- df['tpep_pickup_datetime']).dt.seconds
    is_long_trip = df['duration'] > 1200
    result_dict = {'n_long': [sum(is_long_trip)], 'n_total': [len(df)]}
    return pd.DataFrame(result_dict)

In [54]:
annual_totals = sum([count_long_trips(read_file(fname)) for fname in filenames])

In [55]:
annual_totals

Delayed('add-bdc564c83d81ee3c8ec73aba16631a22')

In [56]:
%%time
annual_totals.compute()

CPU times: user 23.8 ms, sys: 25.8 ms, total: 49.6 ms
Wall time: 438 ms


Unnamed: 0,n_long,n_total
0,35321,201524
