# Del 15: Procesiranje velikih datasetov - hitrost

In [4]:
import pandas as pd
import numpy as np

## CPU Bound Programs

### Bounds vs Limitations

<img alt="I/O bounds" src="images/CPU+and+I_O+bounds.png">

- CPU optimizacija
    - izboljšanje algoritma
    - multi-processing
- I/O
    - multi-threading (asinhorno pošljemo requeste na bazo... tako da CPU ne čaka vsakega reply-ja posebej)
    - asyncio

### Primer optimizacije

In [5]:
import numpy as np

# Define a basic Haversine distance formula
def haversine(lat1, lon1, lat2, lon2):
    MILES = 3959
    lat1, lon1, lat2, lon2 = map(np.deg2rad, [lat1, lon1, lat2, lon2])
    dlat = lat2 - lat1 
    dlon = lon2 - lon1 
    a = np.sin(dlat/2)**2 + np.cos(lat1) * np.cos(lat2) * np.sin(dlon/2)**2
    c = 2 * np.arcsin(np.sqrt(a)) 
    total_miles = MILES * c
    return total_miles

In [6]:
df = pd.read_csv('data/new_york_hotels.csv')

In [10]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 1631 entries, 0 to 1630
Data columns (total 12 columns):
 #   Column          Non-Null Count  Dtype  
---  ------          --------------  -----  
 0   ean_hotel_id    1631 non-null   int64  
 1   name            1631 non-null   object 
 2   address1        1631 non-null   object 
 3   city            1631 non-null   object 
 4   state_province  1631 non-null   object 
 5   postal_code     1631 non-null   object 
 6   latitude        1631 non-null   float64
 7   longitude       1631 non-null   float64
 8   star_rating     1630 non-null   float64
 9   high_rate       1631 non-null   float64
 10  low_rate        1631 non-null   float64
 11  distance        1631 non-null   float64
dtypes: float64(6), int64(1), object(5)
memory usage: 153.0+ KB


#### Crude looping over DataFrame rows using indices

In [7]:
# Define a function to manually loop over all rows and return a series of distances
def haversine_looping(df):
    distance_list = []
    for i in range(0, len(df)):
        d = haversine(40.671, -73.985, df.iloc[i]['latitude'], df.iloc[i]['longitude'])
        distance_list.append(d)
    return distance_list

In [8]:
%%timeit
# Run the haversine looping function
df['distance'] = haversine_looping(df)

169 ms ± 2.78 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


#### Looping with iterrows()

In [11]:
%%timeit
# Haversine applied on rows via iteration
haversine_series = []
for index, row in df.iterrows():
    haversine_series.append(haversine(40.671, -73.985, row['latitude'], row['longitude']))
df['distance'] = haversine_series

61.8 ms ± 1.91 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)


#### Looping with apply()

In [12]:
%%timeit

# Timing apply on the Haversine function
df['distance'] = df.apply(lambda row: haversine(40.671, -73.985, row['latitude'], row['longitude']), axis=1)

20.1 ms ± 560 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)


#### Vectorization with Pandas series

In [13]:
%%timeit 
# Vectorized implementation of Haversine applied on Pandas series
df['distance'] = haversine(40.671, -73.985, df['latitude'], df['longitude'])

824 µs ± 3.69 µs per loop (mean ± std. dev. of 7 runs, 1,000 loops each)


####  Vectorization with NumPy arrays

In [14]:
%%timeit
# Vectorized implementation of Haversine applied on NumPy arrays
df['distance'] = haversine(40.671, -73.985, df['latitude'].values, df['longitude'].values)

88.3 µs ± 1.98 µs per loop (mean ± std. dev. of 7 runs, 10,000 loops each)


This brings us to a few basic conclusions on optimizing Pandas code:
1. Avoid loops; they’re slow and, in most common use cases, unnecessary.
2. If you must loop, use apply(), not iteration functions.
3. Vectorization is usually better than scalar operations. Most common operations in Pandas can be vectorized.
4. Vector operations on NumPy arrays are more efficient than on native Pandas series.

## I/O Bound Programs

### I/O Bounds

<img src="./images/report_assembly.png">

<img src="./images/report_assembly_bidir.png">

## Optimizing Python Code with pandas

### Basic Looping

### Select columns and rows efficiently


In [None]:
data = pd.read_csv('data/school.csv')
data.head(3)

In [None]:
data['City'].value_counts().head(10)

In [None]:
# save the top cities in a list
top_cities = ['Brooklyn','Bronx','Manhattan','Jamaica','Long Island City']

In [None]:
%%timeit
data.loc[(data['City'].isin(top_cities) == False),'City'] = 'Others'

In [None]:
data.City.value_counts()

In [None]:
data = pd.read_csv('data/school.csv')

In [None]:
%%timeit
data['City'][(data['City'].isin(top_cities) == False)] = 'Others'
# salba praksa

In [None]:
data.City.value_counts()

### Uporaba biult-in funkciji

### Joining on indexes is faster than joining on columns

In [None]:
n = 100000

i1 = np.arange(n)
np.random.shuffle(i1)
df1 = pd.DataFrame({'i': i1,
                    'j': np.random.randint(1,1000,n),
                    'k': np.random.randint(1,1000,n)})

i2 = np.arange(n)
np.random.shuffle(i1)
df2 = pd.DataFrame({'i': i2,
                    'm': np.random.randint(1,1000,n),
                    'n': np.random.randint(1,1000,n)})

In [None]:
df1.head()

In [None]:
df2.head()

In [None]:
%%timeit
df1.merge(df2, on="i")

In [None]:
df1 = df1.set_index('i')
df2 = df2.set_index('i')

In [None]:
%%timeit
df1.merge(df2, left_index=True, right_index=True)

## PRIMER: Pohitritev pandas kode

### Naloga

**The goal of this example will be to apply time-of-use energy tariffs to find the total cost of energy consumption for one year.**



### Priprava podatkov

In [15]:
import pandas as pd

In [16]:
df = pd.read_csv('data/demand_profile.csv')

In [17]:
df.head()

Unnamed: 0,date_time,energy_kwh
0,1/1/13 0:00,0.586
1,1/1/13 1:00,0.58
2,1/1/13 2:00,0.572
3,1/1/13 3:00,0.596
4,1/1/13 4:00,0.592


In [18]:
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 8760 entries, 0 to 8759
Data columns (total 2 columns):
 #   Column      Non-Null Count  Dtype  
---  ------      --------------  -----  
 0   date_time   8760 non-null   object 
 1   energy_kwh  8760 non-null   float64
dtypes: float64(1), object(1)
memory usage: 137.0+ KB


In [19]:
df.dtypes

date_time      object
energy_kwh    float64
dtype: object

In [20]:
type(df.iloc[0, 0])

str

In [21]:
df['date_time'] = pd.to_datetime(df['date_time'])
df['date_time'].dtype

dtype('<M8[ns]')

In [22]:
df.head()

Unnamed: 0,date_time,energy_kwh
0,2013-01-01 00:00:00,0.586
1,2013-01-01 01:00:00,0.58
2,2013-01-01 02:00:00,0.572
3,2013-01-01 03:00:00,0.596
4,2013-01-01 04:00:00,0.592


In [23]:
def convert(df, column_name):
    return pd.to_datetime(df[column_name])

df = pd.read_csv('data/demand_profile.csv')
df_coverted = df.copy()

In [24]:
%%timeit -r 3 -n 10
df_coverted['date_time'] = convert(df, 'date_time')

393 ms ± 3.77 ms per loop (mean ± std. dev. of 3 runs, 10 loops each)


In [25]:
def convert_with_format(df, column_name):
    return pd.to_datetime(df[column_name], format='%d/%m/%y %H:%M')

In [26]:
%%timeit -r 3 -n 10
df_coverted['date_time'] = convert_with_format(df, 'date_time')

17.2 ms ± 834 µs per loop (mean ± std. dev. of 3 runs, 10 loops each)


In [None]:
859/18

In [None]:
df_coverted.head()

In [None]:
df_coverted.info()

### 1) Simple Looping Over Pandas Data

<table class="table table-hover">
<thead>
<tr>
<th>Tariff Type</th>
<th>Cents per kWh</th>
<th>Time Range</th>
</tr>
</thead>
<tbody>
<tr>
<td>Peak</td>
<td>28</td>
<td>17:00 to 24:00</td>
</tr>
<tr>
<td>Shoulder</td>
<td>20</td>
<td>7:00 to 17:00</td>
</tr>
<tr>
<td>Off-Peak</td>
<td>12</td>
<td>0:00 to 7:00</td>
</tr>
</tbody>
</table>

In [None]:
df_test = df_coverted.copy()
df_test['cost_cents'] = df['energy_kwh'] * 28

In [None]:
df_test.head()

In [None]:
def apply_tariff(kwh, hour):
    """Calculates cost of electricity for given hour."""    
    if 0 <= hour < 7:
        rate = 12
    elif 7 <= hour < 17:
        rate = 20
    elif 17 <= hour < 24:
        rate = 28
    else:
        raise ValueError(f'Invalid hour: {hour}')
    return rate * kwh

In [None]:
# NOTE: Don't do this!
def apply_tariff_loop(df):
    """Calculate costs in loop.  Modifies `df` inplace."""
    energy_cost_list = []
    for i in range(len(df)):
        # Get electricity used and hour of day
        energy_used = df.iloc[i]['energy_kwh']
        hour = df.iloc[i]['date_time'].hour
        energy_cost = apply_tariff(energy_used, hour)
        energy_cost_list.append(energy_cost)
    df['cost_cents'] = energy_cost_list

In [None]:
%%timeit -r 3 -n 10
apply_tariff_loop(df_coverted)

### 2) Looping with .itertuples() and .iterrows()

In [None]:
for index, row in df[:5].iterrows():
    print(index)
    print(row)
    print('energy_kwh' ,row['energy_kwh'])
    print('-------')

In [None]:
def apply_tariff_iterrows(df):
    energy_cost_list = []
    for index, row in df.iterrows():
        # Get electricity used and hour of day
        energy_used = row['energy_kwh']
        hour = row['date_time'].hour
        # Append cost list
        energy_cost = apply_tariff(energy_used, hour)
        energy_cost_list.append(energy_cost)
    df['cost_cents'] = energy_cost_list

In [None]:
%%timeit -r 3 -n 10
apply_tariff_iterrows(df_coverted)

### 3) Pandas’ .apply()

In [None]:
def apply_tariff_withapply(df):
    df['cost_cents'] = df.apply(lambda row: apply_tariff(
        kwh=row['energy_kwh'],
        hour=row['date_time'].hour), axis=1)

In [None]:
%%timeit -r 3 -n 10
apply_tariff_withapply(df_coverted)

### 4) Selecting Data With .isin()

In [None]:
df_coverted = df.copy()
df_coverted['date_time'] = convert_with_format(df, 'date_time')
df_coverted.set_index('date_time', inplace=True)

In [None]:
df_coverted.head()

In [None]:
def apply_tariff_isin(df):
    # Define hour range Boolean arrays
    peak_hours = df.index.hour.isin(range(17, 24))
    shoulder_hours = df.index.hour.isin(range(7, 17))
    off_peak_hours = df.index.hour.isin(range(0, 7))
    
    # Apply tariffs to hour ranges
    df.loc[peak_hours, 'cost_cents'] = df.loc[peak_hours, 'energy_kwh'] * 28
    df.loc[shoulder_hours,'cost_cents'] = df.loc[shoulder_hours, 'energy_kwh'] * 20
    df.loc[off_peak_hours,'cost_cents'] = df.loc[off_peak_hours, 'energy_kwh'] * 12

In [None]:
%%timeit -r 3 -n 10
apply_tariff_isin(df_coverted)

In [None]:
203/6.5

### 5) Pandas’ pd.cut() function
statistični prijemi

In [27]:
df_coverted = df.copy()
df_coverted['date_time'] = convert_with_format(df, 'date_time')
df_coverted.set_index('date_time', inplace=True)

In [28]:
pd.cut(x=df_coverted.index.hour,
      bins=[0,7,17,24],
      include_lowest=True,
      labels=[12, 20, 28])

[12, 12, 12, 12, 12, ..., 28, 28, 28, 28, 28]
Length: 8760
Categories (3, int64): [12 < 20 < 28]

In [29]:
def apply_tariff_cut(df):
    cents_per_kwh = pd.cut(x=df_coverted.index.hour,
      bins=[0,7,17,24],
      include_lowest=True,
      labels=[12, 20, 28]).astype(int)
    df['cost_cents'] = cents_per_kwh * df['energy_kwh']

In [30]:
%%timeit -r 3 -n 10
apply_tariff_cut(df_coverted)

947 µs ± 70.3 µs per loop (mean ± std. dev. of 3 runs, 10 loops each)


### 6) Using NumPy

In [31]:
import numpy as np

In [32]:
df_coverted = df.copy()
df_coverted['date_time'] = convert_with_format(df, 'date_time')
df_coverted.set_index('date_time', inplace=True)

In [33]:
np.digitize(df_coverted.index.hour.values, bins=[7, 17, 24])

array([0, 0, 0, ..., 2, 2, 2])

In [34]:
def apply_tariff_digitize(df):
    prices = np.array([12, 20, 28])
    bins = np.digitize(df.index.hour.values, bins=[7, 17, 24])
    df['cost_cents'] = prices[bins] * df['energy_kwh'].values

In [35]:
%%timeit -r 3 -n 10
apply_tariff_digitize(df_coverted)

415 µs ± 112 µs per loop (mean ± std. dev. of 3 runs, 10 loops each)


### Prevent Reprocessing with HDFStore

[What is HDF5](https://portal.hdfgroup.org/display/knowledge/What+is+HDF5)

In [None]:
df_coverted.info()

In [None]:
# Create storage object with filename `processed_data`
data_store = pd.HDFStore('data/OUT_processed_data.h5')

# Put DataFrame into the object setting the key as 'preprocessed_df'
data_store['preprocessed_df'] = df_coverted
data_store.close()

In [None]:
data_store = pd.HDFStore('data/OUT_processed_data.h5')

# Retrieve data using key
preprocessed_df = data_store['preprocessed_df']
data_store.close()

In [None]:
preprocessed_df.info()

## Drugi nasveti

### [Dask](https://www.dask.org/)

https://docs.dask.org/en/stable/install.html

###  [Numba](https://numba.pydata.org/)

Numba translates Python functions to optimized machine code at runtime using the industry-standard LLVM compiler library. Numba-compiled numerical algorithms in Python can approach the speeds of C or FORTRAN.

### pandas.eval() for Efficient Operations

[Dokumentacija](https://pandas.pydata.org/pandas-docs/stable/reference/api/pandas.eval.html)

[High-Performance Pandas: eval() and query()](https://jakevdp.github.io/PythonDataScienceHandbook/03.12-performance-eval-and-query.html#pandas.eval()-for-Efficient-Operations)